do
{
+ //log("free r %x", r);
ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_acquire) == 0);
rcu_read_lock();
/* Find the storage in one of the stor arrays and remove it, so nobody will found it again */
ASSERT_DIE(cur_order || next_order);
- bool success[1]; // two return values needed and creating new structure makes no sence
+ /*
+ This is the tricky part.
+ The eattr can be:
+ a) in cur only
+ b) in next only
+ c) in both of them, if we are doing rehash right now, next is subset of cur
+ One attempt to delete one eattr from an ea_stor_array (cur or next) may result in:
+ I) eattr not found - returns success false, next_to_free NULL
+ II) eattr found, but was not deleted (retry needed) - returns success false, next_to_free original eattr
+ III) eattr found, deleted, but due to race we need to free previous eattr in linked list.
+ this can not occur if we are deleting from cur and eattr fulfills c)
+ - returns success true, next_to_free previous eattr
+ IV) eattr found, deleted, nothing more needed - returns success true, next_to_free NULL
+
+ And one more funny thing - the first moment we do rcu_read_unlock cur and next might switch or one of them might disappear.
+
+ Our strategy is try to delete eattr from cur and in case:
+ I) we need to delete it from next. Right now, without rcu_read_unlock,
+ waiting for the swapping of next and cur might take a longer time. However, if we don't succeed, we reenter the do-while loop
+ with the same configuration.
+ II) just make second attempt - reenter the do-while loop with the same configuration. We might need to deal with next array,
+ but cur must be solved first.
+ III) We need to consider two cases:
+ a) basic case - rehash is not running on this index - in this case, next can not contain eattr
+ we can repeat do-while loop with the returned eattr
+ b) rehash is running on this index - we can be sure the returned eattr is actually from next array - all eattrs in cur
+ have their use counts artificially increased and hence there is no need to free them here.
+ We can repeat do-while loop with the returned eattr, we are actually solving next already.
+ -> repeat do-while loop with the returned eattr works for both cases.
+ IV) Nice, but the deleted eattr might be the header of the next array. Plus, we cannot be sure that someone is not about
+ to replace the next header at the time we are making this decision. Thus, we need to try to delete it from next
+ as well. And, of course, right now without rcu_read_unlock.
+ Of course, if there is no next array, we will not use it. If case I, II or III occurs while solving next array, we will re-enter
+ do-while loop. We need to repeat all the steps, because reentering loop means refreshing rcu_read_lock.
+ (Not refreshing rcu_read_lock might cause deadlocks.)
+ */
+ bool cur_success, next_success = false; // two return values needed and creating a new structure makes no sense. NOTE(review): cur_success is read in the success check below even when cur_order == 0, in which case it is never written - it should be initialized to false as well to avoid reading an indeterminate value (UB).
struct ea_storage *next_to_free = NULL;
- if (next_order)
- /* search in new array */
- next_to_free = ea_free_prepare(next, r, next_order, success);
+ if (cur_order)
+ next_to_free = ea_free_prepare(cur, r, cur_order, &cur_success);
- if (cur_order && (!success[0] && !next_to_free))
- /* search in old array */
- next_to_free = ea_free_prepare(cur, r, cur_order, success);
+ if (!next_to_free && next_order) /* Yeah, the long comment results in that simple condition. */
+ next_to_free = ea_free_prepare(next, r, next_order, &next_success);
rcu_read_unlock();
- if (success[0])
+ if ((cur_success || next_success) && !next_to_free)
{
/* Consider if rehash is needed */
int count = atomic_fetch_sub_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
}
static uint
-ea_linked_list_to_array(struct ea_storage **local, struct ea_stor_array *esa, u64 esa_i)
+ea_make_list_ours(struct ea_stor_array *esa, u64 esa_i)
{
- struct ea_storage *ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
- struct ea_storage *prev_stor = NULL;
- u64 uc;
- int loc_i = 0;
+ /* Step 1: pin the head of the chain - take a use-count reference so nobody can free it under us */
+ struct ea_storage *eas_first = NULL;
+ u64 uc = 0;
+ do
+ {
+ rcu_read_lock();
+ eas_first = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
+ //log("get zero %x", eas_first);
+
+ if (eas_first)
+ {
+ uc = atomic_load_explicit(&eas_first->uc, memory_order_acquire);
+
+ /* Never increment a zero use count - zero means somebody is freeing this storage right now */
+ while (uc && !atomic_compare_exchange_strong_explicit(
+ &eas_first->uc, &uc, uc + 1,
+ memory_order_acq_rel, memory_order_acquire));
+ }
+
+ rcu_read_unlock();
+ } while (eas_first && !uc);
- while (ea_index)
+
+ if (!eas_first)
+ return 0; // the chain at this index is empty
+
+ /* Step 2: the head is now a stable anchor; walk the chain and take a reference on every following node */
+ uint num_stor = 1;
+ struct ea_storage *eas_last_increased = eas_first;
+ struct ea_storage *eas_next = NULL;
+ do
{
- uc = atomic_load_explicit(&ea_index->uc, memory_order_acquire);
+ rcu_read_lock();
+ eas_next = atomic_load_explicit(&eas_last_increased->next_hash, memory_order_acquire);
+ //log("eas_next %x", eas_next);
- /* We must be sure not to increment zero use count */
- while (uc && !atomic_compare_exchange_strong_explicit(
- &ea_index->uc, &uc, uc + 1,
- memory_order_acq_rel, memory_order_acquire));
+ if (eas_next)
+ {
+ uc = atomic_load_explicit(&eas_next->uc, memory_order_acquire);
- if (uc)
- {
- local[loc_i] = ea_index;
- loc_i++;
- prev_stor = ea_index;
- ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire);
- } else
- {
- /* oops, sombody is deleting this storage right now. Do not touch it, it bites! */
- if (prev_stor)
- ea_index = atomic_load_explicit(&prev_stor->next_hash, memory_order_acquire);
- else
- ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
- }
- }
- return loc_i;
+ /* Never increment a zero use count - zero means somebody is freeing this storage right now */
+ while (uc && !atomic_compare_exchange_strong_explicit(
+ &eas_next->uc, &uc, uc + 1,
+ memory_order_acq_rel, memory_order_acquire));
+
+ if (uc)
+ {
+ eas_last_increased = eas_next;
+ num_stor++;
+ }
+ }
+
+ rcu_read_unlock();
+ } while (eas_next);
+
+ return num_stor;
}
void
ea_rehash(void * u UNUSED)
{
- log("rehash start");
+ //log("rehash start");
struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
u32 cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
while (count < 1 << (next_order - 1) && next_order > 28)
next_order--;
- if (next_order == cur_order){
- log("rehash: order ok");
- return;}
+ if (next_order == cur_order)
+ return;
/* Prepare new array */
if (atomic_load_explicit(&next->order, memory_order_relaxed))
*/
for (int i = 0; i < (1 << cur_order); i++)
{
- rcu_read_lock();
- struct ea_storage *eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);
- uint num_stor = 0;
-
- /* find out maximum length of chain */
- for (struct ea_storage *ea_index = eas_first; ea_index;
- ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire))
- num_stor++;
-
- struct ea_storage *local[num_stor];
- uint count_arr = ea_linked_list_to_array(local, cur, i);
- rcu_read_unlock();
+ /* Safely grab first eattr and increase its use count (rules 2 and 3) */
+ uint num_stor = ea_make_list_ours(cur, i);
+ if (num_stor == 0)
+ continue;
- if (count_arr == 0)
- continue; /* Empty linked list already */
+ struct ea_storage *local[num_stor];
+ struct ea_storage *ea_ptr = atomic_load_explicit(&cur->eas[i], memory_order_relaxed);
+ for (uint i = 0; i < num_stor; i++)
+ {
+ ASSERT_DIE(ea_ptr);
+ local[i] = ea_ptr;
+ /* We increased the use count for all of the eattrs in cur->eas[i], so we do not need to lock it again */
+ ea_ptr = atomic_load_explicit(&ea_ptr->next_hash, memory_order_relaxed);
+ }
+ ASSERT_DIE(!ea_ptr); /* Check the number of eattrs is correct. */
/* now nobody can do add or delete from our chain */
/* because each storage has to be possible to find at any time,
* we have to rehash them backwards. We put them to local array first.
*/
-
- ASSERT_DIE(count_arr <= num_stor); //ERROR //TODO //ERRORR //TODO //ERROR //TODO
- /* and now we can finaly move the storages to new destination */
- for (int arr_i = count_arr - 1; arr_i >= 0; arr_i--)
+ for (int arr_i = num_stor - 1; arr_i >= 0; arr_i--)
{
+ //log("set i %i storage %x", arr_i, local[arr_i]);
struct ea_storage *ea = local[arr_i];
uint h_next = ea->hash_key >> (32 - next_order);
struct ea_storage *next_first;
- do
- {
- next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
- atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
- } while (!atomic_compare_exchange_strong_explicit(
- &next->eas[h_next], &next_first, ea,
- memory_order_acq_rel, memory_order_acquire));
+ rcu_read_lock(); /* We need to load next->eas[h_next] - that key might vanish if we do not lock */
+ do
+ {
+ next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
+ atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
+ } while (!atomic_compare_exchange_strong_explicit(
+ &next->eas[h_next], &next_first, ea,
+ memory_order_acq_rel, memory_order_acquire));
+ rcu_read_unlock();
/* we increased use count to all storages in local array. For this storage, it is no more needed. */
// log("rehash free %x", ea);
ea_storage_free(ea);
//log("rehash goes to last sync");
synchronize_rcu(); /* To make sure the next rehash can not start before this one is fully accepted. */
- log("rehashed");
#if 0
// this is for debug - shows full state of current ea_stor_array