From: Maria Matejka Date: Mon, 8 Feb 2021 08:51:59 +0000 (+0100) Subject: Coroutines: A simple and lightweight parallel execution framework. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bc353f1b501b78969884f3882060bd6e7eaa4765;p=thirdparty%2Fbird.git Coroutines: A simple and lightweight parallel execution framework. --- diff --git a/lib/coro.h b/lib/coro.h new file mode 100644 index 000000000..51712b362 --- /dev/null +++ b/lib/coro.h @@ -0,0 +1,26 @@ +/* + * BIRD Coroutines + * + * (c) 2017 Martin Mares + * (c) 2020 Maria Matejka + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#ifndef _BIRD_CORO_H_ +#define _BIRD_CORO_H_ + +#include "lib/resource.h" + +/* A completely opaque coroutine handle. */ +struct coroutine; + +/* Coroutines are independent threads bound to pools. + * You request a coroutine by calling coro_run(). + * It is forbidden to free a running coroutine from outside. + * The running coroutine must free itself by rfree() before returning. + */ +struct coroutine *coro_run(pool *, void (*entry)(void *), void *data); + + +#endif diff --git a/sysdep/unix/coroutine.c b/sysdep/unix/coroutine.c index 05f101fbb..bf5b09db6 100644 --- a/sysdep/unix/coroutine.c +++ b/sysdep/unix/coroutine.c @@ -17,7 +17,14 @@ #include "lib/birdlib.h" #include "lib/locking.h" +#include "lib/coro.h" #include "lib/resource.h" +#include "lib/timer.h" + +/* Using a rather big stack for coroutines to allow for stack-local allocations. + * In real world, the kernel doesn't alloc this memory until it is used. + * */ +#define CORO_STACK_SIZE 1048576 /* * Implementation of coroutines based on POSIX threads @@ -100,3 +107,69 @@ void do_unlock(struct domain_generic *dg, struct domain_generic **lsp) pthread_mutex_unlock(&dg->mutex); } +/* Coroutines */ +struct coroutine { + resource r; + pthread_t id; + pthread_attr_t attr; + void (*entry)(void *); + void *data; +}; + +static _Thread_local _Bool coro_cleaned_up = 0; + +static void coro_free(resource *r) +{ + struct coroutine *c = (void *) r; + ASSERT_DIE(pthread_equal(pthread_self(), c->id)); + pthread_attr_destroy(&c->attr); + coro_cleaned_up = 1; +} + +static struct resclass coro_class = { + .name = "Coroutine", + .size = sizeof(struct coroutine), + .free = coro_free, +}; + +extern pthread_key_t current_time_key; + +static void *coro_entry(void *p) +{ + struct coroutine *c = p; + ASSERT_DIE(c->entry); + + pthread_setspecific(current_time_key, &main_timeloop); + + c->entry(c->data); + ASSERT_DIE(coro_cleaned_up); + + return NULL; +} + +struct coroutine *coro_run(pool *p, void (*entry)(void *), void *data) +{ + ASSERT_DIE(entry); + ASSERT_DIE(p); + + struct coroutine *c = ralloc(p, &coro_class); + + c->entry = entry; + c->data = data; + + int e = 0; + + if (e = pthread_attr_init(&c->attr)) + die("pthread_attr_init() failed: %M", e); + + if (e = pthread_attr_setstacksize(&c->attr, CORO_STACK_SIZE)) + die("pthread_attr_setstacksize(%u) failed: %M", CORO_STACK_SIZE, e); + + if (e = pthread_attr_setdetachstate(&c->attr, PTHREAD_CREATE_DETACHED)) + die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); + + if (e = pthread_create(&c->id, &c->attr, coro_entry, c)) + die("pthread_create() failed: %M", e); + + return c; +} diff --git a/sysdep/unix/io.c b/sysdep/unix/io.c index 294678679..40841ea44 100644 --- a/sysdep/unix/io.c +++ b/sysdep/unix/io.c @@ -2176,6 +2176,15 @@ static int short_loops = 0; #define SHORT_LOOP_MAX 10 #define WORK_EVENTS_MAX 10 +static int poll_reload_pipe[2]; + +void +io_loop_reload(void) +{ + char b; + write(poll_reload_pipe[1], &b, 1); +} + void io_loop(void) { @@ -2187,6 +2196,9 @@ io_loop(void) int fdmax = 256; struct pollfd *pfd = xmalloc(fdmax * sizeof(struct pollfd)); + if (pipe(poll_reload_pipe) < 0) + die("pipe(poll_reload_pipe) failed: %m"); + watchdog_start1(); for(;;) { @@ -2205,7 +2217,12 @@ io_loop(void) poll_tout = MIN(poll_tout, timeout); } - nfds = 0; + /* A hack to reload main io_loop() when something has changed asynchronously. */ + pfd[0].fd = poll_reload_pipe[0]; + pfd[0].events = POLLIN; + + nfds = 1; + WALK_LIST(n, sock_list) { pfd[nfds] = (struct pollfd) { .fd = -1 }; /* everything other set to 0 by this */ @@ -2277,6 +2294,14 @@ io_loop(void) } if (pout) { + if (pfd[0].revents & POLLIN) + { + /* IO loop reload requested */ + char b; + read(poll_reload_pipe[0], &b, 1); + continue; + } + times_update(&main_timeloop); /* guaranteed to be non-empty */ diff --git a/sysdep/unix/unix.h b/sysdep/unix/unix.h index ad85d1ea5..313c97c36 100644 --- a/sysdep/unix/unix.h +++ b/sysdep/unix/unix.h @@ -106,6 +106,7 @@ extern volatile sig_atomic_t async_shutdown_flag; void io_init(void); void io_loop(void); +void io_loop_reload(void); void io_log_dump(void); int sk_open_unix(struct birdsock *s, char *name); struct rfile *rf_open(struct pool *, const char *name, const char *mode);