/* Timestamp when walsender received the shutdown request */
static TimestampTz shutdown_request_timestamp = 0;
+/*
+ * Set after queueing the CommandComplete message that ends WAL streaming
+ * during shutdown. This prevents WalSndDone() and WalSndDoneImmediate()
+ * from queueing the same message twice.
+ */
+static bool shutdown_stream_done_queued = false;
+
/*
* While streaming WAL in Copy mode, streamingDoneSending is set to true
* after we have sent CopyDone. We should not send any more CopyData messages
{
WalSndState state = MyWalSnd->state;
- if (state == WALSNDSTATE_CATCHUP ||
- state == WALSNDSTATE_STREAMING ||
- state == WALSNDSTATE_STOPPING)
+ if ((state == WALSNDSTATE_CATCHUP ||
+ state == WALSNDSTATE_STREAMING ||
+ state == WALSNDSTATE_STOPPING) &&
+ !shutdown_stream_done_queued)
{
QueryCompletion qc;
/* Try to inform receiver that XLOG streaming is done */
SetQueryCompletion(&qc, CMDTAG_COPY, 0);
- EndCommand(&qc, DestRemote, false);
+ EndCommandExtended(&qc, DestRemote, false, true);
+ shutdown_stream_done_queued = true;
/*
* Note that the output buffer may be full during the forced shutdown
{
QueryCompletion qc;
+ Assert(!shutdown_stream_done_queued);
+
/* Inform the standby that XLOG streaming is done */
SetQueryCompletion(&qc, CMDTAG_COPY, 0);
- EndCommand(&qc, DestRemote, false);
- pq_flush();
+ EndCommandExtended(&qc, DestRemote, false, true);
+ shutdown_stream_done_queued = true;
+
+ /*
+ * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime()
+ * calls ignore wal_sender_timeout during shutdown.
+ */
+ last_reply_timestamp = 0;
+
+ /*
+ * Do not call pq_flush() here, since it can block indefinitely while
+ * waiting for the socket to become writable, preventing
+ * wal_sender_shutdown_timeout from being enforced. Instead, use the
+ * walsender nonblocking flush path so the shutdown timeout continues
+ * to be checked while the send buffer drains.
+ */
+ for (;;)
+ {
+ long sleeptime;
+
+ /*
+ * During shutdown, die if the shutdown timeout expires. Call this
+ * before WalSndComputeSleeptime() so the timeout is considered
+ * when computing sleep time.
+ */
+ WalSndCheckShutdownTimeout();
+
+ if (!pq_is_send_pending())
+ break;
+
+ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+
+ /* Sleep until something happens or we time out */
+ WalSndWait(WL_SOCKET_WRITEABLE, sleeptime,
+ WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
+ }
proc_exit(0);
}
* EndCommand - clean up the destination at end of command
* ----------------
*/
+
void
-EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
+EndCommandExtended(const QueryCompletion *qc, CommandDest dest,
+ bool force_undecorated_output, bool noblock)
{
char completionTag[COMPLETION_TAG_BUFSIZE];
Size len;
len = BuildQueryCompletionString(completionTag, qc,
force_undecorated_output);
- pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1);
+ if (noblock)
+ pq_putmessage_noblock(PqMsg_CommandComplete, completionTag, len + 1);
+ else
+ pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1);
break;
case DestNone:
}
}
+void
+EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
+{
+ EndCommandExtended(qc, dest, force_undecorated_output, false);
+}
+
/* ----------------
* EndReplicationCommand - stripped down version of EndCommand
*