diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index 77667bdebd..72c5390695 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory. + diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index 8acdff1393..3733ad960b 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -200,6 +200,7 @@ &update; &vacuum; &values; + &waitlsn; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index aa9ee5a0dd..9696b5dbb5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "catalog/pg_control.h" #include "catalog/pg_database.h" #include "commands/tablespace.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "port/atomics.h" @@ -143,6 +144,9 @@ const struct config_enum_entry sync_method_options[] = { {NULL, 0, false} }; +/* GUC variable */ +int count_waitlsn = 10; +int interval_waitlsn = 100; /* * Although only "on", "off", and "always" are documented, @@ -6781,6 +6785,8 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + TimestampTz time_waitlsn = GetCurrentTimestamp(); + int counter_waitlsn = 0; InRedo = true; @@ -6998,6 +7004,17 @@ StartupXLOG(void) break; } + /* + * After update lastReplayedEndRecPtr set Latches in SHMEM array + */ + if (counter_waitlsn % count_waitlsn == 0 + || TimestampDifferenceExceeds(time_waitlsn,GetCurrentTimestamp(),interval_waitlsn)) + { + WaitLSNSetLatch(); + time_waitlsn = GetCurrentTimestamp(); + } + counter_waitlsn++; + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 6b3742c0a0..091cbe22a0 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ policy.o portalcmds.o prepare.o proclang.o \ schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \ tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \ - variable.o view.o + variable.o view.o waitlsn.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 716f1c3318..9ad3275131 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -139,7 +139,6 @@ #include "utils/ps_status.h" #include "utils/timestamp.h" - /* * Maximum size of a NOTIFY payload, including terminating NULL. This * must be kept small enough so that a notification message fits on one diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c new file mode 100644 index 0000000000..f75507ee4e --- /dev/null +++ b/src/backend/commands/waitlsn.c @@ -0,0 +1,242 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.c + * WaitLSN statment: WAITLSN + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 2016, Regents of PostgresPro + * + * IDENTIFICATION + * src/backend/commands/waitlsn.c + * + *------------------------------------------------------------------------- + */ + +/*------------------------------------------------------------------------- + * Wait for LSN been replayed on slave as of 9.5: + * README + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" +#include "pgstat.h" +#include "utils/pg_lsn.h" +#include "storage/latch.h" +#include "miscadmin.h" +#include "storage/spin.h" +#include "storage/backendid.h" +#include "access/xact.h" +#include "storage/shmem.h" +#include "storage/ipc.h" +#include "access/xlog_fn.h" +#include "utils/timestamp.h" +#include "storage/pmsignal.h" +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "commands/waitlsn.h" +#include "storage/proc.h" +#include "access/transam.h" +#include "funcapi.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" + +/* Latches Own-DisownLatch and AbortCаllBack */ +static uint32 WaitLSNShmemSize(void); +static void WLDisownLatchAbort(XactEvent event, void *arg); +static void WLOwnLatch(void); +static void WLDisownLatch(void); + +void _PG_init(void); + +/* Shared memory structures */ +typedef struct +{ + int pid; + volatile slock_t slock; + Latch latch; +} BIDLatch; + +typedef struct +{ + char dummy; + int backend_maxid; + BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER]; +} GlobState; + +static volatile GlobState *state; +bool is_latch_owned = false; + +/* Take Latch for current backend at the begining of WAITLSN */ +static void +WLOwnLatch(void) +{ + SpinLockAcquire(&state->l_arr[MyBackendId].slock); + OwnLatch(&state->l_arr[MyBackendId].latch); + is_latch_owned = true; + + if (state->backend_maxid < MyBackendId) + state->backend_maxid = MyBackendId; + + state->l_arr[MyBackendId].pid = MyProcPid; + SpinLockRelease(&state->l_arr[MyBackendId].slock); +} + +/* Release Latch for current backend at the end of WAITLSN */ +static void +WLDisownLatch(void) +{ + int i; + SpinLockAcquire(&state->l_arr[MyBackendId].slock); + DisownLatch(&state->l_arr[MyBackendId].latch); + is_latch_owned = false; + state->l_arr[MyBackendId].pid = 0; + + if (state->backend_maxid == MyBackendId) + for (i = (MaxConnections+1); i >=0; i--) + if (state->l_arr[i].pid != 0) + { + state->backend_maxid = i; + break; + } + + SpinLockRelease(&state->l_arr[MyBackendId].slock); +} + +/* CallBack function on abort*/ +static void +WLDisownLatchAbort(XactEvent event, void *arg) +{ + if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT || + event == XACT_EVENT_ABORT)) + { + WLDisownLatch(); + } +} + +/* Module load callback */ +void +_PG_init(void) +{ + if (!IsUnderPostmaster) + RegisterXactCallback(WLDisownLatchAbort, NULL); +} + +/* Get size of shared memory to room GlobState */ +static uint32 +WaitLSNShmemSize(void) +{ + return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1); +} + +/* Init array of Latches in shared memory */ +void +WaitLSNShmemInit(void) +{ + bool found; + uint i; + + state = (GlobState *) ShmemInitStruct("pg_wait_lsn", + WaitLSNShmemSize(), + &found); + if (!found) + { + for (i = 0; i < (MaxConnections+1); i++) + { + state->l_arr[i].pid = 0; + SpinLockInit(&state->l_arr[i].slock); + InitSharedLatch(&state->l_arr[i].latch); + } + state->backend_maxid = 0; + } +} + +/* Set all Latches in shared memorys cause new LSN been replayed*/ +void +WaitLSNSetLatch(void) +{ + uint i; + for (i = 0; i <= state->backend_maxid; i++) + { + SpinLockAcquire(&state->l_arr[i].slock); + if (state->l_arr[i].pid != 0) + SetLatch(&state->l_arr[i].latch); + SpinLockRelease(&state->l_arr[i].slock); + } +} + +/* + * On WAITLSN own latch and wait till LSN is replayed, Postmaster death, interruption + * or timeout. + */ +void +WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest) +{ + XLogRecPtr trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn))); + XLogRecPtr cur_lsn; + int latch_events; + uint64_t tdelay = delay; + long secs; + int microsecs; + TimestampTz timer = GetCurrentTimestamp(); + TupOutputState *tstate; + TupleDesc tupdesc; + char *value = "false"; + + if (delay > 0) + latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; + else + latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + WLOwnLatch(); + + for (;;) + { + cur_lsn = GetXLogReplayRecPtr(NULL); + + /* If LSN had been Replayed */ + if (trg_lsn <= cur_lsn) + break; + + /* If the postmaster dies, finish immediately */ + if (!PostmasterIsAlive()) + break; + + /* If Delay time is over */ + if (latch_events & WL_TIMEOUT) + { + if (TimestampDifferenceExceeds(timer,GetCurrentTimestamp(),delay)) + break; + TimestampDifference(timer,GetCurrentTimestamp(),&secs, µsecs); + tdelay = delay - (secs*1000 + microsecs/1000); + } + + MyPgXact->xmin = InvalidTransactionId; + WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ); + ResetLatch(&state->l_arr[MyBackendId].latch); + + /* CHECK_FOR_INTERRUPTS if they comes then disown latch current */ + if (InterruptPending) + { + WLDisownLatch(); + ProcessInterrupts(); + } + + } + + WLDisownLatch(); + + if (trg_lsn > cur_lsn) + elog(NOTICE,"LSN is not reached. Try to make bigger delay."); + else + value = "true"; + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0); + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc); + /* Send it */ + do_text_output_oneline(tstate, value); + end_tup_output(tstate); +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2ed7b5259d..9b54d75ac6 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -270,7 +270,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt - CreateMatViewStmt RefreshMatViewStmt CreateAmStmt + CreateMatViewStmt RefreshMatViewStmt CreateAmStmt WaitLSNStmt %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -309,7 +309,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type OptSchemaEltList %type TriggerForSpec TriggerForType -%type TriggerActionTime +%type TriggerActionTime WaitDelay %type TriggerEvents TriggerOneEvent %type TriggerFuncArg %type TriggerWhen @@ -663,7 +663,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAITLSN WAITLSN_INFINITE WAITLSN_NO_WAIT WHEN WHERE WHITESPACE_P WINDOW + WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLPARSE XMLPI XMLROOT XMLSERIALIZE @@ -901,6 +902,7 @@ stmt : | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitLSNStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -13235,7 +13237,41 @@ frame_bound: } ; +/***************************************************************************** + * + * QUERY: + * WAITLSN can appear as a query-level command + * + * + *****************************************************************************/ +WaitLSNStmt: + WAITLSN Sconst WaitDelay + { + WaitLSNStmt *n = makeNode(WaitLSNStmt); + n->lsn = $2; + n->delay = $3; + $$ = (Node *)n; + } + | WAITLSN_INFINITE Sconst + { + WaitLSNStmt *n = makeNode(WaitLSNStmt); + n->lsn = $2; + n->delay = 0; + $$ = (Node *)n; + } + | WAITLSN_NO_WAIT Sconst + { + WaitLSNStmt *n = makeNode(WaitLSNStmt); + n->lsn = $2; + n->delay = 1; + $$ = (Node *)n; + } + ; +WaitDelay: + ',' Iconst { $$ = $2; } + | /*EMPTY*/ { $$ = 0; } + ; /* * Supporting nonterminals for expressions. */ @@ -14266,6 +14302,9 @@ unreserved_keyword: | VIEW | VIEWS | VOLATILE + | WAITLSN + | WAITLSN_INFINITE + | WAITLSN_NO_WAIT | WHITESPACE_P | WITHIN | WITHOUT diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 29febb46c4..a80a003b57 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/subtrans.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -268,6 +269,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) AsyncShmemInit(); BackendRandomShmemInit(); + /* + * Init array of Latches in SHMEM for WAITLSN + */ + WaitLSNShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index fd4eff4907..e2746282a6 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -54,6 +54,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -907,6 +908,20 @@ standard_ProcessUtility(Node *parsetree, break; } + case T_WaitLSNStmt: + { + WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree; + if (!RecoveryInProgress()) + { + ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("cannot execute %s not during recovery", + "WaitLSN"))); + } + else + WaitLSNUtility(stmt->lsn, stmt->delay, dest); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(pstate, parsetree, queryString, @@ -2371,6 +2386,10 @@ CreateCommandTag(Node *parsetree) tag = "NOTIFY"; break; + case T_WaitLSNStmt: + tag = "WAITLSN"; + break; + case T_ListenStmt: tag = "LISTEN"; break; @@ -2963,6 +2982,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_ALL; break; + case T_WaitLSNStmt: + lev = LOGSTMT_ALL; + break; + case T_ListenStmt: lev = LOGSTMT_ALL; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a02511754e..6021389ee0 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2825,6 +2825,30 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"interval_waitlsn", PGC_SUSET, DEVELOPER_OPTIONS, + gettext_noop("Set interval of time (ms) how often LSN will be checked."), + gettext_noop("Set interval of time (ms) how often LSN will be checked to " + "make less influence on StartupXLOG() process."), + GUC_UNIT_MS + }, + &interval_waitlsn, + 100, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"count_waitlsn", PGC_SUSET, DEVELOPER_OPTIONS, + gettext_noop("How often LSN will be checked."), + gettext_noop("Set count of LSNs that will be passed befor LSN check to " + "make less influence on StartupXLOG() process."), + GUC_NOT_IN_SAMPLE + }, + &count_waitlsn, + 10, 1, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c9f332c908..f8cb00b214 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -109,6 +109,9 @@ extern bool log_checkpoints; extern int CheckPointSegments; +extern int interval_waitlsn; +extern int count_waitlsn; + /* Archive modes */ typedef enum ArchiveMode { diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h new file mode 100644 index 0000000000..2e3960881e --- /dev/null +++ b/src/include/commands/waitlsn.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.h + * WaitLSN notification: WAITLSN + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 2016, Regents of PostgresPRO + * + * src/include/commands/waitlsn.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAITLSN_H +#define WAITLSN_H +#include "tcop/dest.h" + +extern void WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest); +extern void WaitLSNShmemInit(void); +extern void WaitLSNSetLatch(void); + +#endif /* WAITLSN_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index c514d3fc93..ecacf80576 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -469,6 +469,7 @@ typedef enum NodeTag T_DropReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, + T_WaitLSNStmt, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index fc532fbd43..e8ef4fe67b 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3213,4 +3213,16 @@ typedef struct AlterTSConfigurationStmt bool missing_ok; /* for DROP - skip error if missing? */ } AlterTSConfigurationStmt; +/* ---------------------- + * WaitLSN Statement + * ---------------------- + */ +typedef struct WaitLSNStmt +{ + NodeTag type; + char *lsn; /* Taraget LSN to wait for */ + int delay; /* Delay to wait for LSN*/ + bool nowait; /* No wait for LSN just result*/ +} WaitLSNStmt; + #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 581ff6eedb..0f41ae906c 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -427,6 +427,9 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD) PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD) +PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD) +PG_KEYWORD("waitlsn_infinite", WAITLSN_INFINITE, UNRESERVED_KEYWORD) +PG_KEYWORD("waitlsn_no_wait", WAITLSN_NO_WAIT, UNRESERVED_KEYWORD) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)