From ddc3250208bd5980a25b0421d607bae202fef06c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 10 Feb 2026 16:23:05 +0200 Subject: [PATCH] Use ProcNumber rather than pid in ReplicationSlot This helps the next commit. Reviewed-by: Chao Li Discussion: https://www.postgresql.org/message-id/4cc13ba1-4248-4884-b6ba-4805349e7f39@iki.fi --- src/backend/replication/logical/slotsync.c | 2 +- src/backend/replication/slot.c | 63 ++++++++++++---------- src/backend/replication/slotfuncs.c | 13 ++--- src/include/replication/slot.h | 7 ++- 4 files changed, 49 insertions(+), 36 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index af5682ce50e..d02d44d26a0 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1757,7 +1757,7 @@ update_synced_slots_inactive_since(void) Assert(SlotIsLogical(s)); /* The slot must not be acquired by any process */ - Assert(s->active_pid == 0); + Assert(s->active_proc == INVALID_PROC_NUMBER); /* Use the same inactive_since time for all the slots. */ if (now == 0) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 4c47261c7f9..d5628d62117 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void) ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; /* everything else is zeroed by the memset above */ + slot->active_proc = INVALID_PROC_NUMBER; SpinLockInit(&slot->mutex); LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO); @@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * be doing that. So it's safe to initialize the slot. */ Assert(!slot->in_use); - Assert(slot->active_pid == 0); + Assert(slot->active_proc == INVALID_PROC_NUMBER); /* first initialize persistent data */ memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData)); @@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* We can now mark the slot active, and that makes it our slot. */ SpinLockAcquire(&slot->mutex); - Assert(slot->active_pid == 0); - slot->active_pid = MyProcPid; + Assert(slot->active_proc == INVALID_PROC_NUMBER); + slot->active_proc = MyProcNumber; SpinLockRelease(&slot->mutex); MyReplicationSlot = slot; @@ -620,6 +621,7 @@ void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) { ReplicationSlot *s; + ProcNumber active_proc; int active_pid; Assert(name != NULL); @@ -672,17 +674,18 @@ retry: * to inactive_since in InvalidatePossiblyObsoleteSlot. */ SpinLockAcquire(&s->mutex); - if (s->active_pid == 0) - s->active_pid = MyProcPid; - active_pid = s->active_pid; + if (s->active_proc == INVALID_PROC_NUMBER) + s->active_proc = MyProcNumber; + active_proc = s->active_proc; ReplicationSlotSetInactiveSince(s, 0, false); SpinLockRelease(&s->mutex); } else { - s->active_pid = active_pid = MyProcPid; + s->active_proc = active_proc = MyProcNumber; ReplicationSlotSetInactiveSince(s, 0, true); } + active_pid = GetPGProcByNumber(active_proc)->pid; LWLockRelease(ReplicationSlotControlLock); /* @@ -690,7 +693,7 @@ retry: * wait until the owning process signals us that it's been released, or * error out. */ - if (active_pid != MyProcPid) + if (active_proc != MyProcNumber) { if (!nowait) { @@ -762,7 +765,7 @@ ReplicationSlotRelease(void) bool is_logical; TimestampTz now = 0; - Assert(slot != NULL && slot->active_pid != 0); + Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER); is_logical = SlotIsLogical(slot); @@ -815,7 +818,7 @@ ReplicationSlotRelease(void) * disconnecting, but wake up others that may be waiting for it. */ SpinLockAcquire(&slot->mutex); - slot->active_pid = 0; + slot->active_proc = INVALID_PROC_NUMBER; ReplicationSlotSetInactiveSince(slot, now, false); SpinLockRelease(&slot->mutex); ConditionVariableBroadcast(&slot->active_cv); @@ -877,7 +880,7 @@ restart: found_valid_logicalslot |= (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE); - if ((s->active_pid == MyProcPid && + if ((s->active_proc == MyProcNumber && (!synced_only || s->data.synced))) { Assert(s->data.persistency == RS_TEMPORARY); @@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) bool fail_softly = slot->data.persistency != RS_PERSISTENT; SpinLockAcquire(&slot->mutex); - slot->active_pid = 0; + slot->active_proc = INVALID_PROC_NUMBER; SpinLockRelease(&slot->mutex); /* wake up anyone waiting on this slot */ @@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * Also wake up processes waiting for it. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); - slot->active_pid = 0; + slot->active_proc = INVALID_PROC_NUMBER; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); ConditionVariableBroadcast(&slot->active_cv); @@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) /* count slots with spinlock held */ SpinLockAcquire(&s->mutex); (*nslots)++; - if (s->active_pid != 0) + if (s->active_proc != INVALID_PROC_NUMBER) (*nactive)++; SpinLockRelease(&s->mutex); } @@ -1520,7 +1523,7 @@ restart: { ReplicationSlot *s; char *slotname; - int active_pid; + ProcNumber active_proc; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1550,11 +1553,11 @@ restart: SpinLockAcquire(&s->mutex); /* can't change while ReplicationSlotControlLock is held */ slotname = NameStr(s->data.name); - active_pid = s->active_pid; - if (active_pid == 0) + active_proc = s->active_proc; + if (active_proc == INVALID_PROC_NUMBER) { MyReplicationSlot = s; - s->active_pid = MyProcPid; + s->active_proc = MyProcNumber; } SpinLockRelease(&s->mutex); @@ -1579,11 +1582,11 @@ restart: * XXX: We can consider shutting down the slot sync worker before * trying to drop synced temporary slots here. */ - if (active_pid) + if (active_proc != INVALID_PROC_NUMBER) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", - slotname, active_pid))); + slotname, GetPGProcByNumber(active_proc)->pid))); /* * To avoid duplicating ReplicationSlotDropAcquired() and to avoid @@ -1974,6 +1977,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, { XLogRecPtr restart_lsn; NameData slotname; + ProcNumber active_proc; int active_pid = 0; ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; TimestampTz now = 0; @@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, } slotname = s->data.name; - active_pid = s->active_pid; + active_proc = s->active_proc; /* * If the slot can be acquired, do so and mark it invalidated @@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * is terminated. So, the inactive slot can only be invalidated * immediately without being terminated. */ - if (active_pid == 0) + if (active_proc == INVALID_PROC_NUMBER) { MyReplicationSlot = s; - s->active_pid = MyProcPid; + s->active_proc = MyProcNumber; s->data.invalidated = invalidation_cause; /* @@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, /* Let caller know */ invalidated = true; } + else + { + active_pid = GetPGProcByNumber(active_proc)->pid; + Assert(active_pid != 0); + } SpinLockRelease(&s->mutex); @@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, &slot_idle_usecs); } - if (active_pid != 0) + if (active_proc != INVALID_PROC_NUMBER) { /* * Prepare the sleep on the slot's condition variable before @@ -2107,7 +2116,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, - INVALID_PROC_NUMBER); + active_proc); else (void) kill(active_pid, SIGTERM); @@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name) slot->candidate_restart_valid = InvalidXLogRecPtr; slot->in_use = true; - slot->active_pid = 0; + slot->active_proc = INVALID_PROC_NUMBER; /* * Set the time since the slot has become inactive after loading the @@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) SpinLockAcquire(&slot->mutex); restart_lsn = slot->data.restart_lsn; invalidated = slot->data.invalidated != RS_INVAL_NONE; - inactive = slot->active_pid == 0; + inactive = slot->active_proc == INVALID_PROC_NUMBER; SpinLockRelease(&slot->mutex); if (invalidated) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 1ed2d80c2d2..9f5e4f998fe 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -20,6 +20,7 @@ #include "replication/logical.h" #include "replication/slot.h" #include "replication/slotsync.h" +#include "storage/proc.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/pg_lsn.h" @@ -309,10 +310,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = ObjectIdGetDatum(slot_contents.data.database); values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY); - values[i++] = BoolGetDatum(slot_contents.active_pid != 0); + values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER); - if (slot_contents.active_pid != 0) - values[i++] = Int32GetDatum(slot_contents.active_pid); + if (slot_contents.active_proc != INVALID_PROC_NUMBER) + values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid); else nulls[i++] = true; @@ -377,13 +378,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) */ if (XLogRecPtrIsValid(slot_contents.data.restart_lsn)) { - int pid; + ProcNumber procno; SpinLockAcquire(&slot->mutex); - pid = slot->active_pid; + procno = slot->active_proc; slot_contents.data.restart_lsn = slot->data.restart_lsn; SpinLockRelease(&slot->mutex); - if (pid != 0) + if (procno != INVALID_PROC_NUMBER) { values[i++] = CStringGetTextDatum("unreserved"); walstate = WALAVAIL_UNRESERVED; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index f465e430cc6..72f8be629f3 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -185,8 +185,11 @@ typedef struct ReplicationSlot /* is this slot defined */ bool in_use; - /* Who is streaming out changes for this slot? 0 in unused slots. */ - pid_t active_pid; + /* + * Who is streaming out changes for this slot? INVALID_PROC_NUMBER in + * unused slots. + */ + ProcNumber active_proc; /* any outstanding modifications? */ bool just_dirtied; -- 2.47.3