From dcb359a4be847c6dfc551be500201b5d34c3cbff Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Thu, 17 Aug 2023 12:05:54 +0530 Subject: [PATCH v14 2/2] Add logical slot sync capability to physical standby A new GUC max_slot_sync_workers controls the maximum number of slot-synchronization workers; for this PoC patch its default and maximum values are 2 and 50 respectively, and it is not run-time modifiable. A second GUC, synchronize_slot_names, selects the slots to be synchronized and is run-time modifiable. The replication launcher on the physical standby queries the primary for the list of dbids that the slots named in 'synchronize_slot_names' belong to. If that dbid count is less than max_slot_sync_workers, it starts only that many workers; otherwise it starts max_slot_sync_workers workers and divides the work equally among them. Each worker is then responsible for continuously syncing all the slots belonging to the DBs assigned to it. For example, if the slots named in 'synchronize_slot_names' on the primary belong to 10 DBs and the new GUC is at its default value of 2, each worker manages 5 DBs and keeps synchronizing their slots. If the replication launcher finds a new DB, it assigns it to the worker currently handling the fewest DBs (or to the first worker in case of a tie). Each worker slot has its own dbids list. Since the upper limit of this dbid count is not known, it is handled using DSA: we initially allocate memory to hold 100 dbids per worker, and when that is exhausted we reallocate the memory with its size increased by another 100. The naptime of a worker is tuned according to activity on the primary. Each worker starts with a naptime of 10ms; if no activity is observed on the primary for some time, the naptime is increased to 10sec, and it is reduced back to 10ms as soon as activity is observed again. Each worker does this by picking one slot (the first one assigned to it) for monitoring purposes: if the LSN of that slot does not change for a number of consecutive sync-checks, the naptime is increased to 10sec, and it is brought back to 10ms as soon as a change is observed.
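As an illustration only (not part of the patch), a standby that should keep the logical slots 'slot1' and 'slot2' in sync with its primary could be configured roughly as below; the connection values and slot names are placeholders, and only the last two parameters are introduced by this patch:

    # postgresql.conf on the physical standby (illustrative)
    primary_conninfo = 'host=primary.example.com user=repluser'
    hot_standby = on
    max_slot_sync_workers = 2                # PoC default; not run-time modifiable
    synchronize_slot_names = 'slot1,slot2'   # run-time modifiable; '*' selects all slots, '' disables syncing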
--- src/backend/postmaster/bgworker.c | 3 + .../libpqwalreceiver/libpqwalreceiver.c | 79 ++ src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 840 ++++++++++++++++-- .../replication/logical/logicalfuncs.c | 9 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 787 ++++++++++++++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/repl_gram.y | 32 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 3 +- src/backend/replication/walsender.c | 104 +++ src/backend/storage/lmgr/lwlocknames.txt | 1 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 18 +- src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/commands/subscriptioncmds.h | 4 + src/include/nodes/replnodes.h | 9 + src/include/postmaster/bgworker_internals.h | 1 + src/include/replication/logicallauncher.h | 4 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 7 +- src/include/replication/walreceiver.h | 22 + src/include/replication/worker_internal.h | 50 +- src/include/storage/lwlock.h | 3 +- src/test/recovery/meson.build | 1 + src/test/recovery/t/051_slot_sync.pl | 132 +++ 27 files changed, 2043 insertions(+), 74 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c mode change 100644 => 100755 src/backend/replication/walsender.c create mode 100644 src/test/recovery/t/051_slot_sync.pl diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 505e38376c..216287d56a 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -129,6 +129,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncMain", ReplSlotSyncMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..b7b79d667c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,8 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn, + const char *slot_names); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -96,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_list_db_for_logical_slots = libpqrcv_list_db_for_logical_slots, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -409,6 +413,81 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * List DB for logical slots + * + * It gets the list of unique DBIDs for logical slots mentioned in slot_names + * from primary. 
+ */ +static List * +libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn, + const char *slot_names) +{ + PGresult *res; + List *slotlist = NIL; + int ntuples; + StringInfoData s; + WalRecvReplicationSlotDbData *slot_data; + + initStringInfo(&s); + appendStringInfoString(&s, "LIST_DBID_FOR_LOGICAL_SLOTS"); + + if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0) + { + char *rawname; + List *namelist; + ListCell *lc; + + appendStringInfoChar(&s, ' '); + rawname = pstrdup(slot_names); + SplitIdentifierString(rawname, ',', &namelist); + foreach (lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_identifier(lfirst(lc))); + } + } + + res = libpqrcv_PQexec(conn->streamConn, s.data); + pfree(s.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive list of slots from the primary server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQnfields(res) < 1) + { + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Could not get list of slots: got %d fields, " + "expected %d or more fields.", + nfields, 1))); + } + + ntuples = PQntuples(res); + for (int i = 0; i < ntuples; i++) + { + + slot_data = palloc0(sizeof(WalRecvReplicationSlotDbData)); + if (!PQgetisnull(res, i, 0)) + slot_data->database = atooid(PQgetvalue(res, i, 0)); + + slot_data->last_sync_time = 0; + slotlist = lappend(slotlist, slot_data); + } + + PQclear(res); + + return slotlist; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7882fc91ce..7a01f2da48 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -57,6 +58,17 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_slot_sync_workers = 2; + +/* + * Initial and incremental allocation size for the dbids array for each + * SlotSyncWorker in dynamic shared memory, i.e. we start with this size + * and once it is exhausted, dbids is reallocated with its size incremented + * by ALLOC_DB_PER_WORKER. + */ +#define ALLOC_DB_PER_WORKER 100 + +SlotSyncWorker *MySlotSyncWorker = NULL; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -70,6 +82,7 @@ typedef struct LogicalRepCtxStruct dshash_table_handle last_start_dsh; /* Background workers. */ + SlotSyncWorker *ss_workers; /* slot sync workers */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -107,7 +120,6 @@ static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); - /* * Load the list of subscriptions.
* @@ -931,7 +943,16 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + + /* + * The launcher now takes care of launching both logical apply workers and + * logical slot sync workers. Thus to cater to the requirements of both, + * start it as soon as a consistent state is reached. This will help + * slot-sync workers to start timely on a physical standby while on a + * non-standby server, it holds same meaning as that of + * BgWorkerStart_RecoveryFinished. + */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -953,6 +974,7 @@ void ApplyLauncherShmemInit(void) { bool found; + bool foundSlotSync; LogicalRepCtx = (LogicalRepCtxStruct *) ShmemInitStruct("Logical Replication Launcher Data", @@ -977,6 +999,24 @@ ApplyLauncherShmemInit(void) SpinLockInit(&worker->relmutex); } } + + /* Allocate shared-memory for slot-sync workers pool now */ + LogicalRepCtx->ss_workers = (SlotSyncWorker *) + ShmemInitStruct("Replication slot synchronization workers", + mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker)), + &foundSlotSync); + + if (!foundSlotSync) + { + int slot; + + for (slot = 0; slot < max_slot_sync_workers; slot++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[slot]; + + memset(worker, 0, sizeof(SlotSyncWorker)); + } + } } /* @@ -1114,6 +1154,731 @@ ApplyLauncherWakeup(void) kill(LogicalRepCtx->launcher_pid, SIGUSR1); } +/* + * Clean up slot-sync worker info. + */ +static void +slotsync_worker_cleanup(SlotSyncWorker * worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE)); + + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; + worker->userid = InvalidOid; + worker->slot = -1; + + if (DsaPointerIsValid(worker->dbids_dp)) + { + dsa_free(worker->dbids_dsa, worker->dbids_dp); + worker->dbids_dp = InvalidDsaPointer; + } + + if (worker->dbids_dsa) + { + dsa_detach(worker->dbids_dsa); + worker->dbids_dsa = NULL; + } + + worker->dbcount = 0; + + MemSet(NameStr(worker->monitor.slot_name), 0, NAMEDATALEN); + worker->monitor.inactivity_count = 0; + worker->monitor.confirmed_lsn = 0; +} + +/* + * Attach Slot-sync worker to worker-slot assigned by launcher. + */ +void +slotsync_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_slot_sync_workers); + MySlotSyncWorker = &LogicalRepCtx->ss_workers[slot]; + MySlotSyncWorker->slot = slot; + + if (!MySlotSyncWorker->in_use) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot synchronization worker slot %d is " + "empty, cannot attach", slot))); + } + + if (MySlotSyncWorker->proc) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot synchronization worker slot %d is " + "already used by another worker, cannot attach", slot))); + } + + MySlotSyncWorker->proc = MyProc; + + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Wait for a slot-sync worker to start up and attach to the shmem context. + * + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. 
+ * + * Returns whether the attach was successful. + */ +static bool +WaitForSlotSyncWorkerAttach(SlotSyncWorker * worker, + uint16 generation, + BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + + /* Worker either died or has started. Return false if died. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(SlotSyncWorkerLock); + return worker->in_use; + } + + LWLockRelease(SlotSyncWorkerLock); + + /* Check if worker has died before attaching, and clean up after it. */ + status = GetBackgroundWorkerPid(handle, &pid); + + if (status == BGWH_STOPPED) + { + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + return false; + } + + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. But we don't expect to have to wait long. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } +} + +/* + * Slot Sync worker find. + * + * Walks the slot-sync workers pool and searches for one that matches given + * dbid. Since one worker can manage multiple dbs, so it walks the db array in + * each worker to find the match. + */ +static SlotSyncWorker * +slotsync_worker_find(Oid dbid) +{ + int i; + SlotSyncWorker *res = NULL; + Oid *dbids; + + Assert(LWLockHeldByMe(SlotSyncWorkerLock)); + + /* Search for attached worker for a given dbid */ + for (i = 0; i < max_slot_sync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + int cnt; + + if (!w->in_use) + continue; + + dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp); + SpinLockAcquire(&w->mutex); + for (cnt = 0; cnt < w->dbcount; cnt++) + { + Oid wdbid = dbids[cnt]; + + if (wdbid == dbid) + { + res = w; + break; + } + } + SpinLockRelease(&w->mutex); + + /* if worker is found, break the outer loop */ + if (res) + break; + } + + return res; +} + +/* + * Setup DSA for slot-sync worker. + * + * DSA is needed for dbids array. Since max number of dbs a worker can + * manage is not known, so initially fixed size to hold ALLOC_DB_PER_WORKER + * dbs is allocated. If this size if exhausted, it can be reallocated + * using dsa free and allocate routines. + */ +static dsa_handle +slot_sync_dsa_setup(SlotSyncWorker * worker, int alloc_db_count) +{ + dsa_area *dbids_dsa; + dsa_pointer dbids_dp; + dsa_handle dbids_dsa_handle; + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + dbids_dsa = dsa_create(LWTRANCHE_SLOT_SYNC_DSA); + dsa_pin(dbids_dsa); + dsa_pin_mapping(dbids_dsa); + + dbids_dp = dsa_allocate0(dbids_dsa, alloc_db_count * sizeof(Oid)); + + /* set-up worker */ + SpinLockInit(&worker->mutex); + worker->dbcount = 0; + worker->dbids_dsa = dbids_dsa; + worker->dbids_dp = dbids_dp; + + /* Get the handle. 
This is the one which can be passed to worker processes */ + dbids_dsa_handle = dsa_get_handle(dbids_dsa); + + ereport(DEBUG1, + (errmsg("allocated dsa for slot sync worker for dbcount: %d", + alloc_db_count))); + + MemoryContextSwitchTo(oldcontext); + + return dbids_dsa_handle; +} + +/* + * Stop the slot-sync worker and wait until it detaches from the + * slot. + */ +static void +slot_sync_worker_stop(SlotSyncWorker * worker) +{ + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* send SIGINT so that it exists cleanly ... */ + kill(worker->proc->pid, SIGINT); + + /* ... and wait for it to exit. */ + for (;;) + { + int rc; + + /* is it gone? */ + if (!worker->proc) + break; + + LWLockRelease(SlotSyncWorkerLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + } + +} + +/* + * Slot sync worker launch or reuse + * + * Start new slot-sync background worker from the pool of available workers + * going by max_slot_sync_workers count. If the worker pool is exhausted, + * reuse the existing worker with minimum number of dbs. The idea is to + * always distribute the dbs equally among launched workers. + * If initially allocated dbids array is exhausted for the selected worker, + * reallocate the dbids array with increased size and copy the existing + * dbids to it and assign the new one as well. + * + * Returns true on success, false on failure. + */ +static bool +slot_sync_worker_launch_or_reuse(Oid dbid, Oid userid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + uint i; + SlotSyncWorker *worker = NULL; + uint mindbcnt = 0; + uint alloc_count = 0; + uint copied_dbcnt = 0; + Oid *copied_dbids = NULL; + int worker_slot = -1; + dsa_handle handle; + Oid *dbids; + + Assert(OidIsValid(dbid)); + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. */ + for (i = 0; i < max_slot_sync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (!w->in_use) + { + worker = w; + worker_slot = i; + break; + } + } + + /* + * If all the workers are currently in use. Find the one with minimum + * number of dbs and use that. + */ + if (!worker) + { + for (i = 0; i < max_slot_sync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (i == 0) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + else if (w->dbcount < mindbcnt) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + } + } + + /* + * If worker is being reused, and there is vacancy in dbids array, just + * update dbids array and dbcount and we are done. But if dbids array is + * exhausted, reallocate dbids using dsa and copy the old dbids and assign + * the new one as well. + */ + if (worker->in_use) + { + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + SpinLockAcquire(&worker->mutex); + + if (worker->dbcount < ALLOC_DB_PER_WORKER) + { + dbids[worker->dbcount++] = dbid; + + } + else + { + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Remember the old dbids before we reallocate dsa. 
*/ + copied_dbcnt = worker->dbcount; + copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid)); + memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid)); + + alloc_count = copied_dbcnt + ALLOC_DB_PER_WORKER; + + /* Free the existing dbids and allocate new with increased size */ + if (DsaPointerIsValid(worker->dbids_dp)) + dsa_free(worker->dbids_dsa, worker->dbids_dp); + + worker->dbids_dp = dsa_allocate0(worker->dbids_dsa, + alloc_count * sizeof(Oid)); + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + /* Copy the existing dbids */ + worker->dbcount = copied_dbcnt; + memcpy(dbids, copied_dbids, copied_dbcnt * sizeof(Oid)); + + /* Assign new dbid */ + dbids[worker->dbcount++] = dbid; + + MemoryContextSwitchTo(oldcontext); + } + + SpinLockRelease(&worker->mutex); + LWLockRelease(SlotSyncWorkerLock); + + ereport(LOG, + (errmsg("Adding database %d to replication slot" + " synchronization worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + + return true; + + } + + /* Prepare the new worker. */ + worker->launch_time = GetCurrentTimestamp(); + worker->in_use = true; + + /* + * 'proc' and 'slot' will be assigned in ReplSlotSyncMain when we attach + * this worker to a particular worker-pool slot + */ + worker->proc = NULL; + worker->slot = -1; + + /* TODO: do we really need these 2? analyse more here */ + worker->dbid = dbid; + worker->generation++; + + /* Initial DSA setup for dbids array to hold ALLOC_DB_PER_WORKER dbs */ + handle = slot_sync_dsa_setup(worker, ALLOC_DB_PER_WORKER); + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + dbids[worker->dbcount++] = dbid; + worker->userid = userid; + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->generation; + + LWLockRelease(SlotSyncWorkerLock); + + /* Register the new dynamic worker. */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain"); + + Assert(worker_slot >= 0); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot synchronization worker %d", worker_slot); + + snprintf(bgw.bgw_type, BGW_MAXLEN, "slot synchronization worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(worker_slot); + + memcpy(bgw.bgw_extra, &handle, sizeof(dsa_handle)); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->generation); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase %s.", "max_worker_processes"))); + return false; + } + + /* Now wait until it attaches. */ + return WaitForSlotSyncWorkerAttach(worker, generation, bgw_handle); +} + +/* + * Slot-sync workers remove obsolete DBs from db-list + * + * If the DBIds fetched from primary are lesser than the ones being managed by + * slot sync workers, remove extra dbs from worker's db-list. This may happen + * if some slots are removed on primary or 'synchronize_slot_names' have been + * changed by user. 
+ */ +static void +slot_sync_remove_obsolete_dbs(List *remote_dbs) +{ + ListCell *lc; + Oid *dbids; + + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* Traverse slot-sync-workers to validate the DBs */ + for (int widx = 0; widx < max_slot_sync_workers; widx++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx]; + + if (!worker->in_use) + continue; + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + for (int dbidx = 0; dbidx < worker->dbcount;) + { + Oid wdbid = dbids[dbidx]; + bool found = false; + + /* Check if current DB is still present in remote-db-list */ + foreach(lc, remote_dbs) + { + WalRecvReplicationSlotDbData *slot_db_data = lfirst(lc); + + if (slot_db_data->database == wdbid) + { + found = true; + break; + } + } + + /* If not found, then delete this db from worker's db-list */ + if (!found) + { + SpinLockAcquire(&worker->mutex); + + for (int i = dbidx; i < worker->dbcount; i++) + { + /* Shift the DBs and get rid of wdbid */ + if (i < (worker->dbcount - 1)) + dbids[i] = dbids[i + 1]; + } + + worker->dbcount--; + SpinLockRelease(&worker->mutex); + + ereport(LOG, + (errmsg("Removed database %d from replication slot" + " synchronization worker %d", + wdbid, worker->slot))); + } + /* Else move to next db-position */ + else + { + dbidx++; + } + } + } + + LWLockRelease(SlotSyncWorkerLock); + + /* If dbcount for any worker has become 0, shut it down */ + for (int widx = 0; widx < max_slot_sync_workers; widx++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx]; + + if (worker->in_use && !worker->dbcount) + { + int slot = worker->slot; + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + slot_sync_worker_stop(worker); + LWLockRelease(SlotSyncWorkerLock); + + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + ereport(LOG, + (errmsg("Stopped replication slot synchronization worker %d", + slot))); + } + } + +} + +/* + * Start slot-sync background workers. + * + * It connects to primary, get the list of DBIDs for slots configured in + * synchronize_slot_names. It then launces the slot-sync workers as per + * max_slot_sync_workers and then assign the DBs equally to the workers + * launched. + */ +static void +ApplyLauncherStartSlotSync(long *wait_time) +{ + WalReceiverConn *wrconn; + char *err; + List *slots_dbs; + ListCell *lc; + MemoryContext tmpctx; + MemoryContext oldctx; + + if (max_slot_sync_workers == 0) + return; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + wrconn = walrcv_connect(PrimaryConnInfo, false, false, + "Logical Replication Launcher", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + /* Use temporary context for the slot list and worker info. 
*/ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher Slot Sync ctx", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(tmpctx); + + slots_dbs = walrcv_list_db_for_logical_slots(wrconn, synchronize_slot_names); + + slot_sync_remove_obsolete_dbs(slots_dbs); + + foreach(lc, slots_dbs) + { + WalRecvReplicationSlotDbData *slot_db_data = lfirst(lc); + SlotSyncWorker *w; + TimestampTz last_sync; + TimestampTz now; + long elapsed; + + if (!OidIsValid(slot_db_data->database)) + continue; + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + w = slotsync_worker_find(slot_db_data->database); + LWLockRelease(SlotSyncWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. + */ + last_sync = slot_db_data->last_sync_time; + now = GetCurrentTimestamp(); + if (last_sync == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= + wal_retrieve_retry_interval) + { + slot_db_data->last_sync_time = now; + slot_sync_worker_launch_or_reuse(slot_db_data->database, + BOOTSTRAP_SUPERUSERID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(tmpctx); + + walrcv_disconnect(wrconn); +} + +static void +ApplyLauncherStartSubs(long *wait_time) +{ + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases where + * a restart is expected (e.g., subscription parameter changes), + * another process should remove the last-start entry for the + * subscription so that the worker can be restarted without waiting + * for wal_retrieve_retry_interval to elapse. 
+ */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + /* * Main loop for the apply launcher process. */ @@ -1139,79 +1904,20 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. - * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. - */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } - } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + if (!RecoveryInProgress()) + ApplyLauncherStartSubs(&wait_time); + else + ApplyLauncherStartSlotSync(&wait_time); /* Wait for more work. 
*/ rc = WaitLatch(MyLatch, diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 55a24c02c9..0dea546f36 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -202,6 +202,15 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ReplicationSlotAcquire(NameStr(*name), true); + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("operation not permitted on replication slots on " + "standby which are synchronized from primary"))); + } + PG_TRY(); { /* restart at slot's confirmed_flush */ diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..b011b4cf01 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,787 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby from the primary + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for the slot synchronization worker on a + * physical standby, which fetches the logical replication slot information + * from the primary server (PrimaryConnInfo), creates the slots on the + * standby and synchronizes them periodically. It synchronizes only the slots + * configured in 'synchronize_slot_names'. + * + * It also takes care of dropping the slots which were created by it and no + * longer need to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME before every next synchronization. + * If no activity is observed on the primary for some time, it increases the + * naptime to WORKER_INACTIVITY_NAPTIME and, as soon as any activity is observed, + * it brings the naptime back to the default value.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; +} RemoteSlot; + +/* Worker's naptime in case of regular activity on primary */ +#define WORKER_DEFAULT_NAPTIME 10L /* 10 ms */ + +/* Worker's naptime in case of no-activity on primary */ +#define WORKER_INACTIVITY_NAPTIME 10000L /* 10 sec */ + +/* + * Inactivity Threshold Count before increasing naptime of worker. + * + * If the lsn of slot being monitored did not change for these many times, + * then increase naptime of current worker from WORKER_DEFAULT_NAPTIME to + * WORKER_INACTIVITY_NAPTIME. + */ +#define WORKER_INACTIVITY_THRESHOLD 10 + +/* + * Wait for remote slot to pass localy reserved position. + */ +static void +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name, + XLogRecPtr min_lsn) +{ + WalRcvExecResult *res; + TupleTableSlot *slot; + Oid slotRow[1] = {LSNOID}; + StringInfoData cmd; + bool isnull; + XLogRecPtr restart_lsn; + + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT restart_lsn" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from" + " primary: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disapeared from provider", + slot_name))); + + restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (restart_lsn >= min_lsn) + break; + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot * remote_slot) +{ + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + ReplicationSlotMarkDirty(); +} + +/* + * Get list of local logical slot names which are synchronized from + * primary and belongs to one of the DBs passed in. 
+ */ +static List * +get_local_synced_slot_names(Oid *dbids) +{ + List *slotNames = NIL; + + Assert(LWLockHeldByMe(SlotSyncWorkerLock)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + char *slotName; + + if (s->in_use && SlotIsLogical(s)) + { + SpinLockAcquire(&MySlotSyncWorker->mutex); + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + /* + * Add it to output list if this is the synchronized slot and + * belongs to worker's db. + */ + if (s->data.synced && s->data.database == dbids[j]) + { + slotName = pstrdup(NameStr(s->data.name)); + slotNames = lappend(slotNames, slotName); + break; + } + } + SpinLockRelease(&MySlotSyncWorker->mutex); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return slotNames; +} + +/* + * Helper function to check if a replication slot name exists in the list. + */ +static bool +replication_slot_name_exists(List *list_slot_names, const char *slot_name) +{ + ListCell *cell; + + foreach(cell, list_slot_names) + { + char *name = (char *) lfirst(cell); + + if (strcmp(name, slot_name) == 0) + return true; + } + + return false; +} + +/* + * Use slot_name in query. + * + * Check the dbid of the slot and if the dbid is one of the dbids managed by + * current worker, then use this slot-name in query to get the data from + * primary. If the slot is not created yet on standby (first time it is being + * queried), then too, use this slot in query. + */ +static bool +use_slot_in_query(char *slot_name, Oid *dbids) +{ + bool slot_found = false; + bool relevant_db = false; + ReplicationSlot *slot; + + Assert(LWLockHeldByMe(SlotSyncWorkerLock)); + + /* Search for the local slot with the same name as slot_name */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && SlotIsLogical(s) && + (strcmp(NameStr(s->data.name), slot_name) == 0)) + { + slot_found = true; + slot = s; + break; + } + } + + /* Check if slot belongs to one of the input dbids */ + if (slot_found) + { + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + if (slot->data.database == dbids[j]) + { + relevant_db = true; + break; + } + } + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Return TRUE if either slot is not yet created on standby or if it + * belongs to one of the dbs passed in dbids. + */ + if (!slot_found || relevant_db) + return true; + + return false; +} + + +/* + * Compute naptime for MySlotSyncWorker. + * + * Slot sync worker takes a nap before it again checks for slots on primary. + * The time for each nap is computed here. + * + * The first slot managed by each worker is chosen for monitoring purpose. + * If the lsn of that slot changes during each sync-check time, then the + * naptime is kept at regular value of WORKER_DEFAULT_NAPTIME. + * When no lsn change is observed for contiguous WORKER_INACTIVITY_THRESHOLD + * times, then the naptime is increased to WORKER_INACTIVITY_NAPTIME. + * This naptime is brought back to WORKER_DEFAULT_NAPTIME as soon as lsn change + * is observed. + * + * The caller is supposed to ignore return-value of 0. The 0 value is returned + * for the slots other that slot being monitored. 
+ */ +static long +compute_naptime(RemoteSlot * remote_slot) +{ + if (NameStr(MySlotSyncWorker->monitor.slot_name)[0] == '\0') + { + /* + * First time, just update the name and lsn and return regular + * naptime. Start comparison from next time onward. + */ + strcpy(NameStr(MySlotSyncWorker->monitor.slot_name), remote_slot->name); + MySlotSyncWorker->monitor.confirmed_lsn = remote_slot->confirmed_lsn; + return WORKER_DEFAULT_NAPTIME; + } + + /* If this is the slot being monitored by this worker, compute naptime */ + if (strcmp(remote_slot->name, + NameStr(MySlotSyncWorker->monitor.slot_name)) == 0) + { + /* + * if last lsn (monitored one) is same as current lsn (remote one), + * increment inactivity_count. + */ + if (MySlotSyncWorker->monitor.confirmed_lsn == remote_slot->confirmed_lsn) + MySlotSyncWorker->monitor.inactivity_count++; + else + MySlotSyncWorker->monitor.inactivity_count = 0; + + MySlotSyncWorker->monitor.confirmed_lsn = remote_slot->confirmed_lsn; + + /* If inactivity_count reaches the threshold, increase naptime */ + if (MySlotSyncWorker->monitor.inactivity_count >= + WORKER_INACTIVITY_THRESHOLD) + return WORKER_INACTIVITY_NAPTIME; + else + return WORKER_DEFAULT_NAPTIME; + } + + /* if it is not the slot being monitored, return 0 */ + return 0; +} + +/* + * Drop obsolete slots + * + * Drop slots which no longer need to be synced i.e. these either do not + * exist on primary or are no longer part of synchronize_slot_names. + */ +static void +drop_obsolete_slots(Oid *dbids, List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + /* + * Get the list of local slots for dbids managed by this worker, so that + * those not on remote could be dropped. + */ + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + local_slot_list = get_local_synced_slot_names(dbids); + LWLockRelease(SlotSyncWorkerLock); + + foreach(lc_slot, local_slot_list) + { + char *slotname = (char *) lfirst(lc_slot); + + /* Check if the local slot name is not in the remote slot names list */ + if (!replication_slot_name_exists(remote_slot_list, slotname)) + { + ReplicationSlotDrop(slotname, true); + + /* if this slot is being monitored, clean-up the monitoring info */ + if (strcmp(slotname, + NameStr(MySlotSyncWorker->monitor.slot_name)) == 0) + { + MemSet(NameStr(MySlotSyncWorker->monitor.slot_name), + 0, NAMEDATALEN); + MySlotSyncWorker->monitor.inactivity_count = 0; + MySlotSyncWorker->monitor.confirmed_lsn = 0; + } + + elog(LOG, "Dropped replication slot \"%s\" ", slotname); + } + } +} + +/* + * Synchronize single slot to given position. + * + * This creates new slot if there is no existing one and updates the + * metadata of existing slots as per the data received from primary. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot * remote_slot) +{ + bool found = false; + + /* Search for the named slot and mark it active if we find it. 
*/ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (!s->in_use) + continue; + + if (strcmp(NameStr(s->data.name), remote_slot->name) == 0) + { + found = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + StartTransactionCommand(); + + /* Already existing slot, acquire */ + if (found) + { + ReplicationSlotAcquire(remote_slot->name, true); + + if (remote_slot->confirmed_lsn < MyReplicationSlot->data.confirmed_flush) + { + elog(DEBUG1, + "not synchronizing slot %s; synchronization would move" + " it backward", remote_slot->name); + + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + + /* update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + slot->data.synced = true; + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + if (remote_slot->confirmed_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass " + "local slot LSN (%X/%X)", remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn))); + + wait_for_primary_slot_catchup(wrconn, remote_slot->name, + MyReplicationSlot->data.restart_lsn); + } + + + /* update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotPersist(); + } + + ReplicationSlotRelease(); + CommitTransactionCommand(); +} + +/* + * Synchronize slots. + * + * It lookes into dbids array maintained by dsa and gets the slots info from + * primary for the slots configured in synchronize_slot_names and belonging + * to concerned dbids and updates the slots locally as per the data received + * from primary. + */ +static long +synchronize_slots(dsa_area *dsa) +{ + WalRcvExecResult *res; + WalReceiverConn *wrconn = NULL; + TupleTableSlot *slot; + Oid slotRow[7] = {TEXTOID, TEXTOID, LSNOID, LSNOID, + XIDOID, BOOLOID, TEXTOID}; + StringInfoData s; + List *remote_slot_list = NIL; + char *database; + char *err; + MemoryContext oldctx = CurrentMemoryContext; + long naptime = WORKER_DEFAULT_NAPTIME; + long value; + Oid *dbids; + + if (!WalRcv) + return naptime; + + /* syscache access needs a transaction env. 
*/ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(oldctx); + + database = get_database_name(MyDatabaseId); + initStringInfo(&s); + appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database); + wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err); + + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + resetStringInfo(&s); + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, database" + " FROM pg_catalog.pg_replication_slots" + " WHERE database IN "); + + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + + SpinLockAcquire(&MySlotSyncWorker->mutex); + dbids = (Oid *) dsa_get_address(dsa, MySlotSyncWorker->dbids_dp); + + /* + * If dbcount has become zero, then return. It can happen in a case when + * synchronize_slot_names changes and the dbs assigned to this worker are + * no longer valid. Launcher will make dbcount=0 and will send SIGINT to + * this worker. There is a small window between the last time + * CHECK_FOR_INTERRUPTS was done and this stage, so there is scope that + * SIGINT is sent in-between and dbcount is made zero, so check for + * dbcount before further processing. + */ + if (!MySlotSyncWorker->dbcount) + { + /* return and let the worker handle the interrupts in main loop */ + pfree(database); + pfree(s.data); + CommitTransactionCommand(); + SpinLockRelease(&MySlotSyncWorker->mutex); + LWLockRelease(SlotSyncWorkerLock); + return naptime; + } + + appendStringInfoChar(&s, '('); + for (int i = 0; i < MySlotSyncWorker->dbcount; i++) + { + char *dbname; + + if (i != 0) + appendStringInfoChar(&s, ','); + + dbname = get_database_name(dbids[i]); + appendStringInfo(&s, "%s", + quote_literal_cstr(dbname)); + pfree(dbname); + } + appendStringInfoChar(&s, ')'); + + if (strcmp(synchronize_slot_names, "") != 0 && + strcmp(synchronize_slot_names, "*") != 0) + { + char *rawname; + List *namelist; + ListCell *lc; + bool first_slot = true; + + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &namelist); + + foreach(lc, namelist) + { + if (!use_slot_in_query(lfirst(lc), dbids)) + continue; + + if (first_slot) + appendStringInfoString(&s, " AND slot_name IN ("); + else + appendStringInfoChar(&s, ','); + + appendStringInfo(&s, "%s", + quote_literal_cstr(lfirst(lc))); + first_slot = false; + } + + if (!first_slot) + appendStringInfoChar(&s, ')'); + } + + SpinLockRelease(&MySlotSyncWorker->mutex); + LWLockRelease(SlotSyncWorkerLock); + + elog(DEBUG2, "Slot sync worker%d, Query:%s \n", MySlotSyncWorker->slot, s.data); + + res = walrcv_exec(wrconn, s.data, 7, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info from primary: %s", + res->err))); + + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(oldctx); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + remote_slot->restart_lsn = 
DatumGetLSN(slot_getattr(slot, 4, &isnull)); + Assert(!isnull); + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 5, &isnull)); + Assert(!isnull); + + remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(slot, + 7, &isnull)); + Assert(!isnull); + + /* Create list of remote slot names to be used by drop_obsolete_slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot->name); + + synchronize_one_slot(wrconn, remote_slot); + + /* + * Update naptime in case of non-zero value returned. The zero value + * is returned if remote_slot is not the one being monitored. + */ + value = compute_naptime(remote_slot); + if (value) + naptime = value; + + pfree(remote_slot); + + ExecClearTuple(slot); + } + + /* + * Drop slots which no longer need to be synced i.e. these either do not + * exist on primary or are no longer part of synchronize_slot_names. + */ + drop_obsolete_slots(dbids, remote_slot_list); + + walrcv_clear_result(res); + pfree(database); + + walrcv_disconnect(wrconn); + + return naptime; +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts() +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + (errmsg("replication slot synchronization worker %d is shutting" + " down on receiving SIGINT", MySlotSyncWorker->slot))); + + proc_exit(0); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } +} + +/* + * Detach the worker from DSM and update 'proc' and 'in_use'. + * Logical replication launcher will come to know using these + * that the worker has shutdown. + */ +static void +slotsync_worker_detach(int code, Datum arg) +{ + dsa_detach((dsa_area *) DatumGetPointer(arg)); + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + MySlotSyncWorker->in_use = false; + MySlotSyncWorker->proc = NULL; + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + dsa_handle handle; + dsa_area *dsa; + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + BackgroundWorkerUnblockSignals(); + + /* + * Attach to the dynamic shared memory segment for the slot sync worker + * and find its table of contents. + */ + memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsa_handle)); + dsa = dsa_attach(handle); + if (!dsa) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory " + "segment for slot sync worker"))); + + + /* Primary initialization is complete. Now, attach to our slot. */ + slotsync_worker_attach(worker_slot); + + before_shmem_exit(slotsync_worker_detach, PointerGetDatum(dsa)); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* Connect to our database. */ + BackgroundWorkerInitializeConnectionByOid(MySlotSyncWorker->dbid, + MySlotSyncWorker->userid, + 0); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("replication slot synchronization worker %d " + "started managing database \"%s\" (dbid: %d) ", + worker_slot, get_database_name(MySlotSyncWorker->dbid), + MySlotSyncWorker->dbid))); + CommitTransactionCommand(); + + /* Main wait loop. 
*/ + for (;;) + { + int rc; + long naptime; + + ProcessSlotSyncInterrupts(); + + if (!RecoveryInProgress()) + return; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + naptime = synchronize_slots(dsa); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + naptime, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } + + /* + * The slot-sync worker must not get here because it will only stop when + * it receives a SIGINT from the logical replication launcher, or when + * there is an error. None of these cases will allow the code to reach + * here. + */ + Assert(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..36b5ca0898 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..2b00bf845c 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -76,11 +76,12 @@ Node *replication_parse_result; %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_LIST_DBID_FOR_LOGICAL_SLOTS %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show + read_replication_slot timeline_history show list_dbid_for_logical_slots %type generic_option_list %type generic_option %type opt_timeline @@ -91,6 +92,7 @@ Node *replication_parse_result; %type opt_temporary %type create_slot_options create_slot_legacy_opt_list %type create_slot_legacy_opt +%type slot_name_list slot_name_list_opt %% @@ -114,6 +116,7 @@ command: | read_replication_slot | timeline_history | show + | list_dbid_for_logical_slots ; /* @@ -126,6 +129,33 @@ identify_system: } ; +slot_name_list: + IDENT + { + $$ = list_make1($1); + } + | slot_name_list ',' IDENT + { + $$ = lappend($1, $3); + } + +slot_name_list_opt: + slot_name_list { $$ = $1; } + | /* EMPTY */ { $$ = NIL; } + ; + +/* + * LIST_DBID_FOR_LOGICAL_SLOTS + */ +list_dbid_for_logical_slots: + K_LIST_DBID_FOR_LOGICAL_SLOTS slot_name_list_opt + { + ListDBForLogicalSlotsCmd *cmd = makeNode(ListDBForLogicalSlotsCmd); + cmd->slot_names = $2; + $$ = (Node *) cmd; + } + ; + /* * READ_REPLICATION_SLOT %s */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 1cc7fb858c..d4ecce6a47 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } +LIST_DBID_FOR_LOGICAL_SLOTS { return K_LIST_DBID_FOR_LOGICAL_SLOTS; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } @@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void) case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: + case K_LIST_DBID_FOR_LOGICAL_SLOTS: /* Yes; push back the first token so we can parse 
later. */ repl_pushed_back_token = first_token; return true; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 793093b32c..f718159ce1 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -92,7 +92,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -317,6 +317,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.synced = false; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c old mode 100644 new mode 100755 index 80374c55be..c0ce3425b4 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -473,6 +473,103 @@ IdentifySystem(void) end_tup_output(tstate); } +static int +pg_qsort_namecmp(const void *a, const void *b) +{ + return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN); +} + +/* + * Handle the LIST_SLOT_DATABASE_OIDS command. + */ +static void +ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd) +{ + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + NameData *slot_names = NULL; + int numslot_names; + List *database_oids_list = NIL; + + numslot_names = list_length(cmd->slot_names); + if (numslot_names) + { + ListCell *lc; + int i = 0; + + slot_names = palloc(numslot_names * sizeof(NameData)); + foreach (lc, cmd->slot_names) + { + char *slot_name = lfirst(lc); + + ReplicationSlotValidateName(slot_name, ERROR); + namestrcpy(&slot_names[i++], slot_name); + } + + qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp); + } + + dest = CreateDestReceiver(DestRemoteSimple); + + /* need a tuple descriptor representing a single column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1, "database_oid", + INT8OID, -1, 0); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + Oid datoid; /* Variable to store the database OID for each slot */ + Datum values[1]; + bool nulls[1]; + + if (!slot->in_use) + continue; + + SpinLockAcquire(&slot->mutex); + + datoid = slot->data.database; + + SpinLockRelease(&slot->mutex); + + /* + * If slot names were provided and the current slot name is not in the + * list, skip it. + */ + if (numslot_names && + !bsearch((void *)&slot->data.name, (void *)slot_names, + numslot_names, sizeof(NameData), pg_qsort_namecmp)) + continue; + + /* + * Check if the database OID is already in the list, and if so, skip + * this slot. 
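+	 * This keeps the result down to one row per distinct database OID, even
+	 * when several of the requested slots live in the same database.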
+ */ + if ((OidIsValid(datoid) && list_member_oid(database_oids_list, datoid))) + continue; + + /* Add the database OID to the list */ + database_oids_list = lappend_oid(database_oids_list, datoid); + + values[0] = Int64GetDatum(datoid); + nulls[0] = (datoid == InvalidOid); + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + } + LWLockRelease(ReplicationSlotControlLock); + + /* Clean up the list */ + list_free(database_oids_list); + + end_tup_output(tstate); +} + /* Handle READ_REPLICATION_SLOT command */ static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd) @@ -1819,6 +1916,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ListDBForLogicalSlotsCmd: + cmdtag = "LIST_DBID_FOR_LOGICAL_SLOTS"; + set_ps_display(cmdtag); + ListSlotDatabaseOIDs((ListDBForLogicalSlotsCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 811ad94742..2cb8fd9ed5 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 +SlotSyncWorkerLock 49 diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 13774254d2..48f5196a7e 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,7 @@ WAIT_EVENT_LOGICAL_APPLY_MAIN LogicalApplyMain "Waiting in main loop of logical WAIT_EVENT_LOGICAL_LAUNCHER_MAIN LogicalLauncherMain "Waiting in main loop of logical replication launcher process." WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN LogicalParallelApplyMain "Waiting in main loop of logical replication parallel apply process." WAIT_EVENT_RECOVERY_WAL_STREAM RecoveryWalStream "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +WAIT_EVENT_REPL_SLOT_SYNC_MAIN ReplSlotSyncMain "Waiting in main loop of worker for synchronizing slots to a standby from primary." WAIT_EVENT_SYSLOGGER_MAIN SysLoggerMain "Waiting in main loop of syslogger process." WAIT_EVENT_WAL_RECEIVER_MAIN WalReceiverMain "Waiting in main loop of WAL receiver process." WAIT_EVENT_WAL_SENDER_MAIN WalSenderMain "Waiting in main loop of WAL sender process." 
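As an illustrative usage sketch (not part of the patch; the slot names and the
OID shown are placeholders, and the output is rendered only approximately),
the new LIST_DBID_FOR_LOGICAL_SLOTS command can be issued over a replication
connection:

    $ psql "dbname=postgres replication=database" \
          -c "LIST_DBID_FOR_LOGICAL_SLOTS sub1, sub2;"
     database_oid
    --------------
            16384
    (1 row)

The command reports one row per distinct database OID among the named
logical slots.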
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 820f716cc2..20c55bb7e4 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -63,8 +63,11 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3507,6 +3510,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slot_sync_workers", + PGC_SIGHUP, + REPLICATION_STANDBY, + gettext_noop("Maximum number of slots synchronization workers " + "on a standby."), + NULL, + }, + &max_slot_sync_workers, + 2, 0, MAX_SLOT_SYNC_WORKER_LIMIT, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4556,7 +4572,7 @@ struct config_string ConfigureNamesString[] = * standby, therefore, we might need a new group REPLICATION. */ { - {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY, + {"synchronize_slot_names", PGC_USERSET, REPLICATION_STANDBY, gettext_noop("List of replication slot names to synchronize from " "primary to streaming replication standby server."), gettext_noop("Value of \"*\" means all."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 63daf586f3..4e0ae87b54 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -359,6 +359,7 @@ #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery #synchronize_slot_names = '' # replication slot names to synchronize from # primary to streaming replication standby server +#max_slot_sync_workers = 2 # max number of slot synchronization workers # - Subscribers - diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..bc9c1baea1 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ------------------------------- + * LIST_DBID_FOR_LOGICAL_SLOTS command + * ------------------------------- + */ +typedef struct ListDBForLogicalSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListDBForLogicalSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h index 4ad63fd9bd..19c5421a55 100644 --- a/src/include/postmaster/bgworker_internals.h +++ b/src/include/postmaster/bgworker_internals.h @@ -22,6 +22,7 @@ * Maximum 
possible value of parallel workers. */ #define MAX_PARALLEL_WORKER_LIMIT 1024 +#define MAX_SLOT_SYNC_WORKER_LIMIT 50 /* * List of background workers, private to postmaster. diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..690f3deebd 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,8 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_slot_sync_workers; + extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); @@ -31,4 +33,6 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern PGDLLIMPORT char *PrimaryConnInfo; + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..e1af29af4a 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void ReplSlotSyncMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 2765f99ccf..d61697b9ac 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -111,6 +110,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is standby synced slot? + */ + bool synced; } ReplicationSlotPersistentData; /* @@ -240,7 +244,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..6ef254eb5c 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -20,6 +20,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -191,6 +192,15 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Slot's DBids receiver from remote. 
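+ * Each entry carries the database OID of a logical slot reported by the
+ * primary and the time at which the slots of that database were last
+ * synchronized.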
+ */ +typedef struct WalRecvReplicationSlotDbData +{ + Oid database; + TimestampTz last_sync_time; +} WalRecvReplicationSlotDbData; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -280,6 +290,15 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_list_db_for_logical_slots_fn + * + * Run LIST_DBID_FOR_LOGICAL_SLOTS on primary server to get the + * list of unique DBIDs for logical slots mentioned in 'slots' + */ +typedef List *(*walrcv_list_db_for_logical_slots_fn) (WalReceiverConn *conn, + const char *slots); + /* * walrcv_server_version_fn * @@ -393,6 +412,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_list_db_for_logical_slots_fn walrcv_list_db_for_logical_slots; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -417,6 +437,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_list_db_for_logical_slots(conn, slots) \ + WalReceiverFunctions->walrcv_list_db_for_logical_slots(conn, slots) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8f4bed0958..05fa86d7f1 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -77,7 +77,7 @@ typedef struct LogicalRepWorker * would be created for each transaction which will be deleted after the * transaction is finished. */ - FileSet *stream_fileset; + struct FileSet *stream_fileset; /* * PID of leader apply worker if this slot is used for a parallel apply @@ -96,6 +96,52 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +typedef struct SlotSyncWorkerWatchSlot +{ + NameData slot_name; + XLogRecPtr confirmed_lsn; + int inactivity_count; +} SlotSyncWorkerWatchSlot; + +typedef struct SlotSyncWorker +{ + /* Time at which this worker was launched. */ + TimestampTz launch_time; + + /* Indicates if this slot is used or free. */ + bool in_use; + + /* The slot in worker pool to which it is attached */ + int slot; + + /* Increased every time the slot is taken by new worker. */ + uint16 generation; + + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* User to use for connection (will be same as owner of subscription). */ + Oid userid; + + /* Database id to connect to. */ + Oid dbid; + + /* Count of Database ids it manages */ + uint32 dbcount; + + /* DSA for dbids */ + dsa_area *dbids_dsa; + + /* dsa_pointer for database ids it manages */ + dsa_pointer dbids_dp; + + /* Mutex to access dbids in dsa */ + slock_t mutex; + + /* Info about slot being monitored for worker's naptime purpose */ + SlotSyncWorkerWatchSlot monitor; +} SlotSyncWorker; + /* * State of the transaction in parallel apply worker. 
* @@ -234,12 +280,14 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; /* Worker and subscription objects. */ extern PGDLLIMPORT Subscription *MySubscription; extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; +extern PGDLLIMPORT SlotSyncWorker *MySlotSyncWorker; extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); +extern void slotsync_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index d77410bdea..a7be908b39 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,7 +207,8 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, - LWTRANCHE_FIRST_USER_DEFINED + LWTRANCHE_FIRST_USER_DEFINED, + LWTRANCHE_SLOT_SYNC_DSA } BuiltinTrancheIds; /* diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index ee590eeac7..ca043d2009 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -44,6 +44,7 @@ tests += { 't/036_truncated_dropped.pl', 't/037_invalid_database.pl', 't/050_verify_slot_order.pl', + 't/051_slot_sync.pl', ], }, } diff --git a/src/test/recovery/t/051_slot_sync.pl b/src/test/recovery/t/051_slot_sync.pl new file mode 100644 index 0000000000..febe4e3db8 --- /dev/null +++ b/src/test/recovery/t/051_slot_sync.pl @@ -0,0 +1,132 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby'); +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); + +# find $pat in logfile of $node after $off-th byte +sub find_in_log +{ + my ($node, $pat, $off) = @_; + + $off = 0 unless defined $off; + my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile); + return 0 if (length($log) <= $off); + + $log = substr($log, $off); + + return $log =~ m/$pat/; +} + +# Check invalidation in the logfile +sub check_for_invalidation +{ + my ($log_start, $test_name) = @_; + + # message should be issued + ok( find_in_log( + $node_phys_standby, + "invalidating obsolete replication slot \"sub1\"", $log_start), + "sub1 slot invalidation is logged $test_name"); +} + +# Check conflicting status in pg_replication_slots. 
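+# All logical slots on the standby are expected to be marked conflicting
+# once the catalog_xmin conflict generated later in this test has been
+# replayed.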
+sub check_slots_conflicting_status +{ + my $res = $node_phys_standby->safe_psql( + 'postgres', qq( + select bool_and(conflicting) from pg_replication_slots;)); + + is($res, 't', + "Logical slot is reported as conflicting"); +} + +$node_primary->init(allows_streaming => 'logical'); +$node_primary->append_conf('postgresql.conf', q{ +synchronize_slot_names = '*' +standby_slot_names = 'pslot1' +}); +$node_primary->start; +$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');}); + +$node_primary->backup('backup'); + +$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1); +$node_phys_standby->append_conf('postgresql.conf', q{ +synchronize_slot_names = '*' +primary_slot_name = 'pslot1' +hot_standby_feedback = off +}); +$node_phys_standby->start; + +$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)"); + +# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush +# WAL. An insert into flush_wal outside transaction does guarantee a flush. +$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]); + +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); + +$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = $node_primary->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary'); + +# FIXME: standby needs restart to pick up new slots +$node_phys_standby->restart; +sleep 3; + +$result = $node_phys_standby->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby'); + +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)"); +$node_primary->wait_for_catchup('sub1'); + +$node_primary->wait_for_catchup($node_phys_standby->name); + +# Logical subscriber and physical replica are caught up at this point. + +# Drop the subscription so that catalog_xmin is unknown on the primary +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); + +# This should trigger a conflict as hot_standby_feedback is off on the standby +$node_primary->safe_psql('postgres', qq[ + CREATE TABLE conflict_test(x integer, y text); + DROP TABLE conflict_test; + VACUUM full pg_class; + INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal +]); + +# Ensure physical replay catches up +$node_primary->wait_for_catchup($node_phys_standby); + +# Check invalidation in the logfile +check_for_invalidation(1, 'with vacuum FULL on pg_class'); + +# Check conflicting status in pg_replication_slots. +check_slots_conflicting_status(); + +done_testing(); -- 2.34.1