From c846dea7369c953ce62fa68c6b6cb0e961a9d5ee Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 13 Jan 2025 10:35:17 -0800 Subject: [PATCH v16 2/2] Rename RBTXN_PREPARE to RBTXN_IS_PREPARE for better clarification. Previously, RBTXN_PREPARE flag and rbtxn_prepared macro could be misinterpreted as either indicating the transaction type (e.g. a prepared transaction or a normal transaction) or its current state (e.g. skipped or its prepare message is sent), especially after commit XXX introduced the RBTXN_SENT_PREPARE flag and the rbtxn_sent_prepare macro. The RBTXN_PREPARE flag (and its corresponding macro) have been renamed to RBTXN_IS_PREPARE to explicitly indicate the transaction type. Therefore, this commit also adds the RBTXN_IS_PREAPRE flag also to the transaction that is a prepared transaction and has been skipped, which previously had only the RBTXN_SKIPPED_PREPARE flag. Reviewed-by: Amit Kapila, Peter Smith Discussion: https://postgr.es/m/CAA4eK1KgNmBsG%3D155E7QQ6TX9RoWnM4z5Z20SvsbwxSe_QXYsg%40mail.gmail.com --- src/backend/replication/logical/proto.c | 2 +- .../replication/logical/reorderbuffer.c | 46 ++++++++++++------- src/backend/replication/logical/snapbuild.c | 2 +- src/include/replication/reorderbuffer.h | 6 +-- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index bef350714db..61b5283a2e1 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -163,7 +163,7 @@ logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, * which case we expect to have a valid GID. */ Assert(txn->gid != NULL); - Assert(rbtxn_prepared(txn)); + Assert(rbtxn_is_prepared(txn)); Assert(TransactionIdIsValid(txn->xid)); /* send the flags field */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8278e6f2223..92d2d7e6c69 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1793,7 +1793,7 @@ ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn * and the toast reconstruction data. The full cleanup will happen as part * of decoding ABORT record of this transaction. */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); ReorderBufferToastReset(rb, txn); /* All changes should be discarded */ @@ -1968,7 +1968,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) { /* * Note, we send stream prepare even if a concurrent abort is @@ -2150,7 +2150,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2238,7 +2238,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (!streaming) { - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) rb->begin_prepare(rb, txn); else rb->begin(rb, txn); @@ -2280,7 +2280,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * required for the cases when we decode the changes before the * COMMIT record is processed. */ - if (streaming || rbtxn_prepared(change->txn)) + if (streaming || rbtxn_is_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2625,7 +2625,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * Call either PREPARE (for two-phase transactions) or COMMIT (for * regular ones). */ - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) { rb->prepare(rb, txn, commit_lsn); txn->txn_flags |= RBTXN_SENT_PREPARE; @@ -2679,12 +2679,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * For 4, as the entire txn has been decoded, we can fully clean up * the TXN reorder buffer. */ - if (streaming || rbtxn_prepared(txn)) + if (streaming || rbtxn_is_prepared(txn)) { if (streaming) ReorderBufferMaybeMarkTXNStreamed(rb, txn); - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2728,7 +2728,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * during a two-phase commit. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK && - (stream_started || rbtxn_prepared(txn))) + (stream_started || rbtxn_is_prepared(txn))) { /* curtxn must be set for streaming or prepared transactions */ Assert(curtxn); @@ -2815,7 +2815,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn, * Removing this txn before a commit might result in the computation * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. */ - if (!rbtxn_prepared(txn)) + if (!rbtxn_is_prepared(txn)) ReorderBufferCleanupTXN(rb, txn); return; } @@ -2852,7 +2852,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* - * Record the prepare information for a transaction. + * Record the prepare information for a transaction. Also, mark the transaction + * as a prepared transaction. */ bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, @@ -2878,6 +2879,11 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; + /* Mark this transaction as a prepared transaction */ + Assert((txn->txn_flags & + (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)) == 0); + txn->txn_flags |= RBTXN_IS_PREPARED; + return true; } @@ -2893,6 +2899,9 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) if (txn == NULL) return; + /* txn must have been marked as a prepared transaction */ + Assert((txn->txn_flags & RBTXN_IS_PREPARED) != 0); + txn->txn_flags |= RBTXN_SKIPPED_PREPARE; } @@ -2914,12 +2923,17 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, if (txn == NULL) return; - txn->txn_flags |= RBTXN_PREPARE; - txn->gid = pstrdup(gid); - - /* The prepare info must have been updated in txn by now. */ + /* + * txn must have been marked as a prepared transaction and must have + * neither been skipped nor sent a prepare. Also, the prepare info must + * have been updated in it by now. + */ + Assert((txn->txn_flags & RBTXN_IS_PREPARED) != 0); + Assert((txn->txn_flags & (RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)) == 0); Assert(txn->final_lsn != InvalidXLogRecPtr); + txn->gid = pstrdup(gid); + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); @@ -2975,7 +2989,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, */ if ((txn->final_lsn < two_phase_at) && is_commit) { - txn->txn_flags |= RBTXN_PREPARE; + txn->txn_flags |= RBTXN_IS_PREPARED; /* * The prepare info must have been updated in txn even if we skip diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index bbedd3de318..05687fd75e5 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -761,7 +761,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * We don't need to add snapshot to prepared transactions as they * should not see the new catalog contents. */ - if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) + if (rbtxn_is_prepared(txn)) continue; elog(DEBUG2, "adding a new snapshot to %u at %X/%X", diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9d9ac2f0830..27d134198e3 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -170,7 +170,7 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SERIALIZED_CLEAR 0x0008 #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_PARTIAL_CHANGE 0x0020 -#define RBTXN_PREPARE 0x0040 +#define RBTXN_IS_PREPARED 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 #define RBTXN_SENT_PREPARE 0x0200 @@ -234,9 +234,9 @@ typedef struct ReorderBufferChange * committed. To check whether a prepare or a stream_prepare has already * been sent for this transaction, we need to use rbtxn_sent_prepare(). */ -#define rbtxn_prepared(txn) \ +#define rbtxn_is_prepared(txn) \ ( \ - ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ + ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \ ) /* Has a prepare or stream_prepare already been sent? */ -- 2.43.5