From 4fbb52a567c021c5abf862afba342e59c16d6bee Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 12 Oct 2022 11:49:04 +0900 Subject: [PATCH v4] Fix the assertion failure while processing NEW_CID in logical decoding. When the logical decoding restarts from NEW_CID, since there is no association between the top transaction and its sub transaction, both are created as top transactions and have the same LSN. This caused the assertion failure in AssertTXNLsnOrder(). Besides, when decoding the commit record of top transaction, we missed setting needs_timetravel = true although one of its subtransactions has been marked as containing catalog change, which caused another assertion failure in SnapBuildCommitTxn(). With this change, we skip the assertion check until we reach the LSN at which we start decoding the contents of transaction, specifically start_decoding_at in SnapBuild. This is okay because we don't guarantee to make the association between top transaction and sub transaction until we try to decode the actual contents of transactions. The orderig ordering of the records prior to the start_decoding_at LSN should have been checked before the restart. In addition, when decoding the commit record, we force top transaction to do timetravel if one of its subtransactions has been marked as containing catalog changes. Back-patch to all supported branches. Reported-by: Thomas Vondra, Osumi Takamichi Author: Osumi Takamichi, Masahiko Sawada Reviewed-by: Amit Kapila, Dilip Kumar, Kuroda Hayato, Kyotaro Horiguchi, Masahiko Sawada Discussion: https://postgr.es/m/a89b46b6-0239-2fd5-71a9-b19b1f7a7145%40enterprisedb.com Discussion: https://postgr.es/m/TYCPR01MB83733C6CEAE47D0280814D5AED7A9%40TYCPR01MB8373.jpnprd01.prod.outlook.com Backpatch-through: 10 --- .../expected/catalog_change_snapshot.out | 45 +++++++++++++++++++ .../specs/catalog_change_snapshot.spec | 16 +++++++ .../replication/logical/reorderbuffer.c | 14 ++++++ src/backend/replication/logical/snapbuild.c | 3 ++ 4 files changed, 78 insertions(+) diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index d2a4bdfcc1..8722787e6e 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -96,3 +96,48 @@ COMMIT stop (1 row) + +starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_insert2 s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +table public.user_cat: INSERT: val1[integer]:1 +COMMIT +(4 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index ff8f68489b..673bccf4b0 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -6,12 +6,14 @@ setup DROP TABLE IF EXISTS tbl2; CREATE TABLE tbl1 (val1 integer, val2 integer); CREATE TABLE tbl2 (val1 integer, val2 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); } teardown { DROP TABLE tbl1; DROP TABLE tbl2; + DROP TABLE user_cat; SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); } @@ -22,6 +24,7 @@ step "s0_begin" { BEGIN; } step "s0_savepoint" { SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_insert2" { INSERT INTO user_cat VALUES (1); } step "s0_commit" { COMMIT; } session "s1" @@ -57,3 +60,16 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # checkpoint record it prunes one of the xacts in that list and when decoding the # next checkpoint, it will completely prune that list. permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# Test that we can handle the case where there is no association between top-level +# transaction and its subtransactions. The last decoding restarts from the first +# checkpoint, decodes NEW_CID generated by "s0_insert2", and marks the subtransaction +# as containing catalog changes while adding tuple cids to its top-level transaction. +# During that, both transaction entries are created in ReorderBuffer as top-level +# transactions and have the same LSN. We check if the assertion check for the order +# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the +# LSN at which we start replaying the contents of transactions. Besides, when decoding +# the commit record of the top-level transaction, we must force the top-level +# transaction to do timetravel since one of its subtransactions has been marked as +# containing catalog changes. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6dff9915a5..f55bf44290 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -881,10 +881,24 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING + LogicalDecodingContext *ctx = rb->private_data; dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; + /* + * Skip the verification if we don't reach the LSN at which we start + * decoding the contents of transactions yet because until we reach the + * LSN, we could have transactions that don't have the association between + * the top-level transaction and subtransaction yet and consequently have + * the same LSN. We don't guarantee this association until we try to + * decode the actual contents of transaction. The ordering of the records + * prior to the start_decoding_at LSN should have been checked before the + * restart. + */ + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr)) + return; + dlist_foreach(iter, &rb->toplevel_by_lsn) { ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 54499c06fe..f892d19bfb 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1099,6 +1099,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, else if (sub_needs_timetravel) { /* track toplevel txn as well, subxact alone isn't meaningful */ + elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions", + xid); + needs_timetravel = true; SnapBuildAddCommittedTxn(builder, xid); } else if (needs_timetravel) -- 2.31.1