static struct rcu_thread * _Atomic rcu_thread_list = NULL;
-static _Atomic uint rcu_thread_spinlock = 0;
-
-static int
-rcu_critical(struct rcu_thread *t, u64 phase)
+bool
+rcu_end_sync(struct rcu_stored_phase phase)
{
- uint val = atomic_load_explicit(&t->ctl, memory_order_acquire);
- return
- (val & RCU_NEST_MASK) /* Active */
- && ((val & ~RCU_NEST_MASK) <= phase); /* In an older phase */
-}
+ _Thread_local static u64 rcu_last_cleared_phase = 0;
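+
+ /* Phases are compared with wrap-safe (serial-number) arithmetic:
+  * (a - b) < (1ULL << 63) means a is at or after b, as long as the two
+  * values never drift more than 2^63 apart. */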
-void
-synchronize_rcu(void)
-{
- /* Increment phase */
- u64 phase = atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel);
-
- while (1) {
- /* Spinlock */
- while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel))
- birdloop_yield();
-
- /* Check all threads */
- bool critical = 0;
- for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t;
- t = atomic_load_explicit(tp, memory_order_acquire);
- tp = &t->next)
- /* Found a critical */
- if (critical = rcu_critical(t, phase))
- break;
-
- /* Unlock */
- ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel));
-
- /* Done if no critical */
- if (!critical)
- return;
+ /* First check the local cache: if we already know of a cleared phase
+  * at least as new as the requested one, we are done */
+ if ((rcu_last_cleared_phase - phase.phase) < (1ULL << 63))
+ return true;
- /* Wait and retry if critical */
- birdloop_yield();
+ /* The thread list is itself RCU-protected, take the read lock for the traversal */
+ rcu_read_lock();
+
+ /* Check all threads */
+ u64 least = atomic_load_explicit(&rcu_global_phase, memory_order_acquire);
+
+ for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t;
+ t = atomic_load_explicit(tp, memory_order_acquire);
+ tp = &t->next)
+ {
+ /* Load the phase */
+ u64 val = atomic_load_explicit(&t->ctl, memory_order_acquire);
+ if (val & RCU_NEST_MASK) /* Active */
+ {
+ /* This reader is still in a phase not newer than the requested one,
+  * so the synchronization is not done yet */
+ if ((phase.phase - val) < (1ULL << 63))
+ {
+ rcu_read_unlock();
+ return false;
+ }
+
+ /* New enough; keep track of the oldest phase among the active readers */
+ if ((least - val) < (1ULL << 63))
+ least = val & ~RCU_NEST_MASK;
+ }
}
+
+ rcu_read_unlock();
+
+ /* Cache the newest phase known to be cleared, one grace period
+  * before the oldest active reader */
+ rcu_last_cleared_phase = least - RCU_GP_PHASE;
+ return true;
}
+static _Atomic int rcu_thread_list_writelock = 0;
void
rcu_thread_start(void)
{
- /* Insert this thread to the thread list, no spinlock is needed */
+ while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel))
+ birdloop_yield();
+
+ /* Insert this thread at the beginning of the thread list. Readers traverse
+  * the list lock-free, hence the atomics; the writelock serializes us
+  * against concurrent removals in rcu_thread_stop(). */
struct rcu_thread *next = atomic_load_explicit(&rcu_thread_list, memory_order_acquire);
do atomic_store_explicit(&this_rcu_thread.next, next, memory_order_relaxed);
while (!atomic_compare_exchange_strong_explicit(
&rcu_thread_list, &next, &this_rcu_thread,
memory_order_acq_rel, memory_order_acquire));
+
+ ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel));
}
void
rcu_thread_stop(void)
{
- /* Spinlock */
- while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel))
+ /* Ensuring there is only one thread stopper at a time */
+ while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel))
birdloop_yield();
/* Find this thread */
/* Remove this thread */
atomic_store_explicit(tp, atomic_load_explicit(&t->next, memory_order_acquire), memory_order_release);
- /* Unlock and go */
- ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel));
+ /* Unlock */
+ ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel));
+
+ /* Wait for readers */
+ synchronize_rcu();
+
+ /* Done */
return;
}
_Atomic u64 ctl;
};
+/* A wrapper structure ensuring at the type level that no unrelated u64
+ * gets mixed up with a stored RCU phase. */
+struct rcu_stored_phase {
+ u64 phase; /* The first acceptable phase to end */
+};
+
extern _Thread_local struct rcu_thread this_rcu_thread;
static inline void rcu_read_lock(void)
/* Just nested */
u64 local_nest = this_rcu_thread.local_ctl & RCU_NEST_MASK;
+ if (!local_nest)
+ bug("RCU overnested!");
if (local_nest > RCU_NEST_CNT)
return;
static inline void rcu_read_unlock(void)
{
/* Just decrement the nesting counter; when unlocked, nobody cares */
- atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel);
+ ASSERT_DIE(atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel) & RCU_NEST_MASK);
this_rcu_thread.local_ctl--;
}
return !!(this_rcu_thread.local_ctl & RCU_NEST_MASK);
}
-void synchronize_rcu(void);
+/* Begin asynchronous synchronization. */
+static inline struct rcu_stored_phase rcu_begin_sync(void)
+{
+ return (struct rcu_stored_phase) {
+ .phase = RCU_GP_PHASE + atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel),
+ };
+}
+
+/* End asynchronous synchronization.
+ *
+ * phase: the value obtained from rcu_begin_sync()
+ *
+ * Returns true if the synchronization is done. If it returns false,
+ * call it again later; retry until it returns true.
+ */
+bool rcu_end_sync(struct rcu_stored_phase phase);
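+
+/* Asynchronous usage sketch (illustrative only; free_the_object() and
+ * reschedule_the_check() are hypothetical helpers, not part of this patch):
+ *
+ *   ... unlink the object from all RCU-visible structures ...
+ *   struct rcu_stored_phase ph = rcu_begin_sync();
+ *
+ *   ... later, e.g. from a periodically re-scheduled event:
+ *   if (rcu_end_sync(ph))
+ *     free_the_object();        (no reader can reference it anymore)
+ *   else
+ *     reschedule_the_check();   (some reader is still in an older phase)
+ */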
+
+/* Synchronous synchronization. */
+static inline void
+synchronize_rcu(void)
+{
+ struct rcu_stored_phase phase = rcu_begin_sync();
+ while (!rcu_end_sync(phase))
+ birdloop_yield();
+}
+
/* Registering and unregistering a birdloop. To be called from birdloop implementation */
void rcu_thread_start(void);
if (!t)
{
THREAD_TRACE(DL_SCHEDULING, "No timers, no events in meta");
- return -1;
+ return 86400 * 1000; /* Wake up at least once a day for maintenance */
}
btime remains = tm_remains(t);
bug("poll in %p: %m", thr);
}
+ /* Refresh the local RCU counter. This has to run at least once a century or so. */
+ rcu_read_lock();
+ rcu_read_unlock();
+
account_to(&this_thread->overhead);
birdloop_enter(thr->meta);
int dif = group->thread_count - thread_dropper_goal;
struct birdloop *tdl_stop = NULL;
- if (dif > 0)
- ev_send_loop(thread_dropper, thread_dropper_event);
- else
+ if (dif <= 0)
{
tdl_stop = thread_dropper;
thread_dropper = NULL;
/* Request thread cleanup from main loop */
ev_send_loop(&main_birdloop, &thr->cleanup_event);
+ /* Re-schedule the thread dropper */
+ ev_send_loop(thread_dropper, thread_dropper_event);
+
+ /* Report that this thread has actually stopped */
+ THREAD_TRACE(DL_SCHEDULING, "Stopped");
+
/* Local pages not needed anymore */
flush_local_pages();
rcu_thread_stop();
/* Now we can be cleaned up */
+ thr->meta->ping_pending = 0;
birdloop_leave(thr->meta);
/* Exit! */
- THREAD_TRACE(DL_SCHEDULING, "Stopped");
pthread_exit(NULL);
}