]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Route attribute cache is now lockless on read / clone.
authorMaria Matejka <mq@ucw.cz>
Sat, 13 Nov 2021 21:13:51 +0000 (22:13 +0100)
committerMaria Matejka <mq@ucw.cz>
Mon, 22 Nov 2021 18:05:44 +0000 (19:05 +0100)
Lots of time was spent locking when accessing route attribute cache.
This overhead should be now reduced to a minimum.

nest/route.h
nest/rt-attr.c

index 3f8bf4330c26ab790a3e4f9f5a3b4d7c2fe9f20a..531e004be46bbce60dfe0fd70e62cd135fb70b7b 100644 (file)
@@ -606,8 +606,8 @@ struct rte_src {
 
 
 typedef struct rta {
-  struct rta *next, **pprev;           /* Hash chain */
-  u32 uc;                              /* Use count */
+  struct rta * _Atomic next, * _Atomic *pprev; /* Hash chain */
+  _Atomic u32 uc;                      /* Use count */
   u32 hash_key;                                /* Hash over important fields */
   struct ea_list *eattrs;              /* Extended Attribute chain */
   struct hostentry *hostentry;         /* Hostentry for recursive next-hops */
@@ -758,12 +758,6 @@ struct rte_owner {
   event *stop;
 };
 
-DEFINE_DOMAIN(attrs);
-extern DOMAIN(attrs) attrs_domain;
-
-#define RTA_LOCK       LOCK_DOMAIN(attrs, attrs_domain)
-#define RTA_UNLOCK     UNLOCK_DOMAIN(attrs, attrs_domain)
-
 #define RTE_SRC_PU_SHIFT      44
 #define RTE_SRC_IN_PROGRESS   (1ULL << RTE_SRC_PU_SHIFT)
 
@@ -879,20 +873,23 @@ static inline size_t rta_size(const rta *a) { return sizeof(rta) + sizeof(u32)*a
 #define RTA_MAX_SIZE (sizeof(rta) + sizeof(u32)*MPLS_MAX_LABEL_STACK)
 rta *rta_lookup(rta *);                        /* Get rta equivalent to this one, uc++ */
 static inline int rta_is_cached(rta *r) { return r->cached; }
+
 static inline rta *rta_clone(rta *r) {
-  RTA_LOCK;
-  r->uc++;
-  RTA_UNLOCK;
+  u32 uc = atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel);
+  ASSERT_DIE(uc > 0);
   return r;
 }
 
 void rta__free(rta *r);
 static inline void rta_free(rta *r) {
-  RTA_LOCK;
-  if (r && !--r->uc)
+  if (!r)
+    return;
+
+  u32 uc = atomic_fetch_sub_explicit(&r->uc, 1, memory_order_acq_rel);
+  if (uc == 1)
     rta__free(r);
-  RTA_UNLOCK;
 }
+
 rta *rta_do_cow(rta *o, linpool *lp);
 static inline rta * rta_cow(rta *r, linpool *lp) { return rta_is_cached(r) ? rta_do_cow(r, lp) : r; }
 static inline void rta_uncache(rta *r) { r->cached = 0; r->uc = 0; }
index cb66b65d77b4de47d7a80200006c840089c9d153..9a5498ed702b6de502655d9d739b4fcbb5494156 100644 (file)
@@ -54,6 +54,7 @@
 #include "lib/hash.h"
 #include "lib/idm.h"
 #include "lib/resource.h"
+#include "lib/rcu.h"
 #include "lib/string.h"
 
 #include <stddef.h>
@@ -85,8 +86,8 @@ const char * rta_dest_names[RTD_MAX] = {
   [RTD_PROHIBIT]       = "prohibited",
 };
 
-DOMAIN(attrs) attrs_domain;
-DOMAIN(attrs) src_domain;
+DEFINE_DOMAIN(attrs);
+static DOMAIN(attrs) src_domain;
 
 #define SRC_LOCK       LOCK_DOMAIN(attrs, src_domain)
 #define SRC_UNLOCK     UNLOCK_DOMAIN(attrs, src_domain)
@@ -1166,21 +1167,28 @@ ea_append(ea_list *to, ea_list *what)
  *     rta's
  */
 
-static uint rta_cache_count;
-static uint rta_cache_size = 32;
-static uint rta_cache_limit;
-static uint rta_cache_mask;
-static rta **rta_hash_table;
+static DOMAIN(attrs) attrs_domain;
 
-static void
-rta_alloc_hash(void)
+#define RTA_LOCK       LOCK_DOMAIN(attrs, attrs_domain)
+#define RTA_UNLOCK     UNLOCK_DOMAIN(attrs, attrs_domain)
+
+struct rta_cache {
+  u32 count;
+  u32 size;
+  u32 limit;
+  u32 mask;
+  rta * _Atomic table[0];
+} * _Atomic rta_cache;
+// rta_aux, rta_cache = { .size = ATOMIC_VAR_INIT(32), };
+
+static struct rta_cache *
+rta_alloc_hash(u32 size)
 {
-  rta_hash_table = mb_allocz(rta_pool, sizeof(rta *) * rta_cache_size);
-  if (rta_cache_size < 32768)
-    rta_cache_limit = rta_cache_size * 2;
-  else
-    rta_cache_limit = ~0;
-  rta_cache_mask = rta_cache_size - 1;
+  struct rta_cache *c = mb_allocz(rta_pool, sizeof(struct rta_cache) + sizeof(rta * _Atomic) * size);
+  c->size = size;
+  c->limit = (size >> 20) ? (~0U) : (size * 2);
+  c->mask = size - 1;
+  return c;
 }
 
 static inline uint
@@ -1234,34 +1242,88 @@ rta_copy(rta *o)
 }
 
 static inline void
-rta_insert(rta *r)
+rta_insert(rta *r, struct rta_cache *c)
 {
-  uint h = r->hash_key & rta_cache_mask;
-  r->next = rta_hash_table[h];
-  if (r->next)
-    r->next->pprev = &r->next;
-  r->pprev = &rta_hash_table[h];
-  rta_hash_table[h] = r;
+  uint h = r->hash_key & c->mask;
+  rta *next = atomic_load_explicit(&c->table[h], memory_order_relaxed);
+
+  atomic_store_explicit(&r->next, next, memory_order_relaxed);
+  r->pprev = &c->table[h];
+
+  if (next)
+    next->pprev = &r->next;
+
+  /* This store MUST be the last and MUST have release order for thread-safety */
+  atomic_store_explicit(&c->table[h], r, memory_order_release);
 }
 
 static void
-rta_rehash(void)
+rta_rehash(struct rta_cache *c)
 {
-  uint ohs = rta_cache_size;
-  uint h;
-  rta *r, *n;
-  rta **oht = rta_hash_table;
-
-  rta_cache_size = 2*rta_cache_size;
-  DBG("Rehashing rta cache from %d to %d entries.\n", ohs, rta_cache_size);
-  rta_alloc_hash();
-  for(h=0; h<ohs; h++)
-    for(r=oht[h]; r; r=n)
+  u32 os = c->size;
+
+  struct rta_cache *nc = rta_alloc_hash(os * 2);
+  nc->count = c->count;
+
+  /* First we simply copy every chain to both new locations */
+  for (u32 h = 0; h < os; h++)
+  {
+    rta *r = atomic_load_explicit(&c->table[h], memory_order_relaxed);
+    atomic_store_explicit(&nc->table[h], r, memory_order_relaxed);
+    atomic_store_explicit(&nc->table[h + os], r, memory_order_relaxed);
+  }
+
+  /* Then we exchange the hashes; release semantics forces the previous code to be already done */
+  atomic_store_explicit(&rta_cache, nc, memory_order_release);
+
+  /* And now we pass through both chains and filter them */
+  for (u32 h = 0; h < c->size; h++)
+  {
+    rta * _Atomic * ap = &nc->table[h];
+    rta * _Atomic * bp = &nc->table[h + os];
+
+    rta *r = atomic_load_explicit(ap, memory_order_relaxed);
+    ASSERT_DIE(r == atomic_load_explicit(bp, memory_order_relaxed));
+
+    while (r)
+    {
+      if (r->hash_key & os)
+      {
+       r->pprev = bp;
+       atomic_store_explicit(bp, r, memory_order_release);
+       bp = &r->next;
+      }
+      else
       {
-       n = r->next;
-       rta_insert(r);
+       r->pprev = ap;
+       atomic_store_explicit(ap, r, memory_order_release);
+       ap = &r->next;
       }
-  mb_free(oht);
+
+      r = atomic_load_explicit(&r->next, memory_order_acquire);
+    }
+
+    atomic_store_explicit(ap, NULL, memory_order_release);
+    atomic_store_explicit(bp, NULL, memory_order_release);
+  }
+
+  synchronize_rcu();
+  mb_free(c);
+}
+
+static rta *
+rta_find(rta *o, u32 h, struct rta_cache *c)
+{
+  rta *r = NULL;
+
+  for (r = atomic_load_explicit(&c->table[h & c->mask], memory_order_acquire); r; r = atomic_load_explicit(&r->next, memory_order_acquire))
+    if (r->hash_key == h && rta_same(r, o))
+    {
+      atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel);
+      return r;
+    }
+
+  return NULL;
 }
 
 /**
@@ -1289,24 +1351,34 @@ rta_lookup(rta *o)
 
   h = rta_hash(o);
 
+  /* Lockless lookup */
+  rcu_read_lock();
+  r = rta_find(o, h, atomic_load_explicit(&rta_cache, memory_order_acquire));
+  rcu_read_unlock();
+
+  if (r)
+    return r;
+
   RTA_LOCK;
 
-  for(r=rta_hash_table[h & rta_cache_mask]; r; r=r->next)
-    if (r->hash_key == h && rta_same(r, o))
-    {
-      r->uc++;
-      RTA_UNLOCK;
-      return r;
-    }
+  /* Locked lookup to avoid duplicates if possible */
+  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);
+  r = rta_find(o, h, c);
+  if (r)
+  {
+    RTA_UNLOCK;
+    return r;
+  }
 
+  /* Store the rta */
   r = rta_copy(o);
   r->hash_key = h;
   r->cached = 1;
   rt_lock_hostentry(r->hostentry);
-  rta_insert(r);
+  rta_insert(r, c);
 
-  if (++rta_cache_count > rta_cache_limit)
-    rta_rehash();
+  if (++c->count > c->limit)
+    rta_rehash(c);
 
   RTA_UNLOCK;
   return r;
@@ -1315,17 +1387,47 @@ rta_lookup(rta *o)
 void
 rta__free(rta *a)
 {
-  ASSERT(rta_cache_count && a->cached);
-  rta_cache_count--;
-  *a->pprev = a->next;
-  if (a->next)
-    a->next->pprev = a->pprev;
+  ASSERT(a->cached);
+
+  RTA_LOCK;
+  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);
+
+  if (atomic_load_explicit(&a->uc, memory_order_acquire))
+  {
+    /* Acquired inbetween */
+    RTA_UNLOCK;
+    return;
+  }
+
+  /* Relink the forward pointer */
+  rta *next = atomic_load_explicit(&a->next, memory_order_acquire);
+  atomic_store_explicit(a->pprev, next, memory_order_release);
+
+  /* Relink the backwards pointer */
+  if (next)
+    next->pprev = a->pprev;
+
+  /* Wait until nobody knows about us */
+  synchronize_rcu();
+
+  if (atomic_load_explicit(&a->uc, memory_order_acquire))
+  {
+    /* Acquired inbetween, relink back */
+    rta_insert(a, c);
+    RTA_UNLOCK;
+    return;
+  }
+
+  /* Cleared to free the memory */
   rt_unlock_hostentry(a->hostentry);
   if (a->nh.next)
     nexthop_free(a->nh.next);
   ea_free(a->eattrs);
   a->cached = 0;
+  c->count--;
   sl_free(rta_slab(a), a);
+
+  RTA_UNLOCK;
 }
 
 rta *
@@ -1394,9 +1496,13 @@ rta_dump_all(void)
 
   RTA_LOCK;
 
-  debug("Route attribute cache (%d entries, rehash at %d):\n", rta_cache_count, rta_cache_limit);
-  for(h=0; h<rta_cache_size; h++)
-    for(a=rta_hash_table[h]; a; a=a->next)
+  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);
+
+  debug("Route attribute cache (%d entries, rehash at %d):\n", c->count, c->limit);
+  for(h=0; h<c->size; h++)
+    for(a = atomic_load_explicit(&c->table[h], memory_order_acquire);
+       a;
+       a = atomic_load_explicit(&a->next, memory_order_acquire))
       {
        debug("%p ", a);
        rta_dump(a);
@@ -1440,7 +1546,7 @@ rta_init(void)
   nexthop_slab_[2] = sl_new(rta_pool, sizeof(struct nexthop) + sizeof(u32)*2);
   nexthop_slab_[3] = sl_new(rta_pool, sizeof(struct nexthop) + sizeof(u32)*MPLS_MAX_LABEL_STACK);
 
-  rta_alloc_hash();
+  atomic_store_explicit(&rta_cache, rta_alloc_hash(32), memory_order_relaxed);
   rte_src_init();
 }