From ff9d7ecbf16a834f4877d71d9f1075fd2ecf927b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Tue, 29 Oct 2024 13:21:18 -0700
Subject: [PATCH v6] Skip logical decoding of already-aborted transactions.
Previously, concurrent aborts were detected only during system catalog
scans while replaying a transaction in streaming mode.
This commit introduces an additional CLOG lookup to determine whether
a transaction is already aborted, so that logical decoding can skip
its further changes even when it doesn't touch system catalogs. This
optimization enhances logical decoding performance, especially for
large transactions that have already been rolled back, as it avoids
unnecessary disk or network I/O.
To avoid potential slowdowns caused by frequent CLOG lookups for small
transactions (most of which commit), the CLOG lookup is performed only
for large transactions before eviction.
Reviewed-by: Andres Freund, Amit Kapila, Dilip Kumar, Vignesh C
Reviewed-by: Ajin Cherian, Peter Smith
Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com
---
contrib/test_decoding/expected/stats.out | 44 +++++-
contrib/test_decoding/expected/stream.out | 4 +-
contrib/test_decoding/sql/stats.sql | 23 ++-
contrib/test_decoding/sql/stream.sql | 2 +-
.../replication/logical/reorderbuffer.c | 144 ++++++++++++++----
src/include/replication/reorderbuffer.h | 17 ++-
6 files changed, 190 insertions(+), 44 deletions(-)
diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index 78d36429c8a..253236e3973 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -138,12 +138,48 @@ SELECT slot_name FROM pg_stat_replication_slots;
(3 rows)
COMMIT;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
+ ?column?
+----------
+ init
+(1 row)
+
+-- Execute a transaction that is prepared and aborted. We detect that the
+-- transaction is aborted before spilling changes, and then skip collecting
+-- further changes. So, the transaction should not be spilled at all.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+TRUNCATE table stats_test;
+PREPARE TRANSACTION 'test1_abort';
+ROLLBACK PREPARED 'test1_abort';
+-- Should show only ROLLBACK PREPARED.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------
+ ROLLBACK PREPARED 'test1_abort'
+(1 row)
+
+-- Check stats. We should not spill anything as the transaction is already
+-- aborted.
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush
+--------------------------
+
+(1 row)
+
+SELECT slot_name, spill_txns AS spill_txn, spill_count AS spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+ slot_name | spill_txn | spill_count
+---------------------------------+-----------+-------------
+ regression_slot_stats4_twophase | 0 | 0
+(1 row)
+
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
- pg_drop_replication_slot('regression_slot_stats3');
- pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
---------------------------+--------------------------+--------------------------
- | |
+ pg_drop_replication_slot('regression_slot_stats3'),
+ pg_drop_replication_slot('regression_slot_stats4_twophase');
+ pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
+--------------------------+--------------------------+--------------------------+--------------------------
+ | | |
(1 row)
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index a76f77601e2..0950f552c45 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -110,7 +110,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
(17 rows)
/*
- * Test concurrent abort with toast data. When streaming the second insertion, we
+ * Test concurrent abort with toast data. Before streaming the second insertion, we
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
@@ -125,7 +125,7 @@ COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
count
-------
- 5
+ 4
(1 row)
DROP TABLE stream_test;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 630371f147a..77113cd1942 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -50,7 +50,28 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
+
+-- Execute a transaction that is prepared and aborted. We detect that the
+-- transaction is aborted before spilling changes, and then skip collecting
+-- further changes. So, the transaction should not be spilled at all.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+TRUNCATE table stats_test;
+PREPARE TRANSACTION 'test1_abort';
+ROLLBACK PREPARED 'test1_abort';
+
+-- Should show only ROLLBACK PREPARED.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check stats. We should not spill anything as the transaction is already
+-- aborted.
+SELECT pg_stat_force_next_flush();
+SELECT slot_name, spill_txns AS spill_txn, spill_count AS spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
- pg_drop_replication_slot('regression_slot_stats3');
+ pg_drop_replication_slot('regression_slot_stats3'),
+ pg_drop_replication_slot('regression_slot_stats4_twophase');
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 7f43f0c2ab7..5d07cd583e4 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -45,7 +45,7 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
/*
- * Test concurrent abort with toast data. When streaming the second insertion, we
+ * Test concurrent abort with toast data. Before streaming the second insertion, we
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3a5c7b660c..d1b2ec9b638 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -106,6 +106,7 @@
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
+#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -259,7 +260,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *data);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
- bool txn_prepared);
+ bool txn_prepared, bool txn_streaming);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -793,11 +794,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * While streaming the previous changes we have detected that the
- * transaction is aborted. So there is no point in collecting further
- * changes for it.
+ * If we have detected that the transaction is aborted while streaming the
+ * previous changes or by checking its CLOG, there is no point in
+ * collecting further changes for it.
*/
- if (txn->concurrent_abort)
+ if (rbtxn_is_aborted(txn))
{
/*
* We don't need to update memory accounting for this change as we
@@ -1620,17 +1621,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Discard changes from a transaction (and subtransactions), either after
- * streaming or decoding them at PREPARE. Keep the remaining info -
- * transactions, tuplecids, invalidations and snapshots.
+ * streaming, decoding them at PREPARE, or detecting the transaction abort.
+ * Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
*
* 'txn_prepared' indicates that we have decoded the transaction at prepare
* time.
+ * 'txn_streaming' indicates that the transaction is being streamed.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared,
+ bool txn_streaming)
{
dlist_mutable_iter iter;
Size mem_freed = 0;
@@ -1650,7 +1654,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, txn_streaming);
}
/* cleanup changes in the txn */
@@ -1680,24 +1684,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
- /*
- * Mark the transaction as streamed.
- *
- * The top-level transaction, is marked as streamed always, even if it
- * does not contain any changes (that is, when all the changes are in
- * subtransactions).
- *
- * For subtransactions, we only mark them as streamed when there are
- * changes in them.
- *
- * We do it this way because of aborts - we don't want to send aborts for
- * XIDs the downstream is not aware of. And of course, it always knows
- * about the toplevel xact (we send the XID in all messages), but we never
- * stream XIDs of empty subxacts.
- */
- if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
- txn->txn_flags |= RBTXN_IS_STREAMED;
-
if (txn_prepared)
{
/*
@@ -1721,6 +1707,25 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
ReorderBufferReturnChange(rb, change, true);
}
}
+ else if (txn_streaming && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
+ {
+ /*
+ * Mark the transaction as streamed, if appropriate.
+ *
+ * The top-level transaction is always marked as streamed, even if it
+ * does not contain any changes (that is, when all the changes are in
+ * subtransactions).
+ *
+ * For subtransactions, we only mark them as streamed when there are
+ * changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts
+ * for XIDs the downstream is not aware of. And of course, it always
+ * knows about the toplevel xact (we send the XID in all messages),
+ * but we never stream XIDs of empty subxacts.
+ */
+ txn->txn_flags |= RBTXN_IS_STREAMED;
+ }
/*
* Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
@@ -1924,7 +1929,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* full cleanup will happen as part of the COMMIT PREPAREDs, so now
* just truncate txn by removing changes and tuplecids.
*/
- ReorderBufferTruncateTXN(rb, txn, true);
+ ReorderBufferTruncateTXN(rb, txn, true, true);
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2067,7 +2072,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true);
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -2595,7 +2600,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming);
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2648,7 +2653,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
- curtxn->concurrent_abort = true;
+
+ /* Remember the transaction is aborted. */
+ Assert((curtxn->txn_flags & RBTXN_IS_COMMITTED) == 0);
+ curtxn->txn_flags |= RBTXN_IS_ABORTED;
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2832,10 +2840,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
* when rollback prepared is decoded and sent, the downstream should be
* able to rollback such a xact. See comments atop DecodePrepare.
*
- * Note, for the concurrent_abort + streaming case a stream_prepare was
+ * Note, for the concurrent abort + streaming case a stream_prepare was
* already sent within the ReorderBufferReplay call above.
*/
- if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+ if (rbtxn_is_aborted(txn) && !rbtxn_is_streamed(txn))
rb->prepare(rb, txn, txn->final_lsn);
}
@@ -3620,6 +3628,68 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
return largest;
}
+/*
+ * Check the status of the given transaction. If it has already aborted,
+ * discard all changes accumulated so far, mark it as aborted so that future
+ * changes are ignored, and return true. Otherwise return false; a transaction
+ * found to be committed is flagged so that later calls skip the lookup.
+ *
+ * When 'debug_logical_replication_streaming' is set to "immediate", the
+ * status is not checked at all and false is always returned, so the caller
+ * always processes the transaction. This mode is used by regression tests.
+ */
+static bool
+ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /* Quick return for regression tests: never report an abort in this mode */
+ if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
+ return false;
+
+ /*
+ * Quick return if the status was already determined and kept in txn_flags.
+ */
+ if (rbtxn_is_aborted(txn))
+ return true;
+ if (rbtxn_is_committed(txn))
+ return false;
+
+ /* Look up the status; a transaction still in progress is left alone */
+ if (TransactionIdIsInProgress(txn->xid))
+ return false;
+
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ /*
+ * Remember that the transaction is committed so that later calls can
+ * return early instead of repeating the CLOG lookup.
+ */
+ Assert((txn->txn_flags & RBTXN_IS_ABORTED) == 0);
+ txn->txn_flags |= RBTXN_IS_COMMITTED;
+ return false;
+ }
+
+ /*
+ * Neither in progress nor committed, so the transaction is treated as
+ * aborted. Discard the changes collected so far and free all resources
+ * allocated for toast reconstruction. The full cleanup will happen when
+ * the ABORT record of this transaction is decoded.
+ *
+ * Pass txn_streaming = false so the transaction is not marked as streamed,
+ * since this function is also called for non-streamed transactions.
+ */
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false);
+ ReorderBufferToastReset(rb, txn);
+
+ /*
+ * Mark the transaction as aborted so that any changes queued for it later
+ * are ignored.
+ */
+ Assert((txn->txn_flags & RBTXN_IS_COMMITTED) == 0);
+ txn->txn_flags |= RBTXN_IS_ABORTED;
+
+ return true;
+}
+
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -3672,6 +3742,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
+ /* skip the transaction if already aborted */
+ if (ReorderBufferCheckTXNAbort(rb, txn))
+ continue;
+
ReorderBufferStreamTXN(rb, txn);
}
else
@@ -3687,6 +3761,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->size > 0);
Assert(rb->size >= txn->size);
+ /* skip the transaction if already aborted */
+ if (ReorderBufferCheckTXNAbort(rb, txn))
+ continue;
+
ReorderBufferSerializeTXN(rb, txn);
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 6ad5a8cb9c5..e4c09c86c76 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,8 @@ typedef struct ReorderBufferChange
#define RBTXN_PREPARE 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
+#define RBTXN_IS_COMMITTED 0x0200
+#define RBTXN_IS_ABORTED 0x0400
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -230,6 +232,18 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
+/* Is this transaction known to be committed? (Unset does not imply aborted.) */
+#define rbtxn_is_committed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
+)
+
+/* Is this transaction known to be aborted? (Unset does not imply committed.) */
+#define rbtxn_is_aborted(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
+)
+
/* prepare for this transaction skipped? */
#define rbtxn_skip_prepared(txn) \
( \
@@ -419,9 +433,6 @@ typedef struct ReorderBufferTXN
/* Size of top-transaction including sub-transactions. */
Size total_size;
- /* If we have detected concurrent abort then ignore future changes. */
- bool concurrent_abort;
-
/*
* Private data pointer of the output plugin.
*/
--
2.43.5