* according to synchronized LSNs of replication slots. The slot's LSN
* might be advanced concurrently, so we call this before
* CheckPointReplicationSlots() synchronizes replication slots.
- */
+ *
+ * We acquire the Allocation lock to serialize the minimum LSN calculation
+ * with concurrent slot WAL reservation. This ensures that the WAL
+ * position being reserved is either included in the minimum LSN or is
+ * beyond or equal to the redo pointer of the current checkpoint (See
+ * ReplicationSlotReserveWal for details), thus preventing its removal by
+ * checkpoints. Note that this lock is required only during checkpoints
+ * where WAL removal is dictated by the slot's minimum LSN.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ LWLockRelease(ReplicationSlotAllocationLock);
/*
* In some cases there are groups of actions that must all occur on one
/*
* Recalculate the current minimum LSN to be used in the WAL segment
* cleanup. Then, we must synchronize the replication slots again in
- * order to make this LSN safe to use.
+ * order to make this LSN safe to use. Here, we don't need to acquire
+ * the ReplicationSlotAllocationLock to serialize the minimum LSN
+ * computation with slot reservation as the RedoRecPtr is not updated
+ * after the previous computation of minimum LSN.
*/
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
CheckPointReplicationSlots();
* according to synchronized LSNs of replication slots. The slot's LSN
* might be advanced concurrently, so we call this before
* CheckPointReplicationSlots() synchronizes replication slots.
+ *
+ * We acquire the Allocation lock to serialize the minimum LSN calculation
+ * with concurrent slot WAL reservation. This ensures that the WAL
+ * position being reserved is either included in the minimum LSN or is
+ * beyond or equal to the redo pointer of the current checkpoint (See
+ * ReplicationSlotReserveWal for details).
*/
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ LWLockRelease(ReplicationSlotAllocationLock);
if (log_checkpoints)
LogCheckpointStart(flags, true);
/*
* Recalculate the current minimum LSN to be used in the WAL segment
* cleanup. Then, we must synchronize the replication slots again in
- * order to make this LSN safe to use.
+ * order to make this LSN safe to use. Here, we don't need to acquire
+ * the ReplicationSlotAllocationLock to serialize the minimum LSN
+ * computation with slot reservation as the RedoRecPtr is not updated
+ * after the previous computation of minimum LSN.
*/
slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
CheckPointReplicationSlots();
ReplicationSlotReserveWal(void)
{
ReplicationSlot *slot = MyReplicationSlot;
+ XLogSegNo segno;
+ XLogRecPtr restart_lsn;
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
/*
- * The replication slot mechanism is used to prevent removal of required
- * WAL. As there is no interlock between this routine and checkpoints, WAL
- * segments could concurrently be removed when a now stale return value of
- * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
- * this happens we'll just retry.
+ * The replication slot mechanism is used to prevent the removal of
+ * required WAL.
+ *
+ * Acquire an exclusive lock to prevent the checkpoint process from
+ * concurrently computing the minimum slot LSN (see the call to
+ * XLogGetReplicationSlotMinimumLSN in CreateCheckPoint). This ensures
+ * that the WAL reserved for replication cannot be removed during a
+ * checkpoint.
+ *
+ * The mechanism is reliable because if WAL reservation occurs first, the
+ * checkpoint must wait for the restart_lsn update before determining the
+ * minimum non-removable LSN. On the other hand, if the checkpoint happens
+ * first, subsequent WAL reservations will select positions at or beyond
+ * the redo pointer of that checkpoint.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+ /*
+ * For logical slots log a standby snapshot and start logical decoding at
+ * exactly that position. That allows the slot to start up more quickly.
+ *
+ * That's not needed (or indeed helpful) for physical slots as they'll
+ * start replay at the last logged checkpoint anyway. Instead return the
+ * location of the last redo LSN, where a base backup has to start replay
+ * at.
*/
- while (true)
+ if (!RecoveryInProgress() && SlotIsLogical(slot))
{
- XLogSegNo segno;
- XLogRecPtr restart_lsn;
+ XLogRecPtr flushptr;
- /*
- * For logical slots log a standby snapshot and start logical decoding
- * at exactly that position. That allows the slot to start up more
- * quickly.
- *
- * That's not needed (or indeed helpful) for physical slots as they'll
- * start replay at the last logged checkpoint anyway. Instead return
- * the location of the last redo LSN. While that slightly increases
- * the chance that we have to retry, it's where a base backup has to
- * start replay at.
- */
- if (!RecoveryInProgress() && SlotIsLogical(slot))
- {
- XLogRecPtr flushptr;
+ /* start at current insert position */
+ restart_lsn = GetXLogInsertRecPtr();
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
- /* start at current insert position */
- restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
+ else
+ {
+ restart_lsn = GetRedoRecPtr();
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+ }
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
- }
- else
- {
- restart_lsn = GetRedoRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
- }
+ /* prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
- /* prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
+ /* Checkpoint shouldn't remove the required WAL. */
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+ if (XLogGetLastRemovedSegno() >= segno)
+ elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+ NameStr(slot->data.name));
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
- if (XLogGetLastRemovedSegno() < segno)
- break;
- }
+ LWLockRelease(ReplicationSlotAllocationLock);
}
/*