* according to synchronized LSNs of replication slots. The slot's LSN
* might be advanced concurrently, so we call this before
* CheckPointReplicationSlots() synchronizes replication slots.
- */
+ *
+ * We acquire ReplicationSlotAllocationLock to serialize the minimum LSN
+ * calculation with concurrent slot WAL reservation. This ensures that the
+ * WAL position being reserved is either included in the minimum LSN or is
+ * beyond or equal to the redo pointer of the current checkpoint (See
+ * ReplicationSlotReserveWal for details), thus preventing its removal by
+ * checkpoints. Note that this lock is required only during checkpoints
+ * where WAL removal is dictated by the slot's minimum LSN.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ LWLockRelease(ReplicationSlotAllocationLock);
/*
* In some cases there are groups of actions that must all occur on one
/*
* Recalculate the current minimum LSN to be used in the WAL segment
* cleanup. Then, we must synchronize the replication slots again in
- * order to make this LSN safe to use.
+ * order to make this LSN safe to use. Here, we don't need to acquire
+ * the ReplicationSlotAllocationLock to serialize the minimum LSN
+ * computation with slot reservation as the RedoRecPtr is not updated
+ * after the previous computation of minimum LSN.
*/
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
CheckPointReplicationSlots();
* according to synchronized LSNs of replication slots. The slot's LSN
* might be advanced concurrently, so we call this before
* CheckPointReplicationSlots() synchronizes replication slots.
+ *
+ * We acquire ReplicationSlotAllocationLock to serialize the minimum LSN
+ * calculation with concurrent slot WAL reservation. This ensures that the
+ * WAL position being reserved is either included in the minimum LSN or is
+ * beyond or equal to the redo pointer of the current checkpoint (See
+ * ReplicationSlotReserveWal for details).
*/
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ LWLockRelease(ReplicationSlotAllocationLock);
if (log_checkpoints)
LogCheckpointStart(flags, true);
/*
* Recalculate the current minimum LSN to be used in the WAL segment
* cleanup. Then, we must synchronize the replication slots again in
- * order to make this LSN safe to use.
+ * order to make this LSN safe to use. Here, we don't need to acquire
+ * the ReplicationSlotAllocationLock to serialize the minimum LSN
+ * computation with slot reservation as the RedoRecPtr is not updated
+ * after the previous computation of minimum LSN.
*/
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
CheckPointReplicationSlots();
ReplicationSlotReserveWal(void)
{
ReplicationSlot *slot = MyReplicationSlot;
+ XLogSegNo segno;
+ XLogRecPtr restart_lsn;
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
/*
- * The replication slot mechanism is used to prevent removal of required
- * WAL. As there is no interlock between this routine and checkpoints, WAL
- * segments could concurrently be removed when a now stale return value of
- * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
- * this happens we'll just retry.
+ * The replication slot mechanism is used to prevent the removal of
+ * required WAL.
+ *
+ * Acquire an exclusive lock to prevent the checkpoint process from
+ * concurrently computing the minimum slot LSN (see the call to
+ * XLogGetReplicationSlotMinimumLSN in CreateCheckPoint). This ensures
+ * that the WAL reserved for replication cannot be removed during a
+ * checkpoint.
+ *
+ * The mechanism is reliable because if WAL reservation occurs first, the
+ * checkpoint must wait for the restart_lsn update before determining the
+ * minimum non-removable LSN. On the other hand, if the checkpoint happens
+ * first, subsequent WAL reservations will select positions at or beyond
+ * the redo pointer of that checkpoint.
*/
- while (true)
- {
- XLogSegNo segno;
- XLogRecPtr restart_lsn;
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
- /*
- * For logical slots log a standby snapshot and start logical decoding
- * at exactly that position. That allows the slot to start up more
- * quickly. But on a standby we cannot do WAL writes, so just use the
- * replay pointer; effectively, an attempt to create a logical slot on
- * standby will cause it to wait for an xl_running_xact record to be
- * logged independently on the primary, so that a snapshot can be
- * built using the record.
- *
- * None of this is needed (or indeed helpful) for physical slots as
- * they'll start replay at the last logged checkpoint anyway. Instead
- * return the location of the last redo LSN. While that slightly
- * increases the chance that we have to retry, it's where a base
- * backup has to start replay at.
- */
- if (SlotIsPhysical(slot))
- restart_lsn = GetRedoRecPtr();
- else if (RecoveryInProgress())
- restart_lsn = GetXLogReplayRecPtr(NULL);
- else
- restart_lsn = GetXLogInsertRecPtr();
+ /*
+ * For logical slots log a standby snapshot and start logical decoding at
+ * exactly that position. That allows the slot to start up more quickly.
+ * But on a standby we cannot do WAL writes, so just use the replay
+ * pointer; effectively, an attempt to create a logical slot on standby
+ * will cause it to wait for an xl_running_xact record to be logged
+ * independently on the primary, so that a snapshot can be built using the
+ * record.
+ *
+ * None of this is needed (or indeed helpful) for physical slots as
+ * they'll start replay at the last logged checkpoint anyway. Instead,
+ * return the location of the last redo LSN, which is where a base backup
+ * has to start replay.
+ */
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ else
+ restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
- /* prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
+ /* prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
- if (XLogGetLastRemovedSegno() < segno)
- break;
- }
+ /* Checkpoint shouldn't remove the required WAL. */
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+ if (XLogGetLastRemovedSegno() >= segno)
+ elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+ NameStr(slot->data.name));
+
+ LWLockRelease(ReplicationSlotAllocationLock);
if (!RecoveryInProgress() && SlotIsLogical(slot))
{