From 2de0bc1beafbb1852c64df3133f57fa2e2ff91a3 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Mon, 3 Jul 2023 10:28:00 +0900
Subject: [PATCH v4] Skip logical decoding of already-aborted transactions.
Currently, concurrent aborts are detected only during system catalog
scans while replaying a transaction. This commit introduces an
additional check to determine if a transaction is already aborted by a
CLOG lookup, so the logical decoding skips further change also when it
doesn't touch system catalogs.
This optimization enhances logical decoding performance, especially
for large transactions that have already been rolled back, as it
avoids unnecessary disk or network I/O.
To avoid potential slowdowns caused by frequent CLOG lookups for small
transactions (most of which commit), the CLOG lookup is performed only
for large transactions before eviction.
Reviewed-by: Andres Freund, Amit Kapila, Dilip Kumar, Vignesh C,
Ajin Cherian
Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com
---
contrib/test_decoding/sql/stats.sql | 22 +++-
.../replication/logical/reorderbuffer.c | 119 +++++++++++++++---
src/include/replication/reorderbuffer.h | 18 ++-
3 files changed, 137 insertions(+), 22 deletions(-)
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 630371f147..7e05f39fc5 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -50,7 +50,27 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
+
+-- Execute a transaction that is prepared and aborted. We detect that the
+-- transaction is aborted before spilling changes, and skip to collect
+-- further changes. So the transaction should not be spilled at all.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+TRUNCATE table stats_test;
+PREPARE TRANSACTION 'test1_abort';
+ROLLBACK PREPARED 'test1_abort';
+-- should show only ROLLBACK PREAPRED.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check stats. We should not spill anything as the transaction is already
+-- aborted.
+SELECT pg_stat_force_next_flush();
+SELECT slot_name, spill_txns = 0 AS spill_txn, spill_count = 0 AS spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
- pg_drop_replication_slot('regression_slot_stats3');
+ pg_drop_replication_slot('regression_slot_stats3'),
+ pg_drop_replication_slot('regression_slot_stats4_twophase');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 92cf39ff74..d91e93a011 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -101,6 +101,7 @@
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
+#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -255,7 +256,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *data);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
- bool txn_prepared);
+ bool txn_prepared, bool mark_streamed);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -776,11 +777,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * While streaming the previous changes we have detected that the
- * transaction is aborted. So there is no point in collecting further
- * changes for it.
+ * If we have detected that the transaction is aborted while streaming the
+ * previous changes or by checking its CLOG, there is no point in
+ * collecting further changes for it.
*/
- if (txn->concurrent_abort)
+ if (rbtxn_did_abort(txn))
{
/*
* We don't need to update memory accounting for this change as we
@@ -1591,17 +1592,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Discard changes from a transaction (and subtransactions), either after
- * streaming or decoding them at PREPARE. Keep the remaining info -
- * transactions, tuplecids, invalidations and snapshots.
+ * streaming, decoding them at PREPARE, or detecting the transaction abort.
+ * Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
*
- * 'txn_prepared' indicates that we have decoded the transaction at prepare
- * time.
+ * If mark_streamed is true, we could mark the transaction as streamed.
+ *
+ * 'streaming_txn' indicates that the given transaction is a streaming transaction.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared,
+ bool mark_streamed)
{
dlist_mutable_iter iter;
@@ -1620,7 +1624,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, mark_streamed);
}
/* cleanup changes in the txn */
@@ -1654,7 +1658,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
+ if (mark_streamed && (!txn_prepared) &&
+ (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
if (txn_prepared)
@@ -1883,7 +1888,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* full cleanup will happen as part of the COMMIT PREPAREDs, so now
* just truncate txn by removing changes and tuplecids.
*/
- ReorderBufferTruncateTXN(rb, txn, true);
+ ReorderBufferTruncateTXN(rb, txn, true, true);
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2026,7 +2031,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true);
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -2551,7 +2556,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming);
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2604,7 +2609,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
- curtxn->concurrent_abort = true;
+
+ /* Update transaction status */
+ Assert((curtxn->txn_flags & (RBTXN_COMMITTED | RBTXN_ABORTED)) == 0);
+ curtxn->txn_flags |= RBTXN_ABORTED;
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2766,6 +2774,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
char *gid)
{
ReorderBufferTXN *txn;
+ bool txn_aborted;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
@@ -2777,6 +2786,12 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
txn->txn_flags |= RBTXN_PREPARE;
txn->gid = pstrdup(gid);
+ /*
+ * We remember whether the transaction is already aborted before the
+ * replay in order to detect the concurrent abort below.
+ */
+ txn_aborted = rbtxn_did_abort(txn);
+
/* The prepare info must have been updated in txn by now. */
Assert(txn->final_lsn != InvalidXLogRecPtr);
@@ -2788,10 +2803,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
* when rollback prepared is decoded and sent, the downstream should be
* able to rollback such a xact. See comments atop DecodePrepare.
*
- * Note, for the concurrent_abort + streaming case a stream_prepare was
+ * Note, for the concurrent abort + streaming case a stream_prepare was
* already sent within the ReorderBufferReplay call above.
*/
- if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+ if (!txn_aborted && rbtxn_did_abort(txn) && !rbtxn_is_streamed(txn))
rb->prepare(rb, txn, txn->final_lsn);
}
@@ -3557,6 +3572,66 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
return largest;
}
+/*
+ * Check the transaction status of the given transaction. If the transaction
+ * already aborted, we discards all changes accumulated so far and ignore
+ * future changes, and return true. Otherwise return false.
+ *
+ * If logical_replication_mode is set to "immediate", we disable this check
+ * for regression tests.
+ */
+static bool
+ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /*
+ * If logical_replication_mode is "immediate", we don't check the
+ * transaction status so the caller always processes this transaction.
+ */
+ if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)
+ return false;
+
+ /*
+ * Quick return if the transaction status is already known.
+ */
+ if (rbtxn_did_abort(txn))
+ return true;
+ if (rbtxn_did_commit(txn))
+ return false;
+
+ /* Check the transaction status using CLOG lookup */
+ if (TransactionIdIsInProgress(txn->xid))
+ return false;
+
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ /*
+ * Remember the transaction is committed so that we can skip CLOG
+ * check next time, avoiding the pressure on CLOG lookup.
+ */
+ txn->txn_flags |= RBTXN_COMMITTED;
+ return false;
+ }
+
+ /*
+ * The transaction aborted. We discard the changes we've collected so far,
+ * and free all resources allocated for toast reconstruction. The full
+ * cleanup will happen as part of decoding ABORT record of this
+ *
+ * We don't mark the transaction as streamed since this function can be
+ * called for non-streamed transactions too.
+ */
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false);
+ ReorderBufferToastReset(rb, txn);
+
+ /*
+ * Mark the transaction as aborted so we ignore future changes of this
+ * transaction.
+ */
+ txn->txn_flags |= RBTXN_ABORTED;
+
+ return true;
+}
+
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -3609,6 +3684,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckTXNAbort(rb, txn))
+ continue;
+
ReorderBufferStreamTXN(rb, txn);
}
else
@@ -3624,6 +3703,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->size > 0);
Assert(rb->size >= txn->size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckTXNAbort(rb, txn))
+ continue;
+
ReorderBufferSerializeTXN(rb, txn);
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..23c505f29b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -167,6 +167,8 @@ typedef struct ReorderBufferChange
#define RBTXN_PREPARE 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
+#define RBTXN_COMMITTED 0x0200
+#define RBTXN_ABORTED 0x0400
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -224,6 +226,19 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
+/* Did this transaction committed? */
+#define rbtxn_did_commit(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_COMMITTED) != 0 \
+)
+
+/* Did this transaction aborted? */
+#define rbtxn_did_abort(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_ABORTED) != 0 \
+)
+
+
/* prepare for this transaction skipped? */
#define rbtxn_skip_prepared(txn) \
( \
@@ -409,9 +424,6 @@ typedef struct ReorderBufferTXN
/* Size of top-transaction including sub-transactions. */
Size total_size;
- /* If we have detected concurrent abort then ignore future changes. */
- bool concurrent_abort;
-
/*
* Private data pointer of the output plugin.
*/
--
2.39.3