From: Maria Matejka Date: Fri, 31 Jan 2025 12:17:11 +0000 (+0100) Subject: RCU: Add split-sync calls X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=36cdb109bb5837f316cd2dea5089189035fecb6c;p=thirdparty%2Fbird.git RCU: Add split-sync calls Now, instead of synchronize_rcu(), one can call rcu_begin_sync(), store the value, and (probably via defer_call()) use it later to check by rcu_end_sync() that it indeed has ended. --- diff --git a/lib/rcu.c b/lib/rcu.c index 17101489e..ab7f4bd11 100644 --- a/lib/rcu.c +++ b/lib/rcu.c @@ -21,65 +21,71 @@ _Thread_local struct rcu_thread this_rcu_thread; static struct rcu_thread * _Atomic rcu_thread_list = NULL; -static _Atomic uint rcu_thread_spinlock = 0; - -static int -rcu_critical(struct rcu_thread *t, u64 phase) +bool +rcu_end_sync(struct rcu_stored_phase phase) { - uint val = atomic_load_explicit(&t->ctl, memory_order_acquire); - return - (val & RCU_NEST_MASK) /* Active */ - && ((val & ~RCU_NEST_MASK) <= phase); /* In an older phase */ -} + _Thread_local static u64 rcu_last_cleared_phase = 0; -void -synchronize_rcu(void) -{ - /* Increment phase */ - u64 phase = atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel); - - while (1) { - /* Spinlock */ - while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel)) - birdloop_yield(); - - /* Check all threads */ - bool critical = 0; - for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t; - t = atomic_load_explicit(tp, memory_order_acquire); - tp = &t->next) - /* Found a critical */ - if (critical = rcu_critical(t, phase)) - break; - - /* Unlock */ - ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel)); - - /* Done if no critical */ - if (!critical) - return; + /* First check local cache */ + if ((rcu_last_cleared_phase - phase.phase) < (1ULL << 63)) + return true; - /* Wait and retry if critical */ - birdloop_yield(); + /* We read the thread list */ + rcu_read_lock(); + + /* Check all threads */ + u64 least = atomic_load_explicit(&rcu_global_phase, memory_order_acquire); + + for (struct rcu_thread * _Atomic *tp = &rcu_thread_list, *t; + t = atomic_load_explicit(tp, memory_order_acquire); + tp = &t->next) + { + /* Load the phase */ + u64 val = atomic_load_explicit(&t->ctl, memory_order_acquire); + if (val & RCU_NEST_MASK) /* Active */ + { + /* Too old phase */ + if ((phase.phase - val) < (1ULL << 63)) + { + rcu_read_unlock(); + return false; + } + + /* New enough, find oldest */ + if ((least - val) < (1ULL << 63)) + least = val & ~RCU_NEST_MASK; + } } + + rcu_read_unlock(); + + /* Store oldest */ + rcu_last_cleared_phase = least - RCU_GP_PHASE; + return true; } +static _Atomic int rcu_thread_list_writelock = 0; void rcu_thread_start(void) { - /* Insert this thread to the thread list, no spinlock is needed */ + while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel)) + birdloop_yield(); + + /* Insert this thread to the beginning of the thread list, no spinlock is needed */ struct rcu_thread *next = atomic_load_explicit(&rcu_thread_list, memory_order_acquire); do atomic_store_explicit(&this_rcu_thread.next, next, memory_order_relaxed); while (!atomic_compare_exchange_strong_explicit( &rcu_thread_list, &next, &this_rcu_thread, memory_order_acq_rel, memory_order_acquire)); + + ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel)); } void rcu_thread_stop(void) { - /* Spinlock */ - while (atomic_exchange_explicit(&rcu_thread_spinlock, 1, memory_order_acq_rel)) + /* Assuring only one thread stopper at a time */ + while (atomic_exchange_explicit(&rcu_thread_list_writelock, 1, memory_order_acq_rel)) birdloop_yield(); /* Find this thread */ @@ -91,8 +97,13 @@ rcu_thread_stop(void) /* Remove this thread */ atomic_store_explicit(tp, atomic_load_explicit(&t->next, memory_order_acquire), memory_order_release); - /* Unlock and go */ - ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_spinlock, 0, memory_order_acq_rel)); + /* Unlock */ + ASSERT_DIE(atomic_exchange_explicit(&rcu_thread_list_writelock, 0, memory_order_acq_rel)); + + /* Wait for readers */ + synchronize_rcu(); + + /* Done */ return; } diff --git a/lib/rcu.h b/lib/rcu.h index 6a771a3a5..f3908c631 100644 --- a/lib/rcu.h +++ b/lib/rcu.h @@ -27,6 +27,11 @@ struct rcu_thread { _Atomic u64 ctl; }; +/* A structure to syntactically ensure that no other u64 gets mixed up with this. */ +struct rcu_stored_phase { + u64 phase; /* The first acceptable phase to end */ +}; + extern _Thread_local struct rcu_thread this_rcu_thread; static inline void rcu_read_lock(void) @@ -36,6 +41,8 @@ static inline void rcu_read_lock(void) /* Just nested */ u64 local_nest = this_rcu_thread.local_ctl & RCU_NEST_MASK; + if (!local_nest) + bug("RCU overnested!"); if (local_nest > RCU_NEST_CNT) return; @@ -50,7 +57,7 @@ static inline void rcu_read_lock(void) static inline void rcu_read_unlock(void) { /* Just decrement the nesting counter; when unlocked, nobody cares */ - atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel); + ASSERT_DIE(atomic_fetch_sub_explicit(&this_rcu_thread.ctl, RCU_NEST_CNT, memory_order_acq_rel) & RCU_NEST_MASK); this_rcu_thread.local_ctl--; } @@ -59,7 +66,30 @@ static inline bool rcu_read_active(void) return !!(this_rcu_thread.local_ctl & RCU_NEST_MASK); } -void synchronize_rcu(void); +/* Begin asynchronous synchronization. */ +static inline struct rcu_stored_phase rcu_begin_sync(void) +{ + return (struct rcu_stored_phase) { .phase = RCU_GP_PHASE + atomic_fetch_add_explicit(&rcu_global_phase, RCU_GP_PHASE, memory_order_acq_rel), }; +} + +/* End asynchronous synchronization. + * + * phase: what you got from rcu_begin_sync() + * wait: true to wait + * + * Returns true if the synchronization is actually done. May be retried multiple times, until true. + */ +bool rcu_end_sync(struct rcu_stored_phase phase); + +/* Synchronous synchronization. */ +static inline void +synchronize_rcu(void) +{ + struct rcu_stored_phase phase = rcu_begin_sync(); + while (!rcu_end_sync(phase)) + birdloop_yield(); +} + /* Registering and unregistering a birdloop. To be called from birdloop implementation */ void rcu_thread_start(void); diff --git a/sysdep/unix/io-loop.c b/sysdep/unix/io-loop.c index f69189e06..5332a538b 100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@ -779,7 +779,7 @@ poll_timeout(struct birdloop *loop) if (!t) { THREAD_TRACE(DL_SCHEDULING, "No timers, no events in meta"); - return -1; + return 86400 * 1000; /* Wake up at least once a day for maintenance */ } btime remains = tm_remains(t); @@ -922,6 +922,10 @@ poll_retry:; bug("poll in %p: %m", thr); } + /* Refresh the local RCU counter. This has to run at least once a century or so. */ + rcu_read_lock(); + rcu_read_unlock(); + account_to(&this_thread->overhead); birdloop_enter(thr->meta); @@ -1047,9 +1051,7 @@ bird_thread_shutdown(void * _ UNUSED) int dif = group->thread_count - thread_dropper_goal; struct birdloop *tdl_stop = NULL; - if (dif > 0) - ev_send_loop(thread_dropper, thread_dropper_event); - else + if (dif <= 0) { tdl_stop = thread_dropper; thread_dropper = NULL; @@ -1107,6 +1109,12 @@ bird_thread_shutdown(void * _ UNUSED) /* Request thread cleanup from main loop */ ev_send_loop(&main_birdloop, &thr->cleanup_event); + /* Re-schedule the thread dropper */ + ev_send_loop(thread_dropper, thread_dropper_event); + + /* Inform about the thread actually stopped */ + THREAD_TRACE(DL_SCHEDULING, "Stopped"); + /* Local pages not needed anymore */ flush_local_pages(); @@ -1114,10 +1122,10 @@ bird_thread_shutdown(void * _ UNUSED) rcu_thread_stop(); /* Now we can be cleaned up */ + thr->meta->ping_pending = 0; birdloop_leave(thr->meta); /* Exit! */ - THREAD_TRACE(DL_SCHEDULING, "Stopped"); pthread_exit(NULL); } diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index f9785c074..389254533 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -2660,6 +2660,10 @@ io_loop(void) birdloop_enter(&main_birdloop); watchdog_start(); + /* Refresh the local RCU counter. This has to run at least once a century or so. */ + rcu_read_lock(); + rcu_read_unlock(); + if (pout < 0) { if (errno == EINTR || errno == EAGAIN)