am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
- GetXLogReplayRecPtr(&currTLI);
+ {
+ TimeLineID insertTLI;
+
+ /*
+ * If the insertion timeline has already been set, use it.
+ * InsertTimeLineID is set before the WAL segments of the old timeline
+ * are removed, before SharedRecoveryState switches to
+ * RECOVERY_STATE_DONE.
+ *
+ * There is a window where RecoveryInProgress() still returns true but
+ * the old timeline's WAL segments have already been removed or
+ * recycled. Using the WAL insertion timeline avoids attempting to
+ * read from those removed segments, improving availability, and is a
+ * safe thing to do as promotion copies the contents in the last
+ * segment of the old timeline to the first segment of the new
+ * timeline, up to the switchpoint.
+ */
+ insertTLI = GetWALInsertionTimeLineIfSet();
+ if (insertTLI != 0)
+ currTLI = insertTLI;
+ else
+ GetXLogReplayRecPtr(&currTLI);
+ }
else
currTLI = GetWALInsertionTimeLine();
'got same expected output from pg_recvlogical decoding session on cascading standby'
);
+##################################################
+# Test that logical decoding on standby correctly handles a timeline
+# change during promotion. This relies on an injection point that
+# waits between the moment the segments of the old timeline are removed
+# and the moment RecoveryInProgress() would set, catching that a WAL
+# sender is still able to decode changes across a promotion.
+##################################################
+
+# Create a logical slot on the cascading standby for this test.
+$node_cascading_standby->create_logical_slot_on_standby($node_standby,
+ 'race_slot', 'testdb');
+
+$node_standby->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(10,13) s;]
+);
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:10 y[text]:'10'
+table public.decoding_test: INSERT: x[integer]:11 y[text]:'11'
+table public.decoding_test: INSERT: x[integer]:12 y[text]:'12'
+table public.decoding_test: INSERT: x[integer]:13 y[text]:'13'
+COMMIT};
+
+$node_standby->safe_psql('testdb', 'CREATE EXTENSION injection_points;');
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+# Attach injection point to pause startup after WAL segment cleanup
+# but before RecoveryInProgress() flips to false.
+$node_cascading_standby->safe_psql('testdb',
+ "SELECT injection_points_attach('promotion-after-wal-segment-cleanup', 'wait');"
+);
+
+# Promote, wait for the removal of the segments on the old timeline.
+$node_cascading_standby->safe_psql('testdb', "SELECT pg_promote(false)");
+$node_cascading_standby->wait_for_event('startup',
+ 'promotion-after-wal-segment-cleanup');
+
+# Start pg_recvlogical.
+my ($stdout2, $stderr2);
+my $handle2 = IPC::Run::start(
+ [
+ 'pg_recvlogical',
+ '--dbname' => $node_cascading_standby->connstr('testdb'),
+ '--slot' => 'race_slot',
+ '--option' => 'include-xids=0',
+ '--option' => 'skip-empty-xacts=1',
+ '--file' => '-',
+ '--no-loop',
+ '--start',
+ ],
+ '>' => \$stdout2,
+ '2>' => \$stderr2,
+ IPC::Run::timeout($default_timeout));
+
+# Verify that pg_recvlogical successfully decodes the data while startup
+# is still paused in the injection point.
+$pump_timeout = IPC::Run::timer($default_timeout);
+ok( pump_until($handle2, $pump_timeout, \$stdout2, qr/COMMIT/s),
+ 'pg_recvlogical works during promotion timeline switch');
+chomp($stdout2);
+is($stdout2, $expected,
+ 'got expected output from pg_recvlogical during promotion timeline switch'
+);
+
+# Resume promotion.
+$node_cascading_standby->safe_psql('testdb',
+ "SELECT injection_points_wakeup('promotion-after-wal-segment-cleanup');");
+
done_testing();