* without further interlock its return value might immediately be out of
* date.
*
- * So we have to acquire the ProcArrayLock to prevent computation of new
- * xmin horizons by other backends, get the safe decoding xid, and inform
- * the slot machinery about the new limit. Once that's done the
- * ProcArrayLock can be released as the slot machinery now is
- * protecting against vacuum.
+ * So we have to acquire both the ReplicationSlotControlLock and the
+ * ProcArrayLock to prevent concurrent computation and update of new xmin
+ * horizons by other backends, get the safe decoding xid, and inform the
+ * slot machinery about the new limit. Once that's done, both locks can be
+ * released, as the slot machinery now protects against vacuum.
*
* Note that, temporarily, the data, not just the catalog, xmin has to be
* reserved if a data snapshot is to be exported. Otherwise the initial
*
* ----
*/
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
+ LWLockRelease(ReplicationSlotControlLock);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
/*
* Compute the oldest xmin across all slots and store it in the ProcArray.
*
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and the
+ * ProcArrayLock have already been acquired exclusively. The caller must
+ * acquire the ReplicationSlotControlLock first and the ProcArrayLock
+ * second; this function acquires them in that order when already_locked
+ * is false, so taking them the other way around could result in an
+ * undetected deadlock.
*/
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
TransactionId agg_catalog_xmin = InvalidTransactionId;
Assert(ReplicationSlotCtl != NULL);
+ Assert(!already_locked ||
+ (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+ LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ /*
+ * Hold the ReplicationSlotControlLock until after the new xmin values
+ * have been installed in the ProcArray, so that no backend can
+ * concurrently install the initial xmin of a newly created slot in
+ * between. A shared lock is used here to minimize lock contention,
+ * especially when many slots exist and advancements occur frequently;
+ * this is safe because slot creation updates a new slot's initial xmin
+ * while holding the lock exclusively.
+ *
+ * One might think we could instead hold the ProcArrayLock exclusively
+ * while computing and updating the slot xmin values, but that would
+ * increase contention on the ProcArrayLock, which is undesirable since
+ * this function can be called fairly frequently.
+ *
+ * Concurrent invocations of this function may cause the computed slot
+ * xmin to regress. That is harmless: a slot's xmin is flushed to disk
+ * before its effective_xmin is advanced (see
+ * LogicalConfirmReceivedLocation), so tuples preceding the advanced
+ * value are no longer needed. A regression therefore merely delays
+ * VACUUM's removal of some no-longer-needed tuples; it never causes
+ * required data to be removed early.
+ */
+ if (!already_locked)
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
agg_catalog_xmin = effective_catalog_xmin;
}
- LWLockRelease(ReplicationSlotControlLock);
-
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+ if (!already_locked)
+ LWLockRelease(ReplicationSlotControlLock);
}
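
For reference, here is a minimal caller-side sketch of the already_locked contract described above, modeled on the slot-creation hunk earlier in this patch. The helper name reserve_decoding_horizon_sketch and its surrounding context are illustrative only, not part of the patch; the lock names and functions are the real ones used above.

#include "postgres.h"

#include "replication/slot.h"
#include "storage/lwlock.h"
#include "storage/procarray.h"

/*
 * Illustrative sketch only: reserve a safe decoding horizon for a newly
 * created slot.  The acquisition order must be ReplicationSlotControlLock
 * first, ProcArrayLock second, matching what
 * ReplicationSlotsComputeRequiredXmin(true) asserts and the order in which
 * it takes the locks itself when called with already_locked = false.
 */
static void
reserve_decoding_horizon_sketch(bool need_full_snapshot)
{
	TransactionId xmin_horizon;

	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	/* Oldest xid that is still safe to start decoding from. */
	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);

	/* ... install xmin_horizon in the new slot's xmin fields here ... */
	(void) xmin_horizon;

	/* Publish the aggregate to the ProcArray while both locks are held. */
	ReplicationSlotsComputeRequiredXmin(true);

	LWLockRelease(ProcArrayLock);
	LWLockRelease(ReplicationSlotControlLock);
}

Only the acquisition order matters for avoiding an undetected deadlock; the release order in the sketch simply mirrors the patch.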
/*