From b9e619a8bc23c6bd038fb8fa2ee77739e94f2980 Mon Sep 17 00:00:00 2001
From: Katerina Kubecova
Date: Tue, 11 Mar 2025 16:50:41 +0100
Subject: [PATCH] fixup: rt-attr.c: resolve deadlock in rehash

The deadlock fix in rehash had broken the ea_storage_free() logic; now both
paths appear to work.
---
 nest/rt-attr.c | 189 ++++++++++++++++++++++++++++++++-----------------
 1 file changed, 124 insertions(+), 65 deletions(-)

diff --git a/nest/rt-attr.c b/nest/rt-attr.c
index fef66cb65..a6a97f2d8 100644
--- a/nest/rt-attr.c
+++ b/nest/rt-attr.c
@@ -1815,6 +1815,7 @@ ea_storage_free(struct ea_storage *r)
 
   do
   {
+    //log("free r %x", r);
     ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_acquire) == 0);
     rcu_read_lock();
     /* Find the storage in one of the stor arrays and remove it, so nobody will found it again */
@@ -1825,20 +1826,54 @@ ea_storage_free(struct ea_storage *r)
 
     ASSERT_DIE(cur_order || next_order);
 
-    bool success[1]; // two return values needed and creating new structure makes no sence
+    /*
+       This is the tricky part.
+       The eattr can be:
+         a) in cur only,
+         b) in next only,
+         c) in both of them - if a rehash is running right now, next is a subset of cur.
+
+       One attempt to delete an eattr from an ea_stor_array (cur or next) may result in:
+         I)   eattr not found - returns success false, next_to_free NULL
+         II)  eattr found but not deleted (retry needed) - returns success false, next_to_free the original eattr
+         III) eattr found and deleted, but due to a race we also need to free the previous eattr in the linked list;
+              this cannot happen if we are deleting from cur and the eattr fulfills c)
+              - returns success true, next_to_free the previous eattr
+         IV)  eattr found and deleted, nothing more needed - returns success true, next_to_free NULL
+
+       One more catch: the moment we call rcu_read_unlock(), cur and next may be swapped or one of them may disappear.
+
+       Our strategy is to try to delete the eattr from cur first. Then, per case:
+         I)   We need to delete it from next - right now, without rcu_read_unlock(), because waiting for cur and next
+              to be swapped might take a long time. If we do not succeed, we re-enter the do-while loop
+              with the same configuration.
+         II)  Just make a second attempt - re-enter the do-while loop with the same configuration. We might still need
+              to deal with the next array, but cur must be solved first.
+         III) Two cases to consider:
+              a) basic case - no rehash is running on this index - then next cannot contain the eattr and
+                 we can repeat the do-while loop with the returned eattr;
+              b) a rehash is running on this index - then the returned eattr is certainly from the next array, because
+                 all eattrs in cur have their use counts artificially increased and hence need not be freed here.
+                 We can repeat the do-while loop with the returned eattr; we are effectively solving next already.
+              -> Repeating the do-while loop with the returned eattr works for both cases.
+         IV)  Nice, but the deleted eattr might still be the head of a chain in the next array, and we cannot be sure
+              nobody is about to replace that head while we are making this decision. Thus we need to try to delete it
+              from next as well - again right now, without rcu_read_unlock().
+
+       Of course, if there is no next array, we do not touch it. If case I, II or III occurs while solving the next
+       array, we re-enter the do-while loop. We must repeat all the steps, because re-entering the loop refreshes
+       rcu_read_lock(). (Not refreshing rcu_read_lock might cause deadlocks.)
+    */
+    bool cur_success = false, next_success = false; // two return values are needed and creating a new structure makes no sense
     struct ea_storage *next_to_free = NULL;
 
-    if (next_order)
-      /* search in new array */
-      next_to_free = ea_free_prepare(next, r, next_order, success);
+    if (cur_order)
+      next_to_free = ea_free_prepare(cur, r, cur_order, &cur_success);
 
-    if (cur_order && (!success[0] && !next_to_free))
-      /* search in old array */
-      next_to_free = ea_free_prepare(cur, r, cur_order, success);
+    if (!next_to_free && next_order) /* Yeah, the long comment results in that simple condition. */
+      next_to_free = ea_free_prepare(next, r, next_order, &next_success);
 
     rcu_read_unlock();
 
-    if (success[0])
+    if ((cur_success || next_success) && !next_to_free)
     {
       /* Consider if rehash is needed */
       int count = atomic_fetch_sub_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
@@ -1873,44 +1908,70 @@ ea_free_deferred(struct deferred_call *dc)
 }
 
 static uint
-ea_linked_list_to_array(struct ea_storage **local, struct ea_stor_array *esa, u64 esa_i)
+ea_make_list_ours(struct ea_stor_array *esa, u64 esa_i)
 {
-  struct ea_storage *ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
-  struct ea_storage *prev_stor = NULL;
-  u64 uc;
-  int loc_i = 0;
+  /* First, make sure the first item is ours */
+  struct ea_storage *eas_first = NULL;
+  u64 uc = 0;
+  do
+  {
+    rcu_read_lock();
+    eas_first = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
+    //log("get zero %x", eas_first);
+
+    if (eas_first)
+    {
+      uc = atomic_load_explicit(&eas_first->uc, memory_order_acquire);
+
+      /* We must be sure not to increment a zero use count */
+      while (uc && !atomic_compare_exchange_strong_explicit(
+            &eas_first->uc, &uc, uc + 1,
+            memory_order_acq_rel, memory_order_acquire));
+    }
+
+    rcu_read_unlock();
+  } while (eas_first && !uc);
 
-  while (ea_index)
+  if (!eas_first)
+    return 0; // there is nothing at the given index
+
+  /* Now we have a first stable point and can go further */
+  uint num_stor = 1;
+  struct ea_storage *eas_last_increased = eas_first;
+  struct ea_storage *eas_next = NULL;
+  do
   {
-    uc = atomic_load_explicit(&ea_index->uc, memory_order_acquire);
+    rcu_read_lock();
+    eas_next = atomic_load_explicit(&eas_last_increased->next_hash, memory_order_acquire);
+    //log("eas_next %x", eas_next);
 
-    /* We must be sure not to increment zero use count */
-    while (uc && !atomic_compare_exchange_strong_explicit(
-          &ea_index->uc, &uc, uc + 1,
-          memory_order_acq_rel, memory_order_acquire));
+    if (eas_next)
+    {
+      uc = atomic_load_explicit(&eas_next->uc, memory_order_acquire);
 
-    if (uc)
-    {
-      local[loc_i] = ea_index;
-      loc_i++;
-      prev_stor = ea_index;
-      ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire);
-    } else
-    {
-      /* oops, sombody is deleting this storage right now. Do not touch it, it bites!
-       */
-      if (prev_stor)
-        ea_index = atomic_load_explicit(&prev_stor->next_hash, memory_order_acquire);
-      else
-        ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
-    }
-  }
-  return loc_i;
+      /* We must be sure not to increment a zero use count */
+      while (uc && !atomic_compare_exchange_strong_explicit(
+            &eas_next->uc, &uc, uc + 1,
+            memory_order_acq_rel, memory_order_acquire));
+
+      if (uc)
+      {
+        eas_last_increased = eas_next;
+        num_stor++;
+      }
+    }
+
+    rcu_read_unlock();
+  } while (eas_next);
+
+  return num_stor;
 }
 
 void
 ea_rehash(void * u UNUSED)
 {
-  log("rehash start");
+  //log("rehash start");
   struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
   struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
   u32 cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
@@ -1925,9 +1986,8 @@ ea_rehash(void * u UNUSED)
   while (count < 1 << (next_order - 1) && next_order > 28)
     next_order--;
 
-  if (next_order == cur_order){
-    log("rehash: order ok");
-    return;}
+  if (next_order == cur_order)
+    return;
 
   /* Prepare new array */
   if (atomic_load_explicit(&next->order, memory_order_relaxed))
@@ -1958,41 +2018,41 @@ ea_rehash(void * u UNUSED)
   */
   for (int i = 0; i < (1 << cur_order); i++)
   {
-    rcu_read_lock();
-    struct ea_storage *eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);
-    uint num_stor = 0;
-
-    /* find out maximum length of chain */
-    for (struct ea_storage *ea_index = eas_first; ea_index;
-        ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire))
-      num_stor++;
-
-    struct ea_storage *local[num_stor];
-    uint count_arr = ea_linked_list_to_array(local, cur, i);
-    rcu_read_unlock();
+    /* Safely grab the first eattr and increase its use count (rules 2 and 3) */
+    uint num_stor = ea_make_list_ours(cur, i);
+    if (num_stor == 0)
+      continue;
 
-    if (count_arr == 0)
-      continue; /* Empty linked list already */
+    struct ea_storage *local[num_stor];
+    struct ea_storage *ea_ptr = atomic_load_explicit(&cur->eas[i], memory_order_relaxed);
+    for (uint j = 0; j < num_stor; j++)
+    {
+      ASSERT_DIE(ea_ptr);
+      local[j] = ea_ptr;
+      /* We increased the use count for all of the eattrs in cur->eas[i], so we do not need to lock it again */
+      ea_ptr = atomic_load_explicit(&ea_ptr->next_hash, memory_order_relaxed);
+    }
+    ASSERT_DIE(!ea_ptr); /* Check that the number of eattrs is correct. */
 
     /* now nobody can do add or delete from our chain */
    /* because each storage has to be possible to find at any time,
    * we have to rehash them backwards. We put them to local array first.
    */
-
-    ASSERT_DIE(count_arr <= num_stor); //ERROR //TODO //ERRORR //TODO //ERROR //TODO
-
     /* and now we can finaly move the storages to new destination */
-    for (int arr_i = count_arr - 1; arr_i >= 0; arr_i--)
+    for (int arr_i = num_stor - 1; arr_i >= 0; arr_i--)
     {
+      //log("set i %i storage %x", arr_i, local[arr_i]);
       struct ea_storage *ea = local[arr_i];
      uint h_next = ea->hash_key >> (32 - next_order);
      struct ea_storage *next_first;
 
-      do
-      {
-        next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
-        atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
-      } while (!atomic_compare_exchange_strong_explicit(
-            &next->eas[h_next], &next_first, ea,
-            memory_order_acq_rel, memory_order_acquire));
+      rcu_read_lock(); /* We need to load next->eas[h_next] - that key might vanish if we do not lock */
+      do
+      {
+        next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
+        atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
+      } while (!atomic_compare_exchange_strong_explicit(
+            &next->eas[h_next], &next_first, ea,
+            memory_order_acq_rel, memory_order_acquire));
+      rcu_read_unlock();
 
      /* we increased use count to all storages in local array. For this storage, it is no more needed. */
      // log("rehash free %x", ea);
      ea_storage_free(ea);
@@ -2020,7 +2080,6 @@ ea_rehash(void * u UNUSED)
   //log("rehash goes to last sync");
   synchronize_rcu(); /* To make sure the next rehash can not start before this one is fully accepted. */
 
-  log("rehashed");
 
 #if 0
   // this is for debug - shows full state of current ea_stor_array
-- 
2.47.2
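
Reviewer note (not part of the patch): the deletion strategy described in the long comment above rests on one invariant - a use count that has already dropped to zero must never be incremented again, because ea_storage_free() is already tearing that storage down. Below is a minimal, self-contained sketch of that "take a reference only while the object is still alive" pattern, the same CAS loop that ea_make_list_ours() writes out inline; the names (struct entry, entry_try_ref) are illustrative only, not BIRD's real types.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

struct entry {
  _Atomic uint64_t uc;   /* use count; 0 means "being freed, do not touch" */
};

/* Try to take one reference; returns false if the entry is already dying. */
static bool
entry_try_ref(struct entry *e)
{
  uint64_t uc = atomic_load_explicit(&e->uc, memory_order_acquire);

  while (uc)
  {
    /* On failure the CAS reloads uc, so the zero check is repeated
     * before every retry - a dying entry is never revived. */
    if (atomic_compare_exchange_weak_explicit(&e->uc, &uc, uc + 1,
          memory_order_acq_rel, memory_order_acquire))
      return true;
  }

  return false;
}

The patch uses the strong CAS variant in the same shape; weak vs. strong only decides whether a spurious failure costs one extra loop iteration.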
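
A second sketch, for the per-bucket move in ea_rehash(): each storage is pushed to the head of its new bucket with a compare-and-swap retry loop (the loop the patch now wraps in rcu_read_lock()/rcu_read_unlock()). Again this is only an illustration under assumed names (struct node, bucket_push), not the rt-attr code itself.

#include <stdatomic.h>

struct node {
  _Atomic(struct node *) next;
};

/* Prepend n to the lock-free singly linked list rooted at *head. */
static void
bucket_push(_Atomic(struct node *) *head, struct node *n)
{
  struct node *old = atomic_load_explicit(head, memory_order_acquire);

  do
  {
    /* Link n in front of whatever we last saw as the head ... */
    atomic_store_explicit(&n->next, old, memory_order_release);
  }
  /* ... and publish it only if the head has not changed meanwhile;
   * on failure `old` is refreshed and the link is redone. */
  while (!atomic_compare_exchange_strong_explicit(head, &old, n,
        memory_order_acq_rel, memory_order_acquire));
}

Because a failed compare-exchange rewrites old with the current head, n->next is always re-linked before the next publish attempt, so no concurrently inserted element can be lost.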