From 10f0f8ad284a1d6169e6bd5443af0e4d2385c43f Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 29 Jul 2022 07:31:45 -0400 Subject: [PATCH v4] fix excessive replication origin slots issue The replication origin tracking uses the same GUC (max_replication_slots) as the tablesync slots for limiting resources. In the current (HEAD) code the replication origin tracking of the completed tablesync worker is dropped by the apply worker, but this means there can be a small lag between when the tablesync worker exited, and when its origin tracking is actually dropped. Meanwhile, new tablesync workers can be launched and will immediately try to acquire new slots and origin tracking. If this happens before the worker's origin tracking gets dropped then the available number of slots (max_replication_slots) can be exceeded, leading to the error as reported. To avoid this lag, the dropping of replicating origin tracking is moved to the tablesync worker where it exits. --- src/backend/replication/logical/tablesync.c | 58 +++++++++++++++++------------ 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6a01ffd..b7b23f2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -316,12 +317,42 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); /* - * Cleanup the tablesync slot. + * Cleanup the origin tracking and tablesync slot. * * This has to be done after updating the state because otherwise if * there is an error while doing the database operations we won't be - * able to rollback dropped slot. + * able to rollback dropped slot or origin tracking. */ + + /* + * Cleanup the tablesync origin tracking if exists. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Reset the origin session before dropping. + * + * This is required to reset the ownership of the slot + * and allow the origin to be dropped. + */ + replorigin_session_reset(); + + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + + /* Cleanup the tablesync slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, syncslotname, @@ -441,8 +472,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -452,26 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - - /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 1.8.3.1