From: Maria Matejka Date: Thu, 9 Nov 2023 14:50:13 +0000 (+0100) Subject: Merge branch 'mq-aggregator-for-v3' into thread-next X-Git-Tag: v3.0.0~343 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c7cc1ccd2e27a726844616501c46e757e92b7c72;p=thirdparty%2Fbird.git Merge branch 'mq-aggregator-for-v3' into thread-next --- c7cc1ccd2e27a726844616501c46e757e92b7c72 diff --cc doc/reply_codes index bcbbbb038,71cdf341d..e387fed83 --- a/doc/reply_codes +++ b/doc/reply_codes @@@ -61,7 -61,7 +61,8 @@@ Reply codes of BIRD command-line interf 1023 Show Babel interfaces 1024 Show Babel neighbors 1025 Show Babel entries - 1026 Show threads + 1026 Show MPLS ranges ++1027 Show threads 8000 Reply too long 8001 Route not found diff --cc nest/mpls.Y index e234bdc9f,0e755fec8..6cf1f909d --- a/nest/mpls.Y +++ b/nest/mpls.Y @@@ -137,6 -138,59 +138,59 @@@ mpls_channel_opt_list mpls_channel_end: { mpls_channel_postconfig(this_channel); } channel_end; + show_mpls_ranges_args: + /* empty */ + { + if (EMPTY_LIST(config->mpls_domains)) + cf_error("No MPLS domain defined"); + + $$ = cfg_allocz(sizeof(struct mpls_show_ranges_cmd)); + } - | show_mpls_ranges_args symbol_known ++ | show_mpls_ranges_args CF_SYM_KNOWN + { + if ($2->class == SYM_MPLS_DOMAIN) + { + if ($$->domain) + cf_error("Only one MPLS domain expected"); + + $$->domain = $2->mpls_domain; + } + else if ($2->class == SYM_MPLS_RANGE) + { + if ($$->range) + cf_error("Only one MPLS label range expected"); + + if ($$->domain != $2->mpls_range->domain) + cf_error("MPLS label range from different MPLS domain"); + + $$->domain = $2->mpls_range->domain; + $$->range = $2->mpls_range; + } + else + cf_error("MPLS domain or label range expected"); + } + | show_mpls_ranges_args STATIC + { + if ($$->range) + cf_error("Only one MPLS label range expected"); + + $$->domain = $$->domain ?: cf_default_mpls_domain(config); + $$->range = $$->domain->static_range; + } + | show_mpls_ranges_args DYNAMIC + { + if ($$->range) + cf_error("Only one MPLS label range expected"); + + $$->domain = $$->domain ?: cf_default_mpls_domain(config); + $$->range = $$->domain->dynamic_range; + } + ; + + CF_CLI(SHOW MPLS RANGES, show_mpls_ranges_args, [ | ], [[Show MPLS ranges]]) + { mpls_show_ranges($4); } ; + + CF_CODE CF_END diff --cc nest/mpls.c index 269b4df7e,8033b8571..cd44d2d9f --- a/nest/mpls.c +++ b/nest/mpls.c @@@ -1006,37 -1026,65 +1006,84 @@@ mpls_rte_get_fec_lock(const rte *r } void -mpls_rte_remove(net *n UNUSED, rte *r) +mpls_rte_preimport(rte *new, const rte *old) { - struct proto *p = r->src->proto; - struct mpls_fec_map *m = p->mpls_map; + struct mpls_fec_tmp_lock new_mt = {}, old_mt = {}; - uint label = ea_get_int(r->attrs->eattrs, EA_MPLS_LABEL, 0); - if (label < 16) - return; + if (new) + new_mt = mpls_rte_get_fec_lock(new); - struct mpls_fec *fec = mpls_find_fec_by_label(m, label); - if (!fec) + if (old) + old_mt = mpls_rte_get_fec_lock(old); + + if (new_mt.fec == old_mt.fec) return; - mpls_unlock_fec(m, fec); + if (new_mt.fec) + mpls_lock_fec(new_mt.m, new_mt.fec); + + if (old_mt.fec) + mpls_unlock_fec(old_mt.m, old_mt.fec); } + static void + mpls_show_ranges_rng(struct mpls_show_ranges_cmd *cmd, struct mpls_range *r) + { + uint last = lmap_last_one_in_range(&cmd->dom->labels, r->lo, r->hi); + if (last == r->hi) last = 0; + + cli_msg(-1026, "%-11s %7u %7u %7u %7u %7u", + r->name, r->lo, r->hi - r->lo, r->hi, r->label_count, last); + } + + void + mpls_show_ranges_dom(struct mpls_show_ranges_cmd *cmd, struct mpls_domain *m) + { + if (cmd->dom) + cli_msg(-1026, ""); + + cmd->dom = m; + cli_msg(-1026, "MPLS domain %s:", m->name); + cli_msg(-1026, "%-11s %7s %7s %7s %7s %7s", + "Range", "Start", "Length", "End", "Labels", "Last"); + + if (cmd->range) + mpls_show_ranges_rng(cmd, cmd->range->range); + else + { + struct mpls_range *r; + WALK_LIST(r, m->ranges) + if (!r->removed) + mpls_show_ranges_rng(cmd, r); + } + } + + void + mpls_show_ranges(struct mpls_show_ranges_cmd *cmd) + { + if (cmd->domain) + mpls_show_ranges_dom(cmd, cmd->domain->domain); + else + { + struct mpls_domain *m; + WALK_LIST(m, mpls_domains) + mpls_show_ranges_dom(cmd, m); + } + + cli_msg(0, ""); + } ++ +struct ea_class ea_gen_mpls_policy = { + .name = "mpls_policy", + .type = T_ENUM_MPLS_POLICY, +}; + +struct ea_class ea_gen_mpls_class = { + .name = "mpls_class", + .type = T_INT, +}; + +struct ea_class ea_gen_mpls_label = { + .name = "mpls_label", + .type = T_INT, +}; diff --cc nest/mpls.h index 57978c990,bac5c69d4..a4319e5b5 --- a/nest/mpls.h +++ b/nest/mpls.h @@@ -162,9 -164,22 +162,20 @@@ void mpls_fec_map_free(struct mpls_fec_ struct mpls_fec *mpls_find_fec_by_label(struct mpls_fec_map *x, u32 label); struct mpls_fec *mpls_get_fec_by_label(struct mpls_fec_map *m, u32 label); struct mpls_fec *mpls_get_fec_by_net(struct mpls_fec_map *m, const net_addr *net, u32 path_id); -struct mpls_fec *mpls_get_fec_by_rta(struct mpls_fec_map *m, const rta *src, u32 class_id); +struct mpls_fec *mpls_get_fec_by_destination(struct mpls_fec_map *m, ea_list *dest); void mpls_free_fec(struct mpls_fec_map *x, struct mpls_fec *fec); -void mpls_handle_rte(struct mpls_fec_map *m, const net_addr *n, rte *r, linpool *lp, struct mpls_fec **locked_fec); -void mpls_handle_rte_cleanup(struct mpls_fec_map *m, struct mpls_fec **locked_fec); -void mpls_rte_insert(net *n UNUSED, rte *r); -void mpls_rte_remove(net *n UNUSED, rte *r); +void mpls_handle_rte(struct mpls_fec_map *m, const net_addr *n, rte *r); +void mpls_rte_preimport(rte *new, const rte *old); + + struct mpls_show_ranges_cmd { + struct mpls_domain_config *domain; + struct mpls_range_config *range; + + /* Runtime */ + struct mpls_domain *dom; + }; + + void mpls_show_ranges(struct mpls_show_ranges_cmd *cmd); + #endif diff --cc sysdep/unix/io-loop.c index 96699eb4e,000000000..5213fdc03 mode 100644,000000..100644 --- a/sysdep/unix/io-loop.c +++ b/sysdep/unix/io-loop.c @@@ -1,1692 -1,0 +1,1692 @@@ +/* + * BIRD -- I/O and event loop + * + * Can be freely distributed and used under the terms of the GNU GPL. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nest/bird.h" + +#include "lib/buffer.h" +#include "lib/lists.h" +#include "lib/locking.h" +#include "lib/resource.h" +#include "lib/event.h" +#include "lib/timer.h" +#include "lib/socket.h" + +#include "lib/io-loop.h" +#include "sysdep/unix/io-loop.h" +#include "conf/conf.h" +#include "nest/cli.h" + +#define THREAD_STACK_SIZE 65536 /* To be lowered in near future */ + +static struct birdloop *birdloop_new_no_pickup(pool *pp, uint order, const char *name, ...); + +/* + * Nanosecond time for accounting purposes + * + * A fixed point on startup is set as zero, all other values are relative to that. + * Caution: this overflows after like 500 years or so. If you plan to run + * BIRD for such a long time, please implement some means of overflow prevention. + */ + +static struct timespec ns_begin; + +static void ns_init(void) +{ + if (clock_gettime(CLOCK_MONOTONIC, &ns_begin)) + bug("clock_gettime: %m"); +} + +#define NSEC_IN_SEC ((u64) (1000 * 1000 * 1000)) + +static u64 ns_now(void) +{ + struct timespec ts; + if (clock_gettime(CLOCK_MONOTONIC, &ts)) + bug("clock_gettime: %m"); + + return (u64) (ts.tv_sec - ns_begin.tv_sec) * NSEC_IN_SEC + ts.tv_nsec - ns_begin.tv_nsec; +} + +#define NSEC_TO_SEC(x) ((x) / NSEC_IN_SEC) +#define CURRENT_SEC NSEC_TO_SEC(ns_now()) + +static _Thread_local struct spent_time *account_target_spent_time; +static _Thread_local u64 *account_target_total; +static _Thread_local u64 account_last; + +static u64 account_finish(void) +{ + /* Get current time */ + u64 now = ns_now(); + u64 dif = now - account_last; + + /* Update second by second */ + if (account_target_spent_time) + { + /* Drop old time information if difference is too large */ + if (NSEC_TO_SEC(account_last) + TIME_BY_SEC_SIZE - 1 < NSEC_TO_SEC(now)) + account_last = (NSEC_TO_SEC(now) - TIME_BY_SEC_SIZE + 1) * NSEC_IN_SEC; + + /* Zero new records */ + if (NSEC_TO_SEC(account_target_spent_time->last_written_ns) + TIME_BY_SEC_SIZE < NSEC_TO_SEC(account_last)) + memset(account_target_spent_time->by_sec_ns, 0, sizeof(account_target_spent_time->by_sec_ns)); + else + for (u64 fclr = NSEC_TO_SEC(account_target_spent_time->last_written_ns) + 1; + fclr <= NSEC_TO_SEC(now); + fclr++) + account_target_spent_time->by_sec_ns[fclr % TIME_BY_SEC_SIZE] = 0; + + /* Add times second by second */ + while (NSEC_TO_SEC(account_last) != NSEC_TO_SEC(now)) + { + u64 part = (NSEC_TO_SEC(account_last) + 1) * NSEC_IN_SEC - account_last; + account_target_spent_time->by_sec_ns[NSEC_TO_SEC(account_last) % TIME_BY_SEC_SIZE] += part; + account_last += part; + } + + /* Update the last second */ + account_target_spent_time->by_sec_ns[NSEC_TO_SEC(account_last) % TIME_BY_SEC_SIZE] += now - account_last; + + /* Store the current time */ + account_target_spent_time->last_written_ns = now; + } + + /* Update the total */ + if (account_target_total) + *account_target_total += dif; + + /* Store current time */ + account_last = now; + + return dif; +} + +static u64 account_to_spent_time(struct spent_time *st) +{ + u64 elapsed = account_finish(); + + account_target_spent_time = st; + account_target_total = &st->total_ns; + + return elapsed; +} + +static u64 account_to_total(u64 *total) +{ + u64 elapsed = account_finish(); + + account_target_spent_time = NULL; + account_target_total = total; + + return elapsed; +} + +#define account_to(_arg) _Generic((_arg), \ + struct spent_time *: account_to_spent_time, \ + u64 *: account_to_total)(_arg) + +/* + * Current thread context + */ + +_Thread_local struct birdloop *birdloop_current; +static _Thread_local struct birdloop *birdloop_wakeup_masked; +static _Thread_local uint birdloop_wakeup_masked_count; + +#define LOOP_NAME(loop) domain_name((loop)->time.domain) + +#define LOOP_TRACE(loop, fmt, args...) do { if (config && config->latency_debug) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args); } while (0) +#define THREAD_TRACE(...) do { if (config && config->latency_debug) log(L_TRACE "Thread: " __VA_ARGS__); } while (0) + +#define LOOP_WARN(loop, fmt, args...) log(L_TRACE "%s (%p): " fmt, LOOP_NAME(loop), (loop), ##args) + + +event_list * +birdloop_event_list(struct birdloop *loop) +{ + return &loop->event_list; +} + +struct timeloop * +birdloop_time_loop(struct birdloop *loop) +{ + return &loop->time; +} + +pool * +birdloop_pool(struct birdloop *loop) +{ + return loop->pool; +} + +_Bool +birdloop_inside(struct birdloop *loop) +{ + for (struct birdloop *c = birdloop_current; c; c = c->prev_loop) + if (loop == c) + return 1; + + return 0; +} + +_Bool +birdloop_in_this_thread(struct birdloop *loop) +{ + return pthread_equal(pthread_self(), loop->thread->thread_id); +} + +void +birdloop_flag(struct birdloop *loop, u32 flag) +{ + atomic_fetch_or_explicit(&loop->flags, flag, memory_order_acq_rel); + birdloop_ping(loop); +} + +void +birdloop_flag_set_handler(struct birdloop *loop, struct birdloop_flag_handler *fh) +{ + ASSERT_DIE(birdloop_inside(loop)); + loop->flag_handler = fh; +} + +static int +birdloop_process_flags(struct birdloop *loop) +{ + if (!loop->flag_handler) + return 0; + + u32 flags = atomic_exchange_explicit(&loop->flags, 0, memory_order_acq_rel); + if (!flags) + return 0; + + loop->flag_handler->hook(loop->flag_handler, flags); + return 1; +} + +/* + * Wakeup code for birdloop + */ + +void +pipe_new(struct pipe *p) +{ + int rv = pipe(p->fd); + if (rv < 0) + die("pipe: %m"); + + if (fcntl(p->fd[0], F_SETFL, O_NONBLOCK) < 0) + die("fcntl(O_NONBLOCK): %m"); + + if (fcntl(p->fd[1], F_SETFL, O_NONBLOCK) < 0) + die("fcntl(O_NONBLOCK): %m"); +} + +void +pipe_drain(struct pipe *p) +{ + while (1) { + char buf[64]; + int rv = read(p->fd[0], buf, sizeof(buf)); + if ((rv < 0) && (errno == EAGAIN)) + return; + + if (rv == 0) + bug("wakeup read eof"); + if ((rv < 0) && (errno != EINTR)) + bug("wakeup read: %m"); + } +} + +int +pipe_read_one(struct pipe *p) +{ + while (1) { + char v; + int rv = read(p->fd[0], &v, sizeof(v)); + if (rv == 1) + return 1; + if ((rv < 0) && (errno == EAGAIN)) + return 0; + if (rv > 1) + bug("wakeup read more bytes than expected: %d", rv); + if (rv == 0) + bug("wakeup read eof"); + if (errno != EINTR) + bug("wakeup read: %m"); + } +} + +void +pipe_kick(struct pipe *p) +{ + char v = 1; + int rv; + + while (1) { + rv = write(p->fd[1], &v, sizeof(v)); + if ((rv >= 0) || (errno == EAGAIN)) + return; + if (errno != EINTR) + bug("wakeup write: %m"); + } +} + +void +pipe_pollin(struct pipe *p, struct pfd *pfd) +{ + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = p->fd[0], + .events = POLLIN, + }; + BUFFER_PUSH(pfd->loop) = NULL; +} + +void +pipe_free(struct pipe *p) +{ + close(p->fd[0]); + close(p->fd[1]); +} + +static inline void +wakeup_init(struct bird_thread *loop) +{ + pipe_new(&loop->wakeup); +} + +static inline void +wakeup_drain(struct bird_thread *loop) +{ + pipe_drain(&loop->wakeup); +} + +static inline void +wakeup_do_kick(struct bird_thread *loop) +{ + pipe_kick(&loop->wakeup); +} + +static inline void +wakeup_free(struct bird_thread *loop) +{ + pipe_free(&loop->wakeup); +} + +static inline _Bool +birdloop_try_ping(struct birdloop *loop, u32 ltt) +{ + /* Somebody else is already pinging, be idempotent */ + if (ltt & LTT_PING) + { + LOOP_TRACE(loop, "already being pinged"); + return 0; + } + + /* Thread moving is an implicit ping */ + if (ltt & LTT_MOVE) + { + LOOP_TRACE(loop, "ping while moving"); + return 1; + } + + /* No more flags allowed */ + ASSERT_DIE(!ltt); + + /* No ping when not picked up */ + if (!loop->thread) + { + LOOP_TRACE(loop, "not picked up yet, can't ping"); + return 1; + } + + /* No ping when masked */ + if (loop == birdloop_wakeup_masked) + { + LOOP_TRACE(loop, "wakeup masked, can't ping"); + birdloop_wakeup_masked_count++; + return 1; + } + + /* Send meta event to ping */ + if ((loop != loop->thread->meta) && (loop != &main_birdloop)) + { + LOOP_TRACE(loop, "Ping by meta event to %p", loop->thread->meta); + ev_send_loop(loop->thread->meta, &loop->event); + return 1; + } + + /* Do the real ping of Meta or Main */ + LOOP_TRACE(loop, "sending pipe ping"); + wakeup_do_kick(loop->thread); + return 0; +} + +static inline void +birdloop_do_ping(struct birdloop *loop) +{ + /* Register our ping effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_PING, memory_order_acq_rel); + + /* Try to ping in multiple ways */ + if (birdloop_try_ping(loop, ltt)) + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_PING, memory_order_acq_rel); +} + +void +birdloop_ping(struct birdloop *loop) +{ + if (!birdloop_inside(loop)) + { + LOOP_TRACE(loop, "ping from outside"); + birdloop_do_ping(loop); + } + else + { + LOOP_TRACE(loop, "ping from inside, pending=%d", loop->ping_pending); + if (!loop->ping_pending) + loop->ping_pending++; + } +} + + +/* + * Sockets + */ + +static void +sockets_init(struct birdloop *loop) +{ + init_list(&loop->sock_list); + loop->sock_num = 0; +} + +void +socket_changed(sock *s) +{ + struct birdloop *loop = s->loop; + ASSERT_DIE(birdloop_inside(loop)); + + loop->sock_changed = 1; + birdloop_ping(loop); +} + +void +birdloop_add_socket(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(!s->loop); + + LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num); + add_tail(&loop->sock_list, &s->n); + loop->sock_num++; + + s->loop = loop; + s->index = -1; + + socket_changed(s); +} + +extern sock *stored_sock; /* mainloop hack */ + +void +birdloop_remove_socket(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(!enlisted(&s->n) == !s->loop); + + if (!s->loop) + return; + + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(s->loop == loop); + + /* Decouple the socket from the loop at all. */ + LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num); + + if (loop->sock_active == s) + loop->sock_active = sk_next(s); + + if ((loop == &main_birdloop) && (s == stored_sock)) + stored_sock = sk_next(s); + + rem_node(&s->n); + loop->sock_num--; + + socket_changed(s); + + s->loop = NULL; + s->index = -1; +} + +void +sk_reloop(sock *s, struct birdloop *loop) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(birdloop_inside(s->loop)); + + if (loop == s->loop) + return; + + birdloop_remove_socket(s->loop, s); + birdloop_add_socket(loop, s); +} + +void +sk_pause_rx(struct birdloop *loop, sock *s) +{ + ASSERT_DIE(birdloop_inside(loop)); + s->rx_hook = NULL; + socket_changed(s); +} + +void +sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint)) +{ + ASSERT_DIE(birdloop_inside(loop)); + ASSERT_DIE(hook); + s->rx_hook = hook; + socket_changed(s); +} + +static inline uint sk_want_events(sock *s) +{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); } + +void +sockets_prepare(struct birdloop *loop, struct pfd *pfd) +{ + node *n; + WALK_LIST(n, loop->sock_list) + { + sock *s = SKIP_BACK(sock, n, n); + uint w = sk_want_events(s); + + if (!w) + { + s->index = -1; + continue; + } + + s->index = pfd->pfd.used; + LOOP_TRACE(loop, "socket %p poll index is %d", s, s->index); + + BUFFER_PUSH(pfd->pfd) = (struct pollfd) { + .fd = s->fd, + .events = sk_want_events(s), + }; + BUFFER_PUSH(pfd->loop) = loop; + } +} + +int sk_read(sock *s, int revents); +int sk_write(sock *s); +void sk_err(sock *s, int revents); + +static int +sockets_fire(struct birdloop *loop) +{ + if (EMPTY_LIST(loop->sock_list)) + return 0; + + int repeat = 0; + + times_update(); + + struct pollfd *pfd = loop->thread->pfd->pfd.data; + loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list)); + + while (loop->sock_active) + { + sock *s = loop->sock_active; + + int rev; + if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL)) + { + int e = 1; + + if (rev & POLLOUT) + { + /* Write everything. */ + while ((s == loop->sock_active) && (e = sk_write(s))) + ; + + if (s != loop->sock_active) + continue; + + if (!sk_tx_pending(s)) + loop->thread->sock_changed = 1; + } + + if (rev & POLLIN) + /* Read just one packet and request repeat. */ + if ((s == loop->sock_active) && s->rx_hook) + if (sk_read(s, rev)) + repeat++; + + if (s != loop->sock_active) + continue; + + if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR)) + sk_err(s, rev); + + if (s != loop->sock_active) + continue; + } + + loop->sock_active = sk_next(s); + } + + return repeat; +} + +/* + * Threads + */ + +DEFINE_DOMAIN(attrs); +static void bird_thread_start_event(void *_data); + +struct birdloop_pickup_group { + DOMAIN(attrs) domain; + list loops; + list threads; + uint thread_count; + uint thread_busy_count; + uint loop_count; + uint loop_unassigned_count; + btime max_latency; + event start_threads; +} pickup_groups[2] = { + { + /* all zeroes */ + }, + { + /* FIXME: make this dynamic, now it copies the loop_max_latency value from proto/bfd/config.Y */ + .max_latency = 10 MS, + .start_threads.hook = bird_thread_start_event, + .start_threads.data = &pickup_groups[1], + }, +}; + +static _Thread_local struct bird_thread *this_thread; + +static void +birdloop_set_thread(struct birdloop *loop, struct bird_thread *thr, struct birdloop_pickup_group *group) +{ + struct bird_thread *old = loop->thread; + ASSERT_DIE(!thr != !old); + + /* Signal our moving effort */ + u32 ltt = atomic_fetch_or_explicit(&loop->thread_transition, LTT_MOVE, memory_order_acq_rel); + ASSERT_DIE((ltt & LTT_MOVE) == 0); + + /* Wait until all previously started pings end */ + while (ltt & LTT_PING) + { + birdloop_yield(); + ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + ASSERT_DIE(ltt & LTT_MOVE); + } + /* Now we are free of running pings */ + + if (!thr) + { + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Request local socket reload */ + this_thread->sock_changed = 1; + } + + /* Update the thread value */ + loop->thread = thr; + + /* Allow pings */ + atomic_fetch_and_explicit(&loop->thread_transition, ~LTT_MOVE, memory_order_acq_rel); + + /* Put into appropriate lists */ + if (thr) + { + thr->loop_count++; + add_tail(&thr->loops, &loop->n); + + if (!EMPTY_LIST(loop->sock_list)) + thr->sock_changed = 1; + ev_send_loop(loop->thread->meta, &loop->event); + } + else + { + /* Put into pickup list */ + LOCK_DOMAIN(attrs, group->domain); + add_tail(&group->loops, &loop->n); + group->loop_unassigned_count++; + UNLOCK_DOMAIN(attrs, group->domain); + } +} + +static void +bird_thread_pickup_next(struct birdloop_pickup_group *group) +{ + /* This thread goes to the end of the pickup list */ + rem_node(&this_thread->n); + add_tail(&group->threads, &this_thread->n); + + /* If there are more loops to be picked up, wakeup the next thread in order */ + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); +} + +static void +birdloop_take(struct birdloop_pickup_group *group) +{ + struct birdloop *loop = NULL; + + LOCK_DOMAIN(attrs, group->domain); + + int drop = + this_thread->busy_active && + (group->thread_busy_count < group->thread_count) && + (this_thread->loop_count > 1); + int take = !EMPTY_LIST(group->loops); + + if (drop) + { + UNLOCK_DOMAIN(attrs, group->domain); + + node *n; + WALK_LIST2(loop, n, this_thread->loops, n) + { + birdloop_enter(loop); + if (ev_active(&loop->event)) + { + LOOP_TRACE(loop, "Moving to another thread"); + /* Pass to another thread */ + rem_node(&loop->n); + this_thread->loop_count--; + + /* This also unschedules the loop from Meta */ + birdloop_set_thread(loop, NULL, group); + + birdloop_leave(loop); + + LOCK_DOMAIN(attrs, group->domain); + bird_thread_pickup_next(group); + UNLOCK_DOMAIN(attrs, group->domain); + break; + } + birdloop_leave(loop); + } + + return; + } + + if (take) + { + /* Take a proportional amount of loops from the pickup list and unlock */ + uint thread_count = group->thread_count + 1; + if (group->thread_busy_count < group->thread_count) + thread_count -= group->thread_busy_count; + + uint assign = 1 + group->loop_unassigned_count / thread_count; + for (uint i=0; !EMPTY_LIST(group->loops) && iloops)); + rem_node(&loop->n); + group->loop_unassigned_count--; + UNLOCK_DOMAIN(attrs, group->domain); + + birdloop_enter(loop); + birdloop_set_thread(loop, this_thread, group); + + node *n; + WALK_LIST(n, loop->sock_list) + SKIP_BACK(sock, n, n)->index = -1; + + birdloop_leave(loop); + + LOCK_DOMAIN(attrs, group->domain); + } + + bird_thread_pickup_next(group); + } + + UNLOCK_DOMAIN(attrs, group->domain); +} + +static int +poll_timeout(struct birdloop *loop) +{ + timer *t = timers_first(&loop->time); + if (!t) + return -1; + + btime remains = tm_remains(t); + return remains TO_MS + ((remains TO_MS) MS < remains); +} + +static void +bird_thread_busy_set(struct bird_thread *thr, int val) +{ + LOCK_DOMAIN(attrs, thr->group->domain); + if (thr->busy_active = val) + thr->group->thread_busy_count++; + else + thr->group->thread_busy_count--; + ASSERT_DIE(thr->group->thread_busy_count <= thr->group->thread_count); + UNLOCK_DOMAIN(attrs, thr->group->domain); +} + +static void +bird_thread_busy_update(struct bird_thread *thr, int timeout_ms) +{ + int idle_force = (timeout_ms < 0); + int val = (timeout_ms < 5) && !idle_force; + + if (val == thr->busy_active) + return; + + if (val && (++thr->busy_counter == 4)) + return bird_thread_busy_set(thr, 1); + + if (!val && (idle_force || (--thr->busy_counter == 0))) + { + thr->busy_counter = 0; + bird_thread_busy_set(thr, 0); + } +} + +static void * +bird_thread_main(void *arg) +{ + struct bird_thread *thr = this_thread = arg; + + rcu_thread_start(&thr->rcu); + synchronize_rcu(); + + account_to(&thr->overhead); + + birdloop_enter(thr->meta); + + tmp_init(thr->pool, birdloop_domain(thr->meta)); + init_list(&thr->loops); + + thr->sock_changed = 1; + + struct pfd pfd; + BUFFER_INIT(pfd.pfd, thr->pool, 16); + BUFFER_INIT(pfd.loop, thr->pool, 16); + thr->pfd = &pfd; + + while (1) + { + u64 thr_loop_start = ns_now(); + int timeout; + + /* Pickup new loops */ + birdloop_take(thr->group); + + /* Schedule all loops with timed out timers */ + timers_fire(&thr->meta->time, 0); + + /* Compute maximal time per loop */ + u64 thr_before_run = ns_now(); + if (thr->loop_count > 0) + thr->max_loop_time_ns = (thr->max_latency_ns / 2 - (thr_before_run - thr_loop_start)) / (u64) thr->loop_count; + + /* Run all scheduled loops */ + int more_events = ev_run_list(&thr->meta->event_list); + if (more_events) + { + THREAD_TRACE("More events to run"); + timeout = 0; + } + else + { + timeout = poll_timeout(thr->meta); + if (timeout == -1) + THREAD_TRACE("No timers, no events"); + else + THREAD_TRACE("Next timer in %d ms", timeout); + } + + /* Run priority events before sleeping */ + ev_run_list(&thr->priority_events); + + /* Do we have to refresh sockets? */ + if (thr->sock_changed) + { + thr->sock_changed = 0; + + BUFFER_FLUSH(pfd.pfd); + BUFFER_FLUSH(pfd.loop); + + pipe_pollin(&thr->wakeup, &pfd); + + node *nn; + struct birdloop *loop; + WALK_LIST2(loop, nn, thr->loops, n) + { + birdloop_enter(loop); + sockets_prepare(loop, &pfd); + birdloop_leave(loop); + } + + ASSERT_DIE(pfd.loop.used == pfd.pfd.used); + } + /* Nothing to do in at least 5 seconds, flush local hot page cache */ + else if ((timeout > 5000) && (timeout < 0)) + flush_local_pages(); + + bird_thread_busy_update(thr, timeout); + + account_to(&this_thread->idle); + birdloop_leave(thr->meta); +poll_retry:; + int rv = poll(pfd.pfd.data, pfd.pfd.used, timeout); + if (rv < 0) + { + if (errno == EINTR || errno == EAGAIN) + goto poll_retry; + bug("poll in %p: %m", thr); + } + + account_to(&this_thread->overhead); + birdloop_enter(thr->meta); + + /* Drain wakeup fd */ + if (pfd.pfd.data[0].revents & POLLIN) + { + ASSERT_DIE(rv > 0); + rv--; + wakeup_drain(thr); + } + + /* Unset ping information for Meta */ + atomic_fetch_and_explicit(&thr->meta->thread_transition, ~LTT_PING, memory_order_acq_rel); + + /* Schedule loops with active sockets */ + if (rv) + for (uint i = 1; i < pfd.pfd.used; i++) + if (pfd.pfd.data[i].revents) + { + LOOP_TRACE(pfd.loop.data[i], "socket id %d got revents=%d", i, pfd.pfd.data[i].revents); + ev_send_loop(thr->meta, &pfd.loop.data[i]->event); + } + } + + bug("An infinite loop has ended."); +} + +static void +bird_thread_cleanup(void *_thr) +{ + struct bird_thread *thr = _thr; + struct birdloop *meta = thr->meta; + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + /* Wait until the thread actually finishes */ + ASSERT_DIE(meta); + birdloop_enter(meta); + birdloop_leave(meta); + + /* No more wakeup */ + wakeup_free(thr); + + /* Thread attributes no longer needed */ + pthread_attr_destroy(&thr->thread_attr); + + /* Free the meta loop */ + thr->meta->thread = NULL; + thr->meta = NULL; + birdloop_free(meta); +} + +static struct bird_thread * +bird_thread_start(struct birdloop_pickup_group *group) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + struct birdloop *meta = birdloop_new_no_pickup(&root_pool, DOMAIN_ORDER(meta), "Thread Meta"); + pool *p = birdloop_pool(meta); + + birdloop_enter(meta); + LOCK_DOMAIN(attrs, group->domain); + + struct bird_thread *thr = mb_allocz(p, sizeof(*thr)); + thr->pool = p; + thr->cleanup_event = (event) { .hook = bird_thread_cleanup, .data = thr, }; + thr->group = group; + thr->max_latency_ns = (group->max_latency ?: 5 S) TO_NS; + thr->meta = meta; + thr->meta->thread = thr; + + wakeup_init(thr); + ev_init_list(&thr->priority_events, NULL, "Thread direct event list"); + + add_tail(&group->threads, &thr->n); + + int e = 0; + + if (e = pthread_attr_init(&thr->thread_attr)) + die("pthread_attr_init() failed: %M", e); + + /* We don't have to worry about thread stack size so much. + if (e = pthread_attr_setstacksize(&thr->thread_attr, THREAD_STACK_SIZE)) + die("pthread_attr_setstacksize(%u) failed: %M", THREAD_STACK_SIZE, e); + */ + + if (e = pthread_attr_setdetachstate(&thr->thread_attr, PTHREAD_CREATE_DETACHED)) + die("pthread_attr_setdetachstate(PTHREAD_CREATE_DETACHED) failed: %M", e); + + if (e = pthread_create(&thr->thread_id, &thr->thread_attr, bird_thread_main, thr)) + die("pthread_create() failed: %M", e); + + group->thread_count++; + + UNLOCK_DOMAIN(attrs, group->domain); + birdloop_leave(meta); + return thr; +} + +static void +bird_thread_start_event(void *_data) +{ + struct birdloop_pickup_group *group = _data; + bird_thread_start(group); +} + +static struct birdloop *thread_dropper; +static event *thread_dropper_event; +static uint thread_dropper_goal; + +static void +bird_thread_dropper_free(void *data) +{ + struct birdloop *tdl_stop = data; + birdloop_free(tdl_stop); +} + +static void +bird_thread_shutdown(void * _ UNUSED) +{ + struct birdloop_pickup_group *group = this_thread->group; + LOCK_DOMAIN(attrs, group->domain); + int dif = group->thread_count - thread_dropper_goal; + struct birdloop *tdl_stop = NULL; + + if (dif > 0) + ev_send_loop(thread_dropper, thread_dropper_event); + else + { + tdl_stop = thread_dropper; + thread_dropper = NULL; + } + + UNLOCK_DOMAIN(attrs, group->domain); + + DBG("Thread pickup size differs from dropper goal by %d%s\n", dif, tdl_stop ? ", stopping" : ""); + + if (tdl_stop) + { + birdloop_stop_self(tdl_stop, bird_thread_dropper_free, tdl_stop); + return; + } + + struct bird_thread *thr = this_thread; + + LOCK_DOMAIN(attrs, group->domain); + /* Leave the thread-picker list to get no more loops */ + rem_node(&thr->n); + group->thread_count--; + + /* Fix the busy count */ + if (thr->busy_active) + group->thread_busy_count--; + + UNLOCK_DOMAIN(attrs, group->domain); + + /* Leave the thread-dropper loop as we aren't going to return. */ + birdloop_leave(thread_dropper); + + /* Last try to run the priority event list; ruin it then to be extra sure */ + ev_run_list(&this_thread->priority_events); + memset(&this_thread->priority_events, 0xa5, sizeof(this_thread->priority_events)); + + /* Drop loops including the thread dropper itself */ + while (!EMPTY_LIST(thr->loops)) + { + struct birdloop *loop = HEAD(thr->loops); + + /* Remove loop from this thread's list */ + this_thread->loop_count--; + rem_node(&loop->n); + + /* Unset loop's thread */ + birdloop_set_thread(loop, NULL, group); + } + + /* Let others know about new loops */ + LOCK_DOMAIN(attrs, group->domain); + if (!EMPTY_LIST(group->loops)) + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(attrs, group->domain); + + /* Request thread cleanup from main loop */ + ev_send_loop(&main_birdloop, &thr->cleanup_event); + + /* Local pages not needed anymore */ + flush_local_pages(); + + /* Unregister from RCU */ + rcu_thread_stop(&thr->rcu); + + /* Now we can be cleaned up */ + birdloop_leave(thr->meta); + + /* Exit! */ + pthread_exit(NULL); +} + +void +bird_thread_commit(struct config *new, struct config *old UNUSED) +{ + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + if (new->shutdown) + return; + + if (!new->thread_count) + new->thread_count = 1; + + while (1) + { + struct birdloop_pickup_group *group = &pickup_groups[0]; + LOCK_DOMAIN(attrs, group->domain); + + int dif = group->thread_count - (thread_dropper_goal = new->thread_count); + _Bool thread_dropper_running = !!thread_dropper; + + UNLOCK_DOMAIN(attrs, group->domain); + + if (dif < 0) + { + bird_thread_start(group); + continue; + } + + if ((dif > 0) && !thread_dropper_running) + { + struct birdloop *tdl = birdloop_new(&root_pool, DOMAIN_ORDER(control), group->max_latency, "Thread dropper"); + birdloop_enter(tdl); + event *tde = ev_new_init(tdl->pool, bird_thread_shutdown, NULL); + + LOCK_DOMAIN(attrs, group->domain); + thread_dropper = tdl; + thread_dropper_event = tde; + UNLOCK_DOMAIN(attrs, group->domain); + + ev_send_loop(thread_dropper, thread_dropper_event); + birdloop_leave(tdl); + } + + return; + } +} + +/* Cleanup after last thread */ +static void +bird_thread_sync_finish(void *_sync) +{ + ASSERT_THE_BIRD_LOCKED; + struct bird_thread_syncer *sync = _sync; + + /* Keep necessary pointers locally */ + pool *p = sync->pool; + DOMAIN(control) lock = sync->lock; + LOCK_DOMAIN(control, lock); + + /* This invalidates the `sync` pointer */ + CALL(sync->finish, sync); + + /* Free pool and domain */ + rp_free(p); + UNLOCK_DOMAIN(control, lock); + DOMAIN_FREE(control, lock); +} + +/* Process regular one thread hook */ +static void +bird_thread_sync_one(void *_sync) +{ + struct bird_thread_syncer *sync = _sync; + + LOCK_DOMAIN(control, sync->lock); + CALL(sync->hook, sync); + sync->done++; + if (sync->done == sync->total) + ev_send_loop(&main_birdloop, ev_new_init(sync->pool, bird_thread_sync_finish, sync)); + UNLOCK_DOMAIN(control, sync->lock); +} + +void +bird_thread_sync_all(struct bird_thread_syncer *sync, + void (*hook)(struct bird_thread_syncer *), + void (*done)(struct bird_thread_syncer *), const char *name) +{ + sync->lock = DOMAIN_NEW(control); + LOCK_DOMAIN(control, sync->lock); + + sync->pool = rp_new(&root_pool, sync->lock.control, name); + sync->hook = hook; + sync->finish = done; + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(attrs, group->domain); + + struct bird_thread *thr; + WALK_LIST(thr, group->threads) + { + sync->total++; + ev_send(&thr->priority_events, ev_new_init(sync->pool, bird_thread_sync_one, sync)); + wakeup_do_kick(thr); + } + + UNLOCK_DOMAIN(attrs, group->domain); + } + + UNLOCK_DOMAIN(control, sync->lock); +} + + +struct bird_thread_show_data { + struct bird_thread_syncer sync; + cli *cli; + linpool *lp; + u8 show_loops; + uint line_pos; + uint line_max; + const char **lines; +}; + +#define tsd_append(...) do { \ + if (!tsd->lines) \ + tsd->lines = mb_allocz(tsd->sync.pool, sizeof(const char *) * tsd->line_max); \ + if (tsd->line_pos >= tsd->line_max) \ + tsd->lines = mb_realloc(tsd->lines, sizeof (const char *) * (tsd->line_max *= 2)); \ + tsd->lines[tsd->line_pos++] = lp_sprintf(tsd->lp, __VA_ARGS__); \ +} while (0) + +static void +bird_thread_show_cli_cont(struct cli *c UNUSED) +{ + /* Explicitly do nothing to prevent CLI from trying to parse another command. */ +} + +static int +bird_thread_show_cli_cleanup(struct cli *c UNUSED) +{ + return 1; /* Defer the cleanup until the writeout is finished. */ +} + +static void +bird_thread_show_spent_time(struct bird_thread_show_data *tsd, const char *name, struct spent_time *st) +{ + char b[TIME_BY_SEC_SIZE * sizeof("1234567890, ")], *bptr = b, *bend = b + sizeof(b); + uint cs = CURRENT_SEC; + uint fs = NSEC_TO_SEC(st->last_written_ns); + + for (uint i = 0; i <= cs && i < TIME_BY_SEC_SIZE; i++) + bptr += bsnprintf(bptr, bend - bptr, "% 10lu ", + (cs - i > fs) ? 0 : st->by_sec_ns[(cs - i) % TIME_BY_SEC_SIZE]); + bptr[-1] = 0; /* Drop the trailing space */ + + tsd_append(" %s total time: % 9t s; last %d secs [ns]: %s", name, st->total_ns NS, MIN(CURRENT_SEC+1, TIME_BY_SEC_SIZE), b); +} + +static void +bird_thread_show_loop(struct bird_thread_show_data *tsd, struct birdloop *loop) +{ + tsd_append(" Loop %s", domain_name(loop->time.domain)); + bird_thread_show_spent_time(tsd, "Working ", &loop->working); + bird_thread_show_spent_time(tsd, "Locking ", &loop->locking); +} + +static void +bird_thread_show(struct bird_thread_syncer *sync) +{ + struct bird_thread_show_data *tsd = SKIP_BACK(struct bird_thread_show_data, sync, sync); + + if (!tsd->lp) + tsd->lp = lp_new(tsd->sync.pool); + + if (tsd->show_loops) + tsd_append("Thread %p%s (busy counter %d)", this_thread, this_thread->busy_active ? " [busy]" : "", this_thread->busy_counter); + + u64 total_time_ns = 0; + struct birdloop *loop; + WALK_LIST(loop, this_thread->loops) + { + if (tsd->show_loops) + bird_thread_show_loop(tsd, loop); + + total_time_ns += loop->working.total_ns + loop->locking.total_ns; + } + + if (tsd->show_loops) + { + tsd_append(" Total working time: %t", total_time_ns NS); + bird_thread_show_spent_time(tsd, "Overhead", &this_thread->overhead); + bird_thread_show_spent_time(tsd, "Idle ", &this_thread->idle); + } + else + tsd_append("Thread %p working %t s overhead %t s", + this_thread, total_time_ns NS, this_thread->overhead.total_ns NS); +} + +static void +cmd_show_threads_done(struct bird_thread_syncer *sync) +{ + struct bird_thread_show_data *tsd = SKIP_BACK(struct bird_thread_show_data, sync, sync); + ASSERT_DIE(birdloop_inside(&main_birdloop)); + + tsd->cli->cont = NULL; + tsd->cli->cleanup = NULL; + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + LOCK_DOMAIN(attrs, group->domain); + uint count = 0; + u64 total_time_ns = 0; + if (!EMPTY_LIST(group->loops)) + { + if (tsd->show_loops) + tsd_append("Unassigned loops in group %d:", i); + + struct birdloop *loop; + WALK_LIST(loop, group->loops) + { + if (tsd->show_loops) + bird_thread_show_loop(tsd, loop); + + total_time_ns += loop->working.total_ns + loop->locking.total_ns; + count++; + } + + if (tsd->show_loops) + tsd_append(" Total working time: %t", total_time_ns NS); + else + tsd_append("Unassigned %d loops in group %d, total time %t", count, i, total_time_ns NS); + } + else + tsd_append("All loops in group %d are assigned.", i); + + UNLOCK_DOMAIN(attrs, group->domain); + } + + for (uint i = 0; i < tsd->line_pos - 1; i++) - cli_printf(tsd->cli, -1026, "%s", tsd->lines[i]); ++ cli_printf(tsd->cli, -1027, "%s", tsd->lines[i]); + - cli_printf(tsd->cli, 1026, "%s", tsd->lines[tsd->line_pos-1]); ++ cli_printf(tsd->cli, 1027, "%s", tsd->lines[tsd->line_pos-1]); + cli_write_trigger(tsd->cli); + mb_free(tsd); +} + +void +cmd_show_threads(int show_loops) +{ + struct bird_thread_show_data *tsd = mb_allocz(&root_pool, sizeof(struct bird_thread_show_data)); + tsd->cli = this_cli; + tsd->show_loops = show_loops; + tsd->line_pos = 0; + tsd->line_max = 64; + + this_cli->cont = bird_thread_show_cli_cont; + this_cli->cleanup = bird_thread_show_cli_cleanup; + + bird_thread_sync_all(&tsd->sync, bird_thread_show, cmd_show_threads_done, "Show Threads"); +} + + +/* + * Birdloop + */ + +static struct bird_thread main_thread; +struct birdloop main_birdloop = { .thread = &main_thread, }; + +static void birdloop_enter_locked(struct birdloop *loop); + +void +birdloop_init(void) +{ + ns_init(); + + for (int i=0; i<2; i++) + { + struct birdloop_pickup_group *group = &pickup_groups[i]; + + group->domain = DOMAIN_NEW(attrs); + DOMAIN_SETUP(attrs, group->domain, "Loop Pickup", NULL); + init_list(&group->loops); + init_list(&group->threads); + } + + wakeup_init(main_birdloop.thread); + + main_birdloop.time.domain = the_bird_domain.the_bird; + main_birdloop.time.loop = &main_birdloop; + + times_update(); + timers_init(&main_birdloop.time, &root_pool); + + birdloop_enter_locked(&main_birdloop); +} + +static void +birdloop_stop_internal(struct birdloop *loop) +{ + LOOP_TRACE(loop, "Stopping"); + + /* Block incoming pings */ + u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire); + while (!atomic_compare_exchange_strong_explicit( + &loop->thread_transition, <t, LTT_PING, + memory_order_acq_rel, memory_order_acquire)) + ; + + /* Flush remaining events */ + ASSERT_DIE(!ev_run_list(&loop->event_list)); + + /* Drop timers */ + timer *t; + while (t = timers_first(&loop->time)) + tm_stop(t); + + /* Drop sockets */ + sock *s; + WALK_LIST_FIRST2(s, n, loop->sock_list) + birdloop_remove_socket(loop, s); + + /* Unschedule from Meta */ + ev_postpone(&loop->event); + tm_stop(&loop->timer); + + /* Remove from thread loop list */ + ASSERT_DIE(loop->thread == this_thread); + rem_node(&loop->n); + loop->thread = NULL; + + /* Uncount from thread group */ + LOCK_DOMAIN(attrs, this_thread->group->domain); + this_thread->group->loop_count--; + UNLOCK_DOMAIN(attrs, this_thread->group->domain); + + /* Leave the loop context without causing any other fuss */ + ASSERT_DIE(!ev_active(&loop->event)); + loop->ping_pending = 0; + account_to(&this_thread->overhead); + birdloop_leave(loop); + + /* Request local socket reload */ + this_thread->sock_changed = 1; + + /* Call the stopped hook from the main loop */ + loop->event.hook = loop->stopped; + loop->event.data = loop->stop_data; + ev_send_loop(&main_birdloop, &loop->event); +} + +static void +birdloop_run(void *_loop) +{ + /* Run priority events before the loop is executed */ + ev_run_list(&this_thread->priority_events); + + struct birdloop *loop = _loop; + account_to(&loop->locking); + birdloop_enter(loop); + u64 dif = account_to(&loop->working); + + if (dif > this_thread->max_loop_time_ns) + LOOP_WARN(loop, "locked %lu ns after its scheduled end time", dif); + + uint repeat, loop_runs = 0; + do { + repeat = 0; + LOOP_TRACE(loop, "Regular run"); + loop_runs++; + + if (loop->stopped) + /* Birdloop left inside the helper function */ + return birdloop_stop_internal(loop); + + /* Process sockets */ + repeat += sockets_fire(loop); + + /* Run timers */ + timers_fire(&loop->time, 0); + + /* Run flag handlers */ + repeat += birdloop_process_flags(loop); + + /* Run events */ + repeat += ev_run_list(&loop->event_list); + + /* Check end time */ + } while (repeat && (ns_now() < account_last + this_thread->max_loop_time_ns)); + + /* Request meta timer */ + timer *t = timers_first(&loop->time); + if (t) + tm_start_in(&loop->timer, tm_remains(t), this_thread->meta); + else + tm_stop(&loop->timer); + + /* Request re-run if needed */ + if (repeat) + ev_send_loop(this_thread->meta, &loop->event); + + /* Collect socket change requests */ + this_thread->sock_changed |= loop->sock_changed; + loop->sock_changed = 0; + + account_to(&this_thread->overhead); + birdloop_leave(loop); +} + +static void +birdloop_run_timer(timer *tm) +{ + struct birdloop *loop = tm->data; + LOOP_TRACE(loop, "Timer ready, requesting run"); + ev_send_loop(loop->thread->meta, &loop->event); +} + +static struct birdloop * +birdloop_vnew_internal(pool *pp, uint order, struct birdloop_pickup_group *group, const char *name, va_list args) +{ + struct domain_generic *dg = domain_new(order); + DG_LOCK(dg); + + pool *p = rp_vnewf(pp, dg, name, args); + struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop)); + loop->pool = p; + + loop->time.domain = dg; + loop->time.loop = loop; + + atomic_store_explicit(&loop->thread_transition, 0, memory_order_relaxed); + + birdloop_enter_locked(loop); + + ev_init_list(&loop->event_list, loop, name); + timers_init(&loop->time, p); + sockets_init(loop); + + loop->event = (event) { .hook = birdloop_run, .data = loop, }; + loop->timer = (timer) { .hook = birdloop_run_timer, .data = loop, }; + + if (group) + { + LOCK_DOMAIN(attrs, group->domain); + group->loop_count++; + add_tail(&group->loops, &loop->n); + if (EMPTY_LIST(group->threads)) + ev_send(&global_event_list, &group->start_threads); + else + wakeup_do_kick(SKIP_BACK(struct bird_thread, n, HEAD(group->threads))); + UNLOCK_DOMAIN(attrs, group->domain); + } + else + loop->n.next = loop->n.prev = &loop->n; + + birdloop_leave(loop); + + return loop; +} + +static struct birdloop * +birdloop_new_no_pickup(pool *pp, uint order, const char *name, ...) +{ + va_list args; + va_start(args, name); + struct birdloop *loop = birdloop_vnew_internal(pp, order, NULL, name, args); + va_end(args); + return loop; +} + +struct birdloop * +birdloop_new(pool *pp, uint order, btime max_latency, const char *name, ...) +{ + va_list args; + va_start(args, name); + struct birdloop *loop = birdloop_vnew_internal(pp, order, max_latency ? &pickup_groups[1] : &pickup_groups[0], name, args); + va_end(args); + return loop; +} + +static void +birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) +{ + LOOP_TRACE(loop, "Stop requested"); + + loop->stopped = stopped; + loop->stop_data = data; + + birdloop_do_ping(loop); +} + +void +birdloop_stop(struct birdloop *loop, void (*stopped)(void *data), void *data) +{ + DG_LOCK(loop->time.domain); + birdloop_do_stop(loop, stopped, data); + DG_UNLOCK(loop->time.domain); +} + +void +birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *data) +{ + ASSERT_DIE(loop == birdloop_current); + ASSERT_DIE(DG_IS_LOCKED(loop->time.domain)); + + birdloop_do_stop(loop, stopped, data); +} + +void +birdloop_free(struct birdloop *loop) +{ + ASSERT_DIE(loop->thread == NULL); + + struct domain_generic *dg = loop->time.domain; + DG_LOCK(dg); + rp_free(loop->pool); + DG_UNLOCK(dg); + domain_free(dg); +} + +static void +birdloop_enter_locked(struct birdloop *loop) +{ + ASSERT_DIE(DG_IS_LOCKED(loop->time.domain)); + ASSERT_DIE(!birdloop_inside(loop)); + + /* Store the old context */ + loop->prev_loop = birdloop_current; + + /* Put the new context */ + birdloop_current = loop; +} + +void +birdloop_enter(struct birdloop *loop) +{ + DG_LOCK(loop->time.domain); + return birdloop_enter_locked(loop); +} + +static void +birdloop_leave_locked(struct birdloop *loop) +{ + /* Check the current context */ + ASSERT_DIE(birdloop_current == loop); + + /* Send pending pings */ + if (loop->ping_pending) + { + LOOP_TRACE(loop, "sending pings on leave"); + loop->ping_pending = 0; + birdloop_do_ping(loop); + } + + /* Restore the old context */ + birdloop_current = loop->prev_loop; +} + +void +birdloop_leave(struct birdloop *loop) +{ + birdloop_leave_locked(loop); + DG_UNLOCK(loop->time.domain); +} + +void +birdloop_mask_wakeups(struct birdloop *loop) +{ + ASSERT_DIE(birdloop_wakeup_masked == NULL); + birdloop_wakeup_masked = loop; +} + +void +birdloop_unmask_wakeups(struct birdloop *loop) +{ + ASSERT_DIE(birdloop_wakeup_masked == loop); + birdloop_wakeup_masked = NULL; + if (birdloop_wakeup_masked_count) + wakeup_do_kick(loop->thread); + + birdloop_wakeup_masked_count = 0; +} + +void +birdloop_yield(void) +{ + usleep(100); +}