git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Refactor slot synchronization logic in slotsync.c.
author Amit Kapila <akapila@postgresql.org>
Thu, 12 Feb 2026 09:08:31 +0000 (14:38 +0530)
committer Amit Kapila <akapila@postgresql.org>
Thu, 12 Feb 2026 09:08:31 +0000 (14:38 +0530)
Following e68b6adad9, the reason for skipping slot synchronization is
stored as a slot property. This commit removes redundant function
parameters that previously tracked this state, instead relying directly on
the slot property.

Additionally, this change centralizes the logic for skipping
synchronization when required WAL has not yet been received or flushed. By
consolidating this check, we reduce code duplication and the risk of
inconsistent state updates across different code paths.

In passing, add an assertion to ensure a slot is marked as temporary if a
consistent point has not been reached during synchronization.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/TY4PR01MB16907DD16098BE3B20486D4569463A@TY4PR01MB16907.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAFPTHDZAA+gWDntpa5ucqKKba41=tXmoXqN3q4rpjO9cdxgQrw@mail.gmail.com

src/backend/replication/logical/slotsync.c

index d02d44d26a03a996bddc428ccb4ac9094166fa1d..062a08ccb886646113b4f6373f28e370c706ec83 100644 (file)
@@ -194,31 +194,40 @@ update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
- *
- * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
- * modified, and decoding from the corresponding LSN's can reach a
- * consistent snapshot.
- *
- * *remote_slot_precedes will be true if the remote slot's LSN or xmin
- * precedes locally reserved position.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
-                                                bool *found_consistent_snapshot,
-                                                bool *remote_slot_precedes)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
        ReplicationSlot *slot = MyReplicationSlot;
        bool            updated_xmin_or_lsn = false;
        bool            updated_config = false;
        SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
+       XLogRecPtr      latestFlushPtr = GetStandbyFlushRecPtr(NULL);
 
        Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-       if (found_consistent_snapshot)
-               *found_consistent_snapshot = false;
+       /*
+        * Make sure that concerned WAL is received and flushed before syncing
+        * slot to target lsn received from the primary server.
+        */
+       if (remote_slot->confirmed_lsn > latestFlushPtr)
+       {
+               update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
 
-       if (remote_slot_precedes)
-               *remote_slot_precedes = false;
+               /*
+                * Can get here only if GUC 'synchronized_standby_slots' on the
+                * primary server was not configured correctly.
+                */
+               ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("skipping slot synchronization because the received slot sync"
+                                          " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+                                          LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                                          remote_slot->name,
+                                          LSN_FORMAT_ARGS(latestFlushPtr)));
+
+               return false;
+       }
 
        /*
         * Don't overwrite if we already have a newer catalog_xmin and
@@ -262,9 +271,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                                                  LSN_FORMAT_ARGS(slot->data.restart_lsn),
                                                  slot->data.catalog_xmin));
 
-               if (remote_slot_precedes)
-                       *remote_slot_precedes = true;
-
                /*
                 * Skip updating the configuration. This is required to avoid syncing
                 * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
@@ -304,14 +310,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                        slot->data.confirmed_flush = remote_slot->confirmed_lsn;
                        slot->data.catalog_xmin = remote_slot->catalog_xmin;
                        SpinLockRelease(&slot->mutex);
-
-                       if (found_consistent_snapshot)
-                               *found_consistent_snapshot = true;
                }
                else
                {
+                       bool            found_consistent_snapshot;
+
                        LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
-                                                                                               found_consistent_snapshot);
+                                                                                               &found_consistent_snapshot);
 
                        /* Sanity check */
                        if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
@@ -326,8 +331,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                         * If we can't reach a consistent snapshot, the slot won't be
                         * persisted. See update_and_persist_local_synced_slot().
                         */
-                       if (found_consistent_snapshot && !(*found_consistent_snapshot))
+                       if (!found_consistent_snapshot)
+                       {
+                               Assert(MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+                               ereport(LOG,
+                                               errmsg("could not synchronize replication slot \"%s\"",
+                                                          remote_slot->name),
+                                               errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
+                                                                 LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+
                                skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
+                       }
                }
 
                updated_xmin_or_lsn = true;
@@ -619,27 +634,27 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                                                                         bool *slot_persistence_pending)
 {
        ReplicationSlot *slot = MyReplicationSlot;
-       bool            found_consistent_snapshot = false;
-       bool            remote_slot_precedes = false;
 
        /* Slotsync skip stats are handled in function update_local_synced_slot() */
-       (void) update_local_synced_slot(remote_slot, remote_dbid,
-                                                                       &found_consistent_snapshot,
-                                                                       &remote_slot_precedes);
+       (void) update_local_synced_slot(remote_slot, remote_dbid);
 
        /*
-        * Check if the primary server has caught up. Refer to the comment atop
-        * the file for details on this check.
+        * Check if the slot cannot be synchronized. Refer to the comment atop the
+        * file for details on this check.
         */
-       if (remote_slot_precedes)
+       if (slot->slotsync_skip_reason != SS_SKIP_NONE)
        {
                /*
-                * The remote slot didn't catch up to locally reserved position.
+                * We reach this point when the remote slot didn't catch up to locally
+                * reserved position, or it cannot reach the consistent point from the
+                * restart_lsn, or the WAL prior to the remote confirmed flush LSN has
+                * not been received and flushed.
                 *
-                * We do not drop the slot because the restart_lsn can be ahead of the
-                * current location when recreating the slot in the next cycle. It may
-                * take more time to create such a slot. Therefore, we keep this slot
-                * and attempt the synchronization in the next cycle.
+                * We do not drop the slot because the restart_lsn and confirmed_lsn
+                * can be ahead of the current location when recreating the slot in
+                * the next cycle. It may take more time to create such a slot or
+                * reach the consistent point. Therefore, we keep this slot and
+                * attempt the synchronization in the next cycle.
                 *
                 * We also update the slot_persistence_pending parameter, so the SQL
                 * function can retry.
@@ -650,24 +665,6 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                return false;
        }
 
-       /*
-        * Don't persist the slot if it cannot reach the consistent point from the
-        * restart_lsn. See comments atop this file.
-        */
-       if (!found_consistent_snapshot)
-       {
-               ereport(LOG,
-                               errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
-                               errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
-                                                 LSN_FORMAT_ARGS(slot->data.restart_lsn)));
-
-               /* Set this, so that SQL function can retry */
-               if (slot_persistence_pending)
-                       *slot_persistence_pending = true;
-
-               return false;
-       }
-
        ReplicationSlotPersist();
 
        ereport(LOG,
@@ -698,7 +695,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                                         bool *slot_persistence_pending)
 {
        ReplicationSlot *slot;
-       XLogRecPtr      latestFlushPtr = GetStandbyFlushRecPtr(NULL);
        bool            slot_updated = false;
 
        /* Search for the named slot */
@@ -765,34 +761,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                        return slot_updated;
                }
 
-               /*
-                * Make sure that concerned WAL is received and flushed before syncing
-                * slot to target lsn received from the primary server.
-                *
-                * Report statistics only after the slot has been acquired, ensuring
-                * it cannot be dropped during the reporting process.
-                */
-               if (remote_slot->confirmed_lsn > latestFlushPtr)
-               {
-                       update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
-
-                       /*
-                        * Can get here only if GUC 'synchronized_standby_slots' on the
-                        * primary server was not configured correctly.
-                        */
-                       ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
-                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                       errmsg("skipping slot synchronization because the received slot sync"
-                                                  " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
-                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
-                                                  remote_slot->name,
-                                                  LSN_FORMAT_ARGS(latestFlushPtr)));
-
-                       ReplicationSlotRelease();
-
-                       return slot_updated;
-               }
-
                /* Slot not ready yet, let's attempt to make it sync-ready now. */
                if (slot->data.persistency == RS_TEMPORARY)
                {
@@ -819,8 +787,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                                                                                   LSN_FORMAT_ARGS(slot->data.confirmed_flush),
                                                                                   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
 
-                       slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
-                                                                                                       NULL, NULL);
+                       slot_updated = update_local_synced_slot(remote_slot, remote_dbid);
                }
        }
        /* Otherwise create the slot first. */
@@ -869,34 +836,6 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                LWLockRelease(ProcArrayLock);
                LWLockRelease(ReplicationSlotControlLock);
 
-               /*
-                * Make sure that concerned WAL is received and flushed before syncing
-                * slot to target lsn received from the primary server.
-                *
-                * Report statistics only after the slot has been acquired, ensuring
-                * it cannot be dropped during the reporting process.
-                */
-               if (remote_slot->confirmed_lsn > latestFlushPtr)
-               {
-                       update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
-
-                       /*
-                        * Can get here only if GUC 'synchronized_standby_slots' on the
-                        * primary server was not configured correctly.
-                        */
-                       ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
-                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                       errmsg("skipping slot synchronization because the received slot sync"
-                                                  " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
-                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
-                                                  remote_slot->name,
-                                                  LSN_FORMAT_ARGS(latestFlushPtr)));
-
-                       ReplicationSlotRelease();
-
-                       return false;
-               }
-
                update_and_persist_local_synced_slot(remote_slot, remote_dbid,
                                                                                         slot_persistence_pending);