diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a09ceb2..eee6d0b 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2664,6 +2664,35 @@ include_dir 'conf.d' across the cluster without problems if that is required. + + All Servers + + These parameters can be set on the primary or any standby. + + + + causal_reads (boolean) + + causal_reads configuration parameter + + + + + Enables causal consistency between transactions run on different + servers. A transaction that is run with causal_reads set + to on is guaranteed either to see the effects of all + completed transactions run on the primary with the setting on, or to + receive an error "standby is not available for causal reads". Note + that both transactions involved in a causal dependency (a write on + the primary followed by a read on any server which must see the + write) must be run with the setting on. + See for more details. + + + + + + Sending Server(s) @@ -2895,6 +2924,48 @@ include_dir 'conf.d' + + causal_reads_timeout (integer) + + causal_reads_timeout configuration parameter + + + + + Specifies the maximum replay lag the primary will tolerate from a + standby before dropping it from the set of standbys available for + causal reads. + + + This setting is also used to control the leases used to + maintain the causal reads guarantee. It must be set to a value which + is at least 4 times the maximum possible difference in system clocks + between the primary and standby servers, as described + in . + + + + + + causal_reads_standby_names (string) + + causal_reads_standby_names configuration parameter + + + + + Specifies a comma-separated list of standby names that can support + causal reads, as described in + . Follows the same convention + as synchronous_standby_names. + The default is *, matching all standbys. + + + This setting has no effect if causal_reads_timeout is not set. + + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 6cb690c..a338fd8 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. + When synchronous_commit is set to + remote_apply, the standby also sends reply messages when the commit + record is replayed, making the transaction visible. If the standby is the first matching standby, as specified in synchronous_standby_names on the primary, the reply messages from that standby will be used to wake users waiting for @@ -1107,6 +1110,16 @@ primary_slot_name = 'node_a_slot' + Setting synchronous_commit to remote_apply will + cause each commit to wait until the current synchronous standby reports + that it has replayed the transaction, making it visible to user queries. + In simple cases, this allows for load balancing with causal consistency + on a single hot standby. (See also + which deals with multiple standbys and + standby failure.) + + + + Users will stop waiting if a fast shutdown is requested.
However, as when using asynchronous replication, the server will not fully shutdown until all outstanding WAL records are transferred to the currently @@ -1160,8 +1173,9 @@ primary_slot_name = 'node_a_slot' Planning for High Availability - Commits made when synchronous_commit is set to on - or remote_write will wait until the synchronous standby responds. The response + Commits made when synchronous_commit is set to on, + remote_write or remote_apply will wait until the + synchronous standby responds. The response may never occur if the last, or only, standby should crash. @@ -1221,6 +1235,119 @@ primary_slot_name = 'node_a_slot' + + Causal reads + + causal reads + in standby + + + + The causal reads feature allows read-only queries to run on hot standby + servers without exposing stale data to the client, providing a form of + causal consistency. Transactions can run on any standby with the + following guarantee about the visibility of preceding transactions: If you + set causal_reads to on in any pair of consecutive + transactions tx1, tx2 where tx2 begins after tx1 successfully returns, + then tx2 will either see tx1 or fail with a new error "standby is not + available for causal reads", no matter which server it runs on. Although + the guarantee is expressed in terms of two individual transactions, the + GUC can also be set at session, role or system level to make the guarantee + hold generally, allowing for load balancing of applications that were not + designed with load balancing in mind. + + + + In order to enable the feature, causal_reads_timeout must be + set to a non-zero value on the primary server. The + GUC causal_reads_standby_names can be used to limit the set of + standbys that can join the dynamic set of causal reads standbys by + providing a comma-separated list of application names. By default, all + standbys are candidates if the feature is enabled. + + + + The current set of servers that the primary considers to be available for + causal reads can be seen in + the pg_stat_replication + view. Administrators, applications and load balancing middleware can use + this view to discover standbys that can currently handle causal reads + transactions without raising the error. Since that information is only an + instantaneous snapshot, clients should still be prepared for the error + to be raised at any time, and consider redirecting transactions to another + standby. + + + + The advantages of the causal reads feature over simply + setting synchronous_commit to remote_apply are: + + + + It allows the primary to wait for multiple standbys to replay + transactions. + + + + + It places a configurable limit on how much replay lag (and therefore + delay at commit time) the primary tolerates from standbys before it + drops them from the dynamic set of standbys it waits for. + + + + + It upholds the causal reads guarantee during the transitions that + occur when new standbys are added or removed from the set of standbys, + including scenarios where contact has been lost between the primary + and a standby but the standby is still alive and running client + queries. + + + + + + + The protocol used to uphold the guarantee even in the case of network + failure depends on the system clocks of the primary and standby servers + being synchronized, with an allowance for a difference of up to one quarter + of causal_reads_timeout.
For example, + if causal_reads_timeout is set to 4s, then the + clocks must not be further than 1 second apart for the guarantee to be + upheld reliably during transitions. The ubiquity of the Network Time + Protocol (NTP) on modern operating systems and availability of high + quality time servers make it possible to choose a tolerance significantly + higher than the maximum expected clock difference. An effort is + nevertheless made to detect and report misconfigured and faulty systems + with clock differences greater than the configured tolerance. + + + + + Current hardware clocks, NTP implementations and public time servers are + unlikely to allow the system clocks to differ more than tens or hundreds + of milliseconds, and systems synchronized with dedicated local time + servers may be considerably more accurate, but you should only consider + setting causal_reads_timeout below 4 seconds (allowing up to + 1 second of clock difference) after researching your time synchronization + infrastructure thoroughly. + + + + + + While similar to synchronous replication in the sense that both involve + the primary server waiting for responses from standby servers, the + causal reads feature is not concerned with avoiding data loss. A + primary configured for causal reads will drop all standbys that stop + responding or replay too slowly from the dynamic set that it waits for, + so you should consider configuring both synchronous replication and + causal reads if you need data loss avoidance guarantees and causal + consistency guarantees for load balancing. + + + + Continuous archiving in standby @@ -1569,7 +1696,16 @@ if (!triggered) so there will be a measurable delay between primary and standby. Running the same query nearly simultaneously on both primary and standby might therefore return differing results. We say that data on the standby is - eventually consistent with the primary. Once the + eventually consistent with the primary by default. + The data visible to a transaction running on a standby can be + made causally consistent with respect to a transaction that + has completed on the primary by setting causal_reads + to on in both transactions. For more details, + see . + + + + Once the commit record for a transaction is replayed on the standby, the changes made by that transaction will be visible to any new snapshots taken on the standby. Snapshots may be taken at the start of each query or at the diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 85459d0..5a87f37 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -820,6 +820,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser standby server + replay_lag + interval + Estimated time taken for recent WAL records to be replayed on this + standby server + + sync_priority integer Priority of this standby server for being chosen as the @@ -830,6 +836,17 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser text Synchronous state of this standby server + + causal_reads_state + text + Causal reads state of this standby server. This field will be + non-null only if causal_reads_timeout is set. If a standby is + in available state, then it can currently serve causal reads + queries. If it is not replaying fast enough or not responding to + keepalive messages, it will be in unavailable state, and if + it is currently transitioning to availability it will be + in joining state for a short time.
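The new pg_stat_replication columns are intended to be usable by load balancing middleware to decide where causal reads transactions can be routed. As an illustration only (not part of the patch), here is a minimal consumer written against the stock libpq API; the connection string and the absence of any routing policy are placeholders:

#include <stdio.h>
#include <libpq-fe.h>

/*
 * Illustrative only: poll pg_stat_replication on the primary and print
 * which standbys are currently usable for causal reads transactions.
 */
int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");	/* placeholder DSN */
	PGresult   *res;
	int			i;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	res = PQexec(conn,
				 "SELECT application_name, causal_reads_state, replay_lag "
				 "FROM pg_stat_replication");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		for (i = 0; i < PQntuples(res); i++)
			printf("%s: causal_reads_state=%s replay_lag=%s\n",
				   PQgetvalue(res, i, 0),
				   PQgetvalue(res, i, 1),
				   PQgetvalue(res, i, 2));
	}
	PQclear(res);
	PQfinish(conn);
	return 0;
}

Since the view only provides a snapshot, such a router must still handle the "standby is not available for causal reads" error and redirect the transaction to another standby.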
+ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b0d5440..8bfc510 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1324,7 +1324,10 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog && markXidCommitted) + { + CausalReadsWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN(XactLastRecEnd); + } /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -5117,6 +5120,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * Check if the caller would like to ask standbys for immediate feedback + * once this commit is applied. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || causal_reads) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. */ @@ -5452,6 +5462,19 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * Record the primary's timestamp for the commit record, so it can be used + * for tracking replay lag. + */ + SetXLogReplayTimestamp(parsed->xact_time); + + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit or causal reads), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 94b79ac..b7348ab 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -81,6 +81,8 @@ extern uint32 bootstrap_data_checksum_version; #define PROMOTE_SIGNAL_FILE "promote" #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote" +/* Size of the circular buffer of timestamped LSNs. */ +#define MAX_TIMESTAMPED_LSNS 8192 /* User-settable parameters */ int max_wal_size = 64; /* 1 GB */ @@ -346,6 +348,12 @@ static XLogRecPtr RedoRecPtr; static bool doPageWrites; /* + * doRequestWalReceiverReply is used by recovery code to ask the main recovery + * loop to trigger a walreceiver reply. + */ +static bool doRequestWalReceiverReply; + +/* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby * mode, XLOG streaming usually starts from the position where an invalid */ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; +/* + * LastReplayedTimestamp can be set by redo handlers when they apply a record + * that carries a timestamp, by calling SetXLogReplayTimestamp. The xlog + * apply loop can then update the value in shared memory. + */ +static TimestampTz LastReplayedTimestamp = 0; + /*---------- * Shared-memory data structures for XLOG control * @@ -631,6 +646,21 @@ typedef struct XLogCtlData /* current effective recovery target timeline */ TimeLineID RecoveryTargetTLI; + /* timestamp from the most recently applied record that carried a timestamp. */ + TimestampTz lastReplayedTimestamp; + + /* + * We maintain a circular buffer of LSNs and associated timestamps. + * Walreceiver writes into it using timestamps from keepalive messages, and the + * startup recovery process reads from it and notifies walreceiver when + * LSNs are replayed so that the timestamps can be fed back to the + * upstream server, to track lag.
+ */ + Index timestampedLsnRead; + Index timestampedLsnWrite; + XLogRecPtr timestampedLsn[MAX_TIMESTAMPED_LSNS]; + TimestampTz timestampedLsnTime[MAX_TIMESTAMPED_LSNS]; + /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery @@ -6897,14 +6927,58 @@ StartupXLOG(void) error_context_stack = errcallback.previous; /* - * Update lastReplayedEndRecPtr after this record has been - * successfully replayed. + * Update lastReplayedEndRecPtr and lastReplayedTimestamp + * after this record has been successfully replayed. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->lastReplayedEndRecPtr = EndRecPtr; XLogCtl->lastReplayedTLI = ThisTimeLineID; + if (LastReplayedTimestamp != 0) + { + /* If replaying a record produced a timestamp, use that. */ + XLogCtl->lastReplayedTimestamp = LastReplayedTimestamp; + LastReplayedTimestamp = 0; + } + else + { + /* + * If we have applied LSNs associated with timestamps + * received by walreceiver, then use the recorded + * timestamp. We consume from the read end of the + * circular buffer. + */ + while (XLogCtl->timestampedLsnRead != + XLogCtl->timestampedLsnWrite && + XLogCtl->timestampedLsn[XLogCtl->timestampedLsnRead] + <= EndRecPtr) + { + if (XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead] > + XLogCtl->lastReplayedTimestamp) + { + XLogCtl->lastReplayedTimestamp = + XLogCtl->timestampedLsnTime[XLogCtl->timestampedLsnRead]; + doRequestWalReceiverReply = true; + } + XLogCtl->timestampedLsnRead = + (XLogCtl->timestampedLsnRead + 1) % MAX_TIMESTAMPED_LSNS; + } + } SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo reported that it applied a commit record that + * the master is waiting for by calling + * XLogRequestWalReceiverReply, or we encountered a WAL + * location that was associated with a timestamp above, then + * we wake up the receiver so that it notices the updated + * lastReplayedEndRecPtr and sends a reply to the master. + */ + if (doRequestWalReceiverReply) + { + doRequestWalReceiverReply = false; + WalRcvWakeup(); + } + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11626,3 +11700,103 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Called by redo code to indicate that the xlog replay loop should wake up + * the walreceiver process so that a reply can be sent to the primary. + */ +void +XLogRequestWalReceiverReply(void) +{ + doRequestWalReceiverReply = true; +} + +/* + * Merge timestamps from keepalive messages with the timestamps from WAL + * records, so that we can track lag while idle or while replaying large + * amounts of WAL without commit records. In the former case there is no lag, + * and in the latter case we will remember a timestamp that goes with an + * arbitrary LSN, and wait for that LSN to be replayed before using the + * timestamp. + * + * This is called by walreceiver on standby servers when keepalive messages + * arrive. + */ +void +SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn) +{ + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn == XLogCtl->lastReplayedEndRecPtr) + { + /* + * That is the last replayed LSN: we are fully replayed, so we can + * update the replay timestamp immediately. + */ + XLogCtl->lastReplayedTimestamp = timestamp; + } + else + { + /* + * There is WAL still to be applied. We will associate the timestamp + * with this WAL position and wait for it to be replayed. 
We add it + * at the 'write' end of the circular buffer of LSN/timestamp + * mappings, which the replay loop will eventually read. + */ + Index w = XLogCtl->timestampedLsnWrite; + Index r = XLogCtl->timestampedLsnRead; + + XLogCtl->timestampedLsn[w] = lsn; + XLogCtl->timestampedLsnTime[w] = timestamp; + + /* Advance the write point. */ + w = (w + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnWrite = w; + if (w == r) + { + /* + * The buffer is full. Advance the read point (throwing away + * oldest values; we will begin to overestimate replay lag, until + * lag decreases to a size our buffer can manage, or the next + * commit record is replayed). + */ + r = (r + 1) % MAX_TIMESTAMPED_LSNS; + XLogCtl->timestampedLsnRead = r; + } + } + SpinLockRelease(&XLogCtl->info_lck); +} + +/* + * Set the timestamp for the most recently applied WAL record that carried a + * timestamp from the primary. This can be called by redo handlers that have + * an appropriate timestamp (currently only commit records). Updating the + * shared memory value is deferred until after the redo handler returns. + */ +void +SetXLogReplayTimestamp(TimestampTz timestamp) +{ + LastReplayedTimestamp = timestamp; +} + +/* + * Get the timestamp for the most recently applied WAL record that carried a + * timestamp from the master, and also the most recently applied LSN. (Note + * that the timestamp and the LSN don't necessarily relate to the same + * record.) + * + * This is similar to GetLatestXTime, except that it is not only advanced by + * commit records (see SetXLogReplayTimestampAtLsn). + */ +TimestampTz +GetXLogReplayTimestamp(XLogRecPtr *lsn) +{ + TimestampTz result; + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn) + *lsn = XLogCtl->lastReplayedEndRecPtr; + result = XLogCtl->lastReplayedTimestamp; + SpinLockRelease(&XLogCtl->info_lck); + + return result; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index abf9a70..b80206e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -661,8 +661,10 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.causal_reads_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND diff --git a/src/backend/replication/README.causal-reads b/src/backend/replication/README.causal-reads new file mode 100644 index 0000000..b85d695 --- /dev/null +++ b/src/backend/replication/README.causal-reads @@ -0,0 +1,190 @@ +The causal reads guarantee says: If you run any two consecutive +transactions tx1, tx2 where tx1 completes before tx2 begins, with +causal_reads set to "on" in both transactions, tx2 will see tx1 or +raise an error to complain that it can't guarantee causal consistency, +no matter which servers (primary or any standby) you run each +transaction on. + +When both transactions run on the primary, the guarantee is trivial. + +To deal with read-only physical streaming standbys, the primary keeps +track of a set of standbys that it considers to be currently +"available" for causal reads, and sends a stream of "leases" to those +standbys granting them the right to handle causal reads transactions +for a short time without any further communication with the primary. + +In general, the primary provides the guarantee by waiting for all of +the "available" standbys to report that they have applied a +transaction. 
However, the set of available standbys is dynamic, and +things get more complicated during state transitions. There are two +types of transitions to consider: + +1. unavailable->joining->available + +Standbys start out as "unavailable". If a standby is unavailable and +is applying fast enough and matches causal_reads_standby_names, the +primary transitions it to "available", but first it sets it to +"joining" until it is sure that any transaction committed while it was +unavailable has definitely been applied on the standby. This closes a +race that would otherwise exist if we moved directly to available +state: tx1 might not wait for a given standby because it's +unavailable, then a lease might be granted, and then tx2 might run a +causal reads transaction without error but see stale data. The +joining state acts as an airlock: while in joining state, the primary +waits for that standby to replay causal reads transactions in +anticipation of the move to available, but it doesn't progress to +available state and grant a lease to the standby until everything +preceding joining state has also been applied. + +2. available->unavailable + +If a standby is not applying fast enough or not responding to +keepalive messages, then the primary kicks that standby out of the +dynamic set of available standbys, that is, marks it as "unavailable". +In order to make sure that the standby has started rejecting causal +reads transactions, it needs to revoke the lease it most recently +granted. It does that by waiting for the lease to expire before +allowing any causal reads commits to return. (In future there could +be a fast-path recovation message which waits for a serial-numbered +acknowledgement to reduce waiting in the case where the standby is +lagging but still reachable and responding). + +The rest of this document illustrates how clock skew affects the +available->unavailable transition. + +The following 4 variables are derived from a single GUC, and these +values will be used in the following illustrations: + +causal_reads_timeout = 4s +lease_time = 4s (= causal_reads_timeout) +keepalive_time = 2s (= lease_time / 2) +max_clock_skew = 1s (= lease_time / 4) + +Every keepalive_time, the primary transmits a lease that expires at +local_clock_time + lease_time - max_clock_skew, shown in the following +diagram as 't' for transmission time and '|' for expiry time. If +contact is lost with a standby, the primary will wait until sent_time ++ lease_time for the most recently granted lease to expire, shown on +the following diagram 'x', to be sure that the standby's clock has +reached the expiry time even if its clock differs by up to +max_clock_skew. In other words, the primary tells the standby that +the expiry time is at one time, but it trusts that the standby will +surely agree if it gives it some extra time. The extra time is +max_clock_skew. If the clocks differ by more than max_clock_skew, all +bets are off (but see below for attempt to detect obvious cases). + +0 1 2 3 4 5 6 7 8 9 +t-----------------|-----x + t-----------------|-----x + t-----------------|-----x + t-----------------|... + t------... + +A standby whose clock is 2 seconds ahead of the primary's clock +perceives gaps in the stream of leases, and will reject causal_reads +transactions in those intervals. The causal reads guarantee is +upheld, but spurious errors are raised between leases, as a +consequence of the clock skew being greater than max_clock_skew. 
In +the following diagram 'r' shows reception time, and the timeline along +the top shows the standby's local clock time. + +2 3 4 5 6 7 8 9 10 11 +r-----| + r-----| + r-----| + r-----| + r-----| + +If there were no network latency, a standby whose clock is exactly 1 +second ahead of the primary's clock would perceive the stream of +leases as being replaced just in time, so there is no gap. Since in +reality the time of receipt is some time after the time of +transmission due to network latency, if the standby's clock is exactly +1 second behind, then there will be small network-latency-sized gaps +before the next lease arrives, but still no correctness problem with +respect to the causal reads guarantee. + +1 2 3 4 5 6 7 8 9 10 +r-----------| + r-----------| + r-----------| + r-----------| + r------... + +A standby whose clock is perfectly in sync with the primary's +perceives the stream of leases overlapping (this matches the primary's +perception of the leases it sent): + +0 1 2 3 4 5 6 7 8 9 +r-----------------| + r-----------------| + r-----------------| + r-----------------| + r------... + +A standby whose clock is exactly 1 second behind the primary's +perceives the stream of leases as overlapping even more, but the time +of expiry as judged by the standby is no later than the time the +primary will wait for if required ('x'). That is, if contact is lost +with the standby, the primary can still reliably hold up causal reads +commits until the standby has started raising the error in +causal_reads transactions. + +-1 0 1 2 3 4 5 6 7 8 +r-----------------------| + r-----------------------| + r-----------------------| + r------------------... + r------... + + +A standby whose clock is 2 seconds behind the primary's would perceive +the stream of leases overlapping even more, and the primary would no +longer be able to wait for a lease to expire if it wanted to revoke +it. But because the expiry time is after local_clock_time + +lease_time, the standby can immediately see that its own clock must be +more than 1 second behind the primary's, so it ignores the lease and +logs a clock skew warning. In the following diagram a lease expiry +time that is obviously generated by a primary with a clock set too far +in the future compared to the local clock is shown with a '!'. + +-2 -1 0 1 2 3 4 5 6 7 +r-----------------------------! + r-----------------------------! + r-----------------------------! + r------------------... + r------... + +A danger window exists when the standby's clock is more than +max_clock_skew behind the primary's clock, but not more than +max_clock_skew + network latency time behind. If the clock difference +is in that range, then the algorithm presented above which is based on +time of receipt cannot detect that the local clock is too far behind. +The consequence of this problem could be as follows: + +1. The standby loses contact with the primary due to a network fault. + +2. The primary decides to drop the standby from the set of available + causal reads standbys due to lack of keepalive responses or + excessive lag, which necessitates holding up commits of causal + reads transactions until the most recently sent lease expires, in + the belief that the standby will definitely have started raising + the 'causal reads unavailable' error in causal reads transactions + by that time, if it is still alive and servicing requests. + +3. The standby still has clients connected and running queries. + +4. 
Due to clock skew in the problematic range, in the standby's + opinion the lease lasts slightly longer than the primary waits. + +5. For a short window at most the duration of the network latency + time, clients running causal reads transactions are allowed to see + potentially stale data. + +For this reason we say that the causal reads guarantee only holds as +long as the absolute difference between the system clocks of the +machines is no more than max_clock_skew. The theory is that NTP makes +it possible to reason about the maximum possible clock difference +between machines and choose a value that is safe even with a much +larger difference. However, we do make a best effort attempt to +detect misconfigured systems as described above. \ No newline at end of file diff --git a/src/backend/replication/README.causal_reads b/src/backend/replication/README.causal_reads new file mode 100644 index 0000000..35b29e9 --- /dev/null +++ b/src/backend/replication/README.causal_reads @@ -0,0 +1,193 @@ +The causal reads guarantee says: If you run any two consecutive +transactions tx1, tx2 where tx1 completes before tx2 begins, with +causal_reads set to "on" in both transactions, tx2 will see tx1 or +raise an error to complain that it can't guarantee causal consistency, +no matter which servers (primary or any standby) you run each +transaction on. + +When both transactions run on the primary, the guarantee is trivially +upheld. + +To deal with read-only physical streaming standbys, the primary keeps +track of a set of standbys that it considers to be currently +"available" for causal reads, and sends a stream of "leases" to those +standbys granting them the right to handle causal reads transactions +for a short time without any further communication with the primary. + +In general, the primary provides the guarantee by waiting for all of +the "available" standbys to report that they have applied a +transaction. However, the set of available standbys is dynamic, and +things get more complicated during state transitions. There are two +types of transitions to consider: + +1. unavailable->joining->available + +Standbys start out as "unavailable". If a standby is unavailable and +is applying fast enough and matches causal_reads_standby_names, the +primary transitions it to "available", but first it sets it to +"joining" until it is sure that any transaction committed while it was +unavailable has definitely been applied on the standby. This closes a +race that would otherwise exist if we moved directly to available +state: tx1 might not wait for a given standby because it's +unavailable, then a lease might be granted, and then tx2 might run a +causal reads transaction without error but see stale data. The +joining state acts as an airlock: while in joining state, the primary +waits for that standby to replay causal reads transactions in +anticipation of the move to available, but it doesn't progress to +available state and grant a lease to the standby until everything +preceding joining state has also been applied. + +2. available->unavailable + +If a standby is not applying fast enough or not responding to +keepalive messages, then the primary kicks that standby out of the +dynamic set of available standbys, that is, marks it as "unavailable". +In order to make sure that the standby has started rejecting causal +reads transactions, it needs to revoke the lease it most recently +granted. It does that by waiting for the lease to expire before +allowing any causal reads commits to return. 
(In future there could +be a fast-path revocation message which waits for a serial-numbered +acknowledgement to reduce waiting in the case where the standby is +lagging but still reachable and responding). + +The rest of this document illustrates how clock skew affects the +available->unavailable transition. + +The following 4 variables are derived from a single GUC, and these +values will be used in the following illustrations: + +causal_reads_timeout = 4s +lease_time = 4s (= causal_reads_timeout) +keepalive_time = 2s (= lease_time / 2) +max_clock_skew = 1s (= lease_time / 4) + +Every keepalive_time, the primary transmits a lease that expires at +local_clock_time + lease_time - max_clock_skew, shown in the following +diagram as 't' for transmission time and '|' for expiry time. If +contact is lost with a standby, the primary will wait until sent_time ++ lease_time for the most recently granted lease to expire, shown on +the following diagram 'x', to be sure that the standby's clock has +reached the expiry time even if its clock differs by up to +max_clock_skew. In other words, the primary tells the standby that +the expiry time is at one time, but it trusts that the standby will +surely agree if it gives it some extra time. The extra time is +max_clock_skew. If the clocks differ by more than max_clock_skew, all +bets are off (but see below for attempt to detect obvious cases). + +0 1 2 3 4 5 6 7 8 9 +t-----------------|-----x + t-----------------|-----x + t-----------------|-----x + t-----------------|... + t------... + +A standby whose clock is 2 seconds ahead of the primary's clock +perceives gaps in the stream of leases, and will reject causal_reads +transactions in those intervals. The causal reads guarantee is +upheld, but spurious errors are raised between leases, as a +consequence of the clock skew being greater than max_clock_skew. In +the following diagram 'r' shows reception time, and the timeline along +the top shows the standby's local clock time. + +2 3 4 5 6 7 8 9 10 11 +r-----| + r-----| + r-----| + r-----| + r-----| + +If there were no network latency, a standby whose clock is exactly 1 +second ahead of the primary's clock would perceive the stream of +leases as being replaced just in time, so there is no gap. Since in +reality the time of receipt is some time after the time of +transmission due to network latency, if the standby's clock is exactly +1 second behind, then there will be small network-latency-sized gaps +before the next lease arrives, but still no correctness problem with +respect to the causal reads guarantee. + +1 2 3 4 5 6 7 8 9 10 +r-----------| + r-----------| + r-----------| + r-----------| + r------... + +A standby whose clock is perfectly in sync with the primary's +perceives the stream of leases overlapping (this matches the primary's +perception of the leases it sent): + +0 1 2 3 4 5 6 7 8 9 +r-----------------| + r-----------------| + r-----------------| + r-----------------| + r------... + +A standby whose clock is exactly 1 second behind the primary's +perceives the stream of leases as overlapping even more, but the time +of expiry as judged by the standby is no later than the time the +primary will wait for if required ('x'). That is, if contact is lost +with the standby, the primary can still reliably hold up causal reads +commits until the standby has started raising the error in +causal_reads transactions. + +-1 0 1 2 3 4 5 6 7 8 +r-----------------------| + r-----------------------| + r-----------------------| + r------------------... 
+ r------... + + +A standby whose clock is 2 seconds behind the primary's would perceive +the stream of leases overlapping even more, and the primary would no +longer be able to wait for a lease to expire if it wanted to revoke +it. But because the expiry time is after local_clock_time + +lease_time, the standby can immediately see that its own clock must be +more than 1 second behind the primary's, so it ignores the lease and +logs a clock skew warning. In the following diagram a lease expiry +time that is obviously generated by a primary with a clock set too far +in the future compared to the local clock is shown with a '!'. + +-2 -1 0 1 2 3 4 5 6 7 +r-----------------------------! + r-----------------------------! + r-----------------------------! + r------------------... + r------... + +A danger window exists when the standby's clock is more than +max_clock_skew behind the primary's clock, but not more than +max_clock_skew + network latency time behind. If the clock difference +is in that range, then the algorithm presented above which is based on +time of receipt cannot detect that the local clock is too far behind. +The consequence of this problem could be as follows: + +1. The standby loses contact with the primary due to a network fault. + +2. The primary decides to drop the standby from the set of available + causal reads standbys due to lack of keepalive responses or + excessive lag, which necessitates holding up commits of causal + reads transactions until the most recently sent lease expires, in + the belief that the standby will definitely have started raising + the 'causal reads unavailable' error in causal reads transactions + by that time, if it is still alive and servicing requests. + +3. The standby still has clients connected and running queries. + +4. Due to clock skew in the problematic range, in the standby's + opinion the lease lasts slightly longer than the primary waits. + +5. For a short window at most the duration of the network latency + time, clients running causal reads transactions are allowed to see + potentially stale data. + +For this reason we say that the causal reads guarantee only holds as +long as the absolute difference between the system clocks of the +machines is no more than max_clock_skew. The theory is that NTP makes +it possible to reason about the maximum possible clock difference +between machines and choose a value that allows for a much larger +difference. However, we do make a best effort attempt to detect +misconfigured systems as described above, to catch the case of servers +not running ntp a correctly configured ntp daemon, or with a clock so +far out of whack that ntp refuses to fix it. 
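To restate the timing rules above in code, here is a small self-contained sketch; the struct and function names are ours and the walsender's actual lease-granting code is not reproduced in this excerpt. It shows how the derived values, and the 't', '|' and 'x' points in the diagrams, all follow from the single GUC:

#include <stdio.h>

/*
 * Illustrative sketch of the lease timing rules described in this README.
 * Values are integer milliseconds for clarity.
 */
typedef struct
{
	int		lease_time_ms;		/* = causal_reads_timeout */
	int		keepalive_time_ms;	/* = lease_time / 2 */
	int		max_clock_skew_ms;	/* = lease_time / 4 */
} LeaseTiming;

static LeaseTiming
compute_lease_timing(int causal_reads_timeout_ms)
{
	LeaseTiming t;

	t.lease_time_ms = causal_reads_timeout_ms;
	t.keepalive_time_ms = t.lease_time_ms / 2;
	t.max_clock_skew_ms = t.lease_time_ms / 4;
	return t;
}

int
main(void)
{
	long		now_ms = 0;		/* primary's clock at transmission ('t') */
	LeaseTiming t = compute_lease_timing(4000);	/* causal_reads_timeout = 4s */

	/* Expiry time advertised to the standby ('|' in the diagrams). */
	long		advertised_expiry = now_ms + t.lease_time_ms - t.max_clock_skew_ms;

	/* Time the primary waits for before treating the lease as revoked ('x'). */
	long		revocation_wait_until = now_ms + t.lease_time_ms;

	printf("keepalive every %d ms, advertised expiry at +%ld ms, primary waits until +%ld ms\n",
		   t.keepalive_time_ms, advertised_expiry, revocation_wait_until);
	return 0;
}

With causal_reads_timeout = 4s this prints a 2000 ms keepalive interval, an advertised expiry 3000 ms after transmission and a revocation wait of 4000 ms, matching the '|' and 'x' positions in the diagrams above.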
\ No newline at end of file diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 7f85b88..e0ea7b7 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -57,6 +57,11 @@ #include "utils/builtins.h" #include "utils/ps_status.h" +/* GUC variables */ +int causal_reads_timeout; +bool causal_reads; +char *causal_reads_standby_names; + /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; @@ -69,7 +74,7 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn); static int SyncRepGetStandbyPriority(void); @@ -83,6 +88,255 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * =========================================================== */ +static bool +SyncRepCheckEarlyExit(void) +{ + /* + * If a wait for synchronous replication is pending, we can neither + * acknowledge the commit nor raise ERROR or FATAL. The latter would + * lead the client to believe that the transaction aborted, which + * is not true: it's already committed locally. The former is no good + * either: the client has requested synchronous replication, and is + * entitled to assume that an acknowledged commit is also replicated, + * which might not be true. So in this case we issue a WARNING (which + * some clients may be able to interpret) and shut off further output. + * We do NOT reset ProcDiePending, so that the process will die after + * the commit is cleaned up. + */ + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + /* + * It's unclear what to do if a query cancel interrupt arrives. We + * can't actually abort at this point, but ignoring the interrupt + * altogether is not helpful, so we just terminate the wait with a + * suitable warning. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for synchronous replication due to user request"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + SyncRepCancelWait(); + return true; + } + + /* + * If the postmaster dies, we'll probably never get an + * acknowledgement, because all the wal sender processes will exit. So + * just bail out. + */ + if (!PostmasterIsAlive()) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + return false; +} + +/* + * Check if we can stop waiting for causal consistency. We can stop waiting + * when the following conditions are met: + * + * 1. All walsenders currently in 'joining' or 'available' state have + * applied the target LSN. + * + * 2. Any stall periods caused by standbys dropping out of 'available' state + * have passed, so that we can be sure that their leases have expired and they + * have started rejecting causal reads transactions. + * + * The output parameter 'waitingFor' is set to the number of nodes we are + * currently waiting for. 
The output parameters 'stallTimeMillis' is set to + * the number of milliseconds we need to wait for to observe any current + * commit stall. + * + * Returns true if commit can return control, because every standby has either + * applied the LSN or started rejecting causal_reads transactions. + */ +static bool +CausalReadsCommitCanReturn(XLogRecPtr XactCommitLSN, + int *waitingFor, + long *stallTimeMillis) +{ + int i; + TimestampTz now; + + /* Count how many joining/available nodes we are waiting for. */ + *waitingFor = 0; + for (i = 0; i < max_wal_senders; ++i) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Assuming atomic read of pid_t, we can check walsnd->pid without + * acquiring the spinlock to avoid memory synchronization costs for + * unused walsender slots. We see a value that existed sometime at + * least as recently as the last memory barrier. + */ + if (walsnd->pid != 0) + { + /* + * We need to hold the spinlock to read LSNs, because we can't be + * sure they can be read atomically. + */ + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid != 0 && walsnd->causal_reads_state >= WALSNDCRSTATE_JOINING) + { + if (walsnd->apply < XactCommitLSN) + ++*waitingFor; + } + SpinLockRelease(&walsnd->mutex); + } + } + + /* Check if there is a stall in progress that we need to observe. */ + now = GetCurrentTimestamp(); + LWLockAcquire(SyncRepLock, LW_SHARED); + if (WalSndCtl->stall_causal_reads_until > now) + { + long seconds; + int usecs; + + /* Compute how long we have to wait, rounded up to nearest ms. */ + TimestampDifference(now, WalSndCtl->stall_causal_reads_until, + &seconds, &usecs); + *stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000; + } + else + *stallTimeMillis = 0; + LWLockRelease(SyncRepLock); + + /* We are done if we are not waiting for any nodes or stalls. */ + return *waitingFor == 0 && *stallTimeMillis == 0; +} + +/* + * Wait for causal consistency in causal_reads mode, if requested by user. + */ +void +CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN) +{ + long stallTimeMillis; + int waitingFor; + char *ps_display_buffer = NULL; + + /* Leave if we aren't in causal_reads mode. */ + if (!causal_reads) + return; + + for (;;) + { + /* Reset latch before checking state. */ + ResetLatch(MyLatch); + + /* + * Join the queue to be woken up if any causal reads joining/available + * standby applies XactCommitLSN, if we aren't already in it. We + * don't actually know if we need to wait for any peers yet, but we + * have to register just in case before checking the walsenders' state + * to avoid a race condition that could occur if we did it after + * calling CausalReadsCommitCanReturn. (SyncRepWaitForLSN doesn't + * have to do this because it can check the highest-seen LSN in + * walsndctl->lsn[mode] which is protected by SyncRepLock, the same + * lock as the queues. We can't do that here, because there is no + * single highest-seen LSN that is useful. We must check + * walsnd->apply for all relevant walsenders. Therefore we must + * register for notifications first, so that we can be notified via + * our latch of any standby applying the LSN we're interested in after + * we check but before we start waiting, or we could wait forever for + * something that has already happened.) 
+ */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + if (MyProc->syncRepState != SYNC_REP_WAITING) + { + MyProc->waitLSN = XactCommitLSN; + MyProc->syncRepState = SYNC_REP_WAITING; + SyncRepQueueInsert(SYNC_REP_WAIT_CAUSAL_READS_APPLY); + Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_CAUSAL_READS_APPLY)); + } + LWLockRelease(SyncRepLock); + + /* Check if we're done. */ + if (CausalReadsCommitCanReturn(XactCommitLSN, &waitingFor, &stallTimeMillis)) + { + SyncRepCancelWait(); + break; + } + + Assert(waitingFor > 0 || stallTimeMillis > 0); + + /* If we aren't actually waiting for any standbys, leave the queue. */ + if (waitingFor == 0) + SyncRepCancelWait(); + + /* Update the ps title. */ + if (update_process_title) + { + char buffer[80]; + + /* Remember the old value if this is our first update. */ + if (ps_display_buffer == NULL) + { + int len; + const char *ps_display = get_ps_display(&len); + + ps_display_buffer = palloc(len + 1); + memcpy(ps_display_buffer, ps_display, len); + ps_display_buffer[len] = '\0'; + } + + snprintf(buffer, sizeof(buffer), + "waiting for %d peer(s) to apply %X/%X%s", + waitingFor, + (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN, + stallTimeMillis > 0 ? " (stalling)" : ""); + set_ps_display(buffer, false); + } + + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckEarlyExit()) /* Calls SyncRepCancelWait() if true. */ + break; + + /* + * If are still waiting for peers, then we wait for any joining or + * available peer to reach the LSN (or possibly stop being in one of + * those states or go away). + * + * If not, there must be a non-zero stall time, so we wait for that to + * elapse. + */ + if (waitingFor > 0) + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1); + else + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, + stallTimeMillis); + } + + /* There is no way out of the loop that could leave us in the queue. */ + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + MyProc->waitLSN = 0; + + /* Restore the ps display. */ + if (ps_display_buffer != NULL) + { + set_ps_display(ps_display_buffer, false); + pfree(ps_display_buffer); + } +} + /* * Wait for synchronous replication, if requested by user. * @@ -180,57 +434,9 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) if (syncRepState == SYNC_REP_WAIT_COMPLETE) break; - /* - * If a wait for synchronous replication is pending, we can neither - * acknowledge the commit nor raise ERROR or FATAL. The latter would - * lead the client to believe that the transaction aborted, which - * is not true: it's already committed locally. The former is no good - * either: the client has requested synchronous replication, and is - * entitled to assume that an acknowledged commit is also replicated, - * which might not be true. So in this case we issue a WARNING (which - * some clients may be able to interpret) and shut off further output. - * We do NOT reset ProcDiePending, so that the process will die after - * the commit is cleaned up. - */ - if (ProcDiePending) - { - ereport(WARNING, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - whereToSendOutput = DestNone; - SyncRepCancelWait(); - break; - } - - /* - * It's unclear what to do if a query cancel interrupt arrives. 
We - * can't actually abort at this point, but ignoring the interrupt - * altogether is not helpful, so we just terminate the wait with a - * suitable warning. - */ - if (QueryCancelPending) - { - QueryCancelPending = false; - ereport(WARNING, - (errmsg("canceling wait for synchronous replication due to user request"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - SyncRepCancelWait(); - break; - } - - /* - * If the postmaster dies, we'll probably never get an - * acknowledgement, because all the wal sender processes will exit. So - * just bail out. - */ - if (!PostmasterIsAlive()) - { - ProcDiePending = true; - whereToSendOutput = DestNone; - SyncRepCancelWait(); + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckEarlyExit()) break; - } /* * Wait on latch. Any condition that should wake us up will set the @@ -403,6 +609,49 @@ SyncRepGetSynchronousStandby(void) } /* + * Check if the current WALSender process's application_name matches a name in + * causal_reads_standby_names (including '*' for wildcard). + */ +bool +CausalReadsPotentialStandby(void) +{ + char *rawstring; + List *elemlist; + ListCell *l; + bool found = false; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(causal_reads_standby_names); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + /* GUC machinery will have already complained - no need to do again */ + return 0; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, application_name) == 0 || + pg_strcasecmp(standby_name, "*") == 0) + { + found = true; + break; + } + } + + pfree(rawstring); + list_free(elemlist); + + return found; +} + +/* * Update the LSNs on each queue based upon our latest state. This * implements a simple policy of first-valid-standby-releases-waiter. * @@ -410,22 +659,27 @@ SyncRepGetSynchronousStandby(void) * perhaps also which information we store as well. */ void -SyncRepReleaseWaiters(void) +SyncRepReleaseWaiters(bool walsender_cr_available_or_joining) { volatile WalSndCtlData *walsndctl = WalSndCtl; WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; + int numcausalreadsapply = 0; + bool is_highest_priority_sync_standby; /* * If this WALSender is serving a standby that is not on the list of - * potential standbys then we have nothing to do. If we are still starting - * up, still running base backup or the current flush position is still - * invalid, then leave quickly also. + * potential standbys and not in a state that causal_reads waits for, then + * we have nothing to do. If we are still starting up, still running base + * backup or the current flush position is still invalid, then leave + * quickly also. 
*/ - if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + if (!walsender_cr_available_or_joining && + (MyWalSnd->sync_standby_priority == 0 || + MyWalSnd->state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(MyWalSnd->flush))) return; /* @@ -435,45 +689,77 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); syncWalSnd = SyncRepGetSynchronousStandby(); - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + /* + * If we aren't managing the highest priority standby then make a note of + * that so we can announce a takeover in the log if we ever get that job. + */ + is_highest_priority_sync_standby = syncWalSnd == MyWalSnd; + if (!is_highest_priority_sync_standby) + announce_next_takeover = true; /* - * If we aren't managing the highest priority standby then just leave. + * If we aren't managing the highest priority standby or a standby in + * causal reads 'joining' or 'available' state, then just leave. */ - if (syncWalSnd != MyWalSnd) + if (!is_highest_priority_sync_standby && !walsender_cr_available_or_joining) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; return; } /* * Set the lsn first so that when we wake backends they will release up to - * this location. + * this location. For the single-standby synchronous commit levels, we + * only do this if we are the current synchronous standby and we are + * advancing the LSN further than it has been advanced before, so that + * SyncRepWaitForLSN can skip waiting in some cases. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (is_highest_priority_sync_standby) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); - } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) - { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + { + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, + MyWalSnd->write); + } + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, + MyWalSnd->flush); + } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, + MyWalSnd->apply); + } } + /* + * For causal_reads, all walsenders currently in available or joining + * state must reach the LSN on their own, and standbys will reach LSNs in + * any order. It doesn't make sense to keep the highest seen LSN in a + * single walsndctl->lsn element. (CausalReadsWaitForLSN has handling for + * LSNs that have already been reached). 
+ */ + if (walsender_cr_available_or_joining) + numcausalreadsapply = + SyncRepWakeQueue(false, SYNC_REP_WAIT_CAUSAL_READS_APPLY, + MyWalSnd->apply); LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X, %d procs to causal_reads apply", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply, + numcausalreadsapply); /* * If we are managing the highest priority standby, though we weren't * prior to this, then announce we are now the sync standby. */ - if (announce_next_takeover) + if (is_highest_priority_sync_standby && announce_next_takeover) { announce_next_takeover = false; ereport(LOG, @@ -548,9 +834,8 @@ SyncRepGetStandbyPriority(void) * Must hold SyncRepLock. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn) { - volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; @@ -567,7 +852,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Assume the queue is ordered by LSN */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) + if (!all && lsn < proc->waitLSN) return numprocs; /* @@ -627,7 +912,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, InvalidXLogRecPtr); } /* @@ -679,13 +964,31 @@ SyncRepQueueIsOrderedByLSN(int mode) #endif /* + * Make sure that CausalReadsWaitForLSN can't return until after the given + * lease expiry time has been reached. + * + * Wake up all backends waiting in CausalReadsWaitForLSN, because the set of + * available/joining peers has changed, and there is a new stall time they + * need to observe. 
+ */ +void +CausalReadsBeginStall(TimestampTz lease_expiry_time) +{ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + WalSndCtl->stall_causal_reads_until = + Max(WalSndCtl->stall_causal_reads_until, lease_expiry_time); + SyncRepWakeQueue(true, SYNC_REP_WAIT_CAUSAL_READS_APPLY, InvalidXLogRecPtr); + LWLockRelease(SyncRepLock); +} + +/* * =========================================================== * Synchronous Replication functions executed by any process * =========================================================== */ bool -check_synchronous_standby_names(char **newval, void **extra, GucSource source) +check_standby_names(char **newval, void **extra, GucSource source) { char *rawstring; List *elemlist; @@ -728,6 +1031,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7b36e02..4526d73 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -55,6 +55,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -101,6 +102,7 @@ static uint32 recvOff = 0; */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGUSR2 = false; /* * LogstreamResult indicates the byte positions that we have already @@ -145,14 +147,33 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); -static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsUntil); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); static void WalRcvSigUsr1Handler(SIGNAL_ARGS); +static void WalRcvSigUsr2Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); +static void WalRcvBlockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_BLOCK, &mask, NULL); +} + +static void WalRcvUnblockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_UNBLOCK, &mask, NULL); +} static void ProcessWalRcvInterrupts(void) @@ -200,6 +221,7 @@ WalReceiverMain(void) WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; bool ping_sent; + bool forceReply; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -251,6 +273,7 @@ WalReceiverMain(void) /* Initialise to a sanish value */ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp(); + walrcv->causalReadsLease = 0; SpinLockRelease(&walrcv->mutex); @@ -268,7 +291,7 @@ WalReceiverMain(void) pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalRcvSigUsr1Handler); - pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGUSR2, WalRcvSigUsr2Handler); /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD, SIG_DFL); @@ -299,6 +322,10 @@ 
WalReceiverMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* Block SIGUSR2 (we unblock it only during network waits). */ + WalRcvBlockSigUsr2(); + got_SIGUSR2 = false; + /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo); @@ -408,7 +435,9 @@ WalReceiverMain(void) } /* Wait a while for data to arrive */ + WalRcvUnblockSigUsr2(); len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + WalRcvBlockSigUsr2(); if (len != 0) { /* @@ -439,11 +468,21 @@ WalReceiverMain(void) endofwal = true; break; } + WalRcvUnblockSigUsr2(); len = walrcv_receive(0, &buf); + WalRcvBlockSigUsr2(); + } + + if (got_SIGUSR2) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR2 = false; + forceReply = true; } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(forceReply, false); + forceReply = false; /* * If we've written some records, flush them to disk and @@ -498,7 +537,15 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + /* Check if the startup process has signaled us. */ + if (got_SIGUSR2) + { + got_SIGUSR2 = false; + forceReply = true; + } + + XLogWalRcvSendReply(requestReply || forceReply, requestReply); + forceReply = false; XLogWalRcvSendHSFeedback(false); } } @@ -740,6 +787,13 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) errno = save_errno; } +/* SIGUSR2: used to receive wakeups from recovery */ +static void +WalRcvSigUsr2Handler(SIGNAL_ARGS) +{ + got_SIGUSR2 = true; +} + /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalRcvShutdownHandler(SIGNAL_ARGS) @@ -800,6 +854,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogRecPtr walEnd; TimestampTz sendTime; bool replyRequested; + TimestampTz causalReadsLease; resetStringInfo(&incoming_message); @@ -820,7 +875,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, NULL); buf += hdrlen; len -= hdrlen; @@ -830,7 +885,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) case 'k': /* Keepalive */ { /* copy message to StringInfo */ - hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(int64); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -842,8 +897,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) sendTime = IntegerTimestampToTimestampTz( pq_getmsgint64(&incoming_message)); replyRequested = pq_getmsgbyte(&incoming_message); + causalReadsLease = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + ProcessWalSndrMessage(walEnd, sendTime, &causalReadsLease); - ProcessWalSndrMessage(walEnd, sendTime); + /* Remember primary's timestamp at this WAL location. 
*/ + SetXLogReplayTimestampAtLsn(sendTime, walEnd); /* If the primary requested a reply, send one immediately */ if (replyRequested) @@ -1037,6 +1096,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) XLogRecPtr applyPtr; static TimestampTz sendTime = 0; TimestampTz now; + TimestampTz applyTimestamp = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -1068,7 +1128,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyTimestamp = GetXLogReplayTimestamp(&applyPtr); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1076,6 +1136,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp)); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ @@ -1174,15 +1235,52 @@ XLogWalRcvSendHSFeedback(bool immed) * Update shared memory status upon receiving a message from primary. * * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest - * message, reported by primary. + * message, reported by primary. 'causalReadsLease' is a pointer to + * the time the primary promises that this standby can safely claim to be + * causally consistent, to 0 if it cannot, or a NULL pointer for no change. */ static void -ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *causalReadsLease) { WalRcvData *walrcv = WalRcv; TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + /* Sanity check for the causalReadsLease time. */ + if (causalReadsLease != NULL && *causalReadsLease != 0) + { + /* Deduce max_clock_skew from the causalReadsLease and sendTime. */ +#ifdef HAVE_INT64_TIMESTAMP + int64 diffMillis = (*causalReadsLease - sendTime) / 1000; +#else + int64 diffMillis = (*causalReadsLease - sendTime) * 1000; +#endif + int64 max_clock_skew = diffMillis / (CAUSAL_READS_CLOCK_SKEW_RATIO - 1); + + if (sendTime > TimestampTzPlusMilliseconds(lastMsgReceiptTime, max_clock_skew)) + { + /* + * The primary's clock is more than max_clock_skew + network + * latency ahead of the standby's clock. (If the primary's clock + * is more than max_clock_skew ahead of the standby's clock, but + * by less than the network latency, then there isn't much we can + * do to detect that; but it still seems useful to have this basic + * sanity check for wildly misconfigured servers.) + */ + elog(LOG, "the primary server's clock time is too far ahead"); + causalReadsLease = NULL; + } + /* + * We could also try to detect cases where sendTime is more than + * max_clock_skew in the past according to the standby's clock, but + * that is indistinguishable from network latency/buffering, so we + * could produce misleading error messages; if we do nothing, the + * consequence is 'standby is not available for causal reads' errors + * which should cause the user to investigate. 
+ */ + } + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->latestWalEnd < walEnd) @@ -1190,6 +1288,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + if (causalReadsLease != NULL) + walrcv->causalReadsLease = *causalReadsLease; SpinLockRelease(&walrcv->mutex); if (log_min_messages <= DEBUG2) @@ -1222,6 +1322,22 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } /* + * Wake up the walreceiver if it happens to be blocked in walrcv_receive, + * and tell it that a commit record has been applied. + * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = remote_apply or causal_reads = on. + */ +void +WalRcvWakeup(void) +{ + if (WalRcv->pid != 0) + kill(WalRcv->pid, SIGUSR2); +} + +/* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. */ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 5f6e423..f398a75 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -25,9 +25,11 @@ #include "access/xlog_internal.h" #include "postmaster/startup.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/guc.h" #include "utils/timestamp.h" WalRcvData *WalRcv = NULL; @@ -374,3 +376,21 @@ GetReplicationTransferLatency(void) return ms; } + +/* + * Used by snapmgr to check if this standby has a valid lease, granting it the + * right to consider itself available for causal reads. + */ +bool +WalRcvCausalReadsAvailable(void) +{ + WalRcvData *walrcv = WalRcv; + TimestampTz now = GetCurrentTimestamp(); + bool result; + + SpinLockAcquire(&walrcv->mutex); + result = walrcv->causalReadsLease != 0 && now <= walrcv->causalReadsLease; + SpinLockRelease(&walrcv->mutex); + + return result; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c03e045..55c10e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -153,9 +153,20 @@ static StringInfoData tmpbuf; */ static TimestampTz last_reply_timestamp = 0; +static TimestampTz last_keepalive_timestamp = 0; + /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; +/* How far must the standby replay before it can leave JOINING state? */ +static XLogRecPtr causal_reads_joining_until = 0; + +/* The last causal reads lease sent to the standby. */ +static TimestampTz causal_reads_last_lease = 0; + +/* Is this WALSender listed in causal_reads_standby_names? */ +static bool am_potential_causal_reads_standby = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -242,6 +253,57 @@ InitWalSender(void) } /* + * If we are exiting unexpectedly, we may need to communicate with concurrent + * causal_reads commits to maintain the causal consistency guarantee.
+ */ +static void +PrepareUncleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * We've lost contact with the standby, but it may still be alive. We + * can't let any causal_reads transactions return until we've stalled + * for long enough for a zombie standby to start raising errors + * because its lease has expired. + */ + elog(LOG, "standby \"%s\" is lost (no longer available for causal reads)", application_name); + CausalReadsBeginStall(causal_reads_last_lease); + + /* + * We set the state to a lower level _after_ beginning the stall, + * otherwise there would be a tiny window where commits could return + * without observing the stall. + */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* + * We are shutting down because we received a goodbye message from the + * walreceiver. + */ +static void +PrepareCleanExit(void) +{ + if (MyWalSnd->causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + /* + * The standby is shutting down, so it won't be running any more + * transactions. It is therefore safe to stop waiting for it, and no + * stall is necessary. + */ + elog(LOG, "standby \"%s\" is leaving (no longer available for causal reads)", application_name); + + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* * Clean up after an error. * * WAL sender processes don't use transactions like regular backends do. @@ -264,7 +326,10 @@ WalSndErrorCleanup(void) replication_active = false; if (walsender_ready_to_stop) + { + PrepareUncleanExit(); proc_exit(0); + } /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); @@ -276,6 +341,8 @@ WalSndErrorCleanup(void) static void WalSndShutdown(void) { + PrepareUncleanExit(); + /* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. @@ -1386,6 +1453,7 @@ ProcessRepliesIfAny(void) if (r < 0) { /* unexpected error or EOF */ + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1402,6 +1470,7 @@ ProcessRepliesIfAny(void) resetStringInfo(&reply_message); if (pq_getmessage(&reply_message, 0)) { + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1451,6 +1520,7 @@ ProcessRepliesIfAny(void) * 'X' means that the standby is closing down the socket. */ case 'X': + PrepareCleanExit(); proc_exit(0); default: @@ -1543,15 +1613,29 @@ ProcessStandbyReplyMessage(void) XLogRecPtr writePtr, flushPtr, applyPtr; + int applyLagMs; bool replyRequested; + TimestampTz now = GetCurrentTimestamp(); + TimestampTz applyTimestamp; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); replyRequested = pq_getmsgbyte(&reply_message); + /* Compute the apply lag in milliseconds. 
*/ + if (applyTimestamp == 0) + applyLagMs = -1; + else +#ifdef HAVE_INT64_TIMESTAMP + applyLagMs = (now - applyTimestamp) / 1000; +#else + applyLagMs = (now - applyTimestamp) * 1000.0; +#endif + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, @@ -1568,16 +1652,116 @@ ProcessStandbyReplyMessage(void) */ { WalSnd *walsnd = MyWalSnd; + WalSndCausalReadsState causal_reads_state = walsnd->causal_reads_state; + bool causal_reads_state_changed = false; + bool causal_reads_set_joining_until = false; + + /* + * Handle causal reads state transitions, if a causal_reads_timeout is + * configured, this standby is listed in causal_reads_standby_names, + * and we are a primary database (not a cascading standby). + */ + if (causal_reads_timeout != 0 && + am_potential_causal_reads_standby && + !am_cascading_walsender) + { + if (applyLagMs >= 0 && applyLagMs < causal_reads_timeout) + { + if (causal_reads_state == WALSNDCRSTATE_UNAVAILABLE) + { + /* + * The standby is applying fast enough. We can't grant a + * lease yet though, we need to wait for everything that + * was committed while this standby was unavailable to be + * applied first. We move to joining state while we wait + * for the standby to catch up. + */ + causal_reads_state = WALSNDCRSTATE_JOINING; + causal_reads_set_joining_until = true; + causal_reads_state_changed = true; + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING && + applyPtr >= causal_reads_joining_until) + { + /* + * The standby has applied everything committed before we + * reached joining state, and has been waiting for remote + * apply on this standby while it's been in joining state, + * so it is safe to move to available state and send a + * lease. + */ + causal_reads_state = WALSNDCRSTATE_AVAILABLE; + causal_reads_state_changed = true; + } + } + else + { + if (causal_reads_state == WALSNDCRSTATE_AVAILABLE) + { + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + /* + * We are dropping a causal reads available standby, so we + * mustn't let any commit command that is waiting in + * CausalReadsWaitForLSN return until we are sure that the + * standby definitely knows that it's not available and + * starts raising errors for causal_reads transactions. + * TODO: We could just wait until the standby acks that + * its lease has been cancelled, and start numbering + * keepalives and sending the number back in replies, so + * we know it's acking the right message; then lagging + * standbys would be less disruptive, but for now we just + * wait for the lease to expire, as we do when we lose + * contact with a standby, for the sake of simplicity. + */ + CausalReadsBeginStall(causal_reads_last_lease); + } + else if (causal_reads_state == WALSNDCRSTATE_JOINING) + { + /* + * Dropping a joining standby doesn't require a stall, + * because the standby doesn't think it's available, so + * it's already raising the error for causal_reads + * transactions. 
+ */ + causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; + causal_reads_state_changed = true; + } + } + } SpinLockAcquire(&walsnd->mutex); walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + walsnd->applyLagMs = applyLagMs; + walsnd->causal_reads_state = causal_reads_state; SpinLockRelease(&walsnd->mutex); + + if (causal_reads_set_joining_until) + { + /* + * Record the end of the primary's WAL at some arbitrary point + * observed _after_ we moved to joining state (so that causal + * reads commits start waiting, closing a race). The standby + * won't become available until it has replayed up to here. + */ + causal_reads_joining_until = GetFlushRecPtr(); + } + + if (causal_reads_state_changed) + { + WalSndKeepalive(true); + elog(LOG, "standby \"%s\" is %s", application_name, + causal_reads_state == WALSNDCRSTATE_UNAVAILABLE ? "unavailable for causal reads" : + causal_reads_state == WALSNDCRSTATE_JOINING ? "joining as a causal reads standby..." : + causal_reads_state == WALSNDCRSTATE_AVAILABLE ? "available for causal reads" : + "UNKNOWN"); + } } if (!am_cascading_walsender) - SyncRepReleaseWaiters(); + SyncRepReleaseWaiters(MyWalSnd->causal_reads_state >= WALSNDCRSTATE_JOINING); /* * Advance our local xmin horizon when the client confirmed a flush. @@ -1724,27 +1908,34 @@ WalSndComputeSleeptime(TimestampTz now) { long sleeptime = 10000; /* 10 s */ - if (wal_sender_timeout > 0 && last_reply_timestamp > 0) + if ((wal_sender_timeout > 0 || causal_reads_timeout > 0) && last_reply_timestamp > 0) { TimestampTz wakeup_time; long sec_to_timeout; int microsec_to_timeout; - /* - * At the latest stop sleeping once wal_sender_timeout has been - * reached. - */ - wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); - - /* - * If no ping has been sent yet, wakeup when it's time to do so. - * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of - * the timeout passed without a response. - */ - if (!waiting_for_ping_response) + if (causal_reads_timeout != 0) + wakeup_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + else + { + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of + * the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + } /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, @@ -1765,15 +1956,28 @@ static void WalSndCheckTimeOut(TimestampTz now) { TimestampTz timeout; + int allowed_time; /* don't bail out if we're doing something that doesn't require timeouts */ if (last_reply_timestamp <= 0) return; + /* + * If a causal_reads_timeout is configured, it is used instead of + * wal_sender_timeout. Ideally we'd use causal_reads_timeout / 2 + + * allowance for network latency, but since walreceiver can become quite + * bogged down fsyncing WAL we allow more tolerance. (This could be + * tightened up once standbys hand writing off to the WAL writer). 
+ */ + if (causal_reads_timeout != 0) + allowed_time = causal_reads_timeout; + else + allowed_time = wal_sender_timeout; + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); + allowed_time); - if (wal_sender_timeout > 0 && now >= timeout) + if (allowed_time > 0 && now >= timeout) { /* * Since typically expiration of replication timeout means @@ -1806,6 +2010,9 @@ WalSndLoop(WalSndSendDataCallback send_data) last_reply_timestamp = GetCurrentTimestamp(); waiting_for_ping_response = false; + /* Check if we are managing potential causal_reads standby. */ + am_potential_causal_reads_standby = CausalReadsPotentialStandby(); + /* * Loop until we reach the end of this timeline or the client requests to * stop streaming. @@ -1966,6 +2173,7 @@ InitWalSenderSlot(void) walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->causal_reads_state = WALSNDCRSTATE_UNAVAILABLE; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ @@ -2735,6 +2943,24 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +/* + * Return a string constant representing the causal reads state. This is used + * in system views, and should *not* be translated. + */ +static const char * +WalSndGetCausalReadsStateString(WalSndCausalReadsState causal_reads_state) +{ + switch (causal_reads_state) + { + case WALSNDCRSTATE_UNAVAILABLE: + return "unavailable"; + case WALSNDCRSTATE_JOINING: + return "joining"; + case WALSNDCRSTATE_AVAILABLE: + return "available"; + } + return "UNKNOWN"; +} /* * Returns activity of walsenders, including pids and xlog locations sent to @@ -2743,7 +2969,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2791,8 +3017,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int applyLagMs; int priority; WalSndState state; + WalSndCausalReadsState causalReadsState; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2802,9 +3030,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockAcquire(&walsnd->mutex); sentPtr = walsnd->sentPtr; state = walsnd->state; + causalReadsState = walsnd->causal_reads_state; write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + applyLagMs = walsnd->applyLagMs; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2839,6 +3069,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); + if (applyLagMs < 0) + nulls[6] = true; + else + { + Interval *applyLagInterval = palloc(sizeof(Interval)); + + applyLagInterval->month = 0; + applyLagInterval->day = 0; +#ifdef HAVE_INT64_TIMESTAMP + applyLagInterval->time = applyLagMs * 1000; +#else + applyLagInterval->time = applyLagMs / 1000.0; +#endif + nulls[6] = false; + values[6] = IntervalPGetDatum(applyLagInterval); + } + /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an @@ -2846,18 +3093,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + values[7] = Int32GetDatum(priority); /* * More easily understood version of standby state. 
This is purely * informational, not different from priority. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[8] = CStringGetTextDatum("async"); else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); + values[8] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + values[8] = CStringGetTextDatum("potential"); + + values[9] = + CStringGetTextDatum(WalSndGetCausalReadsStateString(causalReadsState)); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -2877,14 +3127,52 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) static void WalSndKeepalive(bool requestReply) { + TimestampTz now; + TimestampTz causal_reads_lease; + elog(DEBUG2, "sending replication keepalive"); + /* + * If the walsender currently deems the standby to be available for causal + * reads, then it grants a causal reads lease. The lease authorizes the + * standby to consider itself available for causal reads until a short + * time in the future. The primary promises to uphold the causal reads + * guarantee until that time, by stalling commits until the lease has + * expired if necessary. + */ + now = GetCurrentTimestamp(); + if (MyWalSnd->causal_reads_state < WALSNDCRSTATE_AVAILABLE) + causal_reads_lease = 0; /* Not available, no lease granted. */ + else + { + /* + * Since this timestamp is being sent to the standby where it will be + * compared against a time generated by the standby's system clock, we + * must consider clock skew. First, we decide on a maximum tolerable + * difference between system clocks. If the primary's clock is ahead + * of the standby's by more than this, then all bets are off (the + * standby could falsely believe it has a valid lease). If the + * primary's clock is behind the standby's by more than this, then the + * standby will err the other way and generate spurious errors in + * causal_reads mode. Rather than having a separate GUC for this, we + * derive it from causal_reads_timeout. + */ + int max_clock_skew = causal_reads_timeout / CAUSAL_READS_CLOCK_SKEW_RATIO; + + /* Compute and remember the expiry time of the lease we're granting. */ + causal_reads_last_lease = TimestampTzPlusMilliseconds(now, causal_reads_timeout); + /* The version we'll send to the standby is adjusted to tolerate clock skew. */ + causal_reads_lease = + TimestampTzPlusMilliseconds(causal_reads_last_lease, -max_clock_skew); + } + /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); pq_sendint64(&output_message, sentPtr); - pq_sendint64(&output_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(now)); pq_sendbyte(&output_message, requestReply ? 1 : 0); + pq_sendint64(&output_message, TimestampTzToIntegerTimestamp(causal_reads_lease)); /* ... and send it wrapped in CopyData */ pq_putmessage_noblock('d', output_message.data, output_message.len); @@ -2902,23 +3190,32 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts.
*/ - if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) + if ((wal_sender_timeout <= 0 && causal_reads_timeout == 0) || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response && causal_reads_timeout == 0) return; /* * If half of wal_sender_timeout has lapsed without receiving any reply * from the standby, send a keep-alive message to the standby requesting * an immediate reply. + * + * If causal_reads_timeout has been configured, use it to control + * keepalive intervals rather than wal_sender_timeout. */ - ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + if (causal_reads_timeout != 0) + ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + causal_reads_timeout / + CAUSAL_READS_KEEPALIVE_RATIO); + else + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); if (now >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true; + last_keepalive_timestamp = now; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 1525d2a..6ff111f 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1611,6 +1611,20 @@ IntegerTimestampToTimestampTz(int64 timestamp) #endif /* + * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +TimestampTzToIntegerTimestamp(TimestampTz timestamp) +{ + return timestamp * 1000000; +} +#endif + +/* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds * diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 04c9c00..d4bf0c0 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -302,6 +302,7 @@ Section: Class 40 - Transaction Rollback 40001 E ERRCODE_T_R_SERIALIZATION_FAILURE serialization_failure 40003 E ERRCODE_T_R_STATEMENT_COMPLETION_UNKNOWN statement_completion_unknown 40P01 E ERRCODE_T_R_DEADLOCK_DETECTED deadlock_detected +40P02 E ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE causal_reads_not_available Section: Class 42 - Syntax Error or Access Rule Violation diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ea5a09a..fb91cad 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", "remote_write", and "local" are documented, we - * accept all the likely variants of "on" and "off". + * Although only "on", "off", "remote_apply", "remote_write", and "local" are + * documented, we accept all the likely variants of "on" and "off". 
*/ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, @@ -1632,6 +1633,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"causal_reads", PGC_USERSET, REPLICATION_STANDBY, + gettext_noop("Enables causal reads."), + NULL + }, + &causal_reads, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -1790,6 +1801,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"causal_reads_timeout", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the maximum apply lag before causal reads standbys are no longer available."), + NULL, + GUC_UNIT_MS + }, + &causal_reads_timeout, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL @@ -3386,7 +3408,18 @@ static struct config_string ConfigureNamesString[] = }, &SyncRepStandbyNames, "", - check_synchronous_standby_names, NULL, NULL + check_standby_names, NULL, NULL + }, + + { + {"causal_reads_standby_names", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("List of names of potential causal reads standbys."), + NULL, + GUC_LIST_INPUT + }, + &causal_reads_standby_names, + "*", + check_standby_names, NULL, NULL }, { diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 63e908d..b1455e1 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -46,8 +46,11 @@ #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" #include "lib/pairingheap.h" #include "miscadmin.h" +#include "replication/syncrep.h" +#include "replication/walreceiver.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -209,6 +212,18 @@ GetTransactionSnapshot(void) "cannot take query snapshot during a parallel operation"); /* + * In causal_reads mode on a standby, check if we have definitely + * applied WAL for any COMMIT that returned successfully on the + * primary. + * + * TODO: Machine readable error code? 
+ */ + if (causal_reads && RecoveryInProgress() && !WalRcvCausalReadsAvailable()) + ereport(ERROR, + (errcode(ERRCODE_T_R_CAUSAL_READS_NOT_AVAILABLE), + errmsg("standby is not available for causal reads"))); + + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must * make a copy of it rather than returning CurrentSnapshotData diff --git a/src/include/access/xact.h b/src/include/access/xact.h index ebeb582..4037dc6 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,11 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY, /* wait for local flush and remote + * apply */ + SYNCHRONOUS_COMMIT_CONSISTENT_APPLY /* wait for local flush and remote + * apply with causal consistency */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +148,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. */ +#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK (1U << 29) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionSyncApplyFeedback(xinfo) \ + (!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK)) #define XactCompletionRelcacheInitFileInval(xinfo) \ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ecd30ce..efb9719 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -236,6 +236,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); +extern void SetXLogReplayTimestamp(TimestampTz timestamp); +extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn); +extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); @@ -268,6 +271,8 @@ extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern void XLogRequestWalReceiverReply(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 62b9125..fa9b184 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2710,7 +2710,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 0 f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f f f t s r 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23,28,28,16,25,25,23,16,25}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}"
"{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,ssl,sslversion,sslcipher,sslbits,sslcompression,sslclientdn}" _null_ _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state,causal_reads_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 96e059b..76a4ee9 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,14 +23,34 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 +#define SYNC_REP_WAIT_CAUSAL_READS_APPLY 3 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 4 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* + * ratio of causal_reads_timeout to max_clock_skew (4 means that the maximum + * tolerated clock difference between primary and standbys using causal_reads + * is 1/4 of causal_reads_timeout) + */ +#define CAUSAL_READS_CLOCK_SKEW_RATIO 4 + +/* + * ratio of causal_reads_timeout to keepalive time (2 means that the effective + * keepalive time is 1/2 of the causal_reads_timeout GUC when it is non-zero) + */ +#define CAUSAL_READS_KEEPALIVE_RATIO 2 + +/* GUC variables */ +extern int causal_reads_timeout; +extern bool causal_reads; +extern char *causal_reads_standby_names; + /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; @@ -42,16 +62,23 @@ extern void SyncRepCleanupAtProcExit(void); /* called by wal sender */ extern void SyncRepInitConfig(void); -extern void SyncRepReleaseWaiters(void); +extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); +/* called by user backend (xact.c) */ +extern void CausalReadsWaitForLSN(XLogRecPtr XactCommitLSN); + +/* called by wal sender */ +extern void CausalReadsBeginStall(TimestampTz lease_expiry_time); +extern bool CausalReadsPotentialStandby(void); + /* forward declaration to avoid pulling in walsender_private.h */ struct WalSnd; extern struct WalSnd *SyncRepGetSynchronousStandby(void); -extern bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source); +extern bool check_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra); #endif /* _SYNCREP_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6eacb09..7f83934 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -80,6 +80,13 @@ typedef struct TimeLineID receivedTLI; /* + * causalReadsLease is the time until which the primary has authorized + * this standby to consider itself available for causal_reads mode, or 0 + * for not authorized. + */ + TimestampTz causalReadsLease; + + /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of * receivedUpto before the last flush to disk. Startup process can use @@ -162,5 +169,8 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvWakeup(void); + +extern bool WalRcvCausalReadsAvailable(void); #endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7794aa5..81a2776 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -27,6 +27,13 @@ typedef enum WalSndState WALSNDSTATE_STREAMING } WalSndState; +typedef enum WalSndCausalReadsState +{ + WALSNDCRSTATE_UNAVAILABLE = 0, + WALSNDCRSTATE_JOINING, + WALSNDCRSTATE_AVAILABLE +} WalSndCausalReadsState; + /* * Each walsender has a WalSnd struct in shared memory. */ @@ -34,6 +41,7 @@ typedef struct WalSnd { pid_t pid; /* this walsender's process id, or 0 */ WalSndState state; /* this walsender's state */ + WalSndCausalReadsState causal_reads_state; /* the walsender's causal reads state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be * reloaded? */ @@ -46,6 +54,7 @@ typedef struct WalSnd XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int applyLagMs; /* Protects shared variables shown above. */ slock_t mutex; @@ -88,6 +97,12 @@ typedef struct */ bool sync_standbys_defined; + /* + * Until when must commits in causal_reads stall? This is used to wait + * for causal reads leases to expire. + */ + TimestampTz stall_causal_reads_until; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index fbead3a..297e151 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -227,9 +227,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, #ifndef HAVE_INT64_TIMESTAMP extern int64 GetCurrentIntegerTimestamp(void); extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp); #else #define GetCurrentIntegerTimestamp() GetCurrentTimestamp() #define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#define TimestampTzToIntegerTimestamp(timestamp) (timestamp) #endif extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
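
A note on the lease arithmetic in WalSndKeepalive and ProcessWalSndrMessage: the standalone sketch below is not part of the patch; it just walks through the numbers for an assumed causal_reads_timeout of 4000 ms, using the CAUSAL_READS_CLOCK_SKEW_RATIO and CAUSAL_READS_KEEPALIVE_RATIO constants defined in syncrep.h. Times here are plain milliseconds; the real code works in TimestampTz units.

/* Illustrative only: lease arithmetic for the causal reads protocol. */
#include <stdio.h>

#define CAUSAL_READS_CLOCK_SKEW_RATIO 4
#define CAUSAL_READS_KEEPALIVE_RATIO 2

int
main(void)
{
	int		causal_reads_timeout = 4000;	/* assumed GUC value, in ms */
	long	now = 0;						/* primary's clock when the keepalive is sent */

	/* Primary side (WalSndKeepalive). */
	int		max_clock_skew = causal_reads_timeout / CAUSAL_READS_CLOCK_SKEW_RATIO;
	long	last_lease = now + causal_reads_timeout;	/* remembered in causal_reads_last_lease */
	long	lease_sent = last_lease - max_clock_skew;	/* value carried by the keepalive */

	/* Standby side (ProcessWalSndrMessage): recover max_clock_skew from the message. */
	long	diffMillis = lease_sent - now;				/* sendTime == now in this sketch */
	long	deduced_skew = diffMillis / (CAUSAL_READS_CLOCK_SKEW_RATIO - 1);

	printf("max_clock_skew: %d ms\n", max_clock_skew);					/* 1000 */
	printf("lease remembered by primary: now + %ld ms\n", last_lease);	/* 4000 */
	printf("lease sent to standby: now + %ld ms\n", lease_sent);		/* 3000 */
	printf("skew deduced by standby: %ld ms\n", deduced_skew);			/* 1000 */
	printf("keepalive interval: %d ms\n",
		   causal_reads_timeout / CAUSAL_READS_KEEPALIVE_RATIO);		/* 2000 */
	return 0;
}

In other words, the primary remembers a lease that expires causal_reads_timeout in the future (the value it stalls commits against after losing a standby), while the copy sent to the standby is shortened by max_clock_skew, so a standby whose clock is within that bound of the primary's stops considering itself available before the primary stops waiting for it.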
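
The walsender-side transitions in ProcessStandbyReplyMessage may be easier to follow as a reduced model. The sketch below is illustrative only: the type and function names are invented, and it leaves out the commit stall, the keepalive that announces the new state, and the cascading/standby-name checks. The "join point" corresponds to causal_reads_joining_until, the primary's WAL flush position recorded when the standby entered JOINING.

/* Simplified model of the causal reads state machine (not part of the patch). */
#include <stdio.h>

typedef enum { UNAVAILABLE, JOINING, AVAILABLE } cr_state;

static cr_state
next_state(cr_state state, int applyLagMs, int timeout_ms, int replayed_join_point)
{
	if (applyLagMs >= 0 && applyLagMs < timeout_ms)
	{
		if (state == UNAVAILABLE)
			return JOINING;		/* applying fast enough, wait for it to catch up */
		if (state == JOINING && replayed_join_point)
			return AVAILABLE;	/* caught up, may now be granted leases */
		return state;
	}

	/*
	 * Lagging too far behind, or no apply timestamp reported: drop the
	 * standby.  (Dropping an AVAILABLE standby additionally requires stalling
	 * commits until its last lease expires, which this sketch omits.)
	 */
	return UNAVAILABLE;
}

int
main(void)
{
	static const char *names[] = {"unavailable", "joining", "available"};
	cr_state s = UNAVAILABLE;

	s = next_state(s, 10, 4000, 0);		/* small lag -> joining */
	printf("%s\n", names[s]);
	s = next_state(s, 10, 4000, 1);		/* replayed the join point -> available */
	printf("%s\n", names[s]);
	s = next_state(s, 5000, 4000, 1);	/* lag exceeds timeout -> unavailable */
	printf("%s\n", names[s]);
	return 0;
}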