* This file implements waiting for WAL operations to reach specific LSNs
* on both physical standby and primary servers. The core idea is simple:
* every process that wants to wait publishes the LSN it needs to the
- * shared memory, and the appropriate process (startup on standby, or
- * WAL writer/backend on primary) wakes it once that LSN has been reached.
+ * shared memory, and the appropriate process (startup on standby,
+ * walreceiver on standby, or WAL writer/backend on primary) wakes it
+ * once that LSN has been reached.
*
* The shared memory used by this module comprises a procInfos
* per-backend array with the information of the awaited LSN for each
* of the backend processes. The elements of that array are organized
- * into a pairing heap waitersHeap, which allows for very fast finding
- * of the least awaited LSN.
+ * into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ * allows for very fast finding of the least awaited LSN for each type.
*
- * In addition, the least-awaited LSN is cached as minWaitedLSN. The
- * waiter process publishes information about itself to the shared
- * memory and waits on the latch until it is woken up by the appropriate
- * process, standby is promoted, or the postmaster dies. Then, it cleans
- * information about itself in the shared memory.
+ * In addition, the least-awaited LSN for each type is cached in the
+ * minWaitedLSN array. The waiter process publishes information about
+ * itself to the shared memory and waits on the latch until it is woken
+ * up by the appropriate process, standby is promoted, or the postmaster
+ * dies. Then, it cleans information about itself in the shared memory.
*
- * On standby servers: After replaying a WAL record, the startup process
- * first performs a fast path check minWaitedLSN > replayLSN. If this
- * check is negative, it checks waitersHeap and wakes up the backend
- * whose awaited LSNs are reached.
+ * On standby servers:
+ * - After replaying a WAL record, the startup process performs a fast
+ * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
+ * negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ * whose awaited LSNs are reached.
+ * - After receiving WAL, the walreceiver process performs similar checks
+ * against the flush and write LSNs, waking up waiters in the FLUSH
+ * and WRITE heaps, respectively.
*
* On primary servers: After flushing WAL, the WAL writer or backend
* process performs a similar check against the flush LSN and wakes up
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
struct WaitLSNState *waitLSNState = NULL;
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+ [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+ [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+ [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+ [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+ "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+ Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
+
+ switch (lsnType)
+ {
+ case WAIT_LSN_TYPE_STANDBY_REPLAY:
+ return GetXLogReplayRecPtr(NULL);
+
+ case WAIT_LSN_TYPE_STANDBY_WRITE:
+ return GetWalRcvWriteRecPtr();
+
+ case WAIT_LSN_TYPE_STANDBY_FLUSH:
+ return GetWalRcvFlushRecPtr(NULL, NULL);
+
+ case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+ return GetFlushRecPtr(NULL);
+ }
+
+ elog(ERROR, "invalid LSN wait type: %d", lsnType);
+ pg_unreachable();
+}
+
/* Report the amount of shared memory space needed for WaitLSNState. */
Size
WaitLSNShmemSize(void)
}
}
+/*
+ * Check if the given LSN type requires recovery to be in progress.
+ * Standby wait types (replay, write, flush) require recovery;
+ * primary wait types (flush) do not.
+ */
+static inline bool
+WaitLSNTypeRequiresRecovery(WaitLSNType t)
+{
+ return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
+ t == WAIT_LSN_TYPE_STANDBY_WRITE ||
+ t == WAIT_LSN_TYPE_STANDBY_FLUSH;
+}
+
/*
* Wait using MyLatch till the given LSN is reached, the replica gets
* promoted, or the postmaster dies.
int rc;
long delay_ms = -1;
- if (lsnType == WAIT_LSN_TYPE_REPLAY)
- currentLSN = GetXLogReplayRecPtr(NULL);
- else
- currentLSN = GetFlushRecPtr(NULL);
+ /* Get current LSN for the wait type */
+ currentLSN = GetCurrentLSNForWaitType(lsnType);
/* Check that recovery is still in-progress */
- if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+ if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
{
/*
* Recovery was ended, but check if target LSN was already
CHECK_FOR_INTERRUPTS();
rc = WaitLatch(MyLatch, wake_events, delay_ms,
- (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+ WaitLSNWaitEvents[lsnType]);
/*
* Emergency bailout if postmaster has died. This is to avoid the
/*-------------------------------------------------------------------------
*
* xlogwait.h
- * Declarations for LSN replay waiting routines.
+ * Declarations for WAL flush, write, and replay waiting routines.
*
* Copyright (c) 2025-2026, PostgreSQL Global Development Group
*
*/
typedef enum WaitLSNType
{
- WAIT_LSN_TYPE_REPLAY, /* Waiting for replay on standby */
- WAIT_LSN_TYPE_FLUSH, /* Waiting for flush on primary */
+ /* Standby wait types (walreceiver/startup wakes) */
+ WAIT_LSN_TYPE_STANDBY_REPLAY,
+ WAIT_LSN_TYPE_STANDBY_WRITE,
+ WAIT_LSN_TYPE_STANDBY_FLUSH,
+
+ /* Primary wait types (WAL writer/backends wake) */
+ WAIT_LSN_TYPE_PRIMARY_FLUSH,
} WaitLSNType;
-#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_FLUSH + 1)
+#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)
/*
* WaitLSNProcInfo - the shared memory structure representing information
extern Size WaitLSNShmemSize(void);
extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
extern void WaitLSNCleanup(void);
extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,