From a9c5a09eda9c63ab39db7336b04863bead352846 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Mon, 9 Sep 2019 11:53:54 +0200 Subject: [PATCH 2/4] Introduce XLogSegment structure. --- src/backend/access/transam/xlog.c | 6 +- src/backend/access/transam/xlogreader.c | 57 +++++++++++------- src/backend/access/transam/xlogutils.c | 20 +++---- src/backend/replication/walsender.c | 78 ++++++++++++------------- src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/pg_waldump.c | 3 + src/include/access/xlogreader.h | 32 ++++++---- 7 files changed, 111 insertions(+), 87 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cd948dbefc..c5bfda74f0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4295,7 +4295,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size); offset = XLogSegmentOffset(xlogreader->latestPagePtr, wal_segment_size); - XLogFileName(fname, xlogreader->readPageTLI, segno, + XLogFileName(fname, xlogreader->seg.tli, segno, wal_segment_size); ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), @@ -7354,7 +7354,7 @@ StartupXLOG(void) * and we were reading the old WAL from a segment belonging to a higher * timeline. */ - EndOfLogTLI = xlogreader->readPageTLI; + EndOfLogTLI = xlogreader->seg.tli; /* * Complain if we did not roll forward far enough to render the backup @@ -11639,7 +11639,7 @@ retry: Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - xlogreader->readPageTLI = curFileTLI; + xlogreader->seg.tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 2184f4291d..7b4ec81493 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -96,7 +96,9 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, return NULL; } - state->wal_segment_size = wal_segment_size; + /* Initialize segment pointer. */ + XLogSegmentInit(&state->seg, wal_segment_size); + state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; @@ -490,8 +492,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->wal_segment_size - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); + state->EndRecPtr += state->seg.size - 1; + state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->seg.size); } if (DecodeXLogRecord(state, record, errormsg)) @@ -533,12 +535,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) Assert((pageptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); - targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + XLByteToSeg(pageptr, targetSegNo, state->seg.size); + targetPageOff = XLogSegmentOffset(pageptr, state->seg.size); /* check whether we have all the requested data already */ - if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && - reqLen <= state->readLen) + if (targetSegNo == state->seg.num && + targetPageOff == state->seg.off && reqLen <= state->readLen) return state->readLen; /* @@ -553,7 +555,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. */ - if (targetSegNo != state->readSegNo && targetPageOff != 0) + if (targetSegNo != state->seg.num && targetPageOff != 0) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; @@ -608,8 +610,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) goto err; /* update read state information */ - state->readSegNo = targetSegNo; - state->readOff = targetPageOff; + state->seg.num = targetSegNo; + state->seg.off = targetPageOff; state->readLen = readLen; return readLen; @@ -625,8 +627,8 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->readSegNo = 0; - state->readOff = 0; + state->seg.num = 0; + state->seg.off = 0; state->readLen = 0; } @@ -745,16 +747,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, Assert((recptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(recptr, segno, state->wal_segment_size); - offset = XLogSegmentOffset(recptr, state->wal_segment_size); + XLByteToSeg(recptr, segno, state->seg.size); + offset = XLogSegmentOffset(recptr, state->seg.size); - XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr); + XLogSegNoOffsetToRecPtr(segno, offset, state->seg.size, recaddr); if (hdr->xlp_magic != XLOG_PAGE_MAGIC) { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "invalid magic number %04X in log segment %s, offset %u", @@ -768,7 +770,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "invalid info bits %04X in log segment %s, offset %u", @@ -791,7 +793,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, (unsigned long long) state->system_identifier); return false; } - else if (longhdr->xlp_seg_size != state->wal_segment_size) + else if (longhdr->xlp_seg_size != state->seg.size) { report_invalid_record(state, "WAL file is from different database system: incorrect segment size in page header"); @@ -808,7 +810,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); /* hmm, first page of file doesn't have a long header? */ report_invalid_record(state, @@ -828,7 +830,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "unexpected pageaddr %X/%X in log segment %s, offset %u", @@ -853,7 +855,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", @@ -997,6 +999,19 @@ out: #endif /* FRONTEND */ +/* + * Initialize the passed segment pointer. + */ +void +XLogSegmentInit(XLogSegment *seg, int size) +{ + seg->file = -1; + seg->num = 0; + seg->off = 0; + seg->tli = 0; + seg->dir = NULL; + seg->size = size; +} /* ---------------------------------------- * Functions for decoding the data and block references in a record. diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 680bed8278..424bb06919 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readOff; + const XLogRecPtr lastReadPage = state->seg.num * + state->seg.size + state->seg.off; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa if (state->currTLIValidUntil != InvalidXLogRecPtr && state->currTLI != ThisTimeLineID && state->currTLI != 0 && - ((wantPage + wantLength) / state->wal_segment_size) < - (state->currTLIValidUntil / state->wal_segment_size)) + ((wantPage + wantLength) / state->seg.size) < + (state->currTLIValidUntil / state->seg.size)) return; /* @@ -870,11 +870,11 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa */ List *timelineHistory = readTimeLineHistory(ThisTimeLineID); - XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1) - * state->wal_segment_size) - 1; + XLogRecPtr endOfSegment = (((wantPage / state->seg.size) + 1) + * state->seg.size) - 1; - Assert(wantPage / state->wal_segment_size == - endOfSegment / state->wal_segment_size); + Assert(wantPage / state->seg.size == + endOfSegment / state->seg.size); /* * Find the timeline of the last LSN on the segment containing @@ -1022,9 +1022,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->wal_segment_size, pageTLI, targetPagePtr, + XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr, XLOG_BLCKSZ); - state->readPageTLI = pageTLI; + state->seg.tli = pageTLI; /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 28d8c31af8..f5630a63cf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -128,16 +128,7 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; -/* - * These variables are used similarly to openLogFile/SegNo/Off, - * but for walsender to read the XLOG. - */ -static int sendFile = -1; -static XLogSegNo sendSegNo = 0; -static uint32 sendOff = 0; - -/* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static XLogSegment *sendSeg = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -285,6 +276,10 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); + + /* Make sure we can remember the current read position in XLOG. */ + sendSeg = (XLogSegment *) MemoryContextAlloc(TopMemoryContext, sizeof(XLogSegment)); + XLogSegmentInit(sendSeg, wal_segment_size); } /* @@ -301,10 +296,10 @@ WalSndErrorCleanup(void) ConditionVariableCancelSleep(); pgstat_report_wait_end(); - if (sendFile >= 0) + if (sendSeg->file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->file); + sendSeg->file = -1; } if (MyReplicationSlot != NULL) @@ -2384,15 +2379,16 @@ retry: startoff = XLogSegmentOffset(recptr, wal_segment_size); - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size)) + if (sendSeg->file < 0 || + !XLByteInSeg(recptr, sendSeg->num, sendSeg->size)) { char path[MAXPGPATH]; /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); + if (sendSeg->file >= 0) + close(sendSeg->file); - XLByteToSeg(recptr, sendSegNo, wal_segment_size); + XLByteToSeg(recptr, sendSeg->num, sendSeg->size); /*------- * When reading from a historic timeline, and there is a timeline @@ -2420,20 +2416,20 @@ retry: * used portion of the old segment is copied to the new file. *------- */ - curFileTimeLine = sendTimeLine; + sendSeg->tli = sendTimeLine; if (sendTimeLineIsHistoric) { XLogSegNo endSegNo; XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size); - if (sendSegNo == endSegNo) - curFileTimeLine = sendTimeLineNextTLI; + if (sendSeg->num == endSegNo) + sendSeg->tli = sendTimeLineNextTLI; } - XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size); + XLogFilePath(path, sendSeg->tli, sendSeg->num, wal_segment_size); - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendFile < 0) + sendSeg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (sendSeg->file < 0) { /* * If the file is not found, assume it's because the standby @@ -2444,26 +2440,26 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(sendSeg->tli, sendSeg->num)))); else ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); } - sendOff = 0; + sendSeg->off = 0; } /* Need to seek in the file? */ - if (sendOff != startoff) + if (sendSeg->off != startoff) { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + if (lseek(sendSeg->file, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), + XLogFileNameP(sendSeg->tli, sendSeg->num), startoff))); - sendOff = startoff; + sendSeg->off = startoff; } /* How many bytes are within this segment? */ @@ -2473,29 +2469,29 @@ retry: segbytes = nbytes; pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); + readbytes = read(sendSeg->file, p, segbytes); pgstat_report_wait_end(); if (readbytes < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (Size) segbytes))); + XLogFileNameP(sendSeg->tli, sendSeg->num), + sendSeg->off, (Size) segbytes))); } else if (readbytes == 0) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, readbytes, (Size) segbytes))); + XLogFileNameP(sendSeg->tli, sendSeg->num), + sendSeg->off, readbytes, (Size) segbytes))); } /* Update state for read */ recptr += readbytes; - sendOff += readbytes; + sendSeg->off += readbytes; nbytes -= readbytes; p += readbytes; } @@ -2526,10 +2522,10 @@ retry: walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); - if (reload && sendFile >= 0) + if (reload && sendSeg->file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->file); + sendSeg->file = -1; goto retry; } @@ -2695,9 +2691,9 @@ XLogSendPhysical(void) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. */ - if (sendFile >= 0) - close(sendFile); - sendFile = -1; + if (sendSeg->file >= 0) + close(sendSeg->file); + sendSeg->file = -1; /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 0a89f9c02a..e663fec6a7 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -320,7 +320,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - xlogreader->readPageTLI = targetHistory[private->tliIndex].tli; + xlogreader->seg.tli = targetHistory[private->tliIndex].tli; return XLOG_BLCKSZ; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 40c64a0bbf..a16793bb8b 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -1105,6 +1105,9 @@ main(int argc, char **argv) if (!xlogreader_state) fatal_error("out of memory"); + /* Finalize the segment pointer. */ + xlogreader_state->seg.dir = private.inpath; + /* first find a valid recptr to start from */ first_record = XLogFindNextRecord(xlogreader_state, private.startptr); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d64a9ad82f..c2724fff74 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -31,6 +31,20 @@ #include "access/xlogrecord.h" +/* + * Position in XLOG file while reading it. + */ +typedef struct XLogSegment +{ + int file; /* segment file descriptor */ + XLogSegNo num; /* segment number */ + uint32 off; /* offset in the segment */ + TimeLineID tli; /* timeline ID of the currently open file */ + + char *dir; /* directory (only needed by frontends) */ + int size; /* segment size */ +} XLogSegment; + typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -76,11 +90,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Segment size of the to-be-parsed data (mandatory). - */ - int wal_segment_size; - /* * Data input callback (mandatory). * @@ -98,8 +107,8 @@ struct XLogReaderState * actual WAL record it's interested in. In that case, targetRecPtr can * be used to determine which timeline to read the page from. * - * The callback shall set ->readPageTLI to the TLI of the file the page - * was read from. + * The callback shall set ->seg.tli to the TLI of the file the page was + * read from. */ XLogPageReadCB read_page; @@ -154,10 +163,8 @@ struct XLogReaderState char *readBuf; uint32 readLen; - /* last read segment, segment offset, TLI for data currently in readBuf */ - XLogSegNo readSegNo; - uint32 readOff; - TimeLineID readPageTLI; + /* last read XLOG position for data currently in readBuf */ + XLogSegment seg; /* * beginning of prior page read, and its TLI. Doesn't necessarily @@ -217,6 +224,9 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ + +extern void XLogSegmentInit(XLogSegment *seg, int size); + /* Functions for decoding an XLogRecord */ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, -- 2.22.0