]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
BMP: simplified update queuing and better memory performance
authorMaria Matejka <mq@ucw.cz>
Tue, 17 Sep 2024 14:27:54 +0000 (16:27 +0200)
committerOndrej Zajicek <santiago@crfreenet.org>
Mon, 2 Dec 2024 02:38:17 +0000 (03:38 +0100)
This commit is quite a substantial rework of the underlying layers in
BMP TX:

- several unnecessary layers of indirection dropped, including most of
  the original BMP's buffer machinery
- all messages are now written directly into one protocol's buffer
  allocated for the whole time big enough to fit every possible message
- output blocks are allocated by pages and immediately returned when
  used, improving the overall memory footprint
- no intermediary allocation is done from the heap altogether
- there is a documented and configurable limit on the TX queue size

doc/bird.sgml
proto/bgp/bgp.h
proto/bgp/packets.c
proto/bmp/Makefile
proto/bmp/bmp.c
proto/bmp/bmp.h
proto/bmp/buffer.c [deleted file]
proto/bmp/buffer.h [deleted file]
proto/bmp/config.Y

index d1375cafca92dfe6f19d70f9c5cbe592f0d9bd04..b6b77691e95c55f6b545d0664f4a6d5b5c490ae9 100644 (file)
@@ -3808,6 +3808,15 @@ by default and have to be enabled during installation by the configure option
 routes (in <ref id="bgp-import-table" name="BGP import tables">) and post-policy
 routes (in regular routing tables). All BGP protocols are monitored automatically.
 
+<sect1>Configuration (incomplete)
+<label id="bmp-config">
+
+<p><descrip>
+       <tag><label id="bmp-tx-buffer-limit">tx buffer limit <m/number/</tag>
+       How much data we are going to queue before we call the session stuck
+       and restart it, in megabytes. Default value: 1024 (effectively 1 gigabyte).
+</descrip>
+
 <sect1>Example
 <label id="bmp-exam">
 
@@ -3821,6 +3830,9 @@ protocol bmp {
 
        # Monitor accepted routes (passed import filters)
        monitoring rib in post_policy;
+
+       # Allow only 64M of pending data
+       tx buffer limit 64;
 }
 </code>
 
index 7127bc88a38aad31d9bee4e68d57733a4bb8d3d5..4cef8caf40a4a7f4c53c419ed5683e5e8be364cc 100644 (file)
@@ -650,7 +650,7 @@ int bgp_get_attr(const struct eattr *e, byte *buf, int buflen);
 void bgp_get_route_info(struct rte *, byte *buf);
 int bgp_total_aigp_metric_(rte *e, u64 *metric, const struct adata **ad);
 
-byte * bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, const net_addr *n, const struct rte *new, const struct rte_src *src);
+byte * bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n, const struct rte *new, const struct rte_src *src);
 
 #define BGP_AIGP_METRIC                1
 #define BGP_AIGP_MAX           U64(0xffffffffffffffff)
index 30da32948ed7acddeee93a3336e4bd957076cb36..ec648231a1fcb75567760276ba06abc34e02933a 100644 (file)
@@ -2482,12 +2482,10 @@ bgp_create_mp_unreach(struct bgp_write_state *s, struct bgp_bucket *buck, byte *
 #ifdef CONFIG_BMP
 
 static byte *
-bgp_create_update_bmp(struct bgp_channel *c, byte *buf, struct bgp_bucket *buck, bool update)
+bgp_create_update_bmp(struct bgp_channel *c, byte *buf, byte *end, struct bgp_bucket *buck, bool update)
 {
   struct bgp_proto *p = (void *) c->c.proto;
-  byte *end = buf + (BGP_MAX_EXT_MSG_LENGTH - BGP_HEADER_LENGTH);
   byte *res = NULL;
-  /* FIXME: must be a bit shorter */
 
   struct lp_state tmpp;
   lp_save(tmp_linpool, &tmpp);
@@ -2535,7 +2533,7 @@ bgp_bmp_prepare_bgp_hdr(byte *buf, const u16 msg_size, const u8 msg_type)
 }
 
 byte *
-bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, const net_addr *n,
+bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n,
                   const struct rte *new, const struct rte_src *src)
 {
 //  struct bgp_proto *p = (void *) c->c.proto;
@@ -2561,7 +2559,7 @@ bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, const net_addr *n,
   net_copy(px->net, n);
   add_tail(&b->prefixes, &px->buck_node);
 
-  byte *end = bgp_create_update_bmp(c, pkt, b, !!new);
+  end = bgp_create_update_bmp(c, pkt, end, b, !!new);
 
   if (end)
     bgp_bmp_prepare_bgp_hdr(buf, end - buf, PKT_UPDATE);
index d6fca1aa7882e76ebb652192958a2f0f074a7e2d..e34f382280c816ed10250c7e4105d8080be08577 100644 (file)
@@ -1,6 +1,6 @@
-src := bmp.c buffer.c map.c
+src := bmp.c map.c
 obj := $(src-o-files)
 $(all-daemon)
 $(cf-local)
 
-tests_objs := $(tests_objs) $(src-o-files)
\ No newline at end of file
+tests_objs := $(tests_objs) $(src-o-files)
index 599024f550c2480d263c1b44ede3b5e437796ab2..6dcf1562a027edeaffae321381f6ebe1bdf1e2ea 100644 (file)
@@ -29,7 +29,6 @@
  */
 
 #include "proto/bmp/bmp.h"
-#include "proto/bmp/buffer.h"
 #include "proto/bmp/map.h"
 
 #include <sys/socket.h>
@@ -94,16 +93,10 @@ enum bmp_message_type {
   BMP_ROUTE_MIRROR_MSG = 6 // Route Mirroring Message
 };
 
-// Total size of Common Header
-#define BMP_COMMON_HDR_SIZE 6
 // Defines size of padding when IPv4 address is going to be put into field
 // which can accept also IPv6 address
 #define BMP_PADDING_IP4_ADDR_SIZE 12
 
-/* BMP Per-Peer Header [RFC 7854 - Section 4.2] */
-// Total size of Per-Peer Header
-#define BMP_PER_PEER_HDR_SIZE 42
-
 enum bmp_peer_type {
   BMP_PEER_TYPE_GLOBAL_INSTANCE = 0,
   BMP_PEER_TYPE_RD_INSTANCE = 1,
@@ -166,6 +159,15 @@ enum bmp_peer_down_notif_reason {
   BMP_PEER_DOWN_REASON_PEER_DE_CONFIGURED = 5
 };
 
+struct bmp_peer_down_info {
+  u8 reason;
+  u8 fsm_code;
+  u8 err_code;
+  u8 err_subcode;
+  const byte *data;
+  int length;
+};
+
 /* BMP Termination Message [RFC 7854 - Section 4.5] */
 #define BMP_TERM_INFO_TYPE_SIZE 2
 enum bmp_term_info_type {
@@ -198,21 +200,42 @@ enum bmp_term_reason {
 #define IP4_MAX_TTL 255
 
 
-#define IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(expr, msg, rv...)     \
-  do {                                                                      \
-    if ((expr))                                                             \
-    {                                                                       \
-      log(L_WARN "[BMP] " msg);                                             \
-      return rv;                                                            \
-    }                                                                       \
-  } while (0)
+#define bmp_buffer_need(b, sz)  ASSERT_DIE((b)->pos + (sz) <= (b)->end)
 
+// Idea for following macros has been taken from |proto/mrt/mrt.c|
+#define BMP_DEFINE_PUT_FUNC(S, T)                               \
+  static inline void                                            \
+  bmp_put_##S(buffer *b, const T x)                             \
+  {                                                             \
+    bmp_buffer_need(b, sizeof(T));                        \
+    put_##S(b->pos, x);                                    \
+    b->pos += sizeof(T);                                   \
+  }
 
-#define IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(p, msg, rv...) \
-  do {                                                                 \
-    IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(!(p), msg, rv);      \
-  } while (0)
+BMP_DEFINE_PUT_FUNC(u8, u8)
+BMP_DEFINE_PUT_FUNC(u16, u16)
+BMP_DEFINE_PUT_FUNC(u32, u32)
+BMP_DEFINE_PUT_FUNC(u64, u64)
+BMP_DEFINE_PUT_FUNC(ip4, ip4_addr)
+BMP_DEFINE_PUT_FUNC(ip6, ip6_addr)
 
+static inline void
+bmp_put_data(buffer *b, const void *src, const size_t len)
+{
+  ASSERT_DIE(b->pos + len <= b->end);
+  memcpy(b->pos, src, len);
+  b->pos += len;
+}
+
+static inline buffer
+bmp_default_buffer(struct bmp_proto *p)
+{
+  return (buffer) {
+    .start = p->msgbuf,
+    .pos = p->msgbuf,
+    .end = p->msgbuf + sizeof p->msgbuf,
+  };
+}
 
 static void bmp_connected(struct birdsock *sk);
 static void bmp_sock_err(sock *sk, int err);
@@ -225,18 +248,44 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
 
 static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs);
 
-// Stores necessary any data in list
-struct bmp_data_node {
-  node n;
-  byte *data;
-  size_t data_size;
-
-  u32 remote_as;
-  u32 remote_id;
-  ip_addr remote_ip;
-  btime timestamp;
-  bool global_peer;
-  bool policy;
+// Stores TX data
+struct bmp_tx_buffer {
+  struct bmp_tx_buffer *next;
+  byte *pos;
+  byte data[];
+};
+
+#define bmp_tx_remains(b) (((byte *) (b) + page_size) - (b)->pos)
+
+/* A dummy resource to accurately show memory pages allocated for pending TX */
+struct bmp_tx_resource {
+  resource r;
+  struct bmp_proto *p;
+};
+
+static void
+bmp_tx_resource_free(resource *r UNUSED) {}
+
+static void
+bmp_tx_resource_dump(resource *r UNUSED) {}
+
+static struct resmem
+bmp_tx_resource_memsize(resource *r)
+{
+  struct bmp_proto *p = SKIP_BACK(struct bmp_tx_resource, r, r)->p;
+
+  return (struct resmem) {
+    .effective = p->tx_pending_count * page_size,
+    .overhead = sizeof(struct bmp_tx_resource),
+  };
+}
+
+static struct resclass bmp_tx_resource_class = {
+  .name = "BMP TX buffers",
+  .size = sizeof(struct bmp_tx_resource),
+  .free = bmp_tx_resource_free,
+  .dump = bmp_tx_resource_dump,
+  .memsize = bmp_tx_resource_memsize,
 };
 
 static void
@@ -268,6 +317,7 @@ bmp_init_msg_serialize(buffer *stream, const char *sys_descr, const char *sys_na
   // We include MIB-II sysDescr and sysName in BMP INIT MSG so that's why
   // allocated 2x BMP_INFO_TLV_FIX_SIZE memory pool size
   const size_t data_size = (2 * BMP_INFO_TLV_FIX_SIZE) + sys_descr_len + sys_name_len;
+
   bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
   bmp_common_hdr_serialize(stream, BMP_INIT_MSG, data_size);
   bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_DESCR, sys_descr);
@@ -275,21 +325,62 @@ bmp_init_msg_serialize(buffer *stream, const char *sys_descr, const char *sys_na
 }
 
 static void
-bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size)
+bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, size_t size)
 {
   ASSERT(p->started);
 
-  struct bmp_data_node *tx_data = mb_alloc(p->tx_mem_pool, sizeof (struct bmp_data_node));
-  tx_data->data = mb_alloc(p->tx_mem_pool, size);
-  memcpy(tx_data->data, payload, size);
-  tx_data->data_size = size;
-  add_tail(&p->tx_queue, &tx_data->n);
-
-  if (sk_tx_buffer_empty(p->sk)
-      && !ev_active(p->tx_ev))
+  while (size)
   {
+    if (!p->tx_last || !bmp_tx_remains(p->tx_last))
+    {
+      if (p->tx_pending_count >= p->tx_pending_limit)
+       return ev_schedule(p->tx_overflow_event);
+
+      p->tx_pending_count++;
+
+      struct bmp_tx_buffer *btb = alloc_page();
+      btb->pos = btb->data;
+      btb->next = NULL;
+
+      if (p->tx_last)
+      {
+       ASSERT_DIE(!p->tx_last->next);
+       p->tx_last->next = btb;
+      }
+      else
+       ASSERT_DIE(p->tx_pending_count == 1);
+
+      p->tx_last = btb;
+
+      if (!p->tx_pending)
+       p->tx_pending = btb;
+    }
+
+    size_t cpylen = bmp_tx_remains(p->tx_last);
+    if (size < cpylen)
+      cpylen = size;
+
+    memcpy(p->tx_last->pos, payload, cpylen);
+    p->tx_last->pos += cpylen;
+
+    payload += cpylen;
+    size -= cpylen;
+  }
+
+  if (!p->sk->tbuf && !ev_active(p->tx_ev))
     ev_schedule(p->tx_ev);
+}
+
+static void
+bmp_tx_buffer_free(struct bmp_proto *p, struct bmp_tx_buffer *btb)
+{
+  if (btb == p->tx_last)
+  {
+    p->tx_last = NULL;
+    ASSERT_DIE(!p->tx_pending_count);
   }
+
+  free_page(btb);
 }
 
 static void
@@ -300,40 +391,33 @@ bmp_fire_tx(void *p_)
   if (!p->started)
     return;
 
-  IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
-    EMPTY_LIST(p->tx_queue),
-    "Called BMP TX event handler when there is not any data to send"
-  );
-
-  size_t cnt = 0; // Counts max packets which we want to send per TX slot
-  struct bmp_data_node *tx_data;
-  struct bmp_data_node *tx_data_next;
-  WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue)
+  int cnt = 0;
+  for (struct bmp_tx_buffer *btb; btb = p->tx_pending; )
   {
-    if (tx_data->data_size > p->sk->tbsize)
-    {
-      sk_set_tbsize(p->sk, tx_data->data_size);
-    }
+    ASSERT_DIE(!p->sk->tbuf);
+
+    p->sk->tbuf = btb->data;
+    u64 sz = btb->pos - btb->data;
+
+    p->tx_sent += sz;
+    p->tx_sent_total += sz;
+
+    if (p->tx_pending == p->tx_last)
+      p->tx_last = NULL;
 
-    size_t data_size = tx_data->data_size;
-    memcpy(p->sk->tbuf, tx_data->data, data_size);
-    mb_free(tx_data->data);
-    rem_node((node *) tx_data);
-    mb_free(tx_data);
+    p->tx_pending = btb->next;
+    p->tx_pending_count--;
 
-    if (sk_send(p->sk, data_size) <= 0)
+    if (sk_send(p->sk, sz) <= 0)
       return;
 
-    // BMP packets should be treat with lowest priority when scheduling sending
-    // packets to target. That's why we want to send max. 32 packets per event
-    // call
-    if (++cnt > 32)
+    p->sk->tbuf = NULL;
+    bmp_tx_buffer_free(p, btb);
+
+    if (cnt++ > 1024)
     {
       if (!ev_active(p->tx_ev))
-      {
-        ev_schedule(p->tx_ev);
-      }
-
+       ev_schedule(p->tx_ev);
       return;
     }
   }
@@ -342,6 +426,13 @@ bmp_fire_tx(void *p_)
 static void
 bmp_tx(struct birdsock *sk)
 {
+  struct bmp_proto *p = sk->data;
+
+  struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, data, sk->tbuf);
+  bmp_tx_buffer_free(p, btb);
+
+  sk->tbuf = NULL;
+
   bmp_fire_tx(sk->data);
 }
 
@@ -420,21 +511,31 @@ bmp_per_peer_hdr_serialize(buffer *stream, const bool is_global_instance_peer,
 }
 
 /* [4.6] Route Monitoring */
-static void
-bmp_route_monitor_msg_serialize(buffer *stream, const bool is_peer_global,
+static byte *
+bmp_route_monitor_msg_serialize(struct bmp_proto *p, const bool is_peer_global,
   const bool table_in_post_policy, const u32 peer_as, const u32 peer_bgp_id,
-  const bool as4_support, const ip_addr remote_addr, const byte *update_msg,
+  const bool as4_support, const ip_addr remote_addr, byte *update_msg,
   const size_t update_msg_size, btime timestamp)
 {
+  ASSERT_DIE(update_msg < &p->msgbuf[sizeof p->msgbuf]);
+
+  buffer stream;
+  STACK_BUFFER_INIT(stream, BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE);
+
   const size_t data_size = BMP_PER_PEER_HDR_SIZE + update_msg_size;
   u32 ts_sec = timestamp TO_S;
   u32 ts_usec = timestamp - (ts_sec S);
 
-  bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
-  bmp_common_hdr_serialize(stream, BMP_ROUTE_MONITOR, data_size);
-  bmp_per_peer_hdr_serialize(stream, is_peer_global, table_in_post_policy,
+  bmp_common_hdr_serialize(&stream, BMP_ROUTE_MONITOR, data_size);
+  bmp_per_peer_hdr_serialize(&stream, is_peer_global, table_in_post_policy,
     as4_support, remote_addr, peer_as, peer_bgp_id, ts_sec, ts_usec);
-  bmp_put_data(stream, update_msg, update_msg_size);
+
+  size_t hdr_sz = stream.pos - stream.start;
+  ASSERT_DIE(update_msg >= &p->msgbuf[hdr_sz]);
+
+  byte *begin = &update_msg[-hdr_sz];
+  memcpy(begin, stream.start, hdr_sz);
+  return begin;
 }
 
 static void
@@ -453,6 +554,7 @@ bmp_peer_up_notif_msg_serialize(buffer *stream, const bool is_peer_global,
   bmp_per_peer_hdr_serialize(stream, is_peer_global,
     false /* TODO: Hardcoded pre-policy Adj-RIB-In */, as4_support, remote_addr,
     peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
+
   bmp_put_ipa(stream, local_addr);
   bmp_put_u16(stream, local_port);
   bmp_put_u16(stream, remote_port);
@@ -465,15 +567,37 @@ bmp_peer_up_notif_msg_serialize(buffer *stream, const bool is_peer_global,
 static void
 bmp_peer_down_notif_msg_serialize(buffer *stream, const bool is_peer_global,
   const u32 peer_as, const u32 peer_bgp_id, const bool as4_support,
-  const ip_addr remote_addr, const byte *data, const size_t data_size)
+  const ip_addr remote_addr, const struct bmp_peer_down_info *info)
 {
-  const size_t payload_size = BMP_PER_PEER_HDR_SIZE + data_size;
-  bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + payload_size);
-  bmp_common_hdr_serialize(stream, BMP_PEER_DOWN_NOTIF, payload_size);
+  const size_t data_size = BMP_PER_PEER_HDR_SIZE + 1 +
+    (((info->reason == BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION) ||
+      (info->reason == BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION)) ? (BGP_HEADER_LENGTH + 2 + info->length) :
+     (info->reason == BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION) ? 2 : 0);
+
+  bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
+  bmp_common_hdr_serialize(stream, BMP_PEER_DOWN_NOTIF, data_size);
   bmp_per_peer_hdr_serialize(stream, is_peer_global,
     false /* TODO: Hardcoded pre-policy adj RIB IN */,  as4_support, remote_addr,
     peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
-  bmp_put_data(stream, data, data_size);
+
+  bmp_put_u8(stream, info->reason);
+
+  switch (info->reason)
+  {
+  case BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION:
+  case BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION:;
+    uint bgp_msg_length = BGP_HEADER_LENGTH + 2 + info->length;
+    bmp_buffer_need(stream, bgp_msg_length);
+    bmp_put_bgp_hdr(stream, bgp_msg_length, PKT_NOTIFICATION);
+    bmp_put_u8(stream, info->err_code);
+    bmp_put_u8(stream, info->err_subcode);
+    bmp_put_data(stream, info->data, info->length);
+    break;
+
+  case BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION:
+    bmp_put_u16(stream, info->fsm_code);
+    break;
+  }
 }
 
 
@@ -765,99 +889,63 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
   ASSERT(p->started);
 
   const struct birdsock *sk = bmp_get_birdsock_ext(bgp);
-  IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
-    sk,
-    "[BMP] No BGP socket"
-  );
+  if (!sk)
+  {
+    log(L_WARN "%s: No BGP socket", p->p.name);
+    return;
+  }
 
   const bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
-  buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+  buffer payload = bmp_default_buffer(p);
   bmp_peer_up_notif_msg_serialize(&payload, is_global_instance_peer,
     bgp->remote_as, bgp->remote_id, 1,
     sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, tx_data_size,
     rx_data, rx_data_size);
-  bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-  bmp_buffer_free(&payload);
+  bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
 }
 
 static void
-bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp)
+bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, byte *data, size_t length, btime timestamp)
 {
-  struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool,
-                               sizeof (struct bmp_data_node));
-  upd_msg->data = mb_alloc(p->update_msg_mem_pool, length);
-  memcpy(upd_msg->data, data, length);
-  upd_msg->data_size = length;
-  add_tail(&p->update_msg_queue, &upd_msg->n);
-
-  /* Save some metadata */
   struct bgp_proto *bgp = bs->bgp;
-  upd_msg->remote_as = bgp->remote_as;
-  upd_msg->remote_id = bgp->remote_id;
-  upd_msg->remote_ip = bgp->remote_ip;
-  upd_msg->timestamp = timestamp;
-  upd_msg->global_peer = bmp_is_peer_global_instance(bgp);
-  upd_msg->policy = bmp_stream_policy(bs);
-
-  /* Kick the commit */
-  if (!ev_active(p->update_ev))
-    ev_schedule(p->update_ev);
+  const byte *start = bmp_route_monitor_msg_serialize(p,
+      bmp_is_peer_global_instance(bgp),
+      bmp_stream_policy(bs),
+      bgp->remote_as,
+      bgp->remote_id,
+      true,
+      bgp->remote_ip,
+      data,
+      length,
+      timestamp
+      );
+
+  bmp_schedule_tx_packet(p, start, (data - start) + length);
 }
 
 static void
 bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
                         const net_addr *n, const struct rte *new, const struct rte_src *src)
 {
-  byte buf[BGP_MAX_EXT_MSG_LENGTH];
-  byte *end = bgp_bmp_encode_rte(bs->sender, buf, n, new, src);
+  byte *bufend = &p->msgbuf[sizeof p->msgbuf];
+  byte *begin = bufend - BGP_MAX_EXT_MSG_LENGTH;
+  byte *end = bgp_bmp_encode_rte(bs->sender, begin, bufend, n, new, src);
 
   btime delta_t = new ? current_time() - new->lastmod : 0;
   btime timestamp = current_real_time() - delta_t;
 
   if (end)
-    bmp_route_monitor_put_update(p, bs, buf, end - buf, timestamp);
+    bmp_route_monitor_put_update(p, bs, begin, end - begin, timestamp);
   else
     log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
 }
 
-static void
-bmp_route_monitor_commit(void *p_)
-{
-  struct bmp_proto *p = p_;
-
-  if (!p->started)
-    return;
-
-  buffer payload
-    = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
-
-  struct bmp_data_node *data, *data_next;
-  WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
-  {
-    bmp_route_monitor_msg_serialize(&payload,
-      data->global_peer, data->policy,
-      data->remote_as, data->remote_id, true,
-      data->remote_ip, data->data, data->data_size,
-      data->timestamp);
-
-    bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
-    bmp_buffer_flush(&payload);
-
-    mb_free(data->data);
-    rem_node(&data->n);
-    mb_free(data);
-  }
-
-  bmp_buffer_free(&payload);
-}
-
 static void
 bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
 {
   TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bs->bgp->p.name, bs->sender->c.name);
 
-  byte rx_end_payload[DEFAULT_MEM_BLOCK_SIZE];
+  byte *rx_end_payload = p->msgbuf + BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE;
   byte *pos = bgp_create_end_mark_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH);
   memset(rx_end_payload + BGP_MSG_HDR_MARKER_POS, 0xff,
         BGP_MSG_HDR_MARKER_SIZE); // BGP UPDATE MSG marker
@@ -869,21 +957,18 @@ bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
 
 static void
 bmp_send_peer_down_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
-  const byte *data, const size_t data_size)
+                            const struct bmp_peer_down_info *info)
 {
   ASSERT(p->started);
 
   const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp);
   bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
-  buffer payload
-    = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+  buffer payload = bmp_default_buffer(p);
   bmp_peer_down_notif_msg_serialize(&payload, is_global_instance_peer,
     bgp->remote_as, bgp->remote_id,
     remote_caps ? remote_caps->as4_support : bgp->as4_session,
-    bgp->remote_ip, data, data_size);
-  bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
-  bmp_buffer_free(&payload);
+    bgp->remote_ip, info);
+  bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
 }
 
 static void
@@ -899,48 +984,32 @@ bmp_peer_down_(struct bmp_proto *p, const struct bgp_proto *bgp,
 
   TRACE(D_STATES, "Peer down for %s", bgp->p.name);
 
-  uint bmp_code = 0;
-  uint fsm_code = 0;
+  struct bmp_peer_down_info info = {
+    .err_code = err_code,
+    .err_subcode = err_subcode,
+    .data = data,
+    .length = length,
+  };
 
   switch (err_class)
   {
   case BE_BGP_RX:
-    bmp_code = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
+    info.reason = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
     break;
 
   case BE_BGP_TX:
   case BE_AUTO_DOWN:
   case BE_MAN_DOWN:
-    bmp_code = BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION;
+    info.reason = BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION;
     break;
 
   default:
-    bmp_code = BMP_PEER_DOWN_REASON_REMOTE_NO_NOTIFICATION;
-    length = 0;
+    info.reason = BMP_PEER_DOWN_REASON_REMOTE_NO_NOTIFICATION;
+    info.length = 0;
     break;
   }
 
-  buffer payload = bmp_buffer_alloc(p->buffer_mpool, 1 + BGP_HEADER_LENGTH + 2 + length);
-  bmp_put_u8(&payload, bmp_code);
-
-  switch (bmp_code)
-  {
-  case BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION:
-  case BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION:
-    bmp_put_bgp_hdr(&payload, BGP_HEADER_LENGTH + 2 + length, PKT_NOTIFICATION);
-    bmp_put_u8(&payload, err_code);
-    bmp_put_u8(&payload, err_subcode);
-    bmp_put_data(&payload, data, length);
-    break;
-
-  case BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION:
-    bmp_put_u16(&payload, fsm_code);
-    break;
-  }
-
-  bmp_send_peer_down_notif_msg(p, bgp, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
-  bmp_buffer_free(&payload);
+  bmp_send_peer_down_notif_msg(p, bgp, &info);
 
   bmp_remove_peer(p, bp);
 }
@@ -962,18 +1031,21 @@ bmp_send_termination_msg(struct bmp_proto *p,
                                      + BMP_TERM_INFO_LEN_FIELD_SIZE
                                      + BMP_TERM_REASON_CODE_SIZE;
   const size_t term_msg_size = BMP_COMMON_HDR_SIZE + term_msg_hdr_size;
-  buffer stream = bmp_buffer_alloc(p->buffer_mpool, term_msg_size);
+  buffer stream = bmp_default_buffer(p);
+  bmp_buffer_need(&stream, term_msg_size);
+
   bmp_common_hdr_serialize(&stream, BMP_TERM_MSG, term_msg_hdr_size);
   bmp_put_u16(&stream, BMP_TERM_INFO_REASON);
   bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason
   bmp_put_u16(&stream, reason);
-  memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream));
-  IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
-    sk_send(p->sk, bmp_buffer_pos(&stream)) < 0,
-    "Failed to send BMP termination message"
-    );
 
-  bmp_buffer_free(&stream);
+  if (p->sk->tbuf)
+    bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf));
+
+  p->sk->tbuf = stream.start;
+  if (sk_send(p->sk, stream.pos - stream.start) < 0)
+    log(L_WARN "%s: Cannot send BMP termination message", p->p.name);
+  p->sk->tbuf = NULL;
 }
 
 int
@@ -1062,10 +1134,9 @@ bmp_startup(struct bmp_proto *p)
   proto_notify_state(&p->p, PS_UP);
 
   /* Send initiation message */
-  buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+  buffer payload = bmp_default_buffer(p);
   bmp_init_msg_serialize(&payload, p->sys_descr, p->sys_name);
-  bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-  bmp_buffer_free(&payload);
+  bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
 
   /* Send Peer Up messages */
   struct proto *peer;
@@ -1086,6 +1157,7 @@ bmp_down(struct bmp_proto *p)
 {
   ASSERT(p->started);
   p->started = false;
+  p->tx_sent = 0;
 
   TRACE(D_EVENTS, "BMP session closed");
 
@@ -1119,7 +1191,6 @@ bmp_connect(struct bmp_proto *p)
   sk->dport = p->station_port;
   sk->ttl = IP4_MAX_TTL;
   sk->tos = IP_PREC_INTERNET_CONTROL;
-  sk->tbsize = BGP_TX_BUFFER_EXT_SIZE;
   sk->tx_hook = bmp_connected;
   sk->err_hook = bmp_sock_err;
 
@@ -1173,6 +1244,26 @@ bmp_sock_err(sock *sk, int err)
   proto_notify_state(&p->p, PS_START);
 }
 
+static void
+bmp_tx_overflow(void *_p)
+{
+  struct bmp_proto *p = _p;
+  if (p->tx_pending_count < p->tx_pending_limit)
+    return;
+
+  p->sock_err = 0;
+
+  log(L_ERR "%s: Connection stalled", p->p.name);
+
+  if (p->started)
+    bmp_down(p);
+
+  bmp_close_socket(p);
+  tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME);
+
+  proto_notify_state(&p->p, PS_START);
+}
+
 /* BMP connect timeout event - switch from Idle/Connect state to Connect state */
 static void
 bmp_connection_retry(timer *t)
@@ -1189,6 +1280,24 @@ bmp_connection_retry(timer *t)
 static void
 bmp_close_socket(struct bmp_proto *p)
 {
+  if (p->sk && p->sk->tbuf)
+    bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf));
+
+  struct bmp_tx_buffer *btb = p->tx_pending;
+  while (btb)
+  {
+    p->tx_pending_count--;
+
+    struct bmp_tx_buffer *next = btb->next;
+    bmp_tx_buffer_free(p, btb);
+    btb = next;
+  }
+
+  p->tx_pending = NULL;
+
+  ASSERT_DIE(!p->tx_last);
+  ASSERT_DIE(!p->tx_pending_count);
+
   rfree(p->sk);
   p->sk = NULL;
 }
@@ -1230,6 +1339,7 @@ bmp_init(struct proto_config *CF)
   strcpy(p->sys_name, cf->sys_name);
   p->monitoring_rib.in_pre_policy = cf->monitoring_rib_in_pre_policy;
   p->monitoring_rib.in_post_policy = cf->monitoring_rib_in_post_policy;
+  p->tx_pending_limit = cf->tx_pending_limit;
 
   return P;
 }
@@ -1243,20 +1353,20 @@ bmp_start(struct proto *P)
 {
   struct bmp_proto *p = (void *) P;
 
-  p->buffer_mpool = rp_new(P->pool, "BMP Buffer");
-  p->tx_mem_pool = rp_new(P->pool, "BMP Tx");
-  p->update_msg_mem_pool = rp_new(P->pool, "BMP Update");
   p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p);
-  p->update_ev = ev_new_init(p->p.pool, bmp_route_monitor_commit, p);
+  p->tx_pending = NULL;
+  p->tx_pending_count = 0;
+  p->tx_overflow_event = ev_new_init(p->p.pool, bmp_tx_overflow, p);
   p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0);
   p->sk = NULL;
 
+  resource *r = ralloc(P->pool, &bmp_tx_resource_class);
+  SKIP_BACK(struct bmp_tx_resource, r, r)->p = p;
+
   HASH_INIT(p->peer_map, P->pool, 4);
   HASH_INIT(p->stream_map, P->pool, 4);
   HASH_INIT(p->table_map, P->pool, 4);
 
-  init_list(&p->tx_queue);
-  init_list(&p->update_msg_queue);
   p->started = false;
   p->sock_err = 0;
   add_tail(&bmp_proto_list, &p->bmp_node);
@@ -1275,6 +1385,7 @@ bmp_shutdown(struct proto *P)
   {
     bmp_send_termination_msg(p, BMP_TERM_REASON_ADM);
     bmp_down(p);
+    bmp_close_socket(p);
   }
 
   p->sock_err = 0;
@@ -1305,6 +1416,9 @@ bmp_reconfigure(struct proto *P, struct proto_config *CF)
   /* We must update our copy of configuration ptr */
   p->cf = new;
 
+  /* Reconfigure tx buffer size limits */
+  p->tx_pending_limit = new->tx_pending_limit;
+
   return 1;
 }
 
@@ -1341,6 +1455,13 @@ bmp_show_proto_info(struct proto *P)
 
     if (p->sock_err)
       cli_msg(-1006, "  %-19s %M", "Last error:", p->sock_err);
+
+    cli_msg(-1006, "  %-19s % 9sB (limit %sB)", "Pending TX:",
+       fmt_order(p->tx_pending_count * (u64) page_size, 1, 10000),
+       fmt_order(p->tx_pending_limit * (u64) page_size, 1, 10000));
+
+    cli_msg(-1006, "  %-19s % 9sB", "Session TX:", fmt_order(p->tx_sent, 1, 10000));
+    cli_msg(-1006, "  %-19s % 9sB", "Total TX:", fmt_order(p->tx_sent_total, 1, 10000));
   }
 }
 
index 45844836f789d96868180c60902d2aeda73144e4..52f7212252afd988765a565f206f09bc61c02a52 100644 (file)
 #include "lib/event.h"
 #include "lib/hash.h"
 #include "lib/socket.h"
+#include "proto/bgp/bgp.h"
 #include "proto/bmp/map.h"
 
 // Max length of MIB-II description object
 #define MIB_II_STR_LEN 255
 
+// Total size of Common Header
+#define BMP_COMMON_HDR_SIZE 6
+
+/* BMP Per-Peer Header [RFC 7854 - Section 4.2] */
+// Total size of Per-Peer Header
+#define BMP_PER_PEER_HDR_SIZE 42
+
+// Maximum length of BMP message altogether
+#define BMP_MSGBUF_LEN (BGP_MAX_EXT_MSG_LENGTH + BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE + 1)
+
 // The following fields of this structure controls whether there will be put
 // specific routes into Route Monitoring message and send to BMP collector
 struct monitoring_rib {
@@ -38,6 +49,7 @@ struct bmp_config {
   u16 station_port;                   // Monitoring station TCP port
   bool monitoring_rib_in_pre_policy;  // Route monitoring pre-policy Adj-Rib-In
   bool monitoring_rib_in_post_policy;  // Route monitoring post-policy Adj-Rib-In
+  uint tx_pending_limit;             // Maximum on pending TX buffer count
 };
 
 /* Forward declarations */
@@ -64,14 +76,17 @@ struct bmp_proto {
   struct monitoring_rib monitoring_rib;
   // Below fields are for internal use
   // struct bmp_peer_map bgp_peers;   // Stores 'bgp_proto' structure per BGP peer
-  pool *buffer_mpool;              // Memory pool used for BMP buffer allocations
-  pool *tx_mem_pool;               // Memory pool used for packet allocations designated to BMP collector
-  pool *update_msg_mem_pool;       // Memory pool used for BPG UPDATE MSG allocations
-  list tx_queue;                   // Stores queued packets going to be sent
+  struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush
+  struct bmp_tx_buffer *tx_last;   // This buffer is the last to flush 
+  uint tx_pending_count;          // How many buffers waiting for flush
+  uint tx_pending_limit;          // Maximum on buffer count
+  u64 tx_sent;                    // Amount of data sent
+  u64 tx_sent_total;              // Amount of data sent accumulated over reconnections
+  event *tx_overflow_event;       // Too many buffers waiting for flush
   timer *connect_retry_timer;      // Timer for retrying connection to the BMP collector
-  list update_msg_queue;           // Stores all composed BGP UPDATE MSGs
   bool started;                    // Flag that stores running status of BMP instance
   int sock_err;                    // Last socket error code
+  byte msgbuf[BMP_MSGBUF_LEN];     // Buffer for preparing the messages before sending them out
 };
 
 struct bmp_peer {
diff --git a/proto/bmp/buffer.c b/proto/bmp/buffer.c
deleted file mode 100644 (file)
index be9dd69..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *     BIRD -- The BGP Monitoring Protocol (BMP)
- *
- *     (c) 2020 Akamai Technologies, Inc. (Pawel Maslanka, pmaslank@akamai.com)
- *
- *     Can be freely distributed and used under the terms of the GNU GPL.
- */
-
-#include "proto/bmp/buffer.h"
-
-buffer
-bmp_buffer_alloc(pool *ppool, const size_t n)
-{
-  buffer buf;
-  buf.start = mb_alloc(ppool, n);
-  buf.pos = buf.start;
-  buf.end = buf.start + n;
-  return buf;
-}
-
-void
-bmp_buffer_free(buffer *buf)
-{
-  mb_free(buf->start);
-  buf->start = buf->pos = buf->end = NULL;
-}
-
-/**
- * @brief bmp_buffer_grow
- * @param buf - buffer to grow
- * @param n   - required amount of available space
- * Resize buffer in a way that there is at least @n bytes of available space.
- */
-static void
-bmp_buffer_grow(buffer *buf, const size_t n)
-{
-  size_t pos = bmp_buffer_pos(buf);
-  size_t size = bmp_buffer_size(buf);
-  size_t req = pos + n;
-
-  while (size < req)
-    size = size * 3 / 2;
-
-  buf->start = mb_realloc(buf->start, size);
-  buf->pos = buf->start + pos;
-  buf->end = buf->start + size;
-}
-
-void
-bmp_buffer_need(buffer *buf, const size_t n)
-{
-  if (bmp_buffer_avail(buf) < n)
-    bmp_buffer_grow(buf, n);
-}
-
-void
-bmp_put_data(buffer *buf, const void *src, const size_t n)
-{
-  if (!n)
-    return;
-
-  bmp_buffer_need(buf, n);
-  memcpy(buf->pos, src, n);
-  buf->pos += n;
-}
diff --git a/proto/bmp/buffer.h b/proto/bmp/buffer.h
deleted file mode 100644 (file)
index f752cf5..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *     BIRD -- The BGP Monitoring Protocol (BMP)
- *
- *     (c) 2020 Akamai Technologies, Inc. (Pawel Maslanka, pmaslank@akamai.com)
- *
- *     Can be freely distributed and used under the terms of the GNU GPL.
- */
-
-#ifndef _BIRD_BMP_BUFFER_H_
-#define _BIRD_BMP_BUFFER_H_
-
-#include "proto/bmp/bmp.h"
-
-#include <stdlib.h>
-
-#include "lib/resource.h"
-
-buffer
-bmp_buffer_alloc(pool *ppool, const size_t n);
-
-void
-bmp_buffer_free(buffer *buf);
-
-static inline void
-bmp_buffer_flush(buffer *buf)
-{
-  buf->pos = buf->start;
-}
-
-static inline size_t
-bmp_buffer_size(const buffer *buf)
-{
-  return buf->end - buf->start;
-}
-
-static inline size_t
-bmp_buffer_avail(const buffer *buf)
-{
-  return buf->end - buf->pos;
-}
-
-static inline size_t
-bmp_buffer_pos(const buffer *buf)
-{
-  return buf->pos - buf->start;
-}
-
-static inline byte *
-bmp_buffer_data(const buffer *buf)
-{
-  return buf->start;
-}
-
-void
-bmp_buffer_need(buffer *buf, const size_t n);
-
-// Idea for following macros has been taken from |proto/mrt/mrt.c|
-#define BMP_DEFINE_PUT_FUNC(S, T)                               \
-  static inline void                                            \
-  bmp_put_##S(buffer *b, const T x)                             \
-  {                                                             \
-    bmp_buffer_need(b, sizeof(T));                              \
-    put_##S(b->pos, x);                                    \
-    b->pos += sizeof(T);                                   \
-  }
-
-BMP_DEFINE_PUT_FUNC(u8, u8)
-BMP_DEFINE_PUT_FUNC(u16, u16)
-BMP_DEFINE_PUT_FUNC(u32, u32)
-BMP_DEFINE_PUT_FUNC(u64, u64)
-BMP_DEFINE_PUT_FUNC(ip4, ip4_addr)
-BMP_DEFINE_PUT_FUNC(ip6, ip6_addr)
-
-void
-bmp_put_data(buffer *buf, const void *src, const size_t n);
-
-#endif /* _BIRD_BMP_BUFFER_H_ */
index acb0c4d9ad91f20d359469cedf62679026d5df1e..04540b26bf9b9c0c3580481241b6b5a976416536 100644 (file)
@@ -27,6 +27,7 @@ bmp_proto_start: proto_start BMP {
      this_proto = proto_config_new(&proto_bmp, $1);
      BMP_CFG->sys_descr = "Not defined";
      BMP_CFG->sys_name = "Not defined";
+     BMP_CFG->tx_pending_limit = (1 << 30) / page_size;
    }
  ;
 
@@ -71,6 +72,10 @@ bmp_proto:
  | bmp_proto MONITORING RIB IN POST_POLICY bool ';' {
      BMP_CFG->monitoring_rib_in_post_policy = $6;
    }
+ | bmp_proto TX BUFFER LIMIT expr ';' {
+     BMP_CFG->tx_pending_limit = $5 * (u64) (1 << 20) / page_size;
+     if ($5 < 1) cf_error("BMP TX buffer limit must be at least 1 megabyte");
+   }
  ;
 
 CF_CODE