From c5c78f5d53d375f7a79b2561c551f7bb3ff57717 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 3 Jul 2023 10:28:00 +0900 Subject: [PATCH v3] Skip logical decoding of already-aborted transactions. If we detect a concurrent abort of a streaming transaction, we discard all changes and skip decoding further changes of the transaction. This commit introduces a new check if a (streaming or non-streaming) transaction is already aborted by CLOG lookup, enabling us to skip decoding further changes of the transaction. This helps a lot in logical decoding performance in a case where the transaction is large and already rolled back since we can save disk or network I/O. We do this new check for only large-transactions when eviction since checking CLOG is costly and could cause a slowdown with lots of small transactions, where most transactions commit. Reviewed-by: Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com --- .../replication/logical/reorderbuffer.c | 98 ++++++++++++++++--- src/include/replication/reorderbuffer.h | 13 ++- 2 files changed, 94 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index bbf0966182..f3284708bf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -100,6 +100,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -256,7 +257,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, - bool txn_prepared); + bool txn_prepared, bool streaming); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -777,11 +778,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * While streaming the previous changes we have detected that the - * transaction is aborted. So there is no point in collecting further - * changes for it. + * If we have detected that the transaction is aborted while streaming the + * previous changes or by checking its CLOG, there is no point in + * collecting further changes for it. */ - if (txn->concurrent_abort) + if (txn->aborted) { /* * We don't need to update memory accounting for this change as we @@ -1600,9 +1601,12 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * * 'txn_prepared' indicates that we have decoded the transaction at prepare * time. + * + * 'streaming_txn' indicates that the given transaction is a streaming transaction. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared, + bool streaming_txn) { dlist_mutable_iter iter; @@ -1621,7 +1625,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, streaming_txn); } /* cleanup changes in the txn */ @@ -1655,7 +1659,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) + if (streaming_txn && (!txn_prepared) && + (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; if (txn_prepared) @@ -1884,7 +1889,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * full cleanup will happen as part of the COMMIT PREPAREDs, so now * just truncate txn by removing changes and tuplecids. */ - ReorderBufferTruncateTXN(rb, txn, true); + ReorderBufferTruncateTXN(rb, txn, true, true); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2027,7 +2032,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2552,7 +2557,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2605,7 +2610,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, FlushErrorState(); FreeErrorData(errdata); errdata = NULL; - curtxn->concurrent_abort = true; + curtxn->aborted = true; /* Reset the TXN so that it is allowed to stream remaining data. */ ReorderBufferResetTXN(rb, txn, snapshot_now, @@ -2789,10 +2794,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * when rollback prepared is decoded and sent, the downstream should be * able to rollback such a xact. See comments atop DecodePrepare. * - * Note, for the concurrent_abort + streaming case a stream_prepare was - * already sent within the ReorderBufferReplay call above. + * Note, for the abort + streaming case a stream_prepare was already sent + * within the ReorderBufferReplay call above. */ - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + if (txn->aborted && !rbtxn_is_streamed(txn)) rb->prepare(rb, txn, txn->final_lsn); } @@ -3558,6 +3563,63 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) return largest; } +/* + * Check the transaction status of the given transaction. If the transaction + * already aborted, we discards all changes accumulated so far and ignore + * future changes, and return true. Otherwise return false. + * + * If logical_replication_mode is set to "immediate", we disable this check + * for regression tests. + */ +static bool +ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* + * If logical_replication_mode is "immediate", we don't check the + * transaction status so the caller always process this transaction. + */ + if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE) + return false; + + /* Quick return if we've already knew the transaction status */ + if (txn->aborted) + return true; + + if (txn->committed) + return false; + + /* Check the transaction status using CLOG lookup */ + if (TransactionIdIsInProgress(txn->xid)) + return false; + + if (TransactionIdDidCommit(txn->xid)) + { + /* + * Remember the transaction is committed so that we can skip CLOG + * check next time, avoiding the pressure on CLOG lookup. + */ + txn->committed = true; + return false; + } + + /* + * The transaction aborted. We discard the changes we've collected so far, + * and free all resources allocated for toast reconstruction. The full + * cleanup will happen as part of decoding ABORT record of this + * transaction. + */ + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false); + ReorderBufferToastReset(rb, txn); + + /* + * Mark the transaction as aborted so we ignore future changes of this + * transaction. + */ + txn->aborted = true; + + return true; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to @@ -3610,6 +3672,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); + if (ReorderBufferCheckTXNAbort(rb, txn)) + continue; + ReorderBufferStreamTXN(rb, txn); } else @@ -3625,6 +3690,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->size > 0); Assert(rb->size >= txn->size); + if (ReorderBufferCheckTXNAbort(rb, txn)) + continue; + ReorderBufferSerializeTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0b2c95f7aa..fe7874bc10 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -409,8 +409,17 @@ typedef struct ReorderBufferTXN /* Size of top-transaction including sub-transactions. */ Size total_size; - /* If we have detected concurrent abort then ignore future changes. */ - bool concurrent_abort; + /* + * True if the transaction committed. Then we skip transaction status + * check for this transaction. + */ + bool committed; + + /* + * True if the transaction (concurrently) aborted. Then we ignore + * future changes. + */ + bool aborted; /* * Private data pointer of the output plugin. -- 2.39.3