From c6ba1eaa3c5a44e4a9f6d072cb95fcf7e68ba3d6 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 9 Mar 2017 08:19:06 -0500 Subject: [PATCH] fixup! Logical replication support for initial data copy --- doc/src/sgml/catalogs.sgml | 9 +++++---- doc/src/sgml/logical-replication.sgml | 18 +++++++++--------- doc/src/sgml/monitoring.sgml | 4 ++-- doc/src/sgml/protocol.sgml | 14 +++++++------- doc/src/sgml/ref/alter_subscription.sgml | 8 ++++---- doc/src/sgml/ref/create_subscription.sgml | 13 ++++++------- src/backend/catalog/pg_subscription.c | 13 ++++--------- src/backend/replication/logical/launcher.c | 18 +++++++----------- src/backend/replication/logical/snapbuild.c | 6 +++--- src/backend/replication/logical/worker.c | 4 ++-- src/backend/replication/repl_gram.y | 1 + src/backend/replication/walsender.c | 2 +- src/backend/tcop/postgres.c | 8 +++++--- 13 files changed, 56 insertions(+), 62 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index f587a08b6a..ab78585035 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -302,7 +302,7 @@ System Catalogs pg_subscription_rel - relation state mapping for subscriptions + relation state for subscriptions @@ -6429,14 +6429,14 @@ <structname>pg_subscription_rel</structname> The catalog pg_subscription_rel contains the - status for each replicated relation in each subscription. This is a + state for each replicated relation in each subscription. This is a many-to-many mapping. - This catalog only contains tables known to subscription after running + This catalog only contains tables known to the subscription after running either CREATE SUBSCRIPTION or - ALTER SUBSCRIPTION ... REFRESH commands. + ALTER SUBSCRIPTION ... REFRESH. @@ -6472,6 +6472,7 @@ <structname>pg_subscription_rel</structname> Columnschar + State code: i = initialize, d = data is being copied, s = synchronized, diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f75304cd91..4ec6bb49b7 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -24,8 +24,8 @@ Logical Replication - Logical replication typically starts with a snapshot of the data on - the publisher database. Once that is done, the changes on the publisher + Logical replication typically starts with a taking a snapshot of the data on + the publisher database and copying that to the subscriber. Once that is done, the changes on the publisher are sent to the subscriber as they occur in real-time. The subscriber applies the data in the same order as the publisher so that transactional consistency is guaranteed for publications within a single subscription. @@ -162,7 +162,7 @@ Subscription Each subscription will receive changes via one replication slot (see ). Additional temporary - replication slots may be required for the initial data synchronizations + replication slots may be required for the initial data synchronization of pre-existing table data. @@ -308,7 +308,7 @@ Monitoring Normally, there is a single apply process running for an enabled subscription. A disabled subscription or a crashed subscription will have zero rows in this view. If the initial data synchronization of any - table is in progress there will be additional workers for the tables + table is in progress, there will be additional workers for the tables being synchronized. @@ -355,8 +355,8 @@ Configuration Settings On the publisher side, wal_level must be set to logical, and max_replication_slots - must be set to at least the number of subscriptions expected to connect - with some reserve for table synchronization. And + must be set to at least the number of subscriptions expected to connect, + plus some reserve for table synchronization. And max_wal_senders should be set to at least the same as max_replication_slots plus the number of physical replicas that are connected at the same time. @@ -367,7 +367,7 @@ Configuration Settings to be set. In this case it should be set to at least the number of subscriptions that will be added to the subscriber. max_logical_replication_workers must be set to at - least the number of subscriptions again with some reserve for the table + least the number of subscriptions, again plus some reserve for the table synchronization. Additionally the max_worker_processes may need to be adjusted to accommodate for replication workers, at least (max_logical_replication_workers @@ -413,8 +413,8 @@ Quick Setup The above will start the replication process, which synchronizes the - initial table contents of users and - departments tables and then starts replicating + initial table contents of the tables users and + departments and then starts replicating incremental changes to those tables. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3d3761ec96..88340316cd 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1587,8 +1587,8 @@ <structname>pg_stat_subscription</structname> View relid Oid - Relation id which the worker is synchronizing, this is always - NULL for the main apply worker + OID of the relation that the worker is synchronizing; null for the + main apply worker received_lsn diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 99f1f1f8b7..15c1d8d1db 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1545,14 +1545,14 @@ Streaming Replication Protocol USE_SNAPSHOT - Decides what to do with snapshot created during logical slot - initialization. The EXPORT_SNAPSHOT (which is the - default) will export the snapshot for use in other sessions. This - option can't be used inside a transaction. The - USE_SNAPSHOT will use the snapshot for current + Decides what to do with the snapshot created during logical slot + initialization. EXPORT_SNAPSHOT, which is the + default, will export the snapshot for use in other sessions. This + option can't be used inside a transaction. + USE_SNAPSHOT will use the snapshot for the current transaction executing the command. This option must be used in a - transaction and the CREATE_REPLICATION_SLOT must - be the first command run in that transaction. Finally + transaction, and CREATE_REPLICATION_SLOT must + be the first command run in that transaction. Finally, NOEXPORT_SNAPSHOT will just use the snapshot for logical decoding as normal but won't do anything else with it. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index b34386d3c1..e74614a74a 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -99,14 +99,14 @@ Parameters Fetch missing table info from publisher. This will start replication - of tables that were added to subscribed publications since last - invocation of REFRESH PUBLICATION or since the + of tables that were added to the subscribed-to publications since the last + invocation of REFRESH PUBLICATION or since CREATE SUBSCRIPTION. The COPY DATA and NOCOPY DATA - options specify if the existing data in the publication that are being - subscribed should be copied. COPY DATA is the + options specify if the existing data in the publications that are being + subscribed to should be copied. COPY DATA is the default. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 91127ead88..6468470039 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -137,8 +137,8 @@ Parameters NOCOPY DATA - Specifies if the existing data in the publication that are being - subscribed should be copied once the replication starts. + Specifies if the existing data in the publications that are being + subscribed to should be copied once the replication starts. COPY DATA is the default. @@ -148,18 +148,18 @@ Parameters NOCONNECT - Instructs the CREATE SUBSCRIPTION to skip initial + Instructs CREATE SUBSCRIPTION to skip the initial connection to the provider. This will change default values of other options to DISABLED, - NOCREATE SLOT and NOCOPY DATA. + NOCREATE SLOT, and NOCOPY DATA. It's not allowed to combine NOCONNECT and - ENABLED, CREATE SLOT or + ENABLED, CREATE SLOT, or COPY DATA. - Since no connection is made when this option is specified the tables + Since no connection is made when this option is specified, the tables are not subscribed, so after you enable the subscription nothing will be replicated. It is required to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION in order for @@ -167,7 +167,6 @@ Parameters - diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8850b7eff1..d90c673952 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -263,19 +263,17 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, /* Update the tuple. */ memset(values, 0, sizeof(values)); - memset(nulls, true, sizeof(nulls)); + memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; - nulls[Anum_pg_subscription_rel_srsubstate - 1] = false; values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; if (sublsn != InvalidXLogRecPtr) - { - nulls[Anum_pg_subscription_rel_srsublsn - 1] = false; values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - } + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); @@ -289,9 +287,6 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, /* Cleanup. */ heap_close(rel, NoLock); - /* Make the changes visible. */ - CommandCounterIncrement(); - return subrelid; } @@ -323,7 +318,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, { heap_close(rel, RowExclusiveLock); *sublsn = InvalidXLogRecPtr; - return '\0'; + return SUBREL_STATE_UNKNOWN; } ereport(ERROR, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3e724de5f1..06d5509fd3 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -296,15 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; bgw.bgw_main = ApplyWorkerMain; - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u", subid); - if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker %u sync %u", subid, relid); + "logical replication worker for subscription %u sync %u", subid, relid); else snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker %u", subid); + "logical replication worker for subscription %u", subid); bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; @@ -434,7 +431,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); if (worker) - SetLatch(&worker->proc->procLatch); + logicalrep_worker_wakeup_ptr(worker); } /* @@ -443,8 +440,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) { - if (worker) - SetLatch(&worker->proc->procLatch); + SetLatch(&worker->proc->procLatch); } /* @@ -817,10 +813,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) MemSet(nulls, 0, sizeof(nulls)); values[0] = ObjectIdGetDatum(worker.subid); - if (!OidIsValid(worker.relid)) - nulls[1] = true; - else + if (OidIsValid(worker.relid)) values[1] = ObjectIdGetDatum(worker.relid); + else + nulls[1] = true; values[2] = Int32GetDatum(worker_pid); if (XLogRecPtrIsInvalid(worker.last_lsn)) nulls[3] = true; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 3f0abd7ce2..de90777cf9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -515,14 +515,14 @@ SnapBuildInitalSnapshot(SnapBuild *builder) Assert(XactIsoLevel = XACT_REPEATABLE_READ); if (builder->state != SNAPBUILD_CONSISTENT) - elog(ERROR, "cannot build and initial slot snapshot before reaching a consistent state"); + elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state"); if (!builder->committed.includes_all_transactions) - elog(ERROR, "cannot build and initial slot snapshot, not all transactions are monitored anymore"); + elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore"); /* so we don't overwrite the existing value */ if (TransactionIdIsValid(MyPgXact->xmin)) - elog(ERROR, "cannot build and initial slot snapshot when MyPgXact->xmin already is valid"); + elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid"); snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId()); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5383364011..e7fda70b75 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -109,8 +109,8 @@ WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; -static char *myslotname = NULL; -bool in_remote_transaction = false; +static char *myslotname = NULL; +bool in_remote_transaction = false; static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0755b88f5a..5990be52db 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -27,6 +27,7 @@ Node *replication_parse_result; static SQLCmd *make_sqlcmd(void); + /* * Bison doesn't allocate anything that needs to live across parser calls, * so we can easily have it use palloc instead of malloc. This prevents diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 02e1652f11..b4c7f73cf5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1380,7 +1380,7 @@ exec_replication_command(const char *cmd_string) * For aborted transactions, don't allow anything except pure SQL, * the exec_simple_query() will handle it correctly. */ - if (IsAbortedTransactionBlockState() && cmd_node->type != T_SQLCmd) + if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) ereport(ERROR, (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), errmsg("current transaction is aborted, " diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index b54ad50aae..ba41f90712 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4053,7 +4053,6 @@ PostgresMain(int argc, char *argv[], case 'Q': /* simple query */ { const char *query_string; - bool walsender_query = false; /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4062,8 +4061,11 @@ PostgresMain(int argc, char *argv[], pq_getmsgend(&input_message); if (am_walsender) - walsender_query = exec_replication_command(query_string); - if (!walsender_query) + { + if (!exec_replication_command(query_string)) + exec_simple_query(query_string); + } + else exec_simple_query(query_string); send_ready_for_query = true; -- 2.12.0