]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
RCU: Add split-sync calls
authorMaria Matejka <mq@ucw.cz>
Fri, 31 Jan 2025 12:17:11 +0000 (13:17 +0100)
committerMaria Matejka <mq@ucw.cz>
Wed, 19 Feb 2025 08:42:45 +0000 (09:42 +0100)
Now, instead of synchronize_rcu(), one can call rcu_begin_sync(),
store the value, and (probably via defer_call()) use it later
to check by rcu_end_sync() that it indeed has ended.

lib/rcu.c
lib/rcu.h
sysdep/unix/io-loop.c
sysdep/unix/io.c

index 17101489e148997f497c27e1c3e989e66c909edb..ab7f4bd1131edd360ed3e411c501af63384b762a 100644 (file)
--- a/lib/rcu.c
+++ b/lib/rcu.c
@@ -21,65 +21,71 @@ _Thread_local struct rcu_thread this_rcu_thread;
 
 static struct rcu_thread * _Atomic rcu_thread_list = NULL;
 
-static _Atomic uint rcu_thread_spinlock = 0;
-
-static int
-rcu_critical(struct rcu_thread *t, u64 phase)
+bool
+rcu_end_sync(struct rcu_stored_phase phase)
 {
-  uint val = atomic_load_explicit(&t->ctl, memory_order_acquire);
-  return
-    (val & RCU_NEST_MASK) /* Active */
-    && ((val & ~RCU_NEST_MASK) <= phase); /* In an older phase */
-}
+  _Thread_local static u64 rcu_last_cleared_phase = 0;
 
-void
-synchronize_rcu(void)
-{
-  /* Increment phase */
-  u64 phase = atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel);
-
-  while (1) {
-    /* Spinlock */
-    while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel))
-      birdloop_yield();
-
-    /* Check all threads */
-    bool critical = 0;
-    for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t;
-       t = atomic_load_explicit(tp, memory_order_acquire);
-       tp = &t->next)
-      /* Found a critical */
-      if (critical = rcu_critical(t, phase))
-       break;
-
-    /* Unlock */
-    ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel));
-
-    /* Done if no critical */
-    if (!critical)
-      return;
+  /* First check local cache */
+  if ((rcu_last_cleared_phase - phase.phase) < (1ULL << 63))
+    return true;
 
-    /* Wait and retry if critical */
-    birdloop_yield();
+  /* We read the thread list */
+  rcu_read_lock();
+
+  /* Check all threads */
+  u64 least = atomic_load_explicit(&rcu_global_phase, memory_order_acquire);
+
+  for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t;
+      t = atomic_load_explicit(tp, memory_order_acquire);
+      tp = &t->next)
+  {
+    /* Load the phase */
+    u64 val = atomic_load_explicit(&t->ctl, memory_order_acquire);
+    if (val & RCU_NEST_MASK) /* Active */
+    {
+      /* Too old phase */
+      if ((phase.phase - val) < (1ULL << 63))
+      {
+       rcu_read_unlock();
+       return false;
+      }
+
+      /* New enough, find oldest */
+      if ((least - val) < (1ULL << 63))
+       least = val & ~RCU_NEST_MASK;
+    }
   }
+
+  rcu_read_unlock();
+
+  /* Store oldest */
+  rcu_last_cleared_phase = least - RCU_GP_PHASE;
+  return true;
 }
 
+static _Atomic int rcu_thread_list_writelock = 0;
 void
 rcu_thread_start(void)
 {
-  /* Insert this thread to the thread list, no spinlock is needed */
+  while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel))
+    birdloop_yield();
+
+  /* Insert this thread to the beginning of the thread list, no spinlock is needed */
   struct rcu_thread *next = atomic_load_explicit(&rcu_thread_list, memory_order_acquire);
   do atomic_store_explicit(&this_rcu_thread.next, next, memory_order_relaxed);
   while (!atomic_compare_exchange_strong_explicit(
        &rcu_thread_list, &next, &this_rcu_thread,
        memory_order_acq_rel, memory_order_acquire));
+
+  ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel));
 }
 
 void
 rcu_thread_stop(void)
 {
-  /* Spinlock */
-  while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel))
+  /* Assuring only one thread stopper at a time */
+  while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel))
     birdloop_yield();
 
   /* Find this thread */
@@ -91,8 +97,13 @@ rcu_thread_stop(void)
       /* Remove this thread */
       atomic_store_explicit(tp, atomic_load_explicit(&t->next, memory_order_acquire), memory_order_release);
 
-      /* Unlock and go */
-      ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel));
+      /* Unlock */
+      ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel));
+
+      /* Wait for readers */
+      synchronize_rcu();
+
+      /* Done */
       return;
     }
 
index 6a771a3a5e30c09b44c13f8372e0c5f9ed8def3a..f3908c631eb5779c5b6257046dcc8ab13a93bf4a 100644 (file)
--- a/lib/rcu.h
+++ b/lib/rcu.h
@@ -27,6 +27,11 @@ struct rcu_thread {
   _Atomic u64 ctl;
 };
 
+/* A structure to syntactically ensure that no other u64 gets mixed up with this. */
+struct rcu_stored_phase {
+  u64 phase; /* The first acceptable phase to end */
+};
+
 extern _Thread_local struct rcu_thread this_rcu_thread;
 
 static inline void rcu_read_lock(void)
@@ -36,6 +41,8 @@ static inline void rcu_read_lock(void)
 
   /* Just nested */
   u64 local_nest = this_rcu_thread.local_ctl & RCU_NEST_MASK;
+  if (!local_nest)
+    bug("RCU overnested!");
   if (local_nest > RCU_NEST_CNT)
     return;
 
@@ -50,7 +57,7 @@ static inline void rcu_read_lock(void)
 static inline void rcu_read_unlock(void)
 {
   /* Just decrement the nesting counter; when unlocked, nobody cares */
-  atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel);
+  ASSERT_DIE(atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel) & RCU_NEST_MASK);
   this_rcu_thread.local_ctl--;
 }
 
@@ -59,7 +66,30 @@ static inline bool rcu_read_active(void)
   return !!(this_rcu_thread.local_ctl & RCU_NEST_MASK);
 }
 
-void synchronize_rcu(void);
+/* Begin asynchronous synchronization. */
+static inline struct rcu_stored_phase rcu_begin_sync(void)
+{
+  return (struct rcu_stored_phase) { .phase = RCU_GP_PHASE + atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel), };
+}
+
+/* End asynchronous synchronization.
+ *
+ * phase: what you got from rcu_begin_sync()
+ * wait: true to wait
+ *
+ * Returns true if the synchronization is actually done. May be retried multiple times, until true.
+ */
+bool rcu_end_sync(struct rcu_stored_phase phase);
+
+/* Synchronous synchronization. */
+static inline void
+synchronize_rcu(void)
+{
+  struct rcu_stored_phase phase = rcu_begin_sync();
+  while (!rcu_end_sync(phase))
+    birdloop_yield();
+}
+
 
 /* Registering and unregistering a birdloop. To be called from birdloop implementation */
 void rcu_thread_start(void);
index f69189e06187762bf8200335b5bff0afaf159d90..5332a538b3de22521dff7ae134eb44d2c2cc5e4a 100644 (file)
@@ -779,7 +779,7 @@ poll_timeout(struct birdloop *loop)
   if (!t)
   {
     THREAD_TRACE(DL_SCHEDULING, "No timers, no events in meta");
-    return -1;
+    return 86400 * 1000; /* Wake up at least once a day for maintenance */
   }
 
   btime remains = tm_remains(t);
@@ -922,6 +922,10 @@ poll_retry:;
       bug("poll in %p: %m", thr);
     }
 
+    /* Refresh the local RCU counter. This has to run at least once a century or so. */
+    rcu_read_lock();
+    rcu_read_unlock();
+
     account_to(&this_thread->overhead);
     birdloop_enter(thr->meta);
 
@@ -1047,9 +1051,7 @@ bird_thread_shutdown(void * _ UNUSED)
   int dif = group->thread_count - thread_dropper_goal;
   struct birdloop *tdl_stop = NULL;
 
-  if (dif > 0)
-    ev_send_loop(thread_dropper, thread_dropper_event);
-  else
+  if (dif <= 0)
   {
     tdl_stop = thread_dropper;
     thread_dropper = NULL;
@@ -1107,6 +1109,12 @@ bird_thread_shutdown(void * _ UNUSED)
   /* Request thread cleanup from main loop */
   ev_send_loop(&main_birdloop, &thr->cleanup_event);
 
+  /* Re-schedule the thread dropper */
+  ev_send_loop(thread_dropper, thread_dropper_event);
+
+  /* Inform about the thread actually stopped */
+  THREAD_TRACE(DL_SCHEDULING, "Stopped");
+
   /* Local pages not needed anymore */
   flush_local_pages();
 
@@ -1114,10 +1122,10 @@ bird_thread_shutdown(void * _ UNUSED)
   rcu_thread_stop();
 
   /* Now we can be cleaned up */
+  thr->meta->ping_pending = 0;
   birdloop_leave(thr->meta);
 
   /* Exit! */
-  THREAD_TRACE(DL_SCHEDULING, "Stopped");
   pthread_exit(NULL);
 }
 
index f9785c074e834896c477983fd722d8ee62066f5c..389254533ba00a17005e31c8a7df8f5fbdf722f2 100644 (file)
@@ -2660,6 +2660,10 @@ io_loop(void)
       birdloop_enter(&main_birdloop);
       watchdog_start();
 
+      /* Refresh the local RCU counter. This has to run at least once a century or so. */
+      rcu_read_lock();
+      rcu_read_unlock();
+
       if (pout < 0)
        {
          if (errno == EINTR || errno == EAGAIN)