]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Use ProcNumber rather than pid in ReplicationSlot
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 10 Feb 2026 14:23:05 +0000 (16:23 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 10 Feb 2026 14:23:05 +0000 (16:23 +0200)
This helps the next commit.

Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://www.postgresql.org/message-id/4cc13ba1-4248-4884-b6ba-4805349e7f39@iki.fi

src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/include/replication/slot.h

index af5682ce50e65a858e01e33b1e397b81f8688a29..d02d44d26a03a996bddc428ccb4ac9094166fa1d 100644 (file)
@@ -1757,7 +1757,7 @@ update_synced_slots_inactive_since(void)
                        Assert(SlotIsLogical(s));
 
                        /* The slot must not be acquired by any process */
-                       Assert(s->active_pid == 0);
+                       Assert(s->active_proc == INVALID_PROC_NUMBER);
 
                        /* Use the same inactive_since time for all the slots. */
                        if (now == 0)
index 4c47261c7f913dbc9fab5691316c646a2dd9d8de..d5628d62117af0bbf3edb133a73c4ea8bcc5f84d 100644 (file)
@@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void)
                        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
 
                        /* everything else is zeroed by the memset above */
+                       slot->active_proc = INVALID_PROC_NUMBER;
                        SpinLockInit(&slot->mutex);
                        LWLockInitialize(&slot->io_in_progress_lock,
                                                         LWTRANCHE_REPLICATION_SLOT_IO);
@@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
         * be doing that.  So it's safe to initialize the slot.
         */
        Assert(!slot->in_use);
-       Assert(slot->active_pid == 0);
+       Assert(slot->active_proc == INVALID_PROC_NUMBER);
 
        /* first initialize persistent data */
        memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
@@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
        /* We can now mark the slot active, and that makes it our slot. */
        SpinLockAcquire(&slot->mutex);
-       Assert(slot->active_pid == 0);
-       slot->active_pid = MyProcPid;
+       Assert(slot->active_proc == INVALID_PROC_NUMBER);
+       slot->active_proc = MyProcNumber;
        SpinLockRelease(&slot->mutex);
        MyReplicationSlot = slot;
 
@@ -620,6 +621,7 @@ void
 ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
        ReplicationSlot *s;
+       ProcNumber      active_proc;
        int                     active_pid;
 
        Assert(name != NULL);
@@ -672,17 +674,18 @@ retry:
                 * to inactive_since in InvalidatePossiblyObsoleteSlot.
                 */
                SpinLockAcquire(&s->mutex);
-               if (s->active_pid == 0)
-                       s->active_pid = MyProcPid;
-               active_pid = s->active_pid;
+               if (s->active_proc == INVALID_PROC_NUMBER)
+                       s->active_proc = MyProcNumber;
+               active_proc = s->active_proc;
                ReplicationSlotSetInactiveSince(s, 0, false);
                SpinLockRelease(&s->mutex);
        }
        else
        {
-               s->active_pid = active_pid = MyProcPid;
+               s->active_proc = active_proc = MyProcNumber;
                ReplicationSlotSetInactiveSince(s, 0, true);
        }
+       active_pid = GetPGProcByNumber(active_proc)->pid;
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
@@ -690,7 +693,7 @@ retry:
         * wait until the owning process signals us that it's been released, or
         * error out.
         */
-       if (active_pid != MyProcPid)
+       if (active_proc != MyProcNumber)
        {
                if (!nowait)
                {
@@ -762,7 +765,7 @@ ReplicationSlotRelease(void)
        bool            is_logical;
        TimestampTz now = 0;
 
-       Assert(slot != NULL && slot->active_pid != 0);
+       Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
 
        is_logical = SlotIsLogical(slot);
 
@@ -815,7 +818,7 @@ ReplicationSlotRelease(void)
                 * disconnecting, but wake up others that may be waiting for it.
                 */
                SpinLockAcquire(&slot->mutex);
-               slot->active_pid = 0;
+               slot->active_proc = INVALID_PROC_NUMBER;
                ReplicationSlotSetInactiveSince(slot, now, false);
                SpinLockRelease(&slot->mutex);
                ConditionVariableBroadcast(&slot->active_cv);
@@ -877,7 +880,7 @@ restart:
                found_valid_logicalslot |=
                        (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
 
-               if ((s->active_pid == MyProcPid &&
+               if ((s->active_proc == MyProcNumber &&
                         (!synced_only || s->data.synced)))
                {
                        Assert(s->data.persistency == RS_TEMPORARY);
@@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
                bool            fail_softly = slot->data.persistency != RS_PERSISTENT;
 
                SpinLockAcquire(&slot->mutex);
-               slot->active_pid = 0;
+               slot->active_proc = INVALID_PROC_NUMBER;
                SpinLockRelease(&slot->mutex);
 
                /* wake up anyone waiting on this slot */
@@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
         * Also wake up processes waiting for it.
         */
        LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
-       slot->active_pid = 0;
+       slot->active_proc = INVALID_PROC_NUMBER;
        slot->in_use = false;
        LWLockRelease(ReplicationSlotControlLock);
        ConditionVariableBroadcast(&slot->active_cv);
@@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
                /* count slots with spinlock held */
                SpinLockAcquire(&s->mutex);
                (*nslots)++;
-               if (s->active_pid != 0)
+               if (s->active_proc != INVALID_PROC_NUMBER)
                        (*nactive)++;
                SpinLockRelease(&s->mutex);
        }
@@ -1520,7 +1523,7 @@ restart:
        {
                ReplicationSlot *s;
                char       *slotname;
-               int                     active_pid;
+               ProcNumber      active_proc;
 
                s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1550,11 +1553,11 @@ restart:
                SpinLockAcquire(&s->mutex);
                /* can't change while ReplicationSlotControlLock is held */
                slotname = NameStr(s->data.name);
-               active_pid = s->active_pid;
-               if (active_pid == 0)
+               active_proc = s->active_proc;
+               if (active_proc == INVALID_PROC_NUMBER)
                {
                        MyReplicationSlot = s;
-                       s->active_pid = MyProcPid;
+                       s->active_proc = MyProcNumber;
                }
                SpinLockRelease(&s->mutex);
 
@@ -1579,11 +1582,11 @@ restart:
                 * XXX: We can consider shutting down the slot sync worker before
                 * trying to drop synced temporary slots here.
                 */
-               if (active_pid)
+               if (active_proc != INVALID_PROC_NUMBER)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_IN_USE),
                                         errmsg("replication slot \"%s\" is active for PID %d",
-                                                       slotname, active_pid)));
+                                                       slotname, GetPGProcByNumber(active_proc)->pid)));
 
                /*
                 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
@@ -1974,6 +1977,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
        {
                XLogRecPtr      restart_lsn;
                NameData        slotname;
+               ProcNumber      active_proc;
                int                     active_pid = 0;
                ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
                TimestampTz now = 0;
@@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                }
 
                slotname = s->data.name;
-               active_pid = s->active_pid;
+               active_proc = s->active_proc;
 
                /*
                 * If the slot can be acquired, do so and mark it invalidated
@@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                 * is terminated. So, the inactive slot can only be invalidated
                 * immediately without being terminated.
                 */
-               if (active_pid == 0)
+               if (active_proc == INVALID_PROC_NUMBER)
                {
                        MyReplicationSlot = s;
-                       s->active_pid = MyProcPid;
+                       s->active_proc = MyProcNumber;
                        s->data.invalidated = invalidation_cause;
 
                        /*
@@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                        /* Let caller know */
                        invalidated = true;
                }
+               else
+               {
+                       active_pid = GetPGProcByNumber(active_proc)->pid;
+                       Assert(active_pid != 0);
+               }
 
                SpinLockRelease(&s->mutex);
 
@@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                                                                &slot_idle_usecs);
                }
 
-               if (active_pid != 0)
+               if (active_proc != INVALID_PROC_NUMBER)
                {
                        /*
                         * Prepare the sleep on the slot's condition variable before
@@ -2107,7 +2116,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                                if (MyBackendType == B_STARTUP)
                                        (void) SendProcSignal(active_pid,
                                                                                  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
-                                                                                 INVALID_PROC_NUMBER);
+                                                                                 active_proc);
                                else
                                        (void) kill(active_pid, SIGTERM);
 
@@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name)
                slot->candidate_restart_valid = InvalidXLogRecPtr;
 
                slot->in_use = true;
-               slot->active_pid = 0;
+               slot->active_proc = INVALID_PROC_NUMBER;
 
                /*
                 * Set the time since the slot has become inactive after loading the
@@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
                SpinLockAcquire(&slot->mutex);
                restart_lsn = slot->data.restart_lsn;
                invalidated = slot->data.invalidated != RS_INVAL_NONE;
-               inactive = slot->active_pid == 0;
+               inactive = slot->active_proc == INVALID_PROC_NUMBER;
                SpinLockRelease(&slot->mutex);
 
                if (invalidated)
index 1ed2d80c2d2d65c648874452de8e15886309f79e..9f5e4f998fe9d8bfb92006eb0e52ee604ab81814 100644 (file)
@@ -20,6 +20,7 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/pg_lsn.h"
@@ -309,10 +310,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                        values[i++] = ObjectIdGetDatum(slot_contents.data.database);
 
                values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
-               values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
+               values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
 
-               if (slot_contents.active_pid != 0)
-                       values[i++] = Int32GetDatum(slot_contents.active_pid);
+               if (slot_contents.active_proc != INVALID_PROC_NUMBER)
+                       values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
                else
                        nulls[i++] = true;
 
@@ -377,13 +378,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                                 */
                                if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
                                {
-                                       int                     pid;
+                                       ProcNumber      procno;
 
                                        SpinLockAcquire(&slot->mutex);
-                                       pid = slot->active_pid;
+                                       procno = slot->active_proc;
                                        slot_contents.data.restart_lsn = slot->data.restart_lsn;
                                        SpinLockRelease(&slot->mutex);
-                                       if (pid != 0)
+                                       if (procno != INVALID_PROC_NUMBER)
                                        {
                                                values[i++] = CStringGetTextDatum("unreserved");
                                                walstate = WALAVAIL_UNRESERVED;
index f465e430cc67a03b1a6dd5765b01ca66a6fa8b1d..72f8be629f39760801332cc4a53177ced9ff3964 100644 (file)
@@ -185,8 +185,11 @@ typedef struct ReplicationSlot
        /* is this slot defined */
        bool            in_use;
 
-       /* Who is streaming out changes for this slot? 0 in unused slots. */
-       pid_t           active_pid;
+       /*
+        * Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
+        * unused slots.
+        */
+       ProcNumber      active_proc;
 
        /* any outstanding modifications? */
        bool            just_dirtied;