commit af47f2e05316e7e89ee7e5b59b5f5fe6ae421508 Author: Alvaro Herrera AuthorDate: Mon Apr 6 19:29:56 2020 -0400 CommitDate: Mon Apr 6 19:51:06 2020 -0400 loop in InvalidateObsoleteReplicationSlots fixes a race condition on slot acquisition diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 04510094a8..f5384f1df8 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - ReplicationSlotAcquire(NameStr(*name), true); + (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 31e12e4043..e7960cb48e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -325,9 +325,15 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* * Find a previously created slot and mark it as used by this backend. + * + * The return value is only useful if behavior is SAB_Inquire, in which + * it's zero if we successfully acquired the slot, or the PID of the + * owning process otherwise. If behavior is SAB_Error, then trying to + * acquire an owned slot is an error. If SAB_Block, we sleep until the + * slot is released by the owning process. */ -void -ReplicationSlotAcquire(const char *name, bool nowait) +int +ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior) { ReplicationSlot *slot; int active_pid; @@ -392,11 +398,13 @@ retry: */ if (active_pid != MyProcPid) { - if (nowait) + if (behavior == SAB_Error) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", name, active_pid))); + else if (behavior == SAB_Inquire) + return active_pid; /* Wait here until we get signaled, and then restart */ ConditionVariableSleep(&slot->active_cv, @@ -412,6 +420,9 @@ retry: /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; + + /* success */ + return 0; } /* @@ -518,7 +529,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, nowait); + (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block); ReplicationSlotDropAcquired(); } @@ -1097,7 +1108,6 @@ restart: ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn = InvalidXLogRecPtr; char *slotname; - int wspid; if (!s->in_use) continue; @@ -1112,21 +1122,27 @@ restart: slotname = pstrdup(NameStr(s->data.name)); restart_lsn = s->data.restart_lsn; - wspid = s->active_pid; SpinLockRelease(&s->mutex); LWLockRelease(ReplicationSlotControlLock); - if (wspid != 0) + for (;;) { - ereport(LOG, - (errmsg("terminating walsender %d because replication slot is too far behind", - wspid))); - (void) kill(wspid, SIGTERM); - } + int wspid = ReplicationSlotAcquire(slotname, SAB_Inquire); - /* Here we wait until the walsender is gone */ - ReplicationSlotAcquire(slotname, false); + /* no walsender? success! */ + if (wspid == 0) + break; + + ereport(LOG, + (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind", + wspid, slotname))); + (void) kill(wspid, SIGTERM); + + ConditionVariableTimedSleep(&s->active_cv, + 10, WAIT_EVENT_REPLICATION_SLOT_DROP); + ConditionVariableCancelSleep(); + } ereport(LOG, (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 91a5d0f290..f8336129d9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -592,7 +592,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true); + (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9e5611574c..06e8b79036 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -595,7 +595,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname, true); + (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1132,7 +1132,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname, true); + (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); /* * Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 6e469ea749..f984bfd7a6 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -36,6 +36,14 @@ typedef enum ReplicationSlotPersistency RS_TEMPORARY } ReplicationSlotPersistency; +/* For ReplicationSlotAcquire, q.v. */ +typedef enum SlotAcquireBehavior +{ + SAB_Error, + SAB_Block, + SAB_Inquire +} SlotAcquireBehavior; + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -184,7 +192,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name, bool nowait); +extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void);