From d41fb5b4e457c787b3763aca92b8932be550b48e Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Thu, 21 Dec 2017 21:23:25 +0900 Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size. Adds two columns "wal_status" and "remain" to pg_replication_slots. When max_slot_wal_keep_size is set, long-disconnected slots may lose sync. The two columns show whether the slot can still be reconnected or not, or is about to lose its reserved WAL segments, and the remaining bytes of WAL that can be written until the slot loses its reserved WAL records. --- contrib/test_decoding/expected/ddl.out | 4 +- src/backend/access/transam/xlog.c | 152 +++++++++++++++++++++++++++++---- src/backend/catalog/system_views.sql | 4 +- src/backend/replication/slotfuncs.c | 32 ++++++- src/include/access/xlog.h | 1 + src/include/catalog/pg_proc.dat | 6 +- src/test/regress/expected/rules.out | 6 +- 7 files changed, 181 insertions(+), 24 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index b7c76469fc..6b6a2df213 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain +-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+-------- (0 rows) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c 
index 4bf1536d8f..1b9cc619f1 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -868,7 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr); +static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, XLogRecPtr restartLSN, uint64 *restBytes); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -9493,44 +9493,165 @@ CreateRestartPoint(int flags) return true; } + +/* + * Returns the segment number of the oldest file in XLOG directory. + */ +static XLogSegNo +GetOldestXLogFileSegNo(void) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo segno = 0; + + xldir = AllocateDir(XLOGDIR); + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID tli; + XLogSegNo fsegno; + + /* Ignore files that are not XLOG segments */ + if (!IsXLogFileName(xlde->d_name) && + !IsPartialXLogFileName(xlde->d_name)) + continue; + + XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size); + + /* get minimum segment ignoring timeline ID */ + if (segno == 0 || fsegno < segno) + segno = fsegno; + } + + FreeDir(xldir); + + return segno; +} + +/* + * Check if the record at the given targetLSN is present in XLOG files. + * + * Returns one of three values. + * 0 means that the WAL record at targetLSN is already removed. + * 1 means that the WAL record at targetLSN is available. + * 2 means that the WAL record at targetLSN is available but about to be removed by + * the next checkpoint. 
+ */ +int +IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes) +{ + XLogRecPtr currpos; + XLogRecPtr slotPtr; + XLogSegNo targetSeg; + XLogSegNo tailSeg; + XLogSegNo oldestSeg; + + Assert(!XLogRecPtrIsInvalid(targetLSN)); + Assert(restBytes); + + currpos = GetXLogWriteRecPtr(); + + SpinLockAcquire(&XLogCtl->info_lck); + oldestSeg = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); + + /* + * oldestSeg is zero before at least one segment has been removed since + * startup. Use oldest segno taken from file names. + */ + if (oldestSeg == 0) + { + static XLogSegNo oldestFileSeg = 0; + + if (oldestFileSeg == 0) + oldestFileSeg = GetOldestXLogFileSegNo(); + /* let it have the same meaning as lastRemovedSegNo here */ + oldestSeg = oldestFileSeg - 1; + } + + /* oldest segment is just after the last removed segment */ + oldestSeg++; + + XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + + slotPtr = XLogGetReplicationSlotMinimumLSN(); + tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes); + + /* targetSeg is being reserved by slots */ + if (tailSeg <= targetSeg) + return 1; + + /* targetSeg is not reserved but still available */ + if (oldestSeg <= targetSeg) + return 2; + + /* targetSeg has gone */ + return 0; +} + /* * Returns minimum segment number the next checktpoint must leave considering * wal_keep_segments, replication slots and max_slot_wal_keep_size. + * + * If restBytes is not NULL, returns remaining LSN bytes to advance until any + * slot loses reserving a WAL record. 
*/ static XLogSegNo -GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN) +GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, XLogRecPtr restartLSN, uint64 *restBytes) { uint64 keepSegs = 0; + uint64 limitSegs = 0; XLogSegNo currSeg; - XLogSegNo slotSeg; + XLogSegNo minSlotSeg; XLByteToSeg(currLSN, currSeg, wal_segment_size); - XLByteToSeg(minSlotLSN, slotSeg, wal_segment_size); + XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size); /* - * Calcualte keep segments by slots first. The second term of the + * Calculate keep segments by slots first. The second term of the * condition is just a sanity check. */ - if (minSlotLSN != InvalidXLogRecPtr && slotSeg <= currSeg) - keepSegs = currSeg - slotSeg; + if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg) + keepSegs = currSeg - minSlotSeg; - /* - * slot keep segments is limited by max_slot_wal_keep_size, fragment of a - * segment is ignored - */ + /* Cap keepSegs by max_slot_wal_keep_size */ if (max_slot_wal_keep_size_mb > 0) { - uint64 limitSegs; - limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); + + /* Apply max_slot_wal_keep_size to keeping segments */ if (limitSegs < keepSegs) keepSegs = limitSegs; } - /* but, keep larger than wal_segment_size if any*/ + /* but, keep at least wal_keep_segments segments if any */ if (wal_keep_segments > 0 && keepSegs < wal_keep_segments) keepSegs = wal_keep_segments; + /* + * Return remaining LSN bytes to advance until the slot gives up reserving + * WAL records if requested. + */ + if (restBytes) + { + uint64 fragbytes; + XLogSegNo restartSeg; + + *restBytes = 0; + + XLByteToSeg(restartLSN, restartSeg, wal_segment_size); + if (limitSegs > 0 && currSeg <= restartSeg + limitSegs) + { + /* + * This slot still has all required segments. Calculate how many + * LSN bytes the slot has until it loses restart_lsn. 
+ */ + fragbytes = wal_segment_size - (currLSN % wal_segment_size); + *restBytes = + (restartSeg + limitSegs - currSeg) * wal_segment_size + + fragbytes; + } + } + /* avoid underflow, don't go below 1 */ if (currSeg <= keepSegs) return 1; @@ -9562,7 +9683,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) /* * We should keep certain number of WAL segments after this checktpoint. */ - minSegNo = GetOldestKeepSegment(recptr, slotminptr); + minSegNo = + GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL); /* * warn if the checkpoint flushes the segments required by replication diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 7251552419..d28896dc58 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, - L.confirmed_flush_lsn + L.confirmed_flush_lsn, + L.wal_status, + L.remain FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8782bad4a2..d9ed9e8cf2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 11 +#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (restart_lsn == InvalidXLogRecPtr) + { + values[i++] = CStringGetTextDatum("unknown"); + values[i++] = LSNGetDatum(InvalidXLogRecPtr); + } + else + { + uint64 remaining_bytes; + char *status; + + switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes)) + { + case 0: + status = "lost"; + break; + case 1: + status = 
"streaming"; + break; + case 2: + status = "keeping"; + break; + default: + status = "unknown"; + break; + } + + values[i++] = CStringGetTextDatum(status); + values[i++] = Int64GetDatum(remaining_bytes); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 12cd0d1d10..ad9d1dec29 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); +extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a14651010f..4a096c9478 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9796,9 +9796,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git 
a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 744d501e31..dcd5f19644 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name, l.xmin, l.catalog_xmin, l.restart_lsn, - l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + l.confirmed_flush_lsn, + l.wal_status, + l.remain + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.16.3