diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f3082c3..0270474 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -185,6 +185,12 @@ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; static XLogRecPtr logical_startptr = InvalidXLogRecPtr; +/* + * Segment keep pointer for physical slots. Has a valid value only when it + * differs from the current flush pointer. + */ +static XLogRecPtr keepPtr = InvalidXLogRecPtr; + /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); @@ -217,7 +223,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok); /* Initialize walsender process before entering the main command loop */ @@ -538,6 +544,9 @@ StartReplication(StartReplicationCmd *cmd) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot for physical replication")))); + + /* Restore keepPtr from replication slot */ + keepPtr = MyReplicationSlot->data.restart_lsn; } /* @@ -553,6 +562,10 @@ StartReplication(StartReplicationCmd *cmd) else FlushPtr = GetFlushRecPtr(); + /* Set InvalidXLogRecPtr if catching up */ + if (keepPtr == FlushPtr) + keepPtr = InvalidXLogRecPtr; + if (cmd->timeline != 0) { XLogRecPtr switchpoint; @@ -774,7 +787,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false); return count; } @@ -1551,7 +1564,7 @@ static void ProcessStandbyReplyMessage(void) { XLogRecPtr writePtr, - flushPtr, + flushPtr, oldFlushPtr, applyPtr; bool replyRequested; @@ -1580,6 +1593,7 @@ ProcessStandbyReplyMessage(void) WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); + oldFlushPtr = walsnd->flush; walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; @@ -1597,7 +1611,78 @@ ProcessStandbyReplyMessage(void) if (SlotIsLogical(MyReplicationSlot)) LogicalConfirmReceivedLocation(flushPtr); else - PhysicalConfirmReceivedLocation(flushPtr); + { + /* + * On recovery, a continuation reocrd must be available from + * single WAL source. So physical replication slot should stay in + * the first segment for a continuation record spanning multiple + * segments. Since this doesn't look into individual record, + * keepPtr may stay a bit too behind. + * + * Since the objective is avoding to remove required segments, + * checking every segment is enough. But once keepPtr goes behind, + * check every page for quick restoration. + * + * keepPtr has a valid value only when it is behind flushPtr. + */ + if (oldFlushPtr != InvalidXLogRecPtr && + (keepPtr == InvalidXLogRecPtr ? + oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE : + keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ)) + { + XLogRecPtr rp; + XLogRecPtr oldKeepPtr = keepPtr; /* for debug */ + + if (keepPtr == InvalidXLogRecPtr) + keepPtr = oldFlushPtr; + + rp = keepPtr - (keepPtr % XLOG_BLCKSZ); + + /* + * We may have let the record at flushPtr sent, so it's worth + * looking + */ + while (rp <= flushPtr) + { + XLogPageHeaderData header; + + /* + * If the page header is not available for now, don't move + * keepPtr forward. We can read it by the next chance. + */ + if(sentPtr - rp >= sizeof(XLogPageHeaderData)) + { + bool found; + /* + * Fetch the page header of the next page. Move + * keepPtr forward only if when it is not a + * continuation page. + */ + found = XLogRead((char *)&header, rp, + sizeof(XLogPageHeaderData), true); + if (found && + (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0) + keepPtr = rp; + } + rp += XLOG_BLCKSZ; + } + + /* + * If keepPtr is on the same page with flushPtr, it means that + * we are catching up + */ + if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ) + keepPtr = InvalidXLogRecPtr; + + if (oldKeepPtr != keepPtr) + elog(LOG, "%lX => %lX / %lX", + oldKeepPtr, keepPtr, flushPtr); + } + + /* keepPtr == InvalidXLogRecPtr means catching up */ + PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ? + keepPtr : flushPtr); + } } } @@ -2019,6 +2104,7 @@ WalSndKill(int code, Datum arg) /* * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' + * Returns false if the segment file is not found when notfoundok is true. * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. @@ -2028,8 +2114,8 @@ WalSndKill(int code, Datum arg) * always be one descriptor left open until the process ends, but never * more than one. */ -static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +static bool +XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok) { char *p; XLogRecPtr recptr; @@ -2106,10 +2192,15 @@ retry: * removed or recycled. */ if (errno == ENOENT) + { + if (notfoundok) + return false; + ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", XLogFileNameP(curFileTimeLine, sendSegNo)))); + } else ereport(ERROR, (errcode_for_file_access(), @@ -2189,6 +2280,8 @@ retry: goto retry; } } + + return true; } /* @@ -2393,7 +2486,7 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(&output_message.data[output_message.len], startptr, nbytes, false); output_message.len += nbytes; output_message.data[output_message.len] = '\0';