From c81b2168241a0acb470bab5d439617e70cb716cb Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 25 Jul 2022 07:35:21 -0400 Subject: [PATCH v2] fix excessive replicating origin slots issue Move the removal of the replicating origin slot from the apply worker to the tablesync worker. --- src/backend/replication/logical/tablesync.c | 46 ++++++++++++++++------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 670c6fc..ad474cf 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; @@ -316,6 +317,29 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); /* + * Cleanup the tablesync origin tracking if exists. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* reset the origin session before dropping */ + replorigin_session_reset(); + + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + + /* * Cleanup the tablesync slot. * * This has to be done after updating the state because otherwise if @@ -441,8 +465,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -451,27 +473,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } - /* - * Remove the tablesync origin tracking if exists. - * - * The normal case origin drop is done here instead of in the - * process_syncing_tables_for_sync function because we don't - * allow to drop the origin till the process owning the origin - * is alive. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin and by this time the origin might be - * already removed. So passing missing_ok = true. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); /* - * Update the state to READY only after the origin cleanup. + * Update the state to READY. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 1.8.3.1