diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index 78d36429c8..fe06e42c98 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -138,12 +138,38 @@ SELECT slot_name FROM pg_stat_replication_slots;
(3 rows)
COMMIT;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4', 'test_decoding') s4;
+ ?column?
+----------
+ init
+(1 row)
+
+-- transaction is large enough to be serialized but aborted.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+ROLLBACK;
+RESET logical_decoding_work_mem;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4', NULL, NULL, 'skip-empty-xacts', '1');
+ count
+-------
+ 0
+(1 row)
+
+-- Check stats. Since the transaction is already aborted, we don't collect
+-- changes, so no data should be spilled.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4';
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+------------------------+------------+-------------+------------+-------------
+ regression_slot_stats4 | t | t | f | f
+(1 row)
+
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
- pg_drop_replication_slot('regression_slot_stats3');
- pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
---------------------------+--------------------------+--------------------------
- | |
+ pg_drop_replication_slot('regression_slot_stats3'),
+ pg_drop_replication_slot('regression_slot_stats4');
+ pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
+--------------------------+--------------------------+--------------------------+--------------------------
+ | | |
(1 row)
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0..bdba352f1a 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -24,12 +24,11 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
rollback to s1;
-INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i);
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
-----------------------------------------------------------
- streaming message: transactional: 1 prefix: test, sz: 50
+ data
+------------------------------------------
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -51,9 +50,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
streaming change for transaction
streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(24 rows)
+(34 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index b08bb0e573..55d5c6085f 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -25,13 +25,12 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
-INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
-----------------------------------------------------------
- streaming message: transactional: 1 prefix: test, sz: 50
+ data
+------------------------------------------
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -53,9 +52,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
streaming change for transaction
streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
-(24 rows)
+(34 rows)
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
@@ -82,10 +92,9 @@ INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
-----------------------------------------------------------
- streaming message: transactional: 1 prefix: test, sz: 50
-(1 row)
+ data
+------
+(0 rows)
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 630371f147..8a5819f63d 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -50,7 +50,22 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4', 'test_decoding') s4;
+
+-- transaction is large enough to be serialized but aborted.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+ROLLBACK;
+
+RESET logical_decoding_work_mem;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4', NULL, NULL, 'skip-empty-xacts', '1');
+
+-- Check stats. Since the transaction is already aborted, we don't collect
+-- changes, so no data should be spilled.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4';
+
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
- pg_drop_replication_slot('regression_slot_stats3');
+ pg_drop_replication_slot('regression_slot_stats3'),
+ pg_drop_replication_slot('regression_slot_stats4');
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 4feec62972..ceab9aa627 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -13,7 +13,7 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
rollback to s1;
-INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i);
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
index 646076da20..f607aeae20 100644
--- a/contrib/test_decoding/sql/twophase_stream.sql
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -15,7 +15,7 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
-INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..d58d123957 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -674,6 +674,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
txn->first_lsn = lsn;
txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
+ /* Check if the transaction already aborted */
+ txn->aborted = TransactionIdDidAbort(xid);
+
if (create_as_top)
{
dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
@@ -780,11 +783,15 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * While streaming the previous changes we have detected that the
- * transaction is aborted. So there is no point in collecting further
- * changes for it.
+ * If we have detected that the transaction is aborted while streaming
+ * the previous changes or by checking its CLOG, there is no point in
+ * collecting further changes for it.
+ *
+ * XXX To pass some TAP tests, we don't skip decoding already-aborted
+ * transaction changes if logical_replication_mode is immediate, for now.
*/
- if (txn->concurrent_abort)
+ if (txn->concurrent_abort ||
+ (txn->aborted && logical_replication_mode != LOGICAL_REP_MODE_IMMEDIATE))
{
/*
* We don't need to update memory accounting for this change as we
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1b9db22acb..8879083d28 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -431,6 +431,12 @@ typedef struct ReorderBufferTXN
/* If we have detected concurrent abort then ignore future changes. */
bool concurrent_abort;
+ /*
+ * If the transaction is known to be already aborted then ignore
+ * changes.
+ */
+ bool aborted;
+
/*
* Private data pointer of the output plugin.
*/