From a8a6ab41ef45c7a7c8b878dff47ff23e91af59e3 Mon Sep 17 00:00:00 2001 From: sherlockcpp Date: Sat, 10 Sep 2022 16:03:41 +0800 Subject: [PATCH] Fix unexpected clean of in memory remote_lsn due to origin drop in tablesync worker If the table sync worker errored at walrcv_endstreaming(), we assumed that both dropping the replication origin and updating relstate are rolled back, which however was wrong. Actually, the replication origin is not dropped but the in-memory state is reset. Therefore, after the tablesync worker restarts, it starts logical replication with starting point 0/0. Consequently, it ends up applying the transaction that has already been applied. Fix this by dropping the origin after committing the transaction that has set the relation state to SYNCDONE so that the worker won't restart. Also, add back the origin drop code in process_syncing_tables_for_apply() to ensure that the origin will eventually be dropped even if the table sync worker failed to drop it in some rare cases. --- src/backend/commands/subscriptioncmds.c | 25 +++---- src/backend/replication/logical/tablesync.c | 81 ++++++++++++++------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 66d800f0cf..24411c2027 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -931,10 +931,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state and SYNCDONE state, we would have already - * dropped the tablesync origin. + * For READY state, we would have already dropped the + * tablesync origin. */ - if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) + if (state != SUBREL_STATE_READY) { char originname[NAMEDATALEN]; @@ -942,8 +942,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Drop the tablesync's origin tracking if exists. * * It is possible that the origin is not yet created for - * tablesync worker so passing missing_ok = true. This can - * happen for the states before SUBREL_STATE_FINISHEDCOPY. + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or apply + * worker can also concurrently try to drop the origin and + * by this time the origin might be already removed. For + * these reasons, passing missing_ok = true. */ ReplicationOriginNameForTablesync(sub->oid, relid, originname, sizeof(originname)); @@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * - * For SYNCDONE/READY states, the tablesync origin tracking is known - * to have already been dropped by the tablesync worker. - * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - if (rstate->state != SUBREL_STATE_SYNCDONE) - { - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - } + ReplicationOriginNameForTablesync(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8514835ff4..02d8d67242 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * UpdateSubscriptionRelState must be called within a transaction. - * That transaction will be ended within the finish_sync_worker(). */ if (!IsTransactionState()) StartTransactionCommand(); @@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* - * Cleanup the tablesync origin tracking. - * - * Resetting the origin session removes the ownership of the slot. - * This is needed to allow the origin to be dropped. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; - - /* - * We expect that origin must be present. The concurrent operations - * that remove origin like a refresh for the subscription take an - * access exclusive lock on pg_subscription which prevent the previous - * operation to update the rel state to SUBREL_STATE_SYNCDONE to - * succeed. - */ - replorigin_drop_by_name(originname, false, false); - /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. @@ -359,6 +334,45 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + CommitTransactionCommand(); + + /* + * Start a new transaction to cleanup the tablesync origin tracking. + * + * We need to do this after the table state is set to SYNCDONE, + * otherwise if an error occurs while performing the database + * operation, the worker will be restarted, but the in-memory + * replication progress(remote_lsn) has been cleaned and will not be + * rolledback, so the restarted worker will use invalid replication + * progress resulting in replay of transactions that have already been + * applied. + */ + StartTransactionCommand(); + + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + */ + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * Drop the tablesync's origin tracking if exists. + * + * There is a chance that the user is concurrently performing refresh + * for the subscription where we remove the table state and its origin + * and by this time the origin might be already removed. So passing + * missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + finish_sync_worker(); } else @@ -466,6 +480,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { + char originname[NAMEDATALEN]; + rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -475,7 +491,20 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Update the state to READY. + * Remove the tablesync origin tracking if exists. + * + * The tablesync worker can concurrently try to drop the origin + * and by this time the origin might be already removed. So + * passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + + /* + * Update the state to READY only after the origin cleanup. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, -- 2.28.0.windows.1