diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 66dbf58..995f51f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -58,6 +58,7 @@ #include "replication/walsender.h" #include "replication/syncrep.h" #include "storage/fd.h" +#include "storage/ipc.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -82,25 +83,25 @@ int max_prepared_xacts = 0; * * The lifecycle of a global transaction is: * - * 1. After checking that the requested GID is not in use, set up an - * entry in the TwoPhaseState->prepXacts array with the correct XID and GID, - * with locking_xid = my own XID and valid = false. + * 1. After checking that the requested GID is not in use, set up an entry in + * the TwoPhaseState->prepXacts array with the correct GID and valid = false, + * and mark it as locked by my backend. * * 2. After successfully completing prepare, set valid = true and enter the * referenced PGPROC into the global ProcArray. * - * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry - * is valid and its locking_xid is no longer active, then store my current - * XID into locking_xid. This prevents concurrent attempts to commit or - * rollback the same prepared xact. + * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is + * valid and not locked, then mark the entry as locked by storing my current + * backend ID into locking_backend. This prevents concurrent attempts to + * commit or rollback the same prepared xact. * * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry * from the ProcArray and the TwoPhaseState->prepXacts array and return it to * the freelist. * * Note that if the preparing transaction fails between steps 1 and 2, the - * entry will remain in prepXacts until recycled. We can detect recyclable - * entries by checking for valid = false and locking_xid no longer active. + * entry must be removed so that the GID and the GlobalTransaction struct + * can be reused. See AtAbort_Twophase(). * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h @@ -115,8 +116,8 @@ typedef struct GlobalTransactionData TimestampTz prepared_at; /* time of preparation */ XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ Oid owner; /* ID of user that executed the xact */ - TransactionId locking_xid; /* top-level XID of backend working on xact */ - bool valid; /* TRUE if fully prepared */ + BackendId locking_backend; /* backend currently working on the xact */ + bool valid; /* TRUE if PGPROC entry is in proc array */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -141,6 +142,12 @@ typedef struct TwoPhaseStateData static TwoPhaseStateData *TwoPhaseState; +/* + * Global transaction entry currently locked by us, if any. + */ +static GlobalTransaction MyLockedGxact = NULL; + +static bool twophaseExitRegistered = false; static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, @@ -157,6 +164,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid, RelFileNode *rels); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); +static void RemoveGXact(GlobalTransaction gxact); /* @@ -230,6 +238,66 @@ TwoPhaseShmemInit(void) Assert(found); } +/* + * Exit hook to unlock the global transaction entry we're working on. + */ +static void +AtProcExit_Twophase(int code, Datum arg) +{ + /* same logic as abort */ + AtAbort_Twophase(); +} + +/* + * Abort hook to unlock the global transaction entry we're working on. + */ +void +AtAbort_Twophase(void) +{ + if (MyLockedGxact == NULL) + return; + + /* + * If we were in process of preparing the transaction, but haven't + * written the WAL record yet, remove the global transaction entry. + * Same if we are in the process of finishing an already-prepared + * transaction, and fail after having already written the WAL 2nd + * phase commit or rollback record. + * + * After that it's too late to abort, so just unlock the GlobalTransaction + * entry. We might not have transfered all locks and other state to the + * prepared transaction yet, so this is a bit bogus, but it's the best we + * can do. + */ + if (!MyLockedGxact->valid) + { + RemoveGXact(MyLockedGxact); + } + else + { + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + MyLockedGxact->locking_backend = InvalidBackendId; + + LWLockRelease(TwoPhaseStateLock); + } + MyLockedGxact = NULL; +} + +/* + * This is called after we have finished transfering state to the prepared + * PGXACT entry. + */ +void +PostPrepare_Twophase() +{ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + MyLockedGxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + + MyLockedGxact = NULL; +} + /* * MarkAsPreparing @@ -261,29 +329,15 @@ MarkAsPreparing(TransactionId xid, const char *gid, errmsg("prepared transactions are disabled"), errhint("Set max_prepared_transactions to a nonzero value."))); - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - - /* - * First, find and recycle any gxacts that failed during prepare. We do - * this partly to ensure we don't mistakenly say their GIDs are still - * reserved, and partly so we don't fail on out-of-slots unnecessarily. - */ - for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + /* on first call, register the exit hook */ + if (!twophaseExitRegistered) { - gxact = TwoPhaseState->prepXacts[i]; - if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid)) - { - /* It's dead Jim ... remove from the active array */ - TwoPhaseState->numPrepXacts--; - TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; - /* and put it back in the freelist */ - gxact->next = TwoPhaseState->freeGXacts; - TwoPhaseState->freeGXacts = gxact; - /* Back up index count too, so we don't miss scanning one */ - i--; - } + before_shmem_exit(AtProcExit_Twophase, 0); + twophaseExitRegistered = true; } + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + /* Check for conflicting GID */ for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -340,7 +394,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, /* initialize LSN to 0 (start of WAL) */ gxact->prepare_lsn = 0; gxact->owner = owner; - gxact->locking_xid = xid; + gxact->locking_backend = MyBackendId; gxact->valid = false; strcpy(gxact->gid, gid); @@ -348,6 +402,12 @@ MarkAsPreparing(TransactionId xid, const char *gid, Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + /* + * Remember that we have this GlobalTransaction entry locked for us. + * If we abort after this, we must release the entry. + */ + MyLockedGxact = gxact; + LWLockRelease(TwoPhaseStateLock); return gxact; @@ -410,6 +470,13 @@ LockGXact(const char *gid, Oid user) { int i; + /* on first call, register the exit hook */ + if (!twophaseExitRegistered) + { + before_shmem_exit(AtProcExit_Twophase, 0); + twophaseExitRegistered = true; + } + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -424,15 +491,11 @@ LockGXact(const char *gid, Oid user) continue; /* Found it, but has someone else got it locked? */ - if (TransactionIdIsValid(gxact->locking_xid)) - { - if (TransactionIdIsActive(gxact->locking_xid)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("prepared transaction with identifier \"%s\" is busy", - gid))); - gxact->locking_xid = InvalidTransactionId; - } + if (gxact->locking_backend != InvalidBackendId) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared transaction with identifier \"%s\" is busy", + gid))); if (user != gxact->owner && !superuser_arg(user)) ereport(ERROR, @@ -453,7 +516,8 @@ LockGXact(const char *gid, Oid user) errhint("Connect to the database where the transaction was prepared to finish it."))); /* OK for me to lock it */ - gxact->locking_xid = GetTopTransactionId(); + gxact->locking_backend = MyBackendId; + MyLockedGxact = gxact; LWLockRelease(TwoPhaseStateLock); @@ -1089,6 +1153,13 @@ EndPrepare(GlobalTransaction gxact) */ MyPgXact->delayChkpt = false; + /* + * Remember that we have this GlobalTransaction entry locked for us. If + * we crash after this point, it's too late to abort, but we must unlock + * it so that the prepared transaction can be committed or rolled back. + */ + MyLockedGxact = gxact; + END_CRIT_SECTION(); /* @@ -1335,8 +1406,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* * In case we fail while running the callbacks, mark the gxact invalid so - * no one else will try to commit/rollback, and so it can be recycled - * properly later. It is still locked by our XID so it won't go away yet. + * no one else will try to commit/rollback, and so it will be recycled + * if we fail after this point. It is still locked by our backend so it + * won't go away yet. * * (We assume it's safe to do this without taking TwoPhaseStateLock.) */ @@ -1396,6 +1468,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RemoveTwoPhaseFile(xid, true); RemoveGXact(gxact); + MyLockedGxact = NULL; pfree(buf); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9ee11f3..ea8035f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2268,6 +2268,8 @@ PrepareTransaction(void) RESOURCE_RELEASE_AFTER_LOCKS, true, true); + PostPrepare_Twophase(); + /* Check we've released all catcache entries */ AtEOXact_CatCache(true); @@ -2394,6 +2396,7 @@ AbortTransaction(void) AtEOXact_LargeObject(false); AtAbort_Notify(); AtEOXact_RelationMap(false); + AtAbort_Twophase(); /* * Advertise the fact that we aborted in pg_clog (assuming that we got as diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 9d29e35..80079b2 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -30,6 +30,9 @@ extern int max_prepared_xacts; extern Size TwoPhaseShmemSize(void); extern void TwoPhaseShmemInit(void); +extern void AtAbort_Twophase(void); +extern void PostPrepare_Twophase(void); + extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid); extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid);