diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2251b02..80aba8c 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -25,11 +25,28 @@
* what keeps the XID considered running by TransactionIdIsInProgress.
* It is also convenient as a PGPROC to hook the gxact's locks to.
*
- * In order to survive crashes and shutdowns, all prepared
- * transactions must be stored in permanent storage. This includes
- * locking information, pending notifications etc. All that state
- * information is written to the per-transaction state file in
- * the pg_twophase directory.
+ * Information to recover prepared transactions in case of crash is
+ * now stored in WAL for the common case. In some cases there will be
+ * an extended period between preparing a GXACT and commit/abort, in
+ * which case we need to separately record prepared transaction data
+ * in permanent storage. This includes locking information, pending
+ * notifications etc. All that state information is written to the
+ * per-transaction state file in the pg_twophase directory.
+ * All prepared transactions will be written prior to shutdown.
+ *
+ * The life cycle of the state data is as follows:
+ *
+ * * On PREPARE TRANSACTION the backend writes state data only to the WAL
+ * and stores a pointer to the start of the WAL record in
+ * gxact->prepare_start_lsn.
+ * * If COMMIT occurs before a checkpoint, the backend reads the data from
+ * WAL using prepare_start_lsn.
+ * * On checkpoint the state data is copied to files in the pg_twophase
+ * directory and fsynced.
+ * * If COMMIT happens after a checkpoint, the backend reads the state data
+ * from the files.
+ * * In case of a crash, replay will move the data from xlog to files, if
+ * that hasn't happened before. XXX TODO - move to shmem in replay also
*
*-------------------------------------------------------------------------
*/
@@ -51,6 +68,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@@ -60,6 +78,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/logicalfuncs.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
@@ -117,10 +136,21 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
- XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+
+ /*
+ * Note that we need to keep track of two LSNs for each GXACT.
+ * We keep track of the start LSN because this is the address we must
+ * use to read state data back from WAL when committing a prepared GXACT.
+ * We keep track of the end LSN because that is the LSN we need to wait
+ * for prior to commit.
+ */
+ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
+ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
+
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
+ bool ondisk; /* TRUE if prepare state file is on disk */
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
} GlobalTransactionData;
@@ -166,6 +196,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@@ -398,8 +429,11 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
- /* initialize LSN to 0 (start of WAL) */
- gxact->prepare_lsn = 0;
+ /* initialize LSN to InvalidXLogRecPtr */
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ gxact->prepare_end_lsn = InvalidXLogRecPtr;
+ /* no state file exists yet; nothing else resets this flag to false */
+ gxact->ondisk = false;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
@@ -579,41 +611,6 @@ RemoveGXact(GlobalTransaction gxact)
}
/*
- * TransactionIdIsPrepared
- * True iff transaction associated with the identifier is prepared
- * for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
- bool result = false;
- int i;
-
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
- for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
- {
- GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
- if (gxact->valid && pgxact->xid == xid)
- {
- result = true;
- break;
- }
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- return result;
-}
-
-/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
*
@@ -772,7 +769,7 @@ TwoPhaseGetGXact(TransactionId xid)
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
* repeatedly for the same XID. We can save work with a simple cache.
*/
- if (xid == cached_xid)
+ if (xid == cached_xid && cached_gxact)
return cached_gxact;
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
@@ -1020,14 +1017,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
- char path[MAXPGPATH];
StateFileChunk *record;
- pg_crc32c statefile_crc;
- pg_crc32c bogus_crc;
- int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1048,70 +1039,8 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
- * Create the 2PC state file.
- */
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create two-phase state file \"%s\": %m",
- path)));
-
- /* Write data to file, and calculate CRC as we pass over it */
- INIT_CRC32C(statefile_crc);
-
- for (record = records.head; record != NULL; record = record->next)
- {
- COMP_CRC32C(statefile_crc, record->data, record->len);
- if ((write(fd, record->data, record->len)) != record->len)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
- }
-
- FIN_CRC32C(statefile_crc);
-
- /*
- * Write a deliberately bogus CRC to the state file; this is just paranoia
- * to catch the case where four more bytes will run us out of disk space.
- */
- bogus_crc = ~statefile_crc;
-
- if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- /* Back up to prepare for rewriting the CRC */
- if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in two-phase state file: %m")));
- }
-
- /*
- * The state file isn't valid yet, because we haven't written the correct
- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
- *
- * Between the time we have written the WAL entry and the time we write
- * out the correct state file CRC, we have an inconsistency: the xact is
- * prepared according to WAL but not according to our on-disk state. We
- * use a critical section to force a PANIC if we are unable to complete
- * the write --- then, WAL replay should repair the inconsistency. The
- * odds of a PANIC actually occurring should be very tiny given that we
- * were able to write the bogus CRC above.
+ * Now writing 2PC state data to WAL. We let the WAL's CRC protection
+ * cover us, so no need to calculate a separate CRC.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@@ -1131,24 +1060,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
- XLogFlush(gxact->prepare_lsn);
+ gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+ XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC and close file */
- if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file: %m")));
+ /* Store record's start location to read that later on Commit */
+ gxact->prepare_start_lsn = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1186,7 +1104,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -1315,6 +1233,72 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+
+/*
+ * Reads 2PC data from xlog.
+ *
+ * 'lsn' is the location of the PREPARE record to read.  On return *buf
+ * points to a palloc'd copy of the record's main data, and if 'len' is not
+ * NULL then *len is set to the size of that data.  Failure to read the
+ * record, or finding a record of another kind, is reported as an ERROR.
+ *
+ * During checkpoint this data will be moved to twophase files and
+ * ReadTwoPhaseFile should be used instead.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+	XLogRecord *record;
+	XLogReaderState *xlogreader;
+	char	   *errormsg = NULL;
+	uint32		datalen;
+
+	xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating an XLog reading processor.")));
+
+	record = XLogReadRecord(xlogreader, lsn, &errormsg);
+	if (record == NULL)
+	{
+		/* pass on the reader's own explanation whenever it gave us one */
+		if (errormsg)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read two-phase state from xlog at %X/%X: %s",
+							(uint32) (lsn >> 32),
+							(uint32) lsn, errormsg)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read two-phase state from xlog at %X/%X",
+							(uint32) (lsn >> 32),
+							(uint32) lsn)));
+	}
+
+	/* A readable record of the wrong kind is bad data, not an I/O failure */
+	if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
+		(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("expected two-phase state data is not present in xlog at %X/%X",
+						(uint32) (lsn >> 32),
+						(uint32) lsn)));
+
+	/* Copy the data out: the reader's buffer goes away with the reader */
+	datalen = XLogRecGetDataLen(xlogreader);
+	if (len != NULL)
+		*len = datalen;
+
+	*buf = palloc(datalen);
+	memcpy(*buf, XLogRecGetData(xlogreader), datalen);
+
+	XLogReaderFree(xlogreader);
+}
+
+
/*
* Confirms an xid is prepared, during recovery
*/
@@ -1375,14 +1332,24 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
- * Read and validate the state file
+ * Read and validate 2PC state data.
+ * State data will typically be stored in WAL files if the LSN is after the
+ * last checkpoint record, or moved to disk if for some reason they have
+ * lived for a long time.
*/
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("two-phase state file for transaction %u is corrupt",
- xid)));
+	if (gxact->ondisk)
+	{
+		/* ReadTwoPhaseFile signals failure by returning NULL; don't use it */
+		buf = ReadTwoPhaseFile(xid, true);
+		if (buf == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("two-phase state file for transaction %u is corrupt",
+							xid)));
+	}
+	else
+		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
/*
* Disassemble the header area
@@ -1482,9 +1441,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat(isCommit);
/*
- * And now we can clean up our mess.
+ * And now we can clean up any files we may have left.
*/
- RemoveTwoPhaseFile(xid, true);
+ if (gxact->ondisk)
+ RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@@ -1539,7 +1499,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@@ -1610,97 +1571,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
* This is deliberately run as late as possible in the checkpoint sequence,
* because GXACTs ordinarily have short lifespans, and so it is quite
* possible that GXACTs that were valid at checkpoint start will no longer
- * exist if we wait a little bit.
+ * exist if we wait a little bit. With typical checkpoint settings this
+ * will be about 3 minutes for an online checkpoint, so as a result we
+ * expect that there will be no GXACTs that need to be copied to disk.
*
- * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
- * each time. This is considered unusual enough that we don't bother to
- * expend any extra code to avoid the redundant fsyncs. (They should be
- * reasonably cheap anyway, since they won't cause I/O.)
+ * If a GXACT remains valid across multiple checkpoints, it will already
+ * be on disk so we don't bother to repeat that write.
*/
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
- TransactionId *xids;
- int nxids;
- char path[MAXPGPATH];
int i;
+ int n = 0;
- /*
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
- * it just long enough to make a list of the XIDs that require fsyncing,
- * and then do the I/O afterwards.
- *
- * This approach creates a race condition: someone else could delete a
- * GXACT between the time we release TwoPhaseStateLock and the time we try
- * to open its state file. We handle this by special-casing ENOENT
- * failures: if we see that, we verify that the GXACT is no longer valid,
- * and if so ignore the failure.
- */
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
- xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
- nxids = 0;
-
+ /*
+ * We are expecting there to be zero GXACTs that need to be
+ * copied to disk, so we perform all I/O while holding
+ * TwoPhaseStateLock for simplicity. This prevents any new xacts
+ * from preparing while this occurs, which shouldn't be a problem
+ * since the presence of long-lived prepared xacts indicates the
+ * transaction manager isn't active.
+ *
+ * It's also possible to move I/O out of the lock, but on
+ * every error we should check whether somebody committed our
+ * transaction in a different backend. Let's leave this optimisation
+ * for the future, in case somebody finds that this place causes a
+ * bottleneck.
+ *
+ * Note that it isn't possible for there to be a GXACT with
+ * a prepare_end_lsn set prior to the last checkpoint yet
+ * is marked invalid, because of the efforts with delayChkpt.
+ */
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
if (gxact->valid &&
- gxact->prepare_lsn <= redo_horizon)
- xids[nxids++] = pgxact->xid;
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- for (i = 0; i < nxids; i++)
- {
- TransactionId xid = xids[i];
- int fd;
-
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
- if (fd < 0)
+ !gxact->ondisk &&
+ gxact->prepare_end_lsn <= redo_horizon)
{
- if (errno == ENOENT)
- {
- /* OK if gxact is no longer valid */
- if (!TransactionIdIsPrepared(xid))
- continue;
- /* Restore errno in case it was changed */
- errno = ENOENT;
- }
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
- }
+ char *buf;
+ int len;
- if (pg_fsync(fd) != 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync two-phase state file \"%s\": %m",
- path)));
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
+ RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ gxact->ondisk = true;
+ pfree(buf);
+ n++;
}
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file \"%s\": %m",
- path)));
}
-
- pfree(xids);
+ LWLockRelease(TwoPhaseStateLock);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
+
+ if (log_checkpoints && n > 0)
+ ereport(LOG,
+ (errmsg("%u two-phase state files were written "
+ "for long-running prepared transactions",
+ n)));
}
/*
@@ -2029,17 +1964,11 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
- *
- * Note: since we don't have the PREPARE record's WAL location at
- * hand, we leave prepare_lsn zeroes. This means the GXACT will
- * be fsync'd on every future checkpoint. We assume this
- * situation is infrequent enough that the performance cost is
- * negligible (especially since we know the state file has already
- * been fsynced).
*/
gxact = MarkAsPreparing(xid, hdr->gid,
hdr->prepared_at,
hdr->owner, hdr->database);
+ gxact->ondisk = true;
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
MarkAsPrepared(gxact);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa90503..c41baa0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 3de337a..ecd30ce 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
+extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;