]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Event lists rewritten to a single linked list
authorMaria Matejka <mq@ucw.cz>
Fri, 24 Jun 2022 17:53:34 +0000 (19:53 +0200)
committerMaria Matejka <mq@ucw.cz>
Mon, 18 Jul 2022 11:28:35 +0000 (13:28 +0200)
In multithreaded environment, we need to pass messages between workers.
This is done by queuing events to their respective queues. The
double-linked list is not really useful for that as it needs locking
everywhere.

This commit rewrites the event subsystem to use a single-linked list
where events are enqueued by a single atomic instruction and the queue
is processed after atomically moving the whole queue aside.

configure.ac
lib/event.c
lib/event.h
lib/io-loop.h
lib/locking.h
nest/proto.c

index 5e5f2c4e64a4eb119aa1a89dbeca54342a9bc0ad..330add8798a085038817b3756397f2d1654e6018 100644 (file)
@@ -337,7 +337,7 @@ case $sysdesc in
     ;;
 esac
 
-AC_CHECK_HEADERS_ONCE([alloca.h syslog.h])
+AC_CHECK_HEADERS_ONCE([alloca.h syslog.h stdatomic.h])
 AC_CHECK_HEADER([sys/mman.h], [AC_DEFINE([HAVE_MMAP], [1], [Define to 1 if mmap() is available.])], have_mman=no)
 AC_CHECK_FUNC([aligned_alloc], [AC_DEFINE([HAVE_ALIGNED_ALLOC], [1], [Define to 1 if aligned_alloc() is available.])], have_aligned_alloc=no)
 AC_CHECK_MEMBERS([struct sockaddr.sa_len], [], [], [#include <sys/socket.h>])
index b0abd1d0b2ed9d2620def1372d48a19f8422e37a..7effc315378618a589553e398d035445c8d678ab 100644 (file)
 
 #include "nest/bird.h"
 #include "lib/event.h"
-#include "lib/locking.h"
 #include "lib/io-loop.h"
 
-extern _Thread_local struct coroutine *this_coro;
-
 event_list global_event_list;
 event_list global_work_list;
 
+STATIC_ASSERT(OFFSETOF(event_list, _sentinel.next) >= OFFSETOF(event_list, _end[0]));
+
+void
+ev_init_list(event_list *el, struct birdloop *loop, const char *name)
+{
+  el->name = name;
+  el->loop = loop;
+
+  atomic_store_explicit(&el->receiver, &el->_sentinel, memory_order_relaxed);
+  atomic_store_explicit(&el->_executor, &el->_sentinel, memory_order_relaxed);
+  atomic_store_explicit(&el->_sentinel.next, NULL, memory_order_relaxed);
+}
+
+/*
+ * The event list should work as a message passing point. Sending a message
+ * must be a fairly fast process with no locks and low waiting times. OTOH,
+ * processing messages always involves running the assigned code and the
+ * receiver is always a single one thread with no concurrency at all. There is
+ * also a postponing requirement to synchronously remove an event from a queue,
+ * yet we allow this only when the caller has its receiver event loop locked.
+ * It still means that the event may get postponed from other event in the same
+ * list, therefore we have to be careful.
+ */
+
+static inline int
+ev_remove_from(event *e, event * _Atomic * head)
+{
+  /* The head pointer stores where cur is pointed to from */
+  event * _Atomic *prev = head;
+
+  /* The current event in queue to check */
+  event *cur = atomic_load_explicit(prev, memory_order_acquire);
+
+  /* Pre-loaded next pointer; if NULL, this is sentinel */
+  event *next = atomic_load_explicit(&cur->next, memory_order_acquire);
+
+  while (next)
+  {
+    if (e == cur)
+    {
+      /* Check whether we have collided with somebody else
+       * adding an item to the queue. */
+      if (!atomic_compare_exchange_strong_explicit(
+           prev, &cur, next,
+           memory_order_acq_rel, memory_order_acquire))
+      {
+       /* This may happen only on list head */
+       ASSERT_DIE(prev == head);
+
+       /* Restart. The collision should never happen again. */
+       return ev_remove_from(e, head);
+      }
+
+      /* Successfully removed from the list; inactivate this event. */
+      atomic_store_explicit(&cur->next, NULL, memory_order_release);
+      return 1;
+    }
+
+    /* Go to the next event. */
+    prev = &cur->next;
+    cur = next;
+    next = atomic_load_explicit(&cur->next, memory_order_acquire);
+  }
+
+  return 0;
+}
+
 inline void
 ev_postpone(event *e)
 {
-  event_list *el = e->list;
-  if (!el)
+  /* Find the list to remove the event from */
+  event_list *sl = ev_get_list(e);
+  if (!sl)
     return;
 
-  ASSERT_DIE(birdloop_inside(el->loop));
+  /* Postponing allowed only from the target loop */
+  ASSERT_DIE(birdloop_inside(sl->loop));
 
-  LOCK_DOMAIN(event, el->lock);
-  if (ev_active(e))
-    rem_node(&e->n);
-  UNLOCK_DOMAIN(event, el->lock);
+  /* Remove from one of these lists. */
+  ASSERT(ev_remove_from(e, &sl->_executor) || ev_remove_from(e, &sl->receiver));
 }
 
 static void
@@ -54,7 +118,7 @@ ev_dump(resource *r)
   debug("(code %p, data %p, %s)\n",
        e->hook,
        e->data,
-       e->n.next ? "scheduled" : "inactive");
+       atomic_load_explicit(&e->next, memory_order_relaxed) ? "scheduled" : "inactive");
 }
 
 static struct resclass ev_class = {
@@ -108,23 +172,17 @@ ev_run(event *e)
 inline void
 ev_send(event_list *l, event *e)
 {
-  DBG("ev_send(%p, %p)\n", l, e);
-  ASSERT_DIE(e->hook);
-  ASSERT_DIE(!e->list || (e->list == l) || (e->list->loop == l->loop));
-
-  e->list = l;
-
-  LOCK_DOMAIN(event, l->lock);
-  if (enlisted(&e->n))
-  {
-    UNLOCK_DOMAIN(event, l->lock);
+  event_list *sl = ev_get_list(e);
+  if (sl == l)
     return;
-  }
-
-  add_tail(&l->events, &e->n);
-  UNLOCK_DOMAIN(event, l->lock);
-
-  birdloop_ping(l->loop);
+  if (sl)
+    bug("Queuing an already queued event to another queue is not supported.");
+
+  event *next = atomic_load_explicit(&l->receiver, memory_order_acquire);
+  do atomic_store_explicit(&e->next, next, memory_order_relaxed);
+  while (!atomic_compare_exchange_strong_explicit(
+       &l->receiver, &next, e,
+       memory_order_acq_rel, memory_order_acquire));
 }
 
 void io_log_event(void *hook, void *data);
@@ -136,93 +194,56 @@ void io_log_event(void *hook, void *data);
  * This function calls ev_run() for all events enqueued in the list @l.
  */
 int
-ev_run_list(event_list *l)
+ev_run_list_limited(event_list *l, uint limit)
 {
-  const _Bool legacy = LEGACY_EVENT_LIST(l);
-
-  if (legacy)
-    ASSERT_THE_BIRD_LOCKED;
+  event * _Atomic *ep = &l->_executor;
 
-  node *n;
+  /* No pending events, refill the queue. */
+  if (atomic_load_explicit(ep, memory_order_relaxed) == &l->_sentinel)
+  {
+    /* Move the current event list aside and create a new one. */
+    event *received = atomic_exchange_explicit(
+       &l->receiver, &l->_sentinel, memory_order_acq_rel);
 
-  list tmp_list;
-  init_list(&tmp_list);
+    /* No event to run. */
+    if (received == &l->_sentinel)
+      return 0;
 
-  /* Move the event list contents to a local list to avoid executing repeatedly added events */
-  LOCK_DOMAIN(event, l->lock);
-  add_tail_list(&tmp_list, &l->events);
-  init_list(&l->events);
-  UNLOCK_DOMAIN(event, l->lock);
+    /* Setup the executor queue */
+    event *head = &l->_sentinel;
 
-  WALK_LIST_FIRST(n, tmp_list)
+    /* Flip the order of the events by relinking them one by one (push-pop) */
+    while (received != &l->_sentinel)
     {
-      event *e = SKIP_BACK(event, n, n);
-
-      if (legacy)
-      {
-       /* The legacy way of event execution */
-       io_log_event(e->hook, e->data);
-       ev_postpone(e);
-       e->hook(e->data);
-      }
-      else
-      {
-       // io_log_event(e->hook, e->data); /* TODO: add support for event logging in other io loops */
-       ASSERT_DIE(e->list == l);
-       LOCK_DOMAIN(event, l->lock);
-       rem_node(&e->n);
-       UNLOCK_DOMAIN(event, l->lock);
-       e->hook(e->data);
-      }
-      tmp_flush();
+      event *cur = received;
+      received = atomic_exchange_explicit(&cur->next, head, memory_order_relaxed);
+      head = cur;
     }
 
-  LOCK_DOMAIN(event, l->lock);
-  int repeat = ! EMPTY_LIST(l->events);
-  UNLOCK_DOMAIN(event, l->lock);
-  return repeat;
-}
-
-int
-ev_run_list_limited(event_list *l, uint limit)
-{
-  ASSERT_DIE(LEGACY_EVENT_LIST(l));
-  ASSERT_THE_BIRD_LOCKED;
-
-  node *n;
-  list tmp_list;
-
-  LOCK_DOMAIN(event, l->lock);
-  init_list(&tmp_list);
-  add_tail_list(&tmp_list, &l->events);
-  init_list(&l->events);
-  UNLOCK_DOMAIN(event, l->lock);
+    /* Store the executor queue to its designated place */
+    atomic_store_explicit(ep, head, memory_order_relaxed);
+  }
 
-  WALK_LIST_FIRST(n, tmp_list)
+  /* Run the events in order. */
+  event *e;
+  while ((e = atomic_load_explicit(ep, memory_order_relaxed)) != &l->_sentinel)
     {
-      event *e = SKIP_BACK(event, n, n);
+      /* Check limit */
+      if (!--limit)
+       return 1;
 
-      if (!limit)
-       break;
+      /* This is ugly hack, we want to log just events executed from the main I/O loop */
+      if ((l == &global_event_list) || (l == &global_work_list))
+       io_log_event(e->hook, e->data);
 
-      io_log_event(e->hook, e->data);
+      /* Inactivate the event */
+      atomic_store_explicit(ep, atomic_load_explicit(&e->next, memory_order_relaxed), memory_order_relaxed);
+      atomic_store_explicit(&e->next, NULL, memory_order_relaxed);
 
-      ev_run(e);
+      /* Run the event */
+      e->hook(e->data);
       tmp_flush();
-      limit--;
     }
 
-  LOCK_DOMAIN(event, l->lock);
-  if (!EMPTY_LIST(tmp_list))
-  {
-    /* Attach new items after the unprocessed old items */
-    add_tail_list(&tmp_list, &l->events);
-    init_list(&l->events);
-    add_tail_list(&l->events, &tmp_list);
-  }
-
-  int repeat = ! EMPTY_LIST(l->events);
-  UNLOCK_DOMAIN(event, l->lock);
-
-  return repeat;
+  return atomic_load_explicit(&l->receiver, memory_order_relaxed) != &l->_sentinel;
 }
index 6c358f84b59aab8446978599cc21e2022a83b409..9773c3a97014446792f297f4b554fb0ff6519949 100644 (file)
 
 #include <stdatomic.h>
 
-DEFINE_DOMAIN(event);
+struct birdloop;
 
 typedef struct event {
   resource r;
   void (*hook)(void *);
   void *data;
-  node n;                              /* Internal link */
-  struct event_list *list;             /* List where this event is put in */
+  struct event * _Atomic next;
 } event;
 
-typedef struct event_list {
-  list events;
-  pool *pool;
-  struct birdloop *loop;
-  DOMAIN(event) lock;
+typedef union event_list {
+  struct {
+    event * _Atomic receiver;  /* Event receive list */
+    event * _Atomic _executor; /* Event execute list */
+    const char *name;
+    struct birdloop *loop;     /* The executor loop */
+    char _end[0];
+  };
+  event _sentinel;     /* Sentinel node to actively detect list end */
 } event_list;
 
 extern event_list global_event_list;
@@ -36,36 +39,40 @@ extern event_list global_work_list;
 
 event *ev_new(pool *);
 void ev_run(event *);
-
-static inline void ev_init_list(event_list *el, struct birdloop *loop, const char *name)
-{
-  init_list(&el->events);
-  el->loop = loop;
-  el->lock = DOMAIN_NEW(event, name);
-}
-
-void ev_send(event_list *, event *);
+void ev_init_list(event_list *, struct birdloop *loop, const char *name);
+void ev_enqueue(event_list *, event *);
+#define ev_send ev_enqueue
 #define ev_send_loop(l, e) ev_send(birdloop_event_list((l)), (e))
 
 #define ev_schedule(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_event_list, (e)); })
 #define ev_schedule_work(e) ({ ASSERT_THE_BIRD_LOCKED; if (!ev_active((e))) ev_send(&global_work_list, (e)); })
 
 void ev_postpone(event *);
-int ev_run_list(event_list *);
 int ev_run_list_limited(event_list *, uint);
+#define ev_run_list(l) ev_run_list_limited((l), ~0)
 
 #define LEGACY_EVENT_LIST(l)  (((l) == &global_event_list) || ((l) == &global_work_list))
 
-_Bool birdloop_inside(struct birdloop *loop);
-
 static inline int
 ev_active(event *e)
 {
-  if (e->list == NULL)
-    return 0;
+  return atomic_load_explicit(&e->next, memory_order_relaxed) != NULL;
+}
 
-  ASSERT_DIE(birdloop_inside(e->list->loop));
-  return enlisted(&e->n);
+static inline event_list *
+ev_get_list(event *e)
+{
+  /* We are looking for the sentinel node at the list end.
+   * After this, we have s->next == NULL */
+  event *s = e;
+  for (event *sn; sn = atomic_load_explicit(&s->next, memory_order_acquire); s = sn)
+    ;
+
+  /* No sentinel, no list. */
+  if (s == e)
+    return NULL;
+  else
+    return SKIP_BACK(event_list, _sentinel, s);
 }
 
 static inline event*
index 25f1b2a36f5f4fb73ee932e76b5b5c74fe4521d4..dec7d040e8175bd68cb9fcb46f23eea766404bea 100644 (file)
 #include "lib/event.h"
 #include "lib/socket.h"
 
+extern struct birdloop main_birdloop;
+
 void sk_start(sock *s);
 void sk_stop(sock *s);
 void sk_reloop(sock *s, struct birdloop *loop);
 
-extern struct birdloop main_birdloop;
-
 /* Start a new birdloop owned by given pool and domain */
 struct birdloop *birdloop_new(pool *p, uint order, const char *name);
 
index ab5c06afce78363838638799c30a8c8f96e656d2..8ea1c9685b08645501d0f963752ccf64bc948abc 100644 (file)
@@ -16,7 +16,6 @@ struct lock_order {
   struct domain_generic *the_bird;
   struct domain_generic *proto;
   struct domain_generic *rtable;
-  struct domain_generic *event;
 };
 
 extern _Thread_local struct lock_order locking_stack;
index e9bced3bf539be33646316bfff0146ba705f9821..17cd08f7998b141d9e6bac53e256ecc367b730a5 100644 (file)
@@ -1072,7 +1072,6 @@ proto_loop_stopped(void *ptr)
   birdloop_enter(&main_birdloop);
 
   p->loop = &main_birdloop;
-  p->event->list = NULL;
   proto_cleanup(p);
 
   birdloop_leave(&main_birdloop);
@@ -1163,8 +1162,6 @@ proto_start(struct proto *p)
   if (p->cf->loop_order != DOMAIN_ORDER(the_bird))
     p->loop = birdloop_new(p->pool, p->cf->loop_order, p->pool->name);
 
-  p->event->list = proto_event_list(p);
-
   PROTO_LOCKED_FROM_MAIN(p)
     proto_notify_state(p, (p->proto->start ? p->proto->start(p) : PS_UP));
 }