From afd3a4556e4992ca5fb407a66b6739f29c038fd4 Mon Sep 17 00:00:00 2001 From: sherlockcpp Date: Sat, 10 Sep 2022 16:03:41 +0800 Subject: [PATCH] Fix unexpected clean of in memory remote_lsn due to origin drop in tablesync worker If the table sync worker errored at walrcv_endstreaming(), we assumed that both dropping the replication origin and updating relstate are rolled back, which however was wrong. Actually, the replication origin is not dropped but the in-memory state is reset. Therefore, after the tablesync worker restarts, it starts logical replication with starting point 0/0. Consequently, it ends up applying the transaction that has already been applied. Fix this by dropping the origin after committing the transaction that has set the relation state to SYNCDONE so that the worker won't restart. Also, add back the origin drop code in process_syncing_tables_for_apply() to ensure that the origin will eventually be dropped even if the table sync worker failed to drop it in some rare cases. --- src/backend/commands/subscriptioncmds.c | 25 +++---- src/backend/replication/logical/tablesync.c | 80 ++++++++++++++------- 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 66d800f0cf..24411c2027 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -931,10 +931,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state and SYNCDONE state, we would have already - * dropped the tablesync origin. + * For READY state, we would have already dropped the + * tablesync origin. */ - if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) + if (state != SUBREL_STATE_READY) { char originname[NAMEDATALEN]; @@ -942,8 +942,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Drop the tablesync's origin tracking if exists. * * It is possible that the origin is not yet created for - * tablesync worker so passing missing_ok = true. This can - * happen for the states before SUBREL_STATE_FINISHEDCOPY. + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or apply + * worker can also concurrently try to drop the origin and + * by this time the origin might be already removed. For + * these reasons, passing missing_ok = true. */ ReplicationOriginNameForTablesync(sub->oid, relid, originname, sizeof(originname)); @@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * - * For SYNCDONE/READY states, the tablesync origin tracking is known - * to have already been dropped by the tablesync worker. - * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - if (rstate->state != SUBREL_STATE_SYNCDONE) - { - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - } + ReplicationOriginNameForTablesync(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8514835ff4..b1eae251e1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -310,30 +310,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* - * Cleanup the tablesync origin tracking. - * - * Resetting the origin session removes the ownership of the slot. - * This is needed to allow the origin to be dropped. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; - - /* - * We expect that origin must be present. The concurrent operations - * that remove origin like a refresh for the subscription take an - * access exclusive lock on pg_subscription which prevent the previous - * operation to update the rel state to SUBREL_STATE_SYNCDONE to - * succeed. - */ - replorigin_drop_by_name(originname, false, false); - /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. @@ -359,6 +335,45 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + CommitTransactionCommand(); + + /* + * Start a new transaction to cleanup the tablesync origin tracking + * after changing the state of relation to SUBREL_STATE_SYNCDONE which + * ensures that the tablesync worker will not restart. + * + * Otherwise if an error occurs while performing a database operation, + * the restarted worker would get an invalid remote_lsn from the origin + * because the in-memory remote_lsn has been cleaned up and this + * operation will not be rolled back. + */ + StartTransactionCommand(); + + /* + * Cleanup the tablesync origin tracking. + * + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * Drop the tablesync's origin tracking if exists. + * + * There is a chance that the user is concurrently performing refresh + * for the subscription where we remove the table state and its origin + * and by this time the origin might be already removed. So passing + * missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + finish_sync_worker(); } else @@ -466,6 +481,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { + char originname[NAMEDATALEN]; + rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -475,7 +492,20 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Update the state to READY. + * Remove the tablesync origin tracking if exists. + * + * The tablesync worker can concurrently try to drop the origin + * and by this time the origin might be already removed. So + * passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + + /* + * Update the state to READY only after the origin cleanup. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 2.28.0.windows.1