u64 pending = (uc >> LFUC_PU_SHIFT) + 1;
uc &= LFUC_IN_PROGRESS - 1;
- /* We per-use the RCU critical section indicator to make the prune event wait
- * until we finish here in the rare case we get preempted. */
- rcu_read_lock();
-
/* Obviously, there can't be more pending unlocks than the usecount itself */
if (uc == pending)
/* If we're the last unlocker (every owner is already unlocking), schedule
 * the owner's prune event */
ev_send(el, ev);

/* Finally, simultaneously pop the in-progress indicator and the
 * usecount, possibly allowing the pruning routine to free this structure */
uc = atomic_fetch_sub_explicit(&c->uc, LFUC_IN_PROGRESS + 1, memory_order_acq_rel);
- /* ... and to reduce the load a bit, the pruning routine will better wait for
- * RCU synchronization instead of a busy loop. */
- rcu_read_unlock();
-
// return uc - LFUC_IN_PROGRESS - 1;
}
u64 uc;
/* Wait until all unlockers finish */
while ((uc = atomic_load_explicit(&c->uc, memory_order_acquire)) >> LFUC_PU_SHIFT)
- synchronize_rcu();
+ birdloop_yield();
/* All of them are now done and if the usecount is now zero, then we're
* the last place to reference the object and we can call it finished. */
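/* A hedged sketch of the locking counterpart and of the structure the code
 * above operates on; the field layout here is an assumption, only the split of
 * the counter at LFUC_PU_SHIFT is taken from the unlock path shown above. */
struct lfuc {
  _Atomic u64 uc;	/* pending unlockers above LFUC_PU_SHIFT, plain usecount below */
};

static inline void lfuc_lock(struct lfuc *c)
{
  /* Taking a reference is just an increment of the bottom (usecount) part */
  atomic_fetch_add_explicit(&c->uc, 1, memory_order_acq_rel);
}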
#define DOMAIN(type) struct domain__##type
#define DOMAIN_ORDER(type) OFFSETOF(struct lock_order, type)
-#define DOMAIN_NEW(type) (DOMAIN(type)) { .type = domain_new(DOMAIN_ORDER(type)) }
-struct domain_generic *domain_new(uint order);
+#define DOMAIN_NEW(type) (DOMAIN(type)) { .type = domain_new(DOMAIN_ORDER(type), 1) }
+#define DOMAIN_NEW_RCU_SYNC(type) (DOMAIN(type)) { .type = domain_new(DOMAIN_ORDER(type), 0) }
+struct domain_generic *domain_new(uint order, _Bool allow_rcu);
#define DOMAIN_FREE(type, d) domain_free((d).type)
void domain_free(struct domain_generic *);
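/* A hedged usage sketch of the two constructors; the "example" lock-order
 * member and the init function are hypothetical, only the macros and the
 * allow_rcu semantics come from the declarations above. */
static DOMAIN(example) example_domain;

static void example_init(void)
{
  /* An RCU_SYNC domain may be held while calling synchronize_rcu(),
   * but must never be locked from inside an RCU read section. */
  example_domain = DOMAIN_NEW_RCU_SYNC(example);

  /* A plain domain keeps the old behaviour: it may be locked under
   * rcu_read_lock(), but synchronize_rcu() stays forbidden while only
   * plain domains are held. */
  /* example_domain = DOMAIN_NEW(example); */
}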
_Atomic uint rcu_gp_ctl = RCU_NEST_CNT;
_Thread_local struct rcu_thread *this_rcu_thread = NULL;
+_Thread_local uint rcu_blocked;
static list rcu_thread_list;
void
synchronize_rcu(void)
{
+ if (!rcu_blocked && last_locked)
+ bug("Forbidden to synchronize RCU unless an appropriate lock is taken");
+
LOCK_DOMAIN(resource, rcu_domain);
update_counter_and_wait();
update_counter_and_wait();
};
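/* A hedged sketch of the calling convention the new check enforces; the
 * pruning function is hypothetical, attrs_domain is the RCU_SYNC domain
 * set up in rta_init() below. */
static void example_prune_attrs(void)
{
  /* Either synchronize with no lock held at all ... */
  synchronize_rcu();

  /* ... or while holding a domain created by DOMAIN_NEW_RCU_SYNC(), which
   * bumps this thread's rcu_blocked and therefore passes the check above. */
  RTA_LOCK;
  synchronize_rcu();
  RTA_UNLOCK;
}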
extern _Thread_local struct rcu_thread *this_rcu_thread;
+extern _Thread_local uint rcu_blocked;
static inline void rcu_read_unlock(void)
{
atomic_fetch_sub(&this_rcu_thread->ctl, RCU_NEST_CNT);
}
+static inline _Bool rcu_read_active(void)
+{
+ return !!(atomic_load_explicit(&this_rcu_thread->ctl, memory_order_acquire) & RCU_NEST_MASK);
+}
+
void synchronize_rcu(void);
/* Registering and unregistering a birdloop. To be called from birdloop implementation */
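/* A hedged sketch of how rcu_read_active() pairs with the reader primitives;
 * the traversal function is hypothetical. */
static void example_reader(void)
{
  rcu_read_lock();
  ASSERT_DIE(rcu_read_active());	/* the nest counter is nonzero inside the reader */

  /* ... lockless traversal of an RCU-protected structure ... */

  rcu_read_unlock();

  /* Outside the read section, rcu_read_active() is false again and this
   * thread may once more lock RCU-synchronizing domains. */
}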
static inline void rt_cork_release(void)
{
if (atomic_fetch_sub_explicit(&rt_cork.active, 1, memory_order_acq_rel) == 1)
- {
- synchronize_rcu();
ev_send(&global_work_list, &rt_cork.run);
- }
}
-static inline int rt_cork_check(event *e)
+static inline _Bool rt_cork_check(event *e)
{
- rcu_read_lock();
-
int corked = (atomic_load_explicit(&rt_cork.active, memory_order_acquire) > 0);
if (corked)
ev_send(&rt_cork.queue, e);
- rcu_read_unlock();
+ if (atomic_load_explicit(&rt_cork.active, memory_order_acquire) == 0)
+ ev_send(&global_work_list, &rt_cork.run);
return corked;
}
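/* A hedged usage sketch of rt_cork_check(); the hook structure is hypothetical.
 * The second load above covers the race where the cork is released between the
 * first check and the enqueue: the uncork hook is kicked once more so the
 * freshly queued event is not left stranded. */
struct example_hook {		/* hypothetical */
  event event;
};

static void example_export_hook(struct example_hook *h)
{
  /* When corked, our event is parked on rt_cork.queue and re-sent by
   * rt_cork_release_hook() once the cork opens; just come back later. */
  if (rt_cork_check(&h->event))
    return;

  /* ... do the actual export work here ... */
}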
void
rta_init(void)
{
- attrs_domain = DOMAIN_NEW(attrs);
+ attrs_domain = DOMAIN_NEW_RCU_SYNC(attrs);
RTA_LOCK;
rta_pool = rp_new(&root_pool, attrs_domain.attrs, "Attributes");
void
channel_rpe_mark_seen(struct channel *c, struct rt_pending_export *rpe)
{
- channel_trace(c, D_ROUTES, "Marking seen %p (%lu)", rpe, rpe->seq);
+ if (rpe->seq)
+ channel_trace(c, D_ROUTES, "Marking seen %p (%lu)", rpe, rpe->seq);
ASSERT_DIE(c->out_req.hook);
rpe_mark_seen(c->out_req.hook, rpe);
pool *sp = birdloop_pool(loop);
/* Create the table domain and pool */
- DOMAIN(rtable) dom = DOMAIN_NEW(rtable);
+ DOMAIN(rtable) dom = DOMAIN_NEW_RCU_SYNC(rtable);
LOCK_DOMAIN(rtable, dom);
pool *p = rp_newf(sp, dom.rtable, "Routing table data %s", cf->name);
static void
rt_cork_release_hook(void *data UNUSED)
{
- do synchronize_rcu();
+ do birdloop_yield();
while (
!atomic_load_explicit(&rt_cork.active, memory_order_acquire) &&
ev_run_list(&rt_cork.queue)
&& !ev_active(tab->hcu_event))
{
if (req->trace_routes & D_EVENTS)
- log(L_TRACE "%s requesting HCU");
+ log(L_TRACE "%s requesting HCU", req->name);
ev_send_loop(tab->loop, tab->hcu_event);
}
struct domain_generic {
pthread_mutex_t mutex;
uint order;
+ _Bool forbidden_when_reading_rcu;
struct domain_generic **prev;
struct lock_order *locked_by;
const char *name;
pool *pool;
};
-#define DOMAIN_INIT(_order) { .mutex = PTHREAD_MUTEX_INITIALIZER, .order = _order }
+#define DOMAIN_INIT(_order, _allow_rcu) { \
+ .mutex = PTHREAD_MUTEX_INITIALIZER, \
+ .order = _order, \
+ .forbidden_when_reading_rcu = !_allow_rcu, \
+}
-static struct domain_generic the_bird_domain_gen = DOMAIN_INIT(OFFSETOF(struct lock_order, the_bird));
+static struct domain_generic the_bird_domain_gen = DOMAIN_INIT(OFFSETOF(struct lock_order, the_bird), 1);
DOMAIN(the_bird) the_bird_domain = { .the_bird = &the_bird_domain_gen };
struct domain_generic *
-domain_new(uint order)
+domain_new(uint order, _Bool allow_rcu)
{
ASSERT_DIE(order < sizeof(struct lock_order));
struct domain_generic *dg = xmalloc(sizeof(struct domain_generic));
- *dg = (struct domain_generic) DOMAIN_INIT(order);
+ *dg = (struct domain_generic) DOMAIN_INIT(order, allow_rcu);
return dg;
}
memcpy(&stack_copy, &locking_stack, sizeof(stack_copy));
struct domain_generic **lll = last_locked;
+ if (dg->forbidden_when_reading_rcu)
+ if (rcu_read_active())
+ bug("Locking of this lock forbidden while RCU reader is active");
+ else
+ rcu_blocked++;
+
if ((char *) lsp - (char *) &locking_stack != dg->order)
bug("Trying to lock on bad position: order=%u, lsp=%p, base=%p", dg->order, lsp, &locking_stack);
*lsp = NULL;
dg->prev = NULL;
pthread_mutex_unlock(&dg->mutex);
+
+ if (dg->forbidden_when_reading_rcu)
+ ASSERT_DIE(rcu_blocked--);
}
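/* A hedged summary of the bookkeeping added above, written as a hypothetical
 * self-check: rcu_blocked counts how many currently held domains were created
 * without allow_rcu, so it must stay zero inside any RCU read section, while
 * synchronize_rcu() requires either an empty lock stack or a nonzero
 * rcu_blocked. */
static inline void example_check_locking_vs_rcu(void)
{
  if (rcu_read_active())
    ASSERT_DIE(rcu_blocked == 0);
}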
static struct birdloop *
birdloop_vnew_internal(pool *pp, uint order, struct birdloop_pickup_group *group, const char *name, va_list args)
{
- struct domain_generic *dg = domain_new(order);
+ struct domain_generic *dg = domain_new(order, 1);
DG_LOCK(dg);
pool *p = rp_vnewf(pp, dg, name, args);