From c367eac75a96a8d450c18e516fa675691187da48 Mon Sep 17 00:00:00 2001 From: songyutao Date: Sun, 14 Jan 2024 14:27:40 +0800 Subject: [PATCH v2] fix logical replication data sync bug --- src/backend/replication/logical/decode.c | 12 +++++++ .../replication/logical/reorderbuffer.c | 31 +++++++++++++++++++ src/include/replication/reorderbuffer.h | 4 +++ 3 files changed, 47 insertions(+) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index b3f8f908d1..f31756cc40 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -685,6 +685,18 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, parsed->nsubxacts, parsed->subxacts, parsed->xinfo); + /* + * When Alter Publication is performed during DML transaction, catalog + * snapshot is updated later than cache invalidation. As a result, DML + * operations in the transaction cannot be published. Adding new + * invalidations after SnapBuildDistributeNewCatalogSnapshot will recheck + * whether it should been published or not. + */ + if (ReorderBufferXidHasCatalogChanges(ctx->reorder, xid)) + { + ReorderBufferDistributeInvalidation(ctx->reorder, buf->origptr, xid); + } + /* ---- * Check whether we are interested in this specific transaction, and tell * the reorderbuffer to forget the content of the (sub-)transactions diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d1334ffb55..4a50417d96 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2625,6 +2625,37 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, PG_END_TRY(); } + +/* + * Add all invalidations contained in specified ReorderBufferTXN to all + * currently in-progress transactions. + */ +void +ReorderBufferDistributeInvalidation(ReorderBuffer *reorder, XLogRecPtr lsn, + TransactionId xid) +{ + dlist_iter iter; + ReorderBufferTXN *txn = NULL; + Size nmsgs; + SharedInvalidationMessage *msgs = NULL; + + txn = ReorderBufferTXNByXid(reorder, xid, false, false, lsn, false); + nmsgs = txn->ninvalidations; + msgs = txn->invalidations; + + dlist_foreach(iter, &reorder->toplevel_by_lsn) + { + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + Assert(TransactionIdIsValid(cur_txn->xid)); + if (xid == cur_txn->xid) + continue; + + ReorderBufferAddInvalidations(reorder, cur_txn->xid, lsn, nmsgs, msgs); + } +} + /* * Perform the replay of a transaction and its non-aborted subtransactions. * diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3e232c6c27..b94873f50b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -750,4 +750,8 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr); extern void StartupReorderBuffer(void); +extern void ReorderBufferDistributeInvalidation(ReorderBuffer *reorder, + XLogRecPtr lsn, + TransactionId xid); + #endif -- 2.43.0