From 0ff66671719ea1296ae14d8b9a6e500f795c5eaf Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 29 Oct 2024 13:21:18 -0700 Subject: [PATCH v9] Skip logical decoding of already-aborted transactions. Previously, transaction aborts were detected concurrently only during system catalog scans while replaying a transaction in streaming mode. This commit introduces an additional CLOG lookup check to determine if a transaction is already aborted, so the logical decoding skips further change also when it doesn't touch system catalogs. This optimization enhances logical decoding performance, especially for large transactions that have already been rolled back, as it avoids unnecessary disk or network I/O. To avoid potential slowdowns caused by frequent CLOG lookups for small transactions (most of which commit), the CLOG lookup is performed only for large transactions before eviction. Reviewed-by: Andres Freund, Amit Kapila, Dilip Kumar, Vignesh C Reviewed-by: Ajin Cherian, Peter Smith Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com --- contrib/test_decoding/expected/stats.out | 42 +++- contrib/test_decoding/expected/stream.out | 6 + contrib/test_decoding/sql/stats.sql | 20 +- contrib/test_decoding/sql/stream.sql | 6 + .../replication/logical/reorderbuffer.c | 186 ++++++++++++++---- src/include/replication/reorderbuffer.h | 17 +- 6 files changed, 227 insertions(+), 50 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 78d36429c8a..de6dc416130 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -138,12 +138,46 @@ SELECT slot_name FROM pg_stat_replication_slots; (3 rows) COMMIT; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4; + ?column? +---------- + init +(1 row) + +-- The INSERT changes are large enough to be spilled but will not be, because +-- the transaction is aborted. The logical decoding skips collecting further +-- changes too. The transaction is prepared to make sure the decoding processes +-- the aborted transaction. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i); +PREPARE TRANSACTION 'test1_abort'; +ROLLBACK PREPARED 'test1_abort'; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + count +------- + 1 +(1 row) + +-- Verify that the decoding doesn't spill already-aborted transaction's changes. +SELECT pg_stat_force_next_flush(); + pg_stat_force_next_flush +-------------------------- + +(1 row) + +SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; + slot_name | spill_txns | spill_count +---------------------------------+------------+------------- + regression_slot_stats4_twophase | 0 | 0 +(1 row) + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); - pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot ---------------------------+--------------------------+-------------------------- - | | + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4_twophase'); + pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot +--------------------------+--------------------------+--------------------------+-------------------------- + | | | (1 row) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index a76f77601e2..9879e02ca84 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -114,7 +114,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl * detect that the subtransaction was aborted, and reset the transaction while having * the TOAST changes in memory, resulting in deallocating both decoded changes and * TOAST reconstruction data. Memory usage counters must be updated correctly. + * + * Set debug_logical_replication_streaming to 'immediate' to disable the transaction + * status check happening before streaming the second insertion, so we can detect a + * concurrent abort while streaming. */ +SET debug_logical_replication_streaming = immediate; BEGIN; INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); ALTER TABLE stream_test ADD COLUMN i INT; @@ -128,6 +133,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 5 (1 row) +RESET debug_logical_replication_streaming; DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 630371f147a..a022fe1bf07 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -50,7 +50,25 @@ SELECT slot_name FROM pg_stat_replication_slots; SELECT slot_name FROM pg_stat_replication_slots; COMMIT; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4; + +-- The INSERT changes are large enough to be spilled but will not be, because +-- the transaction is aborted. The logical decoding skips collecting further +-- changes too. The transaction is prepared to make sure the decoding processes +-- the aborted transaction. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i); +PREPARE TRANSACTION 'test1_abort'; +ROLLBACK PREPARED 'test1_abort'; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Verify that the decoding doesn't spill already-aborted transaction's changes. +SELECT pg_stat_force_next_flush(); +SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4_twophase'); diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 7f43f0c2ab7..f1269403e0a 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -49,7 +49,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl * detect that the subtransaction was aborted, and reset the transaction while having * the TOAST changes in memory, resulting in deallocating both decoded changes and * TOAST reconstruction data. Memory usage counters must be updated correctly. + * + * Set debug_logical_replication_streaming to 'immediate' to disable the transaction + * status check happening before streaming the second insertion, so we can detect a + * concurrent abort while streaming. */ +SET debug_logical_replication_streaming = immediate; BEGIN; INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); ALTER TABLE stream_test ADD COLUMN i INT; @@ -58,6 +63,7 @@ INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000') ROLLBACK TO s1; COMMIT; SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +RESET debug_logical_replication_streaming; DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e3a5c7b660c..1771c713fd8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -106,6 +106,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -259,7 +260,8 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, - bool txn_prepared); + bool txn_prepared, bool mark_txn_streaming); +static bool ReorderBufferTruncateTXNIfAborted(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -793,11 +795,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * While streaming the previous changes we have detected that the - * transaction is aborted. So there is no point in collecting further - * changes for it. + * If we have detected that the transaction is aborted while streaming the + * previous changes or by checking its CLOG, there is no point in + * collecting further changes for it. */ - if (txn->concurrent_abort) + if (rbtxn_is_aborted(txn)) { /* * We don't need to update memory accounting for this change as we @@ -1620,17 +1622,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Discard changes from a transaction (and subtransactions), either after - * streaming or decoding them at PREPARE. Keep the remaining info - - * transactions, tuplecids, invalidations and snapshots. + * streaming, decoding them at PREPARE, or detecting the transaction abort. + * Keep the remaining info - transactions, tuplecids, invalidations and + * snapshots. * * We additionally remove tuplecids after decoding the transaction at prepare * time as we only need to perform invalidation at rollback or commit prepared. * + * The given transaction is marked as streamed if appropriate and the caller + * asked it by passing 'mark_txn_streaming' being true. + * * 'txn_prepared' indicates that we have decoded the transaction at prepare * time. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared, + bool mark_txn_streaming) { dlist_mutable_iter iter; Size mem_freed = 0; @@ -1650,7 +1657,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, mark_txn_streaming); } /* cleanup changes in the txn */ @@ -1680,24 +1687,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* Update the memory counter */ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); - /* - * Mark the transaction as streamed. - * - * The top-level transaction, is marked as streamed always, even if it - * does not contain any changes (that is, when all the changes are in - * subtransactions). - * - * For subtransactions, we only mark them as streamed when there are - * changes in them. - * - * We do it this way because of aborts - we don't want to send aborts for - * XIDs the downstream is not aware of. And of course, it always knows - * about the toplevel xact (we send the XID in all messages), but we never - * stream XIDs of empty subxacts. - */ - if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) - txn->txn_flags |= RBTXN_IS_STREAMED; - if (txn_prepared) { /* @@ -1721,6 +1710,25 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep ReorderBufferReturnChange(rb, change, true); } } + else if (mark_txn_streaming && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) + { + /* + * Mark the transaction as streamed, if appropriate. + * + * The top-level transaction, is marked as streamed always, even if it + * does not contain any changes (that is, when all the changes are in + * subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * changes in them. + * + * We do it this way because of aborts - we don't want to send aborts + * for XIDs the downstream is not aware of. And of course, it always + * knows about the toplevel xact (we send the XID in all messages), + * but we never stream XIDs of empty subxacts. + */ + txn->txn_flags |= RBTXN_IS_STREAMED; + } /* * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any @@ -1752,6 +1760,67 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep txn->nentries = 0; } +/* + * Check the transaction status by looking CLOG and discard all changes if + * the transaction is aborted. The transaction status is cached in txn->txn_flags + * so we can skip future changes and avoid CLOG lookups on the next call. Return + * true if the transaction is aborted, otherwise return false. + * + * When the 'debug_logical_replication_streaming' is set to "immediate", we + * don't check the transaction status, meaning the caller will always process + * this transaction. + */ +static bool +ReorderBufferTruncateTXNIfAborted(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* Quick return for regression tests */ + if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)) + return false; + + /* + * Quick return if the transaction status is already known. + */ + if (rbtxn_is_committed(txn)) + return false; + if (rbtxn_is_aborted(txn)) + return true; + + /* Otherwise, check the transaction status using CLOG lookup */ + + if (TransactionIdIsInProgress(txn->xid)) + return false; + + if (TransactionIdDidCommit(txn->xid)) + { + /* + * Remember the transaction is committed so that we can skip CLOG + * check next time, avoiding the pressure on CLOG lookup. + */ + Assert(!rbtxn_is_aborted(txn)); + txn->txn_flags |= RBTXN_IS_COMMITTED; + return false; + } + + /* + * The transaction aborted. We discard the changes we've collected so far. + * The full cleanup will happen as part of decoding ABORT record of this + * transaction. + * + * Since we don't check the transaction status while replaying the + * transaction, we don't need to reset toast reconstruction data here. + */ + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false); + + /* + * Mark the transaction as aborted so we ignore future changes of this + * transaction. + */ + Assert(!rbtxn_is_committed(txn)); + txn->txn_flags |= RBTXN_IS_ABORTED; + + return true; +} + /* * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by * HeapTupleSatisfiesHistoricMVCC. @@ -1924,7 +1993,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * full cleanup will happen as part of the COMMIT PREPAREDs, so now * just truncate txn by removing changes and tuplecids. */ - ReorderBufferTruncateTXN(rb, txn, true); + ReorderBufferTruncateTXN(rb, txn, true, true); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2054,10 +2123,10 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * Helper function for ReorderBufferProcessTXN to handle the concurrent - * abort of the streaming transaction. This resets the TXN such that it - * can be used to stream the remaining data of transaction being processed. - * This can happen when the subtransaction is aborted and we still want to - * continue processing the main or other subtransactions data. + * abort of the streaming (prepared) transaction. This resets the TXN such + * that it can be used to stream the remaining data of transaction being + * processed. This can happen when the subtransaction is aborted and we + * still want to continue processing the main or other subtransactions data. */ static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -2067,7 +2136,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2595,7 +2664,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2648,7 +2717,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, FlushErrorState(); FreeErrorData(errdata); errdata = NULL; - curtxn->concurrent_abort = true; + + /* Remember the transaction is aborted. */ + Assert(!rbtxn_is_committed(curtxn)); + curtxn->txn_flags |= RBTXN_IS_ABORTED; /* Reset the TXN so that it is allowed to stream remaining data. */ ReorderBufferResetTXN(rb, txn, snapshot_now, @@ -2810,6 +2882,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid) { ReorderBufferTXN *txn; + bool already_aborted; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -2824,6 +2897,12 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, /* The prepare info must have been updated in txn by now. */ Assert(txn->final_lsn != InvalidXLogRecPtr); + /* + * Remember if the transaction is already aborted to check if we detect + * that the transaction is concurrently aborted during the replay. + */ + already_aborted = rbtxn_is_aborted(txn); + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); @@ -2832,10 +2911,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * when rollback prepared is decoded and sent, the downstream should be * able to rollback such a xact. See comments atop DecodePrepare. * - * Note, for the concurrent_abort + streaming case a stream_prepare was + * Note, for the concurrent abort + streaming case a stream_prepare was * already sent within the ReorderBufferReplay call above. */ - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + if (!already_aborted && rbtxn_is_aborted(txn) && !rbtxn_is_streamed(txn)) rb->prepare(rb, txn, txn->final_lsn); } @@ -3566,7 +3645,8 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) } /* - * Find the largest streamable toplevel transaction to evict (by streaming). + * Find the largest streamable (and non-aborted) toplevel transaction to evict + * (by streaming). * * This can be seen as an optimized version of ReorderBufferLargestTXN, which * should give us the same transaction (because we don't update memory account @@ -3608,9 +3688,15 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) /* base_snapshot must be set */ Assert(txn->base_snapshot != NULL); + /* Don't consider these kinds of transactions for eviction. */ + if (rbtxn_has_partial_change(txn) || + !rbtxn_has_streamable_change(txn) || + rbtxn_is_aborted(txn)) + continue; + + /* Find the largest of the eviction candidates. */ if ((largest == NULL || txn->total_size > largest_size) && - (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) && - rbtxn_has_streamable_change(txn)) + (txn->total_size > 0)) { largest = txn; largest_size = txn->total_size; @@ -3661,8 +3747,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) rb->size > 0)) { /* - * Pick the largest transaction and evict it from memory by streaming, - * if possible. Otherwise, spill to disk. + * Pick the largest non-aborted transaction and evict it from memory + * by streaming, if possible. Otherwise, spill to disk. */ if (ReorderBufferCanStartStreaming(rb) && (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) @@ -3672,6 +3758,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); + /* skip the transaction if aborted */ + if (ReorderBufferTruncateTXNIfAborted(rb, txn)) + { + /* All changes should be discarded */ + Assert(txn->size == 0 && txn->total_size == 0); + continue; + } + ReorderBufferStreamTXN(rb, txn); } else @@ -3687,6 +3781,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->size > 0); Assert(rb->size >= txn->size); + /* skip the transaction if aborted */ + if (ReorderBufferTruncateTXNIfAborted(rb, txn)) + { + /* All changes should be discarded */ + Assert(txn->size == 0 && txn->total_size == 0); + continue; + } + ReorderBufferSerializeTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 6ad5a8cb9c5..e4c09c86c76 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -173,6 +173,8 @@ typedef struct ReorderBufferChange #define RBTXN_PREPARE 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 +#define RBTXN_IS_COMMITTED 0x0200 +#define RBTXN_IS_ABORTED 0x0400 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -230,6 +232,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ ) +/* Is this transaction committed? */ +#define rbtxn_is_committed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \ +) + +/* Is this transaction aborted? */ +#define rbtxn_is_aborted(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \ +) + /* prepare for this transaction skipped? */ #define rbtxn_skip_prepared(txn) \ ( \ @@ -419,9 +433,6 @@ typedef struct ReorderBufferTXN /* Size of top-transaction including sub-transactions. */ Size total_size; - /* If we have detected concurrent abort then ignore future changes. */ - bool concurrent_abort; - /* * Private data pointer of the output plugin. */ -- 2.43.5