From 903b747312afe69f35a645dcbb1680d23a34404b Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 17 Aug 2022 23:37:09 -0400 Subject: [PATCH v9] fix excessive replication origin slots issue. The replication origin tracking uses the same GUC (max_replication_slots) as the tablesync slots for limiting resources. In the current (HEAD) code the replication origin tracking of the completed tablesync worker is dropped by the apply worker, but this means there can be a small lag between when the tablesync worker exited, and when its origin tracking is actually dropped. Meanwhile, new tablesync workers can be launched and will immediately try to acquire new slots and origin tracking. If this happens before the worker's origin tracking gets dropped then the available number of slots (max_replication_slots) can be exceeded, leading to the error as reported. To avoid this lag, the dropping of replicating origin tracking is moved to the tablesync worker where it exits. --- src/backend/commands/subscriptioncmds.c | 20 +++++++---- src/backend/replication/logical/tablesync.c | 52 +++++++++++++++-------------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 670b219..ab0ee7f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state, we would have already dropped the - * tablesync origin. + * For READY state and SYNCDONE state, we would have already + * dropped the tablesync origin. */ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) { char originname[NAMEDATALEN]; @@ -931,7 +931,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * * It is possible that the origin is not yet created for * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The apply worker can also + * SUBREL_STATE_FINISHEDCOPY. The tablesync worker can also * concurrently try to drop the origin and by this time * the origin might be already removed. For these reasons, * passing missing_ok = true. @@ -1507,13 +1507,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * + * For SYNCDONE/READY states, the tablesync origin tracking is + * known to have already been dropped by the tablesync worker. + * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + ReplicationOriginNameForTablesync(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index bfcb80b..c7d1972 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -310,6 +311,28 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate_lsn); /* + * Cleanup the tablesync origin tracking. + * + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + * + * Also, there is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + replorigin_drop_by_name(originname, true, false); + + /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. */ @@ -318,9 +341,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Cleanup the tablesync slot. * - * This has to be done after updating the state because otherwise if - * there is an error while doing the database operations we won't be - * able to rollback dropped slot. + * This has to be done after the data changes because otherwise if + * there is an error while doing the database operations we won't + * be able to rollback dropped slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, @@ -441,8 +464,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -452,26 +473,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - - /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 1.8.3.1