diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 8c47e0f..e9a1ff5 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -51,6 +51,7 @@ #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "access/xlogreader.h" #include "catalog/pg_type.h" #include "catalog/storage.h" #include "funcapi.h" @@ -60,6 +61,7 @@ #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" +#include "replication/logicalfuncs.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/predicate.h" @@ -117,7 +119,11 @@ typedef struct GlobalTransactionData int pgprocno; /* ID of associated dummy PGPROC */ BackendId dummyBackendId; /* similar to backend id for backends */ TimestampTz prepared_at; /* time of preparation */ - XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ + XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start + * or InvalidXLogRecPtr if twophase data + * moved to file after checkpoint. + */ + XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ Oid owner; /* ID of user that executed the xact */ BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* TRUE if PGPROC entry is in proc array */ @@ -166,6 +172,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); +static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); /* * Initialization of shared memory @@ -398,8 +405,9 @@ MarkAsPreparing(TransactionId xid, const char *gid, pgxact->nxids = 0; gxact->prepared_at = prepared_at; - /* initialize LSN to 0 (start of WAL) */ - gxact->prepare_lsn = 0; + /* initialize LSN to InvalidXLogRecPtr */ + gxact->prepare_start_lsn = InvalidXLogRecPtr; + gxact->prepare_end_lsn = InvalidXLogRecPtr; gxact->owner = owner; gxact->locking_backend = MyBackendId; gxact->valid = false; @@ -579,41 +587,6 @@ RemoveGXact(GlobalTransaction gxact) } /* - * TransactionIdIsPrepared - * True iff transaction associated with the identifier is prepared - * for two-phase commit - * - * Note: only gxacts marked "valid" are considered; but notice we do not - * check the locking status. - * - * This is not currently exported, because it is only needed internally. - */ -static bool -TransactionIdIsPrepared(TransactionId xid) -{ - bool result = false; - int i; - - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); - - for (i = 0; i < TwoPhaseState->numPrepXacts; i++) - { - GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - - if (gxact->valid && pgxact->xid == xid) - { - result = true; - break; - } - } - - LWLockRelease(TwoPhaseStateLock); - - return result; -} - -/* * Returns an array of all prepared transactions for the user-level * function pg_prepared_xact. * @@ -1020,14 +993,8 @@ StartPrepare(GlobalTransaction gxact) void EndPrepare(GlobalTransaction gxact) { - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - TransactionId xid = pgxact->xid; TwoPhaseFileHeader *hdr; - char path[MAXPGPATH]; StateFileChunk *record; - pg_crc32c statefile_crc; - pg_crc32c bogus_crc; - int fd; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1048,70 +1015,7 @@ EndPrepare(GlobalTransaction gxact) errmsg("two-phase state file maximum length exceeded"))); /* - * Create the 2PC state file. - */ - TwoPhaseFilePath(path, xid); - - fd = OpenTransientFile(path, - O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, - S_IRUSR | S_IWUSR); - if (fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not create two-phase state file \"%s\": %m", - path))); - - /* Write data to file, and calculate CRC as we pass over it */ - INIT_CRC32C(statefile_crc); - - for (record = records.head; record != NULL; record = record->next) - { - COMP_CRC32C(statefile_crc, record->data, record->len); - if ((write(fd, record->data, record->len)) != record->len) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - } - - FIN_CRC32C(statefile_crc); - - /* - * Write a deliberately bogus CRC to the state file; this is just paranoia - * to catch the case where four more bytes will run us out of disk space. - */ - bogus_crc = ~statefile_crc; - - if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - - /* Back up to prepare for rewriting the CRC */ - if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in two-phase state file: %m"))); - } - - /* - * The state file isn't valid yet, because we haven't written the correct - * CRC yet. Before we do that, insert entry in WAL and flush it to disk. - * - * Between the time we have written the WAL entry and the time we write - * out the correct state file CRC, we have an inconsistency: the xact is - * prepared according to WAL but not according to our on-disk state. We - * use a critical section to force a PANIC if we are unable to complete - * the write --- then, WAL replay should repair the inconsistency. The - * odds of a PANIC actually occurring should be very tiny given that we - * were able to write the bogus CRC above. + * Now writing 2PC state data to WAL. * * We have to set delayChkpt here, too; otherwise a checkpoint starting * immediately after the WAL record is inserted could complete without @@ -1131,24 +1035,13 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); - gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); - XLogFlush(gxact->prepare_lsn); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ - /* write correct CRC and close file */ - if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - - if (CloseTransientFile(fd) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close two-phase state file: %m"))); + /* Store record's start location to read that later on Commit */ + gxact->prepare_start_lsn = ProcLastRecPtr; /* * Mark the prepared transaction as valid. As soon as xact.c marks @@ -1186,7 +1079,7 @@ EndPrepare(GlobalTransaction gxact) * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. */ - SyncRepWaitForLSN(gxact->prepare_lsn); + SyncRepWaitForLSN(gxact->prepare_end_lsn); records.tail = records.head = NULL; records.num_chunks = 0; @@ -1315,6 +1208,36 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } + +/* + * Reads 2PC data from xlog. During checkpoint this data will be moved to + * twophase files and ReadTwoPhaseFile should be used instead. + */ +static void +XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) +{ + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + + xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL); + if (xlogreader == NULL) + elog(ERROR, "failed to open xlogreader for reading 2PC data"); + + record = XLogReadRecord(xlogreader, lsn, &errormsg); + if (record == NULL) + elog(ERROR, "failed to read 2PC record from xlog"); + + if (len != NULL) + *len = XLogRecGetDataLen(xlogreader); + + *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader)); + memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader)); + + XLogReaderFree(xlogreader); +} + + /* * Confirms an xid is prepared, during recovery */ @@ -1364,6 +1287,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) int ndelrels; SharedInvalidationMessage *invalmsgs; int i; + bool file_used = false; /* * Validate the GID, and lock the GXACT to ensure that two backends do not @@ -1375,14 +1299,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit) xid = pgxact->xid; /* - * Read and validate the state file + * Read and validate 2PC state data. + * State data can be stored in xlog or in files after xlog checkpoint. + * While checkpointing we set gxact->prepare_start_lsn to NULL to signalize + * that 2PC data is moved to files. */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("two-phase state file for transaction %u is corrupt", - xid))); + if (gxact->prepare_start_lsn) + { + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + } + else + { + buf = ReadTwoPhaseFile(xid, true); + file_used = true; + } /* * Disassemble the header area @@ -1484,7 +1414,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* * And now we can clean up our mess. */ - RemoveTwoPhaseFile(xid, true); + if (file_used) + RemoveTwoPhaseFile(xid, true); RemoveGXact(gxact); MyLockedGxact = NULL; @@ -1539,7 +1470,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) } /* - * Recreates a state file. This is used in WAL replay. + * Recreates a state file. This is used in WAL replay and during + * checkpoint creation. * * Note: content and len don't include CRC. */ @@ -1620,85 +1552,38 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) void CheckPointTwoPhase(XLogRecPtr redo_horizon) { - TransactionId *xids; - int nxids; - char path[MAXPGPATH]; int i; + int len; + char *buf; - /* - * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab - * it just long enough to make a list of the XIDs that require fsyncing, - * and then do the I/O afterwards. - * - * This approach creates a race condition: someone else could delete a - * GXACT between the time we release TwoPhaseStateLock and the time we try - * to open its state file. We handle this by special-casing ENOENT - * failures: if we see that, we verify that the GXACT is no longer valid, - * and if so ignore the failure. - */ if (max_prepared_xacts <= 0) return; /* nothing to do */ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START(); - xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId)); - nxids = 0; - + /* + * Here we doing whole I/O while holding TwoPhaseStateLock. + * It's also possible to move I/O out of the lock, but on + * every error we should check whether somebody commited our + * transaction in different backend. Let's leave this optimisation + * for future, if somebody will spot that this place cause + * bottleneck. + */ LWLockAcquire(TwoPhaseStateLock, LW_SHARED); - for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->valid && - gxact->prepare_lsn <= redo_horizon) - xids[nxids++] = pgxact->xid; - } - - LWLockRelease(TwoPhaseStateLock); - - for (i = 0; i < nxids; i++) - { - TransactionId xid = xids[i]; - int fd; - - TwoPhaseFilePath(path, xid); - - fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0); - if (fd < 0) - { - if (errno == ENOENT) - { - /* OK if gxact is no longer valid */ - if (!TransactionIdIsPrepared(xid)) - continue; - /* Restore errno in case it was changed */ - errno = ENOENT; - } - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open two-phase state file \"%s\": %m", - path))); - } - - if (pg_fsync(fd) != 0) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync two-phase state file \"%s\": %m", - path))); + if (gxact->valid && gxact->prepare_start_lsn != InvalidXLogRecPtr && + gxact->prepare_end_lsn <= redo_horizon){ + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); + RecreateTwoPhaseFile(pgxact->xid, buf, len); + gxact->prepare_start_lsn = InvalidXLogRecPtr; + pfree(buf); } - - if (CloseTransientFile(fd) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close two-phase state file \"%s\": %m", - path))); } - - pfree(xids); + LWLockRelease(TwoPhaseStateLock); TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE(); } @@ -2030,12 +1915,8 @@ RecoverPreparedTransactions(void) /* * Recreate its GXACT and dummy PGPROC * - * Note: since we don't have the PREPARE record's WAL location at - * hand, we leave prepare_lsn zeroes. This means the GXACT will - * be fsync'd on every future checkpoint. We assume this - * situation is infrequent enough that the performance cost is - * negligible (especially since we know the state file has already - * been fsynced). + * MarkAsPreparing sets prepare_start_lsn to InvalidXLogRecPtr + * so next checkpoint will skip that transaction. */ gxact = MarkAsPreparing(xid, hdr->gid, hdr->prepared_at, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 86debf4..3683785 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -321,8 +321,7 @@ static TimeLineID curFileTLI; * stored here. The parallel leader advances its own copy, when necessary, * in WaitForParallelWorkersToFinish. */ -static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; - +XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 790ca66..a6d04cc 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -86,6 +86,7 @@ typedef enum RECOVERY_TARGET_IMMEDIATE } RecoveryTargetType; +extern XLogRecPtr ProcLastRecPtr; extern XLogRecPtr XactLastRecEnd; extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;