From 0ccecb9ec572b2a46e69f9b9d8edcd84261ab4c3 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 12 Oct 2022 11:49:04 +0900 Subject: [PATCH v1] Fix the assertion failure while processing NEW_CID in logical decoding. When logical decoding restarts from NEW_CID, there is no association yet between the top-level transaction and its subtransaction, so both are created as top-level transactions and have the same LSN. This causes an assertion failure in AssertTXNLsnOrder(). With this change, we skip the assertion check until we reach the LSN at which we start decoding the contents of transactions, specifically start_decoding_at in SnapBuild. This is okay because we don't guarantee that the association between a top-level transaction and its subtransaction has been made until we try to decode the actual contents of transactions. By skipping the check in some cases, we could miss some faulty cases where two unrelated top-level transactions have the same LSN. Therefore, for the transactions we skipped, we perform the assertion check when we reach that LSN. 
--- .../expected/catalog_change_snapshot.out | 44 +++++++++++++++++++ .../specs/catalog_change_snapshot.spec | 11 +++++ src/backend/replication/logical/decode.c | 9 ++++ .../replication/logical/reorderbuffer.c | 19 ++++++-- src/backend/replication/logical/snapbuild.c | 9 ++++ src/include/replication/reorderbuffer.h | 2 + src/include/replication/snapbuild.h | 1 + 7 files changed, 92 insertions(+), 3 deletions(-) diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index d2a4bdfcc1..d85552699f 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -96,3 +96,47 @@ COMMIT stop (1 row) + +starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_analyze s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? 
+-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_analyze: ANALYZE tbl1; +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? 
+-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index ff8f68489b..30673da61f 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -21,6 +21,7 @@ step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolatio step "s0_begin" { BEGIN; } step "s0_savepoint" { SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } +step "s0_analyze" { ANALYZE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } step "s0_commit" { COMMIT; } @@ -57,3 +58,13 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # checkpoint record it prunes one of the xacts in that list and when decoding the # next checkpoint, it will completely prune that list. permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# Test that we can handle the case where there is no association between top-level +# transaction and its subtransactions. The last decoding restarts from the first +# checkpoint, decodes NEW_CID generated by "s0_analyze", and marks the subtransaction +# as containing catalog changes while adding tuple cids to its top-level transaction. +# During that, both transaction entries are created in ReorderBuffer as top-level +# transactions and have the same LSN. We check if the assertion check for the order +# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the +# LSN at which we start replaying the contents of transactions. 
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_analyze" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 2cc0ac9eb0..17349da1f0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -113,6 +113,15 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } +#ifdef USE_ASSERT_CHECKING + /* + * Check the order of transaction LSNs when we reached the start decoding + * LSN. See the comments in AssertTXNLsnOrder() for details. + */ + if (SnapBuildGetStartDecodingAt(ctx->snapshot_builder) == buf.origptr) + AssertTXNLsnOrder(ctx->reorder); +#endif + rmgr = GetRmgr(XLogRecGetRmid(record)); if (rmgr.rm_decode != NULL) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6dff9915a5..3a442e0d1e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -221,8 +221,6 @@ static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn); -static void AssertTXNLsnOrder(ReorderBuffer *rb); - /* --------------------------------------- * support functions for lsn-order iterating over the ->changes of a * transaction and its subtransactions @@ -877,14 +875,29 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, * * No-op if assertions are not in use. 
*/ -static void +void AssertTXNLsnOrder(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING + LogicalDecodingContext *ctx = rb->private_data; dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; + /* + * Skip the verification if we don't reach the LSN at which we start + * decoding the contents of transactions yet. This is okay because + * until we reach the LSN, we could have transactions that don't have + * the association between the top-level transaction and subtransaction + * yet. We don't guarantee this association until we try to decode the + * actual contents of transaction. Since we typically do this + * verification when manipulating the transaction lists in the reorder + * buffer, we may skip verifying some transactions. For those that we + * skipped, we do that when we reached the LSN. + */ + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr)) + return; + dlist_foreach(iter, &rb->toplevel_by_lsn) { ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 54499c06fe..4640a6321c 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -431,6 +431,15 @@ SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr) return ptr < builder->start_decoding_at; } +/* + * Return the LSN at which the contents of transaction are first decoded. + */ +XLogRecPtr +SnapBuildGetStartDecodingAt(SnapBuild *builder) +{ + return builder->start_decoding_at; +} + /* * Increase refcount of a snapshot. 
* diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 02b59a1931..97f6c26e99 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -703,4 +703,6 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr); extern void StartupReorderBuffer(void); +extern void AssertTXNLsnOrder(ReorderBuffer *rb); + #endif diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 2a697e57c3..3b917ad3ec 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -76,6 +76,7 @@ extern SnapBuildState SnapBuildCurrentState(SnapBuild *builder); extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder); extern bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr); +extern XLogRecPtr SnapBuildGetStartDecodingAt(SnapBuild *builder); extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder); extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); -- 2.31.1