diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 47312f6..535b5a9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1324,7 +1324,10 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog && markXidCommitted) + { + CausalReadsWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN(XactLastRecEnd); + } /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -5117,6 +5120,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * Check if the caller would like to ask standbys for immediate feedback + * once this commit is applied. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || causal_reads) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. */ @@ -5452,6 +5462,19 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * Record the primary's timestamp for the commit record, so it can be used + * for tracking replay lag. + */ + SetXLogReplayTimestamp(parsed->xact_time); + + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit or causal reads), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 08d1682..7779c34 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -80,6 +80,8 @@ extern uint32 bootstrap_data_checksum_version; #define PROMOTE_SIGNAL_FILE "promote" #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote" +/* Size of the circular buffer of timestamped LSNs. */ +#define MAX_TIMESTAMPED_LSNS 256 /* User-settable parameters */ int max_wal_size = 64; /* 1 GB */ @@ -346,6 +348,12 @@ static XLogRecPtr RedoRecPtr; static bool doPageWrites; /* + * doRequestWalReceiverReply is used by recovery code to ask the main recovery + * loop to trigger a walreceiver reply. + */ +static bool doRequestWalReceiverReply; + +/* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby * mode, XLOG streaming usually starts from the position where an invalid @@ -357,6 +365,13 @@ static bool doPageWrites; */ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; +/* + * LastReplayedTimestamp can be set by redo handlers when they apply a record + * that carries a timestamp, by calling SetXLogReplayedTimestamp. The xlog + * apply loop can then update the value in shared memory. + */ +static TimestampTz LastReplayedTimestamp = 0; + /*---------- * Shared-memory data structures for XLOG control * @@ -632,6 +647,21 @@ typedef struct XLogCtlData /* current effective recovery target timeline */ TimeLineID RecoveryTargetTLI; + /* timestamp from the most recently applied record that carried a timestamp. */ + TimestampTz lastReplayedTimestamp; + + /* + * We maintain a circular buffer of LSNs and associated timestamps. + * Walreceiver writes into it using information from timestamps, and the + * startup recovery process reads from it and notifies walreceiver when + * LSNs are replayed so that the timestamps can be fed back to the + * upstream server, to track lag. + */ + Index timestampedLsnRead; + Index timestampedLsnWrite; + XLogRecPtr timestampedLsn[MAX_TIMESTAMPED_LSNS]; + TimestampTz timestampedLsnTime[MAX_TIMESTAMPED_LSNS]; + /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery @@ -6844,14 +6874,57 @@ StartupXLOG(void) error_context_stack = errcallback.previous; /* - * Update lastReplayedEndRecPtr after this record has been - * successfully replayed. + * Update lastReplayedEndRecPtr and lastReplayedTimestamp + * after this record has been successfully replayed. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->lastReplayedEndRecPtr = EndRecPtr; XLogCtl->lastReplayedTLI = ThisTimeLineID; + if (LastReplayedTimestamp != 0) + { + /* If replaying a record produced a timestamp, use that. */ + XLogCtl->lastReplayedTimestamp = LastReplayedTimestamp; + LastReplayedTimestamp = 0; + } + else + { + /* + * If we have applied LSNs associated with timestamps + * received by walreceiver, then use the recorded + * timestamp. We consume from the read end of the + * circular buffer. + */ + while (XLogCtl->timestampedLsnRead != + XLogCtl->timestampedLsnWrite && + XLogCtl->timestampedLsn[XLogCtl->timestampedLsnRead] + <= EndRecPtr) + { + if (XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead] > + XLogCtl->lastReplayedTimestamp) + { + XLogCtl->lastReplayedTimestamp = + XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead]; + doRequestWalReceiverReply = true; + } + ++XLogCtl->timestampedLsnRead; + } + } SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo reported that it applied a commit record that + * the master is waiting for by calling + * XLogRequestWalReceiverReply, or we encountered a WAL + * location that was associated with a timestamp above, then + * we wake up the receiver so that it notices the updated + * lastReplayedEndRecPtr and sends a reply to the master. + */ + if (doRequestWalReceiverReply) + { + doRequestWalReceiverReply = false; + WalRcvWakeup(); + } + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11580,3 +11653,103 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Called by redo code to indicate that the xlog replay loop should wake up + * the walreceiver process so that a reply can be sent to the primary. + */ +void +XLogRequestWalReceiverReply(void) +{ + doRequestWalReceiverReply = true; +} + +/* + * Merge timestamps from keepalive messages with the timestamps from WAL + * records, so that we can track lag while idle or while replaying large + * amounts of WAL without commit records. In the former case there is no lag, + * and in the latter case we will remember a timestamp that goes with an + * arbitrary LSN, and wait for that LSN to be replayed before using the + * timestamp. + * + * This is called by walreceiver on standby servers when keepalive messages + * arrive. + */ +void +SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn) +{ + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn == XLogCtl->lastReplayedEndRecPtr) + { + /* + * That is the last replayed LSN: we are fully replayed, so we can + * update the replay timestamp immediately. + */ + XLogCtl->lastReplayedTimestamp = timestamp; + } + else + { + /* + * There is WAL still to be applied. We will associate the timestamp + * with this WAL position and wait for it to be replayed. We add it + * at the 'write' end of the circular buffer of LSN/timestamp + * mappings, which the replay loop will eventually read. + */ + Index w = XLogCtl->timestampedLsnWrite; + Index r = XLogCtl->timestampedLsnRead; + + XLogCtl->timestampedLsn[w] = lsn; + XLogCtl->timestampedLsnTime[w] = timestamp; + + /* Advance the write point. */ + w = (w + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnWrite = w; + if (w == r) + { + /* + * The buffer is full. Advance the read point (throwing away + * oldest values; we will begin to oversestimate replay lag, until + * lag decreases to a size our buffer can manage, or the next + * commit record is replayed). + */ + r = (r + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnRead = r; + } + } + SpinLockRelease(&XLogCtl->info_lck); +} + +/* + * Set the timestamp for the most recently applied WAL record that carried a + * timestamp from the primary. This can be called by redo handlers that have + * an appropriate timestamp (currently only commit records). Updating the + * shared memory value is deferred until after the redo handler returns. + */ +void +SetXLogReplayTimestamp(TimestampTz timestamp) +{ + LastReplayedTimestamp = timestamp; +} + +/* + * Get the timestamp for the most recently applied WAL record that carried a + * timestamp from the master, and also the most recently applied LSN. (Note + * that the timestamp and the LSN don't necessarily relate to the same + * record.) + * + * This is similar to GetLatestXTime, except that it is not only advanced by + * commit records (see SetXLogReplayTimestampAtLsn). + */ +TimestampTz +GetXLogReplayTimestamp(XLogRecPtr *lsn) +{ + TimestampTz result; + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn) + *lsn = XLogCtl->lastReplayedEndRecPtr; + result = XLogCtl->lastReplayedTimestamp; + SpinLockRelease(&XLogCtl->info_lck); + + return result; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ccc030f..f9b0e53 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -647,8 +647,10 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.causal_reads_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 325239d..2abf299 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -57,6 +57,10 @@ #include "utils/builtins.h" #include "utils/ps_status.h" +/* GUC variables */ +int causal_reads_timeout; +bool causal_reads; + /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; @@ -69,7 +73,7 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn); static int SyncRepGetStandbyPriority(void); @@ -83,6 +87,239 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * =========================================================== */ +static bool +SyncRepCheckEarlyExit(void) +{ + /* + * If a wait for synchronous replication is pending, we can neither + * acknowledge the commit nor raise ERROR or FATAL. The latter would + * lead the client to believe that that the transaction aborted, which + * is not true: it's already committed locally. The former is no good + * either: the client has requested synchronous replication, and is + * entitled to assume that an acknowledged commit is also replicated, + * which might not be true. So in this case we issue a WARNING (which + * some clients may be able to interpret) and shut off further output. + * We do NOT reset ProcDiePending, so that the process will die after + * the commit is cleaned up. + */ + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + /* + * It's unclear what to do if a query cancel interrupt arrives. We + * can't actually abort at this point, but ignoring the interrupt + * altogether is not helpful, so we just terminate the wait with a + * suitable warning. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for synchronous replication due to user request"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + SyncRepCancelWait(); + return true; + } + + /* + * If the postmaster dies, we'll probably never get an + * acknowledgement, because all the wal sender processes will exit. So + * just bail out. + */ + if (!PostmasterIsAlive()) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + return false; +} + +/* + * Check if we can stop waiting for causal consistency. We can stop waiting + * when the following conditions are met: + * + * 1. All walsenders currently in 'joining' or 'available' state have + * applied the target LSN. + * + * 2. Any stall periods caused by standbys dropping out of 'available' state + * have passed, so that we can be sure that any causalReadsUntil authorization + * has expired. + * + * The output parameter 'waitingFor' is set to the number of nodes we are + * currently waiting for. The output parameters 'stallTimeMillis' is set to + * the number of milliseconds we need to wait for to observe any current + * commit stall. + * + * Returns true if commit can return control, because every standby has either + * applied the LSN or started rejecting causal_reads transactions. + */ +static bool +CausalReadsCommitCanReturn(XLogRecPtr XactCommitLSN, + int *waitingFor, + long *stallTimeMillis) +{ + int i; + TimestampTz now; + + /* Count how many joining/available nodes we are waiting for. */ + *waitingFor = 0; + for (i = 0; i < max_wal_senders; ++i) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Assuming atomic read of pid_t, we can check walsnd->pid without + * acquiring the spinlock to avoid memory synchronization costs for + * unused walsender slots. We see a value that existed sometime at + * least as recently as the last memory barrier. + */ + if (walsnd->pid != 0) + { + /* + * We need to hold the spinlock to read LSNs, because we can't be + * sure they can be read atomically. + */ + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid != 0 && walsnd->causal_reads_state >= WALSNDCRSTATE_JOINING) + { + if (walsnd->apply < XactCommitLSN) + ++*waitingFor; + } + SpinLockRelease(&walsnd->mutex); + } + } + + /* Check if there is a stall in progress that we need to observe. */ + now = GetCurrentTimestamp(); + LWLockAcquire(SyncRepLock, LW_SHARED); + if (WalSndCtl->stall_causal_reads_until > now) + { + long seconds; + int usecs; + + /* Compute how long we have to wait, rounded up to nearest ms. */ + TimestampDifference(now, WalSndCtl->stall_causal_reads_until, + &seconds, &usecs); + *stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000; + } + else + *stallTimeMillis = 0; + LWLockRelease(SyncRepLock); + + /* We are done if we are not waiting for any nodes or stalls. */ + return *waitingFor == 0 && *stallTimeMillis == 0; +} + +/* + * Wait for causal consistency in causal_reads mode, if requested by user. + */ +void +CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN) +{ + long stallTimeMillis; + int waitingFor; + + /* Leave if we aren't in causal_reads mode. */ + if (!causal_reads) + return; + + for (;;) + { + /* Reset latch before checking state. */ + ResetLatch(MyLatch); + + /* + * Join the queue to be woken up if any causal reads joining/available + * standby applies XactCommitLSN, if we aren't already in it. We + * don't actually know if we need to wait for any peers yet, but we + * have to register just in case before checking the walsenders' state + * to avoid a race condition that could occur if we did it after + * calling CausalReadsCommitCanReturn. (SyncRepWaitForLSN doesn't + * have to do this because it can check the highest-seen LSN in + * walsndctl->lsn[mode] which is protected by SyncRepLock, the same + * lock as the queues. We can't do that here, because there is no + * single highest-seen LSN that is useful. We must check + * walsnd->apply for all relevant walsenders. Therefore we must + * register for notifications first, so that we can be notified via + * our latch of any standby applying the LSN we're interested in after + * we check but before we start waiting, or we could wait forever for + * something that has already happened.) + */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + if (MyProc->syncRepState != SYNC_REP_WAITING) + { + MyProc->waitLSN = XactCommitLSN; + MyProc->syncRepState = SYNC_REP_WAITING; + SyncRepQueueInsert(SYNC_REP_WAIT_CAUSAL_READS_APPLY); + Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_CAUSAL_READS_APPLY)); + } + LWLockRelease(SyncRepLock); + + /* Check if we're done. */ + if (CausalReadsCommitCanReturn(XactCommitLSN, &waitingFor, &stallTimeMillis)) + { + SyncRepCancelWait(); + break; + } + + Assert(waitingFor > 0 || stallTimeMillis > 0); + + /* If we aren't actually waiting for any standbys, leave the queue. */ + if (waitingFor == 0) + SyncRepCancelWait(); + + /* Update the ps title. */ + if (update_process_title) + { + char buffer[80]; + + snprintf(buffer, sizeof(buffer), + "waiting for %d peer(s) to apply %X/%X%s", + waitingFor, + (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN, + stallTimeMillis > 0 ? " (stalling)" : ""); + set_ps_display(buffer, false); + } + + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckEarlyExit()) /* Calls SyncRepCancelWait() if true. */ + break; + + /* + * If are still waiting for peers, then we wait for any joining or + * available peer to reach the LSN (or possibly stop being in one of + * those states or go away). + * + * If not, there must be a non-zero stall time, so we wait for that to + * elapse. + */ + if (waitingFor > 0) + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1); + else + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, + stallTimeMillis); + } + + /* There is no way out of the loop that could leave us in the queue. */ + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + MyProc->waitLSN = 0; + + if (update_process_title) + set_ps_display("", false); /* TODO: restore what was there */ +} + /* * Wait for synchronous replication, if requested by user. * @@ -180,57 +417,9 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) if (syncRepState == SYNC_REP_WAIT_COMPLETE) break; - /* - * If a wait for synchronous replication is pending, we can neither - * acknowledge the commit nor raise ERROR or FATAL. The latter would - * lead the client to believe that that the transaction aborted, which - * is not true: it's already committed locally. The former is no good - * either: the client has requested synchronous replication, and is - * entitled to assume that an acknowledged commit is also replicated, - * which might not be true. So in this case we issue a WARNING (which - * some clients may be able to interpret) and shut off further output. - * We do NOT reset ProcDiePending, so that the process will die after - * the commit is cleaned up. - */ - if (ProcDiePending) - { - ereport(WARNING, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - whereToSendOutput = DestNone; - SyncRepCancelWait(); - break; - } - - /* - * It's unclear what to do if a query cancel interrupt arrives. We - * can't actually abort at this point, but ignoring the interrupt - * altogether is not helpful, so we just terminate the wait with a - * suitable warning. - */ - if (QueryCancelPending) - { - QueryCancelPending = false; - ereport(WARNING, - (errmsg("canceling wait for synchronous replication due to user request"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - SyncRepCancelWait(); + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckEarlyExit()) break; - } - - /* - * If the postmaster dies, we'll probably never get an - * acknowledgement, because all the wal sender processes will exit. So - * just bail out. - */ - if (!PostmasterIsAlive()) - { - ProcDiePending = true; - whereToSendOutput = DestNone; - SyncRepCancelWait(); - break; - } /* * Wait on latch. Any condition that should wake us up will set the @@ -410,22 +599,27 @@ SyncRepGetSynchronousStandby(void) * perhaps also which information we store as well. */ void -SyncRepReleaseWaiters(void) +SyncRepReleaseWaiters(bool walsender_cr_available_or_joining) { volatile WalSndCtlData *walsndctl = WalSndCtl; WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; + int numcausalreadsapply = 0; + bool is_highest_priority_sync_standby; /* * If this WALSender is serving a standby that is not on the list of - * potential standbys then we have nothing to do. If we are still starting - * up, still running base backup or the current flush position is still - * invalid, then leave quickly also. + * potential standbys and not in a state that causal_reads waits for, then + * we have nothing to do. If we are still starting up, still running base + * backup or the current flush position is still invalid, then leave + * quickly also. */ - if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + if (!walsender_cr_available_or_joining && + (MyWalSnd->sync_standby_priority == 0 || + MyWalSnd->state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(MyWalSnd->flush))) return; /* @@ -435,45 +629,77 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); syncWalSnd = SyncRepGetSynchronousStandby(); - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + /* + * If we aren't managing the highest priority standby then make a note of + * that so we can announce a takeover in the log if we ever get that job. + */ + is_highest_priority_sync_standby = syncWalSnd == MyWalSnd; + if (!is_highest_priority_sync_standby) + announce_next_takeover = true; /* - * If we aren't managing the highest priority standby then just leave. + * If we aren't managing the highest priority standby or a standby in + * causal reads 'joining' or 'available' state, then just leave. */ - if (syncWalSnd != MyWalSnd) + if (!is_highest_priority_sync_standby && !walsender_cr_available_or_joining) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; return; } /* * Set the lsn first so that when we wake backends they will release up to - * this location. + * this location. For the single-standby synchronous commit levels, we + * only do this if we are the current synchronous standby and we are + * advancing the LSN further than it has been advanced before, so that + * SyncRepWaitForLSN can skip waiting in some cases. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) - { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); - } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (is_highest_priority_sync_standby) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + { + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, + MyWalSnd->write); + } + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->write) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, + MyWalSnd->flush); + } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, + MyWalSnd->apply); + } } + /* + * For causal_reads, all walsenders currently in available or joining + * state must reach the LSN on their own, and standbys will reach LSNs in + * any order. It doesn't make sense to keep the highest seen LSN in a + * single walsndctl->lsn element. (CausalReadsWaitForLSN has handling for + * LSNs that have already been reached). + */ + if (walsender_cr_available_or_joining) + numcausalreadsapply = + SyncRepWakeQueue(false, SYNC_REP_WAIT_CAUSAL_READS_APPLY, + MyWalSnd->apply); LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X, %d procs to causal_reads apply", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply, + numcausalreadsapply); /* * If we are managing the highest priority standby, though we weren't * prior to this, then announce we are now the sync standby. */ - if (announce_next_takeover) + if (is_highest_priority_sync_standby && announce_next_takeover) { announce_next_takeover = false; ereport(LOG, @@ -548,9 +774,8 @@ SyncRepGetStandbyPriority(void) * Must hold SyncRepLock. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn) { - volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; @@ -567,7 +792,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Assume the queue is ordered by LSN */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) + if (!all && lsn < proc->waitLSN) return numprocs; /* @@ -627,7 +852,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, InvalidXLogRecPtr); } /* @@ -679,6 +904,28 @@ SyncRepQueueIsOrderedByLSN(int mode) #endif /* + * Make sure that CausalReadsWaitForLSN can't return until after + * any 'causalReadsUntil' time that walsender could possibly have sent to any + * standby in a keepalive message. This maintains the causal consistency + * guarantee: in causal_reads mode, we will not return control until any + * standby we have lost contact with has started generating 'standby not + * available for causal reads' errors. + * + * Also, wake up all backends waiting in CausalReadsWaitForLSN, because the + * set of available/joining peers has changed, and there is a new stall time + * they need to observe. + */ +void +CausalReadsBeginStall(void) +{ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + WalSndCtl->stall_causal_reads_until = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), causal_reads_timeout); + SyncRepWakeQueue(true, SYNC_REP_WAIT_CAUSAL_READS_APPLY, InvalidXLogRecPtr); + LWLockRelease(SyncRepLock); +} + +/* * =========================================================== * Synchronous Replication functions executed by any process * =========================================================== @@ -728,6 +975,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 183a3a5..94dd334 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -52,6 +52,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -96,6 +97,7 @@ static uint32 recvOff = 0; */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGUSR1 = false; /* * LogstreamResult indicates the byte positions that we have already @@ -140,7 +142,8 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); -static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsUntil); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -195,6 +198,7 @@ WalReceiverMain(void) WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; bool ping_sent; + bool forceReply; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -246,6 +250,7 @@ WalReceiverMain(void) /* Initialise to a sanish value */ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp(); + walrcv->causalReadsUntil = 0; SpinLockRelease(&walrcv->mutex); @@ -410,7 +415,7 @@ WalReceiverMain(void) * Process the received data, and any subsequent data we * can read without blocking. */ - for (;;) + while (!got_SIGUSR1) { if (len > 0) { @@ -437,8 +442,16 @@ WalReceiverMain(void) len = walrcv_receive(0, &buf); } + if (got_SIGUSR1) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR1 = false; + forceReply = true; + } + /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(forceReply, false); + forceReply = false; /* * If we've written some records, flush them to disk and @@ -493,7 +506,15 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + /* Check if the startup process has signaled us. */ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + forceReply = true; + } + + XLogWalRcvSendReply(requestReply || forceReply, requestReply); + forceReply = false; XLogWalRcvSendHSFeedback(false); } } @@ -730,6 +751,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) { int save_errno = errno; + got_SIGUSR1 = true; latch_sigusr1_handler(); errno = save_errno; @@ -795,6 +817,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogRecPtr walEnd; TimestampTz sendTime; bool replyRequested; + TimestampTz causalReadsUntil; resetStringInfo(&incoming_message); @@ -815,7 +838,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, NULL); buf += hdrlen; len -= hdrlen; @@ -825,7 +848,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) case 'k': /* Keepalive */ { /* copy message to StringInfo */ - hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(int64); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -837,8 +860,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); replyRequested = pq_getmsgbyte(&incoming_message); + causalReadsUntil = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + ProcessWalSndrMessage(walEnd, sendTime, &causalReadsUntil); - ProcessWalSndrMessage(walEnd, sendTime); + /* Remember primary's timestamp at this WAL location. */ + SetXLogReplayTimestampAtLsn(sendTime, walEnd); /* If the primary requested a reply, send one immediately */ if (replyRequested) @@ -1032,6 +1059,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) XLogRecPtr applyPtr; static TimestampTz sendTime = 0; TimestampTz now; + TimestampTz applyTimestamp = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -1063,7 +1091,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyTimestamp = GetXLogReplayTimestamp(&applyPtr); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1071,6 +1099,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp)); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ @@ -1169,15 +1198,52 @@ XLogWalRcvSendHSFeedback(bool immed) * Update shared memory status upon receiving a message from primary. * * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest - * message, reported by primary. + * message, reported by primary. 'causalReadsUntil' is a pointer to + * the time the primary promises that this standby can safely claim to be + * causally consistent, to 0 if it cannot, or a NULL pointer for no change. */ static void -ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsUntil) { WalRcvData *walrcv = WalRcv; TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + /* Sanity check for the causalReadsUntil time. */ + if (causalReadsUntil != NULL && *causalReadsUntil != 0) + { + /* Deduce max_clock_skew from the causalReadsUntil and sendTime. */ +#ifdef HAVE_INT64_TIMESTAMP + int64 diffMillis = (*causalReadsUntil - sendTime) / 1000; +#else + int64 diffMillis = (*causalReadsUntil - sendTime) * 1000; +#endif + int64 max_clock_skew = diffMillis / (CAUSAL_READS_CLOCK_SKEW_RATIO - 1); + + if (sendTime > TimestampTzPlusMilliseconds(lastMsgReceiptTime, max_clock_skew)) + { + /* + * The primary's clock is more than max_clock_skew + network + * latency ahead of the standby's clock. (If the primary's clock + * is more than max_clock_skew ahead of the standby's clock, but + * by less than the network latency, then there isn't much we can + * do to detect that; but it still seems useful to have this basic + * sanity check for wildly misconfigured servers.) + */ + elog(LOG, "the primary server's clock time is too far ahead"); + causalReadsUntil = NULL; + } + /* + * We could also try to detect cases where sendTime is more than + * max_clock_skew in the past according to the standby's clock, but + * that is indistinguishable from network latency/buffering, so we + * could produce misleading error messages; if we do nothing, the + * consequence is 'standby is not available for causal reads' errors + * which should cause the user to investigate. + */ + } + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->latestWalEnd < walEnd) @@ -1185,6 +1251,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + if (causalReadsUntil != NULL) + walrcv->causalReadsUntil = *causalReadsUntil; SpinLockRelease(&walrcv->mutex); if (log_min_messages <= DEBUG2) @@ -1215,3 +1283,23 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) pfree(receipttime); } } + +/* + * Wake up the walreceiver if it happens to be blocked in walrcv_receive, + * and tell it that a commit record has been applied. + * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = apply or causal_reads = on. + * + * TODO: This may change -- Simon Riggs suggested latches for this. Maybe + * pipes would work too (and avoid interrupting systems calls and allow for + * multiplexed IO with the replication socket). + */ +void +WalRcvWakeup(void) +{ + if (WalRcv->pid != 0) + kill(WalRcv->pid, SIGUSR1); +} diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 4452f25..db9c397 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -25,9 +25,11 @@ #include "access/xlog_internal.h" #include "postmaster/startup.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/guc.h" #include "utils/timestamp.h" WalRcvData *WalRcv = NULL; @@ -374,3 +376,23 @@ GetReplicationTransferLatency(void) return ms; } + +/* + * Used by snapmgr to check if this standby has been authorized by the primary + * server to consider itself available for causal reads. That is, to have + * applied all commits for which the COMMIT command has returned control on + * the primary server. + */ +bool +WalRcvCausalReadsAvailable(void) +{ + WalRcvData *walrcv = WalRcv; + TimestampTz now = GetCurrentTimestamp(); + bool result; + + SpinLockAcquire(&walrcv->mutex); + result = walrcv->causalReadsUntil != 0 && now <= walrcv->causalReadsUntil; + SpinLockRelease(&walrcv->mutex); + + return result; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4a4643e..01b9c20 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -153,9 +153,14 @@ static StringInfoData tmpbuf; */ static TimestampTz last_reply_timestamp = 0; +static TimestampTz last_keepalive_timestamp = 0; + /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; +/* How long do need to stay in JOINING state? */ +static TimestampTz causal_reads_joining_until = 0; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -242,6 +247,57 @@ InitWalSender(void) } /* + * If we are exiting unexpectedly, we may need to communicate with concurrent + * causal_reads commits to maintain the causal consistency guarantee. + */ +static void +PrepareUncleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * We've lost contact with the standby, but it may still be alive. We + * can't let any causal_reads transactions return until we've stalled + * for long enough for a zombie standby to start raising errors due + * to lack of keepalives with early enough timestamps. + */ + elog(LOG, "standby \"%s\" is lost (no longer available for causal reads)", application_name); + CausalReadsBeginStall(); + + /* + * We set the state to a lower level _after_ beginning the stall, + * otherwise there would be a tiny window where commits could return + * without observing the stall. + */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* + * We are shutting down because we received a goodbye message from the + * walreceiver. + */ +static void +PrepareCleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * The standby is shutting down, so it won't be running any more + * transations. It is therefore safe to stop waiting for it, and no + * stall is necessary. + */ + elog(LOG, "standby \"%s\" is leaving (no longer available for causal reads)", application_name); + + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* * Clean up after an error. * * WAL sender processes don't use transactions like regular backends do. @@ -264,7 +320,10 @@ WalSndErrorCleanup(void) replication_active = false; if (walsender_ready_to_stop) + { + PrepareUncleanExit(); proc_exit(0); + } /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); @@ -276,6 +335,8 @@ WalSndErrorCleanup(void) static void WalSndShutdown(void) { + PrepareUncleanExit(); + /* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. @@ -1386,6 +1447,7 @@ ProcessRepliesIfAny(void) if (r < 0) { /* unexpected error or EOF */ + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1402,6 +1464,7 @@ ProcessRepliesIfAny(void) resetStringInfo(&reply_message); if (pq_getmessage(&reply_message, 0)) { + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1451,6 +1514,7 @@ ProcessRepliesIfAny(void) * 'X' means that the standby is closing down the socket. */ case 'X': + PrepareCleanExit(); proc_exit(0); default: @@ -1543,15 +1607,29 @@ ProcessStandbyReplyMessage(void) XLogRecPtr writePtr, flushPtr, applyPtr; + int applyLagMs; bool replyRequested; + TimestampTz now = GetCurrentTimestamp(); + TimestampTz applyTimestamp; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); replyRequested = pq_getmsgbyte(&reply_message); + /* Compute the apply lag in milliseconds. */ + if (applyTimestamp == 0) + applyLagMs = -1; + else +#ifdef HAVE_INT64_TIMESTAMP + applyLagMs = (now - applyTimestamp) / 1000; +#else + applyLagMs = (now - applyTimestamp) * 1000.0; +#endif + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, @@ -1568,16 +1646,81 @@ ProcessStandbyReplyMessage(void) */ { WalSnd *walsnd = MyWalSnd; + WalSndCausalReadsState causal_reads_state = walsnd->causal_reads_state; + bool causal_reads_state_changed = false; + + /* + * Handle causal reads state transitions, if a causal_reads_timeout is + * configured. + */ + if (causal_reads_timeout != 0) + { + if (applyLagMs >= 0 && applyLagMs < causal_reads_timeout) + { + if (causal_reads_state == WALSNDCRSTATE_UNAVAILABLE) + { + causal_reads_state = WALSNDCRSTATE_JOINING; + causal_reads_joining_until = + TimestampTzPlusMilliseconds(now, causal_reads_timeout); + causal_reads_state_changed = true; + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING && + now >= causal_reads_joining_until) + { + causal_reads_state = WALSNDCRSTATE_AVAILABLE; + causal_reads_state_changed = true; + } + } + else + { + if (causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + /* + * We are dropping a causal reads available standby, so we + * mustn't let any commit command that is waiting in + * CausalReadsWaitForLSN return until we are sure that the + * standby definitely knows that it's not available and + * starts raising errors for causal_reads transactions. + */ + CausalReadsBeginStall(); + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING) + { + /* + * Dropping a joining standby doesn't require a stall, + * because the standby doesn't think it's available, so + * it's already raising the error for causal_reads + * transactions. + */ + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + } + } + } SpinLockAcquire(&walsnd->mutex); walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + walsnd->applyLagMs = applyLagMs; + walsnd->causal_reads_state = causal_reads_state; SpinLockRelease(&walsnd->mutex); + + if (causal_reads_state_changed) + { + WalSndKeepalive(true); + elog(LOG, "standby \"%s\" is %s", application_name, + causal_reads_state == WALSNDCRSTATE_UNAVAILABLE ? "unavailable for causal reads" : + causal_reads_state == WALSNDCRSTATE_JOINING ? "joining as a causal reads standby..." : + causal_reads_state == WALSNDCRSTATE_AVAILABLE ? "available for causal reads" : + "UNKNOWN"); + } } if (!am_cascading_walsender) - SyncRepReleaseWaiters(); + SyncRepReleaseWaiters(MyWalSnd->causal_reads_state >= WALSNDCRSTATE_JOINING); /* * Advance our local xmin horizon when the client confirmed a flush. @@ -1724,27 +1867,34 @@ WalSndComputeSleeptime(TimestampTz now) { long sleeptime = 10000; /* 10 s */ - if (wal_sender_timeout > 0 && last_reply_timestamp > 0) + if ((wal_sender_timeout > 0 || causal_reads_timeout > 0) && last_reply_timestamp > 0) { TimestampTz wakeup_time; long sec_to_timeout; int microsec_to_timeout; - /* - * At the latest stop sleeping once wal_sender_timeout has been - * reached. - */ - wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); - - /* - * If no ping has been sent yet, wakeup when it's time to do so. - * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of - * the timeout passed without a response. - */ - if (!waiting_for_ping_response) + if (causal_reads_timeout != 0) + wakeup_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + else + { + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of + * the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + } /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, @@ -1765,15 +1915,28 @@ static void WalSndCheckTimeOut(TimestampTz now) { TimestampTz timeout; + int allowed_time; /* don't bail out if we're doing something that doesn't require timeouts */ if (last_reply_timestamp <= 0) return; + /* + * If a causal_reads_timeout is configured, it is used instead of + * wal_sender_timeout. Ideally we'd use causal_reads_timeout / 2 + + * allowance for network latency, but since walreceiver can become quite + * bogged down fsyncing WAL we allow more tolerance. (This could be + * tightened up once standbys hand writing off to the WAL writer). + */ + if (causal_reads_timeout != 0) + allowed_time = causal_reads_timeout; + else + allowed_time = wal_sender_timeout; + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); + allowed_time); - if (wal_sender_timeout > 0 && now >= timeout) + if (allowed_time > 0 && now >= timeout) { /* * Since typically expiration of replication timeout means @@ -1963,6 +2126,7 @@ InitWalSenderSlot(void) walsnd->pid = MyProcPid; walsnd->sentPtr = InvalidXLogRecPtr; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ @@ -2732,6 +2896,24 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +/* + * Return a string constant representing the causal reads state. This is used + * in system views, and should *not* be translated. + */ +static const char * +WalSndGetCausalReadsStateString(WalSndCausalReadsState causal_reads_state) +{ + switch (causal_reads_state) + { + case WALSNDCRSTATE_UNAVAILABLE: + return "unavailable"; + case WALSNDCRSTATE_JOINING: + return "joining"; + case WALSNDCRSTATE_AVAILABLE: + return "available"; + } + return "UNKNOWN"; +} /* * Returns activity of walsenders, including pids and xlog locations sent to @@ -2740,7 +2922,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2788,8 +2970,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int applyLagMs; int priority; WalSndState state; + WalSndCausalReadsState causalReadsState; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2799,9 +2983,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockAcquire(&walsnd->mutex); sentPtr = walsnd->sentPtr; state = walsnd->state; + causalReadsState = walsnd->causal_reads_state; write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + applyLagMs = walsnd->applyLagMs; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2833,6 +3019,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); + if (applyLagMs < 0) + nulls[6] = true; + else + { + Interval *applyLagInterval = palloc(sizeof(Interval)); + + applyLagInterval->month = 0; + applyLagInterval->day = 0; +#ifdef HAVE_INT64_TIMESTAMP + applyLagInterval->time = applyLagMs * 1000; +#else + applyLagInterval->time = applyLagMs / 1000.0; +#endif + nulls[6] = false; + values[6] = IntervalPGetDatum(applyLagInterval); + } + /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an @@ -2840,18 +3043,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + values[7] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely * informational, not different from priority. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[8] = CStringGetTextDatum("async"); else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); + values[8] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + values[8] = CStringGetTextDatum("potential"); + + values[9] = + CStringGetTextDatum(WalSndGetCausalReadsStateString(causalReadsState)); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -2871,14 +3077,50 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) static void WalSndKeepalive(bool requestReply) { + TimestampTz now; + TimestampTz causal_reads_until; + elog(DEBUG2, "sending replication keepalive"); + /* + * If the walsender currently deems the standby to be available for causal + * reads, then we authorize the standby to consider itself avialable until + * a certain time in the future. If we lose contact with the standby or + * drop it from the set of standbys we wait for in causal_reads mode + * because of excessive lag, then we'll stall until after that time to + * maintain our causal consistency guarantee. + */ + now = GetCurrentTimestamp(); + if (MyWalSnd->causal_reads_state < WALSNDCRSTATE_AVAILABLE) + causal_reads_until = 0; /* Not available */ + else + { + /* + * Since this timestamp is being sent to the standby where it will be + * compared against a time generated by the standby's system clock, we + * must consider clock skew. First, we decide on a maximum tolerable + * difference between system clocks. If the primary's clock is ahead + * of the standby's by more than this, then all bets are off (the + * standby could falsely believe it is available). If the primary's + * clock is behind the standby's by more than this, then the standby + * will err the other way and generate spurious errors in + * causal_reads mode. Rather than having a separate GUC for this, + * we derive it from causal_reads_timeout. + */ + int max_clock_skew = causal_reads_timeout / CAUSAL_READS_CLOCK_SKEW_RATIO; + + causal_reads_until = + TimestampTzPlusMilliseconds(now, + causal_reads_timeout - max_clock_skew); + } + /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); pq_sendint64(&output_message, sentPtr); - pq_sendint64(&output_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(now)); pq_sendbyte(&output_message, requestReply ? 1 : 0); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(causal_reads_until)); /* ... and send it wrapped in CopyData */ pq_putmessage_noblock('d', output_message.data, output_message.len); @@ -2896,23 +3138,32 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts. */ - if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) + if ((wal_sender_timeout <= 0 && causal_reads_timeout == 0) || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response && causal_reads_timeout == 0) return; /* * If half of wal_sender_timeout has lapsed without receiving any reply * from the standby, send a keep-alive message to the standby requesting * an immediate reply. + * + * If causal_reads_timeout has been configured, use it to control + * keepalive intervals rather than wal_sender_timeout. */ - ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + if (causal_reads_timeout != 0) + ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + else + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); if (now >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true; + last_keepalive_timestamp = now; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 8fbb310..12c8b88 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1611,6 +1611,20 @@ IntegerTimestampToTimestampTz(int64 timestamp) #endif /* + * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +TimestampTzToIntegerTimestamp(TimestampTz timestamp) +{ + return timestamp * 1000000; +} +#endif + +/* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index fda0fb9..23cfa85 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = { static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, @@ -1618,6 +1619,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"causal_reads", PGC_USERSET, REPLICATION_STANDBY, + gettext_noop("Enables causal reads."), + NULL + }, + &causal_reads, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -1776,6 +1787,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"causal_reads_timeout", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the maximum apply lag before causal reads standbys are no longer available."), + NULL, + GUC_UNIT_MS + }, + &causal_reads_timeout, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 074935c..a466732 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -46,8 +46,11 @@ #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" #include "lib/pairingheap.h" #include "miscadmin.h" +#include "replication/syncrep.h" +#include "replication/walreceiver.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -209,6 +212,16 @@ GetTransactionSnapshot(void) "cannot take query snapshot during a parallel operation"); /* + * In causal_reads mode on a standby, check if we have definitely + * applied WAL for any COMMIT that returned successfully on the + * primary. + * + * TODO: Machine readable error code? + */ + if (causal_reads && RecoveryInProgress() && !WalRcvCausalReadsAvailable()) + elog(ERROR, "standby is not available for causal reads"); + + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must * make a copy of it rather than returning CurrentSnapshotData diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cb1c2db..0f08ff5 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,11 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY, /* wait for local flush and remote + * apply */ + SYNCHRONOUS_COMMIT_CONSISTENT_APPLY /* wait for local flusha and remote + apply with causal consistency */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +148,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. */ +#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK (1U << 29) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionSyncApplyFeedback(xinfo) \ + (!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK)) #define XactCompletionRelcacheInitFileInval(xinfo) \ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 790ca66..8aeda11 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -235,6 +235,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); +extern void SetXLogReplayTimestamp(TimestampTz timestamp); +extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn); +extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); @@ -267,6 +270,8 @@ extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern void XLogRequestWalReceiverReply(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index d8640db..acb6796 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2783,7 +2783,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 0 f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f f f t s r 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23,28,28,16,25,25,23,16,25}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,ssl,sslversion,sslcipher,sslbits,sslcompression,sslclientdn}" _null_ _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state,causal_reads_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 71e2857..5746383 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,14 +23,33 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 +#define SYNC_REP_WAIT_CAUSAL_READS_APPLY 3 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 4 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* + * ratio of causal_read_timeout to max_clock_skew (4 means than the maximum + * tolerated clock difference between primary and standbys using causal_reads + * is 1/4 of causal_reads_timeout) + */ +#define CAUSAL_READS_CLOCK_SKEW_RATIO 4 + +/* + * ratio of causal_reads_timeout to keepalive time (2 means that the effective + * keepalive time is 1/2 of the causal_reads_timeout GUC when it is non-zero) + */ +#define CAUSAL_READS_KEEPALIVE_RATIO 2 + +/* GUC variables */ +extern int causal_reads_timeout; +extern bool causal_reads; + /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; @@ -42,11 +61,17 @@ extern void SyncRepCleanupAtProcExit(void); /* called by wal sender */ extern void SyncRepInitConfig(void); -extern void SyncRepReleaseWaiters(void); +extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); +/* called by user backend (xact.c) */ +extern void CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN); + +/* called by wal sender */ +extern void CausalReadsBeginStall(void); + /* forward declaration to avoid pulling in walsender_private.h */ struct WalSnd; extern struct WalSnd *SyncRepGetSynchronousStandby(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 61255a9..507af9f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -79,6 +79,13 @@ typedef struct TimeLineID receivedTLI; /* + * causallyReadsUntil is the time until which the primary has authorized + * this standby to consider itself avialable for causal_reads mode, or 0 + * for not authorized. + */ + TimestampTz causalReadsUntil; + + /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of * receivedUpto before the last flush to disk. Startup process can use @@ -160,5 +167,8 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvWakeup(void); + +extern bool WalRcvCausalReadsAvailable(void); #endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 6dae480..deeb277 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -27,6 +27,13 @@ typedef enum WalSndState WALSNDSTATE_STREAMING } WalSndState; +typedef enum WalSndCausalReadsState +{ + WALSNDCRSTATE_UNAVAILABLE = 0, + WALSNDCRSTATE_JOINING, + WALSNDCRSTATE_AVAILABLE +} WalSndCausalReadsState; + /* * Each walsender has a WalSnd struct in shared memory. */ @@ -34,6 +41,7 @@ typedef struct WalSnd { pid_t pid; /* this walsender's process id, or 0 */ WalSndState state; /* this walsender's state */ + WalSndCausalReadsState causal_reads_state; /* the walsender's causal reads state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be * reloaded? */ @@ -46,6 +54,7 @@ typedef struct WalSnd XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int applyLagMs; /* Protects shared variables shown above. */ slock_t mutex; @@ -88,6 +97,14 @@ typedef struct */ bool sync_standbys_defined; + /* + * Until when must commits in causal_reads stall? This is set to a time + * in the future whenever a standby is dropped from the set of consistent + * standbys, to give standbys time to know that they are not able to + * provide causal consistency guarantees. + */ + TimestampTz stall_causal_reads_until; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 530fef1..0f4b166 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -227,9 +227,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, #ifndef HAVE_INT64_TIMESTAMP extern int64 GetCurrentIntegerTimestamp(void); extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp); #else #define GetCurrentIntegerTimestamp() GetCurrentTimestamp() #define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#define TimestampTzToIntegerTimestamp(timestamp) (timestamp) #endif extern TimestampTz time_t_to_timestamptz(pg_time_t tm);