]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Unlocking the global attribute cache
authorKaterina Kubecova <katerina.kubecova@nic.cz>
Fri, 31 Jan 2025 10:13:26 +0000 (11:13 +0100)
committerMaria Matejka <mq@ucw.cz>
Wed, 19 Feb 2025 12:52:16 +0000 (13:52 +0100)
Now, when we have the unlocked slab, we can also go for a completely
lockless global attribute cache, which has been the major contention
point in BIRD 3.

This is a complicated data structure, basically a lockless hash table
with use-counted items inside, allowing concurrent insertions and
deletions.

Hash tables have to dynamically resize its base array, and the rehash
routine runs from an event if needed.

Also, due to some funny meta-race conditions, it is now possible to
end up with multiple ea_list_storage instances with the same contents,
yet it looks like it happens very rarely. (Prove us wrong, we dare you!)

lib/route.h
nest/rt-attr.c

index 6ad4a7c7d94b9d6ef6b579f866fa6c3c20cb04bd..bf38a78a0f20ecbe41e2508ac339ed09cf8851bf 100644 (file)
@@ -253,7 +253,7 @@ enum ea_stored {
 };
 
 struct ea_storage {
-  struct ea_storage *next_hash;                /* Next in hash chain */
+  struct ea_storage *_Atomic next_hash;        /* Next in hash chain */
   _Atomic u64 uc;                      /* Use count */
   u32 hash_key;                                /* List hash */
   PADDING(unused, 0, 4);               /* Sorry, we need u64 for the usecount */
@@ -573,7 +573,7 @@ static inline struct ea_storage *ea_get_storage(ea_list *r)
 
 static inline ea_list *ea_ref(ea_list *r)
 {
-  ASSERT_DIE(0 < atomic_fetch_add_explicit(&ea_get_storage(r)->uc, 1, memory_order_acq_rel));
+  ASSERT_DIE(0 < atomic_fetch_add_explicit(&ea_get_storage(r)->uc, 1, memory_order_acq_rel)); //TODO Is this ok? Are we sure no one will decrease the count?
   return r;
 }
 
@@ -586,6 +586,12 @@ static inline ea_list *ea_lookup(ea_list *r, u32 squash_upto, enum ea_stored oid
     return ea_lookup_slow(r, squash_upto, oid);
 }
 
+struct ea_finally_free_deferred_call {
+  struct deferred_call dc;
+  struct rcu_stored_phase phase;
+  struct ea_storage *attrs;
+};
+
 struct ea_free_deferred {
   struct deferred_call dc;
   ea_list *attrs;
index 249bfae48f350505fa3c6f26bd852f199eadcb79..2534f3962cc0817332cb61364bb893a3b0367ef7 100644 (file)
@@ -226,6 +226,8 @@ static struct idm src_ids;
 static struct rte_src * _Atomic * _Atomic rte_src_global;
 static _Atomic uint rte_src_global_max;
 
+void ea_rehash(void*);
+
 static void
 rte_src_init(struct event_list *ev)
 {
@@ -1507,188 +1509,531 @@ ea_append(ea_list *to, ea_list *what)
   return res;
 }
 
-/*
- *     rta's
- */
-
-static SPINHASH(struct ea_storage) rta_hash_table;
-
-#define RTAH_KEY(a)            a->l, a->hash_key
-#define RTAH_NEXT(a)           a->next_hash
-#define RTAH_EQ(a, b)          ((a)->hash_key == (b)->hash_key) && (ea_same((a), (b)))
-#define RTAH_FN(a, h)          h
-#define RTAH_ORDER             12
-
-#define RTAH_REHASH            rta_rehash
-#define RTAH_PARAMS            /8, *2, 2, 2, 12, 28
 
-static void RTAH_REHASH(void *_ UNUSED) {
-  int step;
-
-  RTA_LOCK;
-  SPINHASH_REHASH_PREPARE(&rta_hash_table, RTAH, struct ea_storage, step);
-  RTA_UNLOCK;
+struct ea_stor_array {
+  struct ea_storage *_Atomic *eas;
+  _Atomic uint order;
+};
 
-  if (!step)   return;
+struct hash_head {
+  struct ea_stor_array *_Atomic cur;
+  struct ea_stor_array esa1;
+  struct ea_stor_array esa2;
+  _Atomic uint count;
+  pool *pool;
+  struct event_list *ev_list;
+  event rehash_event;
+  event rehash;
+};
 
-  if (step > 0) SPINHASH_REHASH_UP  (&rta_hash_table, RTAH, struct ea_storage,  step);
-  if (step < 0) SPINHASH_REHASH_DOWN(&rta_hash_table, RTAH, struct ea_storage, -step);
+static struct hash_head rta_hash_table;
 
-  RTA_LOCK;
-  SPINHASH_REHASH_FINISH(&rta_hash_table, RTAH);
-  RTA_UNLOCK;
+static void
+ea_increment_table_count(uint order)
+{
+  int count = atomic_fetch_add_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
+  if (count > 1 << (order +1))
+    ev_send(rta_hash_table.ev_list, &rta_hash_table.rehash_event);
 }
 
-static ea_list *
-ea_lookup_existing(ea_list *o, u32 squash_upto, uint h)
+static struct ea_storage *
+ea_walk_chain_for_storage(struct ea_storage *eap_first_next, ea_list *o, u32 squash_upto, uint h)
 {
-  struct ea_storage *r = NULL;
-
-  SPINHASH_BEGIN_CHAIN(rta_hash_table, RTAH, read, eap, o, h);
-  for (struct ea_storage *ea; ea = *eap; eap = &RTAH_NEXT(ea))
+  for (struct ea_storage *eap = eap_first_next; eap;
+      eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire))
+  {
     if (
-       (h == ea->hash_key) &&
-       ea_same(o, ea->l) &&
-       BIT32_TEST(&squash_upto, ea->l->stored) &&
-       (!r || r->l->stored > ea->l->stored))
-      r = ea;
-  if (r)
-    atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel);
-  SPINHASH_END_CHAIN(read);
-
-  return r ? r->l : NULL;
+      (h == eap->hash_key) && ea_same(o, eap->l) &&
+          BIT32_TEST(&squash_upto, eap->l->stored))
+    return eap;
+  }
+  return NULL;
 }
 
-
-/**
- * rta_lookup - look up a &rta in attribute cache
- * @o: a un-cached &rta
- *
- * rta_lookup() gets an un-cached &rta structure and returns its cached
- * counterpart. It starts with examining the attribute cache to see whether
- * there exists a matching entry. If such an entry exists, it's returned and
- * its use count is incremented, else a new entry is created with use count
- * set to 1.
- *
- * The extended attribute lists attached to the &rta are automatically
- * converted to the normalized form.
- */
 ea_list *
 ea_lookup_slow(ea_list *o, u32 squash_upto, enum ea_stored oid)
 {
-  uint h;
-
   ASSERT(o->stored != oid);
   ASSERT(oid);
   o = ea_normalize(o, squash_upto);
-  h = ea_hash(o);
+  uint h = ea_hash(o);
 
   squash_upto |= BIT32_VAL(oid);
 
-  struct ea_list *rr = ea_lookup_existing(o, squash_upto, h);
-  if (rr) return rr;
-
-  RTA_LOCK;
-  if (rr = ea_lookup_existing(o, squash_upto, oid))
-  {
-    RTA_UNLOCK;
-    return rr;
-  }
-
-  struct ea_storage *r = NULL;
+  /* We are about to go to critical section. Allocate the storage now.
+   * If it is not used, we will free it later. */
+  struct ea_storage *r_new = NULL;
   uint elen = ea_list_size(o);
   uint sz = elen + sizeof(struct ea_storage);
-  for (uint i=0; i<ARRAY_SIZE(ea_slab_sizes); i++)
+  for (uint i = 0; i < ARRAY_SIZE(ea_slab_sizes); i++)
     if (sz <= ea_slab_sizes[i])
     {
-      r = sl_alloc(ea_slab[i]);
+      r_new = sl_alloc(ea_slab[i]);
       break;
     }
 
-  int huge = r ? 0 : EALF_HUGE;;
+  int huge = r_new ? 0 : EALF_HUGE;
+
   if (huge)
-    r = mb_alloc(rta_pool, sz);
+  {
+    RTA_LOCK;
+    r_new = mb_alloc(rta_pool, sz);
+    RTA_UNLOCK;
+  }
 
-  ea_list_copy(r->l, o, elen);
-  ea_list_ref(r->l);
+  ea_list_copy(r_new->l, o, elen);
 
-  r->l->flags |= huge;
-  r->l->stored = oid;
-  r->hash_key = h;
-  atomic_store_explicit(&r->uc, 1, memory_order_release);
+  r_new->l->flags |= huge;
+  r_new->l->stored = oid;
+  r_new->hash_key = h;
+  atomic_store_explicit(&r_new->uc, 1, memory_order_relaxed);
 
-  SPINHASH_INSERT(rta_hash_table, RTAH, r);
+  /* The storage is ready to be added if needed. */
 
-  RTA_UNLOCK;
-  return r->l;
-}
+  struct ea_storage *r_found = NULL;
 
-#define EA_UC_SUB   (1ULL << 44)
-#define EA_UC_IN_PROGRESS(x)   ((x) >> 44)
-#define EA_UC_ASSIGNED(x)      ((x) & (EA_UC_SUB-1))
+  lookup_loop:
+  rcu_read_lock();
+    struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_acquire);
+    struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; //maybe it would be more effective if we load it later
+    uint cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
+    uint in = h >> (32 - cur_order);
 
-#define EA_UC_DONE(x)          (EA_UC_ASSIGNED(x) == EA_UC_IN_PROGRESS(x))
+    struct ea_storage *eap_first;
+    struct ea_storage *eap_first_next;
 
-void
-ea_free_deferred(struct deferred_call *dc)
-{
-  struct ea_storage *r = ea_get_storage(SKIP_BACK(struct ea_free_deferred, dc, dc)->attrs);
+    /* Actualy search for the ea_storage - maybe we already have it */
+
+    if (cur_order)
+    {
+      eap_first = atomic_load_explicit(&cur->eas[in], memory_order_acquire);
+
+      r_found = ea_walk_chain_for_storage(eap_first, o, squash_upto, h);
+    }
+    /* Maybe rehashing is running right now. Lets check it. */
+    uint next_order = atomic_load_explicit(&next->order, memory_order_relaxed);
+    uint next_in = h >> (32 - next_order);
+
+    if (r_found == NULL && next_order)
+    {
+      eap_first_next = atomic_load_explicit(&next->eas[next_in], memory_order_acquire);
+
+      r_found = ea_walk_chain_for_storage(eap_first_next, o, squash_upto, h);
+    }
+
+    if (r_found)
+    {
+      /* We found out we already have a suitable ea_storage. Lets increment its use count */
+      u64 uc = atomic_load_explicit(&r_found->uc, memory_order_relaxed);
+      do
+      {
+        if (uc == 0)
+        {
+          /* The found storage has zero use count. In that case, we are not longer allowed to increase it. */
+          rcu_read_unlock();
+          goto lookup_loop;
+        }
+      } while (!atomic_compare_exchange_strong_explicit(
+          &r_found->uc, &uc, uc + 1,
+          memory_order_acq_rel, memory_order_acquire));
+      /* we succesfully increased count, ea_storrage is ours */
+      /* free ea_storage we allocated earlier */
+      if (huge)
+      {
+        RTA_LOCK;
+        mb_free(r_new);
+        RTA_UNLOCK;
+      } else
+        sl_free(r_new);
+
+      rcu_read_unlock();
+      return r_found->l;
+    }
 
-  /* Indicate free intent */
-  u64 prev = atomic_fetch_add_explicit(&r->uc, EA_UC_SUB, memory_order_acq_rel);
-  prev |= EA_UC_SUB;
+    /* suitable ea_storage not found, we need to add it */
+    if (next_order)
+    {
+      /* Rehash is running, so we put the new storage to the new array */
+      atomic_store_explicit(&r_new->next_hash, eap_first_next, memory_order_release);
+      if (!atomic_compare_exchange_strong_explicit(
+              &next->eas[next_in], &eap_first_next, r_new,
+              memory_order_acq_rel, memory_order_acquire))
+      {
+        /* Someone was quicker and added something. Maybe added the storage we are about to add, lets check out. */
+        rcu_read_unlock();
+        goto lookup_loop;
+      }
+    } else
+    {
+      atomic_store_explicit(&r_new->next_hash, eap_first, memory_order_release);
+      if (!atomic_compare_exchange_strong_explicit(
+              &cur->eas[in], &eap_first, r_new,
+              memory_order_acq_rel, memory_order_acquire))
+      {
+        /* Someone was quicker and added something. Maybe added the storage we are about to add, lets check out. */
+        rcu_read_unlock();
+        goto lookup_loop;
+      }
+    }
 
-  if (!EA_UC_DONE(prev))
+  /* ea_storrage succesfully added */
+  rcu_read_unlock();
+
+  /* Increase the counter of stored ea_storages and check if we need rehash */
+  ea_increment_table_count(cur_order);
+  ea_list_ref(r_new->l);
+  return r_new->l;
+}
+
+
+static void
+ea_finally_free(struct deferred_call *dc)
+{
+  /* Free an ea_storrage in defer call */
+  SKIP_BACK_DECLARE(struct ea_finally_free_deferred_call, eafdc, dc, dc);
+
+  if (!rcu_end_sync(eafdc->phase))
   {
-    /* The easy case: somebody else holds the usecount */
-    ASSERT_DIE(EA_UC_IN_PROGRESS(prev) < EA_UC_ASSIGNED(prev));
-    atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel);
+    /* Somebody may still have the pointer of the storage, retry later */
+    defer_call(dc, sizeof *eafdc);
     return;
   }
 
-  /* Wait for others to finish */
-  while (EA_UC_DONE(prev) && (EA_UC_IN_PROGRESS(prev) > 1))
-    prev = atomic_load_explicit(&r->uc, memory_order_acquire);
+  struct ea_storage *r = eafdc->attrs;
+  ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_relaxed) == 0);
+  ea_list_unref(r->l);
 
-  if (!EA_UC_DONE(prev))
+  if (r->l->flags & EALF_HUGE)
   {
-    /* Somebody else has found us inbetween, no need to free */
-    ASSERT_DIE(EA_UC_IN_PROGRESS(prev) < EA_UC_ASSIGNED(prev));
-    atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel);
-    return;
+    RTA_LOCK;
+    mb_free(r);
+    RTA_UNLOCK;
   }
+  else
+    sl_free(r);
+}
 
-  /* We are the last one to free this ea */
-  RTA_LOCK;
+static struct ea_storage *
+ea_free_prepare(struct ea_stor_array *esa, struct ea_storage *r, uint order, bool *success)
+{
+  /* Removes r from esa, returns NULL if nothing else needed, ea_storage if we need to remove the ea_storage.
+   * The use counter of this storage is already zero.
+   * (It might be the given storage if we were unsuccesful, or the storage previously pointing to it.) */
+
+  /* What are the golden rules here:
+   *   1) anyone can ADD ea_storage any time, BUT ONLY AS THE HEAD OF THE CHAIN - newer to the end, never anywhere in between
+   *   2) anyone can REMOVE any of the ea_storage, BUT ONLY IF HE SUCCEDED TO INCREASE THE USECOUNT OF THE PREVIOUS EA_STORAGE,
+   *      OR if it is the FIRST STORAGE in the chain
+   *   3) nobody can increase an already zero usecount.
+   *   4) and, obviously, if we increase an usecount, we have to make sure we will decrease it.
+   *   5) no reorder, only add or remove
+   */
+  uint in = r->hash_key >> (32 - order);
+  struct ea_storage *eap = atomic_load_explicit(&esa->eas[in], memory_order_relaxed);
+  struct ea_storage *ea_old = NULL;
+
+  for (; eap; eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire))
+  {
+    if (eap == r)
+    {
+      /* We found the ea_storage, lets remove it */
+      struct ea_storage *ea_next = atomic_load_explicit(&r->next_hash, memory_order_acquire);
 
-  /* Trying to remove the ea from the hash */
-  SPINHASH_REMOVE(rta_hash_table, RTAH, r);
+      if (ea_old == NULL)
+      {
+        /* It is the first storage in chain */
+        if (!atomic_compare_exchange_strong_explicit(&esa->eas[in], &r, ea_next,
+            memory_order_acq_rel, memory_order_acquire))
+        {
+          success[0] = false;
+          return r; /* Not success, somebody else added a storage. Lets try again.*/
+        }
+        success[0] = true;
+        return NULL;
+      } else
+      {
+        u64 uc_prev;
+        do
+        {
+          /* Try to increase use count of the previous ea_storage (rule 2) */
+          uc_prev = atomic_load_explicit(&ea_old->uc, memory_order_acquire);
+
+          if (uc_prev == 0)
+          {
+            success[0] = false;
+            return r;
+          }
+        } while (!atomic_compare_exchange_strong_explicit(&ea_old->uc, &uc_prev, uc_prev +1, memory_order_acq_rel, memory_order_acquire));
+
+        /* remove eap */
+        atomic_store_explicit(&ea_old->next_hash, ea_next, memory_order_release);
+        /* decrease increased use count of the previous storage */
+        uc_prev = atomic_fetch_sub_explicit(&ea_old->uc, 1, memory_order_release);
+        if (uc_prev == 0)
+        {
+          /* This was the last reference, we ned to remove the previous storage as well. */
+          success[0] = true;
+          return ea_old;
+        }
+        success[0] = true;
+        return NULL;
+      }
+    }
+    ea_old = eap;
+  }
+  success[0] = false;
+  return NULL;
+}
 
-  /* Somebody has cloned this rta inbetween. This sometimes happens. */
-  prev = atomic_load_explicit(&r->uc, memory_order_acquire);
-  if (!EA_UC_DONE(prev) || (EA_UC_IN_PROGRESS(prev) > 1))
+static void
+ea_storage_free(struct ea_storage *r)
+{
+  u64 uc = atomic_fetch_sub_explicit(&r->uc, 1, memory_order_acq_rel);
+
+  if (uc != 1)
   {
-    /* Reinsert the ea and go away */
-    SPINHASH_INSERT(rta_hash_table, RTAH, r);
-    atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel);
-    RTA_UNLOCK;
+    /* Someone else has a reference to this ea_storage. We can just decrease use count. */
+    ASSERT_DIE(uc > 0); /* Check this is not a double free. */
     return;
   }
 
-  /* Fine, now everybody is actually done */
-  ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_acquire) == EA_UC_SUB + 1);
+  do
+  {
+    rcu_read_lock();
+      /* Find the storage in one of the stor arrays and remove it, so nobody will found it again */
+      struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_acquire);
+      struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
 
-  /* And now we can free the object, finally */
-  ea_list_unref(r->l);
-  if (r->l->flags & EALF_HUGE)
-    mb_free(r);
-  else
-    sl_free(r);
+      bool success[1]; // two return values needed and creating new structure makes no sence
+      struct ea_storage *next_to_free;
+
+      uint cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
+
+      if (cur_order)
+        /* search in old array */
+        next_to_free = ea_free_prepare(cur, r, cur_order, success);
+
+      uint next_order = atomic_load_explicit(&next->order, memory_order_relaxed);
+      ASSERT_DIE(cur_order || next_order);
+
+      if (next_order && (!success[0] && !next_to_free))
+        /* search in new array */
+        next_to_free = ea_free_prepare(next, r, next_order, success);
+
+    rcu_read_unlock();
 
+    if (success[0])
+    {
+      /* Consider if rehash is needed */
+      int count = atomic_fetch_sub_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
+
+      u64 order = atomic_load_explicit(&cur->order, memory_order_relaxed);
+      if (count < 1 << (order - 1) && order > 28)
+        ev_send(rta_hash_table.ev_list, &rta_hash_table.rehash_event);
+
+      /* Schedule actual free of the storage */
+      struct ea_finally_free_deferred_call eafdc = {
+       .dc.hook = ea_finally_free,
+       .phase = rcu_begin_sync(), /* Asynchronous wait for RCU */
+       .attrs = r,
+      };
+
+      defer_call(&eafdc.dc, sizeof(eafdc));
+    } else
+      ASSERT_DIE(next_to_free);
+    r = next_to_free;
+  } while (r);
+}
+
+
+void
+ea_free_deferred(struct deferred_call *dc)
+{
+  struct ea_storage *r = ea_get_storage(SKIP_BACK(struct ea_free_deferred, dc, dc)->attrs);
+  ea_storage_free(r);
+}
+
+void
+ea_rehash(void * u UNUSED)
+{
+  struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
+  struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
+  u32 cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
+  ASSERT_DIE(atomic_load_explicit(&next->order, memory_order_relaxed) == 0);
+
+  /* count new order */
+  int count = atomic_fetch_add_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
+  u32 next_order = cur_order;
+
+  while (count > 1 << (next_order + 1))
+    next_order++;
+  while (count < 1 << (next_order - 1) && next_order > 28)
+    next_order--;
+
+  if (next_order == cur_order)
+    return;
+
+  /* Prepare new array */
+  if (atomic_load_explicit(&next->order, memory_order_relaxed))
+    bug("Last rehash did has not ended yet or ended badly.");
+
+  ASSERT_DIE(next->eas == NULL);
+
+  RTA_LOCK;
+  struct ea_storage *_Atomic * new_array =
+      mb_allocz(rta_hash_table.pool, sizeof(struct ea_storage *_Atomic) * (1 << next_order));
   RTA_UNLOCK;
+
+  next->eas = new_array;
+  atomic_store_explicit(&next->order, next_order, memory_order_release);
+
+  synchronize_rcu(); /* We need all threads working with ea_storages to know there is new array */
+
+  /* Move ea_storages from old array to new. */
+  /* Lookup is addind new ea_storages to new array, but we might collide with deleting */
+  /* We need to follow rules of working with ea_storage arrays:
+   *   1) anyone can ADD ea_storage any time, BUT ONLY AS THE HEAD OF THE CHAIN - newer to the end, never anywhere in between
+   *   2) anyone can REMOVE any of the ea_storage, BUT ONLY IF HE SUCCEDED TO INCREASE THE USECOUNT OF THE PREVIOUS EA_STORAGE,
+   *      OR if it is the FIRST STORAGE in the chain
+   *   3) nobody can increase an already zero usecount.
+   *   4) and, obviously, if we increase an usecount, we have to make sure we will decrease it.
+   *   5) no reorder, only add or remove
+  */
+  for (int i = 0; i < (1 << cur_order); i++)
+  {
+    rcu_read_lock();
+      uint num_stor = 0;
+      struct ea_storage *eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);
+      struct ea_storage *ea_index = eas_first;
+
+      if (!ea_index)
+      {
+        rcu_read_unlock();
+        continue;
+      }
+
+      u64 uc;
+      do
+      {
+        /* according to rule 2), we can remove all */
+        uc =  atomic_load_explicit(&ea_index->uc, memory_order_acquire);
+        bool succ = false;
+        do
+        {
+          if (uc && atomic_compare_exchange_strong_explicit(
+              &ea_index->uc, &uc, uc + 1,
+              memory_order_acq_rel, memory_order_acquire))
+          {
+            num_stor++;
+            succ = true;
+          } /* no need to care about those with use count on zero. Their next_hash pointers are ok and we can skip them again. */
+        } while (!succ && uc > 0);
+
+      } while (ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire));
+    rcu_read_unlock();
+
+    /* now nobody can do add or delete from our chain */
+    /* because each storage has to be possible to find at any time,
+     * we have to rehash them backwards. We put them to local array first.
+     */
+    struct ea_storage *local[num_stor];
+    rcu_read_lock();
+      ea_index = eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);;
+      uint l_i = 0;
+      do
+      {
+        uc =  atomic_load_explicit(&ea_index->uc, memory_order_acquire);
+        if (uc)
+        {
+          local[l_i] = ea_index;
+          l_i++;
+        }
+      } while (ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire));
+    rcu_read_unlock();
+
+    ASSERT_DIE(l_i == num_stor);
+    /* and now we can finaly move the storages to new destination */
+    for (int i = l_i - 1; i>=0; i--)
+    {
+      struct ea_storage *ea = local[i];
+      uint h_next = ea->hash_key >> (32 - next_order);
+      struct ea_storage *next_first;
+      do
+      {
+        next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
+        atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
+      } while (!atomic_compare_exchange_strong_explicit(
+          &next->eas[h_next], &next_first, ea,
+          memory_order_acq_rel, memory_order_acquire));
+      /* we increased use count to all storages in local array. For this storage, it is no more needed. */
+      ea_storage_free(ea);
+    }
+  }
+
+  /* We are going to get rid of the old array, so we have to be sure nobody will touch it.
+   * That is why we first set the order of old field to zero (nobody will touch zero-length array)
+   * and then call synchronize_rcu (everybody will know old array is zero-length) */
+  atomic_store_explicit(&cur->order, 0, memory_order_release);
+  synchronize_rcu();
+
+  RTA_LOCK;
+  mb_free(cur->eas);
+  RTA_UNLOCK;
+  cur->eas = NULL;
+
+  /* Switch the cur pointer to the new array. From now, the new array is the currently used array. */
+  atomic_store_explicit(&rta_hash_table.cur, next, memory_order_relaxed);
+
+  struct ea_stor_array *cur_end = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
+  struct ea_stor_array *next_end = (cur_end == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
+  ASSERT_DIE(atomic_load_explicit(&next_end->order, memory_order_relaxed) == 0);
+  ASSERT_DIE(atomic_load_explicit(&cur_end->order, memory_order_relaxed) != 0);
+
+  synchronize_rcu(); /* To make sure the next rehash can not start before this one is fully accepted. */
+
+#if 0
+  // this is for debug - shows full state of current ea_stor_array
+  for (int j = 0; j < 1<< atomic_load_explicit(&cur_end->order, memory_order_relaxed); j++)
+  {
+    log("%i             . . . %x",j,  cur_end->eas[j]);
+    if (cur_end->eas[j])
+    {
+    int count = 0;
+      struct ea_storage *old_eas = &count;
+      struct ea_storage *old_eas2 = &count;
+      struct ea_storage *eas = cur_end->eas[j];
+      for (; eas; eas = atomic_load_explicit(&eas->next_hash, memory_order_acquire)){
+        log("eas debuggg %x", eas);
+        ASSERT_DIE(eas!=old_eas);
+        ASSERT_DIE(eas!=old_eas2);
+        ASSERT_DIE(eas != &sentinel_rehash);
+        //log("suspicious count %x", eas);
+        old_eas2 = old_eas;
+        old_eas = eas;
+        ASSERT_DIE(atomic_load_explicit(&eas->uc, memory_order_relaxed) < 38791387);
+        }
+     }
+  }
+  log("rehashed");
+#endif
 }
 
 
+static void
+ea_dump_esa(struct dump_request *dreq, struct ea_stor_array *esa, u64 order)
+{
+  for (uint i = 0; i < 1 << (order); i++)
+  {
+    struct ea_storage *eap = atomic_load_explicit(&esa->eas[i], memory_order_acquire);
+    for (; eap; eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire))
+    {
+      RDUMP("%p ", eap);
+      ea_dump(dreq, eap->l);
+      RDUMP("\n");
+    }
+  }
+}
+
 /**
  * rta_dump_all - dump attribute cache
  *
@@ -1698,18 +2043,26 @@ ea_free_deferred(struct deferred_call *dc)
 void
 ea_dump_all(struct dump_request *dreq)
 {
+  rcu_read_lock();
+  struct ea_stor_array *esa = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
+  struct ea_stor_array *next_esa = (esa == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
+  u64 order = atomic_load_explicit(&esa->order, memory_order_relaxed);
+  u64 next_order = atomic_load_explicit(&next_esa->order, memory_order_relaxed);
   RDUMP("Route attribute cache (%d entries, order %d):\n",
       atomic_load_explicit(&rta_hash_table.count, memory_order_relaxed),
-      atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed)->order);
+      order);
+
+  if (order)
+    ea_dump_esa(dreq, esa, order);
+
+  if (next_order)
+  {
+    RDUMP("Rehashing is running right now. Some of the following routes you might have already seen above.");
+    ea_dump_esa(dreq, next_esa, next_order);
+  }
 
-  SPINHASH_WALK(rta_hash_table, RTAH, a)
-      {
-       RDUMP("%p ", a);
-       ea_dump(dreq, a->l);
-       RDUMP("\n");
-      }
-  SPINHASH_WALK_END;
   RDUMP("\n");
+  rcu_read_unlock();
 }
 
 void
@@ -1720,6 +2073,18 @@ ea_show_list(struct cli *c, ea_list *eal)
     ea_show(c, &n->attrs[i]);
 }
 
+static void
+ea_init_hash_table(pool *pool, struct event_list *ev_list)
+{
+  rta_hash_table.pool = pool;
+  rta_hash_table.ev_list = ev_list;
+  rta_hash_table.rehash_event.hook = ea_rehash;
+  rta_hash_table.esa1.eas = mb_allocz(pool, sizeof(struct ea_storage *_Atomic ) * 1<<6);
+  atomic_store_explicit(&rta_hash_table.esa1.order, 6, memory_order_relaxed);
+  atomic_store_explicit(&rta_hash_table.esa2.order, 0, memory_order_relaxed);
+  atomic_store_explicit(&rta_hash_table.cur, &rta_hash_table.esa1, memory_order_relaxed);
+}
+
 /**
  * rta_init - initialize route attribute cache
  *
@@ -1737,7 +2102,7 @@ rta_init(void)
   for (uint i=0; i<ARRAY_SIZE(ea_slab_sizes); i++)
     ea_slab[i] = sl_new(rta_pool, birdloop_event_list(&main_birdloop), ea_slab_sizes[i]);
 
-  SPINHASH_INIT(rta_hash_table, RTAH, rta_pool, &global_work_list);
+  ea_init_hash_table(rta_pool, birdloop_event_list(&main_birdloop));
 
   rte_src_init(birdloop_event_list(&main_birdloop));
   ea_class_init();