Fix a race condition in updating procArray->replication_slot_xmin.
author     Masahiko Sawada <msawada@postgresql.org>
           Tue, 30 Dec 2025 18:56:25 +0000 (10:56 -0800)
committer  Masahiko Sawada <msawada@postgresql.org>
           Tue, 30 Dec 2025 18:56:25 +0000 (10:56 -0800)
Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots without holding ProcArrayLock (when
already_locked is false), acquiring the lock just before updating the
replication slot xmin.

This could lead to a race condition: if a backend creates a new slot
and updates the global replication slot xmin, another backend
concurrently running ReplicationSlotsComputeRequiredXmin() could
overwrite that update with an invalid or stale value. This happens
because the concurrent backend might have computed the aggregate xmin
before the new slot was accounted for, but applied the update after
the new slot had already updated the global value.
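
Schematically, the problematic interleaving looks like this (a
simplified sketch; backends A and B are illustrative, and S is the
newly created slot):

    B: agg = oldest xmin across all slots     -- S not yet visible, so
                                              -- agg is stale or Invalid
    A: create slot S
    A: LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE)
    A: procArray->replication_slot_xmin = S's effective_xmin
    A: LWLockRelease(ProcArrayLock)
    B: LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE)
    B: procArray->replication_slot_xmin = agg  -- clobbers A's value
    B: LWLockRelease(ProcArrayLock)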

In the reported failure, a walsender for an apply worker computed
InvalidTransactionId as the oldest xmin and overwrote a valid
replication slot xmin value computed by a walsender for a tablesync
worker. Consequently, the tablesync worker computed a transaction ID
via GetOldestSafeDecodingTransactionId() effectively without
considering the replication slot xmin. This led to the error "cannot
build an initial slot snapshot as oldest safe xid %u follows
snapshot's xmin %u", which was an assertion failure prior to commit
240e0dbacd3.
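
For reference, the guard that trips is roughly the following check in
SnapBuildInitialSnapshot() (paraphrased here, not a verbatim quote of
the tree):

    safeXid = GetOldestSafeDecodingTransactionId(false);
    if (TransactionIdFollows(safeXid, snap->xmin))
        ereport(ERROR,
                errmsg("cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
                       safeXid, snap->xmin));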

To fix this, we acquire ReplicationSlotControlLock in exclusive mode
during slot creation to perform the initial update of the slot
xmin. In ReplicationSlotsComputeRequiredXmin(), we hold
ReplicationSlotControlLock in shared mode until the global slot xmin
is updated in ProcArraySetReplicationSlotXmin(). This prevents
concurrent computations and updates of the global xmin by other
backends during the initial slot xmin update process, while still
permitting concurrent calls to ReplicationSlotsComputeRequiredXmin().
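
In outline, slot creation now takes the locks in this order (a
condensed sketch of the change below, not the complete code):

    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
    /* set slot->effective_xmin / effective_catalog_xmin under slot->mutex */
    ReplicationSlotsComputeRequiredXmin(true);   /* already_locked */
    LWLockRelease(ProcArrayLock);
    LWLockRelease(ReplicationSlotControlLock);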

Backpatch to all supported versions.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Pradeep Kumar <spradeepkumar29@gmail.com>
Reviewed-by: Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1L8wYcyTPxNzPGkhuO52WBGoOZbT0A73Le=ZUWYAYmdfw@mail.gmail.com
Backpatch-through: 14

src/backend/replication/logical/logical.c
src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 206fb932484ca3479ea7710edcf31afd1007d080..fa8c3e2f5023f39b6b5912f26e143ad03f722839 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -405,11 +405,11 @@ CreateInitDecodingContext(const char *plugin,
         * without further interlock its return value might immediately be out of
         * date.
         *
-        * So we have to acquire the ProcArrayLock to prevent computation of new
-        * xmin horizons by other backends, get the safe decoding xid, and inform
-        * the slot machinery about the new limit. Once that's done the
-        * ProcArrayLock can be released as the slot machinery now is
-        * protecting against vacuum.
+        * So we have to acquire both the ReplicationSlotControlLock and the
+        * ProcArrayLock to prevent concurrent computation and update of new xmin
+        * horizons by other backends, get the safe decoding xid, and inform the
+        * slot machinery about the new limit. Once that's done both locks can be
+        * released as the slot machinery now is protecting against vacuum.
         *
         * Note that, temporarily, the data, not just the catalog, xmin has to be
         * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -422,6 +422,7 @@ CreateInitDecodingContext(const char *plugin,
         *
         * ----
         */
+       LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -436,6 +437,7 @@ CreateInitDecodingContext(const char *plugin,
        ReplicationSlotsComputeRequiredXmin(true);
 
        LWLockRelease(ProcArrayLock);
+       LWLockRelease(ReplicationSlotControlLock);
 
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 27e262ecbf22087b2bbe907b3244a1feca39825c..35874e6f1bfdbf5b8c3b1ae5a4892add4015ba28 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -760,6 +760,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 
                reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+               LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
                LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
                xmin_horizon = GetOldestSafeDecodingTransactionId(true);
                SpinLockAcquire(&slot->mutex);
@@ -768,6 +769,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                SpinLockRelease(&slot->mutex);
                ReplicationSlotsComputeRequiredXmin(true);
                LWLockRelease(ProcArrayLock);
+               LWLockRelease(ReplicationSlotControlLock);
 
                update_and_persist_local_synced_slot(remote_slot, remote_dbid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 33b7da21a94c53987e985ab7228fa9ea65d0d97b..4efbe85e5b67af9fadea7a8759e8011e7e73bf64 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1071,8 +1071,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and the
+ * ProcArrayLock have already been acquired exclusively. It is crucial that the
+ * caller first acquires the ReplicationSlotControlLock, followed by the
+ * ProcArrayLock, to prevent any undetectable deadlocks since this function
+ * acquires them in that order.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1082,8 +1085,33 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
        TransactionId agg_catalog_xmin = InvalidTransactionId;
 
        Assert(ReplicationSlotCtl != NULL);
+       Assert(!already_locked ||
+                  (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+                       LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       /*
+        * Hold the ReplicationSlotControlLock until after updating the slot xmin
+        * values, so no backend updates the initial xmin for a newly created
+        * slot concurrently. A shared lock is used here to minimize lock
+        * contention, especially when many slots exist and advancements occur
+        * frequently. This is safe since an exclusive lock is taken during the
+        * initial slot xmin update in slot creation.
+        *
+        * One might think that we can hold the ProcArrayLock exclusively and
+        * update the slot xmin values, but it could increase lock contention on
+        * the ProcArrayLock, which is not great since this function can be called
+        * at non-negligible frequency.
+        *
+        * Concurrent invocation of this function may cause the computed slot xmin
+        * to regress. However, this is harmless because tuples prior to the most
+        * recent xmin are no longer useful once advancement occurs (see
+        * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
+        * before updating the effective_xmin). Thus, such a regression merely
+        * delays VACUUM's removal of tuples that are no longer needed, without
+        * causing the early deletion of required data.
+        */
+       if (!already_locked)
+               LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
        for (i = 0; i < max_replication_slots; i++)
        {
@@ -1118,9 +1146,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                        agg_catalog_xmin = effective_catalog_xmin;
        }
 
-       LWLockRelease(ReplicationSlotControlLock);
-
        ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+       if (!already_locked)
+               LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*