]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Extend xlogwait infrastructure with write and flush wait types
authorAlexander Korotkov <akorotkov@postgresql.org>
Mon, 5 Jan 2026 17:40:31 +0000 (19:40 +0200)
committerAlexander Korotkov <akorotkov@postgresql.org>
Mon, 5 Jan 2026 17:56:19 +0000 (19:56 +0200)
Add support for waiting on WAL write and flush LSNs in addition to the
existing replay LSN wait type. This provides the foundation for
extending the WAIT FOR command with MODE parameter.

Key changes are following.
- Add WAIT_LSN_TYPE_STANDBY_WRITE and WAIT_LSN_TYPE_STANDBY_FLUSH to
  WaitLSNType.
- Add GetCurrentLSNForWaitType() to retrieve the current LSN for each wait
  type.
- Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and
  WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility.
- Update WaitForLSN() to use GetCurrentLSNForWaitType() internally.

Discussion: https://postgr.es/m/CABPTF7UiArgW-sXj9CNwRzUhYOQrevLzkYcgBydmX5oDes1sjg%40mail.gmail.com
Author: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@kurilemu.de>
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogrecovery.c
src/backend/access/transam/xlogwait.c
src/backend/commands/wait.c
src/backend/utils/activity/wait_event_names.txt
src/include/access/xlogwait.h

index e71b6e21123c2c1abd090c09fc0fa139555cfdb3..05ac7c5f7f81c13566805fb8f31dd665d0ddafef 100644 (file)
@@ -6280,7 +6280,7 @@ StartupXLOG(void)
         * Wake up all waiters for replay LSN.  They need to report an error that
         * recovery was ended before reaching the target LSN.
         */
-       WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+       WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
 
        /*
         * Shutdown the recovery environment.  This must occur after
index a21ac48c9fe302839d2dbe34503a0e67a16f28e4..0b5f871abe748c8c112b097e9b5943dba161cd86 100644 (file)
@@ -1856,8 +1856,8 @@ PerformWalRecovery(void)
                         */
                        if (waitLSNState &&
                                (XLogRecoveryCtl->lastReplayedEndRecPtr >=
-                                pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
-                               WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+                                pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_REPLAY])))
+                               WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
 
                        /* Else, try to fetch the next WAL record */
                        record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
index 6c2bda763e2102310b306c14369d11d4c23ae5c4..5020ae1e52d4ea596c348a1b9b8c74ddaf94c21b 100644 (file)
  *             This file implements waiting for WAL operations to reach specific LSNs
  *             on both physical standby and primary servers. The core idea is simple:
  *             every process that wants to wait publishes the LSN it needs to the
- *             shared memory, and the appropriate process (startup on standby, or
- *             WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *             shared memory, and the appropriate process (startup on standby,
+ *             walreceiver on standby, or WAL writer/backend on primary) wakes it
+ *             once that LSN has been reached.
  *
  *             The shared memory used by this module comprises a procInfos
  *             per-backend array with the information of the awaited LSN for each
  *             of the backend processes.  The elements of that array are organized
- *             into a pairing heap waitersHeap, which allows for very fast finding
- *             of the least awaited LSN.
+ *             into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ *             allows for very fast finding of the least awaited LSN for each type.
  *
- *             In addition, the least-awaited LSN is cached as minWaitedLSN.  The
- *             waiter process publishes information about itself to the shared
- *             memory and waits on the latch until it is woken up by the appropriate
- *             process, standby is promoted, or the postmaster dies.  Then, it cleans
- *             information about itself in the shared memory.
+ *             In addition, the least-awaited LSN for each type is cached in the
+ *             minWaitedLSN array.  The waiter process publishes information about
+ *             itself to the shared memory and waits on the latch until it is woken
+ *             up by the appropriate process, standby is promoted, or the postmaster
+ *             dies.  Then, it cleans information about itself in the shared memory.
  *
- *             On standby servers: After replaying a WAL record, the startup process
- *             first performs a fast path check minWaitedLSN > replayLSN.  If this
- *             check is negative, it checks waitersHeap and wakes up the backend
- *             whose awaited LSNs are reached.
+ *             On standby servers:
+ *             - After replaying a WAL record, the startup process performs a fast
+ *               path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
+ *               negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ *               whose awaited LSNs are reached.
+ *             - After receiving WAL, the walreceiver process performs similar checks
+ *               against the flush and write LSNs, waking up waiters in the FLUSH
+ *               and WRITE heaps, respectively.
  *
  *             On primary servers: After flushing WAL, the WAL writer or backend
  *             process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,47 @@ static int   waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+       [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+       [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+       [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+       [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+                                "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+       Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
+
+       switch (lsnType)
+       {
+               case WAIT_LSN_TYPE_STANDBY_REPLAY:
+                       return GetXLogReplayRecPtr(NULL);
+
+               case WAIT_LSN_TYPE_STANDBY_WRITE:
+                       return GetWalRcvWriteRecPtr();
+
+               case WAIT_LSN_TYPE_STANDBY_FLUSH:
+                       return GetWalRcvFlushRecPtr(NULL, NULL);
+
+               case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+                       return GetFlushRecPtr(NULL);
+       }
+
+       elog(ERROR, "invalid LSN wait type: %d", lsnType);
+       pg_unreachable();
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
@@ -302,6 +349,19 @@ WaitLSNCleanup(void)
        }
 }
 
+/*
+ * Check if the given LSN type requires recovery to be in progress.
+ * Standby wait types (replay, write, flush) require recovery;
+ * primary wait types (flush) do not.
+ */
+static inline bool
+WaitLSNTypeRequiresRecovery(WaitLSNType t)
+{
+       return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
+               t == WAIT_LSN_TYPE_STANDBY_WRITE ||
+               t == WAIT_LSN_TYPE_STANDBY_FLUSH;
+}
+
 /*
  * Wait using MyLatch till the given LSN is reached, the replica gets
  * promoted, or the postmaster dies.
@@ -341,13 +401,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
                int                     rc;
                long            delay_ms = -1;
 
-               if (lsnType == WAIT_LSN_TYPE_REPLAY)
-                       currentLSN = GetXLogReplayRecPtr(NULL);
-               else
-                       currentLSN = GetFlushRecPtr(NULL);
+               /* Get current LSN for the wait type */
+               currentLSN = GetCurrentLSNForWaitType(lsnType);
 
                /* Check that recovery is still in-progress */
-               if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+               if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
                {
                        /*
                         * Recovery was ended, but check if target LSN was already
@@ -376,7 +434,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
                CHECK_FOR_INTERRUPTS();
 
                rc = WaitLatch(MyLatch, wake_events, delay_ms,
-                                          (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+                                          WaitLSNWaitEvents[lsnType]);
 
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
index e4509fffe069575b3be84e361694310afb0fb7ba..57d2dec07f150426ee3802a7a37ee712c7b83020 100644 (file)
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
         */
        Assert(MyProc->xmin == InvalidTransactionId);
 
-       waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+       waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
 
        /*
         * Process the result of WaitForLSN().  Throw appropriate error if needed.
index 43d870dbcf10aefb81da1c4de1670d7a6320e681..3299de23bb3ade340f3857194246df4c1c1a369c 100644 (file)
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT      "Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE       "Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER        "Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION  "Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH     "Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH     "Waiting for WAL flush to reach a target LSN on a primary or standby."
 WAIT_FOR_WAL_REPLAY    "Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE     "Waiting for WAL write to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL        "Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA  "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
index b5fd3e74f1c52984f759706ae4798a1024d18798..d12531d32b809fed3a622e0a599024f57ee28227 100644 (file)
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * xlogwait.h
- *       Declarations for LSN replay waiting routines.
+ *       Declarations for WAL flush, write, and replay waiting routines.
  *
  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
  *
@@ -35,11 +35,16 @@ typedef enum
  */
 typedef enum WaitLSNType
 {
-       WAIT_LSN_TYPE_REPLAY,           /* Waiting for replay on standby */
-       WAIT_LSN_TYPE_FLUSH,            /* Waiting for flush on primary */
+       /* Standby wait types (walreceiver/startup wakes) */
+       WAIT_LSN_TYPE_STANDBY_REPLAY,
+       WAIT_LSN_TYPE_STANDBY_WRITE,
+       WAIT_LSN_TYPE_STANDBY_FLUSH,
+
+       /* Primary wait types (WAL writer/backends wake) */
+       WAIT_LSN_TYPE_PRIMARY_FLUSH,
 } WaitLSNType;
 
-#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_FLUSH + 1)
+#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)
 
 /*
  * WaitLSNProcInfo - the shared memory structure representing information
@@ -97,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,