ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
/* everything else is zeroed by the memset above */
+ slot->active_proc = INVALID_PROC_NUMBER;
SpinLockInit(&slot->mutex);
LWLockInitialize(&slot->io_in_progress_lock,
LWTRANCHE_REPLICATION_SLOT_IO);
* be doing that. So it's safe to initialize the slot.
*/
Assert(!slot->in_use);
- Assert(slot->active_pid == 0);
+ Assert(slot->active_proc == INVALID_PROC_NUMBER);
/* first initialize persistent data */
memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
/* We can now mark the slot active, and that makes it our slot. */
SpinLockAcquire(&slot->mutex);
- Assert(slot->active_pid == 0);
- slot->active_pid = MyProcPid;
+ Assert(slot->active_proc == INVALID_PROC_NUMBER);
+ slot->active_proc = MyProcNumber;
SpinLockRelease(&slot->mutex);
MyReplicationSlot = slot;
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
ReplicationSlot *s;
+ ProcNumber active_proc;
int active_pid;
Assert(name != NULL);
* to inactive_since in InvalidatePossiblyObsoleteSlot.
*/
SpinLockAcquire(&s->mutex);
- if (s->active_pid == 0)
- s->active_pid = MyProcPid;
- active_pid = s->active_pid;
+ if (s->active_proc == INVALID_PROC_NUMBER)
+ s->active_proc = MyProcNumber;
+ active_proc = s->active_proc;
ReplicationSlotSetInactiveSince(s, 0, false);
SpinLockRelease(&s->mutex);
}
else
{
- s->active_pid = active_pid = MyProcPid;
+ s->active_proc = active_proc = MyProcNumber;
ReplicationSlotSetInactiveSince(s, 0, true);
}
+ active_pid = GetPGProcByNumber(active_proc)->pid;
LWLockRelease(ReplicationSlotControlLock);
/*
* wait until the owning process signals us that it's been released, or
* error out.
*/
- if (active_pid != MyProcPid)
+ if (active_proc != MyProcNumber)
{
if (!nowait)
{
bool is_logical;
TimestampTz now = 0;
- Assert(slot != NULL && slot->active_pid != 0);
+ Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
is_logical = SlotIsLogical(slot);
* disconnecting, but wake up others that may be waiting for it.
*/
SpinLockAcquire(&slot->mutex);
- slot->active_pid = 0;
+ slot->active_proc = INVALID_PROC_NUMBER;
ReplicationSlotSetInactiveSince(slot, now, false);
SpinLockRelease(&slot->mutex);
ConditionVariableBroadcast(&slot->active_cv);
found_valid_logicalslot |=
(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
- if ((s->active_pid == MyProcPid &&
+ if ((s->active_proc == MyProcNumber &&
(!synced_only || s->data.synced)))
{
Assert(s->data.persistency == RS_TEMPORARY);
bool fail_softly = slot->data.persistency != RS_PERSISTENT;
SpinLockAcquire(&slot->mutex);
- slot->active_pid = 0;
+ slot->active_proc = INVALID_PROC_NUMBER;
SpinLockRelease(&slot->mutex);
/* wake up anyone waiting on this slot */
* Also wake up processes waiting for it.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
- slot->active_pid = 0;
+ slot->active_proc = INVALID_PROC_NUMBER;
slot->in_use = false;
LWLockRelease(ReplicationSlotControlLock);
ConditionVariableBroadcast(&slot->active_cv);
/* count slots with spinlock held */
SpinLockAcquire(&s->mutex);
(*nslots)++;
- if (s->active_pid != 0)
+ if (s->active_proc != INVALID_PROC_NUMBER)
(*nactive)++;
SpinLockRelease(&s->mutex);
}
{
ReplicationSlot *s;
char *slotname;
- int active_pid;
+ ProcNumber active_proc;
s = &ReplicationSlotCtl->replication_slots[i];
SpinLockAcquire(&s->mutex);
/* can't change while ReplicationSlotControlLock is held */
slotname = NameStr(s->data.name);
- active_pid = s->active_pid;
- if (active_pid == 0)
+ active_proc = s->active_proc;
+ if (active_proc == INVALID_PROC_NUMBER)
{
MyReplicationSlot = s;
- s->active_pid = MyProcPid;
+ s->active_proc = MyProcNumber;
}
SpinLockRelease(&s->mutex);
* XXX: We can consider shutting down the slot sync worker before
* trying to drop synced temporary slots here.
*/
- if (active_pid)
+ if (active_proc != INVALID_PROC_NUMBER)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
- slotname, active_pid)));
+ slotname, GetPGProcByNumber(active_proc)->pid)));
/*
* To avoid duplicating ReplicationSlotDropAcquired() and to avoid
{
XLogRecPtr restart_lsn;
NameData slotname;
+ ProcNumber active_proc;
int active_pid = 0;
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
TimestampTz now = 0;
}
slotname = s->data.name;
- active_pid = s->active_pid;
+ active_proc = s->active_proc;
/*
* If the slot can be acquired, do so and mark it invalidated
* is terminated. So, the inactive slot can only be invalidated
* immediately without being terminated.
*/
- if (active_pid == 0)
+ if (active_proc == INVALID_PROC_NUMBER)
{
MyReplicationSlot = s;
- s->active_pid = MyProcPid;
+ s->active_proc = MyProcNumber;
s->data.invalidated = invalidation_cause;
/*
/* Let caller know */
invalidated = true;
}
+ else
+ {
+ active_pid = GetPGProcByNumber(active_proc)->pid;
+ Assert(active_pid != 0);
+ }
SpinLockRelease(&s->mutex);
&slot_idle_usecs);
}
- if (active_pid != 0)
+ if (active_proc != INVALID_PROC_NUMBER)
{
/*
* Prepare the sleep on the slot's condition variable before
if (MyBackendType == B_STARTUP)
(void) SendProcSignal(active_pid,
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
- INVALID_PROC_NUMBER);
+ active_proc);
else
(void) kill(active_pid, SIGTERM);
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->in_use = true;
- slot->active_pid = 0;
+ slot->active_proc = INVALID_PROC_NUMBER;
/*
* Set the time since the slot has become inactive after loading the
SpinLockAcquire(&slot->mutex);
restart_lsn = slot->data.restart_lsn;
invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
+ inactive = slot->active_proc == INVALID_PROC_NUMBER;
SpinLockRelease(&slot->mutex);
if (invalidated)