diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a65048b..bc9bffa 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,8 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * + * The same procedure happens during replication and crash recovery. * *------------------------------------------------------------------------- */ @@ -578,6 +578,37 @@ LockGXact(const char *gid, Oid user) } /* + * LockGXactByXid + * + * Find prepared transaction by xid and lock corresponding gxact. + * This is used during recovery as an alternative to LockGXact(). + */ +static GlobalTransaction +LockGXactByXid(TransactionId xid) +{ + int i; + GlobalTransaction gxact = NULL; + PGXACT *pgxact; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(xid, pgxact->xid)) + { + gxact->locking_backend = MyBackendId; + MyLockedGxact = gxact; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* * RemoveGXact * Remove the prepared transaction from the shared memory array. * @@ -1241,9 +1272,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note clearly that this function access WAL not only during recovery/replay + * but also during normal operation, similarly to the way WALSender or + * Logical Decoding would do. */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1252,8 +1283,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1296,12 +1325,30 @@ StandbyTransactionIdIsPrepared(TransactionId xid) char *buf; TwoPhaseFileHeader *hdr; bool result; + int i; Assert(TransactionIdIsValid(xid)); if (max_prepared_xacts <= 0) return false; /* nothing to do */ + /* + * At first check prepared tx that wasn't yet moved to disk. + */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(pgxact->xid, xid)) + { + LWLockRelease(TwoPhaseStateLock); + return true; + } + } + LWLockRelease(TwoPhaseStateLock); + /* Read and validate file */ buf = ReadTwoPhaseFile(xid, false); if (buf == NULL) @@ -1316,12 +1363,17 @@ StandbyTransactionIdIsPrepared(TransactionId xid) } /* - * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED + * FinishGXact + * + * Do the actual finish of COMMIT/ABORT PREPARED. It is a caller + * responsibility to properly lock corresponding gxact. + * + * This function can be called during replay to clean memory state + * for previously prepared xact. In that case actions are the same + * as in normal mode but without any writes to WAL or files. */ -void -FinishPreparedTransaction(const char *gid, bool isCommit) +static void FinishGXact(GlobalTransaction gxact, bool isCommit) { - GlobalTransaction gxact; PGPROC *proc; PGXACT *pgxact; TransactionId xid; @@ -1337,11 +1389,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit) SharedInvalidationMessage *invalmsgs; int i; - /* - * Validate the GID, and lock the GXACT to ensure that two backends do not - * try to commit the same GID at once. - */ - gxact = LockGXact(gid, GetUserId()); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; xid = pgxact->xid; @@ -1385,16 +1432,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * progress), then run the post-commit or post-abort callbacks. The * callbacks will release the locks the transaction held. */ - if (isCommit) - RecordTransactionCommitPrepared(xid, + if (!RecoveryInProgress()) + { + if (isCommit) + RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, hdr->initfileinval); - else - RecordTransactionAbortPrepared(xid, + else + RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, hdr->nabortrels, abortrels); + } ProcArrayRemove(proc, latestXid); @@ -1425,12 +1475,15 @@ FinishPreparedTransaction(const char *gid, bool isCommit) delrels = abortrels; ndelrels = hdr->nabortrels; } - for (i = 0; i < ndelrels; i++) + if (!RecoveryInProgress()) { - SMgrRelation srel = smgropen(delrels[i], InvalidBackendId); + for (i = 0; i < ndelrels; i++) + { + SMgrRelation srel = smgropen(delrels[i], InvalidBackendId); - smgrdounlink(srel, false); - smgrclose(srel); + smgrdounlink(srel, false); + smgrclose(srel); + } } /* @@ -1439,11 +1492,14 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * Relcache init file invalidation requires processing both before and * after we send the SI messages. See AtEOXact_Inval() */ - if (hdr->initfileinval) - RelationCacheInitFilePreInvalidate(); - SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs); - if (hdr->initfileinval) - RelationCacheInitFilePostInvalidate(); + if (!RecoveryInProgress()) + { + if (hdr->initfileinval) + RelationCacheInitFilePreInvalidate(); + SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs); + if (hdr->initfileinval) + RelationCacheInitFilePostInvalidate(); + } /* And now do the callbacks */ if (isCommit) @@ -1469,6 +1525,49 @@ FinishPreparedTransaction(const char *gid, bool isCommit) } /* + * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED + */ +void +FinishPreparedTransaction(const char *gid, bool isCommit) +{ + GlobalTransaction gxact; + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to commit the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + FinishGXact(gxact, isCommit); +} + +/* + * XlogRedoFinishPrepared() + * + * This function is called during replay when xlog reader faces 2pc commit or + * abort record. That function should clean up memory state that was created + * while replaying prepare xlog record. + */ +void +XlogRedoFinishPrepared(TransactionId xid, bool isCommit) +{ + GlobalTransaction gxact; + + Assert(RecoveryInProgress()); + + gxact = LockGXactByXid(xid); + + /* + * If requested xid wasn't found that means that prepare record was moved + * to files before our replay started. That's okay and we have nothing to + * clean/finish. + */ + if (!gxact) + return; + + FinishGXact(gxact, isCommit); +} + +/* * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record. */ static void @@ -1690,7 +1789,48 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; + int i; + /* + * We need to check the PGXACT array for prepared transactions that doesn't + * have any state file in case of a slave restart with the master being off. + */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (!gxact->valid) + continue; + + if (TransactionIdPrecedes(pgxact->xid, result)) + result = pgxact->xid; + + if (xids_p) + { + if (nxids == allocsize) + { + if (nxids == 0) + { + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); + } + else + { + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); + } + } + xids[nxids++] = pgxact->xid; + } + } + LWLockRelease(TwoPhaseStateLock); + + + /* + * And now scan files in pg_twophase directory + */ cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { @@ -1701,7 +1841,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) char *buf; TwoPhaseFileHeader *hdr; TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); @@ -1809,102 +1948,105 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) } /* - * StandbyRecoverPreparedTransactions + * RecoverPreparedFromBuffer + * + * Parse data in given buffer (that can be a pointer to WAL record or file) + * and load shared-memory state for that prepared transaction. * - * Scan the pg_twophase directory and setup all the required information to - * allow standby queries to treat prepared transactions as still active. - * This is never called at the end of recovery - we use - * RecoverPreparedTransactions() at that point. + * It's a caller responsibility to call MarkAsPrepared() on returned gxact. * - * Currently we simply call SubTransSetParent() for any subxids of prepared - * transactions. If overwriteOK is true, it's OK if some XIDs have already - * been marked in pg_subtrans. */ -void -StandbyRecoverPreparedTransactions(bool overwriteOK) +static GlobalTransaction +RecoverPreparedFromBuffer(char *buf, bool forceOverwriteOK) { - DIR *cldir; - struct dirent *clde; + char *bufptr; + const char *gid; + TransactionId *subxids; + bool overwriteOK = false; + int i; + GlobalTransaction gxact; + TwoPhaseFileHeader *hdr; - cldir = AllocateDir(TWOPHASE_DIR); - while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) - { - if (strlen(clde->d_name) == 8 && - strspn(clde->d_name, "0123456789ABCDEF") == 8) - { - TransactionId xid; - char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + bufptr += MAXALIGN(hdr->gidlen); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); - xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + /* + * It's possible that SubTransSetParent has been set before, if + * the prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) + overwriteOK = true; - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + /* + * Caller can also force overwriteOK. + */ + if (forceOverwriteOK) + overwriteOK = true; - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart. Note that + * we are linking all the subtransactions directly to the + * top-level XID; there may originally have been a more complex + * hierarchy, but there's no need to restore that exactly. + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], hdr->xid, overwriteOK); - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - pfree(buf); - continue; - } + /* + * Recreate its GXACT and dummy PGPROC + */ + gxact = MarkAsPreparing(hdr->xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); - /* - * Examine subtransaction XIDs ... they should all follow main - * XID. - */ - subxids = (TransactionId *) - (buf + MAXALIGN(sizeof(TwoPhaseFileHeader))); - for (i = 0; i < hdr->nsubxacts; i++) - { - TransactionId subxid = subxids[i]; + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, hdr->xid, twophase_recover_callbacks); - Assert(TransactionIdFollows(subxid, xid)); - SubTransSetParent(xid, subxid, overwriteOK); - } - } - } - FreeDir(cldir); + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids); + + /* + * We're done with recovering this transaction. Clear + * MyLockedGxact, like we do in PrepareTransaction() during normal + * operation. + */ + PostPrepare_Twophase(); + + return gxact; } /* - * RecoverPreparedTransactions + * RecoverPreparedFromFiles * * Scan the pg_twophase directory and reload shared-memory state for each * prepared transaction (reacquire locks, etc). This is run during database * startup. */ void -RecoverPreparedTransactions(void) +RecoverPreparedFromFiles(bool forceOverwriteOK) { char dir[MAXPGPATH]; DIR *cldir; struct dirent *clde; - bool overwriteOK = false; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); @@ -1916,15 +2058,27 @@ RecoverPreparedTransactions(void) { TransactionId xid; char *buf; - char *bufptr; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; GlobalTransaction gxact; - const char *gid; int i; + PGXACT *pgxact; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + /* Already recovered from WAL? */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(xid, pgxact->xid)) + { + LWLockRelease(TwoPhaseStateLock); + goto next_file; + } + } + LWLockRelease(TwoPhaseStateLock); + /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { @@ -1949,73 +2103,44 @@ RecoverPreparedTransactions(void) ereport(LOG, (errmsg("recovering prepared transaction %u", xid))); - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - Assert(TransactionIdEquals(hdr->xid, xid)); - bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); - gid = (const char *) bufptr; - bufptr += MAXALIGN(hdr->gidlen); - subxids = (TransactionId *) bufptr; - bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); - bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); - - /* - * It's possible that SubTransSetParent has been set before, if - * the prepared transaction generated xid assignment records. Test - * here must match one used in AssignTransactionId(). - */ - if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || - XLogLogicalInfoActive())) - overwriteOK = true; - - /* - * Reconstruct subtrans state for the transaction --- needed - * because pg_subtrans is not preserved over a restart. Note that - * we are linking all the subtransactions directly to the - * top-level XID; there may originally have been a more complex - * hierarchy, but there's no need to restore that exactly. - */ - for (i = 0; i < hdr->nsubxacts; i++) - SubTransSetParent(subxids[i], xid, overwriteOK); - - /* - * Recreate its GXACT and dummy PGPROC - */ - gxact = MarkAsPreparing(xid, gid, - hdr->prepared_at, - hdr->owner, hdr->database); + gxact = RecoverPreparedFromBuffer(buf, forceOverwriteOK); gxact->ondisk = true; - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); - /* - * Recover other state (notably locks) using resource managers - */ - ProcessRecords(bufptr, xid, twophase_recover_callbacks); - - /* - * Release locks held by the standby process after we process each - * prepared transaction. As a result, we don't need too many - * additional locks at any one time. - */ - if (InHotStandby) - StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); - - /* - * We're done with recovering this transaction. Clear - * MyLockedGxact, like we do in PrepareTransaction() during normal - * operation. - */ - PostPrepare_Twophase(); - pfree(buf); } + +next_file: + continue; + } FreeDir(cldir); } + +/* + * RecoverPreparedFromXLOG + * + * To avoid creation of state files during replay we registering + * prepare xlog records in shared memory in the same way as it happens + * while not in recovery. If replay faces commit xlog record before + * checkpoint/restartpoint happens then we avoid using files at all. + * + * We need this behaviour because the speed of the 2PC replay on the replica + * should be at least the same as the 2PC transaction speed of the master. + */ +void +RecoverPreparedFromXLOG(XLogReaderState *record) +{ + GlobalTransaction gxact; + + gxact = RecoverPreparedFromBuffer((char *) XLogRecGetData(record), false); + gxact->prepare_start_lsn = record->ReadRecPtr; + gxact->prepare_end_lsn = record->EndRecPtr; + MarkAsPrepared(gxact); +} + + /* * RecordTransactionCommitPrepared * diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e37331..9323ba2 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5588,7 +5588,7 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + XlogRedoFinishPrepared(parsed.twophase_xid, true); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5608,14 +5608,12 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + XlogRedoFinishPrepared(parsed.twophase_xid, false); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + RecoverPreparedFromXLOG(record); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f9644db..11c06b4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6630,7 +6630,7 @@ StartupXLOG(void) ProcArrayApplyRecoveryInfo(&running); - StandbyRecoverPreparedTransactions(false); + RecoverPreparedFromFiles(false); } } @@ -7383,7 +7383,7 @@ StartupXLOG(void) TrimMultiXact(); /* Reload shared-memory state for prepared transactions */ - RecoverPreparedTransactions(); + RecoverPreparedFromFiles(false); /* * Shutdown the recovery environment. This must occur after @@ -9297,7 +9297,7 @@ xlog_redo(XLogReaderState *record) ProcArrayApplyRecoveryInfo(&running); - StandbyRecoverPreparedTransactions(true); + RecoverPreparedFromFiles(true); } /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b7ce0c6..416ef5e 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -17,6 +17,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "access/xlogreader.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -46,8 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); -extern void StandbyRecoverPreparedTransactions(bool overwriteOK); -extern void RecoverPreparedTransactions(void); +extern void RecoverPreparedFromFiles(bool overwriteOK); +extern void RecoverPreparedFromXLOG(XLogReaderState *record); extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); @@ -56,4 +57,5 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void XlogRedoFinishPrepared(TransactionId xid, bool isCommit); #endif /* TWOPHASE_H */ diff --git a/src/test/recovery/t/007_twophase.pl b/src/test/recovery/t/007_twophase.pl new file mode 100644 index 0000000..cfc7316 --- /dev/null +++ b/src/test/recovery/t/007_twophase.pl @@ -0,0 +1,236 @@ +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 11; + +# Setup master node +my $node_master = get_new_node("master"); +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +)); +$node_master->start; +$node_master->backup('master_backup'); +$node_master->psql('postgres', "create table t(id int)"); + +# Setup master node +my $node_slave = get_new_node('slave'); +$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1); +$node_slave->start; + +# Switch to synchronous replication +$node_master->append_conf('postgresql.conf', qq( + synchronous_standby_names = '*' +)); +$node_master->psql('postgres', "select pg_reload_conf()"); + +my $psql_out = ''; +my $psql_rc = ''; + +############################################################################### +# Check that we can commit and abort tx after soft restart. +# Here checkpoint happens before shutdown and no WAL replay will not occur +# during start. So postgres should re-create memory state from files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + begin; + insert into t values (142); + prepare transaction 'y';"); +$node_master->stop; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared tx after restart.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared tx after restart.'); + +############################################################################### +# Check that we can commit and abort after hard restart. +# On startup WAL replay will re-create memory for global transactions that +# happend after the last checkpoint. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + prepare transaction 'x'; + begin; + insert into t values (142); + prepare transaction 'y';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared tx after teardown.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared tx after teardown.'); + +############################################################################### +# Check that we can replay several tx with same name. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x'; + begin; + insert into t values (42); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Replay several tx with same name.'); + +############################################################################### +# Check that WAL replay will cleanup it's memory state and release locks while +# replaying commit. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x';"); +$node_master->teardown_node; +$node_master->start; +$psql_rc = $node_master->psql('postgres'," + begin; + insert into t values (42); + -- This prepare can fail due to 2pc identifier or locks conflicts if replay + -- didn't fully cleanup it's state on commit. + prepare transaction 'x';"); +is($psql_rc, '0', "Check that replay will cleanup it's memory state"); + +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that WAL replay will cleanup it's memory state on running slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x'; + "); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", stdout => \$psql_out); +is($psql_out, '0', "Check that replay will cleanup it's memory state on running slave"); + +############################################################################### +# The same as in previous case, but let's force checkpoint on slave between +# prepare and commit. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + "); +$node_slave->psql('postgres',"checkpoint;"); +$node_master->psql('postgres', "commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", stdout => \$psql_out); +is($psql_out, '0', "Check that replay will cleanup it's memory state on slave after checkpoint"); + +############################################################################### +# Check that we can commit transaction on promoted slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + "); +$node_master->teardown_node; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$psql_rc = $node_slave->psql('postgres', "commit prepared 'x';"); +is($psql_rc, '0', "Restore prepared transaction on promoted slave."); + +# change roles +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; + +############################################################################### +# Check that we restore prepared xacts after slave soft restart while master is +# down. Since slave knows that master is down it uses different code path on +# start. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + "); +$node_master->stop; +$node_slave->restart; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres',"select count(*) from pg_prepared_xacts", stdout => \$psql_out); +is($psql_out, '1', "Restore prepared xacts after slave soft restart while master is down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres',"commit prepared 'x'"); + +############################################################################### +# Check that we restore prepared xacts after slave hard restart while master is +# down. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (242); + prepare transaction 'x'; + "); +$node_master->stop; +$node_slave->teardown_node; +$node_slave->start; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres',"select count(*) from pg_prepared_xacts", stdout => \$psql_out); +is($psql_out, '1', "Restore prepared xacts after slave hard restart while master is down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres',"commit prepared 'x'"); + + +############################################################################### +# Commit prepared on master while slave is down. +############################################################################### + +# Switch to asynchronous replication +#$node_master->append_conf('postgresql.conf', qq( +# synchronous_standby_names = '' +#)); +#$node_master->psql('postgres', "select pg_reload_conf()"); + + diff --git a/src/test/recovery/t/twophase_recovery_bug.pl b/src/test/recovery/t/twophase_recovery_bug.pl new file mode 100644 index 0000000..9b03878 --- /dev/null +++ b/src/test/recovery/t/twophase_recovery_bug.pl @@ -0,0 +1,45 @@ +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Setup master node +my $node_master = get_new_node("master"); +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +)); +$node_master->start; +$node_master->backup('master_backup'); +$node_master->psql('postgres', "create table t(id int)"); + +# Setup slave node +my $node_slave = get_new_node('slave'); +$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1); +$node_slave->start; + +my $psql_out = ''; +my $psql_rc = ''; + +$node_master->psql('postgres', " + begin; + insert into t values(0); + create table t1(id int); + insert into t1 values(1); + create table t2(id int); + insert into t2 values(2); + savepoint s1; + drop table t1; + select * from t for update; + select * from t2 for share; + prepare transaction 'x'; +"); +sleep 2; # wait for changes to arrive on slave +$node_slave->teardown_node; +$node_master->psql('postgres',"commit prepared 'x'"); +$node_slave->start; +$node_slave->psql('postgres',"select count(*) from pg_prepared_xacts", stdout => \$psql_out); +is($psql_out, '0', "Commit prepared on master while slave is down."); +$node_slave->psql('postgres',"select sum(id) from t2", stdout => \$psql_out); +is($psql_out, '2', "Check that tx changes are visible.");