From cf2044e2c6729d450c8fd9b7a7603254418bb6d5 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Thu, 21 Dec 2017 21:23:25 +0900 Subject: [PATCH 2/4] Add monitoring aid for max_replication_slots. Adds two columns "status" and "min_secure_lsn" in pg_replication_slot. Setting max_slot_wal_keep_size, long-disconnected slots may lose sync. The two columns shows that a slot can be reconnected or not, or about to lose required WAL segments. And the LSN back to where the next checkpoint will secure. --- contrib/test_decoding/expected/ddl.out | 4 +- src/backend/access/transam/xlog.c | 95 ++++++++++++++++++++++++++++++++++ src/backend/catalog/system_views.sql | 4 +- src/backend/replication/slotfuncs.c | 25 ++++++++- src/include/access/xlog.h | 1 + src/include/catalog/pg_proc.dat | 6 +-- src/test/regress/expected/rules.out | 6 ++- 7 files changed, 132 insertions(+), 9 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index b7c76469fc..276f7f6efd 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | min_keep_lsn +-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+-------------- (0 rows) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 959d18e029..5c16750e89 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9379,6 +9379,101 @@ CreateRestartPoint(int flags) return true; } + +/* + * Returns the segment number of the oldest file in XLOG directory. + */ +static XLogSegNo +GetOldestXLogFileSegNo(void) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo segno = 0; + + xldir = AllocateDir(XLOGDIR); + if (xldir == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open write-ahead log directory \"%s\": %m", + XLOGDIR))); + + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID tli; + XLogSegNo fsegno; + + /* Ignore files that are not XLOG segments */ + if (!IsXLogFileName(xlde->d_name) && + !IsPartialXLogFileName(xlde->d_name)) + continue; + + XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size); + + /* get minimum segment ignorig timeline ID */ + if (segno == 0 || fsegno < segno) + segno = fsegno; + } + + FreeDir(xldir); + + return segno; +} + +/* + * Check if the record on the given restartLSN is present in XLOG files. + * + * Returns true if it is present. If minKeepLSN is given, it receives the + * LSN at the beginning of the oldest existing WAL segment. + */ +bool +IsLsnStillAvaiable(XLogRecPtr restartLSN, XLogRecPtr *minKeepLSN) +{ + XLogRecPtr currpos; + XLogSegNo restartSeg; + XLogSegNo tailSeg; + XLogSegNo oldestSeg; + + Assert(!XLogRecPtrIsInvalid(restartLSN)); + + currpos = GetXLogWriteRecPtr(); + + SpinLockAcquire(&XLogCtl->info_lck); + oldestSeg = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); + + /* + * oldestSeg is zero before at least one segment has been removed since + * startup. Use oldest segno taken from file names. + */ + if (oldestSeg == 0) + { + static XLogSegNo oldestFileSeg = 0; + + if (oldestFileSeg == 0) + oldestFileSeg = GetOldestXLogFileSegNo(); + /* let it have the same meaning with lastRemovedSegNo here */ + oldestSeg = oldestFileSeg - 1; + } + + /* oldest segment is just after the last removed segment */ + oldestSeg++; + + XLByteToSeg(restartLSN, restartSeg, wal_segment_size); + + + if (minKeepLSN) + { + XLogRecPtr slotPtr = XLogGetReplicationSlotMinimumLSN(); + Assert(!XLogRecPtrIsInvalid(slotPtr)); + + tailSeg = GetOldestKeepSegment(currpos, slotPtr); + + XLogSegNoOffsetToRecPtr(tailSeg, 0, *minKeepLSN, wal_segment_size); + } + + return oldestSeg <= restartSeg; +} + /* * Returns minimum segment number the next checktpoint must leave considering * wal_keep_segments, replication slots and max_slot_wal_keep_size. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8cd8bf40ac..1664a086e9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, - L.confirmed_flush_lsn + L.confirmed_flush_lsn, + L.wal_status, + L.min_keep_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2806e1076c..f13aa4d455 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 11 +#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -307,6 +307,29 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (restart_lsn == InvalidXLogRecPtr) + { + values[i++] = CStringGetTextDatum("unknown"); + values[i++] = LSNGetDatum(InvalidXLogRecPtr); + } + else + { + XLogRecPtr min_keep_lsn; + char *status = "lost"; + + if (BoolGetDatum(IsLsnStillAvaiable(restart_lsn, + &min_keep_lsn))) + { + if (min_keep_lsn <= restart_lsn) + status = "streaming"; + else + status = "keeping"; + } + + values[i++] = CStringGetTextDatum(status); + values[i++] = LSNGetDatum(min_keep_lsn); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 12cd0d1d10..52e64f392d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); +extern bool IsLsnStillAvaiable(XLogRecPtr restartLSN, XLogRecPtr *minSecureLSN); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 40d54ed030..ef8f0eab91 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9796,9 +9796,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_keep_lsn}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ae0cd253d5..93f6bff77e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name, l.xmin, l.catalog_xmin, l.restart_lsn, - l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + l.confirmed_flush_lsn, + l.wal_status, + l.min_keep_lsn + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_keep_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.16.3