TimeLineID tli);
static void XLogWalRcvFlush(bool dying, TimeLineID tli);
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
WalRcvComputeNextWakeup(i, now);
/* Send initial reply/feedback messages. */
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, false);
XLogWalRcvSendHSFeedback(true);
/* Loop until end-of-streaming or error */
}
/* Let the primary know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, false);
/*
* If we've written some records, flush them to disk and
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
- if (walrcv->force_reply)
+ if (walrcv->apply_reply_requested)
{
/*
* The recovery process has asked us to send apply
* false in shared memory before sending the reply, so
* we don't miss a new request for a reply.
*/
- walrcv->force_reply = false;
+ walrcv->apply_reply_requested = false;
pg_memory_barrier();
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(false, false, true);
}
}
if (rc & WL_TIMEOUT)
wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
}
- XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendReply(requestReply, requestReply, false);
XLogWalRcvSendHSFeedback(false);
}
}
/* If the primary requested a reply, send one immediately */
if (replyRequested)
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, false);
break;
}
default:
/* Also let the primary know that we made some progress */
if (!dying)
{
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, false);
XLogWalRcvSendHSFeedback(false);
}
}
}
/*
- * Send reply message to primary, indicating our current WAL locations, oldest
- * xmin and the current time.
+ * Send reply message to primary, indicating our current WAL locations and
+ * time.
*
- * If 'force' is not set, the message is only sent if enough time has
- * passed since last status update to reach wal_receiver_status_interval.
- * If wal_receiver_status_interval is disabled altogether and 'force' is
- * false, this is a no-op.
+ * The message is sent if 'force' is set, if enough time has passed since the
+ * last update to reach wal_receiver_status_interval, or if WAL locations have
+ * advanced since the previous status update. If wal_receiver_status_interval
+ * is disabled and 'force' is false, this function does nothing. Set 'force' to
+ * send the message unconditionally.
+ *
+ * Whether WAL locations are considered "advanced" depends on 'checkApply'.
+ * If 'checkApply' is false, only the write and flush locations are checked.
+ * This should be used when the call is triggered by write/flush activity
+ * (e.g., after walreceiver writes or flushes WAL), and avoids the
+ * apply-location check, which requires a spinlock. If 'checkApply' is true,
+ * the apply location is also considered. This should be used when the apply
+ * location is expected to advance (e.g., when the startup process requests
+ * an apply notification).
*
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbeats, when approaching
* wal_receiver_timeout.
*/
static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply)
{
static XLogRecPtr writePtr = InvalidXLogRecPtr;
static XLogRecPtr flushPtr = InvalidXLogRecPtr;
- XLogRecPtr applyPtr;
+ static XLogRecPtr applyPtr = InvalidXLogRecPtr;
+ XLogRecPtr latestApplyPtr = InvalidXLogRecPtr;
TimestampTz now;
/*
/*
* We can compare the write and flush positions to the last message we
* sent without taking any lock, but the apply position requires a spin
- * lock, so we don't check that unless something else has changed or 10
- * seconds have passed. This means that the apply WAL location will
- * appear, from the primary's point of view, to lag slightly, but since
- * this is only for reporting purposes and only on idle systems, that's
- * probably OK.
+ * lock, so we don't check that unless it is expected to advance since the
+ * previous update, i.e., when 'checkApply' is true.
*/
- if (!force
- && writePtr == LogstreamResult.Write
- && flushPtr == LogstreamResult.Flush
- && now < wakeup[WALRCV_WAKEUP_REPLY])
- return;
+ if (!force && now < wakeup[WALRCV_WAKEUP_REPLY])
+ {
+ if (checkApply)
+ latestApplyPtr = GetXLogReplayRecPtr(NULL);
+
+ if (writePtr == LogstreamResult.Write
+ && flushPtr == LogstreamResult.Flush
+ && (!checkApply || applyPtr == latestApplyPtr))
+ return;
+ }
/* Make sure we wake up when it's time to send another reply. */
WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyPtr = (latestApplyPtr == InvalidXLogRecPtr) ?
+ GetXLogReplayRecPtr(NULL) : latestApplyPtr;
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
* synchronous_commit = remote_apply.
*/
void
-WalRcvForceReply(void)
+WalRcvRequestApplyReply(void)
{
ProcNumber procno;
- WalRcv->force_reply = true;
+ WalRcv->apply_reply_requested = true;
/* fetching the proc number is probably atomic, but don't rely on it */
SpinLockAcquire(&WalRcv->mutex);
procno = WalRcv->procno;