From 3b58990c088936122f38d855a5a3900602deacf7 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 6 Apr 2020 21:28:55 -0700 Subject: [PATCH v7 01/11] TMP: work around missing snapshot registrations. This is just what's hit by the tests. It's not an actual fix. --- src/backend/catalog/namespace.c | 7 +++++++ src/backend/catalog/pg_subscription.c | 4 ++++ src/backend/commands/indexcmds.c | 9 +++++++++ src/backend/commands/tablecmds.c | 8 ++++++++ src/backend/replication/logical/tablesync.c | 12 ++++++++++++ src/backend/replication/logical/worker.c | 4 ++++ src/backend/utils/time/snapmgr.c | 4 ++++ 7 files changed, 48 insertions(+) diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 2ec23016fe5..e4696d8d417 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -55,6 +55,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -4244,12 +4245,18 @@ RemoveTempRelationsCallback(int code, Datum arg) { if (OidIsValid(myTempNamespace)) /* should always be true */ { + Snapshot snap; + /* Need to ensure we have a usable transaction. */ AbortOutOfAnyTransaction(); StartTransactionCommand(); + /* ensure xmin stays set */ + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); + RemoveTempRelations(myTempNamespace); + UnregisterSnapshot(snap); CommitTransactionCommand(); } } diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index cb157311154..4a324dfb4f1 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -31,6 +31,7 @@ #include "utils/fmgroids.h" #include "utils/pg_lsn.h" #include "utils/rel.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" static List *textarray_to_stringlist(ArrayType *textarray); @@ -286,6 +287,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; + Snapshot snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); @@ -321,6 +323,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, /* Cleanup. */ table_close(rel, NoLock); + + UnregisterSnapshot(snap); } /* diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 2baca12c5f4..094bf6139f0 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -2837,6 +2837,7 @@ ReindexRelationConcurrently(Oid relationOid, int options) char *relationName = NULL; char *relationNamespace = NULL; PGRUsage ru0; + Snapshot snap; /* * Create a memory context that will survive forced transaction commits we @@ -3306,6 +3307,7 @@ ReindexRelationConcurrently(Oid relationOid, int options) */ StartTransactionCommand(); + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); forboth(lc, indexIds, lc2, newIndexIds) { @@ -3354,8 +3356,11 @@ ReindexRelationConcurrently(Oid relationOid, int options) } /* Commit this transaction and make index swaps visible */ + UnregisterSnapshot(snap); CommitTransactionCommand(); + StartTransactionCommand(); + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); /* * Phase 5 of REINDEX CONCURRENTLY @@ -3386,7 +3391,9 @@ ReindexRelationConcurrently(Oid relationOid, int options) } /* Commit this transaction to make the updates visible. */ + UnregisterSnapshot(snap); CommitTransactionCommand(); + StartTransactionCommand(); /* @@ -3400,6 +3407,7 @@ ReindexRelationConcurrently(Oid relationOid, int options) WaitForLockersMultiple(lockTags, AccessExclusiveLock, true); PushActiveSnapshot(GetTransactionSnapshot()); + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); { ObjectAddresses *objects = new_object_addresses(); @@ -3425,6 +3433,7 @@ ReindexRelationConcurrently(Oid relationOid, int options) } PopActiveSnapshot(); + UnregisterSnapshot(snap); CommitTransactionCommand(); /* diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 6162fb018c7..e1eacc6a4a6 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -15200,6 +15200,7 @@ PreCommit_on_commit_actions(void) ListCell *l; List *oids_to_truncate = NIL; List *oids_to_drop = NIL; + Snapshot snap; foreach(l, on_commits) { @@ -15231,6 +15232,11 @@ PreCommit_on_commit_actions(void) } } + if (oids_to_truncate == NIL && oids_to_drop == NIL) + return; + + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); + /* * Truncate relations before dropping so that all dependencies between * relations are removed after they are worked on. Doing it like this @@ -15284,6 +15290,8 @@ PreCommit_on_commit_actions(void) } #endif } + + UnregisterSnapshot(snap); } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index c27d9705895..aec5a044790 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -863,6 +863,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { Relation rel; WalRcvExecResult *res; + Snapshot snap; SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -871,10 +872,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. */ StartTransactionCommand(); + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); + + UnregisterSnapshot(snap); CommitTransactionCommand(); pgstat_report_stat(false); @@ -918,6 +923,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CRS_USE_SNAPSHOT, origin_startpos); PushActiveSnapshot(GetTransactionSnapshot()); + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); copy_table(rel); PopActiveSnapshot(); @@ -933,6 +939,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Make the copy visible. */ CommandCounterIncrement(); + UnregisterSnapshot(snap); + /* * We are done with the initial data synchronization, update * the state. @@ -957,6 +965,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) { + snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); + /* * Update the new state in catalog. No need to bother * with the shmem state as we are exiting for good. @@ -965,6 +975,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relid, SUBREL_STATE_SYNCDONE, *origin_startpos); + UnregisterSnapshot(snap); + finish_sync_worker(); } break; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a752a1224d6..f10f3f843d1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1245,6 +1245,9 @@ apply_handle_truncate(StringInfo s) ensure_transaction(); + /* catalog modifications need a set snapshot */ + PushActiveSnapshot(GetTransactionSnapshot()); + remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); foreach(lc, remote_relids) @@ -1332,6 +1335,7 @@ apply_handle_truncate(StringInfo s) } CommandCounterIncrement(); + PopActiveSnapshot(); } diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 1c063c592ce..b5cff157bf6 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -441,6 +441,8 @@ GetOldestSnapshot(void) Snapshot GetCatalogSnapshot(Oid relid) { + Assert(IsTransactionState()); + /* * Return historic snapshot while we're doing logical decoding, so we can * see the appropriate state of the catalog. @@ -1017,6 +1019,8 @@ SnapshotResetXmin(void) if (pairingheap_is_empty(&RegisteredSnapshots)) { MyPgXact->xmin = InvalidTransactionId; + TransactionXmin = InvalidTransactionId; + RecentXmin = InvalidTransactionId; return; } -- 2.25.0.114.g5b0ca878e0