*** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 367,372 **** typedef struct XLogCtlData --- 367,374 ---- XLogRecPtr asyncCommitLSN; /* LSN of newest async commit */ uint32 lastRemovedLog; /* latest removed/recycled XLOG segment */ uint32 lastRemovedSeg; + XLogRecPtr oldestCacheLSN; /* start of oldest block in cache */ + int sendidx; /* cache index of next block to send */ /* Protected by WALWriteLock: */ XLogCtlWrite Write; *************** *** 550,555 **** static void XLogReportParameters(void); --- 552,559 ---- static void LocalSetXLogInsertAllowed(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); + static bool XLByteInBuf(XLogRecPtr recptr); + static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); static bool AdvanceXLInsertBuffer(bool new_segment); *************** *** 956,961 **** begin:; --- 960,971 ---- RecPtr.xrecoff = XLogFileSize; } + /* + * Report the end of the prior segment, so that the walsenders know + * to send WAL up to that + */ + SetXLogSendRqst(RecPtr); + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->Write.LogwrtResult; if (!XLByteLE(RecPtr, LogwrtResult.Flush)) *************** *** 1083,1093 **** begin:; LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); /* * Flush through the end of the page containing XLOG_SWITCH, and * perform end-of-segment actions (eg, notifying archiver). */ - WriteRqst = XLogCtl->xlblocks[curridx]; FlushRqst.Write = WriteRqst; FlushRqst.Flush = WriteRqst; XLogWrite(FlushRqst, false, true); --- 1093,1110 ---- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + WriteRqst = XLogCtl->xlblocks[curridx]; + + /* + * Report the end of the page containing XLOG_SWITCH, so that the + * walsenders know to send WAL up to that + */ + SetXLogSendRqst(WriteRqst); + /* * Flush through the end of the page containing XLOG_SWITCH, and * perform end-of-segment actions (eg, notifying archiver). 
*/ FlushRqst.Write = WriteRqst; FlushRqst.Flush = WriteRqst; XLogWrite(FlushRqst, false, true); *************** *** 1156,1161 **** begin:; --- 1173,1185 ---- LWLockRelease(WALInsertLock); + /* + * Report the current WAL insert location, so that the walsenders + * know to send WAL up to that. + */ + if (!isLogSwitch) + SetXLogSendRqst(RecPtr); + if (updrqst) { /* use volatile pointer to prevent code rearrangement */ *************** *** 1239,1244 **** XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, --- 1263,1413 ---- } /* + * Read bytes in WAL buffers into 'buf', up to 'endptr', starting at 'recptr'. + * + * If at least one byte was read, sets *sendidx to the cache index of next + * block to read, sets *ReadResult to the byte position we've already read, + * and returns the number of bytes read. If nothing was read, returns zero. + * + * If we could read the log up to 'endptr', we can expect that the next block + * to send is available in WAL buffers, so sets *frombuf to true. Otherwise, + * false. + */ + Size + XLogBufRead(char *buf, XLogRecPtr recptr, XLogRecPtr endptr, int *sendidx, + XLogRecPtr *ReadResult, bool *frombuf) + { + bool last_iteration; + char *from; + Size nbytes; + Size readbytes = 0; + uint32 startpos; + + Assert(frombuf); + Assert(*sendidx != -1); + + /* Calculate the starting position of reading in the first page */ + startpos = recptr.xrecoff % XLOG_BLCKSZ; + from = XLogCtl->pages + *sendidx * (Size) XLOG_BLCKSZ + startpos; + nbytes = (Size) XLOG_BLCKSZ - startpos; + + /* + * Within the loop, read one block at a time. Instead, like XLogWrite(), + * we can gather multiple blocks together and issue just one memcpy() call. + * But since we don't hold WALInsertLock here to avoid lock contention, + * some earlier blocks might be purged from the cache while being gathered. + * If this happens, we must start reading the log from the disk over again, + * which would degrade the performance of walsender. 
+ * + * To avoid that redo, we read the block available in the cache as soon as + * possible. + */ + while (XLByteLT(recptr, endptr)) + { + XLogRecPtr prevptr; + + /* + * Advance recptr to end of current block. We read XLogCtl->xlblocks + * without holding either WALInsertLock or WALWriteLock to avoid lock + * contention. So it might be changed because of buffer replacement + * while being read. We check whether current block is in the cache, + * and if not, we give up advancing recptr and reading the log anymore. + * + * prevptr indicates the end + 1 of previous block, i.e., the start of + * current one. The check is done by comparing prevptr with the start + * of oldest block in the cache. + */ + prevptr = recptr; + recptr = XLogCtl->xlblocks[*sendidx]; + if (!XLByteInBuf(prevptr)) + { + if (readbytes > 0) + *ReadResult = prevptr; + + /* + * Since current block is not in the cache, we must read it from + * the disk + */ + *frombuf = false; + break; + } + + /* + * If XLOG_SWITCH is in the previous block, we send all the read data, + * and then forcibly advance to the start of the next segment + */ + if ((recptr.xrecoff - XLOG_BLCKSZ) % XLogSegSize == 0 && + prevptr.xrecoff % XLogSegSize != 0) + { + recptr.xrecoff -= XLOG_BLCKSZ; + if (recptr.xrecoff == 0) + { + recptr.xlogid -= 1; + recptr.xrecoff = XLogFileSize; + } + *ReadResult = recptr; + break; + } + + /* Is this the last loop iteration? */ + last_iteration = XLByteLE(endptr, recptr); + if (last_iteration) + { + uint32 endpos; + + if ((endpos = endptr.xrecoff % XLOG_BLCKSZ) != 0) + nbytes += endpos - (Size) XLOG_BLCKSZ; + } + + /* + * Attempt to read WAL buffers without holding any locks. So since the + * current block might be replaced while being read, we need to check + * whether it's still in the cache later. 
+ */ + Assert(nbytes <= XLOG_BLCKSZ); + memcpy(buf, from, nbytes); + if (!XLByteInBuf(prevptr)) + { + if (readbytes > 0) + *ReadResult = prevptr; + + /* + * Since current block is not in the cache, we must read it from + * the disk + */ + *frombuf = false; + break; + } + readbytes += nbytes; + + if (last_iteration) + { + /* If we went beyond endptr, back off */ + if (XLByteLT(endptr, recptr)) + recptr = endptr; + else + *sendidx = NextBufIdx(*sendidx); + *ReadResult = recptr; + + /* + * We can expect that the next block to read is in WAL buffers + * since we could read all the data up to endptr from there + */ + *frombuf = true; + break; + } + + /* Update states for next read */ + *sendidx = NextBufIdx(*sendidx); + from = XLogCtl->pages + *sendidx * (Size) XLOG_BLCKSZ; + buf += nbytes; + nbytes = (Size) XLOG_BLCKSZ; + } + + return readbytes; + } + + /* * XLogArchiveNotify * * Create an archive notification file *************** *** 1493,1498 **** AdvanceXLInsertBuffer(bool new_segment) --- 1662,1681 ---- } /* + * Update the starting location of the oldest block in WAL buffers + * if streaming replication is enabled + */ + if (max_wal_senders > 0) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->oldestCacheLSN = XLogCtl->xlblocks[nextidx]; + SpinLockRelease(&xlogctl->info_lck); + } + + /* * Now the next buffer slot is free and we can set it up to be the next * output page. */ *************** *** 1845,1850 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) --- 2028,2037 ---- * We make sure that the shared 'request' values do not fall behind the * 'result' values. This is not absolutely essential, but it saves some * code in a couple of places. + * + * Set XLogCtl->sendidx to the cache index of next block to write. This + * can be used as the index of next block to send after sending all WAL + * written to the disk before now. 
*/ { /* use volatile pointer to prevent code rearrangement */ *************** *** 1852,1857 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) --- 2039,2045 ---- SpinLockAcquire(&xlogctl->info_lck); xlogctl->LogwrtResult = LogwrtResult; + xlogctl->sendidx = Write->curridx; if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write)) xlogctl->LogwrtRqst.Write = LogwrtResult.Write; if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush)) *************** *** 6313,6318 **** StartupXLOG(void) --- 6501,6513 ---- ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ; /* + * We don't need to calculate the accurate starting location of oldest + * block in the cache since we will never attempt to read the older data + * than EndOfLog. + */ + XLogCtl->oldestCacheLSN = EndOfLog; + + /* * Tricky point here: readBuf contains the *last* block that the LastRec * record spans, not the one it starts in. The last block is indeed the * one we want to use. *************** *** 6803,6822 **** GetInsertRecPtr(void) } /* ! * GetWriteRecPtr -- Returns the current write position. */ ! XLogRecPtr ! GetWriteRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; - XLogRecPtr recptr; SpinLockAcquire(&xlogctl->info_lck); ! recptr = xlogctl->LogwrtResult.Write; SpinLockRelease(&xlogctl->info_lck); ! return recptr; } /* --- 6998,7030 ---- } /* ! * GetWriteRecPtrAndIndex -- Gets the current write position and associated index. */ ! void ! GetWriteRecPtrAndIndex(XLogRecPtr *recptr, int *index) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); ! *recptr = xlogctl->LogwrtResult.Write; ! *index = xlogctl->sendidx; SpinLockRelease(&xlogctl->info_lck); + } ! /* XLByteInBuf -- Is the LSN within WAL buffers? */ ! static bool ! XLByteInBuf(XLogRecPtr recptr) ! { ! /* use volatile pointer to prevent code rearrangement */ ! 
volatile XLogCtlData *xlogctl = XLogCtl; ! XLogRecPtr oldestCacheLSN; ! ! SpinLockAcquire(&xlogctl->info_lck); ! oldestCacheLSN = xlogctl->oldestCacheLSN; ! SpinLockRelease(&xlogctl->info_lck); ! ! return XLByteLE(oldestCacheLSN, recptr); } /* *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 529,537 **** WalSndKill(int code, Datum arg) /* * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' - * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. */ static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) --- 529,534 ---- *************** *** 641,648 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) } /* ! * Read up to MAX_SEND_SIZE bytes of WAL that's been written to disk, ! * but not yet sent to the client, and send it. * * msgbuf is a work area in which the output message is constructed. It's * passed in just so we can avoid re-palloc'ing the buffer on each cycle. --- 638,645 ---- } /* ! * Read up to MAX_SEND_SIZE bytes of WAL that's been inserted to WAL buffers ! * or written to disk, but not yet sent to the client, and send it. * * msgbuf is a work area in which the output message is constructed. It's * passed in just so we can avoid re-palloc'ing the buffer on each cycle. *************** *** 661,677 **** XLogSend(char *msgbuf, bool *caughtup) XLogRecPtr endptr; Size nbytes; WalDataMessageHeader msghdr; /* ! * Attempt to send all data that's already been written out from WAL ! * buffers (note it might not yet be fsync'd to disk). We cannot go ! * further than that given the current implementation of XLogRead(). */ ! SendRqstPtr = GetWriteRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; return true; } --- 658,701 ---- XLogRecPtr endptr; Size nbytes; WalDataMessageHeader msghdr; + static int sendidx = -1; + static bool frombuf = false; + retry: /* ! 
* Attempt to send all data that's already been inserted into or written ! * out from WAL buffers (note it might not yet be fsync'd to disk) */ ! if (frombuf) ! { ! /* use volatile pointer to prevent code rearrangement */ ! volatile WalSndCtlData *walsndctl = WalSndCtl; ! ! SpinLockAcquire(&walsndctl->info_lck); ! SendRqstPtr = walsndctl->sendRqst; ! SpinLockRelease(&walsndctl->info_lck); ! } ! else ! /* ! * XXX: if we've gotten within XLOG_BLCKSZ bytes of the current WAL ! * write location, we should attempt to read data from WAL buffers ! * instead of the disk? ! */ ! GetWriteRecPtrAndIndex(&SendRqstPtr, &sendidx); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) { + /* + * Attempt to read data from WAL buffers and send it, if we've + * already sent all WAL written to the disk + */ + if (!frombuf) + { + frombuf = true; + goto retry; + } + *caughtup = true; return true; } *************** *** 722,740 **** XLogSend(char *msgbuf, bool *caughtup) *caughtup = false; } - nbytes = endptr.xrecoff - startptr.xrecoff; - Assert(nbytes <= MAX_SEND_SIZE); - /* * OK to read and send the slice. */ msgbuf[0] = 'w'; ! /* ! * Read the log directly into the output buffer to avoid extra memcpy ! * calls. ! */ ! XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken --- 746,798 ---- *caughtup = false; } /* * OK to read and send the slice. */ msgbuf[0] = 'w'; ! if (frombuf) ! { ! /* ! * Read the log from WAL buffers, up to endptr, starting at startptr. ! * If no log could be read, we immediately retry to read it from the ! * disk. ! */ ! if ((nbytes = XLogBufRead(msgbuf + 1 + sizeof(WalDataMessageHeader), ! startptr, endptr, &sendidx, &sentPtr, ! &frombuf)) == 0) ! goto retry; ! ! /* ! * If we could not reach endptr though at least one byte was read, ! * we retry to read the log from the disk after sending the read data. ! */ ! if (!frombuf) ! *caughtup = false; ! } ! else ! { ! 
nbytes = endptr.xrecoff - startptr.xrecoff; ! Assert(nbytes <= MAX_SEND_SIZE); ! ! /* ! * Read the log directly into the output buffer to avoid extra memcpy ! * calls. ! */ ! XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); ! ! sentPtr = endptr; ! ! /* ! * Attempt to read data from WAL buffers and send it, if we've already ! * sent all WAL written to the disk ! */ ! if (*caughtup) ! { ! frombuf = true; ! *caughtup = false; ! } ! } /* * We fill the message header last so that the send timestamp is taken *************** *** 752,759 **** XLogSend(char *msgbuf, bool *caughtup) if (pq_flush()) return false; - sentPtr = endptr; - /* Update shared memory status */ { /* use volatile pointer to prevent code rearrangement */ --- 810,815 ---- *************** *** 880,885 **** WalSndShmemInit(void) --- 936,942 ---- { /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); + SpinLockInit(&WalSndCtl->info_lck); for (i = 0; i < max_wal_senders; i++) { *************** *** 890,895 **** WalSndShmemInit(void) --- 947,974 ---- } } + /* Record the LSN up to which walsenders should send WAL; never moves it backwards */ + void + SetXLogSendRqst(XLogRecPtr recptr) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSndCtlData *walsndctl = WalSndCtl; + + /* + * Do nothing if streaming replication is disabled. + * + * XXX: even if it's enabled, should we skip recording if + * there is no walsender in progress? + */ + if (max_wal_senders == 0) + return; + + SpinLockAcquire(&walsndctl->info_lck); + if (XLByteLT(walsndctl->sendRqst, recptr)) + walsndctl->sendRqst = recptr; + SpinLockRelease(&walsndctl->info_lck); + } + /* * This isn't currently used for anything. 
Monitoring tools might be * interested in the future, and we'll need something like this in the *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 266,271 **** extern int XLogFileInit(uint32 log, uint32 seg, --- 266,273 ---- bool *use_existent, bool use_lock); extern int XLogFileOpen(uint32 log, uint32 seg); + extern Size XLogBufRead(char *buf, XLogRecPtr recptr, XLogRecPtr endptr, + int *sendidx, XLogRecPtr *ReadResult, bool *frombuf); extern void XLogGetLastRemoved(uint32 *log, uint32 *seg); extern void XLogSetAsyncCommitLSN(XLogRecPtr record); *************** *** 294,300 **** extern bool CreateRestartPoint(int flags); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); ! extern XLogRecPtr GetWriteRecPtr(void); extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch); extern TimeLineID GetRecoveryTargetTLI(void); --- 296,302 ---- extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); ! extern void GetWriteRecPtrAndIndex(XLogRecPtr *recptr, int *index); extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch); extern TimeLineID GetRecoveryTargetTLI(void); *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 29,34 **** typedef struct WalSnd --- 29,38 ---- /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + XLogRecPtr sendRqst; /* last byte + 1 to send */ + + slock_t info_lck; /* protects the above field */ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; *************** *** 45,50 **** extern int WalSenderMain(void); --- 49,55 ---- extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); + extern void SetXLogSendRqst(XLogRecPtr recptr); extern XLogRecPtr GetOldestWALSendPointer(void); #endif /* _WALSENDER_H */