diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 6ab65a7..ed6f07c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2749,6 +2749,35 @@ include_dir 'conf.d' across the cluster without problems if that is required. + + All Servers + + These parameters can be set on the primary or any standby. + + + + causal_reads (boolean) + + causal_reads configuration parameter + + + + + Enables causal consistency between transactions run on different + servers. A transaction that is run on a standby + with causal_reads set to on is guaranteed + either to see the effects of all completed transactions run on the + primary with the setting on, or to receive an error "standby is not + available for causal reads". Note that both transactions involved in + a causal dependency (a write on the primary followed by a read on any + server which must see the write) must be run with the setting on. + See for more details. + + + + + + Sending Server(s) @@ -2980,6 +3009,48 @@ include_dir 'conf.d' + + causal_reads_timeout (integer) + + causal_reads_timeout configuration parameter + + + + + Specifies the maximum replay lag the primary will tolerate from a + standby before dropping it from the set of standbys available for + causal reads. + + + This setting is also used to control the leases used to + maintain the causal reads guarantee. It must be set to a value which + is at least 4 times the maximum possible difference in system clocks + between the primary and standby servers, as described + in . + + + + + + causal_reads_standby_names (string) + + causal_reads_standby_names configuration parameter + + + + + Specifies a comma-separated list of standby names that can support + causal reads, as described in + . Follows the same convention + as synchronous_standby_name. + The default is *, matching all standbys. + + + This setting has no effect if causal_reads_timeout is not set. + + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 03c6c30..7a0910d 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1115,6 +1115,9 @@ primary_slot_name = 'node_a_slot' that it has replayed the transaction, making it visible to user queries. In simple cases, this allows for load balancing with causal consistency on a single hot standby. + (See also + which deals with multiple standbys and + standby failure.) @@ -1233,6 +1236,119 @@ primary_slot_name = 'node_a_slot' + + Causal reads + + causal reads + in standby + + + + The causal reads feature allows read-only queries to run on hot standby + servers without exposing stale data to the client, providing a form of + causal consistency. Transactions can run on any standby with the + following guarantee about the visibility of preceding transactions: If you + set causal_reads to on in any pair of consecutive + transactions tx1, tx2 where tx2 begins after tx1 successfully returns, + then tx2 will either see tx1 or fail with a new error "standby is not + available for causal reads", no matter which server it runs on. Although + the guarantee is expressed in terms of two individual transactions, the + GUC can also be set at session, role or system level to make the guarantee + generally, allowing for load balancing of applications that were not + designed with load balancing in mind. + + + + In order to enable the feature, causal_reads_timeout must be + set to a non-zero value on the primary server. 
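+
+ As a minimal illustration (the 4s value is only an example), the feature
+ might be enabled with the following settings on the primary:
+
+   causal_reads_timeout = 4s          # postgresql.conf on the primary
+   causal_reads_standby_names = '*'   # the default; all standbys may join
+
+ after which any session, on the primary or on a standby, can request the
+ guarantee:
+
+   SET causal_reads = on;
+   SELECT ...;   -- sees every causal_reads write that committed before it
+                 -- began, or fails with "standby is not available for
+                 -- causal reads"
+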
The + GUC causal_reads_standby_names can be used to limit the set of + standbys that can join the dynamic set of causal reads standbys by + providing a comma-separated list of application names. By default, all + standbys are candidates, if the feature is enabled. + + + + The current set of servers that the primary considers to be available for + causal reads can be seen in + the pg_stat_replication + view. Administrators, applications and load balancing middleware can use + this view to discover standbys that can currently handle causal reads + transactions without raising the error. Since that information is only an + instantantaneous snapshot, clients should still be prepared for the error + to be raised at any time, and consider redirecting transactions to another + standby. + + + + The advantages of the causal reads feature over simply + setting synchronous_commit to remote_apply are: + + + + It allows the primary to wait for multiple standbys to replay + transactions. + + + + + It places a configurable limit on how much replay lag (and therefore + delay at commit time) the primary tolerates from standbys before it + drops them from the dynamic set of standbys it waits for. + + + + + It upholds the causal reads guarantee during the transitions that + occur when new standbys are added or removed from the set of standbys, + including scenarios where contact has been lost between the primary + and standbys but the standby is still alive and running client + queries. + + + + + + + The protocol used to uphold the guarantee even in the case of network + failure depends on the system clocks of the primary and standby servers + being synchronized, with an allowance for a difference up to one quarter + of causal_reads_timeout. For example, + if causal_reads_timeout is set to 4s, then the + clocks must not be further than 1 second apart for the guarantee to be + upheld reliably during transitions. The ubiquity of the Network Time + Protocol (NTP) on modern operating systems and availability of high + quality time servers makes it possible to choose a tolerance significantly + higher than the maximum expected clock difference. An effort is + nevertheless made to detect and report misconfigured and faulty systems + with clock differences greater than the configured tolerance. + + + + + Current hardware clocks, NTP implementations and public time servers are + unlikely to allow the system clocks to differ more than tens or hundreds + of milliseconds, and systems synchronized with dedicated local time + servers may be considerably more accurate, but you should only consider + setting causal_reads_timeout below 4 seconds (allowing up to + 1 second of clock difference) after researching your time synchronization + infrastructure thoroughly. + + + + + + While similar to synchronous replication in the sense that both involve + the primary server waiting for responses from standby servers, the + causal reads feature is not concerned with avoiding data loss. A + primary configured for causal reads will drop all standbys that stop + responding or replay too slowly from the dynamic set that it waits for, + so you should consider configuring both synchronous replication and + causal reads if you need data loss avoidance guarantees and causal + consistency guarantees for load balancing. + + + + Continuous archiving in standby @@ -1581,7 +1697,16 @@ if (!triggered) so there will be a measurable delay between primary and standby. 
Running the same query nearly simultaneously on both primary and standby might therefore return differing results. We say that data on the standby is - eventually consistent with the primary. Once the + eventually consistent with the primary by default. + The data visible to a transaction running on a standby can be + made causally consistent with respect to a transaction that + has completed on the primary by setting causal_reads + to on in both transactions. For more details, + see . + + + + Once the commit record for a transaction is replayed on the standby, the changes made by that transaction will be visible to any new snapshots taken on the standby. Snapshots may be taken at the start of each query or at the diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 7d63782..23d68d5 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1224,6 +1224,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i text Synchronous state of this standby server + + causal_reads_state + text + Causal reads state of this standby server. This field will be + non-null only if cause_reads_timeout is set. If a standby is + in available state, then it can currently serve causal reads + queries. If it is not replaying fast enough or not responding to + keepalive messages, it will be in unavailable state, and if + it is currently transitioning to availability it will be + in joining state for a short time. + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 893c2fa..111198a 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2098,11 +2098,12 @@ RecordTransactionCommitPrepared(TransactionId xid, END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for causal reads and synchronous replication, if required. * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ + CausalReadsWaitForLSN(recptr); SyncRepWaitForLSN(recptr, true); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b0464e7..5ad3646 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1324,7 +1324,10 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog && markXidCommitted) + { + CausalReadsWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN(XactLastRecEnd, true); + } /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -5126,7 +5129,7 @@ XactLogCommitRecord(TimestampTz commit_time, * Check if the caller would like to ask standbys for immediate feedback * once this commit is applied. 
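 * (With causal_reads enabled, the committing backend will wait in
 * CausalReadsWaitForLSN for every joining/available standby to apply this
 * record, so it needs the same prompt apply feedback.)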
*/ - if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || causal_reads) xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; /* diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index a53f07b..276ac12 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -664,7 +664,8 @@ CREATE VIEW pg_stat_replication AS W.replay_location, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.causal_reads_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND diff --git a/src/backend/replication/README.causal_reads b/src/backend/replication/README.causal_reads new file mode 100644 index 0000000..1fddd62 --- /dev/null +++ b/src/backend/replication/README.causal_reads @@ -0,0 +1,193 @@ +The causal reads guarantee says: If you run any two consecutive +transactions tx1, tx2 where tx1 completes before tx2 begins, with +causal_reads set to "on" in both transactions, tx2 will see tx1 or +raise an error to complain that it can't guarantee causal consistency, +no matter which servers (primary or any standby) you run each +transaction on. + +When both transactions run on the primary, the guarantee is trivially +upheld. + +To deal with read-only physical streaming standbys, the primary keeps +track of a set of standbys that it considers to be currently +"available" for causal reads, and sends a stream of "leases" to those +standbys granting them the right to handle causal reads transactions +for a short time without any further communication with the primary. + +In general, the primary provides the guarantee by waiting for all of +the "available" standbys to report that they have applied a +transaction. However, the set of available standbys is dynamic, and +things get more complicated during state transitions. There are two +types of transitions to consider: + +1. unavailable->joining->available + +Standbys start out as "unavailable". If a standby is unavailable and +is applying fast enough and matches causal_reads_standby_names, the +primary transitions it to "available", but first it sets it to +"joining" until it is sure that any transaction committed while it was +unavailable has definitely been applied on the standby. This closes a +race that would otherwise exist if we moved directly to available +state: tx1 might not wait for a given standby because it's +unavailable, then a lease might be granted, and then tx2 might run a +causal reads transaction without error but see stale data. The +joining state acts as an airlock: while in joining state, the primary +waits for that standby to replay causal reads transactions in +anticipation of the move to available, but it doesn't progress to +available state and grant a lease to the standby until everything +preceding joining state has also been applied. + +2. available->unavailable + +If a standby is not applying fast enough or not responding to +keepalive messages, then the primary kicks that standby out of the +dynamic set of available standbys, that is, marks it as "unavailable". +In order to make sure that the standby has started rejecting causal +reads transactions, it needs to revoke the lease it most recently +granted. It does that by waiting for the lease to expire before +allowing any causal reads commits to return. 
(In future there could +be a fast-path revocation message which waits for a serial-numbered +acknowledgement to reduce waiting in the case where the standby is +lagging but still reachable and responding). + +The rest of this document illustrates how clock skew affects the +available->unavailable transition. + +The following 4 variables are derived from a single GUC, and these +values will be used in the following illustrations: + +causal_reads_timeout = 4s +lease_time = 4s (= causal_reads_timeout) +keepalive_time = 2s (= lease_time / 2) +max_clock_skew = 1s (= lease_time / 4) + +Every keepalive_time, the primary transmits a lease that expires at +local_clock_time + lease_time - max_clock_skew, shown in the following +diagram as 't' for transmission time and '|' for expiry time. If +contact is lost with a standby, the primary will wait until sent_time ++ lease_time for the most recently granted lease to expire, shown on +the following diagram 'x', to be sure that the standby's clock has +reached the expiry time even if its clock differs by up to +max_clock_skew. In other words, the primary tells the standby that +the expiry time is at one time, but it trusts that the standby will +surely agree if it gives it some extra time. The extra time is +max_clock_skew. If the clocks differ by more than max_clock_skew, all +bets are off (but see below for attempt to detect obvious cases). + +0 1 2 3 4 5 6 7 8 9 +t-----------------|-----x + t-----------------|-----x + t-----------------|-----x + t-----------------|... + t------... + +A standby whose clock is 2 seconds ahead of the primary's clock +perceives gaps in the stream of leases, and will reject causal_reads +transactions in those intervals. The causal reads guarantee is +upheld, but spurious errors are raised between leases, as a +consequence of the clock skew being greater than max_clock_skew. In +the following diagram 'r' shows reception time, and the timeline along +the top shows the standby's local clock time. + +2 3 4 5 6 7 8 9 10 11 +r-----| + r-----| + r-----| + r-----| + r-----| + +If there were no network latency, a standby whose clock is exactly 1 +second ahead of the primary's clock would perceive the stream of +leases as being replaced just in time, so there is no gap. Since in +reality the time of receipt is some time after the time of +transmission due to network latency, if the standby's clock is exactly +1 second behind, then there will be small network-latency-sized gaps +before the next lease arrives, but still no correctness problem with +respect to the causal reads guarantee. + +1 2 3 4 5 6 7 8 9 10 +r-----------| + r-----------| + r-----------| + r-----------| + r------... + +A standby whose clock is perfectly in sync with the primary's +perceives the stream of leases overlapping (this matches the primary's +perception of the leases it sent): + +0 1 2 3 4 5 6 7 8 9 +r-----------------| + r-----------------| + r-----------------| + r-----------------| + r------... + +A standby whose clock is exactly 1 second behind the primary's +perceives the stream of leases as overlapping even more, but the time +of expiry as judged by the standby is no later than the time the +primary will wait for if required ('x'). That is, if contact is lost +with the standby, the primary can still reliably hold up causal reads +commits until the standby has started raising the error in +causal_reads transactions. + +-1 0 1 2 3 4 5 6 7 8 +r-----------------------| + r-----------------------| + r-----------------------| + r------------------... 
+ r------... + + +A standby whose clock is 2 seconds behind the primary's would perceive +the stream of leases overlapping even more, and the primary would no +longer be able to wait for a lease to expire if it wanted to revoke +it. But because the expiry time is after local_clock_time + +lease_time, the standby can immediately see that its own clock must be +more than 1 second behind the primary's, so it ignores the lease and +logs a clock skew warning. In the following diagram a lease expiry +time that is obviously generated by a primary with a clock set too far +in the future compared to the local clock is shown with a '!'. + +-2 -1 0 1 2 3 4 5 6 7 +r-----------------------------! + r-----------------------------! + r-----------------------------! + r------------------... + r------... + +A danger window exists when the standby's clock is more than +max_clock_skew behind the primary's clock, but not more than +max_clock_skew + network latency time behind. If the clock difference +is in that range, then the algorithm presented above which is based on +time of receipt cannot detect that the local clock is too far behind. +The consequence of this problem could be as follows: + +1. The standby loses contact with the primary due to a network fault. + +2. The primary decides to drop the standby from the set of available + causal reads standbys due to lack of keepalive responses or + excessive lag, which necessitates holding up commits of causal + reads transactions until the most recently sent lease expires, in + the belief that the standby will definitely have started raising + the 'causal reads unavailable' error in causal reads transactions + by that time, if it is still alive and servicing requests. + +3. The standby still has clients connected and running queries. + +4. Due to clock skew in the problematic range, in the standby's + opinion the lease lasts slightly longer than the primary waits. + +5. For a short window at most the duration of the network latency + time, clients running causal reads transactions are allowed to see + potentially stale data. + +For this reason we say that the causal reads guarantee only holds as +long as the absolute difference between the system clocks of the +machines is no more than max_clock_skew. The theory is that NTP makes +it possible to reason about the maximum possible clock difference +between machines and choose a value that allows for a much larger +difference. However, we do make a best effort attempt to detect +wildly divergent systems as described above, to catch the case of +servers not running a correctly configured ntp daemon, or with a clock +so far out of whack that ntp refuses to fix it. 
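+
+To summarize the arithmetic used in the illustrations above (all values
+derived from the single GUC, shown here for causal_reads_timeout = 4s):
+
+  max_clock_skew = causal_reads_timeout / 4 = 1s
+  keepalive_time = causal_reads_timeout / 2 = 2s
+  lease expiry sent to the standby ('|') = send time + 4s - 1s = send time + 3s
+  revocation wait on the primary   ('x') = send time + 4s
+
+so a revoking primary always waits max_clock_skew longer than the expiry
+time it advertised, which is what tolerates a standby clock up to
+max_clock_skew behind the primary's.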
\ No newline at end of file diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 376ddf4..8240d0d 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -57,6 +57,11 @@ #include "utils/builtins.h" #include "utils/ps_status.h" +/* GUC variables */ +int causal_reads_timeout; +bool causal_reads; +char *causal_reads_standby_names; + /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; @@ -69,7 +74,7 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn); static int SyncRepGetStandbyPriority(void); @@ -142,6 +147,198 @@ SyncRepCheckEarlyExit(void) } /* + * Check if we can stop waiting for causal consistency. We can stop waiting + * when the following conditions are met: + * + * 1. All walsenders currently in 'joining' or 'available' state have + * applied the target LSN. + * + * 2. Any stall periods caused by standbys dropping out of 'available' state + * have passed, so that we can be sure that their leases have expired and they + * have started rejecting causal reads transactions. + * + * The output parameter 'waitingFor' is set to the number of nodes we are + * currently waiting for. The output parameters 'stallTimeMillis' is set to + * the number of milliseconds we need to wait for to observe any current + * commit stall. + * + * Returns true if commit can return control, because every standby has either + * applied the LSN or started rejecting causal_reads transactions. + */ +static bool +CausalReadsCommitCanReturn(XLogRecPtr XactCommitLSN, + int *waitingFor, + long *stallTimeMillis) +{ + int i; + TimestampTz now; + + /* Count how many joining/available nodes we are waiting for. */ + *waitingFor = 0; + for (i = 0; i < max_wal_senders; ++i) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Assuming atomic read of pid_t, we can check walsnd->pid without + * acquiring the spinlock to avoid memory synchronization costs for + * unused walsender slots. We see a value that existed sometime at + * least as recently as the last memory barrier. + */ + if (walsnd->pid != 0) + { + /* + * We need to hold the spinlock to read LSNs, because we can't be + * sure they can be read atomically. + */ + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid != 0 && walsnd->causal_reads_state >= WALSNDCRSTATE_JOINING) + { + if (walsnd->apply < XactCommitLSN) + ++*waitingFor; + } + SpinLockRelease(&walsnd->mutex); + } + } + + /* Check if there is a stall in progress that we need to observe. */ + now = GetCurrentTimestamp(); + LWLockAcquire(SyncRepLock, LW_SHARED); + if (WalSndCtl->stall_causal_reads_until > now) + { + long seconds; + int usecs; + + /* Compute how long we have to wait, rounded up to nearest ms. */ + TimestampDifference(now, WalSndCtl->stall_causal_reads_until, + &seconds, &usecs); + *stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000; + } + else + *stallTimeMillis = 0; + LWLockRelease(SyncRepLock); + + /* We are done if we are not waiting for any nodes or stalls. */ + return *waitingFor == 0 && *stallTimeMillis == 0; +} + +/* + * Wait for causal consistency in causal_reads mode, if requested by user. + */ +void +CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN) +{ + long stallTimeMillis; + int waitingFor; + char *ps_display_buffer = NULL; + + /* Leave if we aren't in causal_reads mode. 
*/ + if (!causal_reads) + return; + + for (;;) + { + /* Reset latch before checking state. */ + ResetLatch(MyLatch); + + /* + * Join the queue to be woken up if any causal reads joining/available + * standby applies XactCommitLSN or the set of causal reads standbys + * changes (if we aren't already in the queue). We don't actually know + * if we need to wait for any peers to reach the target LSN yet, but + * we have to register just in case before checking the walsenders' + * state to avoid a race condition that could occur if we did it after + * calling CausalReadsCommitCanReturn. (SyncRepWaitForLSN doesn't + * have to do this because it can check the highest-seen LSN in + * walsndctl->lsn[mode] which is protected by SyncRepLock, the same + * lock as the queues. We can't do that here, because there is no + * single highest-seen LSN that is useful. We must check + * walsnd->apply for all relevant walsenders. Therefore we must + * register for notifications first, so that we can be notified via + * our latch of any standby applying the LSN we're interested in after + * we check but before we start waiting, or we could wait forever for + * something that has already happened.) + */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + if (MyProc->syncRepState != SYNC_REP_WAITING) + { + MyProc->waitLSN = XactCommitLSN; + MyProc->syncRepState = SYNC_REP_WAITING; + SyncRepQueueInsert(SYNC_REP_WAIT_CAUSAL_READS); + Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_CAUSAL_READS)); + } + LWLockRelease(SyncRepLock); + + /* Check if we're done. */ + if (CausalReadsCommitCanReturn(XactCommitLSN, &waitingFor, &stallTimeMillis)) + { + SyncRepCancelWait(); + break; + } + + Assert(waitingFor > 0 || stallTimeMillis > 0); + + /* If we aren't actually waiting for any standbys, leave the queue. */ + if (waitingFor == 0) + SyncRepCancelWait(); + + /* Update the ps title. */ + if (update_process_title) + { + char buffer[80]; + + /* Remember the old value if this is our first update. */ + if (ps_display_buffer == NULL) + { + int len; + const char *ps_display = get_ps_display(&len); + + ps_display_buffer = palloc(len + 1); + memcpy(ps_display_buffer, ps_display, len); + ps_display_buffer[len] = '\0'; + } + + snprintf(buffer, sizeof(buffer), + "waiting for %d peer(s) to apply %X/%X%s", + waitingFor, + (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN, + stallTimeMillis > 0 ? " (stalling)" : ""); + set_ps_display(buffer, false); + } + + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckEarlyExit()) /* Calls SyncRepCancelWait() if true. */ + break; + + /* + * If are still waiting for peers, then we wait for any joining or + * available peer to reach the LSN (or possibly stop being in one of + * those states or go away). + * + * If not, there must be a non-zero stall time, so we wait for that to + * elapse. + */ + if (waitingFor > 0) + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1); + else + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, + stallTimeMillis); + } + + /* There is no way out of the loop that could leave us in the queue. */ + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + MyProc->waitLSN = 0; + + /* Restore the ps display. */ + if (ps_display_buffer != NULL) + { + set_ps_display(ps_display_buffer, false); + pfree(ps_display_buffer); + } +} + +/* * Wait for synchronous replication, if requested by user. 
* * Initially backends start in state SYNC_REP_NOT_WAITING and then @@ -425,6 +622,53 @@ SyncRepGetSynchronousStandby(void) } /* + * Check if the current WALSender process's application_name matches a name in + * causal_reads_standby_names (including '*' for wildcard). + */ +bool +CausalReadsPotentialStandby(void) +{ + char *rawstring; + List *elemlist; + ListCell *l; + bool found = false; + + /* If the feature is disable, then no. */ + if (causal_reads_timeout == 0) + return false; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(causal_reads_standby_names); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + /* GUC machinery will have already complained - no need to do again */ + return 0; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, application_name) == 0 || + pg_strcasecmp(standby_name, "*") == 0) + { + found = true; + break; + } + } + + pfree(rawstring); + list_free(elemlist); + + return found; +} + +/* * Update the LSNs on each queue based upon our latest state. This * implements a simple policy of first-valid-standby-releases-waiter. * @@ -432,23 +676,27 @@ SyncRepGetSynchronousStandby(void) * perhaps also which information we store as well. */ void -SyncRepReleaseWaiters(void) +SyncRepReleaseWaiters(bool walsender_cr_available_or_joining) { volatile WalSndCtlData *walsndctl = WalSndCtl; WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; int numapply = 0; + int numcausalreadsapply = 0; + bool is_highest_priority_sync_standby; /* * If this WALSender is serving a standby that is not on the list of - * potential sync standbys then we have nothing to do. If we are still - * starting up, still running base backup or the current flush position - * is still invalid, then leave quickly also. + * potential sync standbys and not in a state that causal_reads waits for, + * then we have nothing to do. If we are still starting up, still running + * base backup or the current flush position is still invalid, then leave + * quickly also. */ - if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + if (!walsender_cr_available_or_joining && + (MyWalSnd->sync_standby_priority == 0 || + MyWalSnd->state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(MyWalSnd->flush))) return; /* @@ -458,13 +706,19 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); syncWalSnd = SyncRepGetSynchronousStandby(); - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + /* + * If we aren't managing the highest priority standby then make a note of + * that so we can announce a takeover in the log if we ever get that job. + */ + is_highest_priority_sync_standby = syncWalSnd == MyWalSnd; + if (!is_highest_priority_sync_standby) + announce_next_takeover = true; /* - * If we aren't managing the highest priority standby then just leave. + * If we aren't managing the highest priority standby or a standby in + * causal reads 'joining' or 'available' state, then just leave. */ - if (syncWalSnd != MyWalSnd) + if (!is_highest_priority_sync_standby && !walsender_cr_available_or_joining) { LWLockRelease(SyncRepLock); announce_next_takeover = true; @@ -473,24 +727,45 @@ SyncRepReleaseWaiters(void) /* * Set the lsn first so that when we wake backends they will release up to - * this location. 
+ * this location. For the single-standby synchronous commit levels, we + * only do this if we are the current synchronous standby and we are + * advancing the LSN further than it has been advanced before, so that + * SyncRepWaitForLSN can skip waiting in some cases. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) - { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); - } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) - { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); - } - if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + if (is_highest_priority_sync_standby) { - walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; - numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + { + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, + MyWalSnd->write); + } + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, + MyWalSnd->flush); + } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, + MyWalSnd->apply); + } } + /* + * For causal_reads, all walsenders currently in available or joining + * state must reach the LSN on their own, and standbys will reach LSNs in + * any order. It doesn't make sense to keep the highest seen LSN in a + * single walsndctl->lsn element. (CausalReadsWaitForLSN has handling for + * LSNs that have already been reached). + */ + if (walsender_cr_available_or_joining) + numcausalreadsapply = + SyncRepWakeQueue(false, SYNC_REP_WAIT_CAUSAL_READS, + MyWalSnd->apply); + LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", @@ -502,7 +777,7 @@ SyncRepReleaseWaiters(void) * If we are managing the highest priority standby, though we weren't * prior to this, then announce we are now the sync standby. */ - if (announce_next_takeover) + if (is_highest_priority_sync_standby && announce_next_takeover) { announce_next_takeover = false; ereport(LOG, @@ -577,9 +852,8 @@ SyncRepGetStandbyPriority(void) * Must hold SyncRepLock. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn) { - volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; @@ -596,7 +870,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Assume the queue is ordered by LSN */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) + if (!all && lsn < proc->waitLSN) return numprocs; /* @@ -656,7 +930,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, InvalidXLogRecPtr); } /* @@ -708,13 +982,31 @@ SyncRepQueueIsOrderedByLSN(int mode) #endif /* + * Make sure that CausalReadsWaitForLSN can't return until after the given + * lease expiry time has been reached. In other words, revoke the lease. + * + * Wake up all backends waiting in CausalReadsWaitForLSN, because the set of + * available/joining peers has changed, and there is a new stall time they + * need to observe. 
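+ * For example, callers pass the expiry time of the most recently granted
+ * lease (its send time plus causal_reads_timeout); by the time such a
+ * stall ends, a standby that is still alive and serving queries must have
+ * begun raising "standby is not available for causal reads", even if its
+ * clock is up to max_clock_skew behind the primary's.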
+ */ +void +CausalReadsBeginStall(TimestampTz lease_expiry_time) +{ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + WalSndCtl->stall_causal_reads_until = + Max(WalSndCtl->stall_causal_reads_until, lease_expiry_time); + SyncRepWakeQueue(true, SYNC_REP_WAIT_CAUSAL_READS, InvalidXLogRecPtr); + LWLockRelease(SyncRepLock); +} + +/* * =========================================================== * Synchronous Replication functions executed by any process * =========================================================== */ bool -check_synchronous_standby_names(char **newval, void **extra, GucSource source) +check_standby_names(char **newval, void **extra, GucSource source) { char *rawstring; List *elemlist; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index fd7aecb..22587a2 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -55,6 +55,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -149,7 +150,8 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(bool force, bool requestReply, bool includeApplyTimestamp); static void XLogWalRcvSendHSFeedback(bool immed); -static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsUntil); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -831,6 +833,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogRecPtr walEnd; TimestampTz sendTime; bool replyRequested; + TimestampTz causalReadsLease; resetStringInfo(&incoming_message); @@ -851,7 +854,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, NULL); buf += hdrlen; len -= hdrlen; @@ -863,7 +866,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) bool reportApplyTimestamp = false; /* copy message to StringInfo */ - hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(int64); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -875,8 +878,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); replyRequested = pq_getmsgbyte(&incoming_message); + causalReadsLease = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, &causalReadsLease); /* * If no apply timestamps have been sent at the request of the @@ -1225,15 +1230,52 @@ XLogWalRcvSendHSFeedback(bool immed) * Update shared memory status upon receiving a message from primary. * * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest - * message, reported by primary. + * message, reported by primary. 'causalReadsLease' is a pointer to + * the time the primary promises that this standby can safely claim to be + * causally consistent, to 0 if it cannot, or a NULL pointer for no change. 
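+ * (Only keepalive ('k') messages carry a lease: plain WAL data ('w')
+ * messages pass NULL here, and a keepalive from a walsender that does not
+ * currently consider this standby available for causal reads carries 0.)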
*/ static void -ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsLease) { WalRcvData *walrcv = WalRcv; TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); static TimestampTz lastRecordedTimestamp = 0; + /* Sanity check for the causalReadsLease time. */ + if (causalReadsLease != NULL && *causalReadsLease != 0) + { + /* Deduce max_clock_skew from the causalReadsLease and sendTime. */ +#ifdef HAVE_INT64_TIMESTAMP + int64 diffMillis = (*causalReadsLease - sendTime) / 1000; +#else + int64 diffMillis = (*causalReadsLease - sendTime) * 1000; +#endif + int64 max_clock_skew = diffMillis / (CAUSAL_READS_CLOCK_SKEW_RATIO - 1); + + if (sendTime > TimestampTzPlusMilliseconds(lastMsgReceiptTime, max_clock_skew)) + { + /* + * The primary's clock is more than max_clock_skew + network + * latency ahead of the standby's clock. (If the primary's clock + * is more than max_clock_skew ahead of the standby's clock, but + * by less than the network latency, then there isn't much we can + * do to detect that; but it still seems useful to have this basic + * sanity check for wildly misconfigured servers.) + */ + elog(LOG, "the primary server's clock time is too far ahead"); + causalReadsLease = NULL; + } + /* + * We could also try to detect cases where sendTime is more than + * max_clock_skew in the past according to the standby's clock, but + * that is indistinguishable from network latency/buffering, so we + * could produce misleading error messages; if we do nothing, the + * consequence is 'standby is not available for causal reads' errors + * which should cause the user to investigate. + */ + } + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->latestWalEnd < walEnd) @@ -1241,6 +1283,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + if (causalReadsLease != NULL) + walrcv->causalReadsLease = *causalReadsLease; SpinLockRelease(&walrcv->mutex); /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 5f6e423..e502f74 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -28,6 +28,7 @@ #include "replication/walreceiver.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/guc.h" #include "utils/timestamp.h" WalRcvData *WalRcv = NULL; @@ -374,3 +375,21 @@ GetReplicationTransferLatency(void) return ms; } + +/* + * Used by snapmgr to check if this standby has a valid lease, granting it the + * right to consider itself available for causal reads. + */ +bool +WalRcvCausalReadsAvailable(void) +{ + WalRcvData *walrcv = WalRcv; + TimestampTz now = GetCurrentTimestamp(); + bool result; + + SpinLockAcquire(&walrcv->mutex); + result = walrcv->causalReadsLease != 0 && now <= walrcv->causalReadsLease; + SpinLockRelease(&walrcv->mutex); + + return result; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 16d7abc..b4dad72 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -154,9 +154,20 @@ static StringInfoData tmpbuf; */ static TimestampTz last_reply_timestamp = 0; +static TimestampTz last_keepalive_timestamp = 0; + /* Have we sent a heartbeat message asking for reply, since last reply? 
*/ static bool waiting_for_ping_response = false; +/* How long do need to stay in JOINING state? */ +static XLogRecPtr causal_reads_joining_until = 0; + +/* The last causal reads lease sent to the standby. */ +static TimestampTz causal_reads_last_lease = 0; + +/* Is this WALSender listed in causal_reads_standby_names? */ +static bool am_potential_causal_reads_standby = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -242,6 +253,57 @@ InitWalSender(void) SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); } + /* + * If we are exiting unexpectedly, we may need to communicate with concurrent + * causal_reads commits to maintain the causal consistency guarantee. + */ +static void +PrepareUncleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * We've lost contact with the standby, but it may still be alive. We + * can't let any causal_reads transactions return until we've stalled + * for long enough for a zombie standby to start raising errors + * because its lease has expired. + */ + elog(LOG, "standby \"%s\" is lost (no longer available for causal reads)", application_name); + CausalReadsBeginStall(causal_reads_last_lease); + + /* + * We set the state to a lower level _after_ beginning the stall, + * otherwise there would be a tiny window where commits could return + * without observing the stall. + */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* + * We are shutting down because we received a goodbye message from the + * walreceiver. + */ +static void +PrepareCleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * The standby is shutting down, so it won't be running any more + * transactions. It is therefore safe to stop waiting for it, and no + * stall is necessary. + */ + elog(LOG, "standby \"%s\" is leaving (no longer available for causal reads)", application_name); + + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + /* * Clean up after an error. * @@ -266,7 +328,10 @@ WalSndErrorCleanup(void) replication_active = false; if (walsender_ready_to_stop) + { + PrepareUncleanExit(); proc_exit(0); + } /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); @@ -278,6 +343,8 @@ WalSndErrorCleanup(void) static void WalSndShutdown(void) { + PrepareUncleanExit(); + /* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. @@ -1388,6 +1455,7 @@ ProcessRepliesIfAny(void) if (r < 0) { /* unexpected error or EOF */ + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1404,6 +1472,7 @@ ProcessRepliesIfAny(void) resetStringInfo(&reply_message); if (pq_getmessage(&reply_message, 0)) { + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1453,6 +1522,7 @@ ProcessRepliesIfAny(void) * 'X' means that the standby is closing down the socket. 
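+ * After a clean goodbye the standby will not run any further queries, so
+ * PrepareCleanExit below can mark it unavailable for causal reads without
+ * stalling commits.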
*/ case 'X': + PrepareCleanExit(); proc_exit(0); default: @@ -1584,6 +1654,83 @@ ProcessStandbyReplyMessage(void) */ { WalSnd *walsnd = MyWalSnd; + WalSndCausalReadsState causal_reads_state = walsnd->causal_reads_state; + bool causal_reads_state_changed = false; + bool causal_reads_set_joining_until = false; + + /* + * Handle causal reads state transitions, if a causal_reads_timeout is + * configured, this standby is listed in causal_reads_standby_names, + * and we are a primary database (not a cascading standby). + */ + if (am_potential_causal_reads_standby && + !am_cascading_walsender && + applyLagUs >= 0) + { + if (applyLagUs / 1000 < causal_reads_timeout) + { + if (causal_reads_state == WALSNDCRSTATE_UNAVAILABLE) + { + /* + * The standby is applying fast enough. We can't grant a + * lease yet though, we need to wait for everything that + * was committed while this standby was unavailable to be + * applied first. We move to joining state while we wait + * for the standby to catch up. + */ + causal_reads_state = WALSNDCRSTATE_JOINING; + causal_reads_set_joining_until = true; + causal_reads_state_changed = true; + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING && + applyPtr >= causal_reads_joining_until) + { + /* + * The standby has applied everything committed before we + * reached joining state, and has been waiting for remote + * apply on this standby while it's been in joining state, + * so it is safe to move to available state and send a + * lease. + */ + causal_reads_state = WALSNDCRSTATE_AVAILABLE; + causal_reads_state_changed = true; + } + } + else + { + if (causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + /* + * We are dropping a causal reads available standby, so we + * mustn't let any commit command that is waiting in + * CausalReadsWaitForLSN return until we are sure that the + * standby definitely knows that it's not available and + * starts raising errors for causal_reads transactions. + * TODO: We could just wait until the standby acks that + * its lease has been cancelled, and start numbering + * keepalives and sending the number back in replies, so + * we know it's acking the right message; then lagging + * standbys would be less disruptive, but for now we just + * wait for the lease to expire, as we do when we lose + * contact with a standby, for the sake of simplicity. + */ + CausalReadsBeginStall(causal_reads_last_lease); + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING) + { + /* + * Dropping a joining standby doesn't require a stall, + * because the standby doesn't think it's available, so + * it's already raising the error for causal_reads + * transactions. + */ + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + } + } + } SpinLockAcquire(&walsnd->mutex); walsnd->write = writePtr; @@ -1591,11 +1738,33 @@ ProcessStandbyReplyMessage(void) walsnd->apply = applyPtr; if (applyLagUs >= 0) walsnd->applyLagUs = applyLagUs; + walsnd->causal_reads_state = causal_reads_state; SpinLockRelease(&walsnd->mutex); + + if (causal_reads_set_joining_until) + { + /* + * Record the end of the primary's WAL at some arbitrary point + * observed _after_ we moved to joining state (so that causal + * reads commits start waiting, closing a race). The standby + * won't become available until it has replayed up to here. 
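+ * (Once the standby's reported apply position reaches this point, a
+ * later reply can complete the move to available state and a lease is
+ * granted; if replay lag later exceeds causal_reads_timeout, the standby
+ * drops back to unavailable and commits stall until its last lease has
+ * expired.)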
+ */ + causal_reads_joining_until = GetFlushRecPtr(); + } + + if (causal_reads_state_changed) + { + WalSndKeepalive(true); + elog(LOG, "standby \"%s\" is %s", application_name, + causal_reads_state == WALSNDCRSTATE_UNAVAILABLE ? "unavailable for causal reads" : + causal_reads_state == WALSNDCRSTATE_JOINING ? "joining as a causal reads standby..." : + causal_reads_state == WALSNDCRSTATE_AVAILABLE ? "available for causal reads" : + "UNKNOWN"); + } } if (!am_cascading_walsender) - SyncRepReleaseWaiters(); + SyncRepReleaseWaiters(MyWalSnd->causal_reads_state >= WALSNDCRSTATE_JOINING); /* * Advance our local xmin horizon when the client confirmed a flush. @@ -1736,33 +1905,53 @@ ProcessStandbyHSFeedbackMessage(void) * If wal_sender_timeout is enabled we want to wake up in time to send * keepalives and to abort the connection if wal_sender_timeout has been * reached. + * + * But if causal_reads_timeout is enabled, we override that and send + * keepalives at a constant rate to replace expiring leases. */ static long WalSndComputeSleeptime(TimestampTz now) { long sleeptime = 10000; /* 10 s */ - if (wal_sender_timeout > 0 && last_reply_timestamp > 0) + if ((wal_sender_timeout > 0 && last_reply_timestamp > 0) || + am_potential_causal_reads_standby) { TimestampTz wakeup_time; long sec_to_timeout; int microsec_to_timeout; - /* - * At the latest stop sleeping once wal_sender_timeout has been - * reached. - */ - wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); - - /* - * If no ping has been sent yet, wakeup when it's time to do so. - * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of - * the timeout passed without a response. - */ - if (!waiting_for_ping_response) + if (am_potential_causal_reads_standby) + { + /* + * Leases last for a period of between 50% and 100% of + * causal_reads_timeout, depending on clock skew, assuming clock + * skew is under the 25% of causal_reads_timeout. We send new + * leases every half a lease, so that there are no gaps between + * leases. + */ + wakeup_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + } + else + { + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once + * half of the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + } /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, @@ -1778,20 +1967,33 @@ WalSndComputeSleeptime(TimestampTz now) /* * Check whether there have been responses by the client within * wal_sender_timeout and shutdown if not. + * + * If causal_reads_timeout is configured we override that, so that + * unresponsive standbys are detected sooner. 
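+ * For example, with causal_reads_timeout set to 4s, a standby that has
+ * not replied for 4 seconds is disconnected, instead of being tolerated
+ * for the full wal_sender_timeout.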
*/ static void WalSndCheckTimeOut(TimestampTz now) { TimestampTz timeout; + int allowed_time; /* don't bail out if we're doing something that doesn't require timeouts */ if (last_reply_timestamp <= 0) return; - timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); + /* + * If a causal_reads_timeout is configured, it is used instead of + * wal_sender_timeout, to limit the time before an unresponsive causal + * reads standby is dropped. + */ + if (am_potential_causal_reads_standby) + allowed_time = causal_reads_timeout; + else + allowed_time = wal_sender_timeout; - if (wal_sender_timeout > 0 && now >= timeout) + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + allowed_time); + if (allowed_time > 0 && now >= timeout) { /* * Since typically expiration of replication timeout means @@ -1824,6 +2026,9 @@ WalSndLoop(WalSndSendDataCallback send_data) last_reply_timestamp = GetCurrentTimestamp(); waiting_for_ping_response = false; + /* Check if we are managing potential causal_reads standby. */ + am_potential_causal_reads_standby = CausalReadsPotentialStandby(); + /* * Loop until we reach the end of this timeline or the client requests to * stop streaming. @@ -1984,6 +2189,7 @@ InitWalSenderSlot(void) walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ @@ -2753,6 +2959,24 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +/* + * Return a string constant representing the causal reads state. This is used + * in system views, and should *not* be translated. + */ +static const char * +WalSndGetCausalReadsStateString(WalSndCausalReadsState causal_reads_state) +{ + switch (causal_reads_state) + { + case WALSNDCRSTATE_UNAVAILABLE: + return "unavailable"; + case WALSNDCRSTATE_JOINING: + return "joining"; + case WALSNDCRSTATE_AVAILABLE: + return "available"; + } + return "UNKNOWN"; +} /* * Returns activity of walsenders, including pids and xlog locations sent to @@ -2761,7 +2985,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 9 +#define PG_STAT_GET_WAL_SENDERS_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2812,6 +3036,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 applyLagUs; int priority; WalSndState state; + WalSndCausalReadsState causalReadsState; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2821,6 +3046,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockAcquire(&walsnd->mutex); sentPtr = walsnd->sentPtr; state = walsnd->state; + causalReadsState = walsnd->causal_reads_state; write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; @@ -2895,6 +3121,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) values[8] = CStringGetTextDatum("sync"); else values[8] = CStringGetTextDatum("potential"); + + values[9] = + CStringGetTextDatum(WalSndGetCausalReadsStateString(causalReadsState)); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -2914,14 +3143,52 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) static void WalSndKeepalive(bool requestReply) { + TimestampTz now; + TimestampTz causal_reads_lease; + elog(DEBUG2, "sending replication keepalive"); + /* + * If the walsender currently deems the standby to be available for 
causal + * reads, then it grants a causal reads lease. The lease authorizes the + * standby to consider itself available for causal reads until a short + * time in the future. The primary promises to uphold the causal reads + * guarantee until that time, by stalling commits until the the lease has + * expired if necessary. + */ + now = GetCurrentTimestamp(); + if (MyWalSnd->causal_reads_state < WALSNDCRSTATE_AVAILABLE) + causal_reads_lease = 0; /* Not available, no lease granted. */ + else + { + /* + * Since this timestamp is being sent to the standby where it will be + * compared against a time generated by the standby's system clock, we + * must consider clock skew. First, we decide on a maximum tolerable + * difference between system clocks. If the primary's clock is ahead + * of the standby's by more than this, then all bets are off (the + * standby could falsely believe it has a valid lease). If the + * primary's clock is behind the standby's by more than this, then the + * standby will err the other way and generate spurious errors in + * causal_reads mode. Rather than having a separate GUC for this, we + * derive it from causal_reads_timeout. + */ + int max_clock_skew = causal_reads_timeout / CAUSAL_READS_CLOCK_SKEW_RATIO; + + /* Compute and remember the expiry time of the lease we're granting. */ + causal_reads_last_lease = TimestampTzPlusMilliseconds(now, causal_reads_timeout); + /* The version we'll send to the standby is adjusted to tolerate clock skew. */ + causal_reads_lease = + TimestampTzPlusMilliseconds(causal_reads_last_lease, -max_clock_skew); + } + /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); pq_sendint64(&output_message, sentPtr); - pq_sendint64(&output_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(now)); pq_sendbyte(&output_message, requestReply ? 1 : 0); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(causal_reads_lease)); /* ... and send it wrapped in CopyData */ pq_putmessage_noblock('d', output_message.data, output_message.len); @@ -2939,23 +3206,35 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts. */ - if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) - return; - - if (waiting_for_ping_response) - return; + if (!am_potential_causal_reads_standby) + { + if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) + return; + if (waiting_for_ping_response) + return; + } /* * If half of wal_sender_timeout has lapsed without receiving any reply * from the standby, send a keep-alive message to the standby requesting * an immediate reply. + * + * If causal_reads_timeout has been configured, use it to control + * keepalive intervals rather than wal_sender_timeout, so that we can keep + * replacing leases at the right frequency. 
*/ - ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + if (am_potential_causal_reads_standby) + ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + else + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); if (now >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true; + last_keepalive_timestamp = now; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 49494f9..d81c089 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -306,6 +306,7 @@ Section: Class 40 - Transaction Rollback 40001 E ERRCODE_T_R_SERIALIZATION_FAILURE serialization_failure 40003 E ERRCODE_T_R_STATEMENT_COMPLETION_UNKNOWN statement_completion_unknown 40P01 E ERRCODE_T_R_DEADLOCK_DETECTED deadlock_detected +40P02 E ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE causal_reads_not_available Section: Class 42 - Syntax Error or Access Rule Violation diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 06cb166..ac422e7 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1634,6 +1634,16 @@ static struct config_bool ConfigureNamesBool[] = }, { + {"causal_reads", PGC_USERSET, REPLICATION_STANDBY, + gettext_noop("Enables causal reads."), + NULL + }, + &causal_reads, + false, + NULL, NULL, NULL + }, + + { {"syslog_sequence_numbers", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Add sequence number to syslog messages to avoid duplicate suppression."), NULL @@ -1811,6 +1821,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"causal_reads_timeout", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the maximum apply lag before causal reads standbys are no longer available."), + NULL, + GUC_UNIT_MS + }, + &causal_reads_timeout, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL @@ -3454,7 +3475,18 @@ static struct config_string ConfigureNamesString[] = }, &SyncRepStandbyNames, "", - check_synchronous_standby_names, NULL, NULL + check_standby_names, NULL, NULL + }, + + { + {"causal_reads_standby_names", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("List of names of potential causal reads standbys."), + NULL, + GUC_LIST_INPUT + }, + &causal_reads_standby_names, + "*", + check_standby_names, NULL, NULL }, { diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ec4427f..fcc2c35 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -244,6 +244,15 @@ # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#causal_reads_timeout = 0s # maximum replication delay to tolerate from + # standbys before dropping them from the set of + # available causal reads peers; 0 to disable + # causal reads + +#causal_reads_standy_names = '*' + # standby servers that can potentially become + # available for causal reads; '*' = all + # - Standby Servers - # These settings are ignored on a master server. 
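The 40P02 (causal_reads_not_available) error code added above gives
applications and load balancing middleware something concrete to react to.
The following is a rough libpq sketch of one possible reaction, assuming the
session has already run SET causal_reads = on; choose_another_server() and
the single-retry policy are hypothetical and not part of this patch:

    #include <string.h>
    #include <libpq-fe.h>

    /* Hypothetical application routine that connects to a different server. */
    extern PGconn *choose_another_server(void);

    /* True if the query failed because this standby has no causal reads lease. */
    static bool
    causal_reads_unavailable(const PGresult *res)
    {
        const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);

        return sqlstate != NULL && strcmp(sqlstate, "40P02") == 0;
    }

    /*
     * Run a query in a session that has SET causal_reads = on, retrying once
     * on another server if this standby cannot currently serve causal reads.
     */
    static PGresult *
    run_causal_reads_query(PGconn *conn, const char *query)
    {
        PGresult *res = PQexec(conn, query);

        if (PQresultStatus(res) == PGRES_FATAL_ERROR &&
            causal_reads_unavailable(res))
        {
            PQclear(res);
            res = PQexec(choose_another_server(), query);
        }
        return res;
    }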
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index b88e012..6336240 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,8 +46,11 @@
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
+#include "replication/walreceiver.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -209,6 +212,16 @@ GetTransactionSnapshot(void)
 			 "cannot take query snapshot during a parallel operation");
 
 	/*
+	 * In causal_reads mode on a standby, check if we have definitely
+	 * applied WAL for any COMMIT that returned successfully on the
+	 * primary.
+	 */
+	if (causal_reads && RecoveryInProgress() && !WalRcvCausalReadsAvailable())
+		ereport(ERROR,
+				(errcode(ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE),
+				 errmsg("standby is not available for causal reads")));
+
+	/*
 	 * In transaction-snapshot mode, the first snapshot must live until
 	 * end of xact regardless of what the caller does with it, so we must
 	 * make a copy of it rather than returning CurrentSnapshotData
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 4054726..c0d7173 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2712,7 +2712,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state,causal_reads_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c005a42..dbfd601 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -24,14 +24,33 @@
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
 #define SYNC_REP_WAIT_APPLY		2
+#define SYNC_REP_WAIT_CAUSAL_READS	3
 
-#define NUM_SYNC_REP_WAIT_MODE	3
+#define NUM_SYNC_REP_WAIT_MODE	4
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/*
+ * ratio of causal_reads_timeout to max_clock_skew (4 means that the maximum
+ * tolerated clock difference between primary and standbys using causal_reads
+ * is 1/4 of causal_reads_timeout)
+ */
+#define CAUSAL_READS_CLOCK_SKEW_RATIO 4
+
+/*
+ * ratio of causal_reads_timeout to keepalive time (2 means that the effective
+ * keepalive time is 1/2 of the causal_reads_timeout GUC when it is non-zero)
+ */
+#define CAUSAL_READS_KEEPALIVE_RATIO 2
+
+/* GUC variables */
+extern int	causal_reads_timeout;
+extern bool causal_reads;
+extern char *causal_reads_standby_names;
+
 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
 
@@ -43,16 +62,23 @@ extern void SyncRepCleanupAtProcExit(void);
 
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
-extern void SyncRepReleaseWaiters(void);
+extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+/* called by user backend (xact.c) */
+extern void CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN);
+
+/* called by wal sender */
+extern void CausalReadsBeginStall(TimestampTz lease_expiry_time);
+extern bool CausalReadsPotentialStandby(void);
+
 /* forward declaration to avoid pulling in walsender_private.h */
 struct WalSnd;
 extern struct WalSnd *SyncRepGetSynchronousStandby(void);
 
-extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
 
 #endif   /* _SYNCREP_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e4a1c3a..056c448 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -80,6 +80,13 @@ typedef struct
 	TimeLineID	receivedTLI;
 
 	/*
+	 * causalReadsLease is the time until which the primary has authorized
+	 * this standby to consider itself available for causal_reads mode, or 0
+	 * for not authorized.
+	 */
+	TimestampTz causalReadsLease;
+
+	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
 	 * receivedUpto before the last flush to disk.  Startup process can use
@@ -165,4 +172,6 @@ extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvWakeup(void);
 
+extern bool WalRcvCausalReadsAvailable(void);
+
 #endif   /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 4de43e8..f6e0e9e 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -27,6 +27,13 @@ typedef enum WalSndState
 	WALSNDSTATE_STREAMING
 } WalSndState;
 
+typedef enum WalSndCausalReadsState
+{
+	WALSNDCRSTATE_UNAVAILABLE = 0,
+	WALSNDCRSTATE_JOINING,
+	WALSNDCRSTATE_AVAILABLE
+} WalSndCausalReadsState;
+
 /*
  * Each walsender has a WalSnd struct in shared memory.
  */
@@ -34,6 +41,7 @@ typedef struct WalSnd
 {
 	pid_t		pid;			/* this walsender's process id, or 0 */
 	WalSndState state;			/* this walsender's state */
+	WalSndCausalReadsState causal_reads_state;	/* the walsender's causal reads state */
 	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
 	bool		needreload;		/* does currently-open file need to be
 								 * reloaded? */
@@ -89,6 +97,12 @@ typedef struct
 	 */
 	bool		sync_standbys_defined;
 
+	/*
+	 * Until when must commits in causal_reads stall?  This is used to wait
+	 * for causal reads leases to expire.
+	 */
+	TimestampTz stall_causal_reads_until;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index fc4b765..44f826f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1785,10 +1785,11 @@ pg_stat_replication| SELECT s.pid,
     w.replay_location,
     w.replay_lag,
     w.sync_priority,
-    w.sync_state
+    w.sync_state,
+    w.causal_reads_state
    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn),
    pg_authid u,
-    pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state)
+    pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state, causal_reads_state)
  WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,