From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001 From: tomas Date: Wed, 6 Nov 2024 12:46:16 +0100 Subject: [PATCH vnew 2/2] WIP: change asserts to elog + defense When updating LSNs, use Max() with the preceding value to protect against moves backwards. --- src/backend/replication/logical/logical.c | 57 ++++++++++++----------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 16fed53063b..988f4add977 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) xmin, LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_xmin_lsn)); - /* don't allow the LSN to go backwards */ - Assert(slot->candidate_xmin_lsn <= current_lsn); + if (slot->candidate_xmin_lsn > current_lsn) + elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn"); + /* XXX doesn't the Max() put the fields out of sync? */ slot->candidate_catalog_xmin = xmin; - slot->candidate_xmin_lsn = current_lsn; + slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn); /* our candidate can directly be used */ updated_xmin = true; @@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) xmin, LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_xmin_lsn)); - /* don't allow the LSN to go backwards */ - Assert(slot->candidate_xmin_lsn <= current_lsn); + if (slot->candidate_xmin_lsn > current_lsn) + elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn"); + /* XXX doesn't the Max() put the fields out of sync? 
*/ slot->candidate_catalog_xmin = xmin; - slot->candidate_xmin_lsn = current_lsn; + slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn); } SpinLockRelease(&slot->mutex); @@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid), LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn)); - /* don't allow LSNs to go back */ - Assert(slot->candidate_restart_valid <= current_lsn); - Assert(slot->candidate_restart_lsn <= restart_lsn); + if (slot->candidate_restart_valid > current_lsn) + elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn"); - /* also don't even consider going back for actual restart_lsn */ - Assert(slot->data.restart_lsn <= restart_lsn); + if (slot->candidate_restart_lsn > restart_lsn) + elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn"); - slot->candidate_restart_valid = current_lsn; - slot->candidate_restart_lsn = restart_lsn; + if (slot->data.restart_lsn > restart_lsn) + elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn"); + + slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn); + slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn)); /* our candidate can directly be used */ updated_lsn = true; @@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid), LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn)); - /* don't allow LSNs to go back */ - Assert(slot->candidate_restart_valid <= current_lsn); - Assert(slot->candidate_restart_lsn <= restart_lsn); + if (slot->candidate_restart_valid > current_lsn) + elog(LOG, "ASSERT: 
LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn"); - /* - * also don't even consider going back for actual restart_lsn - * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); - */ - Assert(slot->data.restart_lsn <= restart_lsn); + if (slot->candidate_restart_lsn > restart_lsn) + elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn"); + + if (slot->data.restart_lsn > restart_lsn) + elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn"); + + slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn); + slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn)); - slot->candidate_restart_valid = current_lsn; - slot->candidate_restart_lsn = restart_lsn; SpinLockRelease(&slot->mutex); elog(DEBUG1, "got new restart lsn %X/%X at %X/%X", @@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid), LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn)); - /* don't allow the LSN to go backwards */ - Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn); + if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn) + elog(LOG, "ASSERT: LogicalConfirmReceivedLocation: MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn"); - MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn; + MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn, + MyReplicationSlot->candidate_restart_lsn); MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr; MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr; updated_restart = true; -- 2.39.5