# These allow us to stop WAL streaming so waiters block, then resume it.
my $saved_primary_conninfo;
+# Stop the walreceiver on $node by clearing primary_conninfo and waiting
+# until pg_stat_wal_receiver becomes empty. Used to freeze the
+# walreceiver-tracked positions (writtenUpto, flushedUpto) so a fencepost
+# test can rely on them not advancing. The previous value is saved for
+# resume_walreceiver().
sub stop_walreceiver
{
my ($node) = @_;
"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
}
+# Restart the walreceiver on $node by restoring primary_conninfo to the
+# value captured by stop_walreceiver() and waiting until walreceiver
+# reconnects. Must be paired with a prior stop_walreceiver() call.
sub resume_walreceiver
{
my ($node) = @_;
"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
}
+# Probe the wait predicate "target <= currentLSN" on both sides of its
+# boundary, for wait mode $mode on $node.
+#
+# $current_lsn is the position the caller has frozen for $mode, and
+# $label prefixes every test name.  Three targets are tried, in order:
+#   current      -> 'success' (the predicate is <=, not <)
+#   current - 1  -> 'success'
+#   current + 1  -> 'timeout'
+#
+# Nothing in here freezes the position: the caller has to guarantee that
+# (for instance by stopping the walreceiver and pausing replay), or the
+# "current + 1" probe may succeed by racing with an advancing LSN.
+#
+# Returns the list ($lsn_minus, $lsn_plus) so the caller can reuse the
+# neighbouring LSNs, e.g. as the target of an async wakeup test.
+sub check_wait_for_lsn_fencepost
+{
+    my ($node, $mode, $current_lsn, $label) = @_;
+
+    # Let the server do the pg_lsn arithmetic: current - 1 first, then
+    # current + 1.
+    my @neighbours;
+    foreach my $op ('-', '+')
+    {
+        push @neighbours,
+          scalar($node->safe_psql('postgres',
+            "SELECT ('$current_lsn'::pg_lsn $op 1)::text"));
+    }
+    my ($lsn_minus, $lsn_plus) = @neighbours;
+
+    # The probe expected to time out gets a short timeout so that the
+    # test does not sit idle for long.
+    my @probes = (
+        {
+            target => $current_lsn,
+            expect => 'success',
+            name   => 'target == current succeeds',
+            limit  => '5s',
+        },
+        {
+            target => $lsn_minus,
+            expect => 'success',
+            name   => 'target == current - 1 succeeds',
+            limit  => '5s',
+        },
+        {
+            target => $lsn_plus,
+            expect => 'timeout',
+            name   => 'target == current + 1 times out',
+            limit  => '500ms',
+        },
+    );
+
+    foreach my $probe (@probes)
+    {
+        my $target_lsn = $probe->{target};
+        my $timeout = $probe->{limit};
+
+        # no_throw turns the outcome into a result string instead of an
+        # error, which is what gets compared below.
+        my $output = $node->safe_psql(
+            'postgres', qq[
+ WAIT FOR LSN '${target_lsn}'
+ WITH (MODE '$mode', timeout '$timeout', no_throw);]);
+
+        is($output, $probe->{expect}, join(': ', $label, $probe->{name}));
+    }
+
+    return ($lsn_minus, $lsn_plus);
+}
+
# Initialize primary node
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);
$arc_standby->stop;
$arc_primary->stop;
+# 10. Fresh-shmem walreceiver startup (29e7dbf5e4d).
+# RequestXLogStreaming() initializes writtenUpto/flushedUpto to the
+# segment-aligned receiveStart only when receiveStart was invalid.
+# Restart the standby with the primary stopped, so the walreceiver cannot
+# connect and advance these values past the initial one before we observe it.
+
+my $rcv_primary = PostgreSQL::Test::Cluster->new('rcv_primary');
+$rcv_primary->init(allows_streaming => 1);
+# No background WAL during our probes.
+$rcv_primary->append_conf('postgresql.conf', 'autovacuum = off');
+$rcv_primary->start;
+$rcv_primary->safe_psql('postgres',
+ "CREATE TABLE rcv_test AS SELECT generate_series(1,10) AS a");
+
+my $rcv_backup = 'rcv_backup';
+$rcv_primary->backup($rcv_backup);
+
+my $rcv_standby = PostgreSQL::Test::Cluster->new('rcv_standby');
+$rcv_standby->init_from_backup($rcv_primary, $rcv_backup, has_streaming => 1);
+$rcv_standby->start;
+
+# Switch WAL segments mid-stream so the replay ends mid-segment after the
+# upcoming standby restart. That guarantees the initial value <
+# final replay LSN.
+$rcv_primary->safe_psql('postgres',
+ "INSERT INTO rcv_test VALUES (generate_series(11, 100))");
+$rcv_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+$rcv_primary->safe_psql('postgres',
+ "INSERT INTO rcv_test VALUES (generate_series(101, 110))");
+$rcv_primary->wait_for_catchup($rcv_standby);
+
+# Restart the standby with the primary down: WalRcvData is initialized, but
+# the walreceiver cannot connect and update writtenUpto/flushedUpto. So,
+# the initial flushedUpto stays observable via pg_last_wal_receive_lsn().
+$rcv_standby->stop;
+$rcv_primary->stop;
+$rcv_standby->start;
+
+# Do not read the receive position until it is exposed; everything below
+# compares against it.
+$rcv_standby->poll_query_until('postgres',
+ "SELECT pg_last_wal_receive_lsn() IS NOT NULL;")
+ or die "walreceiver initial value did not become visible";
+
+# Freeze the replay so the (received, replay] window stays observable.
+$rcv_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause()");
+$rcv_standby->poll_query_until('postgres',
+ "SELECT pg_get_wal_replay_pause_state() = 'paused'")
+ or die "Timed out waiting for rcv_standby replay to pause";
+
+my $rcv_receive =
+ $rcv_standby->safe_psql('postgres', "SELECT pg_last_wal_receive_lsn()");
+my $rcv_replay =
+ $rcv_standby->safe_psql('postgres', "SELECT pg_last_wal_replay_lsn()");
+my $rcv_gap = $rcv_standby->safe_psql('postgres',
+ "SELECT pg_wal_lsn_diff('$rcv_replay'::pg_lsn, '$rcv_receive'::pg_lsn) > 0"
+);
+# is() rather than ok(... eq ...): on failure it reports the value psql
+# actually returned next to the expected one.
+is($rcv_gap, 't',
+ "replay sits ahead of initial walreceiver flush position");
+
+# The receive position modulo wal_segment_size must be zero, i.e. it sits
+# exactly on a segment boundary.
+my $rcv_receive_offset = $rcv_standby->safe_psql(
+ 'postgres',
+ "SELECT mod(pg_wal_lsn_diff('$rcv_receive'::pg_lsn, '0/0'::pg_lsn),
+ setting::numeric)::int
+ FROM pg_settings
+ WHERE name = 'wal_segment_size'");
+is($rcv_receive_offset, '0',
+ "initial walreceiver flush position is segment-aligned");
+
+# WAIT FOR an $rcv_replay LSN succeeds in standby_write / standby_flush
+# modes thanks to GetCurrentLSNForWaitType() taking replay LSN as the floor.
+# We observe flushedUpto directly via pg_last_wal_receive_lsn(). writtenUpto
+# is covered indirectly: without the replay-position floor, standby_write would
+# wait at the seeded segment-start position and time out.
+foreach my $rcv_mode ('standby_write', 'standby_flush')
+{
+ $output = $rcv_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${rcv_replay}'
+ WITH (MODE '$rcv_mode', timeout '5s', no_throw);]);
+ # With no_throw the outcome comes back as a result string; compare it
+ # with is() so that a 'timeout' result shows up in the failure report.
+ is($output, 'success',
+ "$rcv_mode succeeds for already-replayed LSN after standby restart");
+}
+
+# Restore primary and resume replay so section 11 can reuse the clusters.
+# Generate fresh WAL after reconnecting so the walreceiver advances its
+# flush position past the replay position before we freeze both frontiers.
+$rcv_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()");
+$rcv_primary->start;
+$rcv_primary->safe_psql('postgres',
+ "INSERT INTO rcv_test VALUES (generate_series(111, 120))");
+$rcv_primary->wait_for_catchup($rcv_standby);
+
+# 11. Off-by-one boundary checks for the wait predicate target <=
+# currentLSN. Stop the walreceiver before pausing replay (stopping
+# after pause can hang -- see section 7d) so both replay and
+# walreceiver positions are frozen.
+stop_walreceiver($rcv_standby);
+$rcv_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause()");
+$rcv_standby->poll_query_until('postgres',
+ "SELECT pg_get_wal_replay_pause_state() = 'paused'")
+ or die "Timed out waiting for rcv_standby replay to pause";
+
+# 11a. standby_replay exact fencepost. The replay position is frozen, so this
+# probes the standby_replay predicate directly.
+my $replay_lsn =
+ $rcv_standby->safe_psql('postgres', "SELECT pg_last_wal_replay_lsn()");
+# Only the "current + 1" LSN is needed later (section 11c); the
+# "current - 1" value returned first is discarded.
+my (undef, $replay_lsn_plus) =
+ check_wait_for_lsn_fencepost($rcv_standby, 'standby_replay', $replay_lsn,
+ 'standby_replay');
+
+# 11b. standby_flush exact fencepost. pg_last_wal_receive_lsn() exposes the
+# flushed walreceiver position even after walreceiver exits, so this probes
+# the standby_flush predicate directly. standby_write has no stable
+# SQL-visible boundary once walreceiver is stopped; it is covered by the
+# replay-floor and waiter wakeup tests above.
+my $flush_lsn =
+ $rcv_standby->safe_psql('postgres', "SELECT pg_last_wal_receive_lsn()");
+my $flush_covers_replay = $rcv_standby->safe_psql('postgres',
+ "SELECT pg_wal_lsn_diff('$flush_lsn'::pg_lsn, '$replay_lsn'::pg_lsn) >= 0"
+);
+# Precondition for the fencepost below: the flush position must be at or
+# past the replay position. is() rather than ok(... eq ...) so that a
+# failure reports the value psql actually returned.
+is($flush_covers_replay, 't',
+ "standby_flush boundary is not masked by replay floor");
+
+check_wait_for_lsn_fencepost($rcv_standby, 'standby_flush', $flush_lsn,
+ 'standby_flush');
+
+# 11c. A sleeping waiter at current + 1 wakes once replay advances
+# past it. Start the waiter while replay is still paused so it is
+# guaranteed to sleep at replay_lsn_plus regardless of whether
+# flush_lsn > replay_lsn. Then resume replay and restart the
+# walreceiver to deliver new WAL.
+$rcv_primary->safe_psql('postgres',
+ "INSERT INTO rcv_test VALUES (generate_series(200, 210))");
+
+# query_until() returns once "start" has been echoed, i.e. right before
+# the WAIT FOR command is issued in the background session.
+my $boundary_session = $rcv_standby->background_psql('postgres');
+$boundary_session->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${replay_lsn_plus}'
+ WITH (MODE 'standby_replay', timeout '1d', no_throw);
+]);
+
+# Make sure the waiter is really asleep before anything can wake it up.
+$rcv_standby->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM pg_stat_activity WHERE wait_event = 'WaitForWalReplay'"
+) or die "Boundary waiter did not sleep";
+
+$rcv_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()");
+resume_walreceiver($rcv_standby);
+# The WAIT FOR result is read from the session's stdout once the session
+# has been closed; chomp drops the trailing newline before comparing.
+$boundary_session->quit;
+chomp($boundary_session->{stdout});
+is($boundary_session->{stdout},
+ 'success',
+ "standby_replay: waiter at current + 1 wakes when replay advances");
+
+$rcv_standby->stop;
+$rcv_primary->stop;
+
+# 12. Upstream promotion seen from a cascade standby.  A backend
+# sleeping in WAIT FOR LSN on the cascade standby has to outlive the
+# promotion of its upstream: the cascade walreceiver reconnects on the
+# new timeline, replay crosses the switch point, and the waiter must
+# then report success.
+
+my $tl_primary = PostgreSQL::Test::Cluster->new('tl_primary');
+$tl_primary->init(allows_streaming => 1);
+$tl_primary->append_conf('postgresql.conf', 'autovacuum = off');
+$tl_primary->start;
+$tl_primary->safe_psql(
+    'postgres',
+    "CREATE TABLE tl_test AS SELECT generate_series(1, 10) AS a");
+
+# First hop: tl_standby1 is built from a base backup of the primary.
+my $tl_backup = 'tl_backup';
+$tl_primary->backup($tl_backup);
+
+my $tl_standby1 = PostgreSQL::Test::Cluster->new('tl_standby1');
+$tl_standby1->init_from_backup(
+    $tl_primary, $tl_backup,
+    has_streaming => 1);
+$tl_standby1->start;
+
+# Second hop: tl_standby2 is built from a backup of tl_standby1, so it
+# cascades from tl_standby1.
+my $tl_backup2 = 'tl_backup2';
+$tl_standby1->backup($tl_backup2);
+
+my $tl_standby2 = PostgreSQL::Test::Cluster->new('tl_standby2');
+$tl_standby2->init_from_backup(
+    $tl_standby1, $tl_backup2,
+    has_streaming => 1);
+$tl_standby2->start;
+
+# Push some rows down the chain, one hop at a time.
+$tl_primary->safe_psql(
+    'postgres',
+    "INSERT INTO tl_test VALUES (generate_series(11, 20))");
+foreach my $tl_hop (
+    [ $tl_primary, $tl_standby1 ],
+    [ $tl_standby1, $tl_standby2 ])
+{
+    my ($tl_upstream, $tl_downstream) = @$tl_hop;
+    $tl_upstream->wait_for_catchup($tl_downstream);
+}
+
+# Pick a target well beyond the current insert LSN: it can only be
+# reached with WAL generated on the new timeline.  Replay on standby2 is
+# paused first, which guarantees the waiter is asleep when the timeline
+# switch happens.
+my $tl_target = $tl_primary->safe_psql(
+    'postgres',
+    "SELECT (pg_current_wal_insert_lsn() + 65536)::text");
+
+$tl_standby2->safe_psql('postgres', "SELECT pg_wal_replay_pause()");
+die "Timed out waiting for tl_standby2 replay to pause"
+  unless $tl_standby2->poll_query_until(
+    'postgres',
+    "SELECT pg_get_wal_replay_pause_state() = 'paused'");
+
+# Launch the waiter in a background session; query_until() comes back as
+# soon as "start" has been echoed.
+my $tl_session = $tl_standby2->background_psql('postgres');
+$tl_session->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${tl_target}'
+ WITH (MODE 'standby_replay', timeout '1d', no_throw);
+]);
+
+die "Cascade waiter did not sleep before promotion"
+  unless $tl_standby2->poll_query_until(
+    'postgres',
+    "SELECT count(*) > 0 FROM pg_stat_activity WHERE wait_event = 'WaitForWalReplay'"
+  );
+
+# Move standby1 to TLI 2, then write enough WAL on the new timeline to
+# get past tl_target and force a segment switch.
+$tl_standby1->promote;
+foreach my $tl_sql (
+    "INSERT INTO tl_test VALUES (generate_series(21, 1020))",
+    "SELECT pg_switch_wal()")
+{
+    $tl_standby1->safe_psql('postgres', $tl_sql);
+}
+
+$tl_standby2->safe_psql('postgres', "SELECT pg_wal_replay_resume()");
+
+# The cascade walreceiver has to show up on a timeline newer than 1.
+die "tl_standby2 did not follow upstream timeline switch"
+  unless $tl_standby2->poll_query_until(
+    'postgres',
+    "SELECT received_tli > 1 FROM pg_stat_wal_receiver");
+
+# Close the session and compare what the waiter printed, minus the
+# trailing newline.
+$tl_session->quit;
+chomp($tl_session->{stdout});
+is( $tl_session->{stdout},
+    'success',
+    "WAIT FOR LSN survives upstream promotion and timeline switch on cascade standby"
+);
+
+# Shut down downstream first, upstream last.
+foreach my $tl_node ($tl_standby2, $tl_standby1, $tl_primary)
+{
+    $tl_node->stop;
+}
+
done_testing();