*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 2586,2597 **** include_dir 'conf.d' Specifies a comma-separated list of standby names that can support synchronous replication, as described in . ! At any one time there will be at most one active synchronous standby; ! transactions waiting for commit will be allowed to proceed after ! this standby server confirms receipt of their data. ! The synchronous standby will be the first standby named in this list ! that is both currently connected and streaming data in real-time ! (as shown by a state of streaming in the pg_stat_replication view). Other standby servers appearing later in this list represent potential --- 2586,2598 ---- Specifies a comma-separated list of standby names that can support synchronous replication, as described in . ! At any one time there will be at most a number of active synchronous standbys ! defined by ; transactions ! waiting for commit will be allowed to proceed after those standby ! servers confirm receipt of their data. The synchronous standbys will be ! the first entries named in this list that are both currently connected ! and streaming data in real-time (as shown by a state of ! streaming in the pg_stat_replication view). Other standby servers appearing later in this list represent potential *************** *** 2627,2632 **** include_dir 'conf.d' --- 2628,2685 ---- + + synchronous_standby_num (integer) + + synchronous_standby_num configuration parameter + + + + + Specifies the number of standbys that support + synchronous replication. + + + Default value is -1. In this case, if + is empty, all the + standby nodes are considered asynchronous. If there is at least + one node name defined, the process will wait for one synchronous + standby listed. + + + When this parameter is set to 0, all the standby + nodes will be considered as asynchronous. + + + This parameter value cannot be higher than + . 
+ + + Are considered as synchronous the first elements of + in number of + that are + connected. If there are more elements than the number of standbys + required, all the additional standbys are potential synchronous + candidates. If is + empty, all the standbys are asynchronous. If it is set to the + special entry *, a number of standbys equal to + with the highest + priority are elected as being synchronous. + + + Server will wait for commit confirmation from + standbys, meaning that + if has fewer elements + than the number of standbys required, server will wait indefinitely + for a commit confirmation. + + + This parameter can only be set in the postgresql.conf + file or on the server command line. + + + + vacuum_defer_cleanup_age (integer) *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 1081,1092 **** primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. ! If the standby is the first matching standby, as specified in ! synchronous_standby_names on the primary, the reply ! messages from that standby will be used to wake users waiting for ! confirmation that the commit record has been received. These parameters ! allow the administrator to specify which standby servers should be ! synchronous standbys. Note that the configuration of synchronous replication is mainly on the master. Named standbys must be directly connected to the master; the master knows nothing about downstream standby servers using cascaded replication. --- 1081,1092 ---- WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. ! If the standby is the first synchronous_standby_num matching !
standbys, as specified in synchronous_standby_names on the ! primary, the reply messages from that standby will be used to wake users ! waiting for confirmation that the commit record has been received. These ! parameters allow the administrator to specify which standby servers should ! be synchronous standbys. Note that the configuration of synchronous replication is mainly on the master. Named standbys must be directly connected to the master; the master knows nothing about downstream standby servers using cascaded replication. *************** *** 1167,1177 **** primary_slot_name = 'node_a_slot' The best solution for avoiding data loss is to ensure you don't lose ! your last remaining synchronous standby. This can be achieved by naming multiple potential synchronous standbys using synchronous_standby_names. ! The first named standby will be used as the synchronous standby. Standbys ! listed after this will take over the role of synchronous standby if the ! first one should fail. --- 1167,1177 ---- The best solution for avoiding data loss is to ensure you don't lose ! your last remaining synchronous standbys. This can be achieved by naming multiple potential synchronous standbys using synchronous_standby_names. ! The first synchronous_standby_num named standbys will be used as ! the synchronous standbys. Standbys listed after this will take over the role ! of synchronous standby if the first one should fail. *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 5,11 **** * Synchronous replication is new as of PostgreSQL 9.1. * * If requested, transaction commits wait until their commit LSN is ! * acknowledged by the sync standby. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming --- 5,11 ---- * Synchronous replication is new as of PostgreSQL 9.1. * * If requested, transaction commits wait until their commit LSN is ! 
* acknowledged by the synchronous standbys. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming *************** *** 29,39 **** * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * ! * In 9.1 we support only a single synchronous standby, chosen from a ! * priority list of synchronous_standby_names. Before it can become the ! * synchronous standby it must have caught up with the primary; that may ! * take some time. Once caught up, the current highest priority standby ! * will release waiters from the queue. * * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group * --- 29,50 ---- * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * ! * In 9.4 we support the possibility to have multiple synchronous standbys, ! * whose number is defined by synchronous_standby_num, chosen from a ! * priority list of synchronous_standby_names. Before one standby can ! * become a synchronous standby it must have caught up with the primary; ! * that may take some time. ! * ! * Waiters will be released from the queue once the number of standbys ! * defined by synchronous_standby_num have caught up. ! * ! * There are special cases though. If synchronous_standby_num is set to 0, ! * all the nodes are considered as asynchronous and a fast path is taken to ! * leave this portion of the code as soon as possible. If it is set to ! * -1, the process will wait for one node to catch up with the primary only ! * if synchronous_standby_names is non-empty. This is compatible with ! * what has been defined in 9.1 as -1 is the default value of ! * synchronous_standby_num. 
* * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group * *************** *** 59,67 **** /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; #define SyncStandbysDefined() \ ! (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') static bool announce_next_takeover = true; --- 70,87 ---- /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; + int synchronous_standby_num = -1; + /* + * Synchronous standbys are defined if there is more than + * one synchronous standby wanted. In default case, the list + * of standbys defined needs to be not empty. + */ #define SyncStandbysDefined() \ ! (synchronous_standby_num > 0 || \ ! (synchronous_standby_num == -1 && \ ! SyncRepStandbyNames != NULL && \ ! SyncRepStandbyNames[0] != '\0')) static bool announce_next_takeover = true; *************** *** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), ! errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); whereToSendOutput = DestNone; SyncRepCancelWait(); break; --- 226,232 ---- ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), ! errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); whereToSendOutput = DestNone; SyncRepCancelWait(); break; *************** *** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), ! 
errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); SyncRepCancelWait(); break; } --- 243,249 ---- QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), ! errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); SyncRepCancelWait(); break; } *************** *** 357,365 **** SyncRepInitConfig(void) } } /* * Update the LSNs on each queue based upon our latest state. This ! * implements a simple policy of first-valid-standby-releases-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. --- 377,477 ---- } } + + /* + * Obtain a palloc'd array containing positions of standbys currently + * considered as synchronous. Caller is responsible for freeing the + * data obtained. + * Callers of this function should as well take a necessary lock on + * SyncRepLock. + */ + int * + SyncRepGetSynchronousNodes(int *num_sync) + { + int *sync_nodes; + int priority = 0; + int i; + int allowed_sync_nodes = synchronous_standby_num; + + /* Initialize */ + *num_sync = 0; + + /* + * Determine the number of nodes that can be synchronized. + * synchronous_standby_num can have the special value -1, + * meaning that only one node with the highest non-null priority + * can be considered as synchronous. + */ + if (synchronous_standby_num == -1) + allowed_sync_nodes = 1; + + /* + * Make enough room, there is a maximum of max_wal_senders synchronous + * nodes as we scan though WAL senders here. 
+ */ + sync_nodes = (int *) palloc(max_wal_senders * sizeof(int)); + + for (i = 0; i < max_wal_senders; i++) + { + /* Use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* Process to next if not active */ + if (walsnd->pid == 0) + continue; + + /* Process to next if not streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Process to next one if asynchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Process to next one if priority conditions not satisfied */ + if (priority != 0 && + priority <= walsnd->sync_standby_priority && + *num_sync == allowed_sync_nodes) + continue; + + /* Process to next one if flush position is invalid */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * We have a potential synchronous candidate, add it to the + * list of nodes already present or evict the node with highest + * priority found until now. + */ + if (*num_sync == allowed_sync_nodes) + { + int j; + for (j = 0; j < *num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]]; + if (walsndloc->sync_standby_priority == priority) + { + sync_nodes[j] = i; + break; + } + } + } + else + { + sync_nodes[*num_sync] = i; + (*num_sync)++; + } + + /* Update priority for next tracking */ + priority = walsnd->sync_standby_priority; + } + + return sync_nodes; + } + /* * Update the LSNs on each queue based upon our latest state. This ! * implements a simple policy of first-valid-standbys-release-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. *************** *** 368,378 **** void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; ! volatile WalSnd *syncWalSnd = NULL; int numwrite = 0; int numflush = 0; ! 
int priority = 0; int i; /* * If this WALSender is serving a standby that is not on the list of --- 480,493 ---- SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; ! int *sync_standbys; int numwrite = 0; int numflush = 0; ! int num_sync = 0; int i; + bool found = false; + XLogRecPtr min_write_pos; + XLogRecPtr min_flush_pos; /* * If this WALSender is serving a standby that is not on the list of *************** *** 388,454 **** SyncRepReleaseWaiters(void) /* * We're a potential sync standby. Release waiters if we are the highest * priority standby. If there are multiple standbys with same priorities ! * then we use the first mentioned standby. If you change this, also ! * change pg_stat_get_wal_senders(). */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); ! for (i = 0; i < max_wal_senders; i++) { ! /* use volatile pointer to prevent code rearrangement */ ! volatile WalSnd *walsnd = &walsndctl->walsnds[i]; ! ! if (walsnd->pid != 0 && ! walsnd->state == WALSNDSTATE_STREAMING && ! walsnd->sync_standby_priority > 0 && ! (priority == 0 || ! priority > walsnd->sync_standby_priority) && ! !XLogRecPtrIsInvalid(walsnd->flush)) { ! priority = walsnd->sync_standby_priority; ! syncWalSnd = walsnd; } } /* ! * We should have found ourselves at least. */ ! Assert(syncWalSnd); /* ! * If we aren't managing the highest priority standby then just leave. */ ! if (syncWalSnd != MyWalSnd) { LWLockRelease(SyncRepLock); ! announce_next_takeover = true; return; } /* * Set the lsn first so that when we wake backends they will release up to ! * this location. */ ! if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) { ! walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } ! if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) { ! 
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", ! numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, ! numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); /* * If we are managing the highest priority standby, though we weren't ! * prior to this, then announce we are now the sync standby. */ if (announce_next_takeover) { --- 503,601 ---- /* * We're a potential sync standby. Release waiters if we are the highest * priority standby. If there are multiple standbys with same priorities ! * then we use the first mentioned standbys. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + sync_standbys = SyncRepGetSynchronousNodes(&num_sync); ! /* ! * We should have found ourselves at least, except if it is not expected ! * to find any synchronous nodes. ! */ ! Assert(num_sync > 0); ! ! /* ! * If we aren't managing one of the standbys with highest priority ! * then just leave. ! */ ! for (i = 0; i < num_sync; i++) { ! volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; ! if (walsndloc == MyWalSnd) { ! found = true; ! break; } } /* ! * We are definitely not one of the chosen... But we could by ! * taking the next takeover. */ ! if (!found) ! { ! LWLockRelease(SyncRepLock); ! pfree(sync_standbys); ! announce_next_takeover = true; ! return; ! } /* ! * Even if we are one of the chosen standbys, leave if there ! * are less synchronous standbys in waiting state than what is ! * expected by the user. */ ! if (num_sync < synchronous_standby_num && ! synchronous_standby_num != -1) { LWLockRelease(SyncRepLock); ! pfree(sync_standbys); return; } /* * Set the lsn first so that when we wake backends they will release up to ! * this location, of course only if all the standbys found as synchronous ! * have already reached that point, so first find what are the oldest ! 
* write and flush positions of all the standbys considered in sync... */ ! min_write_pos = MyWalSnd->write; ! min_flush_pos = MyWalSnd->flush; ! for (i = 0; i < num_sync; i++) ! { ! volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; ! ! SpinLockAcquire(&walsndloc->mutex); ! if (min_write_pos > walsndloc->write) ! min_write_pos = walsndloc->write; ! if (min_flush_pos > walsndloc->flush) ! min_flush_pos = walsndloc->flush; ! SpinLockRelease(&walsndloc->mutex); ! } ! ! /* ... And now update if necessary */ ! if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos) { ! walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } ! if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos) { ! walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", ! numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32), ! (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE], ! numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32), ! (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]); /* * If we are managing the highest priority standby, though we weren't ! * prior to this, then announce we are now a sync standby. 
*/ if (announce_next_takeover) { *************** *** 457,462 **** SyncRepReleaseWaiters(void) --- 604,612 ---- (errmsg("standby \"%s\" is now the synchronous standby with priority %u", application_name, MyWalSnd->sync_standby_priority))); } + + /* Clean up */ + pfree(sync_standbys); } /* *************** *** 483,488 **** SyncRepGetStandbyPriority(void) --- 633,642 ---- if (am_cascading_walsender) return 0; + /* If no synchronous nodes allowed, no cake for this WAL sender */ + if (synchronous_standby_num == 0) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContext per_query_ctx; MemoryContext oldcontext; int *sync_priority; ! int priority = 0; ! int sync_standby = -1; int i; /* check to see if caller supports us returning a tuplestore */ --- 2735,2742 ---- MemoryContext per_query_ctx; MemoryContext oldcontext; int *sync_priority; ! int *sync_standbys; ! int num_sync = 0; int i; /* check to see if caller supports us returning a tuplestore */ *************** *** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) /* * Get the priorities of sync standbys all in one go, to minimise lock * acquisitions and to allow us to evaluate who is the current sync ! * standby. This code must match the code in SyncRepReleaseWaiters(). */ sync_priority = palloc(sizeof(int) * max_wal_senders); LWLockAcquire(SyncRepLock, LW_SHARED); for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; ! if (walsnd->pid != 0) ! { ! /* ! * Treat a standby such as a pg_basebackup background process ! * which always returns an invalid flush location, as an ! * asynchronous standby. ! */ ! sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ? ! 0 : walsnd->sync_standby_priority; ! ! 
if (walsnd->state == WALSNDSTATE_STREAMING && ! walsnd->sync_standby_priority > 0 && ! (priority == 0 || ! priority > walsnd->sync_standby_priority) && ! !XLogRecPtrIsInvalid(walsnd->flush)) ! { ! priority = walsnd->sync_standby_priority; ! sync_standby = i; ! } ! } } LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) --- 2767,2789 ---- /* * Get the priorities of sync standbys all in one go, to minimise lock * acquisitions and to allow us to evaluate who is the current sync ! * standby. */ sync_priority = palloc(sizeof(int) * max_wal_senders); LWLockAcquire(SyncRepLock, LW_SHARED); + + /* Get first the priorities on each standby as long as we hold a lock */ for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; ! sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ? ! 0 : walsnd->sync_standby_priority; } + + /* Obtain list of synchronous standbys */ + sync_standbys = SyncRepGetSynchronousNodes(&num_sync); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) *************** *** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (sync_priority[i] == 0) values[7] = CStringGetTextDatum("async"); - else if (i == sync_standby) - values[7] = CStringGetTextDatum("sync"); else ! values[7] = CStringGetTextDatum("potential"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } pfree(sync_priority); /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); --- 2845,2876 ---- */ if (sync_priority[i] == 0) values[7] = CStringGetTextDatum("async"); else ! { ! int j; ! bool found = false; ! ! for (j = 0; j < num_sync; j++) ! { ! /* Found that this node is one in sync */ ! if (i == sync_standbys[j]) ! { ! values[7] = CStringGetTextDatum("sync"); ! found = true; ! break; ! } ! } ! if (!found) ! values[7] = CStringGetTextDatum("potential"); ! 
} } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + + /* Cleanup */ pfree(sync_priority); + pfree(sync_standbys); /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 2548,2553 **** static struct config_int ConfigureNamesInt[] = --- 2548,2563 ---- NULL, NULL, NULL }, + { + {"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Number of potential synchronous standbys."), + NULL + }, + &synchronous_standby_num, + -1, -1, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 235,240 **** --- 235,241 ---- #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all + #synchronous_standby_num = -1 # number of standby servers using sync rep #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - *** a/src/include/replication/syncrep.h --- b/src/include/replication/syncrep.h *************** *** 33,38 **** --- 33,39 ---- /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; + extern int synchronous_standby_num; /* called by user backend */ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); *************** *** 49,54 **** extern void SyncRepUpdateSyncStandbysDefined(void); --- 50,56 ---- /* called by various procs */ extern int SyncRepWakeQueue(bool all, int mode); + extern int *SyncRepGetSynchronousNodes(int *num_sync); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra);