]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Avoid blocking indefinitely while finishing walsender shutdown
authorFujii Masao <fujii@postgresql.org>
Fri, 1 May 2026 03:12:44 +0000 (12:12 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 1 May 2026 03:12:44 +0000 (12:12 +0900)
When walsender finishes streaming during shutdown, it sends a
CommandComplete message to tell the receiver that WAL streaming is done.
Previously, that path used EndCommand() followed by pq_flush().

Those functions can block indefinitely waiting for the socket to become
writeable. As a result, even when wal_sender_shutdown_timeout is set,
walsender could remain stuck while sending the final completion message,
and the shutdown timeout would not be enforced.

Fix this by introducing EndCommandExtended(), which allows
CommandComplete to be queued with pq_putmessage_noblock(), and by
using the walsender nonblocking flush path instead of pq_flush(), so
the shutdown timeout continues to be checked while pending output is
flushed.

Per CI testing on FreeBSD.

Reported-by: Andres Freund <andres@anarazel.de>
Author: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/vwlugmsogfn36jhm56zwrgd7m6xe6ircltvfh3kzt6kldvbtht@f45dgow5uhnx

src/backend/replication/walsender.c
src/backend/tcop/dest.c
src/include/tcop/dest.h

index 3d4ab929f91651194d791304bfdb8c69878e756f..04aa770d981cdef96b302c568f36fce6ad10e883 100644 (file)
@@ -209,6 +209,13 @@ static bool waiting_for_ping_response = false;
 /* Timestamp when walsender received the shutdown request */
 static TimestampTz shutdown_request_timestamp = 0;
 
+/*
+ * Set after queueing the CommandComplete message that ends WAL streaming
+ * during shutdown. This prevents WalSndDone() and WalSndDoneImmediate()
+ * from queueing the same message twice.
+ */
+static bool shutdown_stream_done_queued = false;
+
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
  * after we have sent CopyDone. We should not send any more CopyData messages
@@ -3713,15 +3720,17 @@ WalSndDoneImmediate(void)
 {
        WalSndState state = MyWalSnd->state;
 
-       if (state == WALSNDSTATE_CATCHUP ||
-               state == WALSNDSTATE_STREAMING ||
-               state == WALSNDSTATE_STOPPING)
+       if ((state == WALSNDSTATE_CATCHUP ||
+                state == WALSNDSTATE_STREAMING ||
+                state == WALSNDSTATE_STOPPING) &&
+               !shutdown_stream_done_queued)
        {
                QueryCompletion qc;
 
                /* Try to inform receiver that XLOG streaming is done */
                SetQueryCompletion(&qc, CMDTAG_COPY, 0);
-               EndCommand(&qc, DestRemote, false);
+               EndCommandExtended(&qc, DestRemote, false, true);
+               shutdown_stream_done_queued = true;
 
                /*
                 * Note that the output buffer may be full during the forced shutdown
@@ -3778,10 +3787,55 @@ WalSndDone(WalSndSendDataCallback send_data)
        {
                QueryCompletion qc;
 
+               Assert(!shutdown_stream_done_queued);
+
                /* Inform the standby that XLOG streaming is done */
                SetQueryCompletion(&qc, CMDTAG_COPY, 0);
-               EndCommand(&qc, DestRemote, false);
-               pq_flush();
+               EndCommandExtended(&qc, DestRemote, false, true);
+               shutdown_stream_done_queued = true;
+
+               /*
+                * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime()
+                * calls ignore wal_sender_timeout during shutdown.
+                */
+               last_reply_timestamp = 0;
+
+               /*
+                * Do not call pq_flush() here, since it can block indefinitely while
+                * waiting for the socket to become writable, preventing
+                * wal_sender_shutdown_timeout from being enforced. Instead, use the
+                * walsender nonblocking flush path so the shutdown timeout continues
+                * to be checked while the send buffer drains.
+                */
+               for (;;)
+               {
+                       long            sleeptime;
+
+                       /*
+                        * During shutdown, die if the shutdown timeout expires. Call this
+                        * before WalSndComputeSleeptime() so the timeout is considered
+                        * when computing sleep time.
+                        */
+                       WalSndCheckShutdownTimeout();
+
+                       if (!pq_is_send_pending())
+                               break;
+
+                       sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+
+                       /* Sleep until something happens or we time out */
+                       WalSndWait(WL_SOCKET_WRITEABLE, sleeptime,
+                                          WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+
+                       /* Clear any already-pending wakeups */
+                       ResetLatch(MyLatch);
+
+                       CHECK_FOR_INTERRUPTS();
+
+                       /* Try to flush pending output to the client */
+                       if (pq_flush_if_writable() != 0)
+                               WalSndShutdown();
+               }
 
                proc_exit(0);
        }
index fb163930c896b4d9ff62a725726fa4aa55645713..bdc3dad335713e206e02d4b0789d4a4b2e0c1fe3 100644 (file)
@@ -165,8 +165,10 @@ CreateDestReceiver(CommandDest dest)
  *             EndCommand - clean up the destination at end of command
  * ----------------
  */
+
 void
-EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
+EndCommandExtended(const QueryCompletion *qc, CommandDest dest,
+                                  bool force_undecorated_output, bool noblock)
 {
        char            completionTag[COMPLETION_TAG_BUFSIZE];
        Size            len;
@@ -179,7 +181,10 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
 
                        len = BuildQueryCompletionString(completionTag, qc,
                                                                                         force_undecorated_output);
-                       pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1);
+                       if (noblock)
+                               pq_putmessage_noblock(PqMsg_CommandComplete, completionTag, len + 1);
+                       else
+                               pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1);
                        break;
 
                case DestNone:
@@ -196,6 +201,12 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
        }
 }
 
+void
+EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
+{
+       EndCommandExtended(qc, dest, force_undecorated_output, false);
+}
+
 /* ----------------
  *             EndReplicationCommand - stripped down version of EndCommand
  *
index 4e4f532d8cc2fc7b75162807355ac32aa8bedb7e..103f27fc3cbde75396c74e1687620a2ccbbd5278 100644 (file)
@@ -136,6 +136,8 @@ extern PGDLLIMPORT DestReceiver *None_Receiver; /* permanent receiver for
 
 extern void BeginCommand(CommandTag commandTag, CommandDest dest);
 extern DestReceiver *CreateDestReceiver(CommandDest dest);
+extern void EndCommandExtended(const QueryCompletion *qc, CommandDest dest,
+                                                          bool force_undecorated_output, bool noblock);
 extern void EndCommand(const QueryCompletion *qc, CommandDest dest,
                                           bool force_undecorated_output);
 extern void EndReplicationCommand(const char *commandTag);