]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
BGP: Decoupling of listen sockets 299-mq-bgp-async-listen
authorMaria Matejka <mq@ucw.cz>
Sat, 11 Oct 2025 18:16:57 +0000 (20:16 +0200)
committerMaria Matejka <mq@ucw.cz>
Thu, 13 Nov 2025 11:25:03 +0000 (12:25 +0100)
For performance reasons, every listening socket now gets its own loop so
that with strict bind one can accept connections by more threads than
just one.

proto/bgp/bgp.c
proto/bgp/bgp.h

index c99d1301ed7cff305a1327b08df5dbeb2acd2171..f36562df49f2914cbb196caade989f6549dcc2a9 100644 (file)
 #include "lib/socket.h"
 #include "lib/resource.h"
 #include "lib/string.h"
+#include "lib/hash.h"
 
 #include "bgp.h"
 #ifdef CONFIG_BMP
 
 /* Listening socket machinery */
 struct bgp_listen_private {
-  struct birdloop *loop;
+  DOMAIN(subproto) lock;
   struct bgp_listen_private **locked_at;
+  pool *pool;
   list sockets;                /* Listening sockets */
 };
 
 typedef union bgp_listen {
-  struct birdloop *loop;
+  DOMAIN(subproto) lock;
   struct bgp_listen_private priv;
 } bgp_listen;
 
 static bgp_listen bgp_listen_pub;
 
-BLO_UNLOCK_CLEANUP(bgp_listen);
+LOBJ_UNLOCK_CLEANUP(bgp_listen, subproto);
 
-#define BGP_LISTEN_PRIV(bl)    ASSERT_DIE(birdloop_inside(bgp_listen_pub.loop)); struct bgp_listen_private *bl = &bgp_listen_pub.priv;
-#define BGP_LISTEN_LOCKED(bl)  BLO_LOCKED(&bgp_listen_pub, bl, bgp_listen)
-#define BGP_LISTEN_LOCK(bl)    BLO_LOCK(&bgp_listen_pub, bl, bgp_listen)
-#define BGP_LISTEN_UNLOCK(bl)  ( BLO_UNLOCK_CLEANUP_NAME(bgp_listen)(&bl), bl = NULL )
+#define BGP_LISTEN_LOCKED(bl)  LOBJ_LOCKED(&bgp_listen_pub, bl, bgp_listen, subproto)
+#define BGP_LISTEN_LOCK(bl)    LOBJ_LOCK(&bgp_listen_pub, bl, bgp_listen, subproto)
+#define BGP_LISTEN_UNLOCK(bl)  ( LOBJ_UNLOCK_CLEANUP_NAME(bgp_listen, subproto)(&bl), bl = NULL )
 
 static void bgp_connect(struct bgp_proto *p);
 static void bgp_active(struct bgp_proto *p);
@@ -218,7 +219,7 @@ bgp_same_ao_key(struct ao_key *a, struct ao_key *b)
 }
 
 static inline int
-bgp_sk_add_ao_key(struct bgp_proto *p, sock *sk, struct bgp_ao_key *key)
+bgp_sk_add_ao_key(struct bgp_proto *p, sock *sk, struct bgp_ao_key *key, const char *kind)
 {
   ip_addr prefix = p->cf->remote_ip;
   int pxlen = -1;
@@ -228,8 +229,7 @@ bgp_sk_add_ao_key(struct bgp_proto *p, sock *sk, struct bgp_ao_key *key)
   {
     sk_log_error(sk, p->p.name);
     log(L_ERR "%s: Cannot add TCP-AO key %d/%d to BGP %s socket",
-       p->p.name, key->key.send_id, key->key.recv_id,
-       ((sk == p->listen.sock->sk) ? "listening" : "session"));
+       p->p.name, key->key.send_id, key->key.recv_id, kind);
   }
 
   return rv;
@@ -243,22 +243,23 @@ bgp_enable_ao_key(struct bgp_proto *p, struct bgp_ao_key *key)
   BGP_TRACE(D_EVENTS, "Adding TCP-AO key %d/%d", key->key.send_id, key->key.recv_id);
 
   /* Handle listening socket */
-  if (bgp_sk_add_ao_key(p, p->listen.sock->sk, key) < 0)
-  {
-    key->failed = 1;
-    return -1;
-  }
+  BGP_SOCKET_LOCKED(p->listen.sock, bs)
+    if (bgp_sk_add_ao_key(p, bs->sk, key, "listening") < 0)
+    {
+      key->failed = 1;
+      return -1;
+    }
 
   key->active = 1;
 
   /* Handle incoming socket */
   if (p->incoming_conn.sk)
-    if (bgp_sk_add_ao_key(p, p->incoming_conn.sk, key) < 0)
+    if (bgp_sk_add_ao_key(p, p->incoming_conn.sk, key, "session (in)") < 0)
       return -1;
 
   /* Handle outgoing socket */
   if (p->outgoing_conn.sk)
-    if (bgp_sk_add_ao_key(p, p->outgoing_conn.sk, key) < 0)
+    if (bgp_sk_add_ao_key(p, p->outgoing_conn.sk, key, "session (out)") < 0)
       return -1;
 
   return 0;
@@ -272,7 +273,7 @@ struct bgp_active_keys {
 
 static int
 bgp_sk_delete_ao_key(struct bgp_proto *p, sock *sk, struct bgp_ao_key *key,
-                    struct bgp_ao_key *backup, int current_key_id, int rnext_key_id)
+                    struct bgp_ao_key *backup, int current_key_id, int rnext_key_id, const char *kind)
 {
   struct ao_key *set_current = NULL, *set_rnext = NULL;
 
@@ -300,8 +301,7 @@ bgp_sk_delete_ao_key(struct bgp_proto *p, sock *sk, struct bgp_ao_key *key,
   {
     sk_log_error(sk, p->p.name);
     log(L_ERR "%s: Cannot delete TCP-AO key %d/%d from BGP %s socket",
-       p->p.name, key->key.send_id, key->key.recv_id,
-       ((sk == p->listen.sock->sk) ? "listening" : "session"));
+       p->p.name, key->key.send_id, key->key.recv_id, kind);
   }
 
   return rv;
@@ -315,19 +315,20 @@ bgp_disable_ao_key(struct bgp_proto *p, struct bgp_ao_key *key, struct bgp_activ
   BGP_TRACE(D_EVENTS, "Deleting TCP-AO key %d/%d", key->key.send_id, key->key.recv_id);
 
   /* Handle listening socket */
-  if (bgp_sk_delete_ao_key(p, p->listen.sock->sk, key, NULL, -1, -1) < 0)
-    return -1;
+  BGP_SOCKET_LOCKED(p->listen.sock, bs)
+    if (bgp_sk_delete_ao_key(p, bs->sk, key, NULL, -1, -1, "listening") < 0)
+      return -1;
 
   key->active = 0;
 
   /* Handle incoming socket */
   if (p->incoming_conn.sk && info)
-    if (bgp_sk_delete_ao_key(p, p->incoming_conn.sk, key, info->backup, info->in_current, info->in_rnext) < 0)
+    if (bgp_sk_delete_ao_key(p, p->incoming_conn.sk, key, info->backup, info->in_current, info->in_rnext, "session (in)") < 0)
       return -1;
 
   /* Handle outgoing socket */
   if (p->outgoing_conn.sk && info)
-    if (bgp_sk_delete_ao_key(p, p->outgoing_conn.sk, key, info->backup, info->out_current, info->out_rnext) < 0)
+    if (bgp_sk_delete_ao_key(p, p->outgoing_conn.sk, key, info->backup, info->out_current, info->out_rnext, "session (out)") < 0)
       return -1;
 
   return 0;
@@ -642,12 +643,16 @@ bgp_setup_auth(struct bgp_proto *p, int enable)
       pxlen = net_pxlen(p->cf->remote_range);
     }
 
-    int rv = sk_set_md5_auth(p->listen.sock->sk,
+    int rv;
+    BGP_SOCKET_LOCKED(p->listen.sock, bs)
+    {
+      rv = sk_set_md5_auth(bs->sk,
                             p->cf->local_ip, prefix, pxlen, p->cf->iface,
                             enable ? p->cf->password : NULL, p->cf->setkey);
 
-    if (rv < 0)
-      sk_log_error(p->listen.sock->sk, p->p.name);
+      if (rv < 0)
+       sk_log_error(bs->sk, p->p.name);
+    }
 
     return rv;
   }
@@ -740,17 +745,18 @@ bgp_listen_open(struct bgp_proto *p, struct bgp_listen_request *req)
   BGP_LISTEN_LOCK(bl);
 
   /* First try to find existing socket */
-  struct bgp_socket *bs;
+  bgp_socket *bs;
   WALK_LIST(bs, bl->sockets)
     if (bgp_socket_match(&req->params, &bs->params))
-    {
-      bgp_listen_debug(p, &req->params, "exists: %p", bs);
-      add_tail(&bs->requests, &req->n);
-      req->sock = bs;
-      return 0;
-    }
+      BGP_SOCKET_LOCKED(bs, bsp)
+      {
+       bgp_listen_debug(p, &req->params, "exists: %p", bs);
+       add_tail(&bsp->requests, &req->n);
+       req->sock = bs;
+       return 0;
+      }
 
-  sock *sk = sk_new(birdloop_pool(bl->loop));
+  sock *sk = sk_new(p->p.pool);
   sk->type = SK_TCP_PASSIVE;
   sk->ttl = 255;
   sk->saddr = req->params.addr;
@@ -764,48 +770,79 @@ bgp_listen_open(struct bgp_proto *p, struct bgp_listen_request *req)
   sk->rx_hook = bgp_incoming_connection;
   sk->err_hook = bgp_listen_sock_err;
 
-  if (sk_open(sk, bl->loop) < 0)
-    goto err;
+  if (sk_open(sk, p->p.loop) < 0)
+  {
+    sk_log_error(sk, p->p.name);
+    log(L_ERR "%s: Cannot open listening socket", p->p.name);
+    sk_close(sk);
+    return -1;
+  }
+
+  struct birdloop *loop = birdloop_new(bl->pool, DOMAIN_ORDER(service), NULL,
+      "bgp listen socket at %I%J port %u",
+      req->params.addr, req->params.iface, req->params.port);
+
+  pool *pool = birdloop_pool(loop);
+
+  birdloop_enter(loop);
+  sk_reloop(sk, loop);
+  rmove(sk, pool);
+  sk->pool = pool;
+
+  bs = mb_allocz(pool, sizeof(*bs));
+  struct bgp_socket_private *bsp = &bs->priv;
 
-  bs = mb_allocz(birdloop_pool(bl->loop), sizeof(struct bgp_socket));
-  bs->params = req->params;
-  bs->sk = sk;
+  bsp->loop = loop;
+  bsp->params = req->params;
+  bsp->sk = sk;
 
   sk->data = bs;
   req->sock = bs;
 
-  init_list(&bs->requests);
-  add_tail(&bs->requests, &req->n);
+  init_list(&bsp->requests);
+  add_tail(&bsp->requests, &req->n);
   add_tail(&bl->sockets, &bs->n);
 
   bgp_listen_debug(p, &req->params, "create: %p", bs);
+  birdloop_leave(loop);
 
   return 0;
+}
 
-err:
-  sk_log_error(sk, p->p.name);
-  log(L_ERR "%s: Cannot open listening socket", p->p.name);
-  sk_close(sk);
-  return -1;
+static void
+bgp_listen_done(void *_bs)
+{
+  /* Loop finishing must be done from the main loop */
+  ASSERT_DIE(birdloop_inside(&main_birdloop));
+  BGP_LISTEN_LOCK(bl);
+
+  /* The loop finisher must at least once enter the loop to ensure
+   * that the requestor is done */
+  BGP_SOCKET_LOCKED((bgp_socket *) _bs, bsp)
+    ;
+
+  /* Everything related is allocated from the loop's pool */
+  birdloop_free(((bgp_socket *) _bs)->loop);
 }
 
 static void
 bgp_listen_close(struct bgp_proto *p, struct bgp_listen_request *req)
 {
   BGP_LISTEN_LOCK(bl);
-  struct bgp_socket *bs = req->sock;
+  BGP_SOCKET_LOCK(req->sock, bsp);
 
   rem_node(&req->n);
-  if (EMPTY_LIST(bs->requests))
+
+  if (EMPTY_LIST(bsp->requests))
   {
-    bgp_listen_debug(p, &req->params, "free: %p", bs);
-    sk_close(bs->sk);
-    rem_node(&bs->n);
-    mb_free(bs);
+    bgp_listen_debug(p, &req->params, "free: %p", bsp);
+    sk_close(bsp->sk);
+    rem_node(&bsp->n);
+    birdloop_stop_self(bsp->loop, bgp_listen_done, bsp);
   }
   else
   {
-    bgp_listen_debug(p, &req->params, "unlink: %p", bs);
+    bgp_listen_debug(p, &req->params, "unlink: %p", bsp);
   }
 }
 
@@ -1853,10 +1890,9 @@ err2:
  *
  */
 static struct bgp_listen_request *
-bgp_find_proto(struct bgp_listen_private *bl UNUSED, sock *sk)
+bgp_find_proto(struct bgp_socket_private *bs, sock *sk)
 {
   struct bgp_listen_request *best = NULL;
-  struct bgp_socket *bs = sk->data;
   struct bgp_listen_request *req;
 
   /* sk->iface is valid only if src or dst address is link-local */
@@ -1897,11 +1933,12 @@ bgp_find_proto(struct bgp_listen_private *bl UNUSED, sock *sk)
 static int
 bgp_incoming_connection(sock *sk, uint dummy UNUSED)
 {
-  BGP_LISTEN_PRIV(bl);
+  struct bgp_socket_private *bs = sk->data;
+  ASSERT_DIE(birdloop_inside(bs->loop));
 
   DBG("BGP: Incoming connection from %I port %d\n", sk->daddr, sk->dport);
 
-  struct bgp_listen_request *req = bgp_find_proto(bl, sk);
+  struct bgp_listen_request *req = bgp_find_proto(bs, sk);
   if (req)
   {
     struct bgp_proto *p = req->p;
@@ -1909,9 +1946,6 @@ bgp_incoming_connection(sock *sk, uint dummy UNUSED)
     bis->sk = sk;
     add_tail(&p->listen.incoming_sockets, &bis->n);
 
-    rmove(sk, birdloop_pool(bl->loop));
-    sk_reloop(sk, bl->loop);
-
     callback_activate(&p->incoming_connection);
   }
   else
@@ -1932,7 +1966,7 @@ bgp_incoming_connection_dynamic(struct callback *cb)
   while (true)
   {
     sock *sk = NULL;
-    BGP_LISTEN_LOCKED(bl)
+    BGP_SOCKET_LOCKED(p->listen.sock, bsp)
     {
       /* Pop the first incoming socket from the queue */
       if (EMPTY_LIST(p->listen.incoming_sockets))
@@ -1956,7 +1990,7 @@ static void
 bgp_incoming_connection_single(struct callback *cb)
 {
   SKIP_BACK_DECLARE(struct bgp_proto, p, incoming_connection, cb);
-  BGP_LISTEN_LOCK(bl);
+  BGP_SOCKET_LOCK(p->listen.sock, bsp);
 
   /* Called again by race condition, ignore */
   if (EMPTY_LIST(p->listen.incoming_sockets))
@@ -3793,7 +3827,9 @@ void bgp_build(void)
   proto_build(&proto_bgp);
   bgp_register_attrs();
 
-  bgp_listen_pub.loop = birdloop_new(proto_pool, DOMAIN_ORDER(service), NULL, "bgp listen loop");
+  bgp_listen_pub.lock = DOMAIN_NEW(subproto);
+
   BGP_LISTEN_LOCK(bl);
+  bl->pool = rp_new(proto_pool, bl->lock.subproto, "BGP Listeners");
   init_list(&bl->sockets);
 }
index e39b67c8d38ff82e62e8ec0079b9b83f77535c80..4652a84d201a42f2817bc975e1afc5c52e7744b1 100644 (file)
@@ -308,19 +308,37 @@ struct bgp_ao_state {
   struct bgp_ao_key *best_key;
 };
 
-struct bgp_socket {
-  node n;                              /* Node in global bgp_sockets */
+struct bgp_socket_params {
+  ip_addr addr;                                /* Local address to bind to */
+  struct iface *iface;                 /* Local interface to bind to */
+  struct iface *vrf;                   /* VRF to bind to */
+  uint port;                           /* Local port to bind to (mandatory) */
+  uint flags;                          /* Additional SKF_* flags */
+};
+
+#define BGP_SOCKET_PUB \
+  node n;                              /* Node in global bgp_listen -> sockets */ \
+  struct birdloop *loop;               /* Socket's accepting loop */ \
+  struct bgp_socket_params params;     /* Socket matching parameters */ \
+
+struct bgp_socket_private {
+  BGP_SOCKET_PUB;
+  struct bgp_socket_private **locked_at;
   list requests;                       /* Listen requests */
   sock *sk;                            /* Real listening socket */
-  struct bgp_socket_params {
-    ip_addr addr;                      /* Local address to bind to */
-    struct iface *iface;               /* Local interface to bind to */
-    struct iface *vrf;                 /* VRF to bind to */
-    uint port;                         /* Local port to bind to (mandatory) */
-    uint flags;                                /* Additional SKF_* flags */
-  } params;
 };
 
+typedef union bgp_socket {
+  struct { BGP_SOCKET_PUB; };
+  struct bgp_socket_private priv;
+} bgp_socket;
+
+BLO_UNLOCK_CLEANUP(bgp_socket);
+
+#define BGP_SOCKET_LOCKED(_pub, _priv) BLO_LOCKED(_pub, _priv, bgp_socket)
+#define BGP_SOCKET_LOCK(_pub, _priv)   BLO_LOCK(_pub, _priv, bgp_socket)
+#define BGP_SOCKET_UNLOCK(_pub, _priv) ( BLO_UNLOCK_CLEANUP_NAME(bgp_socket)(_priv), _priv = NULL )
+
 struct bgp_stats {
   uint rx_messages, tx_messages;
   uint rx_updates, tx_updates;
@@ -381,7 +399,7 @@ struct bgp_incoming_socket {
 
 struct bgp_listen_request {
   node n;                              /* Node in bgp_socket / pending list */
-  struct bgp_socket *sock;             /* Assigned socket */
+  bgp_socket *sock;                    /* Assigned socket */
   struct bgp_socket_params params;     /* Listening socket parameters */
   ip_addr local_ip;                    /* Local IP address to match */
   struct iface *iface;                 /* Local interface to match */