From: Amit Kapila Date: Tue, 27 Jan 2026 05:45:25 +0000 (+0000) Subject: Prevent invalidation of newly synced replication slots. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=919c9fa13cd0684b437a88719d670a9bf6dd0dc8;p=thirdparty%2Fpostgresql.git Prevent invalidation of newly synced replication slots. A race condition could cause a newly synced replication slot to become invalidated between its initial sync and the checkpoint. When syncing a replication slot to a standby, the slot's initial restart_lsn is taken from the publisher's remote_restart_lsn. Because slot sync happens asynchronously, this value can lag behind the standby's current redo pointer. Without any interlocking between WAL reservation and checkpoints, a checkpoint may remove WAL required by the newly synced slot, causing the slot to be invalidated. To fix this, we acquire ReplicationSlotAllocationLock before reserving WAL for a newly synced slot, similar to commit 006dd4b2e5. This ensures that if WAL reservation happens first, the checkpoint process must wait for slotsync to update the slot's restart_lsn before it computes the minimum required LSN. However, unlike in ReplicationSlotReserveWal(), this lock alone cannot protect a newly synced slot if a checkpoint has already run CheckPointReplicationSlots() before slotsync updates the slot. In such cases, the remote restart_lsn may be stale and earlier than the current redo pointer. To prevent relying on an outdated LSN, we use the oldest WAL location available if it is greater than the remote restart_lsn. This ensures that newly synced slots always start with a safe, non-stale restart_lsn and are not invalidated by concurrent checkpoints. Author: Zhijie Hou Reviewed-by: Hayato Kuroda Reviewed-by: Amit Kapila Reviewed-by: Vitaly Davydov Reviewed-by: Chao Li Backpatch-through: 17 Discussion: https://postgr.es/m/TY4PR01MB16907E744589B1AB2EE89A31F94D7A%40TY4PR01MB16907.jpnprd01.prod.outlook.com --- diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 596d2ca5836..d03db6b1f85 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -668,7 +668,6 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, TimeLineID newTLI); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); -static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic); @@ -2676,7 +2675,7 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) * Return the oldest LSN we must retain to satisfy the needs of some * replication slot. */ -static XLogRecPtr +XLogRecPtr XLogGetReplicationSlotMinimumLSN(void) { XLogRecPtr retval; @@ -7812,6 +7811,9 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); + + INJECTION_POINT("restartpoint-before-slot-invalidation", NULL); + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 64db4fedbe8..f9e19771aeb 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -483,70 +483,71 @@ drop_local_obsolete_slots(List *remote_slot_list) * Reserve WAL for the currently active local slot using the specified WAL * location (restart_lsn). * - * If the given WAL location has been removed, reserve WAL using the oldest - * existing WAL segment. + * If the given WAL location has been removed or is at risk of removal, + * reserve WAL using the oldest segment that is non-removable. */ static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn) { - XLogSegNo oldest_segno; + XLogRecPtr slot_min_lsn; + XLogRecPtr min_safe_lsn; XLogSegNo segno; ReplicationSlot *slot = MyReplicationSlot; Assert(slot != NULL); - Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn)); + Assert(!XLogRecPtrIsValid(slot->data.restart_lsn)); - while (true) - { - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - - /* Prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + /* + * Acquire an exclusive lock to prevent the checkpoint process from + * concurrently calculating the minimum slot LSN (see + * CheckPointReplicationSlots), ensuring that if WAL reservation occurs + * first, the checkpoint must wait for the restart_lsn update before + * calculating the minimum LSN. + * + * Note: Unlike ReplicationSlotReserveWal(), this lock does not protect a + * newly synced slot from being invalidated if a concurrent checkpoint has + * invoked CheckPointReplicationSlots() before the WAL reservation here. + * This can happen because the initial restart_lsn received from the + * remote server can precede the redo pointer. Therefore, when selecting + * the initial restart_lsn, we consider using the redo pointer or the + * minimum slot LSN (if those values are greater than the remote + * restart_lsn) instead of relying solely on the remote value. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); - /* - * Find the oldest existing WAL segment file. - * - * Normally, we can determine it by using the last removed segment - * number. However, if no WAL segment files have been removed by a - * checkpoint since startup, we need to search for the oldest segment - * file from the current timeline existing in XLOGDIR. - * - * XXX: Currently, we are searching for the oldest segment in the - * current timeline as there is less chance of the slot's restart_lsn - * from being some prior timeline, and even if it happens, in the - * worst case, we will wait to sync till the slot's restart_lsn moved - * to the current timeline. - */ - oldest_segno = XLogGetLastRemovedSegno() + 1; + /* + * Determine the minimum non-removable LSN by comparing the redo pointer + * with the minimum slot LSN. + * + * The minimum slot LSN is considered because the redo pointer advances at + * every checkpoint, even when replication slots are present on the + * standby. In such scenarios, the redo pointer can exceed the remote + * restart_lsn, while WALs preceding the remote restart_lsn remain + * protected by a local replication slot. + */ + min_safe_lsn = GetRedoRecPtr(); + slot_min_lsn = XLogGetReplicationSlotMinimumLSN(); - if (oldest_segno == 1) - { - TimeLineID cur_timeline; + if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn) + min_safe_lsn = slot_min_lsn; - GetWalRcvFlushRecPtr(NULL, &cur_timeline); - oldest_segno = XLogGetOldestSegno(cur_timeline); - } + /* + * If the minimum safe LSN is greater than the given restart_lsn, use it + * as the initial restart_lsn for the newly synced slot. Otherwise, use + * the given remote restart_lsn. + */ + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = Max(restart_lsn, min_safe_lsn); + SpinLockRelease(&slot->mutex); - elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available", - segno, oldest_segno); + ReplicationSlotsComputeRequiredLSN(); - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - if (segno >= oldest_segno) - break; + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + if (XLogGetLastRemovedSegno() >= segno) + elog(ERROR, "WAL required by replication slot %s has been removed concurrently", + NameStr(slot->data.name)); - /* Retry using the location of the oldest wal segment */ - XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); - } + LWLockRelease(ReplicationSlotAllocationLock); } /* diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index adddac6710e..0992c27b428 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -215,6 +215,7 @@ extern XLogSegNo XLogGetLastRemovedSegno(void); extern XLogSegNo XLogGetOldestSegno(TimeLineID tli); extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); +extern XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); extern void xlog_redo(struct XLogReaderState *record); extern void xlog_desc(StringInfo buf, struct XLogReaderState *record); diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl index 4fd709e3a03..8268dbb79c7 100644 --- a/src/test/recovery/t/046_checkpoint_logical_slot.pl +++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl @@ -20,8 +20,7 @@ if ($ENV{enable_injection_points} ne 'yes') my ($node, $result); $node = PostgreSQL::Test::Cluster->new('mike'); -$node->init; -$node->append_conf('postgresql.conf', "wal_level = 'logical'"); +$node->init(allows_streaming => 'logical'); $node->start; # Check if the extension injection_points is available, as it may be @@ -139,4 +138,85 @@ eval { }; is($@, '', "Logical slot still valid"); +# Verify that the synchronized slots won't be invalidated immediately after +# synchronization in the presence of a concurrent checkpoint. +my $primary = $node; + +$primary->append_conf('postgresql.conf', "autovacuum = off"); +$primary->reload; + +my $backup_name = 'backup'; + +$primary->backup($backup_name); + +# Create a standby +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'phys_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->safe_psql('postgres', + q{SELECT pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); + SELECT pg_create_physical_replication_slot('phys_slot');} +); + +$standby->start; + +# Generate some activity and switch WAL file on the primary +$primary->advance_wal(1); +$primary->safe_psql('postgres', "CHECKPOINT"); +$primary->wait_for_replay_catchup($standby); + +# checkpoint on the standby and make it wait on the injection point so that the +# checkpoint stops right before invalidating replication slots. +note('starting checkpoint'); + +$checkpoint = $standby->background_psql('postgres'); +$checkpoint->query_safe( + q(select injection_points_attach('restartpoint-before-slot-invalidation','wait')) +); +$checkpoint->query_until( + qr/starting_checkpoint/, + q(\echo starting_checkpoint +checkpoint; +)); + +# Wait until the checkpoint stops right before invalidating slots +note('waiting for injection_point'); +$standby->wait_for_event('checkpointer', 'restartpoint-before-slot-invalidation'); +note('injection_point is reached'); + +# Enable slot sync worker to synchronize the failover slot to the standby +$standby->append_conf('postgresql.conf', qq(sync_replication_slots = on)); +$standby->reload; + +# Wait for the slot to be synced +$standby->poll_query_until( + 'postgres', + "SELECT COUNT(*) > 0 FROM pg_replication_slots WHERE slot_name = 'failover_slot'"); + +# Release the checkpointer +$standby->safe_psql('postgres', + q{select injection_points_wakeup('restartpoint-before-slot-invalidation'); + select injection_points_detach('restartpoint-before-slot-invalidation')}); + +$checkpoint->quit; + +# Confirm that the slot is not invalidated +is( $standby->safe_psql( + 'postgres', + q{SELECT invalidation_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'failover_slot';} + ), + "t", + 'logical slot is not invalidated'); + done_testing();