]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Generalized the rte_src lockfree usecount algorithm
authorMaria Matejka <mq@ucw.cz>
Fri, 24 Nov 2023 07:46:50 +0000 (08:46 +0100)
committerMaria Matejka <mq@ucw.cz>
Mon, 4 Dec 2023 09:39:32 +0000 (10:39 +0100)
lib/bitmap.h
lib/lockfree.h [new file with mode: 0644]
lib/route.h
nest/rt-attr.c

index e3351ab193d9ad35eb0538ef6c7dad0e8a501a28..01bb65b66ed0db2dffa7c2333b8b6b7c1dbb85aa 100644 (file)
@@ -10,6 +10,8 @@
 #ifndef _BIRD_BITMAP_H_
 #define _BIRD_BITMAP_H_
 
+#include "lib/resource.h"
+
 struct bmap
 {
   u32 size;
diff --git a/lib/lockfree.h b/lib/lockfree.h
new file mode 100644 (file)
index 0000000..abb4267
--- /dev/null
@@ -0,0 +1,146 @@
+/*
+ *     BIRD Library -- Generic lock-free structures
+ *
+ *     (c) 2023       Maria Matejka <mq@jmq.cz>
+ *     (c) 2023       CZ.NIC, z.s.p.o.
+ *
+ *     Can be freely distributed and used under the terms of the GNU GPL.
+ */
+
+#ifndef _BIRD_LOCKFREE_H_
+#define _BIRD_LOCKFREE_H_
+
+#include "lib/event.h"
+#include "lib/rcu.h"
+
+#include <stdatomic.h>
+
+/**
+ * Lock-free usecounts.
+ */
+
+struct lfuc {
+  _Atomic u64 uc;
+};
+
+#define LFUC_PU_SHIFT      44
+#define LFUC_IN_PROGRESS   (1ULL << LFUC_PU_SHIFT)
+
+/**
+ * lfuc_lock - increase an atomic usecount
+ * @c: the usecount structure
+ */
+static inline void lfuc_lock(struct lfuc *c)
+{
+  /* Locking is trivial; somebody already holds the underlying data structure
+   * so we just increase the use count. Nothing can be freed underneath our hands. */
+  u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel);
+  ASSERT_DIE(uc > 0);
+}
+
+/**
+ * lfuc_lock_revive - increase an atomic usecount even if it's zero
+ * @c: the usecount structure
+ *
+ * If the caller is sure that they can't collide with the prune routine,
+ * they can call this even on structures with already zeroed usecount.
+ * Handy for situations with flapping routes. Use only from the same
+ * loop as which runs the prune routine.
+ */
+static inline void lfuc_lock_revive(struct lfuc *c)
+{
+  UNUSED u64 uc = atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel);
+}
+
+/**
+ * lfuc_unlock - decrease an atomic usecount
+ * @c: the usecount structure
+ * @el: prune event list
+ * @ev: prune event itself
+ *
+ * If the usecount reaches zero, a prune event is run to possibly free the object.
+ * The prune event MUST use lfuc_finished() to check the object state.
+ */
+static inline void lfuc_unlock(struct lfuc *c, event_list *el, event *ev)
+{
+  /* Unlocking is tricky. We do it lockless so at the same time, the prune
+   * event may be running, therefore if the unlock gets us to zero, it must be
+   * the last thing in this routine, otherwise the prune routine may find the
+   * source's usecount zeroed, freeing it prematurely.
+   *
+   * The usecount is split into two parts:
+   * the top 20 bits are an in-progress indicator
+   * the bottom 44 bits keep the actual usecount.
+   *
+   * Therefore at most 1 million of writers can simultaneously unlock the same
+   * structure, while at most ~17T different places can reference it. Both limits
+   * are insanely high from the 2022 point of view. Let's suppose that when 17T
+   * routes or 1M peers/tables get real, we get also 128bit atomic variables in the
+   * C norm. */
+
+  /* First, we push the in-progress indicator */
+  u64 uc = atomic_fetch_add_explicit(&c->uc, LFUC_IN_PROGRESS, memory_order_acq_rel);
+
+  /* Then we split the indicator to its parts. Remember, we got the value
+   * before the operation happened so we're re-doing the operation locally
+   * to get a view how the indicator _would_ look if nobody else was interacting.
+   */
+  u64 pending = (uc >> LFUC_PU_SHIFT) + 1;
+  uc &= LFUC_IN_PROGRESS - 1;
+
+  /* We per-use the RCU critical section indicator to make the prune event wait
+   * until we finish here in the rare case we get preempted. */
+  rcu_read_lock();
+
+  /* Obviously, there can't be more pending unlocks than the usecount itself */
+  if (uc == pending)
+    /* If we're the last unlocker (every owner is already unlocking), schedule
+     * the owner's prune event */
+    ev_send(el, ev);
+  else
+    ASSERT_DIE(uc > pending);
+
+  /* And now, finally, simultaneously pop the in-progress indicator and the
+   * usecount, possibly allowing the pruning routine to free this structure */
+  atomic_fetch_sub_explicit(&c->uc, LFUC_IN_PROGRESS + 1, memory_order_acq_rel);
+
+  /* ... and to reduce the load a bit, the pruning routine will better wait for
+   * RCU synchronization instead of a busy loop. */
+  rcu_read_unlock();
+}
+
+/**
+ * lfuc_finished - auxiliary routine for prune event
+ * @c: usecount structure
+ *
+ * This routine simply waits until all unlockers finish their job and leave
+ * the critical section of lfuc_unlock(). Then we decide whether the usecount
+ * is indeed zero or not, and therefore whether the structure is free to be freed.
+ */
+static inline _Bool
+lfuc_finished(struct lfuc *c)
+{
+  u64 uc;
+  /* Wait until all unlockers finish */
+  while ((uc = atomic_load_explicit(&c->uc, memory_order_acquire)) >> LFUC_PU_SHIFT)
+    synchronize_rcu();
+
+  /* All of them are now done and if the usecount is now zero, then we're
+   * the last place to reference the object and we can call it finished. */
+  return (uc == 0);
+}
+
+/**
+ * lfuc_init - auxiliary routine for usecount initialization
+ * @c: usecount structure
+ *
+ * Called on object initialization, sets the usecount to an initial one to make
+ * sure that the prune routine doesn't free it before somebody else references it.
+ */
+static inline void
+lfuc_init(struct lfuc *c)
+{
+  atomic_store_explicit(&c->uc, 1, memory_order_release);
+}
+
+#endif
index f7e9c7f7239661997c43946baffb347150945ef1..9570583b1f163ac336e99e0dd2946f5938cc726c 100644 (file)
@@ -16,6 +16,7 @@
 #include "lib/rcu.h"
 #include "lib/hash.h"
 #include "lib/event.h"
+#include "lib/lockfree.h"
 
 struct network;
 struct proto;
@@ -67,7 +68,7 @@ struct rte_src {
   struct rte_owner *owner;             /* Route source owner */
   u64 private_id;                      /* Private ID, assigned by the protocol */
   u32 global_id;                       /* Globally unique ID of the source */
-  _Atomic u64 uc;                      /* Use count */
+  struct lfuc uc;                      /* Use count */
 };
 
 struct rte_owner_class {
@@ -111,54 +112,12 @@ struct rte_src *rt_find_source_global(u32 id);
 
 static inline void rt_lock_source(struct rte_src *src)
 {
-  /* Locking a source is trivial; somebody already holds it so we just increase
-   * the use count. Nothing can be freed underneath our hands. */
-  u64 uc = atomic_fetch_add_explicit(&src->uc, 1, memory_order_acq_rel);
-  ASSERT_DIE(uc > 0);
+  lfuc_lock(&src->uc);
 }
 
 static inline void rt_unlock_source(struct rte_src *src)
 {
-  /* Unlocking is tricky. We do it lockless so at the same time, the prune
-   * event may be running, therefore if the unlock gets us to zero, it must be
-   * the last thing in this routine, otherwise the prune routine may find the
-   * source's usecount zeroed, freeing it prematurely.
-   *
-   * The usecount is split into two parts:
-   * the top 20 bits are an in-progress indicator
-   * the bottom 44 bits keep the actual usecount.
-   *
-   * Therefore at most 1 million of writers can simultaneously unlock the same
-   * source, while at most ~17T different routes can reference it. Both limits
-   * are insanely high from the 2022 point of view. Let's suppose that when 17T
-   * routes or 1M writers get real, we get also 128bit atomic variables in the
-   * C norm. */
-
-  /* First, we push the in-progress indicator */
-  u64 uc = atomic_fetch_add_explicit(&src->uc, RTE_SRC_IN_PROGRESS, memory_order_acq_rel);
-
-  /* Then we split the indicator to its parts. Remember, we got the value before the operation happened. */
-  u64 pending = (uc >> RTE_SRC_PU_SHIFT) + 1;
-  uc &= RTE_SRC_IN_PROGRESS - 1;
-
-  /* We per-use the RCU critical section indicator to make the prune event wait
-   * until we finish here in the rare case we get preempted. */
-  rcu_read_lock();
-
-  /* Obviously, there can't be more pending unlocks than the usecount itself */
-  if (uc == pending)
-    /* If we're the last unlocker, schedule the owner's prune event */
-    ev_send(src->owner->list, src->owner->prune);
-  else
-    ASSERT_DIE(uc > pending);
-
-  /* And now, finally, simultaneously pop the in-progress indicator and the
-   * usecount, possibly allowing the source pruning routine to free this structure */
-  atomic_fetch_sub_explicit(&src->uc, RTE_SRC_IN_PROGRESS + 1, memory_order_acq_rel);
-
-  /* ... and to reduce the load a bit, the source pruning routine will better wait for
-   * RCU synchronization instead of a busy loop. */
-  rcu_read_unlock();
+  lfuc_unlock(&src->uc, src->owner->list, src->owner->prune);
 }
 
 #ifdef RT_SOURCE_DEBUG
index 72f6487f06519d20a9a8aaa3005b43b335c0c5fa..481b432f24915c017b93eaa38e4c36ecdf4c00a7 100644 (file)
@@ -243,7 +243,7 @@ rt_get_source_o(struct rte_owner *p, u32 id)
 
   if (src)
   {
-    UNUSED u64 uc = atomic_fetch_add_explicit(&src->uc, 1, memory_order_acq_rel);
+    lfuc_lock_revive(&src->uc);
     return src;
   }
 
@@ -253,7 +253,7 @@ rt_get_source_o(struct rte_owner *p, u32 id)
   src->private_id = id;
   src->global_id = idm_alloc(&src_ids);
 
-  atomic_store_explicit(&src->uc, 1, memory_order_release);
+  lfuc_init(&src->uc);
   p->uc++;
 
   HASH_INSERT2(p->hash, RSH, rta_pool, src);
@@ -330,11 +330,7 @@ rt_prune_sources(void *data)
 
   HASH_WALK_FILTER(o->hash, next, src, sp)
   {
-    u64 uc;
-    while ((uc = atomic_load_explicit(&src->uc, memory_order_acquire)) >> RTE_SRC_PU_SHIFT)
-      synchronize_rcu();
-
-    if (uc == 0)
+    if (lfuc_finished(&src->uc))
     {
       o->uc--;