<refnamediv>
<refname>WAIT FOR</refname>
- <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+ <refpurpose>wait for WAL to reach a target <acronym>LSN</acronym></refpurpose>
</refnamediv>
<refsynopsisdiv>
<synopsis>
-WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>'
+ [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
+ MODE '<replaceable class="parameter">mode</replaceable>'
TIMEOUT '<replaceable class="parameter">timeout</replaceable>'
NO_THROW
+
+<phrase>and <replaceable class="parameter">mode</replaceable> can be:</phrase>
+
+ standby_replay | standby_write | standby_flush | primary_flush
</synopsis>
</refsynopsisdiv>
<title>Description</title>
<para>
- Waits until recovery replays <parameter>lsn</parameter>.
- If no <parameter>timeout</parameter> is specified or it is set to
- zero, this command waits indefinitely for the
- <parameter>lsn</parameter>.
- On timeout, or if the server is promoted before
- <parameter>lsn</parameter> is reached, an error is emitted,
- unless <literal>NO_THROW</literal> is specified in the WITH clause.
- If <parameter>NO_THROW</parameter> is specified, then the command
- doesn't throw errors.
+ Waits until the specified <parameter>lsn</parameter> is reached
+ according to the specified <parameter>mode</parameter>,
+ which determines whether to wait for WAL to be written, flushed, or replayed.
+ If no <parameter>timeout</parameter> is specified or it is set to
+ zero, this command waits indefinitely for the
+ <parameter>lsn</parameter>.
+ </para>
+
+ <para>
+ On timeout, an error is emitted unless <literal>NO_THROW</literal>
+ is specified in the WITH clause. For standby modes
+ (<literal>standby_replay</literal>, <literal>standby_write</literal>,
+ <literal>standby_flush</literal>), an error is also emitted if the
+ server is promoted before the <parameter>lsn</parameter> is reached.
+ If <literal>NO_THROW</literal> is specified, the command returns
+ a status string instead of throwing errors.
</para>
<para>
- The possible return values are <literal>success</literal>,
- <literal>timeout</literal>, and <literal>not in recovery</literal>.
+ The possible return values are <literal>success</literal>,
+ <literal>timeout</literal>, and <literal>not in recovery</literal>.
</para>
</refsect1>
The following parameters are supported:
<variablelist>
+ <varlistentry>
+ <term><literal>MODE</literal> '<replaceable class="parameter">mode</replaceable>'</term>
+ <listitem>
+ <para>
+ Specifies which stage of WAL processing must reach the target LSN
+ before the command returns. If not specified, the default is
+ <literal>standby_replay</literal>. The valid modes are:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>standby_replay</literal>: Wait for the LSN to be replayed
+ (applied to the database) on a standby server. After successful
+ completion, <function>pg_last_wal_replay_lsn()</function> will
+ return a value greater than or equal to the target LSN. This mode
+ can only be used during recovery.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>standby_write</literal>: Wait for the WAL containing the
+ LSN to be received from the primary and written by the standby
+ server, but not necessarily flushed to disk. This is faster than
+ <literal>standby_flush</literal> but provides weaker durability
+ guarantees since the data may still be in operating system
+ buffers. After successful completion, the
+ <structfield>written_lsn</structfield> column in
+ <link linkend="monitoring-pg-stat-wal-receiver-view">
+ <structname>pg_stat_wal_receiver</structname></link> will show
+ a value greater than or equal to the target LSN. This mode can
+ only be used during recovery.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>standby_flush</literal>: Wait for the WAL containing the
+ LSN to be received from the primary and flushed to disk on a
+ standby server. This provides a durability guarantee without
+ waiting for the WAL to be applied. After successful completion,
+ <function>pg_last_wal_receive_lsn()</function> will return a
+ value greater than or equal to the target LSN. This value is
+ also available as the <structfield>flushed_lsn</structfield>
+ column in <link linkend="monitoring-pg-stat-wal-receiver-view">
+ <structname>pg_stat_wal_receiver</structname></link>. This mode
+ can only be used during recovery.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>primary_flush</literal>: Wait for the WAL containing the
+ LSN to be flushed to disk on a primary server. After successful
+ completion, <function>pg_current_wal_flush_lsn()</function> will
+ return a value greater than or equal to the target LSN. This mode
+ can only be used on a primary server (not during recovery).
+ </para>
+ </listitem>
+ </itemizedlist>
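+ <para>
+ For instance, a session on a standby could wait for a flush and then
+ confirm the received position (a sketch; the <acronym>LSN</acronym>
+ value is illustrative):
+ </para>
+<programlisting>
+WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_flush', TIMEOUT '10s');
+SELECT pg_last_wal_receive_lsn() >= '0/306EE20'::pg_lsn;
+</programlisting>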
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>TIMEOUT</literal> '<replaceable class="parameter">timeout</replaceable>'</term>
<listitem>
<listitem>
<para>
This return value denotes that the database server is not in a recovery
- state. This might mean either the database server was not in recovery
- at the moment of receiving the command, or it was promoted before
- reaching the target <parameter>lsn</parameter>.
+ state. This might mean either the database server was not in recovery
+ at the moment of receiving the command (i.e., executed on a primary),
+ or it was promoted before reaching the target <parameter>lsn</parameter>.
+ In the promotion case, this status indicates a timeline change occurred,
+ and the application should re-evaluate whether the target LSN is still
+ relevant.
</para>
</listitem>
</varlistentry>
<title>Notes</title>
<para>
- <command>WAIT FOR</command> command waits till
- <parameter>lsn</parameter> to be replayed on standby.
- That is, after this command execution, the value returned by
- <function>pg_last_wal_replay_lsn</function> should be greater or equal
- to the <parameter>lsn</parameter> value. This is useful to achieve
- read-your-writes-consistency, while using async replica for reads and
- primary for writes. In that case, the <acronym>lsn</acronym> of the last
- modification should be stored on the client application side or the
- connection pooler side.
+ <command>WAIT FOR</command> waits until the specified
+ <parameter>lsn</parameter> is reached according to the specified
+ <parameter>mode</parameter>. The <literal>standby_replay</literal> mode
+ waits for the LSN to be replayed (applied to the database), which is
+ useful to achieve read-your-writes consistency while using an async
+ replica for reads and the primary for writes. The
+ <literal>standby_flush</literal> mode waits for the WAL to be flushed
+ to durable storage on the replica, providing a durability guarantee
+ without waiting for replay. The <literal>standby_write</literal> mode
+ waits for the WAL to be written by the standby but not necessarily
+ flushed, which is faster than waiting for a flush but provides weaker
+ durability guarantees. The
+ <literal>primary_flush</literal> mode waits for WAL to be flushed on
+ a primary server. In all cases, the <acronym>LSN</acronym> of the last
+ modification should be stored on the client application side or the
+ connection pooler side.
</para>
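+
+ <para>
+ As a sketch of the <literal>primary_flush</literal> use case (the table
+ name and <acronym>LSN</acronym> value are illustrative): with
+ <varname>synchronous_commit</varname> set to <literal>off</literal>, a
+ commit can return before its WAL is durable, and the client can wait for
+ local durability afterwards:
+<programlisting>
+SET synchronous_commit = off;
+UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+SELECT pg_current_wal_insert_lsn();   -- e.g. 0/306EE20, noted by the client
+WAIT FOR LSN '0/306EE20' WITH (MODE 'primary_flush');
+</programlisting>
+ </para>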
<para>
- <command>WAIT FOR</command> command should be called on standby.
- If a user runs <command>WAIT FOR</command> on primary, it
- will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
- However, if <command>WAIT FOR</command> is
- called on primary promoted from standby and <literal>lsn</literal>
- was already replayed, then the <command>WAIT FOR</command> command just
- exits immediately.
+ The standby modes (<literal>standby_replay</literal>,
+ <literal>standby_write</literal>, <literal>standby_flush</literal>)
+ can only be used during recovery, and <literal>primary_flush</literal>
+ can only be used on a primary server. Using the wrong mode for the
+ current server state will result in an error. If a standby is promoted
+ while waiting with a standby mode, the command will return
+ <literal>not in recovery</literal> (or throw an error if
+ <literal>NO_THROW</literal> is not specified). Promotion creates a new
+ timeline, and the LSN being waited for may refer to WAL from the old
+ timeline.
</para>
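+
+ <para>
+ A minimal sketch of handling a possible promotion while waiting
+ (illustrative <acronym>LSN</acronym> value): with
+ <literal>NO_THROW</literal>, promotion surfaces as the
+ <literal>not in recovery</literal> status rather than an error, and the
+ application can then check the current timeline before deciding how
+ to proceed:
+<programlisting>
+WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_replay', TIMEOUT '10s', NO_THROW);
+SELECT timeline_id FROM pg_control_checkpoint();
+</programlisting>
+ </para>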
</refsect1>
<title>Examples</title>
<para>
- You can use <command>WAIT FOR</command> command to wait for
- the <type>pg_lsn</type> value. For example, an application could update
- the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
- changes just made. This example uses <function>pg_current_wal_insert_lsn</function>
- on primary server to get the <acronym>lsn</acronym> given that
- <varname>synchronous_commit</varname> could be set to
- <literal>off</literal>.
+ You can use the <command>WAIT FOR</command> command to wait for a
+ <type>pg_lsn</type> value. For example, an application could update
+ the <literal>movie</literal> table and obtain the <acronym>LSN</acronym>
+ of the changes just made. This example uses <function>pg_current_wal_insert_lsn</function>
+ on the primary server to obtain the <acronym>LSN</acronym>, given that
+ <varname>synchronous_commit</varname> could be set to
+ <literal>off</literal>.
<programlisting>
postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
UPDATE 100
postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
(1 row)
</programlisting>
<programlisting>
postgres=# WAIT FOR LSN '0/306EE20';
status
---------
+---------
success
(1 row)
postgres=# SELECT * FROM movie WHERE genre = 'Drama';
</para>
<para>
- If the target LSN is not reached before the timeout, the error is thrown.
+ Wait for flush (data durable on replica):
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+ </para>
+
+ <para>
+ Wait for write with timeout:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_write', TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+</programlisting>
+ </para>
+
+ <para>
+ Wait for flush on primary:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'primary_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+ </para>
+
+ <para>
+ If the target LSN is not reached before the timeout, an error is thrown:
<programlisting>
postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
<para>
The same example uses <command>WAIT FOR</command> with
- <parameter>NO_THROW</parameter> option.
+   the <literal>NO_THROW</literal> option:
+
<programlisting>
postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
status
---------
+---------
timeout
(1 row)
</programlisting>
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
+ /*
+ * If we flushed an LSN that someone was waiting for, notify the waiters.
+ */
+ if (waitLSNState &&
+ (LogwrtResult.Flush >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
/*
* If we still haven't flushed to the request point then we have a
* problem; most likely, the requested flush point is past end of XLOG.
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
+ /*
+ * If we flushed an LSN that someone was waiting for, notify the waiters.
+ */
+ if (waitLSNState &&
+ (LogwrtResult.Flush >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
/*
* Great, done. To take some work off the critical path, try to initialize
* as many of the no-longer-needed WAL buffers for future use as we can.
WakeupCheckpointer();
/*
- * Wake up all waiters for replay LSN. They need to report an error that
- * recovery was ended before reaching the target LSN.
+ * Wake up all waiters. They need to report an error that recovery was
+ * ended before reaching the target LSN.
*/
WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
+ WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, InvalidXLogRecPtr);
+ WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, InvalidXLogRecPtr);
/*
* Shutdown the recovery environment. This must occur after
*
* wait.c
* Implements WAIT FOR, which allows waiting for events such as
- * time passing or LSN having been replayed on replica.
+ * time passing or LSN having been replayed, flushed, or written.
*
* Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
*
#include <math.h>
+#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "commands/defrem.h"
XLogRecPtr lsn;
int64 timeout = 0;
WaitLSNResult waitLSNResult;
+ WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
bool throw = true;
TupleDesc tupdesc;
TupOutputState *tstate;
const char *result = "<unset>";
bool timeout_specified = false;
bool no_throw_specified = false;
+ bool mode_specified = false;
/* Parse and validate the mandatory LSN */
lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
foreach_node(DefElem, defel, stmt->options)
{
- if (strcmp(defel->defname, "timeout") == 0)
+ if (strcmp(defel->defname, "mode") == 0)
+ {
+ char *mode_str;
+
+ if (mode_specified)
+ errorConflictingDefElem(defel, pstate);
+ mode_specified = true;
+
+ mode_str = defGetString(defel);
+
+ if (pg_strcasecmp(mode_str, "standby_replay") == 0)
+ lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
+ else if (pg_strcasecmp(mode_str, "standby_write") == 0)
+ lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
+ else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
+ lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
+ else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
+ lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized value for %s option \"%s\": \"%s\"",
+ "WAIT", defel->defname, mode_str),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "timeout") == 0)
{
char *timeout_str;
const char *hintmsg;
}
/*
- * We are going to wait for the LSN replay. We should first care that we
- * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * We are going to wait for the LSN. We should first care that we don't
+ * hold a snapshot and correspondingly our MyProc->xmin is invalid.
* Otherwise, our snapshot could prevent the replay of WAL records
* implying a kind of self-deadlock. This is the reason why WAIT FOR is a
* command, not a procedure or function.
*/
Assert(MyProc->xmin == InvalidTransactionId);
- waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
+ /*
+ * Validate that the requested mode matches the current server state.
+ * Primary modes can only be used on a primary.
+ */
+ if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
+ {
+ if (RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is in progress"),
+ errhint("Waiting for primary_flush can only be done on a primary server. "
+ "Use standby_flush mode on a standby server.")));
+ }
+
+ /* Now wait for the LSN */
+ waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
/*
* Process the result of WaitForLSN(). Throw appropriate error if needed.
case WAIT_LSN_RESULT_TIMEOUT:
if (throw)
- ereport(ERROR,
- errcode(ERRCODE_QUERY_CANCELED),
- errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
- LSN_FORMAT_ARGS(lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ {
+ XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
+
+ switch (lsnType)
+ {
+ case WAIT_LSN_TYPE_STANDBY_REPLAY:
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_WRITE:
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_FLUSH:
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ default:
+ elog(ERROR, "unexpected wait LSN type %d", lsnType);
+ pg_unreachable();
+ }
+ }
else
result = "timeout";
break;
{
if (PromoteIsTriggered())
{
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("recovery is not in progress"),
- errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
- LSN_FORMAT_ARGS(lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
+
+ switch (lsnType)
+ {
+ case WAIT_LSN_TYPE_STANDBY_REPLAY:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_WRITE:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_FLUSH:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(currentLSN)));
+ break;
+
+ default:
+ elog(ERROR, "unexpected wait LSN type %d", lsnType);
+ pg_unreachable();
+ }
}
else
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("recovery is not in progress"),
- errhint("Waiting for the replay LSN can only be executed during recovery."));
+ {
+ switch (lsnType)
+ {
+ case WAIT_LSN_TYPE_STANDBY_REPLAY:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_WRITE:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the standby_write LSN can only be executed during recovery."));
+ break;
+
+ case WAIT_LSN_TYPE_STANDBY_FLUSH:
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
+ break;
+
+ default:
+ elog(ERROR, "unexpected wait LSN type %d", lsnType);
+ pg_unreachable();
+ }
+ }
}
else
result = "not in recovery";
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
/* Update shared-memory status */
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+ /*
+ * If we wrote an LSN that someone was waiting for, notify the waiters.
+ */
+ if (waitLSNState &&
+ (LogstreamResult.Write >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write);
+
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop, to create its archive notification file soon. Otherwise WAL
}
SpinLockRelease(&walrcv->mutex);
+ /*
+ * If we flushed an LSN that someone was waiting for, notify the
+ * waiters.
+ */
+ if (waitLSNState &&
+ (LogstreamResult.Flush >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush);
+
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
-# Checks waiting for the LSN replay on standby using
-# the WAIT FOR command.
+# Checks waiting for the LSN using the WAIT FOR command.
+# Tests standby modes (standby_replay/standby_write/standby_flush) on standby
+# and primary_flush mode on primary.
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Utils;
use Test::More;
+# Helper functions to control walreceiver for testing wait conditions.
+# These allow us to stop WAL streaming so waiters block, then resume it.
+my $saved_primary_conninfo;
+
+sub stop_walreceiver
+{
+ my ($node) = @_;
+ $saved_primary_conninfo = $node->safe_psql(
+ 'postgres', qq[
+ SELECT pg_catalog.quote_literal(setting)
+ FROM pg_settings
+ WHERE name = 'primary_conninfo';
+ ]);
+ $node->safe_psql(
+ 'postgres', qq[
+ ALTER SYSTEM SET primary_conninfo = '';
+ SELECT pg_reload_conf();
+ ]);
+
+ $node->poll_query_until('postgres',
+ "SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
+sub resume_walreceiver
+{
+ my ($node) = @_;
+ $node->safe_psql(
+ 'postgres', qq[
+ ALTER SYSTEM SET primary_conninfo = $saved_primary_conninfo;
+ SELECT pg_reload_conf();
+ ]);
+
+ $node->poll_query_until('postgres',
+ "SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
# Initialize primary node
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);
ok((split("\n", $output))[-1] eq 30,
"standby reached the same LSN as primary");
-# 3. Check that waiting for unreachable LSN triggers the timeout. The
+# 3. Check that WAIT FOR works with standby_write, standby_flush, and
+# primary_flush modes.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn_write}' WITH (MODE 'standby_write', timeout '1d');
+ SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+ "standby wrote WAL up to target LSN after WAIT FOR with MODE 'standby_write'"
+);
+
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn_flush}' WITH (MODE 'standby_flush', timeout '1d');
+ SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+ "standby flushed WAL up to target LSN after WAIT FOR with MODE 'standby_flush'"
+);
+
+# Check primary_flush mode on primary
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn_primary_flush =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_primary->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn_primary_flush}' WITH (MODE 'primary_flush', timeout '1d');
+ SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '${lsn_primary_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+ "primary flushed WAL up to target LSN after WAIT FOR with MODE 'primary_flush'"
+);
+
+# 4. Check that waiting for unreachable LSN triggers the timeout. The
# unreachable LSN must be well in advance. So WAL records issued by
# the concurrent autovacuum could not affect that.
my $lsn3 =
WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
-# 4. Check that WAIT FOR triggers an error if called on primary,
-# within another function, or inside a transaction with an isolation level
-# higher than READ COMMITTED.
+# 5. Check mode validation: standby modes error out on the primary and
+# primary_flush errors out on the standby. Also check that WAIT FOR
+# triggers an error if called within another function or inside a transaction
+# with an isolation level higher than READ COMMITTED.
+
+# Test standby_flush on primary - should error
+$node_primary->psql(
+ 'postgres',
+ "WAIT FOR LSN '${lsn3}' WITH (MODE 'standby_flush');",
+ stderr => \$stderr);
+ok($stderr =~ /recovery is not in progress/,
+ "get an error when running standby_flush on the primary");
-$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+# Test primary_flush on standby - should error
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${lsn3}' WITH (MODE 'primary_flush');",
stderr => \$stderr);
-ok( $stderr =~ /recovery is not in progress/,
- "get an error when running on the primary");
+ok($stderr =~ /recovery is in progress/,
+ "get an error when running primary_flush on the standby");
$node_standby->psql(
'postgres',
/WAIT FOR must be called without an active or registered snapshot/,
"get an error when running within another function");
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
my $test_lsn =
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
ok( $stderr =~ /option "invalid_option" not recognized/,
"get error for invalid WITH clause option");
-# 6. Check the scenario of multiple LSN waiters. We make 5 background
-# psql sessions each waiting for a corresponding insertion. When waiting is
-# finished, stored procedures logs if there are visible as many rows as
-# should be.
+# Test invalid MODE value
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (MODE 'invalid');",
+ stderr => \$stderr);
+ok($stderr =~ /unrecognized value for WAIT option "mode": "invalid"/,
+ "get error for invalid MODE value");
+
+# Test duplicate MODE parameter
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (MODE 'standby_replay', MODE 'standby_write');",
+ stderr => \$stderr);
+ok( $stderr =~ /conflicting or redundant options/,
+ "get error for duplicate MODE parameter");
+
+# 7a. Check the scenario of multiple standby_replay waiters. We make 5
+# background psql sessions, each waiting for a corresponding insertion. When
+# the wait finishes, a stored procedure logs whether as many rows as expected
+# are visible.
$node_primary->safe_psql(
'postgres', qq[
CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
END
\$\$
LANGUAGE plpgsql;
+
+CREATE FUNCTION log_wait_done(prefix text, i int) RETURNS void AS \$\$
+ BEGIN
+ RAISE LOG '% %', prefix, i;
+ END
+\$\$
+LANGUAGE plpgsql;
]);
+
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
my @psql_sessions;
for (my $i = 0; $i < 5; $i++)
{
SELECT log_count(${i});
]);
}
+
my $log_offset = -s $node_standby->logfile;
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
for (my $i = 0; $i < 5; $i++)
$psql_sessions[$i]->quit;
}
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple standby_replay waiters reported consistent data');
+
+# 7b. Check the scenario of multiple standby_write waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @write_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (100 + ${i});");
+ $write_lsns[$i] =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_write waiters (they will block since walreceiver is stopped)
+my @write_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $write_sessions[$i] = $node_standby->background_psql('postgres');
+ $write_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '$write_lsns[$i]' WITH (MODE 'standby_write', timeout '1d');
+ SELECT log_wait_done('write_done', $i);
+ ]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'"
+);
+
+# Restore walreceiver to unblock waiters
+my $write_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("write_done $i", $write_log_offset);
+ $write_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+ "multiple standby_write waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple standby_flush waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (200 + ${i});");
+ $flush_lsns[$i] =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_flush waiters (they will block since walreceiver is stopped)
+my @flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $flush_sessions[$i] = $node_standby->background_psql('postgres');
+ $flush_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '$flush_lsns[$i]' WITH (MODE 'standby_flush', timeout '1d');
+ SELECT log_wait_done('flush_done', $i);
+ ]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'"
+);
+
+# Restore walreceiver to unblock waiters
+my $flush_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("flush_done $i", $flush_log_offset);
+ $flush_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+ "multiple standby_flush waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed standby mode waiters (standby_replay,
+# standby_write, standby_flush) running concurrently. We start 6 sessions:
+# 2 for each mode, all waiting for the same target LSN. We stop the
+# walreceiver and pause replay to ensure all waiters block. Then we resume
+# replay and restart the walreceiver to verify they unblock and complete
+# correctly.
+
+# Stop walreceiver first to ensure we can control the flow without hanging
+# (stopping it after pausing replay can hang if the startup process is paused).
+stop_walreceiver($node_standby);
+
+# Pause replay
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+# Generate WAL on primary
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Start 6 waiters: 2 for each mode
+my @mixed_sessions;
+my @mixed_modes = ('standby_replay', 'standby_write', 'standby_flush');
+for (my $i = 0; $i < 6; $i++)
+{
+ $mixed_sessions[$i] = $node_standby->background_psql('postgres');
+ $mixed_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${mixed_target_lsn}' WITH (MODE '$mixed_modes[$i % 3]', timeout '1d');
+ SELECT log_wait_done('mixed_done', $i);
+ ]);
+}
+
+# Verify all waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'"
+);
+
+# Resume replay (waiters should still be blocked as no WAL has arrived)
+my $mixed_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+$node_standby->poll_query_until('postgres',
+ "SELECT NOT pg_is_wal_replay_paused();");
+
+# Restore walreceiver to allow WAL to arrive
+resume_walreceiver($node_standby);
-# 7. Check that the standby promotion terminates the wait on LSN. Start
-# waiting for an unreachable LSN then promote. Check the log for the relevant
-# error message. Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+# Wait for all sessions to complete and close them
+for (my $i = 0; $i < 6; $i++)
+{
+ $node_standby->wait_for_log("mixed_done $i", $mixed_log_offset);
+ $mixed_sessions[$i]->quit;
+}
+
+# Verify all modes reached the target LSN
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+ pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+ pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0;
+]);
+
+ok($output eq 't',
+ "mixed mode waiters: all modes completed and reached target LSN");
+
+# 7e. Check the scenario of multiple primary_flush waiters on primary.
+# We start 5 background sessions waiting for different LSNs with primary_flush
+# mode. Each waiter logs when done.
+my @primary_flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (400 + ${i});");
+ $primary_flush_lsns[$i] =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+}
+
+my $primary_flush_log_offset = -s $node_primary->logfile;
+
+# Start primary_flush waiters
+my @primary_flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $primary_flush_sessions[$i] = $node_primary->background_psql('postgres');
+ $primary_flush_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '$primary_flush_lsns[$i]' WITH (MODE 'primary_flush', timeout '1d');
+ SELECT log_wait_done('primary_flush_done', $i);
+ ]);
+}
+
+# The WAL should already be flushed, so waiters should complete quickly
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->wait_for_log("primary_flush_done $i",
+ $primary_flush_log_offset);
+ $primary_flush_sessions[$i]->quit;
+}
+
+# Verify on primary that WAL was flushed up to the target LSN
+$output = $node_primary->safe_psql('postgres',
+ "SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '$primary_flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+ "multiple primary_flush waiters: primary flushed WAL up to target LSN");
+
+# 8. Check that the standby promotion terminates all standby wait modes. Start
+# waiting for unreachable LSNs with standby_replay, standby_write, and
+# standby_flush modes, then promote. Check the log for the relevant error
+# messages. Also, check that waiting for already replayed LSN doesn't cause
+# an error even after promotion.
my $lsn4 =
$node_primary->safe_psql('postgres',
"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
my $lsn5 =
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
- qr/start/, qq[
- \\echo start
- WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('standby_replay', 'standby_write', 'standby_flush');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+ $wait_sessions[$i] = $node_standby->background_psql('postgres');
+ $wait_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${lsn4}' WITH (MODE '$wait_modes[$i]');
+ ]);
+}
# Make sure standby will be promoted at least at the primary insert LSN we
# have just observed. Use pg_switch_wal() to force the insert LSN to be
$log_offset = -s $node_standby->logfile;
$node_standby->promote;
-$node_standby->wait_for_log('recovery is not in progress', $log_offset);
-ok(1, 'got error after standby promote');
+# Wait for all three sessions to get the error (each mode has a distinct message)
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/,
+ $log_offset);
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/,
+ $log_offset);
+$node_standby->wait_for_log(
+ qr/Recovery ended before target LSN.*was replayed/, $log_offset);
+
+ok(1, 'promotion interrupted all wait modes');
$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
$node_standby->stop;
$node_primary->stop;
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $wait_sessions[$i]->quit, the command can be sent to a session
# already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+ $wait_sessions[$i]->{run}->finish;
+}
done_testing();