diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 78d36429c8..fe06e42c98 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -138,12 +138,38 @@ SELECT slot_name FROM pg_stat_replication_slots; (3 rows) COMMIT; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4', 'test_decoding') s4; + ?column? +---------- + init +(1 row) + +-- transaction is large enough to be serialized but aborted. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i); +ROLLBACK; +RESET logical_decoding_work_mem; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4', NULL, NULL, 'skip-empty-xacts', '1'); + count +------- + 0 +(1 row) + +-- Check stats. Since the transaction is already aborted, we don't collect +-- changes, so no data should be spilled. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4'; + slot_name | spill_txns | spill_count | total_txns | total_bytes +------------------------+------------+-------------+------------+------------- + regression_slot_stats4 | t | t | f | f +(1 row) + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); - pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot ---------------------------+--------------------------+-------------------------- - | | + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4'); + pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot +--------------------------+--------------------------+--------------------------+-------------------------- + | | | (1 row) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 0f21dcb8e0..bdba352f1a 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -24,12 +24,11 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); TRUNCATE table stream_test; rollback to s1; -INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i); COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); - data ----------------------------------------------------------- - streaming message: transactional: 1 prefix: test, sz: 50 + data +------------------------------------------ opening a streamed block for transaction streaming change for transaction streaming change for transaction @@ -51,9 +50,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl streaming change for transaction streaming change for transaction streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction closing a streamed block for transaction committing streamed transaction -(24 rows) +(34 rows) -- streaming test for toast changes ALTER TABLE stream_test ALTER COLUMN data set storage external; diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out index b08bb0e573..55d5c6085f 100644 --- a/contrib/test_decoding/expected/twophase_stream.out +++ b/contrib/test_decoding/expected/twophase_stream.out @@ -25,13 +25,12 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); TRUNCATE table stream_test; ROLLBACK TO s1; -INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i); PREPARE TRANSACTION 'test1'; -- should show the inserts after a ROLLBACK SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); - data ----------------------------------------------------------- - streaming message: transactional: 1 prefix: test, sz: 50 + data +------------------------------------------ opening a streamed block for transaction streaming change for transaction streaming change for transaction @@ -53,9 +52,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl streaming change for transaction streaming change for transaction streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction closing a streamed block for transaction preparing streamed transaction 'test1' -(24 rows) +(34 rows) COMMIT PREPARED 'test1'; --should show the COMMIT PREPARED and the other changes in the transaction @@ -82,10 +92,9 @@ INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20 PREPARE TRANSACTION 'test1_nodecode'; -- should NOT show inserts after a ROLLBACK SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); - data ----------------------------------------------------------- - streaming message: transactional: 1 prefix: test, sz: 50 -(1 row) + data +------ +(0 rows) COMMIT PREPARED 'test1_nodecode'; -- should show the inserts but not show a COMMIT PREPARED but a COMMIT diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 630371f147..8a5819f63d 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -50,7 +50,22 @@ SELECT slot_name FROM pg_stat_replication_slots; SELECT slot_name FROM pg_stat_replication_slots; COMMIT; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4', 'test_decoding') s4; + +-- transaction is large enough to be serialized but aborted. +BEGIN; +INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i); +ROLLBACK; + +RESET logical_decoding_work_mem; +SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4', NULL, NULL, 'skip-empty-xacts', '1'); + +-- Check stats. Since the transaction is already aborted, we don't collect +-- changes, so no data should be spilled. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4'; + DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), pg_drop_replication_slot('regression_slot_stats2'), - pg_drop_replication_slot('regression_slot_stats3'); + pg_drop_replication_slot('regression_slot_stats3'), + pg_drop_replication_slot('regression_slot_stats4'); diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 4feec62972..ceab9aa627 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -13,7 +13,7 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); TRUNCATE table stream_test; rollback to s1; -INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i); COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql index 646076da20..f607aeae20 100644 --- a/contrib/test_decoding/sql/twophase_stream.sql +++ b/contrib/test_decoding/sql/twophase_stream.sql @@ -15,7 +15,7 @@ SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); TRUNCATE table stream_test; ROLLBACK TO s1; -INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 31) g(i); PREPARE TRANSACTION 'test1'; -- should show the inserts after a ROLLBACK SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 26d252bd87..d58d123957 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -674,6 +674,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, txn->first_lsn = lsn; txn->restart_decoding_lsn = rb->current_restart_decoding_lsn; + /* Check if the transaction already aborted */ + txn->aborted = TransactionIdDidAbort(xid); + if (create_as_top) { dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); @@ -780,11 +783,15 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * While streaming the previous changes we have detected that the - * transaction is aborted. So there is no point in collecting further - * changes for it. + * If we have detected that the transaction is aborted while streaming + * the previous changes or by checking its CLOG, there is no point in + * collecting further changes for it. + * + * XXX To pass some TAP tests, we don't skip decoding already-aborted + * transaction changes if logical_replication_mode is immediate, for now. */ - if (txn->concurrent_abort) + if (txn->concurrent_abort || + (txn->aborted && logical_replication_mode != LOGICAL_REP_MODE_IMMEDIATE)) { /* * We don't need to update memory accounting for this change as we diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1b9db22acb..8879083d28 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -431,6 +431,12 @@ typedef struct ReorderBufferTXN /* If we have detected concurrent abort then ignore future changes. */ bool concurrent_abort; + /* + * If the transaction is known to be already aborted then ignore + * changes. + */ + bool aborted; + /* * Private data pointer of the output plugin. */