diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f3082c3..6b3abc5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -217,7 +217,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok); /* Initialize walsender process before entering the main command loop */ @@ -774,7 +774,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false); return count; } @@ -1551,8 +1551,9 @@ static void ProcessStandbyReplyMessage(void) { XLogRecPtr writePtr, - flushPtr, - applyPtr; + flushPtr, oldFlushPtr, + applyPtr, + keepPtr; bool replyRequested; /* the caller already consumed the msgtype byte */ @@ -1580,24 +1581,99 @@ ProcessStandbyReplyMessage(void) WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); + keepPtr = walsnd->keep; + oldFlushPtr = walsnd->flush; walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; SpinLockRelease(&walsnd->mutex); } + /* + * If we are managed by a replication slot, keep keepPtr on the page where + * the first fragment of the continuation record at flushPtr begins. Since + * this doesn't look into individual records, keepPtr may stay somewhat + * behind. + */ + if (MyReplicationSlot && + flushPtr != InvalidXLogRecPtr && oldFlushPtr != InvalidXLogRecPtr) + { + /* + * While catching up (keepPtr is invalid), do nothing until flushPtr + * enters a new segment. Otherwise check on every page boundary. 
+ */ + if (keepPtr == InvalidXLogRecPtr ? + oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE : + keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ) + { + XLogRecPtr oldKeepPtr = keepPtr; + XLogRecPtr rp; + + if (keepPtr == InvalidXLogRecPtr) + keepPtr = oldFlushPtr; + + rp = keepPtr - (keepPtr % XLOG_BLCKSZ); + + /* We may have the record at flushPtr, so it's worth looking */ + while (rp <= flushPtr) + { + XLogPageHeaderData header; + + /* + * If we don't have enough WAL data yet, don't move keepPtr + * forward; the page can be examined on a later reply. + */ + if (sentPtr - rp >= sizeof(XLogPageHeaderData)) + { + bool found; + /* + * Fetch the header of the page at rp. Move keepPtr forward + * only when that page does not begin with a continuation. + */ + found = XLogRead((char *)&header, + rp, sizeof(XLogPageHeaderData), true); + if (found && + (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0) + keepPtr = rp; + } + rp += XLOG_BLCKSZ; + } + + /* + * keepPtr on the same page as flushPtr means we are catching up; + * store it as invalid. + */ + if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ) + keepPtr = InvalidXLogRecPtr; + + if (oldKeepPtr != keepPtr) + { + WalSnd *walsnd = MyWalSnd; + elog(LOG, "%X/%X => %X/%X / %X/%X", (uint32) (oldKeepPtr >> 32), (uint32) oldKeepPtr, (uint32) (keepPtr >> 32), (uint32) keepPtr, (uint32) (flushPtr >> 32), (uint32) flushPtr); + SpinLockAcquire(&walsnd->mutex); + walsnd->keep = keepPtr; + SpinLockRelease(&walsnd->mutex); + } + } + } + if (!am_cascading_walsender) SyncRepReleaseWaiters(); /* * Advance our local xmin horizon when the client confirmed a flush. 
*/ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) { if (SlotIsLogical(MyReplicationSlot)) LogicalConfirmReceivedLocation(flushPtr); else - PhysicalConfirmReceivedLocation(flushPtr); + { + /* an invalid keepPtr means catching up: confirm flushPtr itself */ + if (keepPtr == InvalidXLogRecPtr) + keepPtr = flushPtr; + PhysicalConfirmReceivedLocation(keepPtr); + } } } @@ -2019,6 +2095,7 @@ WalSndKill(int code, Datum arg) /* * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' + * Returns true on success, or false if notfoundok and the segment file is missing. * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. @@ -2028,8 +2105,8 @@ WalSndKill(int code, Datum arg) * always be one descriptor left open until the process ends, but never * more than one. */ -static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +static bool +XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok) { char *p; XLogRecPtr recptr; @@ -2106,10 +2183,15 @@ retry: * removed or recycled. */ if (errno == ENOENT) + { + if (notfoundok) + return false; + ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", XLogFileNameP(curFileTimeLine, sendSegNo)))); + } else ereport(ERROR, (errcode_for_file_access(), @@ -2189,6 +2271,8 @@ retry: goto retry; } } + + return true; } /* @@ -2393,7 +2477,7 @@ XLogSendPhysical(void) * calls. 
*/ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(&output_message.data[output_message.len], startptr, nbytes, false); output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5e6ccfc..084146d 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -47,6 +47,13 @@ typedef struct WalSnd XLogRecPtr flush; XLogRecPtr apply; + /* + * A continuation record that spans segments requires every segment it + * touches to be preserved. This is the location from which older segments + * must be kept; it is valid only when it differs from the flush location. + */ + XLogRecPtr keep; + /* Protects shared variables shown above. */ slock_t mutex;