]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Implement WAIT FOR command
authorAlexander Korotkov <akorotkov@postgresql.org>
Wed, 5 Nov 2025 09:43:55 +0000 (11:43 +0200)
committerAlexander Korotkov <akorotkov@postgresql.org>
Wed, 5 Nov 2025 09:44:13 +0000 (11:44 +0200)
WAIT FOR is to be used on standby and specifies waiting for
a specific WAL location to be replayed.  This command is useful when
the user makes some data changes on primary and needs a guarantee that
these changes are visible on standby.

WAIT FOR needs to wait without any snapshot held.  Otherwise, the snapshot
could prevent the replay of WAL records, implying a kind of self-deadlock.
This is why a separate utility command appears to be the most robust
way to implement this functionality.  It's not possible to implement this as
a function.  Previous experience shows that stored procedures also have
limitations in this respect.

Discussion: https://www.postgresql.org/message-id/flat/CAPpHfdsjtZLVzxjGT8rJHCYbM0D5dwkO+BBjcirozJ6nYbOW8Q@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/flat/CABPTF7UNft368x-RgOXkfj475OwEbp%2BVVO-wEXz7StgjD_%3D6sw%40mail.gmail.com
Author: Kartyshov Ivan <i.kartyshov@postgrespro.ru>
Author: Alexander Korotkov <aekorotkov@gmail.com>
Author: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: jian he <jian.universality@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com>
21 files changed:
doc/src/sgml/high-availability.sgml
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/ref/wait_for.sgml [new file with mode: 0644]
doc/src/sgml/reference.sgml
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogrecovery.c
src/backend/commands/Makefile
src/backend/commands/meson.build
src/backend/commands/wait.c [new file with mode: 0644]
src/backend/parser/gram.y
src/backend/storage/lmgr/proc.c
src/backend/tcop/pquery.c
src/backend/tcop/utility.c
src/include/commands/wait.h [new file with mode: 0644]
src/include/nodes/parsenodes.h
src/include/parser/kwlist.h
src/include/tcop/cmdtaglist.h
src/test/recovery/meson.build
src/test/recovery/t/049_wait_for_lsn.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index b47d8b4106efb9e1cd113bb3e0f0ea6ff8779600..742deb037b73ba8089fa33781b3a7f0f9d31f75b 100644 (file)
@@ -1376,6 +1376,60 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
    </sect3>
   </sect2>
 
+  <sect2 id="read-your-writes-consistency">
+   <title>Read-Your-Writes Consistency</title>
+
+   <para>
+    In asynchronous replication, there is always a short window where changes
+    on the primary may not yet be visible on the standby due to replication
+    lag. This can lead to inconsistencies when an application writes data on
+    the primary and then immediately issues a read query on the standby.
+    However, it is possible to address this without switching to synchronous
+    replication.
+   </para>
+
+   <para>
+    To address this, PostgreSQL offers a mechanism for read-your-writes
+    consistency. The key idea is to ensure that a client sees its own writes
+    by synchronizing the WAL replay on the standby with the known point of
+    change on the primary.
+   </para>
+
+   <para>
+    This is achieved by the following steps.  After performing write
+    operations, the application retrieves the current WAL location using a
+    function call like this.
+
+    <programlisting>
+postgres=# SELECT pg_current_wal_insert_lsn();
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
+(1 row)
+    </programlisting>
+   </para>
+
+   <para>
+    The <acronym>LSN</acronym> obtained from the primary is then communicated
+    to the standby server. This can be managed at the application level or
+    via the connection pooler.  On the standby, the application issues the
+    <xref linkend="sql-wait-for"/> command to block further processing until
+    the standby's WAL replay process reaches (or exceeds) the specified
+    <acronym>LSN</acronym>.
+
+    <programlisting>
+postgres=# WAIT FOR LSN '0/306EE20';
+ status
+--------
+ success
+(1 row)
+    </programlisting>
+    Once the command returns a status of success, it guarantees that all
+    changes up to the provided <acronym>LSN</acronym> have been applied,
+    ensuring that subsequent read queries will reflect the latest updates.
+   </para>
+  </sect2>
+
   <sect2 id="continuous-archiving-in-standby">
    <title>Continuous Archiving in Standby</title>
 
index f5be638867abee65fd9e63e012455f2cf2c65506..e167406c74490b6d8125dddf419eb2fd5dd0b8dd 100644 (file)
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY waitFor            SYSTEM "wait_for.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
new file mode 100644 (file)
index 0000000..3b8e842
--- /dev/null
@@ -0,0 +1,234 @@
+<!--
+doc/src/sgml/ref/wait_for.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait-for">
+ <indexterm zone="sql-wait-for">
+  <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAIT FOR</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAIT FOR</refname>
+  <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+
+<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
+
+    TIMEOUT '<replaceable class="parameter">timeout</replaceable>'
+    NO_THROW
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+    Waits until recovery replays <parameter>lsn</parameter>.
+    If no <parameter>timeout</parameter> is specified or it is set to
+    zero, this command waits indefinitely for the
+    <parameter>lsn</parameter>.
+    On timeout, or if the server is promoted before
+    <parameter>lsn</parameter> is reached, an error is emitted,
+    unless <literal>NO_THROW</literal> is specified in the WITH clause.
+    If <literal>NO_THROW</literal> is specified, then these conditions are
+    reported via the command's return value instead of an error.
+  </para>
+
+  <para>
+    The possible return values are <literal>success</literal>,
+    <literal>timeout</literal>, and <literal>not in recovery</literal>.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="parameter">lsn</replaceable></term>
+    <listitem>
+     <para>
+      Specifies the target <acronym>LSN</acronym> to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>WITH ( <replaceable class="parameter">option</replaceable> [, ...] )</literal></term>
+    <listitem>
+     <para>
+      This clause specifies optional parameters for the wait operation.
+      The following parameters are supported:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>TIMEOUT</literal> '<replaceable class="parameter">timeout</replaceable>'</term>
+        <listitem>
+         <para>
+          When specified and <parameter>timeout</parameter> is greater than zero,
+          the command waits until <parameter>lsn</parameter> is reached or
+          the specified <parameter>timeout</parameter> has elapsed.
+         </para>
+         <para>
+          The <parameter>timeout</parameter> may be given as an integer number
+          of milliseconds.  It may also be given as a string literal containing
+          an integer number of milliseconds or a number with a unit
+          (see <xref linkend="config-setting-names-values"/>).
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>NO_THROW</literal></term>
+        <listitem>
+         <para>
+          Specify to not throw an error in the case of timeout or
+          running on the primary.  In this case the result status can be
+          obtained from the return value.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Outputs</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>success</literal></term>
+    <listitem>
+     <para>
+      This return value denotes that we have successfully reached
+      the target <parameter>lsn</parameter>.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>timeout</literal></term>
+    <listitem>
+     <para>
+      This return value denotes that the timeout happened before reaching
+      the target <parameter>lsn</parameter>.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>not in recovery</literal></term>
+    <listitem>
+     <para>
+      This return value denotes that the database server is not in a recovery
+      state.  This might mean either the database server was not in recovery
+      at the moment of receiving the command, or it was promoted before
+      reaching the target <parameter>lsn</parameter>.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+    The <command>WAIT FOR</command> command waits until
+    <parameter>lsn</parameter> has been replayed on the standby.
+    That is, after this command execution, the value returned by
+    <function>pg_last_wal_replay_lsn</function> should be greater than or
+    equal to the <parameter>lsn</parameter> value.  This is useful to achieve
+    read-your-writes consistency, while using an async replica for reads and
+    the primary for writes.  In that case, the <acronym>lsn</acronym> of the
+    last modification should be stored on the client application side or the
+    connection pooler side.
+  </para>
+
+  <para>
+    <command>WAIT FOR</command> command should be called on standby.
+    If a user runs <command>WAIT FOR</command> on primary, it
+    will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
+    However, if <command>WAIT FOR</command> is
+    called on primary promoted from standby and <literal>lsn</literal>
+    was already replayed, then the <command>WAIT FOR</command> command just
+    exits immediately.
+  </para>
+
+</refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+    You can use the <command>WAIT FOR</command> command to wait for
+    a <type>pg_lsn</type> value.  For example, an application could update
+    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+    the changes just made.  This example uses
+    <function>pg_current_wal_insert_lsn</function>
+    on the primary server to get the <acronym>lsn</acronym>, given that
+    <varname>synchronous_commit</varname> could be set to
+    <literal>off</literal>.
+
+   <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
+(1 row)
+</programlisting>
+
+   Then an application could run <command>WAIT FOR</command>
+   with the <parameter>lsn</parameter> obtained from primary.  After that the
+   changes made on primary should be guaranteed to be visible on replica.
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20';
+ status
+--------
+ success
+(1 row)
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+ genre
+-------
+(0 rows)
+</programlisting>
+  </para>
+
+  <para>
+    If the target LSN is not reached before the timeout, an error is thrown.
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
+ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
+</programlisting>
+  </para>
+
+  <para>
+   The same example, using <command>WAIT FOR</command> with the
+   <parameter>NO_THROW</parameter> option:
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
+ status
+--------
+ timeout
+(1 row)
+</programlisting>
+  </para>
+ </refsect1>
+</refentry>
index ff85ace83fc485af58d3a2505410d2c6bc0b425c..2cf02c37b17bdd05c42fcc04300bae5ebcbcd6a4 100644 (file)
    &update;
    &vacuum;
    &values;
+   &waitFor;
 
  </reference>
 
index 2cf3d4e92b7df5a4f1559476a466ffd4fb0f0bc2..092e197eba33eba6853076322dcd832f923585ff 100644 (file)
@@ -31,6 +31,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_enum.h"
@@ -2843,6 +2844,11 @@ AbortTransaction(void)
         */
        LWLockReleaseAll();
 
+       /*
+        * Cleanup waiting for LSN if any.
+        */
+       WaitLSNCleanup();
+
        /* Clear wait information and command progress indicator */
        pgstat_report_wait_end();
        pgstat_progress_end_command();
index 9900e3e0179f711d085f8fe2593bf2c7235c50fa..7c959051e1109a7bcb02999028d52e66a6618496 100644 (file)
@@ -62,6 +62,7 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "backup/basebackup.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
@@ -6227,6 +6228,12 @@ StartupXLOG(void)
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
 
+       /*
+        * Wake up all waiters for replay LSN.  They need to report an error that
+        * recovery was ended before reaching the target LSN.
+        */
+       WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+
        /*
         * Shutdown the recovery environment.  This must occur after
         * RecoverPreparedTransactions() (see notes in lock_twophase_recover())
index 93c50831b260960593bdc15c77f4d8b69a4af9c3..550de6e4a59144b52314e95ca77ad497efbd7791 100644 (file)
@@ -40,6 +40,7 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
@@ -1838,6 +1839,16 @@ PerformWalRecovery(void)
                                break;
                        }
 
+                       /*
+                        * If we replayed an LSN that someone was waiting for then walk
+                        * over the shared memory array and set latches to notify the
+                        * waiters.
+                        */
+                       if (waitLSNState &&
+                               (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+                                pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
+                               WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+
                        /* Else, try to fetch the next WAL record */
                        record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
                } while (record != NULL);
index cb2fbdc7c6018baf19cb99fb7913d51851f31777..f99acfd2b4bbb32e4d2b4ea2590c09efe42306c6 100644 (file)
@@ -64,6 +64,7 @@ OBJS = \
        vacuum.o \
        vacuumparallel.o \
        variable.o \
-       view.o
+       view.o \
+       wait.o
 
 include $(top_srcdir)/src/backend/common.mk
index dd4cde41d32ccfbdcd6de1b89c721e601a17f392..9f640ad48104c75da785c8f1a827968c95d05aa9 100644 (file)
@@ -53,4 +53,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644 (file)
index 0000000..67068a9
--- /dev/null
@@ -0,0 +1,212 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *       Implements WAIT FOR, which allows waiting for events such as
+ *       time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "commands/defrem.h"
+#include "commands/wait.h"
+#include "executor/executor.h"
+#include "parser/parse_node.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+/*
+ * ExecWaitStmt
+ *       Execute a WAIT FOR LSN statement.
+ *
+ * 'pstate' is used to report the position of offending options, 'stmt'
+ * carries the LSN literal and the WITH options (TIMEOUT, NO_THROW), and
+ * 'dest' receives the one-row result holding the wait status: "success",
+ * "timeout" or "not in recovery".  The latter two are returned only when
+ * NO_THROW is in effect; otherwise they are raised as errors.
+ */
+void
+ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
+{
+       XLogRecPtr      lsn;
+       int64           timeout = 0;
+       WaitLSNResult waitLSNResult;
+       bool            throw = true;
+       TupleDesc       tupdesc;
+       TupOutputState *tstate;
+       const char *result = "<unset>";
+       bool            timeout_specified = false;
+       bool            no_throw_specified = false;
+
+       /* Parse and validate the mandatory LSN */
+       lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+                                                                                 CStringGetDatum(stmt->lsn_literal)));
+
+       foreach_node(DefElem, defel, stmt->options)
+       {
+               if (strcmp(defel->defname, "timeout") == 0)
+               {
+                       char       *timeout_str;
+                       const char *hintmsg;
+
+                       /*
+                        * Parsed timeout.  Deliberately not named "result", so that it
+                        * does not shadow the status string declared at function scope.
+                        */
+                       double          timeout_ms;
+
+                       if (timeout_specified)
+                               errorConflictingDefElem(defel, pstate);
+                       timeout_specified = true;
+
+                       timeout_str = defGetString(defel);
+
+                       /* GUC_UNIT_MS: a value without a unit is taken as milliseconds */
+                       if (!parse_real(timeout_str, &timeout_ms, GUC_UNIT_MS, &hintmsg))
+                       {
+                               ereport(ERROR,
+                                               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                               errmsg("invalid timeout value: \"%s\"", timeout_str),
+                                               hintmsg ? errhint("%s", _(hintmsg)) : 0);
+                       }
+
+                       /*
+                        * Get rid of any fractional part in the input. This is so we
+                        * don't fail on just-out-of-range values that would round into
+                        * range.
+                        */
+                       timeout_ms = rint(timeout_ms);
+
+                       /* Range check */
+                       if (unlikely(isnan(timeout_ms) || !FLOAT8_FITS_IN_INT64(timeout_ms)))
+                               ereport(ERROR,
+                                               errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                                               errmsg("timeout value is out of range"));
+
+                       if (timeout_ms < 0)
+                               ereport(ERROR,
+                                               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                               errmsg("timeout cannot be negative"));
+
+                       timeout = (int64) timeout_ms;
+               }
+               else if (strcmp(defel->defname, "no_throw") == 0)
+               {
+                       if (no_throw_specified)
+                               errorConflictingDefElem(defel, pstate);
+
+                       no_throw_specified = true;
+
+                       throw = !defGetBoolean(defel);
+               }
+               else
+               {
+                       ereport(ERROR,
+                                       errcode(ERRCODE_SYNTAX_ERROR),
+                                       errmsg("option \"%s\" not recognized",
+                                                  defel->defname),
+                                       parser_errposition(pstate, defel->location));
+               }
+       }
+
+       /*
+        * We are going to wait for the LSN replay.  We should first care that we
+        * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+        * Otherwise, our snapshot could prevent the replay of WAL records
+        * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
+        * command, not a procedure or function.
+        *
+        * First, we should check there is no active snapshot.  According to
+        * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+        * processed with a snapshot.  Thankfully, we can pop this snapshot,
+        * because PortalRunUtility() can tolerate this.
+        */
+       if (ActiveSnapshotSet())
+               PopActiveSnapshot();
+
+       /*
+        * Second, invalidate a catalog snapshot if any.  And we should be done
+        * with the preparation.
+        */
+       InvalidateCatalogSnapshot();
+
+       /* Give up if there is still an active or registered snapshot. */
+       if (HaveRegisteredOrActiveSnapshot())
+               ereport(ERROR,
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("WAIT FOR must be only called without an active or registered snapshot"),
+                               errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
+
+       /*
+        * As the result we should hold no snapshot, and correspondingly our xmin
+        * should be unset.
+        */
+       Assert(MyProc->xmin == InvalidTransactionId);
+
+       waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+
+       /*
+        * Process the result of WaitForLSN().  Throw appropriate error if needed.
+        *
+        * The current replay position is read into a local variable before it is
+        * formatted: LSN_FORMAT_ARGS() expands its argument more than once, so
+        * passing the function call directly would read the position repeatedly
+        * and could print halves taken from two different values.
+        */
+       switch (waitLSNResult)
+       {
+               case WAIT_LSN_RESULT_SUCCESS:
+                       /* Nothing to do on success */
+                       result = "success";
+                       break;
+
+               case WAIT_LSN_RESULT_TIMEOUT:
+                       if (throw)
+                       {
+                               XLogRecPtr      replay_lsn = GetXLogReplayRecPtr(NULL);
+
+                               ereport(ERROR,
+                                               errcode(ERRCODE_QUERY_CANCELED),
+                                               errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+                                                          LSN_FORMAT_ARGS(lsn),
+                                                          LSN_FORMAT_ARGS(replay_lsn)));
+                       }
+                       else
+                               result = "timeout";
+                       break;
+
+               case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+                       if (throw)
+                       {
+                               if (PromoteIsTriggered())
+                               {
+                                       XLogRecPtr      replay_lsn = GetXLogReplayRecPtr(NULL);
+
+                                       ereport(ERROR,
+                                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                       errmsg("recovery is not in progress"),
+                                                       errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+                                                                         LSN_FORMAT_ARGS(lsn),
+                                                                         LSN_FORMAT_ARGS(replay_lsn)));
+                               }
+                               else
+                                       ereport(ERROR,
+                                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                       errmsg("recovery is not in progress"),
+                                                       errhint("Waiting for the replay LSN can only be executed during recovery."));
+                       }
+                       else
+                               result = "not in recovery";
+                       break;
+       }
+
+       /* need a tuple descriptor representing a single TEXT column */
+       tupdesc = WaitStmtResultDesc(stmt);
+
+       /* prepare for projection of tuples */
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+       /* Send it */
+       do_text_output_oneline(tstate, result);
+
+       end_tup_output(tstate);
+}
+
+/*
+ * WaitStmtResultDesc
+ *       Build the tuple descriptor describing the result of WAIT FOR.
+ *
+ * The result has a single text column named "status".  The 'stmt' argument
+ * is not consulted.
+ */
+TupleDesc
+WaitStmtResultDesc(WaitStmt *stmt)
+{
+       TupleDesc       desc = CreateTemplateTupleDesc(1);
+
+       /* The only column: "status", of type text */
+       TupleDescInitEntry(desc, (AttrNumber) 1, "status", TEXTOID, -1, 0);
+
+       return desc;
+}
index 8a0470d5b84176633a58f3d94c5d586c313d47dd..57fe01865470d3785b30fe210e0ca688f6d19ce2 100644 (file)
@@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
                SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
                UnlistenStmt UpdateStmt VacuumStmt
                VariableResetStmt VariableSetStmt VariableShowStmt
-               ViewStmt CheckPointStmt CreateConversionStmt
+               ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
                DeallocateStmt PrepareStmt ExecuteStmt
                DropOwnedStmt ReassignOwnedStmt
                AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <boolean>                opt_concurrently
 %type <dbehavior>      opt_drop_behavior
 %type <list>           opt_utility_option_list
+%type <list>           opt_wait_with_clause
 %type <list>           utility_option_list
 %type <defelt>         utility_option_elem
 %type <str>                    utility_option_name
@@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
                                json_object_constructor_null_clause_opt
                                json_array_constructor_null_clause_opt
 
-
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
        LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
        LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-       LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+       LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P
 
        MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD
        MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
        VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
        VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE
 
-       WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+       WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
        XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
        XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1120,6 +1120,7 @@ stmt:
                        | VariableSetStmt
                        | VariableShowStmt
                        | ViewStmt
+                       | WaitStmt
                        | /*EMPTY*/
                                { $$ = NULL; }
                ;
@@ -16482,6 +16483,26 @@ xml_passing_mech:
                        | BY VALUE_P
                ;
 
+/*****************************************************************************
+ *
+ * WAIT FOR LSN
+ *
+ *****************************************************************************/
+
+WaitStmt:
+                       WAIT FOR LSN_P Sconst opt_wait_with_clause
+                               {
+                                       WaitStmt *n = makeNode(WaitStmt);
+                                       n->lsn_literal = $4;
+                                       n->options = $5;
+                                       $$ = (Node *) n;
+                               }
+                       ;
+
+opt_wait_with_clause:
+                       WITH '(' utility_option_list ')'                { $$ = $3; }
+                       | /*EMPTY*/                                                             { $$ = NIL; }
+                       ;
 
 /*
  * Aggregate decoration clauses
@@ -17969,6 +17990,7 @@ unreserved_keyword:
                        | LOCK_P
                        | LOCKED
                        | LOGGED
+                       | LSN_P
                        | MAPPING
                        | MATCH
                        | MATCHED
@@ -18139,6 +18161,7 @@ unreserved_keyword:
                        | VIEWS
                        | VIRTUAL
                        | VOLATILE
+                       | WAIT
                        | WHITESPACE_P
                        | WITHIN
                        | WITHOUT
@@ -18585,6 +18608,7 @@ bare_label_keyword:
                        | LOCK_P
                        | LOCKED
                        | LOGGED
+                       | LSN_P
                        | MAPPING
                        | MATCH
                        | MATCHED
@@ -18796,6 +18820,7 @@ bare_label_keyword:
                        | VIEWS
                        | VIRTUAL
                        | VOLATILE
+                       | WAIT
                        | WHEN
                        | WHITESPACE_P
                        | WORK
index 96f29aafc391edabe28518237be8b406c606766a..26b201eadb856155c100108821e7a47b5342d5ad 100644 (file)
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -947,6 +948,11 @@ ProcKill(int code, Datum arg)
         */
        LWLockReleaseAll();
 
+       /*
+        * Cleanup waiting for LSN if any.
+        */
+       WaitLSNCleanup();
+
        /* Cancel any pending condition variable sleep, too */
        ConditionVariableCancelSleep();
 
index 74179139fa90ef8edb5334b7c2ba131c02d07431..fde78c551607e6abd0ff84aae7aeb8ec48b570a5 100644 (file)
@@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
        MemoryContextSwitchTo(portal->portalContext);
 
        /*
-        * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
-        * under us, so don't complain if it's now empty.  Otherwise, our snapshot
-        * should be the top one; pop it.  Note that this could be a different
-        * snapshot from the one we made above; see EnsurePortalSnapshotExists.
+        * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot
+        * stack from under us, so don't complain if it's now empty.  Otherwise,
+        * our snapshot should be the top one; pop it.  Note that this could be a
+        * different snapshot from the one we made above; see
+        * EnsurePortalSnapshotExists.
         */
        if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
        {
@@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
                IsA(utilityStmt, ListenStmt) ||
                IsA(utilityStmt, NotifyStmt) ||
                IsA(utilityStmt, UnlistenStmt) ||
-               IsA(utilityStmt, CheckPointStmt))
+               IsA(utilityStmt, CheckPointStmt) ||
+               IsA(utilityStmt, WaitStmt))
                return false;
 
        return true;
index 918db53dd5e7ddc0ea8a306ac1a9227c545e873f..082967c0a86d15ecf300ff3ec33a24889aa945ba 100644 (file)
@@ -56,6 +56,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
                case T_PrepareStmt:
                case T_UnlistenStmt:
                case T_VariableSetStmt:
+               case T_WaitStmt:
                        {
                                /*
                                 * These modify only backend-local state, so they're OK to run
@@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                break;
                        }
 
+               case T_WaitStmt:
+                       {
+                               ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest);
+                       }
+                       break;
+
                default:
                        /* All other statement types have event trigger support */
                        ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree)
                case T_VariableShowStmt:
                        return true;
 
+               case T_WaitStmt:
+                       return true;
+
                default:
                        return false;
        }
@@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree)
                                return GetPGVariableResultDesc(n->name);
                        }
 
+               case T_WaitStmt:
+                       return WaitStmtResultDesc((WaitStmt *) parsetree);
+
                default:
                        return NULL;
        }
@@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree)
                        }
                        break;
 
+               case T_WaitStmt:
+                       tag = CMDTAG_WAIT;
+                       break;
+
                        /* already-planned queries */
                case T_PlannedStmt:
                        {
@@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree)
                        lev = LOGSTMT_DDL;
                        break;
 
+               case T_WaitStmt:
+                       lev = LOGSTMT_ALL;
+                       break;
+
                        /* already-planned queries */
                case T_PlannedStmt:
                        {
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644 (file)
index 0000000..ce33213
--- /dev/null
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *       prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "nodes/parsenodes.h"
+#include "parser/parse_node.h"
+#include "tcop/dest.h"
+
+extern void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest);
+extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt);
+
+#endif                                                 /* WAIT_H */
index ecbddd12e1b321d1e9f15fd93a6afd2436038e12..d14294a4eceb7d443118e15bd6b38d1bb890c72d 100644 (file)
@@ -4385,4 +4385,12 @@ typedef struct DropSubscriptionStmt
        DropBehavior behavior;          /* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+typedef struct WaitStmt
+{
+       NodeTag         type;
+       char       *lsn_literal;        /* LSN string from grammar */
+       List       *options;            /* List of DefElem nodes */
+} WaitStmt;
+
+
 #endif                                                 /* PARSENODES_H */
index 84182eaaae2aef470fd0080a62e84ed76a7fa51d..5d4fe27ef96276a58c10593ed052206434dad973 100644 (file)
@@ -270,6 +270,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -496,6 +497,7 @@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
index d250a714d597d18037e1558a54a2be3f380a98d2..c4606d65043e17029e740970c5131877a4706a95 100644 (file)
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false)
index 52993c32dbba47204bd950b27573a715ccfc37d8..523a5cd5b52795b3240970c60f431f1a9b7ee3cd 100644 (file)
@@ -56,7 +56,8 @@ tests += {
       't/045_archive_restartpoint.pl',
       't/046_checkpoint_logical_slot.pl',
       't/047_checkpoint_physical_slot.pl',
-      't/048_vacuum_horizon_floor.pl'
+      't/048_vacuum_horizon_floor.pl',
+      't/049_wait_for_lsn.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
new file mode 100644 (file)
index 0000000..e0ddb06
--- /dev/null
@@ -0,0 +1,302 @@
+# Checks waiting for the LSN replay on standby using
+# the WAIT FOR command.
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize and start the primary node (allows_streaming is set)
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+       "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby from the backup, with a 1 second apply delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+       has_streaming => 1);
+$node_standby->append_conf(
+       'postgresql.conf', qq[
+       recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# 1. Make sure that WAIT FOR works: add new content to the primary and
+# memorize the primary's insert LSN, then wait for that LSN to be
+# replayed on the standby.
+$node_primary->safe_psql('postgres',
+       "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn1}' WITH (timeout '1d');
+       SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on the standby is at least as big as the LSN we
+# observed on the primary before.  The last output line is the pg_lsn_cmp()
+# result; keep it in a scalar so that an empty output cannot shift the
+# cmp_ok() arguments, and so that a failure reports the actual value.
+my $lsn_cmp = (split("\n", $output))[-1];
+cmp_ok($lsn_cmp, '>=', 0,
+       "standby reached the same LSN as primary after WAIT FOR");
+
+# 2. Check that new data is visible after calling WAIT FOR
+$node_primary->safe_psql('postgres',
+       "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn2}';
+       SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby reflects the recent changes on primary.
+# The last output line is the count; is() shows got/expected on failure.
+my $row_count = (split("\n", $output))[-1];
+is($row_count, 30, "standby reached the same LSN as primary");
+
+# 3. Check that waiting for an unreachable LSN triggers the timeout.  The
+# unreachable LSN must be far ahead of the current insert position, so that
+# WAL records issued by a concurrent autovacuum cannot reach it.
+my $lsn3 =
+  $node_primary->safe_psql('postgres',
+       "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+$node_standby->safe_psql('postgres',
+       "WAIT FOR LSN '${lsn2}' WITH (timeout '10ms');");
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${lsn3}' WITH (timeout '1000ms');",
+       stderr => \$stderr);
+like($stderr, qr/timed out while waiting for target LSN/,
+       "get timeout on waiting for unreachable LSN");
+
+# With no_throw the outcome is returned as a status string instead of an
+# error being raised.
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]);
+is($output, "success",
+       "WAIT FOR returns correct status after successful waiting");
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
+is($output, "timeout", "WAIT FOR returns correct status after timeout");
+
+# 4. Check that WAIT FOR triggers an error if called on primary,
+# within another function, or inside a transaction with an isolation level
+# higher than READ COMMITTED.
+
+$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+       stderr => \$stderr);
+like($stderr, qr/recovery is not in progress/,
+       "get an error when running on the primary");
+
+# The SELECT 1 makes the REPEATABLE READ transaction take its snapshot
+# before WAIT FOR is issued.
+$node_standby->psql(
+       'postgres',
+       "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';",
+       stderr => \$stderr);
+like(
+       $stderr,
+       qr/WAIT FOR must be only called without an active or registered snapshot/,
+       "get an error when running in a transaction with an isolation level higher than READ COMMITTED"
+);
+
+# The wrapper function is created on the primary; wait for the standby to
+# catch up before calling it there.
+$node_primary->safe_psql(
+       'postgres', qq[
+CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$
+  BEGIN
+    EXECUTE format('WAIT FOR LSN %L;', target_lsn);
+  END
+\$\$
+LANGUAGE plpgsql;
+]);
+
+$node_primary->wait_for_catchup($node_standby);
+$node_standby->psql(
+       'postgres',
+       "SELECT pg_wal_replay_wait_wrap('${lsn3}');",
+       stderr => \$stderr);
+like(
+       $stderr,
+       qr/WAIT FOR must be only called without an active or registered snapshot/,
+       "get an error when running within another function");
+
+# 5. Check parameter validation error cases on standby before promotion.
+# Each case runs one statement through psql and matches the captured stderr
+# with like(), which prints the actual text when the match fails.
+my $test_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Test negative timeout
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (timeout '-1000ms');",
+       stderr => \$stderr);
+like($stderr, qr/timeout cannot be negative/,
+       "get error for negative timeout");
+
+# Test unknown parameter with WITH clause
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (unknown_param 'value');",
+       stderr => \$stderr);
+like($stderr, qr/option "unknown_param" not recognized/,
+       "get error for unknown parameter");
+
+# Test duplicate TIMEOUT parameter with WITH clause
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (timeout '1000', timeout '2000');",
+       stderr => \$stderr);
+like($stderr, qr/conflicting or redundant options/,
+       "get error for duplicate TIMEOUT parameter");
+
+# Test duplicate NO_THROW parameter with WITH clause
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (no_throw, no_throw);",
+       stderr => \$stderr);
+like($stderr, qr/conflicting or redundant options/,
+       "get error for duplicate NO_THROW parameter");
+
+# Test syntax error - options without WITH keyword
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' (timeout '100ms');",
+       stderr => \$stderr);
+like($stderr, qr/syntax error/,
+       "get syntax error when options specified without WITH keyword");
+
+# Test syntax error - missing LSN
+$node_standby->psql('postgres', "WAIT FOR TIMEOUT 1000;", stderr => \$stderr);
+like($stderr, qr/syntax error/, "get syntax error for missing LSN");
+
+# Test invalid LSN format
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN 'invalid_lsn';",
+       stderr => \$stderr);
+like($stderr, qr/invalid input syntax for type pg_lsn/,
+       "get error for invalid LSN format");
+
+# Test invalid timeout format
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (timeout 'invalid');",
+       stderr => \$stderr);
+like($stderr, qr/invalid timeout value/,
+       "get error for invalid timeout format");
+
+# Test the WITH clause syntax: with no_throw the status is returned as the
+# command's output, so compare it with is() to see the actual value on failure.
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]);
+is($output, "success", "WAIT FOR WITH clause syntax works correctly");
+
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn3}' WITH (timeout 100, no_throw);]);
+is($output, "timeout",
+       "WAIT FOR WITH clause returns correct timeout status");
+
+# Test WITH clause error case - invalid option
+$node_standby->psql(
+       'postgres',
+       "WAIT FOR LSN '${test_lsn}' WITH (invalid_option 'value');",
+       stderr => \$stderr);
+like($stderr, qr/option "invalid_option" not recognized/,
+       "get error for invalid WITH clause option");
+
+# 6. Check the scenario of multiple LSN waiters.  We start 5 background
+# psql sessions, each waiting for its corresponding insertion.  Once a wait
+# finishes, the session calls a function that writes a log message if at
+# least as many rows are visible as there should be.
+$node_primary->safe_psql(
+       'postgres', qq[
+CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
+  DECLARE
+    count int;
+  BEGIN
+    SELECT count(*) FROM wait_test INTO count;
+    IF count >= 31 + i THEN
+      RAISE LOG 'count %', i;
+    END IF;
+  END
+\$\$
+LANGUAGE plpgsql;
+]);
+
+# Pause replay so that every session really has to wait for its LSN.
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+my @psql_sessions;
+foreach my $i (0 .. 4)
+{
+       $node_primary->safe_psql('postgres',
+               "INSERT INTO wait_test VALUES (${i});");
+       my $lsn =
+         $node_primary->safe_psql('postgres',
+               "SELECT pg_current_wal_insert_lsn()");
+       $psql_sessions[$i] = $node_standby->background_psql('postgres');
+       $psql_sessions[$i]->query_until(
+               qr/start/, qq[
+               \\echo start
+               WAIT FOR LSN '${lsn}';
+               SELECT log_count(${i});
+       ]);
+}
+
+# Remember the current log size, resume replay, then expect one log line
+# per waiter past that offset.
+my $log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+foreach my $i (0 .. 4)
+{
+       $node_standby->wait_for_log("count ${i}", $log_offset);
+       $psql_sessions[$i]->quit;
+}
+
+pass('multiple LSN waiters reported consistent data');
+
+# 7. Check that the standby promotion terminates the wait on LSN.  Start
+# waiting for an unreachable LSN then promote.  Check the log for the relevant
+# error message.  Also, check that waiting for an already replayed LSN doesn't
+# cause an error even after promotion.
+my $lsn4 =
+  $node_primary->safe_psql('postgres',
+       "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $lsn5 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $psql_session = $node_standby->background_psql('postgres');
+$psql_session->query_until(
+       qr/start/, qq[
+       \\echo start
+       WAIT FOR LSN '${lsn4}';
+]);
+
+# Make sure standby will be promoted at least at the primary insert LSN we
+# have just observed.  Use pg_switch_wal() to force the insert LSN to be
+# written then wait for standby to catchup.
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+$node_primary->wait_for_catchup($node_standby);
+
+$log_offset = -s $node_standby->logfile;
+$node_standby->promote;
+$node_standby->wait_for_log('recovery is not in progress', $log_offset);
+
+ok(1, 'got error after standby promote');
+
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+
+ok(1, 'wait for already replayed LSN exits immediately even after promotion');
+
+# With no_throw the promoted server reports the status as output instead of
+# raising an error; is() shows the actual status on mismatch.
+$output = $node_standby->safe_psql(
+       'postgres', qq[
+       WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
+is($output, "not in recovery",
+       "WAIT FOR returns correct status after standby promotion");
+
+
+$node_standby->stop;
+$node_primary->stop;
+
+# Don't send \q via $psql_session->quit: the session may already be closed by
+# now.  Its script contained no \q either, so just finish the IPC::Run harness.
+$psql_session->{run}->finish;
+
+done_testing();
index 73df31344be0d54c706166b1084004f03408aec2..432509277c98118a30cb624fae0340f5b9b07fb0 100644 (file)
@@ -3272,6 +3272,7 @@ WaitLSNState
 WaitLSNProcInfo
 WaitLSNResult
 WaitPMResult
+WaitStmt
 WalCloseMethod
 WalCompression
 WalInsertClass