]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Coroutines: A simple and lightweight parallel execution framework.
authorMaria Matejka <mq@ucw.cz>
Mon, 8 Feb 2021 08:51:59 +0000 (09:51 +0100)
committerMaria Matejka <mq@ucw.cz>
Mon, 22 Nov 2021 18:05:43 +0000 (19:05 +0100)
lib/coro.h [new file with mode: 0644]
sysdep/unix/coroutine.c
sysdep/unix/io.c
sysdep/unix/log.c
sysdep/unix/unix.h

diff --git a/lib/coro.h b/lib/coro.h
new file mode 100644 (file)
index 0000000..51712b3
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ *     BIRD Coroutines
+ *
+ *     (c) 2017 Martin Mares <mj@ucw.cz>
+ *     (c) 2020 Maria Matejka <mq@jmq.cz>
+ *
+ *     Can be freely distributed and used under the terms of the GNU GPL.
+ */
+
+#ifndef _BIRD_CORO_H_
+#define _BIRD_CORO_H_
+
+#include "lib/resource.h"
+
+/* A completely opaque coroutine handle. */
+struct coroutine;
+
+/* Coroutines are independent threads bound to pools.
+ * You request a coroutine by calling coro_run().
+ * It is forbidden to free a running coroutine from outside.
+ * The running coroutine must free itself by rfree() before returning.
+ */
+struct coroutine *coro_run(pool *, void (*entry)(void *), void *data);
+
+
+#endif
index 05f101fbb477eac0af861888d3e1aa768e2fabd3..718475055d8c89a90f54e993d3e2034a1a88f30a 100644 (file)
 
 #include "lib/birdlib.h"
 #include "lib/locking.h"
+#include "lib/coro.h"
 #include "lib/resource.h"
+#include "lib/timer.h"
+
+/* Using a rather big stack for coroutines to allow for stack-local allocations.
+ * In real world, the kernel doesn't alloc this memory until it is used.
+ * */
+#define CORO_STACK_SIZE        1048576
 
 /*
  *     Implementation of coroutines based on POSIX threads
@@ -100,3 +107,65 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp)
   pthread_mutex_unlock(&dg->mutex);
 }
 
+/* Coroutines */
+struct coroutine {
+  resource r;
+  pthread_t id;
+  pthread_attr_t attr;
+  void (*entry)(void *);
+  void *data;
+};
+
+static _Thread_local _Bool coro_cleaned_up = 0;
+
+static void coro_free(resource *r)
+{
+  struct coroutine *c = (void *) r;
+  ASSERT_DIE(pthread_equal(pthread_self(), c->id));
+  pthread_attr_destroy(&c->attr);
+  coro_cleaned_up = 1;
+}
+
+static struct resclass coro_class = {
+  .name = "Coroutine",
+  .size = sizeof(struct coroutine),
+  .free = coro_free,
+};
+
+static void *coro_entry(void *p)
+{
+  struct coroutine *c = p;
+  ASSERT_DIE(c->entry);
+
+  c->entry(c->data);
+  ASSERT_DIE(coro_cleaned_up);
+
+  return NULL;
+}
+
+struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data)
+{
+  ASSERT_DIE(entry);
+  ASSERT_DIE(p);
+
+  struct coroutine *c = ralloc(p, &coro_class);
+
+  c->entry = entry;
+  c->data = data;
+
+  int e = 0;
+
+  if (e = pthread_attr_init(&c->attr))
+    die("pthread_attr_init() failed: %M", e);
+
+  if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE))
+    die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e);
+
+  if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED))
+    die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e);
+
+  if (e = pthread_create(&c->id, &c->attr, coro_entry, c))
+    die("pthread_create() failed: %M", e);
+
+  return c;
+}
index 2946786795a4beffe83dc8dafc07fd0124d33e8b..40841ea4482803d0c84b5c5236baa8f0d77863c1 100644 (file)
@@ -2176,6 +2176,15 @@ static int short_loops = 0;
 #define SHORT_LOOP_MAX 10
 #define WORK_EVENTS_MAX 10
 
+static int poll_reload_pipe[2];
+
+void
+io_loop_reload(void)
+{
+  char b;
+  write(poll_reload_pipe[1], &b, 1);
+}
+
 void
 io_loop(void)
 {
@@ -2187,6 +2196,9 @@ io_loop(void)
   int fdmax = 256;
   struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd));
 
+  if (pipe(poll_reload_pipe) < 0)
+    die("pipe(poll_reload_pipe) failed: %m");
+
   watchdog_start1();
   for(;;)
     {
@@ -2205,7 +2217,12 @@ io_loop(void)
        poll_tout = MIN(poll_tout, timeout);
       }
 
-      nfds = 0;
+      /* A hack to reload main io_loop() when something has changed asynchronously. */
+      pfd[0].fd = poll_reload_pipe[0];
+      pfd[0].events = POLLIN;
+
+      nfds = 1;
+
       WALK_LIST(n, sock_list)
        {
          pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */
@@ -2277,6 +2294,14 @@ io_loop(void)
        }
       if (pout)
        {
+         if (pfd[0].revents & POLLIN)
+         {
+           /* IO loop reload requested */
+           char b;
+           read(poll_reload_pipe[0], &b, 1);
+           continue;
+         }
+
          times_update(&main_timeloop);
 
          /* guaranteed to be non-empty */
index a23903b74bf8c15328706e58cd96c2fab34b38cb..dc2b14b37c481b42a8ee65af7a0b2608679dafe9 100644 (file)
@@ -15,6 +15,7 @@
  * user's manual.
  */
 
+#include <stdatomic.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdarg.h>
@@ -35,6 +36,10 @@ static FILE *dbgf;
 static list *current_log_list;
 static char *current_syslog_name; /* NULL -> syslog closed */
 
+static _Atomic uint max_coro_id = ATOMIC_VAR_INIT(1);
+static _Thread_local uint this_coro_id;
+
+#define THIS_CORO_ID  (this_coro_id ?: (this_coro_id = atomic_fetch_add_explicit(&max_coro_id, 1, memory_order_acq_rel)))
 
 #include <pthread.h>
 
@@ -178,7 +183,7 @@ log_commit(int class, buffer *buf)
                l->pos += msg_len;
              }
 
-             fprintf(l->fh, "%s <%s> ", tbuf, class_names[class]);
+             fprintf(l->fh, "%s [%04x] <%s> ", tbuf, THIS_CORO_ID, class_names[class]);
            }
          fputs(buf->start, l->fh);
          fputc('\n', l->fh);
@@ -288,6 +293,8 @@ die(const char *msg, ...)
   exit(1);
 }
 
+static struct timespec dbg_time_start;
+
 /**
  * debug - write to debug output
  * @msg: a printf-like message
@@ -300,12 +307,33 @@ debug(const char *msg, ...)
 {
 #define MAX_DEBUG_BUFSIZE 16384
   va_list args;
-  char buf[MAX_DEBUG_BUFSIZE];
+  char buf[MAX_DEBUG_BUFSIZE], *pos = buf;
+  int max = MAX_DEBUG_BUFSIZE;
 
   va_start(args, msg);
   if (dbgf)
     {
-      if (bvsnprintf(buf, MAX_DEBUG_BUFSIZE, msg, args) < 0)
+      struct timespec dbg_time;
+      clock_gettime(CLOCK_MONOTONIC, &dbg_time);
+      uint nsec;
+      uint sec;
+
+      if (dbg_time.tv_nsec > dbg_time_start.tv_nsec)
+      {
+       nsec = dbg_time.tv_nsec - dbg_time_start.tv_nsec;
+       sec = dbg_time.tv_sec - dbg_time_start.tv_sec;
+      }
+      else
+      {
+       nsec = 1000000000 + dbg_time.tv_nsec - dbg_time_start.tv_nsec;
+       sec = dbg_time.tv_sec - dbg_time_start.tv_sec - 1;
+      }
+
+      int n = bsnprintf(pos, max, "%u.%09u: [%04x] ", sec, nsec, THIS_CORO_ID);
+      pos += n;
+      max -= n;
+
+      if (bvsnprintf(pos, max, msg, args) < 0)
        bug("Extremely long debug output, split it.");
 
       fputs(buf, dbgf);
index ad85d1ea5fd6dd4849acda381e32202e04525894..313c97c36451ea219413962dcd8b5944a0677945 100644 (file)
@@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
 
 void io_init(void);
 void io_loop(void);
+void io_loop_reload(void);
 void io_log_dump(void);
 int sk_open_unix(struct birdsock *s, char *name);
 struct rfile *rf_open(struct pool *, const char *name, const char *mode);