Use only xlogreader.c:XLogRead(). The implementations in xlogutils.c and walsender.c are just renamed now, to be removed by the following diff. diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 98cc5d6d9f..1044d4e4dd 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -17,6 +17,8 @@ */ #include "postgres.h" +#include <unistd.h> + #include "access/transam.h" #include "access/xlogrecord.h" #include "access/xlog_internal.h" @@ -26,6 +28,7 @@ #include "replication/origin.h" #ifndef FRONTEND +#include "pgstat.h" #include "utils/memutils.h" #endif @@ -1022,6 +1025,134 @@ XLogSegmentInit(XLogSegment *seg, int size) seg->size = size; } +#ifndef FRONTEND +/* + * Backend should have wal_segment_size variable initialized, segsize is not + * used. + */ +#define XLogFileNameCommon(tli, num, segsize) XLogFileNameP((tli), (num)) +#define xlr_error(...) ereport(ERROR, (errcode_for_file_access(), errmsg(__VA_ARGS__))) +#else +static char xlr_error_msg[MAXFNAMELEN]; +#define XLogFileNameCommon(tli, num, segsize) (XLogFileName(xlr_error_msg, (tli), (num), (segsize)),\ + xlr_error_msg) +#include "fe_utils/logging.h" +/* + * Frontend application (currently only pg_waldump.c) cannot catch and further + * process errors, so they simply treat them as fatal. + */ +#define xlr_error(...) do {pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0) +#endif /* FRONTEND */ + +/* + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If + * tli is passed, get the data from timeline *tli. 'seg' describes the + * currently open segment and openSegment is a callback that opens the next + * segment for reading. + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. 
+ */ +void +XLogRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID *tli, XLogSegment *seg, XLogOpenSegment openSegment) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + uint32 startoff; + int segbytes; + int readbytes; + + startoff = XLogSegmentOffset(recptr, seg->size); + + if (seg->file < 0 || + !XLByteInSeg(recptr, seg->num, seg->size) || + (tli != NULL && *tli != seg->tli)) + { + XLogSegNo nextSegNo; + + /* Switch to another logfile segment */ + if (seg->file >= 0) + close(seg->file); + + XLByteToSeg(recptr, nextSegNo, seg->size); + + /* Open the next segment in the caller's way. */ + openSegment(nextSegNo, tli, seg); + + /* + * If the function is called by the XLOG reader, the reader will + * eventually set both "num" and "off". However we need to care + * about them too because the function can also be used directly, + * see walsender.c. + */ + seg->num = nextSegNo; + seg->off = 0; + } + + /* Need to seek in the file? */ + if (seg->off != startoff) + { + if (lseek(seg->file, (off_t) startoff, SEEK_SET) < 0) + xlr_error("could not seek in log segment %s to offset %u: %m", + XLogFileNameCommon(seg->tli, seg->num, seg->size), + startoff); + seg->off = startoff; + } + + /* How many bytes are within this segment? 
*/ + if (nbytes > (seg->size - startoff)) + segbytes = seg->size - startoff; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + readbytes = read(seg->file, p, segbytes); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + if (readbytes < 0) + { + xlr_error("could not read from log segment %s, offset %u, length %zu: %m", + XLogFileNameCommon(seg->tli, seg->num, seg->size), + seg->off, + (Size) segbytes); + } + else if (readbytes == 0) + { + xlr_error("could not read from log segment %s, offset %u: read %d of %zu", + XLogFileNameCommon(seg->tli, seg->num, seg->size), + seg->off, + readbytes, + (Size) segbytes); + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + + /* + * If the function is called by the XLOG reader, the reader will + * eventually set this field. However we need to care about it too + * because the function can also be used directly (see walsender.c). + */ + seg->off += readbytes; + } +} + /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 836c2e2927..f4a90a602c 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * frontend). Probably these should be merged at some point. */ static void -XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) +XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, + Size count) { char *p; XLogRecPtr recptr; @@ -897,6 +897,35 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } /* + * Callback for XLogRead() to open the next segment. 
+ */ +static void +read_local_xlog_page_open_segment(XLogSegNo nextSegNo, TimeLineID *tli, + XLogSegment *seg) +{ + char path[MAXPGPATH]; + + XLogFilePath(path, *tli, nextSegNo, seg->size); + seg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + if (seg->file < 0) + { + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + + seg->tli = *tli; +} + +/* * read_page callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another @@ -1022,10 +1051,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr, - XLOG_BLCKSZ); - state->seg.tli = pageTLI; - + XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, &pageTLI, + &state->seg, read_local_xlog_page_open_segment); /* number of valid bytes in the buffer */ return count; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 6dfb525e1a..afafd4082e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -247,7 +247,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static void WalSndOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, + XLogSegment *seg); +static void XLogReadOld(char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -782,7 +784,8 @@ logical_read_xlog_page(XLogReaderState 
*state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, NULL, sendSeg, + WalSndOpenSegment); return count; } @@ -2353,7 +2356,7 @@ WalSndKill(int code, Datum arg) * more than one. */ static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogReadOld(char *buf, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -2527,6 +2530,76 @@ retry: } /* + * Callback for XLogRead() to open the next segment. + */ +static void +WalSndOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, XLogSegment *seg) +{ + char path[MAXPGPATH]; + + /* + * The timeline is determined below, caller should not do anything about + * it. + */ + Assert(tli == NULL); + + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. 
------- + */ + seg->tli = sendTimeLine; + if (sendTimeLineIsHistoric) + { + XLogSegNo endSegNo; + + XLByteToSeg(sendTimeLineValidUpto, endSegNo, seg->size); + if (nextSegNo == endSegNo) + seg->tli = sendTimeLineNextTLI; + } + + XLogFilePath(path, seg->tli, nextSegNo, seg->size); + seg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + if (seg->file < 0) + { + /* + * If the file is not found, assume it's because the standby asked for + * a too old WAL segment that has already been removed or recycled. + */ + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + XLogFileNameP(seg->tli, nextSegNo)))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } +} + +/* * Send out the WAL in its normal physical/stored form. * * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, @@ -2543,6 +2616,7 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + XLogSegNo segno; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2758,7 +2832,48 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + +retry: + XLogRead(&output_message.data[output_message.len], startptr, nbytes, + NULL, /* WalSndOpenSegment will determine TLI */ + sendSeg, + WalSndOpenSegment); + + /* + * After reading into the buffer, check that what we read was valid. We do + * this after reading, because even though the segment was present when we + * opened it, it might get recycled or removed while we read it. The + * read() succeeds in that case, but the data we tried to read might + * already have been overwritten with new WAL records. 
+ */ + XLByteToSeg(startptr, segno, wal_segment_size); + CheckXLogRemoved(segno, ThisTimeLineID); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendSeg->file >= 0) + { + close(sendSeg->file); + sendSeg->file = -1; + + goto retry; + } + } + output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index b31b6cdcaf..caf5533aeb 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -296,6 +296,45 @@ identify_target_directory(XLogDumpPrivate *private, char *directory, fatal_error("could not find any WAL file"); } +static void +XLogDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, XLogSegment *seg) +{ + char fname[MAXPGPATH]; + int tries; + + XLogFileName(fname, *tli, nextSegNo, seg->size); + + /* + * In follow mode there is a short period of time after the server has + * written the end of the previous file before the new file is available. + * So we loop for 5 seconds looking for the file to appear before giving + * up. 
+ */ + for (tries = 0; tries < 10; tries++) + { + seg->file = open_file_in_directory(seg->dir, fname); + if (seg->file >= 0) + break; + if (errno == ENOENT) + { + int save_errno = errno; + + /* File not there yet, try again */ + pg_usleep(500 * 1000); + + errno = save_errno; + continue; + } + /* Any other error, fall through and fail */ + break; + } + + if (seg->file < 0) + fatal_error("could not find file \"%s\": %s", + fname, strerror(errno)); + seg->tli = *tli; +} + /* * Read count bytes from a segment file in the specified directory, for the * given timeline, containing the specified record pointer; store the data in @@ -441,8 +480,8 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } } - XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr, - readBuff, count); + XLogRead(readBuff, targetPagePtr, count, &private->timeline, + &state->seg, XLogDumpOpenSegment); return count; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 0d801d5903..5ba89a7035 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -221,7 +221,22 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ +/* + * Callback to open the specified XLOG segment nextSegNo in timeline *tli for + * reading, and assign the descriptor to ->file. BasicOpenFile() is the + * preferred way to open the segment file in backend code, whereas open(2) + * should be used in frontend. + * + * If NULL is passed for tli, the callback must determine the timeline + * itself. In any case it's supposed to eventually set ->tli. 
+ */ +typedef void (*XLogOpenSegment) (XLogSegNo nextSegNo, TimeLineID *tli, + XLogSegment *seg); + extern void XLogSegmentInit(XLogSegment *seg, int size); +extern void XLogRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID *tli, XLogSegment *seg, + XLogOpenSegment openSegment); /* Functions for decoding an XLogRecord */