]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Coroutines: A simple and lightweight parallel execution framework.
authorMaria Matejka <mq@ucw.cz>
Mon, 8 Feb 2021 08:51:59 +0000 (09:51 +0100)
committerMaria Matejka <mq@ucw.cz>
Wed, 2 Jun 2021 12:28:14 +0000 (14:28 +0200)
lib/coro.h [new file with mode: 0644]
sysdep/unix/coroutine.c
sysdep/unix/io.c
sysdep/unix/unix.h

diff --git a/lib/coro.h b/lib/coro.h
new file mode 100644 (file)
index 0000000..51712b3
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ *     BIRD Coroutines
+ *
+ *     (c) 2017 Martin Mares <mj@ucw.cz>
+ *     (c) 2020 Maria Matejka <mq@jmq.cz>
+ *
+ *     Can be freely distributed and used under the terms of the GNU GPL.
+ */
+
+#ifndef _BIRD_CORO_H_
+#define _BIRD_CORO_H_
+
+#include "lib/resource.h"
+
+/* A completely opaque coroutine handle. */
+struct coroutine;
+
+/* Coroutines are independent threads bound to pools.
+ * You request a coroutine by calling coro_run().
+ * It is forbidden to free a running coroutine from outside.
+ * The running coroutine must free itself by rfree() before returning.
+ */
+struct coroutine *coro_run(pool *, void (*entry)(void *), void *data);
+
+
+#endif
index 05f101fbb477eac0af861888d3e1aa768e2fabd3..bf5b09db6314052cb279f9a88aa0437b2b71eec1 100644 (file)
 
 #include "lib/birdlib.h"
 #include "lib/locking.h"
+#include "lib/coro.h"
 #include "lib/resource.h"
+#include "lib/timer.h"
+
+/* Using a rather big stack for coroutines to allow for stack-local allocations.
+ * In real world, the kernel doesn't alloc this memory until it is used.
+ * */
+#define CORO_STACK_SIZE        1048576
 
 /*
  *     Implementation of coroutines based on POSIX threads
@@ -100,3 +107,69 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp)
   pthread_mutex_unlock(&dg->mutex);
 }
 
+/* Coroutines */
+struct coroutine {
+  resource r;
+  pthread_t id;
+  pthread_attr_t attr;
+  void (*entry)(void *);
+  void *data;
+};
+
+static _Thread_local _Bool coro_cleaned_up = 0;
+
+static void coro_free(resource *r)
+{
+  struct coroutine *c = (void *) r;
+  ASSERT_DIE(pthread_equal(pthread_self(), c->id));
+  pthread_attr_destroy(&c->attr);
+  coro_cleaned_up = 1;
+}
+
+static struct resclass coro_class = {
+  .name = "Coroutine",
+  .size = sizeof(struct coroutine),
+  .free = coro_free,
+};
+
+extern pthread_key_t current_time_key;
+
+static void *coro_entry(void *p)
+{
+  struct coroutine *c = p;
+  ASSERT_DIE(c->entry);
+
+  pthread_setspecific(current_time_key, &main_timeloop);
+
+  c->entry(c->data);
+  ASSERT_DIE(coro_cleaned_up);
+
+  return NULL;
+}
+
+struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data)
+{
+  ASSERT_DIE(entry);
+  ASSERT_DIE(p);
+
+  struct coroutine *c = ralloc(p, &coro_class);
+
+  c->entry = entry;
+  c->data = data;
+
+  int e = 0;
+
+  if (e = pthread_attr_init(&c->attr))
+    die("pthread_attr_init() failed: %M", e);
+
+  if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE))
+    die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e);
+
+  if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED))
+    die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
+
+  if (e = pthread_create(&c->id, &c->attr, coro_entry, c))
+    die("pthread_create() failed: %M", e);
+
+  return c;
+}
index 2946786795a4beffe83dc8dafc07fd0124d33e8b..40841ea4482803d0c84b5c5236baa8f0d77863c1 100644 (file)
@@ -2176,6 +2176,15 @@ static int short_loops = 0;
 #define SHORT_LOOP_MAX 10
 #define WORK_EVENTS_MAX 10
 
+static int poll_reload_pipe[2];
+
+void
+io_loop_reload(void)
+{
+  char b;
+  write(poll_reload_pipe[1], &b, 1);
+}
+
 void
 io_loop(void)
 {
@@ -2187,6 +2196,9 @@ io_loop(void)
   int fdmax = 256;
   struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
 
+  if (pipe(poll_reload_pipe) < 0)
+    die("pipe(poll_reload_pipe) failed: %m");
+
   watchdog_start1();
   for(;;)
     {
@@ -2205,7 +2217,12 @@ io_loop(void)
        poll_tout = MIN(poll_tout, timeout);
       }
 
-      nfds = 0;
+      /* A hack to reload main io_loop() when something has changed asynchronously. */
+      pfd[0].fd = poll_reload_pipe[0];
+      pfd[0].events = POLLIN;
+
+      nfds = 1;
+
       WALK_LIST(n, sock_list)
        {
          pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
@@ -2277,6 +2294,14 @@ io_loop(void)
        }
       if (pout)
        {
+         if (pfd[0].revents & POLLIN)
+         {
+           /* IO loop reload requested */
+           char b;
+           read(poll_reload_pipe[0], &b, 1);
+           continue;
+         }
+
          times_update(&main_timeloop);
 
          /* guaranteed to be non-empty */
index ad85d1ea5fd6dd4849acda381e32202e04525894..313c97c36451ea219413962dcd8b5944a0677945 100644 (file)
@@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
 
 void io_init(void);
 void io_loop(void);
+void io_loop_reload(void);
 void io_log_dump(void);
 int sk_open_unix(struct birdsock *s, char *name);
 struct rfile *rf_open(struct pool *, const char *name, const char *mode);