From: Katerina Kubecova Date: Fri, 31 Jan 2025 10:13:26 +0000 (+0100) Subject: Unlocking the global attribute cache X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2a7735d28cfc85eaf676ca52b3538d9150e43396;p=thirdparty%2Fbird.git Unlocking the global attribute cache Now, when we have the unlocked slab, we can also go for a completely lockless global attribute cache, which has been the major contention point in BIRD 3. This is a complicated data structure, basically a lockless hash table with use-counted items inside, allowing concurrent insertions and deletions. Hash tables have to dynamically resize its base array, and the rehash routine runs from an event if needed. Also, due to some funny meta-race conditions, it is now possible to end up with multiple ea_list_storage instances with the same contents, yet it looks like it happens very rarely. (Prove us wrong, we dare you!) --- diff --git a/lib/route.h b/lib/route.h index 6ad4a7c7d..bf38a78a0 100644 --- a/lib/route.h +++ b/lib/route.h @@ -253,7 +253,7 @@ enum ea_stored { }; struct ea_storage { - struct ea_storage *next_hash; /* Next in hash chain */ + struct ea_storage *_Atomic next_hash; /* Next in hash chain */ _Atomic u64 uc; /* Use count */ u32 hash_key; /* List hash */ PADDING(unused, 0, 4); /* Sorry, we need u64 for the usecount */ @@ -573,7 +573,7 @@ static inline struct ea_storage *ea_get_storage(ea_list *r) static inline ea_list *ea_ref(ea_list *r) { - ASSERT_DIE(0 < atomic_fetch_add_explicit(&ea_get_storage(r)->uc, 1, memory_order_acq_rel)); + ASSERT_DIE(0 < atomic_fetch_add_explicit(&ea_get_storage(r)->uc, 1, memory_order_acq_rel)); //TODO Is this ok? Are we sure no one will decrease the count? return r; } @@ -586,6 +586,12 @@ static inline ea_list *ea_lookup(ea_list *r, u32 squash_upto, enum ea_stored oid return ea_lookup_slow(r, squash_upto, oid); } +struct ea_finally_free_deferred_call { + struct deferred_call dc; + struct rcu_stored_phase phase; + struct ea_storage *attrs; +}; + struct ea_free_deferred { struct deferred_call dc; ea_list *attrs; diff --git a/nest/rt-attr.c b/nest/rt-attr.c index 249bfae48..2534f3962 100644 --- a/nest/rt-attr.c +++ b/nest/rt-attr.c @@ -226,6 +226,8 @@ static struct idm src_ids; static struct rte_src * _Atomic * _Atomic rte_src_global; static _Atomic uint rte_src_global_max; +void ea_rehash(void*); + static void rte_src_init(struct event_list *ev) { @@ -1507,188 +1509,531 @@ ea_append(ea_list *to, ea_list *what) return res; } -/* - * rta's - */ - -static SPINHASH(struct ea_storage) rta_hash_table; - -#define RTAH_KEY(a) a->l, a->hash_key -#define RTAH_NEXT(a) a->next_hash -#define RTAH_EQ(a, b) ((a)->hash_key == (b)->hash_key) && (ea_same((a), (b))) -#define RTAH_FN(a, h) h -#define RTAH_ORDER 12 - -#define RTAH_REHASH rta_rehash -#define RTAH_PARAMS /8, *2, 2, 2, 12, 28 -static void RTAH_REHASH(void *_ UNUSED) { - int step; - - RTA_LOCK; - SPINHASH_REHASH_PREPARE(&rta_hash_table, RTAH, struct ea_storage, step); - RTA_UNLOCK; +struct ea_stor_array { + struct ea_storage *_Atomic *eas; + _Atomic uint order; +}; - if (!step) return; +struct hash_head { + struct ea_stor_array *_Atomic cur; + struct ea_stor_array esa1; + struct ea_stor_array esa2; + _Atomic uint count; + pool *pool; + struct event_list *ev_list; + event rehash_event; + event rehash; +}; - if (step > 0) SPINHASH_REHASH_UP (&rta_hash_table, RTAH, struct ea_storage, step); - if (step < 0) SPINHASH_REHASH_DOWN(&rta_hash_table, RTAH, struct ea_storage, -step); +static struct hash_head rta_hash_table; - RTA_LOCK; - SPINHASH_REHASH_FINISH(&rta_hash_table, RTAH); - RTA_UNLOCK; +static void +ea_increment_table_count(uint order) +{ + int count = atomic_fetch_add_explicit(&rta_hash_table.count, 1, memory_order_relaxed); + if (count > 1 << (order +1)) + ev_send(rta_hash_table.ev_list, &rta_hash_table.rehash_event); } -static ea_list * -ea_lookup_existing(ea_list *o, u32 squash_upto, uint h) +static struct ea_storage * +ea_walk_chain_for_storage(struct ea_storage *eap_first_next, ea_list *o, u32 squash_upto, uint h) { - struct ea_storage *r = NULL; - - SPINHASH_BEGIN_CHAIN(rta_hash_table, RTAH, read, eap, o, h); - for (struct ea_storage *ea; ea = *eap; eap = &RTAH_NEXT(ea)) + for (struct ea_storage *eap = eap_first_next; eap; + eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire)) + { if ( - (h == ea->hash_key) && - ea_same(o, ea->l) && - BIT32_TEST(&squash_upto, ea->l->stored) && - (!r || r->l->stored > ea->l->stored)) - r = ea; - if (r) - atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel); - SPINHASH_END_CHAIN(read); - - return r ? r->l : NULL; + (h == eap->hash_key) && ea_same(o, eap->l) && + BIT32_TEST(&squash_upto, eap->l->stored)) + return eap; + } + return NULL; } - -/** - * rta_lookup - look up a &rta in attribute cache - * @o: a un-cached &rta - * - * rta_lookup() gets an un-cached &rta structure and returns its cached - * counterpart. It starts with examining the attribute cache to see whether - * there exists a matching entry. If such an entry exists, it's returned and - * its use count is incremented, else a new entry is created with use count - * set to 1. - * - * The extended attribute lists attached to the &rta are automatically - * converted to the normalized form. - */ ea_list * ea_lookup_slow(ea_list *o, u32 squash_upto, enum ea_stored oid) { - uint h; - ASSERT(o->stored != oid); ASSERT(oid); o = ea_normalize(o, squash_upto); - h = ea_hash(o); + uint h = ea_hash(o); squash_upto |= BIT32_VAL(oid); - struct ea_list *rr = ea_lookup_existing(o, squash_upto, h); - if (rr) return rr; - - RTA_LOCK; - if (rr = ea_lookup_existing(o, squash_upto, oid)) - { - RTA_UNLOCK; - return rr; - } - - struct ea_storage *r = NULL; + /* We are about to go to critical section. Allocate the storage now. + * If it is not used, we will free it later. */ + struct ea_storage *r_new = NULL; uint elen = ea_list_size(o); uint sz = elen + sizeof(struct ea_storage); - for (uint i=0; il, o, elen); - ea_list_ref(r->l); + ea_list_copy(r_new->l, o, elen); - r->l->flags |= huge; - r->l->stored = oid; - r->hash_key = h; - atomic_store_explicit(&r->uc, 1, memory_order_release); + r_new->l->flags |= huge; + r_new->l->stored = oid; + r_new->hash_key = h; + atomic_store_explicit(&r_new->uc, 1, memory_order_relaxed); - SPINHASH_INSERT(rta_hash_table, RTAH, r); + /* The storage is ready to be added if needed. */ - RTA_UNLOCK; - return r->l; -} + struct ea_storage *r_found = NULL; -#define EA_UC_SUB (1ULL << 44) -#define EA_UC_IN_PROGRESS(x) ((x) >> 44) -#define EA_UC_ASSIGNED(x) ((x) & (EA_UC_SUB-1)) + lookup_loop: + rcu_read_lock(); + struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_acquire); + struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; //maybe it would be more effective if we load it later + uint cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed); + uint in = h >> (32 - cur_order); -#define EA_UC_DONE(x) (EA_UC_ASSIGNED(x) == EA_UC_IN_PROGRESS(x)) + struct ea_storage *eap_first; + struct ea_storage *eap_first_next; -void -ea_free_deferred(struct deferred_call *dc) -{ - struct ea_storage *r = ea_get_storage(SKIP_BACK(struct ea_free_deferred, dc, dc)->attrs); + /* Actualy search for the ea_storage - maybe we already have it */ + + if (cur_order) + { + eap_first = atomic_load_explicit(&cur->eas[in], memory_order_acquire); + + r_found = ea_walk_chain_for_storage(eap_first, o, squash_upto, h); + } + /* Maybe rehashing is running right now. Lets check it. */ + uint next_order = atomic_load_explicit(&next->order, memory_order_relaxed); + uint next_in = h >> (32 - next_order); + + if (r_found == NULL && next_order) + { + eap_first_next = atomic_load_explicit(&next->eas[next_in], memory_order_acquire); + + r_found = ea_walk_chain_for_storage(eap_first_next, o, squash_upto, h); + } + + if (r_found) + { + /* We found out we already have a suitable ea_storage. Lets increment its use count */ + u64 uc = atomic_load_explicit(&r_found->uc, memory_order_relaxed); + do + { + if (uc == 0) + { + /* The found storage has zero use count. In that case, we are not longer allowed to increase it. */ + rcu_read_unlock(); + goto lookup_loop; + } + } while (!atomic_compare_exchange_strong_explicit( + &r_found->uc, &uc, uc + 1, + memory_order_acq_rel, memory_order_acquire)); + /* we succesfully increased count, ea_storrage is ours */ + /* free ea_storage we allocated earlier */ + if (huge) + { + RTA_LOCK; + mb_free(r_new); + RTA_UNLOCK; + } else + sl_free(r_new); + + rcu_read_unlock(); + return r_found->l; + } - /* Indicate free intent */ - u64 prev = atomic_fetch_add_explicit(&r->uc, EA_UC_SUB, memory_order_acq_rel); - prev |= EA_UC_SUB; + /* suitable ea_storage not found, we need to add it */ + if (next_order) + { + /* Rehash is running, so we put the new storage to the new array */ + atomic_store_explicit(&r_new->next_hash, eap_first_next, memory_order_release); + if (!atomic_compare_exchange_strong_explicit( + &next->eas[next_in], &eap_first_next, r_new, + memory_order_acq_rel, memory_order_acquire)) + { + /* Someone was quicker and added something. Maybe added the storage we are about to add, lets check out. */ + rcu_read_unlock(); + goto lookup_loop; + } + } else + { + atomic_store_explicit(&r_new->next_hash, eap_first, memory_order_release); + if (!atomic_compare_exchange_strong_explicit( + &cur->eas[in], &eap_first, r_new, + memory_order_acq_rel, memory_order_acquire)) + { + /* Someone was quicker and added something. Maybe added the storage we are about to add, lets check out. */ + rcu_read_unlock(); + goto lookup_loop; + } + } - if (!EA_UC_DONE(prev)) + /* ea_storrage succesfully added */ + rcu_read_unlock(); + + /* Increase the counter of stored ea_storages and check if we need rehash */ + ea_increment_table_count(cur_order); + ea_list_ref(r_new->l); + return r_new->l; +} + + +static void +ea_finally_free(struct deferred_call *dc) +{ + /* Free an ea_storrage in defer call */ + SKIP_BACK_DECLARE(struct ea_finally_free_deferred_call, eafdc, dc, dc); + + if (!rcu_end_sync(eafdc->phase)) { - /* The easy case: somebody else holds the usecount */ - ASSERT_DIE(EA_UC_IN_PROGRESS(prev) < EA_UC_ASSIGNED(prev)); - atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel); + /* Somebody may still have the pointer of the storage, retry later */ + defer_call(dc, sizeof *eafdc); return; } - /* Wait for others to finish */ - while (EA_UC_DONE(prev) && (EA_UC_IN_PROGRESS(prev) > 1)) - prev = atomic_load_explicit(&r->uc, memory_order_acquire); + struct ea_storage *r = eafdc->attrs; + ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_relaxed) == 0); + ea_list_unref(r->l); - if (!EA_UC_DONE(prev)) + if (r->l->flags & EALF_HUGE) { - /* Somebody else has found us inbetween, no need to free */ - ASSERT_DIE(EA_UC_IN_PROGRESS(prev) < EA_UC_ASSIGNED(prev)); - atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel); - return; + RTA_LOCK; + mb_free(r); + RTA_UNLOCK; } + else + sl_free(r); +} - /* We are the last one to free this ea */ - RTA_LOCK; +static struct ea_storage * +ea_free_prepare(struct ea_stor_array *esa, struct ea_storage *r, uint order, bool *success) +{ + /* Removes r from esa, returns NULL if nothing else needed, ea_storage if we need to remove the ea_storage. + * The use counter of this storage is already zero. + * (It might be the given storage if we were unsuccesful, or the storage previously pointing to it.) */ + + /* What are the golden rules here: + * 1) anyone can ADD ea_storage any time, BUT ONLY AS THE HEAD OF THE CHAIN - newer to the end, never anywhere in between + * 2) anyone can REMOVE any of the ea_storage, BUT ONLY IF HE SUCCEDED TO INCREASE THE USECOUNT OF THE PREVIOUS EA_STORAGE, + * OR if it is the FIRST STORAGE in the chain + * 3) nobody can increase an already zero usecount. + * 4) and, obviously, if we increase an usecount, we have to make sure we will decrease it. + * 5) no reorder, only add or remove + */ + uint in = r->hash_key >> (32 - order); + struct ea_storage *eap = atomic_load_explicit(&esa->eas[in], memory_order_relaxed); + struct ea_storage *ea_old = NULL; + + for (; eap; eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire)) + { + if (eap == r) + { + /* We found the ea_storage, lets remove it */ + struct ea_storage *ea_next = atomic_load_explicit(&r->next_hash, memory_order_acquire); - /* Trying to remove the ea from the hash */ - SPINHASH_REMOVE(rta_hash_table, RTAH, r); + if (ea_old == NULL) + { + /* It is the first storage in chain */ + if (!atomic_compare_exchange_strong_explicit(&esa->eas[in], &r, ea_next, + memory_order_acq_rel, memory_order_acquire)) + { + success[0] = false; + return r; /* Not success, somebody else added a storage. Lets try again.*/ + } + success[0] = true; + return NULL; + } else + { + u64 uc_prev; + do + { + /* Try to increase use count of the previous ea_storage (rule 2) */ + uc_prev = atomic_load_explicit(&ea_old->uc, memory_order_acquire); + + if (uc_prev == 0) + { + success[0] = false; + return r; + } + } while (!atomic_compare_exchange_strong_explicit(&ea_old->uc, &uc_prev, uc_prev +1, memory_order_acq_rel, memory_order_acquire)); + + /* remove eap */ + atomic_store_explicit(&ea_old->next_hash, ea_next, memory_order_release); + /* decrease increased use count of the previous storage */ + uc_prev = atomic_fetch_sub_explicit(&ea_old->uc, 1, memory_order_release); + if (uc_prev == 0) + { + /* This was the last reference, we ned to remove the previous storage as well. */ + success[0] = true; + return ea_old; + } + success[0] = true; + return NULL; + } + } + ea_old = eap; + } + success[0] = false; + return NULL; +} - /* Somebody has cloned this rta inbetween. This sometimes happens. */ - prev = atomic_load_explicit(&r->uc, memory_order_acquire); - if (!EA_UC_DONE(prev) || (EA_UC_IN_PROGRESS(prev) > 1)) +static void +ea_storage_free(struct ea_storage *r) +{ + u64 uc = atomic_fetch_sub_explicit(&r->uc, 1, memory_order_acq_rel); + + if (uc != 1) { - /* Reinsert the ea and go away */ - SPINHASH_INSERT(rta_hash_table, RTAH, r); - atomic_fetch_sub_explicit(&r->uc, EA_UC_SUB + 1, memory_order_acq_rel); - RTA_UNLOCK; + /* Someone else has a reference to this ea_storage. We can just decrease use count. */ + ASSERT_DIE(uc > 0); /* Check this is not a double free. */ return; } - /* Fine, now everybody is actually done */ - ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_acquire) == EA_UC_SUB + 1); + do + { + rcu_read_lock(); + /* Find the storage in one of the stor arrays and remove it, so nobody will found it again */ + struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_acquire); + struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; - /* And now we can free the object, finally */ - ea_list_unref(r->l); - if (r->l->flags & EALF_HUGE) - mb_free(r); - else - sl_free(r); + bool success[1]; // two return values needed and creating new structure makes no sence + struct ea_storage *next_to_free; + + uint cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed); + + if (cur_order) + /* search in old array */ + next_to_free = ea_free_prepare(cur, r, cur_order, success); + + uint next_order = atomic_load_explicit(&next->order, memory_order_relaxed); + ASSERT_DIE(cur_order || next_order); + + if (next_order && (!success[0] && !next_to_free)) + /* search in new array */ + next_to_free = ea_free_prepare(next, r, next_order, success); + + rcu_read_unlock(); + if (success[0]) + { + /* Consider if rehash is needed */ + int count = atomic_fetch_sub_explicit(&rta_hash_table.count, 1, memory_order_relaxed); + + u64 order = atomic_load_explicit(&cur->order, memory_order_relaxed); + if (count < 1 << (order - 1) && order > 28) + ev_send(rta_hash_table.ev_list, &rta_hash_table.rehash_event); + + /* Schedule actual free of the storage */ + struct ea_finally_free_deferred_call eafdc = { + .dc.hook = ea_finally_free, + .phase = rcu_begin_sync(), /* Asynchronous wait for RCU */ + .attrs = r, + }; + + defer_call(&eafdc.dc, sizeof(eafdc)); + } else + ASSERT_DIE(next_to_free); + r = next_to_free; + } while (r); +} + + +void +ea_free_deferred(struct deferred_call *dc) +{ + struct ea_storage *r = ea_get_storage(SKIP_BACK(struct ea_free_deferred, dc, dc)->attrs); + ea_storage_free(r); +} + +void +ea_rehash(void * u UNUSED) +{ + struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed); + struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; + u32 cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed); + ASSERT_DIE(atomic_load_explicit(&next->order, memory_order_relaxed) == 0); + + /* count new order */ + int count = atomic_fetch_add_explicit(&rta_hash_table.count, 1, memory_order_relaxed); + u32 next_order = cur_order; + + while (count > 1 << (next_order + 1)) + next_order++; + while (count < 1 << (next_order - 1) && next_order > 28) + next_order--; + + if (next_order == cur_order) + return; + + /* Prepare new array */ + if (atomic_load_explicit(&next->order, memory_order_relaxed)) + bug("Last rehash did has not ended yet or ended badly."); + + ASSERT_DIE(next->eas == NULL); + + RTA_LOCK; + struct ea_storage *_Atomic * new_array = + mb_allocz(rta_hash_table.pool, sizeof(struct ea_storage *_Atomic) * (1 << next_order)); RTA_UNLOCK; + + next->eas = new_array; + atomic_store_explicit(&next->order, next_order, memory_order_release); + + synchronize_rcu(); /* We need all threads working with ea_storages to know there is new array */ + + /* Move ea_storages from old array to new. */ + /* Lookup is addind new ea_storages to new array, but we might collide with deleting */ + /* We need to follow rules of working with ea_storage arrays: + * 1) anyone can ADD ea_storage any time, BUT ONLY AS THE HEAD OF THE CHAIN - newer to the end, never anywhere in between + * 2) anyone can REMOVE any of the ea_storage, BUT ONLY IF HE SUCCEDED TO INCREASE THE USECOUNT OF THE PREVIOUS EA_STORAGE, + * OR if it is the FIRST STORAGE in the chain + * 3) nobody can increase an already zero usecount. + * 4) and, obviously, if we increase an usecount, we have to make sure we will decrease it. + * 5) no reorder, only add or remove + */ + for (int i = 0; i < (1 << cur_order); i++) + { + rcu_read_lock(); + uint num_stor = 0; + struct ea_storage *eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire); + struct ea_storage *ea_index = eas_first; + + if (!ea_index) + { + rcu_read_unlock(); + continue; + } + + u64 uc; + do + { + /* according to rule 2), we can remove all */ + uc = atomic_load_explicit(&ea_index->uc, memory_order_acquire); + bool succ = false; + do + { + if (uc && atomic_compare_exchange_strong_explicit( + &ea_index->uc, &uc, uc + 1, + memory_order_acq_rel, memory_order_acquire)) + { + num_stor++; + succ = true; + } /* no need to care about those with use count on zero. Their next_hash pointers are ok and we can skip them again. */ + } while (!succ && uc > 0); + + } while (ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire)); + rcu_read_unlock(); + + /* now nobody can do add or delete from our chain */ + /* because each storage has to be possible to find at any time, + * we have to rehash them backwards. We put them to local array first. + */ + struct ea_storage *local[num_stor]; + rcu_read_lock(); + ea_index = eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);; + uint l_i = 0; + do + { + uc = atomic_load_explicit(&ea_index->uc, memory_order_acquire); + if (uc) + { + local[l_i] = ea_index; + l_i++; + } + } while (ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire)); + rcu_read_unlock(); + + ASSERT_DIE(l_i == num_stor); + /* and now we can finaly move the storages to new destination */ + for (int i = l_i - 1; i>=0; i--) + { + struct ea_storage *ea = local[i]; + uint h_next = ea->hash_key >> (32 - next_order); + struct ea_storage *next_first; + do + { + next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire); + atomic_store_explicit(&ea->next_hash, next_first, memory_order_release); + } while (!atomic_compare_exchange_strong_explicit( + &next->eas[h_next], &next_first, ea, + memory_order_acq_rel, memory_order_acquire)); + /* we increased use count to all storages in local array. For this storage, it is no more needed. */ + ea_storage_free(ea); + } + } + + /* We are going to get rid of the old array, so we have to be sure nobody will touch it. + * That is why we first set the order of old field to zero (nobody will touch zero-length array) + * and then call synchronize_rcu (everybody will know old array is zero-length) */ + atomic_store_explicit(&cur->order, 0, memory_order_release); + synchronize_rcu(); + + RTA_LOCK; + mb_free(cur->eas); + RTA_UNLOCK; + cur->eas = NULL; + + /* Switch the cur pointer to the new array. From now, the new array is the currently used array. */ + atomic_store_explicit(&rta_hash_table.cur, next, memory_order_relaxed); + + struct ea_stor_array *cur_end = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed); + struct ea_stor_array *next_end = (cur_end == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; + ASSERT_DIE(atomic_load_explicit(&next_end->order, memory_order_relaxed) == 0); + ASSERT_DIE(atomic_load_explicit(&cur_end->order, memory_order_relaxed) != 0); + + synchronize_rcu(); /* To make sure the next rehash can not start before this one is fully accepted. */ + +#if 0 + // this is for debug - shows full state of current ea_stor_array + for (int j = 0; j < 1<< atomic_load_explicit(&cur_end->order, memory_order_relaxed); j++) + { + log("%i . . . %x",j, cur_end->eas[j]); + if (cur_end->eas[j]) + { + int count = 0; + struct ea_storage *old_eas = &count; + struct ea_storage *old_eas2 = &count; + struct ea_storage *eas = cur_end->eas[j]; + for (; eas; eas = atomic_load_explicit(&eas->next_hash, memory_order_acquire)){ + log("eas debuggg %x", eas); + ASSERT_DIE(eas!=old_eas); + ASSERT_DIE(eas!=old_eas2); + ASSERT_DIE(eas != &sentinel_rehash); + //log("suspicious count %x", eas); + old_eas2 = old_eas; + old_eas = eas; + ASSERT_DIE(atomic_load_explicit(&eas->uc, memory_order_relaxed) < 38791387); + } + } + } + log("rehashed"); +#endif } +static void +ea_dump_esa(struct dump_request *dreq, struct ea_stor_array *esa, u64 order) +{ + for (uint i = 0; i < 1 << (order); i++) + { + struct ea_storage *eap = atomic_load_explicit(&esa->eas[i], memory_order_acquire); + for (; eap; eap = atomic_load_explicit(&eap->next_hash, memory_order_acquire)) + { + RDUMP("%p ", eap); + ea_dump(dreq, eap->l); + RDUMP("\n"); + } + } +} + /** * rta_dump_all - dump attribute cache * @@ -1698,18 +2043,26 @@ ea_free_deferred(struct deferred_call *dc) void ea_dump_all(struct dump_request *dreq) { + rcu_read_lock(); + struct ea_stor_array *esa = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed); + struct ea_stor_array *next_esa = (esa == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1; + u64 order = atomic_load_explicit(&esa->order, memory_order_relaxed); + u64 next_order = atomic_load_explicit(&next_esa->order, memory_order_relaxed); RDUMP("Route attribute cache (%d entries, order %d):\n", atomic_load_explicit(&rta_hash_table.count, memory_order_relaxed), - atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed)->order); + order); + + if (order) + ea_dump_esa(dreq, esa, order); + + if (next_order) + { + RDUMP("Rehashing is running right now. Some of the following routes you might have already seen above."); + ea_dump_esa(dreq, next_esa, next_order); + } - SPINHASH_WALK(rta_hash_table, RTAH, a) - { - RDUMP("%p ", a); - ea_dump(dreq, a->l); - RDUMP("\n"); - } - SPINHASH_WALK_END; RDUMP("\n"); + rcu_read_unlock(); } void @@ -1720,6 +2073,18 @@ ea_show_list(struct cli *c, ea_list *eal) ea_show(c, &n->attrs[i]); } +static void +ea_init_hash_table(pool *pool, struct event_list *ev_list) +{ + rta_hash_table.pool = pool; + rta_hash_table.ev_list = ev_list; + rta_hash_table.rehash_event.hook = ea_rehash; + rta_hash_table.esa1.eas = mb_allocz(pool, sizeof(struct ea_storage *_Atomic ) * 1<<6); + atomic_store_explicit(&rta_hash_table.esa1.order, 6, memory_order_relaxed); + atomic_store_explicit(&rta_hash_table.esa2.order, 0, memory_order_relaxed); + atomic_store_explicit(&rta_hash_table.cur, &rta_hash_table.esa1, memory_order_relaxed); +} + /** * rta_init - initialize route attribute cache * @@ -1737,7 +2102,7 @@ rta_init(void) for (uint i=0; i