]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add WALRCV_CONNECTING state to the WAL receiver
authorMichael Paquier <michael@paquier.xyz>
Fri, 23 Jan 2026 05:17:28 +0000 (14:17 +0900)
committerMichael Paquier <michael@paquier.xyz>
Fri, 23 Jan 2026 05:17:28 +0000 (14:17 +0900)
Previously, a WAL receiver freshly started would set its state to
WALRCV_STREAMING immediately at startup, before actually establishing a
replication connection.

This commit introduces a new state called WALRCV_CONNECTING, which is
the state used when the WAL receiver freshly starts, or when a restart
is requested, with a switch to WALRCV_STREAMING once the connection to
the upstream server has been established with COPY_BOTH, meaning that
the WAL receiver is ready to stream changes.  This change is useful for
monitoring purposes, especially in environments with a high latency
where a connection could take some time to be established, giving some
room between the [re]start phase and the streaming activity.

From the point of view of the startup process, that flips the shared
memory state of the WAL receiver when it needs to be stopped, the
existing WALRCV_STREAMING and the new WALRCV_CONNECTING states have the
same semantics: the WAL receiver has started and it can be stopped.

Based on an initial suggestion from Noah Misch, with some input from me
about the design.

Author: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Rahila Syed <rahilasyed90@gmail.com>
Discussion: https://postgr.es/m/CABPTF7VQ5tGOSG5TS-Cg+Fb8gLCGFzxJ_eX4qg+WZ3ZPt=FtwQ@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/include/replication/walreceiver.h

index 88450facebd130bc9239ffe18a5e94b30aba721d..b77d189a50042993953c372d3764971af92abae4 100644 (file)
@@ -1751,6 +1751,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage
           but is not yet initialized.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>connecting</literal>: WAL receiver is connecting to the
+          upstream server, replication has not yet started.
+         </para>
+        </listitem>
         <listitem>
          <para>
           <literal>stopping</literal>: WAL receiver has been requested to
index a41453530a151ccc3445127cccaaddb783f21705..6970af3f3ffcdb202d855005f10b2ea361693fbf 100644 (file)
@@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
                        /* The usual case */
                        break;
 
+               case WALRCV_CONNECTING:
                case WALRCV_WAITING:
                case WALRCV_STREAMING:
                case WALRCV_RESTARTING:
@@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
        }
        /* Advertise our PID so that the startup process can kill us */
        walrcv->pid = MyProcPid;
-       walrcv->walRcvState = WALRCV_STREAMING;
+       walrcv->walRcvState = WALRCV_CONNECTING;
 
        /* Fetch information required to start streaming */
        walrcv->ready_to_display = false;
@@ -395,6 +396,17 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
                                                           LSN_FORMAT_ARGS(startpoint), startpointTLI));
                        first_stream = false;
 
+                       /*
+                        * Switch to STREAMING after a successful connection if current
+                        * state is CONNECTING.  This switch happens after an initial
+                        * startup, or after a restart as determined by
+                        * WalRcvWaitForStartPosition().
+                        */
+                       SpinLockAcquire(&walrcv->mutex);
+                       if (walrcv->walRcvState == WALRCV_CONNECTING)
+                               walrcv->walRcvState = WALRCV_STREAMING;
+                       SpinLockRelease(&walrcv->mutex);
+
                        /* Initialize LogstreamResult and buffers for processing messages */
                        LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
                        initStringInfo(&reply_message);
@@ -650,7 +662,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
        SpinLockAcquire(&walrcv->mutex);
        state = walrcv->walRcvState;
-       if (state != WALRCV_STREAMING)
+       if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
        {
                SpinLockRelease(&walrcv->mutex);
                if (state == WALRCV_STOPPING)
@@ -689,7 +701,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
                         */
                        *startpoint = walrcv->receiveStart;
                        *startpointTLI = walrcv->receiveStartTLI;
-                       walrcv->walRcvState = WALRCV_STREAMING;
+                       walrcv->walRcvState = WALRCV_CONNECTING;
                        SpinLockRelease(&walrcv->mutex);
                        break;
                }
@@ -792,6 +804,7 @@ WalRcvDie(int code, Datum arg)
        /* Mark ourselves inactive in shared memory */
        SpinLockAcquire(&walrcv->mutex);
        Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+                  walrcv->walRcvState == WALRCV_CONNECTING ||
                   walrcv->walRcvState == WALRCV_RESTARTING ||
                   walrcv->walRcvState == WALRCV_STARTING ||
                   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1391,6 +1404,8 @@ WalRcvGetStateString(WalRcvState state)
                        return "stopped";
                case WALRCV_STARTING:
                        return "starting";
+               case WALRCV_CONNECTING:
+                       return "connecting";
                case WALRCV_STREAMING:
                        return "streaming";
                case WALRCV_WAITING:
index da8794cba7c548472ab106950ad9b808ca19518b..42e3e170bc044528d13e3c46844a43a8991fd824 100644 (file)
@@ -179,7 +179,7 @@ WalRcvStreaming(void)
        }
 
        if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-               state == WALRCV_RESTARTING)
+               state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
                return true;
        else
                return false;
@@ -211,6 +211,7 @@ ShutdownWalRcv(void)
                        stopped = true;
                        break;
 
+               case WALRCV_CONNECTING:
                case WALRCV_STREAMING:
                case WALRCV_WAITING:
                case WALRCV_RESTARTING:
index f3ad00fb6f344945fabc77903e832cbb150b7cd2..9b9bd91631487a9da47edc8f7802a15358739a4e 100644 (file)
@@ -47,6 +47,7 @@ typedef enum
        WALRCV_STOPPED,                         /* stopped and mustn't start up again */
        WALRCV_STARTING,                        /* launched, but the process hasn't
                                                                 * initialized yet */
+       WALRCV_CONNECTING,                      /* connecting to upstream server */
        WALRCV_STREAMING,                       /* walreceiver is streaming */
        WALRCV_WAITING,                         /* stopped streaming, waiting for orders */
        WALRCV_RESTARTING,                      /* asked to restart streaming */