]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
fixup: rt-attr.c: (hopefully) solved deadlock in rehash.
authorKaterina Kubecova <katerina.kubecova@nic.cz>
Tue, 11 Mar 2025 15:50:41 +0000 (16:50 +0100)
committerKaterina Kubecova <katerina.kubecova@nic.cz>
Tue, 11 Mar 2025 15:50:41 +0000 (16:50 +0100)
 The solution for rehash broke ea_storage_free logic, but now both look repaired.

nest/rt-attr.c

index fef66cb65477cf0b3f38cbb4e1baea3523a6c351..a6a97f2d8d6359b9ad867e48d968b0cff679de95 100644 (file)
@@ -1815,6 +1815,7 @@ ea_storage_free(struct ea_storage *r)
 
   do
   {
+    //log("free      r %x", r);
     ASSERT_DIE(atomic_load_explicit(&r->uc, memory_order_acquire) == 0);
     rcu_read_lock();
       /* Find the storage in one of the stor arrays and remove it, so nobody will found it again */
@@ -1825,20 +1826,54 @@ ea_storage_free(struct ea_storage *r)
 
       ASSERT_DIE(cur_order || next_order);
 
-      bool success[1]; // two return values needed and creating new structure makes no sence
+      /*
+        This is tricky part.
+          The eattr can be:
+                a) in cur only
+                b) in next only
+                c) in both of them, if we are doing rehash right now, next is subset of cur
+          One attempt to delete one eattr from an ea_stor_array (cur or next) may result in:
+                I) eattr not found - returns success false, next_to_free NULL
+                II) eattr found, but was not deleted (retry needed) - returns succes false, next_to_free original eattr
+                III) eattr found, deleted, but due to race we need to free previous eattr in linked list.
+                        this can not occur if we are deleting from cur and eattr fulfill c)
+                        - returns succes true, next_to_free previous eattr
+                IV) eattr found, deleted, nothing more needed - returns succes, next_to_free NULL
+
+          And one more funny thing - the first moment we do rcu_read_unlock cur and next might switch or one of them might disappear.
+
+          Our strategy is try to delete eattr from cur and in case:
+                I) we need to delete it from next. Right now, without rcu_read_unlock,
+                       waiting for swaping next and cur might take longer time. However, if we don't succed, we reenter the do-while loop
+                       with the same configuration.
+                II) just make second attempt - reenter the do-while loop with the same configuration. We might need to deal with next array,
+                       but cur must be solved first.
+                III) We need to consider two cases:
+                       a) basic case - rehash is not running on this index - in this case, next can not contain eattr
+                              we can repeat do-while loop with the returned eattr
+                       b) rehash is running on this index - we can be sure the returned eattr is actually from next array - all eattrs in cur
+                              have their use counts artifically increased and thence there is no need to free them here.
+                              We can repeat do-while loop with the returned eattr, we are actually solving next already.
+                       -> repeat do-while loop with the returned eattr workes for both cases.
+                IV) Nice, but the deleted eattr might be the header of the next array. Plus, we can not be sure, if someone is not about
+                       replacing the next header at the time we are making this decision. Thus, we need to try to delete it from next
+                       as well. And, of course, right now without rcu_read_unlock.
+          Of course, if there is no next array, we will not use it. If case I, II or III occurs while solving next array, we will re-enter
+              do-while loop. We need to repeat all the steps, because reentering loop means refreshing rcu_read_lock.
+              (Not refreshing rcu_read_lock might cause deadlocks.)
+      */
+      bool cur_success, next_success = false; // two return values needed and creating new structure makes no sence
       struct ea_storage *next_to_free = NULL;
 
-      if (next_order)
-        /* search in new array */
-        next_to_free = ea_free_prepare(next, r, next_order, success);
+      if (cur_order)
+        next_to_free = ea_free_prepare(cur, r, cur_order, &cur_success);
 
-      if (cur_order && (!success[0] && !next_to_free))
-        /* search in old array */
-        next_to_free = ea_free_prepare(cur, r, cur_order, success);
+      if (!next_to_free && next_order) /* Yeah, the long comment results in that simple condition. */
+        next_to_free = ea_free_prepare(next, r, next_order, &next_success);
 
     rcu_read_unlock();
 
-    if (success[0])
+    if ((cur_success || next_success) && !next_to_free)
     {
       /* Consider if rehash is needed */
       int count = atomic_fetch_sub_explicit(&rta_hash_table.count, 1, memory_order_relaxed);
@@ -1873,44 +1908,70 @@ ea_free_deferred(struct deferred_call *dc)
 }
 
 static uint
-ea_linked_list_to_array(struct ea_storage **local, struct ea_stor_array *esa, u64 esa_i)
+ea_make_list_ours(struct ea_stor_array *esa, u64 esa_i)
 {
-  struct ea_storage *ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
-  struct ea_storage *prev_stor = NULL;
-  u64 uc;
-  int loc_i = 0;
+  /* First, make sure the first item is ours */
+  struct ea_storage *eas_first = NULL;
+  u64 uc = 0;
+  do
+  {
+    rcu_read_lock();
+      eas_first = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
+      //log("get zero %x", eas_first);
+
+      if (eas_first)
+      {
+        uc =  atomic_load_explicit(&eas_first->uc, memory_order_acquire);
+
+        /* We must be sure not to increment zero use count */
+        while (uc && !atomic_compare_exchange_strong_explicit(
+            &eas_first->uc, &uc, uc + 1,
+            memory_order_acq_rel, memory_order_acquire));
+      }
+
+    rcu_read_unlock();
+  } while (eas_first && !uc);
 
-  while (ea_index)
+
+  if (!eas_first)
+      return 0; // there is nothing at given index
+
+  /* Now, we have first stable point and can go further */ 
+  uint num_stor = 1;
+  struct ea_storage *eas_last_increased = eas_first;
+  struct ea_storage *eas_next = NULL;
+  do
   {
-    uc =  atomic_load_explicit(&ea_index->uc, memory_order_acquire);
+    rcu_read_lock();
+      eas_next = atomic_load_explicit(&eas_last_increased->next_hash, memory_order_acquire);
+      //log("eas_next %x", eas_next);
 
-    /* We must be sure not to increment zero use count */
-    while (uc && !atomic_compare_exchange_strong_explicit(
-          &ea_index->uc, &uc, uc + 1,
-          memory_order_acq_rel, memory_order_acquire));
+      if (eas_next)
+      {
+        uc =  atomic_load_explicit(&eas_next->uc, memory_order_acquire);
 
-    if (uc)
-    {
-      local[loc_i] = ea_index;
-      loc_i++;
-      prev_stor = ea_index;
-      ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire);
-    } else
-    {
-      /* oops, sombody is deleting this storage right now. Do not touch it, it bites! */
-      if (prev_stor)
-        ea_index = atomic_load_explicit(&prev_stor->next_hash, memory_order_acquire);
-      else
-        ea_index = atomic_load_explicit(&esa->eas[esa_i], memory_order_acquire);
-    }
-  }
-  return loc_i;
+        /* We must be sure not to increment zero use count */
+        while (uc && !atomic_compare_exchange_strong_explicit(
+            &eas_next->uc, &uc, uc + 1,
+            memory_order_acq_rel, memory_order_acquire));
+
+        if (uc)
+        {
+          eas_last_increased = eas_next;
+          num_stor++;
+        }
+      }
+
+    rcu_read_unlock();
+  } while (eas_next);
+
+  return num_stor;
 }
 
 void
 ea_rehash(void * u UNUSED)
 {
-  log("rehash start");
+  //log("rehash start");
   struct ea_stor_array *cur = atomic_load_explicit(&rta_hash_table.cur, memory_order_relaxed);
   struct ea_stor_array *next = (cur == &rta_hash_table.esa1)? &rta_hash_table.esa2 : &rta_hash_table.esa1;
   u32 cur_order = atomic_load_explicit(&cur->order, memory_order_relaxed);
@@ -1925,9 +1986,8 @@ ea_rehash(void * u UNUSED)
   while (count < 1 << (next_order - 1) && next_order > 28)
     next_order--;
 
-  if (next_order == cur_order){
-    log("rehash: order ok");
-    return;}
+  if (next_order == cur_order)
+    return;
 
   /* Prepare new array */
   if (atomic_load_explicit(&next->order, memory_order_relaxed))
@@ -1958,41 +2018,41 @@ ea_rehash(void * u UNUSED)
   */
   for (int i = 0; i < (1 << cur_order); i++)
   {
-    rcu_read_lock();
-      struct ea_storage *eas_first = atomic_load_explicit(&cur->eas[i], memory_order_acquire);
-      uint num_stor = 0;
-
-      /* find out maximum length of chain */
-      for (struct ea_storage *ea_index = eas_first; ea_index;
-          ea_index = atomic_load_explicit(&ea_index->next_hash, memory_order_acquire))
-        num_stor++;
-
-      struct ea_storage *local[num_stor];
-      uint count_arr = ea_linked_list_to_array(local, cur, i);
-    rcu_read_unlock();
+    /* Safely grab first eattr and increase its use count (rules 2 and 3) */
+    uint num_stor = ea_make_list_ours(cur, i);
+    if (num_stor == 0)
+      continue;
 
-    if (count_arr == 0)
-      continue; /* Empty linked list already */
+    struct ea_storage *local[num_stor];
+    struct ea_storage *ea_ptr = atomic_load_explicit(&cur->eas[i], memory_order_relaxed);
+    for (uint i = 0; i < num_stor; i++)
+    {
+      ASSERT_DIE(ea_ptr);
+      local[i] = ea_ptr;
+      /* We increased the use count for all of the eattrs in cur->eas[i], so we do not need to lock it again */
+      ea_ptr = atomic_load_explicit(&ea_ptr->next_hash, memory_order_relaxed);
+    }
+    ASSERT_DIE(!ea_ptr); /* Check the number of eattrs is correct. */
 
     /* now nobody can do add or delete from our chain */
     /* because each storage has to be possible to find at any time,
      * we have to rehash them backwards. We put them to local array first.
      */
-
-    ASSERT_DIE(count_arr <= num_stor); //ERROR //TODO //ERRORR //TODO //ERROR //TODO
-    /* and now we can finaly move the storages to new destination */
-    for (int arr_i = count_arr - 1; arr_i >= 0; arr_i--)
+    for (int arr_i = num_stor - 1; arr_i >= 0; arr_i--)
     {
+      //log("set i %i storage %x", arr_i, local[arr_i]);
       struct ea_storage *ea = local[arr_i];
       uint h_next = ea->hash_key >> (32 - next_order);
       struct ea_storage *next_first;
-      do
-      {
-        next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
-        atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
-      } while (!atomic_compare_exchange_strong_explicit(
-          &next->eas[h_next], &next_first, ea,
-          memory_order_acq_rel, memory_order_acquire));
+      rcu_read_lock(); /* We need to load next->eas[h_next] - that key might vanish if we do not lock */
+        do
+        {
+          next_first = atomic_load_explicit(&next->eas[h_next], memory_order_acquire);
+          atomic_store_explicit(&ea->next_hash, next_first, memory_order_release);
+        } while (!atomic_compare_exchange_strong_explicit(
+            &next->eas[h_next], &next_first, ea,
+            memory_order_acq_rel, memory_order_acquire));
+      rcu_read_unlock();
       /* we increased use count to all storages in local array. For this storage, it is no more needed. */
      // log("rehash free %x", ea);
       ea_storage_free(ea);
@@ -2020,7 +2080,6 @@ ea_rehash(void * u UNUSED)
   //log("rehash goes to last sync");
 
   synchronize_rcu(); /* To make sure the next rehash can not start before this one is fully accepted. */
-  log("rehashed");
 
 #if 0
   // this is for debug - shows full state of current ea_stor_array