]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Sockets: Unified API for main and other loops
authorMaria Matejka <mq@ucw.cz>
Sun, 2 Apr 2023 17:15:22 +0000 (19:15 +0200)
committerMaria Matejka <mq@ucw.cz>
Tue, 4 Apr 2023 15:00:59 +0000 (17:00 +0200)
Now sk_open() requires an explicit IO loop to open the socket in. Also
specific functions for socket RX pause / resume are added to allow for
BGP corking.

And last but not least, socket reloop is now synchronous to resolve
weird cases of the target loop stopping before actually picking up the
relooped socket. Now the caller must ensure that both loops are locked
while relooping, and this way all sockets always have their respective
loop.

20 files changed:
lib/io-loop.h
lib/socket.h
proto/babel/packets.c
proto/bfd/bfd.c
proto/bfd/packets.c
proto/bgp/bgp.c
proto/bgp/packets.c
proto/ospf/iface.c
proto/radv/packets.c
proto/rip/packets.c
proto/rpki/ssh_transport.c
proto/rpki/tcp_transport.c
proto/rpki/transport.c
sysdep/bsd/krt-sock.c
sysdep/linux/netlink.c
sysdep/unix/io-loop.c
sysdep/unix/io-loop.h
sysdep/unix/io.c
sysdep/unix/main.c
sysdep/unix/unix.h

index ae58bbee7d54eec11a5424a404ed477399cc4924..502d77fc59f577c4330192a95c37cef89d4d32bf 100644 (file)
 
 extern struct birdloop main_birdloop;
 
-void sk_start(sock *s);
-void sk_stop(sock *s);
-void sk_reloop(sock *s, struct birdloop *loop);
-
 /* Start a new birdloop owned by given pool and domain */
 struct birdloop *birdloop_new(pool *p, uint order, const char *name);
 
@@ -58,6 +54,10 @@ struct birdloop_flag_handler {
 void birdloop_flag(struct birdloop *loop, u32 flag);
 void birdloop_flag_set_handler(struct birdloop *, struct birdloop_flag_handler *);
 
+/* Setup sockets */
+void birdloop_add_socket(struct birdloop *, struct birdsock *);
+void birdloop_remove_socket(struct birdloop *, struct birdsock *);
+
 void birdloop_init(void);
 
 /* Yield for a little while. Use only in special cases. */
index 5c69482ed9ba84262e7b540eeef776d82274f473..4b1695818a9eccc212040c5c49d8b20bb9c3b0d0 100644 (file)
@@ -80,17 +80,22 @@ typedef struct birdsock {
   const char *password;                        /* Password for MD5 authentication */
   const char *err;                     /* Error message */
   struct ssh_sock *ssh;                        /* Used in SK_SSH */
-  struct event reloop;                 /* Reloop event */
+  struct birdloop *loop;               /* BIRDLoop owning this socket */
 } sock;
 
 sock *sock_new(pool *);                        /* Allocate new socket */
 #define sk_new(X) sock_new(X)          /* Wrapper to avoid name collision with OpenSSL */
 
-int sk_open(sock *);                   /* Open socket */
+int sk_open(sock *, struct birdloop *);                /* Open socket */
+void sk_reloop(sock *, struct birdloop *);     /* Move socket to another loop. Both loops must be locked. */
+
 int sk_rx_ready(sock *s);
+_Bool sk_tx_pending(sock *s);
 int sk_send(sock *, uint len);         /* Send data, <0=err, >0=ok, 0=sleep */
 int sk_send_to(sock *, uint len, ip_addr to, uint port); /* sk_send to given destination */
 void sk_reallocate(sock *);            /* Free and allocate tbuf & rbuf */
+void sk_pause_rx(struct birdloop *loop, sock *s);
+void sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint));
 void sk_set_rbsize(sock *s, uint val); /* Resize RX buffer */
 void sk_set_tbsize(sock *s, uint val); /* Resize TX buffer, keeping content */
 void sk_set_tbuf(sock *s, void *tbuf); /* Switch TX buffer, NULL-> return to internal */
@@ -114,6 +119,7 @@ int sk_set_icmp6_filter(sock *s, int p1, int p2);
 void sk_log_error(sock *s, const char *p);
 
 byte * sk_rx_buffer(sock *s, int *len);        /* Temporary */
+sock *sk_next(sock *s);
 
 extern int sk_priority_control;                /* Suggested priority for control traffic, should be sysdep define */
 
@@ -127,11 +133,9 @@ extern int sk_priority_control;            /* Suggested priority for control traffic, shou
 #define SKF_HIGH_PORT  0x20    /* Choose port from high range if possible */
 #define SKF_FREEBIND   0x40    /* Allow socket to bind to a nonlocal address */
 
-#define SKF_THREAD     0x100   /* Socked used in thread, Do not add to main loop */
 #define SKF_TRUNCATED  0x200   /* Received packet was truncated, set by IO layer */
 #define SKF_HDRINCL    0x400   /* Used internally */
 #define SKF_PKTINFO    0x800   /* Used internally */
-#define SKF_PASSIVE_THREAD  0x1000  /* Child sockets used in thread, do not add to main loop */
 
 /*
  *     Socket types                 SA SP DA DP IF  TTL SendTo (?=may, -=must not, *=must)
index d4acc17042d212a74678ff104539a8591e43a945..f13bb5baa10962057b4da4ac5e07e9d9f92416a7 100644 (file)
@@ -1617,7 +1617,7 @@ babel_open_socket(struct babel_iface *ifa)
   sk->ttl = 1;
   sk->flags = SKF_LADDR_RX;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
   if (sk_setup_multicast(sk) < 0)
index 1ef92b189d63a5a7c334965c2ee65170a5a6cd99..b6ccdb9a05d45df4ea0c2354d7a622419d4f5516 100644 (file)
@@ -603,16 +603,10 @@ bfd_free_iface(struct bfd_iface *ifa)
     return;
 
   if (ifa->sk)
-  {
-    sk_stop(ifa->sk);
     rfree(ifa->sk);
-  }
 
   if (ifa->rx)
-  {
-    sk_stop(ifa->rx);
     rfree(ifa->rx);
-  }
 
   rem_node(&ifa->n);
   mb_free(ifa);
@@ -1100,11 +1094,6 @@ bfd_shutdown(struct proto *P)
 
   bfd_drop_requests(p);
 
-  if (p->rx4_1) sk_stop(p->rx4_1);
-  if (p->rx4_m) sk_stop(p->rx4_m);
-  if (p->rx6_1) sk_stop(p->rx6_1);
-  if (p->rx6_m) sk_stop(p->rx6_m);
-
   return PS_DOWN;
 }
 
index 2200ab09f089a9cb6d221b16ee195575d81280a2..a22f223bebb16fd3df1db51489647fd8760bc63d 100644 (file)
@@ -430,12 +430,11 @@ bfd_open_rx_sk(struct bfd_proto *p, int multihop, int af)
   /* TODO: configurable ToS and priority */
   sk->tos = IP_PREC_INTERNET_CONTROL;
   sk->priority = sk_priority_control;
-  sk->flags = SKF_THREAD | SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0);
+  sk->flags = SKF_LADDR_RX | (!multihop ? SKF_TTL_RX : 0);
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
-  sk_start(sk);
   return sk;
 
  err:
@@ -462,12 +461,11 @@ bfd_open_rx_sk_bound(struct bfd_proto *p, ip_addr local, struct iface *ifa)
   /* TODO: configurable ToS and priority */
   sk->tos = IP_PREC_INTERNET_CONTROL;
   sk->priority = sk_priority_control;
-  sk->flags = SKF_THREAD | SKF_BIND | (ifa ? SKF_TTL_RX : 0);
+  sk->flags = SKF_BIND | (ifa ? SKF_TTL_RX : 0);
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
-  sk_start(sk);
   return sk;
 
  err:
@@ -494,12 +492,11 @@ bfd_open_tx_sk(struct bfd_proto *p, ip_addr local, struct iface *ifa)
   sk->tos = IP_PREC_INTERNET_CONTROL;
   sk->priority = sk_priority_control;
   sk->ttl = ifa ? 255 : -1;
-  sk->flags = SKF_THREAD | SKF_BIND | SKF_HIGH_PORT;
+  sk->flags = SKF_BIND | SKF_HIGH_PORT;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
-  sk_start(sk);
   return sk;
 
  err:
index 122b0c22a9fa231643e706f2f6912a569cd7e8e6..a90283536a3aea9883ba162e0f03344fee2c5881 100644 (file)
@@ -263,7 +263,7 @@ bgp_listen_create(void *_ UNUSED)
       sk->rx_hook = bgp_incoming_connection;
       sk->err_hook = bgp_listen_sock_err;
 
-      if (sk_open(sk) < 0)
+      if (sk_open(sk, &main_birdloop) < 0)
       {
        sk_log_error(sk, p->p.name);
        log(L_ERR "%s: Cannot open listening socket", p->p.name);
@@ -1203,7 +1203,7 @@ bgp_connect(struct bgp_proto *p)  /* Enter Connect state and start establishing c
   bgp_setup_sk(conn, s);
   bgp_conn_set_state(conn, BS_CONNECT);
 
-  if (sk_open(s) < 0)
+  if (sk_open(s, p->p.loop) < 0)
     goto err;
 
   /* Set minimal receive TTL if needed */
index 924d68281f6106c00cd1203af77971b447a5151d..c90798a09569b9795baace299b5ade859d5fb1e9 100644 (file)
@@ -3015,7 +3015,7 @@ bgp_kick_tx(void *vconn)
     ;
 
   if (!max && !ev_active(conn->tx_ev))
-    ev_schedule(conn->tx_ev);
+    proto_send_event(&conn->bgp->p, conn->tx_ev);
 }
 
 void
@@ -3023,13 +3023,14 @@ bgp_tx(sock *sk)
 {
   struct bgp_conn *conn = sk->data;
 
+  ASSERT_DIE(birdloop_inside(conn->bgp->p.loop));
   DBG("BGP: TX hook\n");
   uint max = 1024;
   while (--max && (bgp_fire_tx(conn) > 0))
     ;
 
   if (!max && !ev_active(conn->tx_ev))
-    ev_schedule(conn->tx_ev);
+    proto_send_event(&conn->bgp->p, conn->tx_ev);
 }
 
 
index 1919bccb0f829975fa4e31b6843a4697c4d18f97..0aa7fa00338065601ddeb2b25c70fe341e44150d 100644 (file)
@@ -136,7 +136,7 @@ ospf_sk_open(struct ospf_iface *ifa)
   sk->flags = SKF_LADDR_RX | (ifa->check_ttl ? SKF_TTL_RX : 0);
   sk->ttl = ifa->cf->ttl_security ? 255 : 1;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
   /* 12 is an offset of the checksum in an OSPFv3 packet */
@@ -220,7 +220,7 @@ ospf_open_vlink_sk(struct ospf_proto *p)
   sk->data = (void *) p;
   sk->flags = 0;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
   /* 12 is an offset of the checksum in an OSPFv3 packet */
index 5cd8b2deb9c0a90f51a0d5e5d5e10326d93fe18b..c6b565d288d5df69e7ce08e186e5eb2b745f7fbb 100644 (file)
@@ -493,7 +493,7 @@ radv_sk_open(struct radv_iface *ifa)
   sk->data = ifa;
   sk->flags = SKF_LADDR_RX;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, ifa->ra->p.loop) < 0)
     goto err;
 
   /* We want listen just to ICMPv6 messages of type RS and RA */
index 9c3bd7a3b871f61f34575ceb4d1f8925a37b22c2..fecdf896c2dd6c0b7f8ba5240defc59c9ef66f25 100644 (file)
@@ -1012,7 +1012,7 @@ rip_open_socket(struct rip_iface *ifa)
 
   /* sk->rbsize and sk->tbsize are handled in rip_iface_update_buffers() */
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, p->p.loop) < 0)
     goto err;
 
   if (ifa->cf->mode == RIP_IM_MULTICAST)
index 223afa80c1ca76070de8b3fa0afb0211abadceea..425ad4606cbf93498f2e199cfa0c80db3297ce47 100644 (file)
@@ -35,11 +35,9 @@ rpki_tr_ssh_open(struct rpki_tr_sock *tr)
   sk->ssh->subsystem = "rpki-rtr";
   sk->ssh->state = SK_SSH_CONNECT;
 
-  if (sk_open(sk) != 0)
+  if (sk_open(sk, cache->p->p.loop) != 0)
     return RPKI_TR_ERROR;
 
-  sk_start(sk);
-
   return RPKI_TR_SUCCESS;
 }
 
index 4e850c442028a5e81def554481476c297f037553..ebb8030fea2f7e677d7b26d2cce697f187c54b27 100644 (file)
@@ -28,11 +28,9 @@ rpki_tr_tcp_open(struct rpki_tr_sock *tr)
 
   sk->type = SK_TCP_ACTIVE;
 
-  if (sk_open(sk) != 0)
+  if (sk_open(sk, tr->cache->p->p.loop) != 0)
     return RPKI_TR_ERROR;
 
-  sk_start(sk);
-
   return RPKI_TR_SUCCESS;
 }
 
index 4026fca43d0e8af1af063ad8e4e58e25d2ba4db7..81bd6dd8d3c747a137e26883ae721d2685c0ffd4 100644 (file)
@@ -85,7 +85,6 @@ rpki_tr_open(struct rpki_tr_sock *tr)
   sk->rbsize = RPKI_RX_BUFFER_SIZE;
   sk->tbsize = RPKI_TX_BUFFER_SIZE;
   sk->tos = IP_PREC_INTERNET_CONTROL;
-  sk->flags |= SKF_THREAD;
   sk->vrf = cache->p->p.vrf;
 
   if (ipa_zero(sk->daddr) && sk->host)
@@ -121,7 +120,6 @@ rpki_tr_close(struct rpki_tr_sock *tr)
 
   if (tr->sk)
   {
-    sk_stop(tr->sk);
     rfree(tr->sk);
     tr->sk = NULL;
   }
index e7b2147401e8ae66bc87db6fbadac109da4cc6b5..094268b75195680b2ae827cbdc8726c2a5a45228 100644 (file)
@@ -1088,7 +1088,7 @@ krt_sock_open(pool *pool, void *data, int table_id UNUSED)
   sk->fd = fd;
   sk->data = data;
 
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, &main_birdloop) < 0)
     bug("krt-sock: sk_open failed");
 
   return sk;
index b18cb899e38264f7f264e2e37db1fa438b089594..e8a86ce4f58a35ac5f179fd954900c3cadcfb793 100644 (file)
@@ -2043,7 +2043,7 @@ nl_open_async(void)
   sk->rx_hook = nl_async_hook;
   sk->err_hook = nl_async_err_hook;
   sk->fd = fd;
-  if (sk_open(sk) < 0)
+  if (sk_open(sk, &main_birdloop) < 0)
     bug("Netlink: sk_open failed");
 }
 
index df162c6e3a796916b08ffe5ab1a3e685acf60889..0e2396f7dfb246a603095c33486b052d925bdf5c 100644 (file)
@@ -310,59 +310,95 @@ sockets_init(struct birdloop *loop)
   loop->sock_num = 0;
 }
 
-static void
-sockets_add(struct birdloop *loop, sock *s)
+void
+socket_changed(sock *s)
+{
+  struct birdloop *loop = s->loop;
+  ASSERT_DIE(birdloop_inside(loop));
+
+  loop->sock_changed++;
+  birdloop_ping(loop);
+}
+
+void
+birdloop_add_socket(struct birdloop *loop, sock *s)
 {
+  ASSERT_DIE(birdloop_inside(loop));
+  ASSERT_DIE(!s->loop);
+
   LOOP_TRACE(loop, "adding socket %p (total=%d)", s, loop->sock_num);
   add_tail(&loop->sock_list, &s->n);
   loop->sock_num++;
 
+  s->loop = loop;
   s->index = -1;
-  if (loop->thread)
-    atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
 
-  birdloop_ping(loop);
+  socket_changed(s);
 }
 
+extern sock *stored_sock; /* mainloop hack */
+
 void
-sk_start(sock *s)
+birdloop_remove_socket(struct birdloop *loop, sock *s)
 {
-  ASSERT_DIE(birdloop_current != &main_birdloop);
-  sockets_add(birdloop_current, s);
-}
+  ASSERT_DIE(!enlisted(&s->n) == !s->loop);
 
-static void
-sockets_remove(struct birdloop *loop, sock *s)
-{
-  if (!enlisted(&s->n))
+  if (!s->loop)
     return;
 
+  ASSERT_DIE(birdloop_inside(loop));
+  ASSERT_DIE(s->loop == loop);
+
   /* Decouple the socket from the loop at all. */
   LOOP_TRACE(loop, "removing socket %p (total=%d)", s, loop->sock_num);
 
+  if (loop->sock_active == s)
+    loop->sock_active = sk_next(s);
+
+  if ((loop == &main_birdloop) && (s == stored_sock))
+    stored_sock = sk_next(s);
+
   rem_node(&s->n);
   loop->sock_num--;
-  if (loop->thread)
-    atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
 
+  socket_changed(s);
+
+  s->loop = NULL;
   s->index = -1;
+}
 
-  /* Close the filedescriptor. If it ever gets into the poll(), it just returns
-   * POLLNVAL for this fd which then is ignored because nobody checks for
-   * that result. Or some other routine opens another fd, getting this number,
-   * yet also in this case poll() at worst spuriously returns and nobody checks
-   * for the result in this fd. No further precaution is needed. */
-  close(s->fd);
+void
+sk_reloop(sock *s, struct birdloop *loop)
+{
+  ASSERT_DIE(birdloop_inside(loop));
+  ASSERT_DIE(birdloop_inside(s->loop));
+
+  if (loop == s->loop)
+    return;
+
+  birdloop_remove_socket(s->loop, s);
+  birdloop_add_socket(loop, s);
+}
+
+void
+sk_pause_rx(struct birdloop *loop, sock *s)
+{
+  ASSERT_DIE(birdloop_inside(loop));
+  s->rx_hook = NULL;
+  socket_changed(s);
 }
 
 void
-sk_stop(sock *s)
+sk_resume_rx(struct birdloop *loop, sock *s, int (*hook)(sock *, uint))
 {
-  sockets_remove(birdloop_current, s);
+  ASSERT_DIE(birdloop_inside(loop));
+  ASSERT_DIE(hook);
+  s->rx_hook = hook;
+  socket_changed(s);
 }
 
 static inline uint sk_want_events(sock *s)
-{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
+{ return (s->rx_hook ? POLLIN : 0) | (sk_tx_pending(s) ? POLLOUT : 0); }
 
 void
 sockets_prepare(struct birdloop *loop, struct pfd *pfd)
@@ -392,40 +428,60 @@ sockets_prepare(struct birdloop *loop, struct pfd *pfd)
 
 int sk_read(sock *s, int revents);
 int sk_write(sock *s);
+void sk_err(sock *s, int revents);
 
-static void
+static int
 sockets_fire(struct birdloop *loop)
 {
+  if (EMPTY_LIST(loop->sock_list))
+    return 0;
+
+  int sch = 0;
+
   times_update();
 
   struct pollfd *pfd = loop->thread->pfd->pfd.data;
-  sock *s; node *n, *nxt;
-  WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
+  loop->sock_active = SKIP_BACK(sock, n, HEAD(loop->sock_list));
+
+  while (loop->sock_active)
   {
-    if (s->index < 0)
-      continue;
+    sock *s = loop->sock_active;
 
-    int rev = pfd[s->index].revents;
+    int rev;
+    if ((s->index >= 0) && (rev = pfd[s->index].revents) && !(rev & POLLNVAL))
+    {
+      int e = 1;
 
-    if (!rev)
-      continue;
+      if (rev & POLLOUT)
+      {
+       while ((s == loop->sock_active) && (e = sk_write(s)))
+         ;
 
-    if (rev & POLLNVAL)
-      bug("poll: invalid fd %d", s->fd);
+       if (s != loop->sock_active)
+         continue;
 
-    int e = 1;
+       if (!sk_tx_pending(s))
+         sch++;
+      }
 
-    if (rev & POLLIN)
-      while (e && s->rx_hook)
-       e = sk_read(s, rev);
+      if (rev & POLLIN)
+       while (e && (s == loop->sock_active) && s->rx_hook)
+         e = sk_read(s, rev);
 
-    if (rev & POLLOUT)
-    {
-      atomic_store_explicit(&loop->thread->poll_changed, 1, memory_order_release);
-      while (e = sk_write(s))
-       ;
+      if (s != loop->sock_active)
+       continue;
+
+      if (!(rev & (POLLOUT | POLLIN)) && (rev & POLLERR))
+       sk_err(s, rev);
+
+      if (s != loop->sock_active)
+       continue;
     }
+
+    loop->sock_active = sk_next(s);
   }
+
+  return sch;
 }
 
 /*
@@ -547,7 +603,8 @@ bird_thread_main(void *arg)
   thr->meta->thread = thr;
   birdloop_enter(thr->meta);
 
-  u32 refresh_sockets = 1;
+  thr->sock_changed = 1;
+
   struct pfd pfd;
   BUFFER_INIT(pfd.pfd, thr->pool, 16);
   BUFFER_INIT(pfd.loop, thr->pool, 16);
@@ -563,7 +620,7 @@ bird_thread_main(void *arg)
     {
       birdloop_enter(loop);
       if (!EMPTY_LIST(loop->sock_list))
-       refresh_sockets = 1;
+       thr->sock_changed = 1;
       birdloop_leave(loop);
     }
 
@@ -590,10 +647,10 @@ bird_thread_main(void *arg)
     ev_run_list(&thr->priority_events);
 
     /* Do we have to refresh sockets? */
-    refresh_sockets += atomic_exchange_explicit(&thr->poll_changed, 0, memory_order_acq_rel);
-
-    if (refresh_sockets)
+    if (thr->sock_changed)
     {
+      thr->sock_changed = 0;
+
       BUFFER_FLUSH(pfd.pfd);
       BUFFER_FLUSH(pfd.loop);
 
@@ -608,7 +665,6 @@ bird_thread_main(void *arg)
       }
 
       ASSERT_DIE(pfd.loop.used == pfd.pfd.used);
-      refresh_sockets = 0;
     }
     /* Nothing to do in at least 5 seconds, flush local hot page cache */
     else if (timeout > 5000)
@@ -957,6 +1013,15 @@ birdloop_init(void)
 static void
 birdloop_stop_internal(struct birdloop *loop)
 {
+  LOOP_TRACE(loop, "Stopping");
+
+  /* Block incoming pings */
+  u32 ltt = atomic_load_explicit(&loop->thread_transition, memory_order_acquire);
+  while (!atomic_compare_exchange_strong_explicit(
+       &loop->thread_transition, &ltt, LTT_PING,
+       memory_order_acq_rel, memory_order_acquire))
+    ;
+
   /* Flush remaining events */
   ASSERT_DIE(!ev_run_list(&loop->event_list));
 
@@ -965,17 +1030,27 @@ birdloop_stop_internal(struct birdloop *loop)
   while (t = timers_first(&loop->time))
     tm_stop(t);
 
-  /* No sockets allowed */
-  ASSERT_DIE(EMPTY_LIST(loop->sock_list));
+  /* Drop sockets */
+  sock *s;
+  WALK_LIST_FIRST2(s, n, loop->sock_list)
+    birdloop_remove_socket(loop, s);
 
   /* Unschedule from Meta */
   ev_postpone(&loop->event);
   tm_stop(&loop->timer);
 
-  /* Declare loop stopped */
+  /* Remove from thread loop list */
   rem_node(&loop->n);
+  loop->thread = NULL;
+
+  /* Leave the loop context without causing any other fuss */
+  ASSERT_DIE(!ev_active(&loop->event));
+  loop->ping_pending = 0;
   birdloop_leave(loop);
 
+  /* Request local socket reload */
+  this_thread->sock_changed++;
+
   /* Tail-call the stopped hook */
   loop->stopped(loop->stop_data);
 }
@@ -989,12 +1064,14 @@ birdloop_run(void *_loop)
   struct birdloop *loop = _loop;
   birdloop_enter(loop);
 
+  LOOP_TRACE(loop, "Regular run");
+
   if (loop->stopped)
     /* Birdloop left inside the helper function */
     return birdloop_stop_internal(loop);
 
   /* Process sockets */
-  sockets_fire(loop);
+  this_thread->sock_changed += sockets_fire(loop);
 
   /* Run timers */
   timers_fire(&loop->time, 0);
@@ -1016,6 +1093,10 @@ birdloop_run(void *_loop)
   else
     tm_stop(&loop->timer);
 
+  /* Collect socket change requests */
+  this_thread->sock_changed += loop->sock_changed;
+  loop->sock_changed = 0;
+
   birdloop_leave(loop);
 }
 
@@ -1074,6 +1155,8 @@ birdloop_new(pool *pp, uint order, const char *name)
 static void
 birdloop_do_stop(struct birdloop *loop, void (*stopped)(void *data), void *data)
 {
+  LOOP_TRACE(loop, "Stop requested");
+
   loop->stopped = stopped;
   loop->stop_data = data;
 
@@ -1100,8 +1183,7 @@ birdloop_stop_self(struct birdloop *loop, void (*stopped)(void *data), void *dat
 void
 birdloop_free(struct birdloop *loop)
 {
-  ASSERT_DIE(loop->links == 0);
-  ASSERT_DIE(birdloop_in_this_thread(loop));
+  ASSERT_DIE(loop->thread == NULL);
 
   domain_free(loop->time.domain);
   rfree(loop->pool);
@@ -1170,20 +1252,6 @@ birdloop_unmask_wakeups(struct birdloop *loop)
   birdloop_wakeup_masked_count = 0;
 }
 
-void
-birdloop_link(struct birdloop *loop)
-{
-  ASSERT_DIE(birdloop_inside(loop));
-  loop->links++;
-}
-
-void
-birdloop_unlink(struct birdloop *loop)
-{
-  ASSERT_DIE(birdloop_inside(loop));
-  loop->links--;
-}
-
 void
 birdloop_yield(void)
 {
index e606f07e3db67762a057820b711ce96c99550820..7ec903af9b002afdcc91c2529db1ccf5067a2f95 100644 (file)
@@ -22,6 +22,7 @@ struct pfd {
 };
 
 void sockets_prepare(struct birdloop *, struct pfd *);
+void socket_changed(struct birdsock *);
 
 void pipe_new(struct pipe *);
 void pipe_pollin(struct pipe *, struct pfd *);
@@ -40,12 +41,12 @@ struct birdloop
   struct timeloop time;
   event_list event_list;
   list sock_list;
+  struct birdsock *sock_active;
   int sock_num;
+  uint sock_changed;
 
   uint ping_pending;
 
-  uint links;
-
   _Atomic u32 thread_transition;
 #define LTT_PING  1
 #define LTT_MOVE  2
@@ -66,8 +67,6 @@ struct bird_thread
 {
   node n;
 
-  _Atomic u32 poll_changed;
-
   struct pipe wakeup;
   event_list priority_events;
 
@@ -83,6 +82,8 @@ struct bird_thread
   struct pfd *pfd;
 
   event cleanup_event;
+
+  int sock_changed;
 };
 
 #endif
index 067970961cc1e938efe8b4787109d9f2f891f2e8..88d187a4f6cc17934b56ae163fd223995327bf96 100644 (file)
@@ -722,10 +722,7 @@ sk_log_error(sock *s, const char *p)
  *     Actual struct birdsock code
  */
 
-static struct birdsock *current_sock;
-static struct birdsock *stored_sock;
-
-static inline sock *
+sock *
 sk_next(sock *s)
 {
   if (!s->n.next->next)
@@ -787,6 +784,7 @@ sk_ssh_free(sock *s)
 }
 #endif
 
+
 static void
 sk_free(resource *r)
 {
@@ -799,18 +797,10 @@ sk_free(resource *r)
     sk_ssh_free(s);
 #endif
 
-  if ((s->fd < 0) || (s->flags & SKF_THREAD))
-    return;
-
-  if (s == current_sock)
-    current_sock = sk_next(s);
-  if (s == stored_sock)
-    stored_sock = sk_next(s);
-
-  if (enlisted(&s->n))
-    rem_node(&s->n);
+  if (s->loop)
+    birdloop_remove_socket(s->loop, s);
 
-  if (s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
+  if (s->fd >= 0 && s->type != SK_SSH && s->type != SK_SSH_ACTIVE)
     close(s->fd);
 
   s->fd = -1;
@@ -1022,12 +1012,6 @@ sk_setup(sock *s)
   return 0;
 }
 
-static void
-sk_insert(sock *s)
-{
-  add_tail(&main_birdloop.sock_list, &s->n);
-}
-
 static void
 sk_tcp_connected(sock *s)
 {
@@ -1101,10 +1085,7 @@ sk_passive_connected(sock *s, int type)
     return 1;
   }
 
-  if (s->flags & SKF_PASSIVE_THREAD)
-    t->flags |= SKF_THREAD;
-  else
-    sk_insert(t);
+  birdloop_add_socket(s->loop, t);
 
   sk_alloc_bufs(t);
   s->rx_hook(t, 0);
@@ -1319,6 +1300,7 @@ sk_open_ssh(sock *s)
 
 /**
  * sk_open - open a socket
+ * @loop: loop
  * @s: socket
  *
  * This function takes a socket resource created by sk_new() and
@@ -1328,7 +1310,7 @@ sk_open_ssh(sock *s)
  * Result: 0 for success, -1 for an error.
  */
 int
-sk_open(sock *s)
+sk_open(sock *s, struct birdloop *loop)
 {
   int af = AF_UNSPEC;
   int fd = -1;
@@ -1481,9 +1463,7 @@ sk_open(sock *s)
     sk_alloc_bufs(s);
   }
 
-  if (!(s->flags & SKF_THREAD))
-    sk_insert(s);
-
+  birdloop_add_socket(loop, s);
   return 0;
 
 err:
@@ -1493,7 +1473,7 @@ err:
 }
 
 int
-sk_open_unix(sock *s, char *name)
+sk_open_unix(sock *s, struct birdloop *loop, char *name)
 {
   struct sockaddr_un sa;
   int fd;
@@ -1520,40 +1500,10 @@ sk_open_unix(sock *s, char *name)
     return -1;
 
   s->fd = fd;
-  sk_insert(s);
+  birdloop_add_socket(loop, s);
   return 0;
 }
 
-static void
-sk_reloop_hook(void *_vs)
-{
-  sock *s = _vs;
-  if (birdloop_inside(&main_birdloop))
-  {
-    s->flags &= ~SKF_THREAD;
-    sk_insert(s);
-  }
-  else
-  {
-    s->flags |= SKF_THREAD;
-    sk_start(s);
-  }
-}
-
-void
-sk_reloop(sock *s, struct birdloop *loop)
-{
-  if (enlisted(&s->n))
-    rem_node(&s->n);
-
-  s->reloop = (event) {
-    .hook = sk_reloop_hook,
-    .data = s,
-  };
-
-  ev_send_loop(loop, &s->reloop);
-}
-
 
 #define CMSG_RX_SPACE MAX(CMSG4_SPACE_PKTINFO+CMSG4_SPACE_TTL, \
                          CMSG6_SPACE_PKTINFO+CMSG6_SPACE_TTL)
@@ -1676,6 +1626,13 @@ sk_recvmsg(sock *s)
 
 static inline void reset_tx_buffer(sock *s) { s->ttx = s->tpos = s->tbuf; }
 
+_Bool
+sk_tx_pending(sock *s)
+{
+  return s->ttx != s->tpos;
+}
+
+
 static int
 sk_maybe_write(sock *s)
 {
@@ -1686,7 +1643,7 @@ sk_maybe_write(sock *s)
   case SK_TCP:
   case SK_MAGIC:
   case SK_UNIX:
-    while (s->ttx != s->tpos)
+    while (sk_tx_pending(s))
     {
       e = write(s->fd, s->ttx, s->tpos - s->ttx);
 
@@ -1708,7 +1665,7 @@ sk_maybe_write(sock *s)
 
 #ifdef HAVE_LIBSSH
   case SK_SSH:
-    while (s->ttx != s->tpos)
+    while (sk_tx_pending(s))
     {
       e = ssh_channel_write(s->ssh->channel, s->ttx, s->tpos - s->ttx);
 
@@ -1791,7 +1748,12 @@ sk_send(sock *s, unsigned len)
 {
   s->ttx = s->tbuf;
   s->tpos = s->tbuf + len;
-  return sk_maybe_write(s);
+
+  int e = sk_maybe_write(s);
+  if (e == 0) /* Trigger thread poll reload to poll this socket's write. */
+    socket_changed(s);
+
+  return e;
 }
 
 /**
@@ -1838,7 +1800,7 @@ call_rx_hook(sock *s, int size)
   if (s->rx_hook(s, size))
   {
     /* We need to be careful since the socket could have been deleted by the hook */
-    if (current_sock == s)
+    if (s->loop->sock_active == s)
       s->rpos = s->rbuf;
   }
 }
@@ -2002,7 +1964,7 @@ sk_write_noflush(sock *s)
 #endif
 
   default:
-    if (s->ttx != s->tpos && sk_maybe_write(s) > 0)
+    if (sk_tx_pending(s) && sk_maybe_write(s) > 0)
     {
       if (s->tx_hook)
        s->tx_hook(s);
@@ -2224,6 +2186,8 @@ static int short_loops = 0;
 #define SHORT_LOOP_MAX 10
 #define WORK_EVENTS_MAX 10
 
+sock *stored_sock;
+
 void
 io_loop(void)
 {
@@ -2312,17 +2276,13 @@ io_loop(void)
          times_update();
 
          /* guaranteed to be non-empty */
-         current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+         main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
 
-         while (current_sock)
+         while (main_birdloop.sock_active)
+         {
+           sock *s = main_birdloop.sock_active;
+           if (s->index != -1)
            {
-             sock *s = current_sock;
-             if (s->index == -1)
-               {
-                 current_sock = sk_next(s);
-                 goto next;
-               }
-
              int e;
              int steps;
 
@@ -2333,10 +2293,11 @@ io_loop(void)
                    steps--;
                    io_log_event(s->rx_hook, s->data);
                    e = sk_read(s, pfd.pfd.data[s->index].revents);
-                   if (s != current_sock)
-                     goto next;
                  }
-               while (e && s->rx_hook && steps);
+               while (e && (main_birdloop.sock_active == s) && s->rx_hook && steps);
+
+             if (s != main_birdloop.sock_active)
+               continue;
 
              steps = MAX_STEPS;
              if (pfd.pfd.data[s->index].revents & POLLOUT)
@@ -2345,56 +2306,54 @@ io_loop(void)
                    steps--;
                    io_log_event(s->tx_hook, s->data);
                    e = sk_write(s);
-                   if (s != current_sock)
-                     goto next;
                  }
-               while (e && steps);
+               while (e && (main_birdloop.sock_active == s) && steps);
 
-             current_sock = sk_next(s);
-           next: ;
+             if (s != main_birdloop.sock_active)
+               continue;
            }
 
+           main_birdloop.sock_active = sk_next(s);
+         }
+
          short_loops++;
          if (events && (short_loops < SHORT_LOOP_MAX))
            continue;
          short_loops = 0;
 
          int count = 0;
-         current_sock = stored_sock;
-         if (current_sock == NULL)
-           current_sock = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
+         main_birdloop.sock_active = stored_sock;
+         if (main_birdloop.sock_active == NULL)
+           main_birdloop.sock_active = SKIP_BACK(sock, n, HEAD(main_birdloop.sock_list));
 
-         while (current_sock && count < MAX_RX_STEPS)
+         while (main_birdloop.sock_active && count < MAX_RX_STEPS)
            {
-             sock *s = current_sock;
+             sock *s = main_birdloop.sock_active;
              if (s->index == -1)
-               {
-                 current_sock = sk_next(s);
-                 goto next2;
-               }
+               goto next2;
 
              if (!s->fast_rx && (pfd.pfd.data[s->index].revents & POLLIN) && s->rx_hook)
                {
                  count++;
                  io_log_event(s->rx_hook, s->data);
                  sk_read(s, pfd.pfd.data[s->index].revents);
-                 if (s != current_sock)
-                   goto next2;
+                 if (s != main_birdloop.sock_active)
+                   continue;
                }
 
              if (pfd.pfd.data[s->index].revents & (POLLHUP | POLLERR))
                {
                  sk_err(s, pfd.pfd.data[s->index].revents);
-                 if (s != current_sock)
-                   goto next2;
+                 if (s != main_birdloop.sock_active)
+                   continue;
                }
 
-             current_sock = sk_next(s);
            next2: ;
+             main_birdloop.sock_active = sk_next(s);
            }
 
 
-         stored_sock = current_sock;
+         stored_sock = main_birdloop.sock_active;
        }
     }
 }
index a9ae842ad57299b793198fd0c50215f5d0330219..79db32d32218e55ffc6675c607446fed3b18b0eb 100644 (file)
@@ -542,7 +542,7 @@ cli_init_unix(uid_t use_uid, gid_t use_gid)
   /* Return value intentionally ignored */
   unlink(path_control_socket);
 
-  if (sk_open_unix(s, path_control_socket) < 0)
+  if (sk_open_unix(s, &main_birdloop, path_control_socket) < 0)
     die("Cannot create control socket %s: %m", path_control_socket);
 
   if (use_uid || use_gid)
index e26afe412a020ffef2b018b9a70bf9d19ea73c67..606b79cd131fff0437784368a66dbb036878bf9b 100644 (file)
@@ -9,6 +9,9 @@
 #ifndef _BIRD_UNIX_H_
 #define _BIRD_UNIX_H_
 
+#include "nest/bird.h"
+#include "lib/io-loop.h"
+
 #include <sys/socket.h>
 #include <signal.h>
 
@@ -110,7 +113,7 @@ extern volatile sig_atomic_t async_shutdown_flag;
 void io_init(void);
 void io_loop(void);
 void io_log_dump(void);
-int sk_open_unix(struct birdsock *s, char *name);
+int sk_open_unix(struct birdsock *s, struct birdloop *, char *name);
 struct rfile *rf_open(struct pool *, const char *name, const char *mode);
 void *rf_file(struct rfile *f);
 int rf_fileno(struct rfile *f);