* Struct for sharing information to control slot synchronization.
*
* The 'pid' is either the slot sync worker's pid or the backend's pid running
- * the SQL function pg_sync_replication_slots(). When the startup process sets
- * 'stopSignaled' during promotion, it uses this 'pid' to wake up the currently
- * synchronizing process so that the process can immediately stop its
- * synchronizing work on seeing 'stopSignaled' set.
- * Setting 'stopSignaled' is also used to handle the race condition when the
- * postmaster has not noticed the promotion yet and thus may end up restarting
- * the slot sync worker. If 'stopSignaled' is set, the worker will exit in such a
- * case. The SQL function pg_sync_replication_slots() will also error out if
- * this flag is set. Note that we don't need to reset this variable as after
- * promotion the slot sync worker won't be restarted because the pmState
- * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
- * primary without restarting the server. See LaunchMissingBackgroundProcesses.
+ * the SQL function pg_sync_replication_slots(). On promotion, the startup
+ * process sets 'stopSignaled' and uses this 'pid' to signal the synchronizing
+ * process with PROCSIG_SLOTSYNC_MESSAGE and also to wake it up so that the
+ * process can immediately stop its synchronizing work.
+ * Setting 'stopSignaled' on the other hand is used to handle the race
+ * condition when the postmaster has not noticed the promotion yet and thus may
+ * end up restarting the slot sync worker. If 'stopSignaled' is set, the worker
+ * will exit in such a case. The SQL function pg_sync_replication_slots() will
+ * also error out if this flag is set. Note that we don't need to reset this
+ * variable as after promotion the slot sync worker won't be restarted because
+ * the pmState changes to PM_RUN from PM_HOT_STANDBY and we don't support
+ * demoting primary without restarting the server.
+ * See LaunchMissingBackgroundProcesses.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
*/
static bool syncing_slots = false;
+/*
+ * Interrupt flag set when PROCSIG_SLOTSYNC_MESSAGE is received, asking the
+ * slotsync worker or pg_sync_replication_slots() to stop because
+ * standby promotion has been triggered.
+ */
+volatile sig_atomic_t SlotSyncShutdownPending = false;
+
/*
* Structure to hold information fetched from the primary server about a logical
* replication slot.
}
/*
- * Interrupt handler for process performing slot synchronization.
+ * Handle receipt of an interrupt indicating a slotsync shutdown message.
+ *
+ * This is called within the SIGUSR1 handler. All we do here is set a flag
+ * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * ProcessSlotSyncMessage().
*/
-static void
-ProcessSlotSyncInterrupts(void)
+void
+HandleSlotSyncMessageInterrupt(void)
{
- CHECK_FOR_INTERRUPTS();
+ InterruptPending = true;
+ SlotSyncShutdownPending = true;
+ /* latch will be set by procsignal_sigusr1_handler */
+}
- if (SlotSyncCtx->stopSignaled)
- {
- if (AmLogicalSlotSyncWorkerProcess())
- {
- ereport(LOG,
- errmsg("replication slot synchronization worker will stop because promotion is triggered"));
+/*
+ * Handle a PROCSIG_SLOTSYNC_MESSAGE signal, called from ProcessInterrupts().
+ *
+ * If the current process is the slotsync background worker, log a message
+ * and exit cleanly. If it is a backend executing pg_sync_replication_slots(),
+ * raise an error, unless the sync has already finished, in which case there
+ * is no need to interrupt the caller.
+ */
+void
+ProcessSlotSyncMessage(void)
+{
+ SlotSyncShutdownPending = false;
- proc_exit(0);
- }
- else
- {
- /*
- * For the backend executing SQL function
- * pg_sync_replication_slots().
- */
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot synchronization will stop because promotion is triggered"));
- }
+ if (AmLogicalSlotSyncWorkerProcess())
+ {
+ ereport(LOG,
+ errmsg("replication slot synchronization worker will stop because promotion is triggered"));
+ proc_exit(0);
}
+ else
+ {
+ /*
+ * If sync has already completed, there is no need to interrupt the
+ * caller with an error.
+ */
+ if (!IsSyncingReplicationSlots())
+ return;
- if (ConfigReloadPending)
- slotsync_reread_config();
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot synchronization will stop because promotion is triggered"));
+ }
}
/*
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+ /*
+ * Exit immediately if promotion has been triggered. This guards against
+ * a new worker (or a call to pg_sync_replication_slots()) that starts
+ * after the old worker was stopped by ShutDownSlotSync().
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ if (AmLogicalSlotSyncWorkerProcess())
+ {
+ ereport(DEBUG1,
+ errmsg("replication slot synchronization worker will not start because promotion was triggered"));
+
+ proc_exit(0);
+ }
+ else
+ {
+ /*
+ * For the backend executing SQL function
+ * pg_sync_replication_slots().
+ */
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot synchronization will not start because promotion was triggered"));
+ }
+ }
+
if (SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
bool started_tx = false;
List *remote_slots;
- ProcessSlotSyncInterrupts();
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ slotsync_reread_config();
/*
* The syscache access in fetch_remote_slots() needs a transaction
SpinLockRelease(&SlotSyncCtx->mutex);
/*
- * Signal process doing slotsync, if any. The process will stop upon
- * detecting that the stopSignaled flag is set to true.
+ * Signal process doing slotsync, if any, asking it to stop.
*/
if (sync_process_pid != InvalidPid)
- kill(sync_process_pid, SIGUSR1);
+ SendProcSignal(sync_process_pid, PROCSIG_SLOTSYNC_MESSAGE,
+ INVALID_PROC_NUMBER);
/* Wait for slot sync to end */
for (;;)
check_and_set_sync_info(MyProcPid);
- /* Check for interrupts and config changes */
- ProcessSlotSyncInterrupts();
-
validate_remote_info(wrconn);
/* Retry until all the slots are sync-ready */
bool some_slot_updated = false;
/* Check for interrupts and config changes */
- ProcessSlotSyncInterrupts();
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ slotsync_reread_config();
/* We must be in a valid transaction state */
Assert(IsTransactionState());