diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6fde2bd..b91922e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,26 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * + * During replay and replication, TwoPhaseState also holds information + * about active prepared transactions that haven't been moved to disk yet. + * + * Replay of twophase records happens by the following rules: + * + * * On PREPARE redo we add the transaction to TwoPhaseState->prepXacts. + * We set gxact->inredo to true for such entries. + * + * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries + * that have gxact->inredo set and are behind the redo_horizon. We + * save them to disk and also set gxact->ondisk to true. + * + * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts. + * If gxact->ondisk is true, we delete the corresponding entry from + * the disk as well. + * + * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions() + * and PrescanPreparedTransactions() have been modified to go through + * gxact->inredo entries that have not made to disk yet. * *------------------------------------------------------------------------- */ @@ -148,11 +166,13 @@ typedef struct GlobalTransactionData */ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ + TransactionId xid; /* The GXACT id */ Oid owner; /* ID of user that executed the xact */ BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* TRUE if PGPROC entry is in proc array */ bool ondisk; /* TRUE if prepare state file is on disk */ + bool inredo; /* TRUE if entry was added via xlog_redo */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -350,12 +370,14 @@ PostPrepare_Twophase(void) */ GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, - TimestampTz prepared_at, Oid owner, Oid databaseid) + TimestampTz prepared_at, Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn) { GlobalTransaction gxact; PGPROC *proc; PGXACT *pgxact; int i; + bool found = false; if (strlen(gid) >= GIDSIZE) ereport(ERROR, @@ -385,22 +407,32 @@ MarkAsPreparing(TransactionId xid, const char *gid, gxact = TwoPhaseState->prepXacts[i]; if (strcmp(gxact->gid, gid) == 0) { - ereport(ERROR, + /* It's ok to find an entry in the redo/recovery case */ + if (!gxact->inredo) + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("transaction identifier \"%s\" is already in use", gid))); + else + { + found = true; + break; + } } } /* Get a free gxact from the freelist */ - if (TwoPhaseState->freeGXacts == NULL) + if (!found && TwoPhaseState->freeGXacts == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("maximum number of prepared transactions reached"), errhint("Increase max_prepared_transactions (currently %d).", max_prepared_xacts))); - gxact = TwoPhaseState->freeGXacts; - TwoPhaseState->freeGXacts = gxact->next; + if (!found) + { + gxact = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = gxact->next; + } proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -432,18 +464,24 @@ MarkAsPreparing(TransactionId xid, const char *gid, pgxact->nxids = 0; gxact->prepared_at = prepared_at; - /* initialize LSN to InvalidXLogRecPtr */ - gxact->prepare_start_lsn = InvalidXLogRecPtr; - gxact->prepare_end_lsn = InvalidXLogRecPtr; + /* initialize LSN to passed in values */ + gxact->prepare_start_lsn = prepare_start_lsn; + gxact->prepare_end_lsn = prepare_end_lsn; + gxact->xid = xid; gxact->owner = owner; gxact->locking_backend = MyBackendId; gxact->valid = false; gxact->ondisk = false; + gxact->inredo = false; strcpy(gxact->gid, gid); /* And insert it into the active array */ - Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); - TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (!found) + { + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + } + /* * Remember that we have this GlobalTransaction entry locked for us. If we @@ -457,6 +495,92 @@ MarkAsPreparing(TransactionId xid, const char *gid, } /* + * MarkAsPreparingInRedo + * Reserve the GID for the given transaction in the redo code path. + * + * Internally, this creates a gxact struct and puts it into the active array. + * + * In redo, this struct is mainly used to track PREPARE/COMMIT entries + * in shared memory. Hence, we only fill up the bare minimum contents here. + * The gxact also gets marked with gxact->inredo set to true to indicate + * that it got added in the redo phase + */ +GlobalTransaction +MarkAsPreparingInRedo(TransactionId xid, const char *gid, + TimestampTz prepared_at, Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn) +{ + GlobalTransaction gxact; + int i; + + if (strlen(gid) >= GIDSIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("transaction identifier \"%s\" is too long", + gid))); + + /* fail immediately if feature is disabled */ + if (max_prepared_xacts == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared transactions are disabled"), + errhint("Set max_prepared_transactions to a nonzero value."))); + + /* on first call, register the exit hook */ + if (!twophaseExitRegistered) + { + before_shmem_exit(AtProcExit_Twophase, 0); + twophaseExitRegistered = true; + } + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + /* Check for conflicting GID */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + if (strcmp(gxact->gid, gid) == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("transaction identifier \"%s\" is already in use", + gid))); + } + } + + /* Get a free gxact from the freelist */ + if (TwoPhaseState->freeGXacts == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"), + errhint("Increase max_prepared_transactions (currently %d).", + max_prepared_xacts))); + gxact = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = gxact->next; + + + gxact->prepared_at = prepared_at; + /* initialize LSN to passed in values */ + gxact->prepare_start_lsn = prepare_start_lsn; + gxact->prepare_end_lsn = prepare_end_lsn; + gxact->xid = xid; + gxact->owner = owner; + gxact->locking_backend = InvalidBackendId; + gxact->valid = false; + gxact->ondisk = false; + gxact->inredo = true; /* yes, added in redo */ + strcpy(gxact->gid, gid); + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* * GXactLoadSubxactData * * If the transaction being persisted had any subtransactions, this must @@ -1242,9 +1366,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note clearly that this function can access WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. + * */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1253,8 +1377,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1624,9 +1746,8 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->valid && + if ((gxact->valid || gxact->inredo) && !gxact->ondisk && gxact->prepare_end_lsn <= redo_horizon) { @@ -1634,7 +1755,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(pgxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->xid, buf, len); gxact->ondisk = true; pfree(buf); serialized_xacts++; @@ -1691,6 +1812,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; + int i; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) @@ -1702,7 +1824,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) char *buf; TwoPhaseFileHeader *hdr; TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); @@ -1801,6 +1922,90 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) } FreeDir(cldir); + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + int len; + int j; + char *buf; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + ereport(WARNING, + (errmsg("removing future two-phase state data from memory \"%u\"", + xid))); + PrepareRedoRemove(xid); + continue; + } + + /* Read xlog data */ + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + + if (TransactionIdPrecedes(xid, result)) + result = xid; + + /* + * Examine subtransaction XIDs ... they should all follow main + * XID, and they may force us to advance nextXid. + * + * We don't expect anyone else to modify nextXid, hence we don't + * need to hold a lock while examining it. We still acquire the + * lock to modify it, though. + */ + subxids = (TransactionId *) (buf + + MAXALIGN(sizeof(TwoPhaseFileHeader)) + + MAXALIGN(hdr->gidlen)); + for (j = 0; j < hdr->nsubxacts; j++) + { + TransactionId subxid = subxids[j]; + + Assert(TransactionIdFollows(subxid, xid)); + if (TransactionIdFollowsOrEquals(subxid, + ShmemVariableCache->nextXid)) + { + LWLockAcquire(XidGenLock, LW_EXCLUSIVE); + ShmemVariableCache->nextXid = subxid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + LWLockRelease(XidGenLock); + } + } + + if (xids_p) + { + if (nxids == allocsize) + { + if (nxids == 0) + { + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); + } + else + { + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); + } + } + xids[nxids++] = xid; + } + + pfree(buf); + } + LWLockRelease(TwoPhaseStateLock); + if (xids_p) { *xids_p = xids; @@ -1827,6 +2032,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) { DIR *cldir; struct dirent *clde; + int i; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) @@ -1838,7 +2044,6 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) char *buf; TwoPhaseFileHeader *hdr; TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); @@ -1894,6 +2099,56 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) } } FreeDir(cldir); + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + int len; + char *buf; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + int j; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + ereport(WARNING, + (errmsg("removing stale 2PC data from shared memory"))); + PrepareRedoRemove(xid); + continue; + } + + /* Read xlog data */ + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + + /* + * Examine subtransaction XIDs ... they should all follow main + * XID + */ + subxids = (TransactionId *) (buf + + MAXALIGN(sizeof(TwoPhaseFileHeader)) + + MAXALIGN(hdr->gidlen)); + for (j = 0; j < hdr->nsubxacts; j++) + { + TransactionId subxid = subxids[j]; + + Assert(TransactionIdFollows(subxid, xid)); + SubTransSetParent(xid, subxid, overwriteOK); + } + pfree(buf); + } + LWLockRelease(TwoPhaseStateLock); } /* @@ -1910,6 +2165,7 @@ RecoverPreparedTransactions(void) DIR *cldir; struct dirent *clde; bool overwriteOK = false; + int i; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); @@ -1926,7 +2182,6 @@ RecoverPreparedTransactions(void) TransactionId *subxids; GlobalTransaction gxact; const char *gid; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); @@ -1990,7 +2245,8 @@ RecoverPreparedTransactions(void) */ gxact = MarkAsPreparing(xid, gid, hdr->prepared_at, - hdr->owner, hdr->database); + hdr->owner, hdr->database, + InvalidXLogRecPtr, InvalidXLogRecPtr); gxact->ondisk = true; GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); @@ -2019,6 +2275,109 @@ RecoverPreparedTransactions(void) } } FreeDir(cldir); + + /* + * Don't need a lock in the recovery phase. + */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + char *buf; + char *bufptr; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + const char *gid; + int len; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + GlobalTransaction gxactnew; + + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + ereport(WARNING, + (errmsg("removing stale 2PC data from shared memory %u", xid))); + PrepareRedoRemove(xid); + continue; + } + + /* Read xlog data */ + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); + + ereport(LOG, + (errmsg("recovering prepared transaction %u from shared memory", xid))); + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + bufptr += MAXALIGN(hdr->gidlen); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + + /* + * It's possible that SubTransSetParent has been set before, if + * the prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) + overwriteOK = true; + + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart. Note that + * we are linking all the subtransactions directly to the + * top-level XID; there may originally have been a more complex + * hierarchy, but there's no need to restore that exactly. + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], xid, overwriteOK); + + /* + * Recreate its GXACT and dummy PGPROC + */ + gxactnew = MarkAsPreparing(xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database, + gxact->prepare_start_lsn, + gxact->prepare_end_lsn); + + Assert(gxactnew == gxact); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); + MarkAsPrepared(gxact); + + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, xid, twophase_recover_callbacks); + + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + + /* + * We're done with recovering this transaction. Clear + * MyLockedGxact, like we do in PrepareTransaction() during normal + * operation. + */ + PostPrepare_Twophase(); + + pfree(buf); + } } /* @@ -2163,3 +2522,83 @@ RecordTransactionAbortPrepared(TransactionId xid, */ SyncRepWaitForLSN(recptr, false); } + +/* + * PrepareRedoAdd + * + * Store pointers to the start/end of the WAL record along with the xid in + * a gxact entry in shared memory TwoPhaseState structure + */ +void +PrepareRedoAdd(XLogReaderState *record) +{ + char *buf = XLogRecGetData(record); + TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; + char *bufptr; + const char *gid; + GlobalTransaction gxact; + + Assert(RecoveryInProgress()); + + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + + /* + * Add a GXACT entry + */ + gxact = MarkAsPreparingInRedo(hdr->xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database, + record->ReadRecPtr, + record->EndRecPtr); + + elog(DEBUG2, "Adding 2PC data to shared memory %u", hdr->xid); +} + +/* + * PrepareRedoRemove + * + * Remove the corresponding gxact entry from TwoPhaseState. Also + * remove the 2PC file. + */ +void +PrepareRedoRemove(TransactionId xid) +{ + GlobalTransaction gxact; + int i; + bool found = false; + + Assert(RecoveryInProgress()); + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->xid == xid) + { + Assert(gxact->inredo); + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + if (found) + { + /* + * And now we can clean up any files we may have left. + */ + if (gxact->ondisk) + RemoveTwoPhaseFile(xid, true); + RemoveGXact(gxact); + elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + } + else + { + /* + * Entry could be on disk. Call with giveWarning=false + * since it can be expected during replay. + */ + RemoveTwoPhaseFile(xid, false); + } +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index f6f136d..8f027a9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2294,7 +2294,8 @@ PrepareTransaction(void) * GID is invalid or already in use. */ gxact = MarkAsPreparing(xid, prepareGID, prepared_at, - GetUserId(), MyDatabaseId); + GetUserId(), MyDatabaseId, + InvalidXLogRecPtr, InvalidXLogRecPtr); prepareGID = NULL; /* @@ -5606,7 +5607,9 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5626,14 +5629,18 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + /* + * Store xid and start/end pointers of the WAL record in + * TwoPhaseState gxact entry. + */ + PrepareRedoAdd(record); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b2b7848..063b946 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xlogreader.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -38,7 +39,12 @@ extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid); extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, - Oid owner, Oid databaseid); + Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn); +extern GlobalTransaction MarkAsPreparingInRedo(TransactionId xid, const char *gid, + TimestampTz prepared_at, + Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn); extern void StartPrepare(GlobalTransaction gxact); extern void EndPrepare(GlobalTransaction gxact); @@ -56,4 +62,6 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void PrepareRedoAdd(XLogReaderState *record); +extern void PrepareRedoRemove(TransactionId xid); #endif /* TWOPHASE_H */ diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl new file mode 100755 index 0000000..dd2c708 --- /dev/null +++ b/src/test/recovery/t/009_twophase.pl @@ -0,0 +1,315 @@ +# Tests dedicated to two-phase commit in recovery +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 13; + +# Setup master node +my $node_master = get_new_node("master"); +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( + max_prepared_transactions = 10 + log_checkpoints = true +)); +$node_master->start; +$node_master->backup('master_backup'); +$node_master->psql('postgres', "create table t(id int)"); + +# Setup slave node +my $node_slave = get_new_node('slave'); +$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1); +$node_slave->start; + +# Switch to synchronous replication +$node_master->append_conf('postgresql.conf', qq( + synchronous_standby_names = '*' +)); +$node_master->psql('postgres', "select pg_reload_conf()"); + +my $psql_out = ''; +my $psql_rc = ''; + +############################################################################### +# Check that we can commit and abort transaction after soft restart. +# Here checkpoint happens before shutdown and no WAL replay will occur at next +# startup. In this case postgres re-creates shared-memory state from twophase +# files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + begin; + insert into t values (142); + savepoint s1; + insert into t values (143); + prepare transaction 'y';"); +$node_master->stop; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared transaction after restart.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after restart.'); + +############################################################################### +# Check that we can commit and abort after a hard restart. +# At next startup, WAL replay will re-create shared memory state for prepared +# transaction using dedicated WAL records. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + begin; + insert into t values (142); + savepoint s1; + insert into t values (143); + prepare transaction 'y';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared transaction after teardown.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after teardown.'); + +############################################################################### +# Check that WAL replay can handle several transactions with same GID name. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x'; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Replay several transactions with same GID.'); + +############################################################################### +# Check that WAL replay cleans up its shared memory state and releases locks +# while replaying transaction commits. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x';"); +$node_master->teardown_node; +$node_master->start; +$psql_rc = $node_master->psql('postgres', "begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + -- This prepare can fail due to conflicting GID or locks conflicts if + -- replay did not fully cleanup its state on previous commit. + prepare transaction 'x';"); +is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit"); + +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that WAL replay will cleanup its shared memory state on running slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby without checkpoint."); + +############################################################################### +# Same as in previous case, but let's force checkpoint on slave between +# prepare and commit to use on-disk twophase files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_slave->psql('postgres', "checkpoint;"); +$node_master->psql('postgres', "commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby after checkpoint."); + +############################################################################### +# Check that prepared transactions can be committed on promoted slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$psql_rc = $node_slave->psql('postgres', "commit prepared 'x';"); +is($psql_rc, '0', "Restore of prepared transaction on promoted slave."); + +# change roles +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; + +############################################################################### +# Check that prepared transactions are replayed after soft restart of standby +# while master is down. Since standby knows that master is down it uses a +# different code path on startup to ensure that the status of transactions is +# consistent. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->stop; +$node_slave->restart; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from files with master down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that prepared transactions are correctly replayed after slave hard +# restart while master is down. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (242); + savepoint s1; + insert into t values (243); + prepare transaction 'x'; + "); +$node_master->stop; +$node_slave->teardown_node; +$node_slave->start; +$node_slave->promote; +$node_slave->poll_query_until('postgres', + "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from records with master down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + + +############################################################################### +# Check for a lock conflict between prepared transaction with DDL inside and replay of +# XLOG_STANDBY_LOCK wal record. +############################################################################### + +$node_master->psql('postgres', " + begin; + create table t2(id int); + savepoint s1; + insert into t2 values (42); + prepare transaction 'x'; + -- checkpoint will issue XLOG_STANDBY_LOCK that can conflict with lock + -- held by 'create table' statement + checkpoint; + commit prepared 'x';"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '0', "Replay prepared transaction with DDL."); + + +############################################################################### +# Check that replay will correctly set SUBTRANS and properly advance nextXid +# so that it won't conflict with savepoint xids. +############################################################################### + +$node_master->psql('postgres', " + begin; + delete from t; + insert into t values (43); + savepoint s1; + insert into t values (43); + savepoint s2; + insert into t values (43); + savepoint s3; + insert into t values (43); + savepoint s4; + insert into t values (43); + savepoint s5; + insert into t values (43); + prepare transaction 'x'; + checkpoint;"); + +$node_master->stop; +$node_master->start; +$node_master->psql('postgres', " + -- here we can get xid of previous savepoint if nextXid + -- wasn't properly advanced + begin; + insert into t values (142); + abort; + commit prepared 'x';"); + +$node_master->psql('postgres', "select count(*) from t", + stdout => \$psql_out); +is($psql_out, '6', "Check nextXid handling for prepared subtransactions");