From c85249360d8f1a45d55b3eab3e995b0436a0b9b2 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 29 Jul 2022 02:31:51 -0400 Subject: [PATCH v3] fix excessive replicating origin slots issue. In the current (HEAD) code the origin slots of the completed tablesync worker are dropped by the apply worker, but this means there can be a small lag between when the tablesync worker exits, and when its origin slot is actually dropped. Meanwhile new tablesync workers are launched and try and acquire slots before the previously finished tablesync worker slots could be dropped. If this happens the the available number of slots might be exceeded leading to the error reported. To avoid this lag, the dropping of replicating origin slots is moved to the tablesync worker when it exits. --- src/backend/replication/logical/tablesync.c | 61 +++++++++++++++++------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6a01ffd..40de753 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -315,13 +316,43 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * Cleanup the tablesync slot and the origin tracking if exists. + * + * This has to be done after updating the state because otherwise if + * there is an error while doing the database operations we won't be + * able to rollback dropped slot or origin tracking. + */ + /* - * Cleanup the tablesync slot. + * Cleanup the tablesync origin tracking if exists. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Reset the origin session before dropping. * - * This has to be done after updating the state because otherwise if - * there is an error while doing the database operations we won't be - * able to rollback dropped slot. + * This is required to reset the ownership of the slot + * and allow the slot to be dropped. */ + replorigin_session_reset(); + + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + + /* Cleanup the tablesync slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, syncslotname, @@ -441,8 +472,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -451,27 +480,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } - /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 1.8.3.1