From c192fc65438a14047001e69ad906c31f8c0dda3a Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 11 Aug 2022 10:59:55 -0400 Subject: [PATCH v6] fix excessive replication origin slots issue. The replication origin tracking uses the same GUC (max_replication_slots) as the tablesync slots for limiting resources. In the current (HEAD) code the replication origin tracking of the completed tablesync worker is dropped by the apply worker, but this means there can be a small lag between when the tablesync worker exited, and when its origin tracking is actually dropped. Meanwhile, new tablesync workers can be launched and will immediately try to acquire new slots and origin tracking. If this happens before the worker's origin tracking gets dropped then the available number of slots (max_replication_slots) can be exceeded, leading to the error as reported. To avoid this lag, the dropping of replicating origin tracking is moved to the tablesync worker where it exits. --- src/backend/replication/logical/tablesync.c | 58 +++++++++++++++-------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6a01ffd..df04b1c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -316,22 +317,46 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); /* - * Cleanup the tablesync slot. + * Cleanup the origin tracking and tablesync slot. * * This has to be done after updating the state because otherwise if * there is an error while doing the database operations we won't be * able to rollback dropped slot. */ - ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - syncslotname, - sizeof(syncslotname)); /* + * Cleanup the tablesync origin tracking. + * + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + * + * Also, there is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + replorigin_drop_by_name(originname, true, false); + + /* + * Cleanup the tablesync slot. + * * It is important to give an error if we are unable to drop the slot, * otherwise, it won't be dropped till the corresponding subscription * is dropped. So passing missing_ok = false. */ + ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + syncslotname, + sizeof(syncslotname)); ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); finish_sync_worker(); @@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - - /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 1.8.3.1