* Reserve WAL for the currently active local slot using the specified WAL
* location (restart_lsn).
*
- * If the given WAL location has been removed, reserve WAL using the oldest
- * existing WAL segment.
+ * If the given WAL location has been removed or is at risk of removal,
+ * reserve WAL starting from the oldest location that is safe from removal.
*/
static void
reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
{
- XLogSegNo oldest_segno;
+ XLogRecPtr slot_min_lsn;
+ XLogRecPtr min_safe_lsn;
XLogSegNo segno;
ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL);
Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
- while (true)
- {
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
-
- /* Prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
-
- XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+ /*
+ * Acquire an exclusive lock to prevent the checkpoint process from
+ * concurrently calculating the minimum slot LSN (see
+ * CheckPointReplicationSlots), ensuring that if WAL reservation occurs
+ * first, the checkpoint must wait for the restart_lsn update before
+ * calculating the minimum LSN.
+ *
+ * Note: Unlike ReplicationSlotReserveWal(), this lock does not protect a
+ * newly synced slot from being invalidated if a concurrent checkpoint has
+ * invoked CheckPointReplicationSlots() before the WAL reservation here.
+ * This can happen because the initial restart_lsn received from the
+ * remote server can precede the redo pointer. Therefore, when selecting
+ * the initial restart_lsn, we use the smaller of the redo pointer and
+ * the minimum slot LSN when that value is greater than the remote
+ * restart_lsn, instead of relying solely on the remote value.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
- /*
- * Find the oldest existing WAL segment file.
- *
- * Normally, we can determine it by using the last removed segment
- * number. However, if no WAL segment files have been removed by a
- * checkpoint since startup, we need to search for the oldest segment
- * file from the current timeline existing in XLOGDIR.
- *
- * XXX: Currently, we are searching for the oldest segment in the
- * current timeline as there is less chance of the slot's restart_lsn
- * from being some prior timeline, and even if it happens, in the
- * worst case, we will wait to sync till the slot's restart_lsn moved
- * to the current timeline.
- */
- oldest_segno = XLogGetLastRemovedSegno() + 1;
+ /*
+ * Determine the minimum non-removable LSN by comparing the redo pointer
+ * with the minimum slot LSN.
+ *
+ * The minimum slot LSN is considered because the redo pointer advances at
+ * every checkpoint, even when replication slots are present on the
+ * standby. In such scenarios, the redo pointer can exceed the remote
+ * restart_lsn, while the WAL preceding the remote restart_lsn remains
+ * protected by a local replication slot.
+ */
+ min_safe_lsn = GetRedoRecPtr();
+ slot_min_lsn = XLogGetReplicationSlotMinimumLSN();
- if (oldest_segno == 1)
- {
- TimeLineID cur_timeline;
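+ /*
+ * The minimum slot LSN is InvalidXLogRecPtr when no local slot currently
+ * reserves WAL; in that case the redo pointer alone determines the
+ * minimum safe LSN.
+ */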
+ if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn)
+ min_safe_lsn = slot_min_lsn;
- GetWalRcvFlushRecPtr(NULL, &cur_timeline);
- oldest_segno = XLogGetOldestSegno(cur_timeline);
- }
+ /*
+ * If the minimum safe LSN is greater than the given restart_lsn, use it
+ * as the initial restart_lsn for the newly synced slot. Otherwise, use
+ * the given remote restart_lsn.
+ */
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = Max(restart_lsn, min_safe_lsn);
+ SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
- segno, oldest_segno);
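+ /* Advertise the new restart_lsn so that the required WAL is retained */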
+ ReplicationSlotsComputeRequiredLSN();
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- if (segno >= oldest_segno)
- break;
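+ /*
+ * Sanity check: the segment holding the chosen restart_lsn must still
+ * exist; if it has already been removed, the slot cannot be synced
+ * safely.
+ */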
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+ if (XLogGetLastRemovedSegno() >= segno)
+ elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+ NameStr(slot->data.name));
- /* Retry using the location of the oldest wal segment */
- XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
- }
+ LWLockRelease(ReplicationSlotAllocationLock);
}
/*
my ($node, $result);
$node = PostgreSQL::Test::Cluster->new('mike');
-$node->init;
-$node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$node->init(allows_streaming => 'logical');
$node->start;
# Check if the extension injection_points is available, as it may be
};
is($@, '', "Logical slot still valid");
+# Verify that a synchronized slot is not invalidated immediately after
+# synchronization when a concurrent checkpoint is in progress.
+my $primary = $node;
+
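+# Disable autovacuum to avoid unrelated background activity during the test.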
+$primary->append_conf('postgresql.conf', "autovacuum = off");
+$primary->reload;
+
+my $backup_name = 'backup';
+
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'phys_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
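+# On the primary, create the failover-enabled logical slot to be synced and a
+# physical slot for the standby's primary_slot_name.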
+$primary->safe_psql('postgres',
+ q{SELECT pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true);
+ SELECT pg_create_physical_replication_slot('phys_slot');}
+);
+
+$standby->start;
+
+# Generate some activity and switch the WAL file on the primary
+$primary->advance_wal(1);
+$primary->safe_psql('postgres', "CHECKPOINT");
+$primary->wait_for_replay_catchup($standby);
+
+# Trigger a checkpoint on the standby from a background session and make it
+# wait at the injection point, so that it stops right before invalidating
+# replication slots.
+note('starting checkpoint');
+
+$checkpoint = $standby->background_psql('postgres');
+$checkpoint->query_safe(
+ q(select injection_points_attach('restartpoint-before-slot-invalidation','wait'))
+);
+$checkpoint->query_until(
+ qr/starting_checkpoint/,
+ q(\echo starting_checkpoint
+checkpoint;
+));
+
+# Wait until the checkpoint stops right before invalidating slots
+note('waiting for injection_point');
+$standby->wait_for_event('checkpointer', 'restartpoint-before-slot-invalidation');
+note('injection_point is reached');
+
+# Enable slot sync worker to synchronize the failover slot to the standby
+$standby->append_conf('postgresql.conf', qq(sync_replication_slots = on));
+$standby->reload;
+
+# Wait for the slot to be synced
+$standby->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) > 0 FROM pg_replication_slots WHERE slot_name = 'failover_slot'");
+
+# Release the checkpointer
+$standby->safe_psql('postgres',
+ q{select injection_points_wakeup('restartpoint-before-slot-invalidation');
+ select injection_points_detach('restartpoint-before-slot-invalidation')});
+
+$checkpoint->quit;
+
+# Confirm that the slot is not invalidated
+is( $standby->safe_psql(
+ 'postgres',
+ q{SELECT invalidation_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'failover_slot';}
+ ),
+ "t",
+ 'logical slot is not invalidated');
+
done_testing();