From 9b12fbff5c08726ee50d94aceaf3bcff76e1b9ab Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 29 Oct 2024 13:21:18 -0700 Subject: [PATCH v6] Skip logical decoding of already-aborted transactions. Previously, transaction aborts were detected concurrently only during system catalog scans while replaying a transaction in streaming mode. This commit introduces an additional CLOG lookup check to determine if a transaction is already aborted, so the logical decoding skips further change also when it doesn't touch system catalogs. This optimization enhances logical decoding performance, especially for large transactions that have already been rolled back, as it avoids unnecessary disk or network I/O. To avoid potential slowdowns caused by frequent CLOG lookups for small transactions (most of which commit), the CLOG lookup is performed only for large transactions before eviction. Reviewed-by: Andres Freund, Amit Kapila, Dilip Kumar, Vignesh C Reviewed-by: Ajin Cherian, Peter Smith Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com --- contrib/test_decoding/expected/stats.out | 41 ++++- contrib/test_decoding/expected/stream.out | 6 + contrib/test_decoding/sql/stats.sql | 19 +- contrib/test_decoding/sql/stream.sql | 6 + .../replication/logical/reorderbuffer.c | 168 ++++++++++++++---- src/include/replication/reorderbuffer.h | 17 +- 6 files changed, 212 insertions(+), 45 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 78d36429c8a..1fe9c5f190a 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -138,12 +138,45 @@ SELECT slot_name FROM pg_stat_replication_slots; (3 rows) COMMIT; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4; + ?column? +---------- + init +(1 row) + +-- Execute a transaction that is prepared and aborted. We detect that the +-- transaction is aborted before spilling changes, and then skip collecting +-- further changes. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i); +PREPARE TRANSACTION 'test1_abort'; +ROLLBACK PREPARED 'test1_abort'; +-- Check if the transaction is not spilled as it's already aborted. +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + count +------- + 1 +(1 row) + +SELECT pg_stat_force_next_flush(); + pg_stat_force_next_flush +-------------------------- + +(1 row) + +SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; + slot_name | spill_txns | spill_count +---------------------------------+------------+------------- + regression_slot_stats4_twophase | 0 | 0 +(1 row) + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); - pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot ---------------------------+--------------------------+-------------------------- - | | + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4_twophase'); + pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot +--------------------------+--------------------------+--------------------------+-------------------------- + | | | (1 row) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index a76f77601e2..9879e02ca84 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -114,7 +114,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl * detect that the subtransaction was aborted, and reset the transaction while having * the TOAST changes in memory, resulting in deallocating both decoded changes and * TOAST reconstruction data. Memory usage counters must be updated correctly. + * + * Set debug_logical_replication_streaming to 'immediate' to disable the transaction + * status check happening before streaming the second insertion, so we can detect a + * concurrent abort while streaming. */ +SET debug_logical_replication_streaming = immediate; BEGIN; INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); ALTER TABLE stream_test ADD COLUMN i INT; @@ -128,6 +133,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 5 (1 row) +RESET debug_logical_replication_streaming; DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 630371f147a..f2df0fe869c 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -50,7 +50,24 @@ SELECT slot_name FROM pg_stat_replication_slots; SELECT slot_name FROM pg_stat_replication_slots; COMMIT; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4; + +-- Execute a transaction that is prepared and aborted. We detect that the +-- transaction is aborted before spilling changes, and then skip collecting +-- further changes. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i); +PREPARE TRANSACTION 'test1_abort'; +ROLLBACK PREPARED 'test1_abort'; + +-- Check if the transaction is not spilled as it's already aborted. +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_stat_force_next_flush(); +SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4_twophase'); diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 7f43f0c2ab7..f1269403e0a 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -49,7 +49,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl * detect that the subtransaction was aborted, and reset the transaction while having * the TOAST changes in memory, resulting in deallocating both decoded changes and * TOAST reconstruction data. Memory usage counters must be updated correctly. + * + * Set debug_logical_replication_streaming to 'immediate' to disable the transaction + * status check happening before streaming the second insertion, so we can detect a + * concurrent abort while streaming. */ +SET debug_logical_replication_streaming = immediate; BEGIN; INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); ALTER TABLE stream_test ADD COLUMN i INT; @@ -58,6 +63,7 @@ INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000') ROLLBACK TO s1; COMMIT; SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +RESET debug_logical_replication_streaming; DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e3a5c7b660c..96cf4eef3f1 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -106,6 +106,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -259,11 +260,12 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, - bool txn_prepared); + bool txn_prepared, bool txn_streaming); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg); +static bool ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -793,11 +795,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * While streaming the previous changes we have detected that the - * transaction is aborted. So there is no point in collecting further - * changes for it. + * If we have detected that the transaction is aborted while streaming the + * previous changes or by checking its CLOG, there is no point in + * collecting further changes for it. */ - if (txn->concurrent_abort) + if (rbtxn_is_aborted(txn)) { /* * We don't need to update memory accounting for this change as we @@ -1620,17 +1622,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Discard changes from a transaction (and subtransactions), either after - * streaming or decoding them at PREPARE. Keep the remaining info - - * transactions, tuplecids, invalidations and snapshots. + * streaming, decoding them at PREPARE, or detecting the transaction abort. + * Keep the remaining info - transactions, tuplecids, invalidations and + * snapshots. * * We additionally remove tuplecids after decoding the transaction at prepare * time as we only need to perform invalidation at rollback or commit prepared. * * 'txn_prepared' indicates that we have decoded the transaction at prepare * time. + * 'txn_streaming' indicates that the transaction is being streamed. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared, + bool txn_streaming) { dlist_mutable_iter iter; Size mem_freed = 0; @@ -1650,7 +1655,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, txn_streaming); } /* cleanup changes in the txn */ @@ -1680,24 +1685,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* Update the memory counter */ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); - /* - * Mark the transaction as streamed. - * - * The top-level transaction, is marked as streamed always, even if it - * does not contain any changes (that is, when all the changes are in - * subtransactions). - * - * For subtransactions, we only mark them as streamed when there are - * changes in them. - * - * We do it this way because of aborts - we don't want to send aborts for - * XIDs the downstream is not aware of. And of course, it always knows - * about the toplevel xact (we send the XID in all messages), but we never - * stream XIDs of empty subxacts. - */ - if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) - txn->txn_flags |= RBTXN_IS_STREAMED; - if (txn_prepared) { /* @@ -1721,6 +1708,25 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep ReorderBufferReturnChange(rb, change, true); } } + else if (txn_streaming && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) + { + /* + * Mark the transaction as streamed, if appropriate. + * + * The top-level transaction, is marked as streamed always, even if it + * does not contain any changes (that is, when all the changes are in + * subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * changes in them. + * + * We do it this way because of aborts - we don't want to send aborts + * for XIDs the downstream is not aware of. And of course, it always + * knows about the toplevel xact (we send the XID in all messages), + * but we never stream XIDs of empty subxacts. + */ + txn->txn_flags |= RBTXN_IS_STREAMED; + } /* * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any @@ -1924,7 +1930,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * full cleanup will happen as part of the COMMIT PREPAREDs, so now * just truncate txn by removing changes and tuplecids. */ - ReorderBufferTruncateTXN(rb, txn, true); + ReorderBufferTruncateTXN(rb, txn, true, true); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2067,7 +2073,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2595,7 +2601,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2648,7 +2654,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, FlushErrorState(); FreeErrorData(errdata); errdata = NULL; - curtxn->concurrent_abort = true; + + /* Remember the transaction is aborted. */ + Assert(!rbtxn_is_committed(curtxn)); + curtxn->txn_flags |= RBTXN_IS_ABORTED; /* Reset the TXN so that it is allowed to stream remaining data. */ ReorderBufferResetTXN(rb, txn, snapshot_now, @@ -2810,6 +2819,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid) { ReorderBufferTXN *txn; + bool already_aborted; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -2824,6 +2834,12 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, /* The prepare info must have been updated in txn by now. */ Assert(txn->final_lsn != InvalidXLogRecPtr); + /* + * Remember if the transaction is already aborted to check if we detect + * that the transaction is concurrently aborted during the replay. + */ + already_aborted = rbtxn_is_aborted(txn); + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); @@ -2832,10 +2848,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * when rollback prepared is decoded and sent, the downstream should be * able to rollback such a xact. See comments atop DecodePrepare. * - * Note, for the concurrent_abort + streaming case a stream_prepare was + * Note, for the concurrent abort + streaming case a stream_prepare was * already sent within the ReorderBufferReplay call above. */ - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + if (!already_aborted && rbtxn_is_aborted(txn) && !rbtxn_is_streamed(txn)) rb->prepare(rb, txn, txn->final_lsn); } @@ -3566,7 +3582,8 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) } /* - * Find the largest streamable toplevel transaction to evict (by streaming). + * Find the largest streamable (and non-aborted) toplevel transaction to evict + * (by streaming). * * This can be seen as an optimized version of ReorderBufferLargestTXN, which * should give us the same transaction (because we don't update memory account @@ -3610,7 +3627,7 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) if ((largest == NULL || txn->total_size > largest_size) && (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) && - rbtxn_has_streamable_change(txn)) + rbtxn_has_streamable_change(txn) && !(rbtxn_is_aborted(txn))) { largest = txn; largest_size = txn->total_size; @@ -3620,6 +3637,67 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) return largest; } +/* + * Check the transaction status of the given transaction. If the transaction + * already aborted, we discard all changes accumulated so far, ignore future + * changes, and return true. Otherwise return false. + * + * When the 'debug_logical_replication_streaming' is set to "immediate", we + * don't check the transaction status, meaning the caller will always process + * this transaction. + */ +static bool +ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* Quick return for regression tests */ + if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)) + return false; + + /* + * Quick return if the transaction status is already known. + */ + if (rbtxn_is_committed(txn)) + return false; + if (rbtxn_is_aborted(txn)) + return true; + + /* Otherwise, check the transaction status using CLOG lookup */ + + if (TransactionIdIsInProgress(txn->xid)) + return false; + + if (TransactionIdDidCommit(txn->xid)) + { + /* + * Remember the transaction is committed so that we can skip CLOG + * check next time, avoiding the pressure on CLOG lookup. + */ + Assert(!rbtxn_is_aborted(txn)); + txn->txn_flags |= RBTXN_IS_COMMITTED; + return false; + } + + /* + * The transaction aborted. We discard the changes we've collected so far, + * and free all resources allocated for toast reconstruction. The full + * cleanup will happen as part of decoding ABORT record of this + * transaction. + * + * Since we don't check the transaction status while replaying the + * transaction, we don't need to reset toast reconstruction data here. + */ + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), rbtxn_is_streamed(txn)); + + /* + * Mark the transaction as aborted so we ignore future changes of this + * transaction. + */ + Assert(!rbtxn_is_committed(txn)); + txn->txn_flags |= RBTXN_IS_ABORTED; + + return true; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to @@ -3661,8 +3739,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) rb->size > 0)) { /* - * Pick the largest transaction and evict it from memory by streaming, - * if possible. Otherwise, spill to disk. + * Pick the largest non-aborted transaction and evict it from memory + * by streaming, if possible. Otherwise, spill to disk. */ if (ReorderBufferCanStartStreaming(rb) && (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) @@ -3672,6 +3750,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); + /* skip the transaction if already aborted */ + if (ReorderBufferCheckTXNAbort(rb, txn)) + { + /* All changes should be truncated */ + Assert(txn->size == 0 && txn->total_size == 0); + continue; + } + ReorderBufferStreamTXN(rb, txn); } else @@ -3687,6 +3773,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->size > 0); Assert(rb->size >= txn->size); + /* skip the transaction if already aborted */ + if (ReorderBufferCheckTXNAbort(rb, txn)) + { + /* All changes should be truncated */ + Assert(txn->size == 0 && txn->total_size == 0); + continue; + } + ReorderBufferSerializeTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 6ad5a8cb9c5..e4c09c86c76 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -173,6 +173,8 @@ typedef struct ReorderBufferChange #define RBTXN_PREPARE 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 +#define RBTXN_IS_COMMITTED 0x0200 +#define RBTXN_IS_ABORTED 0x0400 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -230,6 +232,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ ) +/* Is this transaction committed? */ +#define rbtxn_is_committed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \ +) + +/* Is this transaction aborted? */ +#define rbtxn_is_aborted(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \ +) + /* prepare for this transaction skipped? */ #define rbtxn_skip_prepared(txn) \ ( \ @@ -419,9 +433,6 @@ typedef struct ReorderBufferTXN /* Size of top-transaction including sub-transactions. */ Size total_size; - /* If we have detected concurrent abort then ignore future changes. */ - bool concurrent_abort; - /* * Private data pointer of the output plugin. */ -- 2.43.5