From 4be4ce4c671499c373ac5f8318f432db182eb8f4 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Thu, 21 Dec 2017 21:23:25 +0900 Subject: [PATCH 2/6] Add monitoring aid for max_slot_wal_keep_size. Adds two columns, "wal_status" and "remain", to pg_replication_slots. When max_slot_wal_keep_size is set, long-disconnected slots may lose sync. The two columns show whether the slot can still reconnect or not, or is about to lose its reserved WAL segments, and the number of bytes the WAL can advance before the slot loses its reserved WAL records. --- contrib/test_decoding/expected/ddl.out | 4 +- contrib/test_decoding/sql/ddl.sql | 2 + src/backend/access/transam/xlog.c | 150 +++++++++++++++++++++++++++++++-- src/backend/catalog/system_views.sql | 4 +- src/backend/replication/slotfuncs.c | 16 +++- src/include/access/xlog.h | 1 + src/include/catalog/pg_proc.dat | 6 +- src/test/regress/expected/rules.out | 6 +- 8 files changed, 172 insertions(+), 17 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index b7c76469fc..c5f52d6ee8 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -705,8 +705,8 @@ SELECT pg_drop_replication_slot('regression_slot'); (1 row) /* check that the slot is gone */ +\x SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- (0 rows) +\x diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index c4b10a4cf9..5040d5e85e 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -374,4 +374,6 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc SELECT pg_drop_replication_slot('regression_slot'); /* check that 
the slot is gone */ +\x SELECT * FROM pg_replication_slots; +\x diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9988ef943c..aaafa6b74f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -873,7 +873,9 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr); +static XLogSegNo GetOldestXLogFileSegNo(void); +static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, + XLogRecPtr targetLSN, uint64 *restBytes); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -6660,6 +6662,12 @@ StartupXLOG(void) */ StartupReplicationOrigin(); + /* + * Initialize lastRemovedSegNo by looking at the pg_wal directory. The minimum + * segment number is 1, so no wrap-around can happen. + */ + XLogCtl->lastRemovedSegNo = GetOldestXLogFileSegNo() - 1; + /* * Initialize unlogged LSN. On a clean shutdown, it's restored from the * control file. On recovery, all unlogged relations are blown away, so @@ -9327,19 +9335,115 @@ CreateRestartPoint(int flags) return true; } + +/* + * Finds the segment number of the oldest file in the XLOG directory. + * + * This function is intended to be used for initialization of + * XLogCtl->lastRemovedSegNo. 
+ */ +static XLogSegNo +GetOldestXLogFileSegNo(void) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo segno = 0; + + xldir = AllocateDir(XLOGDIR); + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID tli; + XLogSegNo fsegno; + + /* Ignore files that are not XLOG segments */ + if (!IsXLogFileName(xlde->d_name) && + !IsPartialXLogFileName(xlde->d_name)) + continue; + + XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size); + + /* + * Get the minimum segment number, ignoring timeline ID. + * RemoveOldXlog also ignores timeline ID, so this function does the same. + */ + if (segno == 0 || fsegno < segno) + segno = fsegno; + } + + FreeDir(xldir); + + return segno; +} + +/* + * Returns the availability status of the record at the given targetLSN. + * + * Returns one of three values: + * "streaming" when the WAL record at targetLSN is available. + * "keeping" when still available but about to be removed by the next + * checkpoint. + * "lost" when the WAL record at targetLSN is already removed. + * + * If restBytes is not NULL, sets the remaining LSN bytes to advance until the + * segment that contains targetLSN is removed. 
+ */ +char * +GetLsnAvailability(XLogRecPtr targetLSN, uint64 *restBytes) +{ + XLogRecPtr currpos; + XLogRecPtr slotPtr; + XLogSegNo targetSeg; + XLogSegNo tailSeg; + XLogSegNo oldestSeg; + + Assert(!XLogRecPtrIsInvalid(targetLSN)); + Assert(restBytes); + + currpos = GetXLogWriteRecPtr(); + + SpinLockAcquire(&XLogCtl->info_lck); + oldestSeg = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); + + /* oldest segment is just after the last removed segment */ + oldestSeg++; + + XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + + slotPtr = XLogGetReplicationSlotMinimumLSN(); + tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes); + + /* targetSeg is being reserved by slots */ + if (tailSeg <= targetSeg) + return "streaming"; + + /* targetSeg is not reserved but still available */ + if (oldestSeg <= targetSeg) + return "keeping"; + + /* targetSeg has gone */ + return "lost"; +} + /* * Returns minimum segment number the next checkpoint must leave considering * wal_keep_segments, replication slots and max_slot_wal_keep_size. * * currLSN is the current insert location * minSlotLSN is the minimum restart_lsn of all active slots + * targetLSN is used when restBytes is not NULL. + * + * If restBytes is not NULL, sets the remaining LSN bytes to advance until the + * segment that contains targetLSN will be removed. 
*/ static XLogSegNo -GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) +GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, + XLogRecPtr targetLSN, uint64 *restBytes) { uint64 keepSegs = 0; XLogSegNo currSeg; XLogSegNo minSlotSeg; + uint64 limitSegs = 0; XLByteToSeg(currLSN, currSeg, wal_segment_size); XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size); @@ -9354,8 +9458,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) /* Cap keepSegs by max_slot_wal_keep_size */ if (max_slot_wal_keep_size_mb >= 0) { - uint64 limitSegs; - limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); /* Apply max_slot_wal_keep_size to keepSegs */ @@ -9363,9 +9465,40 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) keepSegs = limitSegs; } - /* but, keep at least wal_keep_segments segments if any */ - if (wal_keep_segments > 0 && keepSegs < wal_keep_segments) - keepSegs = wal_keep_segments; + if (wal_keep_segments > 0) + { + /* but, keep at least wal_keep_segments segments if any */ + if (keepSegs < wal_keep_segments) + keepSegs = wal_keep_segments; + + /* also, limitSegs should be raised if wal_keep_segments is larger */ + if (limitSegs < wal_keep_segments) + limitSegs = wal_keep_segments; + } + + /* + * If requested, return remaining LSN bytes to advance until the slot + * gives up reserving WAL records. + */ + if (restBytes) + { + uint64 fragbytes; + XLogSegNo targetSeg; + + *restBytes = 0; + + XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + if (max_slot_wal_keep_size_mb >= 0 && currSeg <= targetSeg + limitSegs) + { + /* + * This slot still has all required segments. Calculate how many + * LSN bytes the slot has until it loses targetLSN. 
+ */ + fragbytes = wal_segment_size - (currLSN % wal_segment_size); + XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg, fragbytes, + wal_segment_size, *restBytes); + } + } /* avoid underflow, don't go below 1 */ if (currSeg <= keepSegs) @@ -9395,7 +9528,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) /* * We should keep certain number of WAL segments after this checkpoint. */ - minSegNo = GetOldestKeepSegment(recptr, slotminptr); + minSegNo = + GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL); /* * warn if the checkpoint flushes the segments required by replication diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f4d9e9daf7..358df2c183 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -798,7 +798,9 @@ CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, - L.confirmed_flush_lsn + L.confirmed_flush_lsn, + L.wal_status, + L.remain FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 224dd920c8..cac66978ed 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 11 +#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -307,6 +307,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (restart_lsn == InvalidXLogRecPtr) + { + values[i++] = CStringGetTextDatum("unknown"); + values[i++] = LSNGetDatum(InvalidXLogRecPtr); + } + else + { + uint64 remaining_bytes; + + values[i++] = CStringGetTextDatum( + GetLsnAvailability(restart_lsn, &remaining_bytes)); + values[i++] = Int64GetDatum(remaining_bytes); + } + 
tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index b2eb30b779..b0cdba6d7a 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -302,6 +302,7 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); +extern char *GetLsnAvailability(XLogRecPtr targetLSN, uint64 *restBytes); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3ecc2e12c3..f7e6d18e35 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9665,9 +9665,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e384cd2279..956c3c9525 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1451,8 +1451,10 @@ 
pg_replication_slots| SELECT l.slot_name, l.xmin, l.catalog_xmin, l.restart_lsn, - l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + l.confirmed_flush_lsn, + l.wal_status, + l.remain + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.16.3