diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6a01ffd273..df0fb78139 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
TimeLineID tli;
char syncslotname[NAMEDATALEN] = {0};
+ char originname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
@@ -309,6 +310,28 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
+ /*
+ * Cleanup the tablesync origin tracking.
+ *
+ * Resetting the origin session removes the ownership of the slot.
+ * This is needed to allow the origin to be dropped.
+ *
+ * Also, there is a chance that the user is concurrently performing
+ * refresh for the subscription where we remove the table
+ * state and its origin and by this time the origin might be
+ * already removed. So passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ replorigin_session_reset();
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+
+ replorigin_drop_by_name(originname, true, false);
+
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot.
@@ -318,9 +341,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/*
* Cleanup the tablesync slot.
*
- * This has to be done after updating the state because otherwise if
- * there is an error while doing the database operations we won't be
- * able to rollback dropped slot.
+ * This has to be done after the data changes because otherwise if
+ * there is an error while doing the database operations we won't
+ * be able to rollback dropped slot.
*/
ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
@@ -441,8 +464,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
- char originname[NAMEDATALEN];
-
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@@ -452,26 +473,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
- * Remove the tablesync origin tracking if exists.
- *
- * The normal case origin drop is done here instead of in the
- * process_syncing_tables_for_sync function because we don't
- * allow to drop the origin till the process owning the origin
- * is alive.
- *
- * There is a chance that the user is concurrently performing
- * refresh for the subscription where we remove the table
- * state and its origin and by this time the origin might be
- * already removed. So passing missing_ok = true.
- */
- ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
- rstate->relid,
- originname,
- sizeof(originname));
- replorigin_drop_by_name(originname, true, false);
-
- /*
- * Update the state to READY only after the origin cleanup.
+ * Update the state to READY.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,