#include <linux/types.h>
#include <vdso/time64.h>
+#include <linux/percpu.h>
struct qspinlock;
typedef struct qspinlock rqspinlock_t;
*/
#define RES_DEF_TIMEOUT (NSEC_PER_SEC / 4)
+/*
+ * Choose 31 as it makes rqspinlock_held cacheline-aligned.
+ */
+#define RES_NR_HELD 31
+
+struct rqspinlock_held {
+ int cnt;
+ void *locks[RES_NR_HELD];
+};
+
+DECLARE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
+
+static __always_inline void grab_held_lock_entry(void *lock)
+{
+ int cnt = this_cpu_inc_return(rqspinlock_held_locks.cnt);
+
+ if (unlikely(cnt > RES_NR_HELD)) {
+ /* Still keep the inc so we decrement later. */
+ return;
+ }
+
+ /*
+ * Implied compiler barrier in per-CPU operations; otherwise we can have
+ * the compiler reorder inc with write to table, allowing interrupts to
+ * overwrite and erase our write to the table (as on interrupt exit it
+ * will be reset to NULL).
+ *
+ * It is fine for cnt inc to be reordered wrt remote readers though,
+ * they won't observe our entry until the cnt update is visible, that's
+ * all.
+ */
+ this_cpu_write(rqspinlock_held_locks.locks[cnt - 1], lock);
+}
+
+/*
+ * We simply don't support out-of-order unlocks, and keep the logic simple here.
+ * The verifier prevents BPF programs from unlocking out-of-order, and the same
+ * holds for in-kernel users.
+ *
+ * It is possible to run into misdetection scenarios of AA deadlocks on the same
+ * CPU, and missed ABBA deadlocks on remote CPUs if this function pops entries
+ * out of order (due to lock A, lock B, unlock A, unlock B) pattern. The correct
+ * logic to preserve right entries in the table would be to walk the array of
+ * held locks and swap and clear out-of-order entries, but that's too
+ * complicated and we don't have a compelling use case for out of order unlocking.
+ */
+static __always_inline void release_held_lock_entry(void)
+{
+ struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+
+ if (unlikely(rqh->cnt > RES_NR_HELD))
+ goto dec;
+ WRITE_ONCE(rqh->locks[rqh->cnt - 1], NULL);
+dec:
+ /*
+ * Reordering of clearing above with inc and its write in
+ * grab_held_lock_entry that came before us (in same acquisition
+ * attempt) is ok, we either see a valid entry or NULL when it's
+ * visible.
+ *
+ * But this helper is invoked when we unwind upon failing to acquire the
+ * lock. Unlike the unlock path which constitutes a release store after
+ * we clear the entry, we need to emit a write barrier here. Otherwise,
+ * we may have a situation as follows:
+ *
+ * <error> for lock B
+ * release_held_lock_entry
+ *
+ * try_cmpxchg_acquire for lock A
+ * grab_held_lock_entry
+ *
+ * Lack of any ordering means reordering may occur such that dec, inc
+ * are done before entry is overwritten. This permits a remote lock
+ * holder of lock B (which this CPU failed to acquire) to now observe it
+ * as being attempted on this CPU, and may lead to misdetection (if this
+ * CPU holds a lock it is attempting to acquire, leading to false ABBA
+ * diagnosis).
+ *
+ * In case of unlock, we will always do a release on the lock word after
+ * releasing the entry, ensuring that other CPUs cannot hold the lock
+ * (and make conclusions about deadlocks) until the entry has been
+ * cleared on the local CPU, preventing any anomalies. Reordering is
+ * still possible there, but a remote CPU cannot observe a lock in our
+ * table which it is already holding, since visibility entails our
+ * release store for the said lock has not retired.
+ *
+ * In theory we don't have a problem if the dec and WRITE_ONCE above get
+ * reordered with each other, we either notice an empty NULL entry on
+ * top (if dec succeeds WRITE_ONCE), or a potentially stale entry which
+ * cannot be observed (if dec precedes WRITE_ONCE).
+ *
+ * Emit the write barrier _before_ the dec, this permits dec-inc
+ * reordering but that is harmless as we'd have new entry set to NULL
+ * already, i.e. they cannot precede the NULL store above.
+ */
+ smp_wmb();
+ this_cpu_dec(rqspinlock_held_locks.cnt);
+}
+
#endif /* __ASM_GENERIC_RQSPINLOCK_H */
*/
#include "../locking/qspinlock.h"
#include "../locking/lock_events.h"
+#include "rqspinlock.h"
/*
* The basic principle of a queue-based spinlock can best be understood
struct rqspinlock_timeout {
u64 timeout_end;
u64 duration;
+ u64 cur;
u16 spin;
};
#define RES_TIMEOUT_VAL 2
-static noinline int check_timeout(struct rqspinlock_timeout *ts)
+DEFINE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
+EXPORT_SYMBOL_GPL(rqspinlock_held_locks);
+
+static bool is_lock_released(rqspinlock_t *lock, u32 mask, struct rqspinlock_timeout *ts)
+{
+ if (!(atomic_read_acquire(&lock->val) & (mask)))
+ return true;
+ return false;
+}
+
+static noinline int check_deadlock_AA(rqspinlock_t *lock, u32 mask,
+ struct rqspinlock_timeout *ts)
+{
+ struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+ int cnt = min(RES_NR_HELD, rqh->cnt);
+
+ /*
+ * Return an error if we hold the lock we are attempting to acquire.
+ * We'll iterate over max 32 locks; no need to do is_lock_released.
+ */
+ for (int i = 0; i < cnt - 1; i++) {
+ if (rqh->locks[i] == lock)
+ return -EDEADLK;
+ }
+ return 0;
+}
+
+/*
+ * This focuses on the most common case of ABBA deadlocks (or ABBA involving
+ * more locks, which reduce to ABBA). This is not exhaustive, and we rely on
+ * timeouts as the final line of defense.
+ */
+static noinline int check_deadlock_ABBA(rqspinlock_t *lock, u32 mask,
+ struct rqspinlock_timeout *ts)
+{
+ struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+ int rqh_cnt = min(RES_NR_HELD, rqh->cnt);
+ void *remote_lock;
+ int cpu;
+
+ /*
+ * Find the CPU holding the lock that we want to acquire. If there is a
+ * deadlock scenario, we will read a stable set on the remote CPU and
+ * find the target. This would be a constant time operation instead of
+ * O(NR_CPUS) if we could determine the owning CPU from a lock value, but
+ * that requires increasing the size of the lock word.
+ */
+ for_each_possible_cpu(cpu) {
+ struct rqspinlock_held *rqh_cpu = per_cpu_ptr(&rqspinlock_held_locks, cpu);
+ int real_cnt = READ_ONCE(rqh_cpu->cnt);
+ int cnt = min(RES_NR_HELD, real_cnt);
+
+ /*
+ * Let's ensure to break out of this loop if the lock is available for
+ * us to potentially acquire.
+ */
+ if (is_lock_released(lock, mask, ts))
+ return 0;
+
+ /*
+ * Skip ourselves, and CPUs whose count is less than 2, as they need at
+ * least one held lock and one acquisition attempt (reflected as top
+ * most entry) to participate in an ABBA deadlock.
+ *
+ * If cnt is more than RES_NR_HELD, it means the current lock being
+ * acquired won't appear in the table, and other locks in the table are
+ * already held, so we can't determine ABBA.
+ */
+ if (cpu == smp_processor_id() || real_cnt < 2 || real_cnt > RES_NR_HELD)
+ continue;
+
+ /*
+ * Obtain the entry at the top, this corresponds to the lock the
+ * remote CPU is attempting to acquire in a deadlock situation,
+ * and would be one of the locks we hold on the current CPU.
+ */
+ remote_lock = READ_ONCE(rqh_cpu->locks[cnt - 1]);
+ /*
+ * If it is NULL, we've raced and cannot determine a deadlock
+ * conclusively, skip this CPU.
+ */
+ if (!remote_lock)
+ continue;
+ /*
+ * Find if the lock we're attempting to acquire is held by this CPU.
+ * Don't consider the topmost entry, as that must be the latest lock
+ * being held or acquired. For a deadlock, the target CPU must also
+ * attempt to acquire a lock we hold, so for this search only 'cnt - 1'
+ * entries are important.
+ */
+ for (int i = 0; i < cnt - 1; i++) {
+ if (READ_ONCE(rqh_cpu->locks[i]) != lock)
+ continue;
+ /*
+ * We found our lock as held on the remote CPU. Is the
+ * acquisition attempt on the remote CPU for a lock held
+ * by us? If so, we have a deadlock situation, and need
+ * to recover.
+ */
+ for (int i = 0; i < rqh_cnt - 1; i++) {
+ if (rqh->locks[i] == remote_lock)
+ return -EDEADLK;
+ }
+ /*
+ * Inconclusive; retry again later.
+ */
+ return 0;
+ }
+ }
+ return 0;
+}
+
+static noinline int check_deadlock(rqspinlock_t *lock, u32 mask,
+ struct rqspinlock_timeout *ts)
+{
+ int ret;
+
+ ret = check_deadlock_AA(lock, mask, ts);
+ if (ret)
+ return ret;
+ ret = check_deadlock_ABBA(lock, mask, ts);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static noinline int check_timeout(rqspinlock_t *lock, u32 mask,
+ struct rqspinlock_timeout *ts)
{
u64 time = ktime_get_mono_fast_ns();
+ u64 prev = ts->cur;
if (!ts->timeout_end) {
+ ts->cur = time;
ts->timeout_end = time + ts->duration;
return 0;
}
if (time > ts->timeout_end)
return -ETIMEDOUT;
+ /*
+ * A millisecond interval passed from last time? Trigger deadlock
+ * checks.
+ */
+ if (prev + NSEC_PER_MSEC < time) {
+ ts->cur = time;
+ return check_deadlock(lock, mask, ts);
+ }
+
return 0;
}
* as the macro does internal amortization for us.
*/
#ifndef res_smp_cond_load_acquire
-#define RES_CHECK_TIMEOUT(ts, ret) \
- ({ \
- if (!(ts).spin++) \
- (ret) = check_timeout(&(ts)); \
- (ret); \
+#define RES_CHECK_TIMEOUT(ts, ret, mask) \
+ ({ \
+ if (!(ts).spin++) \
+ (ret) = check_timeout((lock), (mask), &(ts)); \
+ (ret); \
})
#else
-#define RES_CHECK_TIMEOUT(ts, ret, mask) \
+#define RES_CHECK_TIMEOUT(ts, ret, mask) \
({ (ret) = check_timeout(&(ts)); })
#endif
/*
* Initialize the 'spin' member.
+ * Set spin member to 0 to trigger AA/ABBA checks immediately.
*/
-#define RES_INIT_TIMEOUT(ts) ({ (ts).spin = 1; })
+#define RES_INIT_TIMEOUT(ts) ({ (ts).spin = 0; })
/*
* We only need to reset 'timeout_end', 'spin' will just wrap around as necessary.
*
* Return:
* * 0 - Lock was acquired successfully.
+ * * -EDEADLK - Lock acquisition failed because of AA/ABBA deadlock.
* * -ETIMEDOUT - Lock acquisition failed because of timeout.
*
* (queue tail, pending bit, lock value)
goto queue;
}
+ /*
+ * Grab an entry in the held locks array, to enable deadlock detection.
+ */
+ grab_held_lock_entry(lock);
+
/*
* We're pending, wait for the owner to go away.
*
*/
if (val & _Q_LOCKED_MASK) {
RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT);
- res_smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, ret));
+ res_smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, ret, _Q_LOCKED_MASK));
}
if (ret) {
*/
clear_pending(lock);
lockevent_inc(rqspinlock_lock_timeout);
- return ret;
+ goto err_release_entry;
}
/*
*/
queue:
lockevent_inc(lock_slowpath);
+ /*
+ * Grab deadlock detection entry for the queue path.
+ */
+ grab_held_lock_entry(lock);
+
node = this_cpu_ptr(&rqnodes[0].mcs);
idx = node->count++;
tail = encode_tail(smp_processor_id(), idx);
lockevent_inc(lock_no_node);
RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT);
while (!queued_spin_trylock(lock)) {
- if (RES_CHECK_TIMEOUT(ts, ret)) {
+ if (RES_CHECK_TIMEOUT(ts, ret, ~0u)) {
lockevent_inc(rqspinlock_lock_timeout);
- break;
+ goto err_release_node;
}
cpu_relax();
}
*/
RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT * 2);
val = res_atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK) ||
- RES_CHECK_TIMEOUT(ts, ret));
+ RES_CHECK_TIMEOUT(ts, ret, _Q_LOCKED_PENDING_MASK));
waitq_timeout:
if (ret) {
WRITE_ONCE(next->locked, RES_TIMEOUT_VAL);
}
lockevent_inc(rqspinlock_lock_timeout);
- goto release;
+ goto err_release_node;
}
/*
*/
__this_cpu_dec(rqnodes[0].mcs.count);
return ret;
+err_release_node:
+ trace_contention_end(lock, ret);
+ __this_cpu_dec(rqnodes[0].mcs.count);
+err_release_entry:
+ release_held_lock_entry();
+ return ret;
}
EXPORT_SYMBOL_GPL(resilient_queued_spin_lock_slowpath);