]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Merge commit 'e6a100b31a7637ee739338e4b933367707ec931f' into thread-next
authorOndrej Zajicek <santiago@crfreenet.org>
Tue, 3 Dec 2024 17:22:14 +0000 (18:22 +0100)
committerOndrej Zajicek <santiago@crfreenet.org>
Tue, 3 Dec 2024 17:22:14 +0000 (18:22 +0100)
1  2 
doc/bird.sgml
proto/bgp/bgp.h
proto/bgp/packets.c
proto/bmp/bmp.c
proto/bmp/bmp.h
proto/bmp/config.Y

diff --cc doc/bird.sgml
Simple merge
diff --cc proto/bgp/bgp.h
index 0d06065fd6466747c0a47cab4948790f7ff158a3,4cef8caf40a4a7f4c53c419ed5683e5e8be364cc..05aae612b91e215d6ed1bcdc8043ecd46263eb33
@@@ -674,36 -627,30 +674,36 @@@ int bgp_encode_mp_reach_mrt(struct bgp_
  const char * bgp_attr_name(uint code);
  int bgp_encode_attrs(struct bgp_write_state *s, ea_list *attrs, byte *buf, byte *end);
  ea_list * bgp_decode_attrs(struct bgp_parse_state *s, byte *data, uint len);
 -void bgp_finish_attrs(struct bgp_parse_state *s, rta *a);
 -
 -void bgp_init_bucket_table(struct bgp_channel *c);
 -void bgp_free_bucket_table(struct bgp_channel *c);
 -void bgp_free_bucket(struct bgp_channel *c, struct bgp_bucket *b);
 -void bgp_defer_bucket(struct bgp_channel *c, struct bgp_bucket *b);
 -void bgp_withdraw_bucket(struct bgp_channel *c, struct bgp_bucket *b);
 -
 -void bgp_init_prefix_table(struct bgp_channel *c);
 -void bgp_free_prefix_table(struct bgp_channel *c);
 -void bgp_free_prefix(struct bgp_channel *c, struct bgp_prefix *bp);
 -
 -int bgp_rte_better(struct rte *, struct rte *);
 -int bgp_rte_mergable(rte *pri, rte *sec);
 -int bgp_rte_recalculate(rtable *table, net *net, rte *new, rte *old, rte *old_best);
 -struct rte *bgp_rte_modify_stale(struct rte *r, struct linpool *pool);
 -u32 bgp_rte_igp_metric(struct rte *);
 -void bgp_rt_notify(struct proto *P, struct channel *C, net *n, rte *new, rte *old);
 +void bgp_finish_attrs(struct bgp_parse_state *s, ea_list **to);
 +
 +void bgp_setup_out_table(struct bgp_channel *c);
 +
 +void bgp_init_pending_tx(struct bgp_channel *c);
 +void bgp_free_pending_tx(struct bgp_channel *c);
 +void bgp_tx_resend(struct bgp_proto *p, struct bgp_channel *c);
 +
 +void bgp_withdraw_bucket(struct bgp_ptx_private *c, struct bgp_bucket *b);
 +int bgp_done_bucket(struct bgp_ptx_private *c, struct bgp_bucket *b);
 +
 +void bgp_done_prefix(struct bgp_ptx_private *c, struct bgp_prefix *px, struct bgp_bucket *buck);
 +
 +int bgp_rte_better(const rte *, const rte *);
 +int bgp_rte_mergable(const rte *pri, const rte *sec);
 +int bgp_rte_recalculate(struct rtable_private *table, net *net, struct rte_storage *new, struct rte_storage *old, struct rte_storage *old_best);
 +void bgp_rte_modify_stale(void *bgp_channel);
 +u32 bgp_rte_igp_metric(const rte *);
 +void bgp_rt_notify(struct proto *P, struct channel *C, const net_addr *n, rte *new, const rte *old);
  int bgp_preexport(struct channel *, struct rte *);
 -int bgp_get_attr(const struct eattr *e, byte *buf, int buflen);
 -void bgp_get_route_info(struct rte *, byte *buf);
 -int bgp_total_aigp_metric_(rte *e, u64 *metric, const struct adata **ad);
 +void bgp_get_route_info(const rte *, byte *);
 +int bgp_total_aigp_metric_(const rte *e, u64 *metric, const struct adata **ad);
 +
 +static inline struct bgp_proto *bgp_rte_proto(const rte *rte)
 +{
 +  return (rte->src->owner->class == &bgp_rte_owner_class) ?
 +    SKIP_BACK(struct bgp_proto, p.sources, rte->src->owner) : NULL;
 +}
  
- byte * bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, const struct rte *new);
 -byte * bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n, const struct rte *new, const struct rte_src *src);
++byte * bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, byte *end, const struct rte *new);
  
  #define BGP_AIGP_METRIC               1
  #define BGP_AIGP_MAX          U64(0xffffffffffffffff)
index 372b775d41b955192e966db9ef7a50b3611f8989,ec648231a1fcb75567760276ba06abc34e02933a..33f46a8fd31d39c39307e3afe462c8259e2631e1
@@@ -2480,29 -2482,20 +2480,27 @@@ bgp_create_mp_unreach(struct bgp_write_
  #ifdef CONFIG_BMP
  
  static byte *
- bgp_create_update_bmp(ea_list *channel_ea, struct bgp_proto *bgp_p, byte *buf, struct bgp_bucket *buck, bool update)
 -bgp_create_update_bmp(struct bgp_channel *c, byte *buf, byte *end, struct bgp_bucket *buck, bool update)
++bgp_create_update_bmp(ea_list *channel_ea, struct bgp_proto *bgp_p, byte *buf, byte *end, struct bgp_bucket *buck, bool update)
  {
 -  struct bgp_proto *p = (void *) c->c.proto;
 -  byte *res = NULL;
 +  struct bgp_channel *c;
 +  u32 c_id = ea_get_int(channel_ea, &ea_channel_id, 0);
 +  BGP_WALK_CHANNELS(bgp_p, c)
 +    if (c->c.id == c_id)
 +      break;
  
-   byte *end = buf + (BGP_MAX_EXT_MSG_LENGTH - BGP_HEADER_LENGTH);
 -  struct lp_state tmpp;
 -  lp_save(tmp_linpool, &tmpp);
 +  byte *res = NULL;
-   /* FIXME: must be a bit shorter */
  
 -  struct bgp_caps *peer = p->conn->remote_caps;
 +  struct bgp_caps *peer = bgp_p->conn->remote_caps;
    const struct bgp_af_caps *rem = bgp_find_af_caps(peer, c->afi);
  
 +  struct bgp_ptx_private ptx = {
 +    .bmp = 1,
 +    .c = c,
 +  };
 +
    struct bgp_write_state s = {
 -    .proto = p,
 -    .channel = c,
 +    .proto = bgp_p,
 +    .ptx = &ptx,
      .pool = tmp_linpool,
      .mp_reach = (c->afi != BGP_AF_IPV4) || rem->ext_next_hop,
      .as4_session = 1,
@@@ -2538,31 -2533,33 +2536,31 @@@ bgp_bmp_prepare_bgp_hdr(byte *buf, cons
  }
  
  byte *
- bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, const struct rte *new)
 -bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n,
 -                 const struct rte *new, const struct rte_src *src)
++bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, byte *end, const struct rte *new)
  {
 -//  struct bgp_proto *p = (void *) c->c.proto;
    byte *pkt = buf + BGP_HEADER_LENGTH;
  
 -  ea_list *attrs = new ? new->attrs->eattrs : NULL;
 -  uint ea_size = new ? (sizeof(ea_list) + attrs->count * sizeof(eattr)) : 0;
 -  uint bucket_size = sizeof(struct bgp_bucket) + ea_size;
 -  uint prefix_size = sizeof(struct bgp_prefix) + n->length;
 +  uint ea_size = new->attrs ? (sizeof(ea_list) + new->attrs->count * sizeof(eattr)) : 0;
 +  uint prefix_size = sizeof(struct bgp_prefix) + new->net->length;
 +
 +  struct lp_state *tmpp = lp_save(tmp_linpool);
  
 -  /* Sham bucket */
 -  struct bgp_bucket *b = alloca(bucket_size);
 -  *b = (struct bgp_bucket) { };
 +  /* Temporary bucket */
 +  struct bgp_bucket *b = tmp_allocz(sizeof(struct bgp_bucket) + ea_size);
 +  b->bmp = 1;
    init_list(&b->prefixes);
  
 -  if (attrs)
 -    memcpy(b->eattrs, attrs, ea_size);
 +  if (new->attrs)
 +    memcpy(b->eattrs, new->attrs, ea_size);
  
 -  /* Sham prefix */
 -  struct bgp_prefix *px = alloca(prefix_size);
 -  *px = (struct bgp_prefix) { };
 -  px->path_id = (u32) src->private_id;
 -  net_copy(px->net, n);
 +  /* Temporary prefix */
 +  struct bgp_prefix *px = tmp_allocz(prefix_size);
 +  px->src = tmp_allocz(sizeof(struct rte_src));
 +  memcpy(px->src, new->src, sizeof(struct rte_src));
 +  px->ni = NET_TO_INDEX(new->net);
    add_tail(&b->prefixes, &px->buck_node);
  
-   byte *end = bgp_create_update_bmp(c, bgp_p, pkt, b, !!new->attrs);
 -  end = bgp_create_update_bmp(c, pkt, end, b, !!new);
++  end = bgp_create_update_bmp(c, bgp_p, pkt, end, b, !!new->attrs);
  
    if (end)
      bgp_bmp_prepare_bgp_hdr(buf, end - buf, PKT_UPDATE);
diff --cc proto/bmp/bmp.c
index 011b276e781164312141d5a5f80c2aaa59b964a3,6dcf1562a027edeaffae321381f6ebe1bdf1e2ea..05c7182545cf087c42a55a3dabb638dd9e309c10
@@@ -197,23 -200,43 +199,45 @@@ enum bmp_term_reason 
  #define IP4_MAX_TTL 255
  
  
- #define IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(expr, msg, rv...)     \
-   do {                                                                      \
-     if ((expr))                                                             \
-     {                                                                       \
-       log(L_WARN "[BMP] " msg);                                             \
-       return rv;                                                            \
-     }                                                                       \
-   } while (0)
+ #define bmp_buffer_need(b, sz)  ASSERT_DIE((b)->pos + (sz) <= (b)->end)
+ // Idea for following macros has been taken from |proto/mrt/mrt.c|
+ #define BMP_DEFINE_PUT_FUNC(S, T)                               \
+   static inline void                                            \
+   bmp_put_##S(buffer *b, const T x)                             \
+   {                                                             \
+     bmp_buffer_need(b, sizeof(T));                       \
+     put_##S(b->pos, x);                                    \
+     b->pos += sizeof(T);                                   \
+   }
  
+ BMP_DEFINE_PUT_FUNC(u8, u8)
+ BMP_DEFINE_PUT_FUNC(u16, u16)
+ BMP_DEFINE_PUT_FUNC(u32, u32)
+ BMP_DEFINE_PUT_FUNC(u64, u64)
+ BMP_DEFINE_PUT_FUNC(ip4, ip4_addr)
+ BMP_DEFINE_PUT_FUNC(ip6, ip6_addr)
  
- #define IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(p, msg, rv...)        \
-   do {                                                                        \
-     IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(!(p), msg, rv);     \
-   } while (0)
+ static inline void
+ bmp_put_data(buffer *b, const void *src, const size_t len)
+ {
+   ASSERT_DIE(b->pos + len <= b->end);
+   memcpy(b->pos, src, len);
+   b->pos += len;
+ }
+ static inline buffer
+ bmp_default_buffer(struct bmp_proto *p)
+ {
+   return (buffer) {
+     .start = p->msgbuf,
+     .pos = p->msgbuf,
+     .end = p->msgbuf + sizeof p->msgbuf,
+   };
+ }
  
 +static const struct ea_class *bgp_next_hop_ea_class = NULL;
 +
  static void bmp_connected(struct birdsock *sk);
  static void bmp_sock_err(sock *sk, int err);
  static void bmp_close_socket(struct bmp_proto *p);
@@@ -228,18 -248,44 +252,44 @@@ bmp_send_peer_up_notif_msg(struct bmp_p
  
  static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs);
  
- // Stores necessary any data in list
- struct bmp_data_node {
-   node n;
-   byte *data;
-   size_t data_size;
-   u32 remote_as;
-   u32 remote_id;
-   ip_addr remote_ip;
-   btime timestamp;
-   bool global_peer;
-   bool policy;
+ // Stores TX data
+ struct bmp_tx_buffer {
+   struct bmp_tx_buffer *next;
+   byte *pos;
+   byte data[];
+ };
+ #define bmp_tx_remains(b) (((byte *) (b) + page_size) - (b)->pos)
+ /* A dummy resource to accurately show memory pages allocated for pending TX */
+ struct bmp_tx_resource {
+   resource r;
+   struct bmp_proto *p;
+ };
+ static void
+ bmp_tx_resource_free(resource *r UNUSED) {}
+ static void
 -bmp_tx_resource_dump(resource *r UNUSED) {}
++bmp_tx_resource_dump(resource *r UNUSED, uint indent UNUSED) {}
+ static struct resmem
+ bmp_tx_resource_memsize(resource *r)
+ {
+   struct bmp_proto *p = SKIP_BACK(struct bmp_tx_resource, r, r)->p;
+   return (struct resmem) {
+     .effective = p->tx_pending_count * page_size,
+     .overhead = sizeof(struct bmp_tx_resource),
+   };
+ }
+ static struct resclass bmp_tx_resource_class = {
+   .name = "BMP TX buffers",
+   .size = sizeof(struct bmp_tx_resource),
+   .free = bmp_tx_resource_free,
+   .dump = bmp_tx_resource_dump,
+   .memsize = bmp_tx_resource_memsize,
  };
  
  static void
@@@ -282,17 -329,58 +333,58 @@@ bmp_schedule_tx_packet(struct bmp_prot
  {
    ASSERT(p->started);
  
-   struct bmp_data_node *tx_data = mb_allocz(p->tx_mem_pool, sizeof (struct bmp_data_node));
-   tx_data->data = mb_allocz(p->tx_mem_pool, size);
-   memcpy(tx_data->data, payload, size);
-   tx_data->data_size = size;
-   add_tail(&p->tx_queue, &tx_data->n);
-   if (sk_tx_buffer_empty(p->sk)
-       && !ev_active(p->tx_ev))
+   while (size)
    {
 -      return ev_schedule(p->tx_overflow_event);
+     if (!p->tx_last || !bmp_tx_remains(p->tx_last))
+     {
+       if (p->tx_pending_count >= p->tx_pending_limit)
 -    ev_schedule(p->tx_ev);
++      return ev_send_loop(p->p.loop, p->tx_overflow_event);
+       p->tx_pending_count++;
+       struct bmp_tx_buffer *btb = alloc_page();
+       btb->pos = btb->data;
+       btb->next = NULL;
+       if (p->tx_last)
+       {
+       ASSERT_DIE(!p->tx_last->next);
+       p->tx_last->next = btb;
+       }
+       else
+       ASSERT_DIE(p->tx_pending_count == 1);
+       p->tx_last = btb;
+       if (!p->tx_pending)
+       p->tx_pending = btb;
+     }
+     size_t cpylen = bmp_tx_remains(p->tx_last);
+     if (size < cpylen)
+       cpylen = size;
+     memcpy(p->tx_last->pos, payload, cpylen);
+     p->tx_last->pos += cpylen;
+     payload += cpylen;
+     size -= cpylen;
+   }
+   if (!p->sk->tbuf && !ev_active(p->tx_ev))
 +    ev_send_loop(p->p.loop, p->tx_ev);
+ }
+ static void
+ bmp_tx_buffer_free(struct bmp_proto *p, struct bmp_tx_buffer *btb)
+ {
+   if (btb == p->tx_last)
+   {
+     p->tx_last = NULL;
+     ASSERT_DIE(!p->tx_pending_count);
    }
+   free_page(btb);
  }
  
  static void
@@@ -303,40 -391,33 +395,33 @@@ bmp_fire_tx(void *p_
    if (!p->started)
      return;
  
-   IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
-     EMPTY_LIST(p->tx_queue),
-     "Called BMP TX event handler when there is not any data to send"
-   );
-   size_t cnt = 0; // Counts max packets which we want to send per TX slot
-   struct bmp_data_node *tx_data;
-   struct bmp_data_node *tx_data_next;
-   WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue)
+   int cnt = 0;
+   for (struct bmp_tx_buffer *btb; btb = p->tx_pending; )
    {
-     if (tx_data->data_size > p->sk->tbsize)
-     {
-       sk_set_tbsize(p->sk, tx_data->data_size);
-     }
+     ASSERT_DIE(!p->sk->tbuf);
+     p->sk->tbuf = btb->data;
+     u64 sz = btb->pos - btb->data;
+     p->tx_sent += sz;
+     p->tx_sent_total += sz;
+     if (p->tx_pending == p->tx_last)
+       p->tx_last = NULL;
  
-     size_t data_size = tx_data->data_size;
-     memcpy(p->sk->tbuf, tx_data->data, data_size);
-     mb_free(tx_data->data);
-     rem_node((node *) tx_data);
-     mb_free(tx_data);
+     p->tx_pending = btb->next;
+     p->tx_pending_count--;
  
-     if (sk_send(p->sk, data_size) <= 0)
+     if (sk_send(p->sk, sz) <= 0)
        return;
  
-     // BMP packets should be treat with lowest priority when scheduling sending
-     // packets to target. That's why we want to send max. 32 packets per event
-     // call
-     if (++cnt > 32)
+     p->sk->tbuf = NULL;
+     bmp_tx_buffer_free(p, btb);
+     if (cnt++ > 1024)
      {
        if (!ev_active(p->tx_ev))
-       {
 -      ev_schedule(p->tx_ev);
 +        ev_send_loop(p->p.loop, p->tx_ev);
-       }
        return;
      }
    }
@@@ -805,106 -888,65 +930,68 @@@ bmp_send_peer_up_notif_msg(struct bmp_p
  {
    ASSERT(p->started);
  
 -  const struct birdsock *sk = bmp_get_birdsock_ext(bgp);
 -  if (!sk)
 -  {
 -    log(L_WARN "%s: No BGP socket", p->p.name);
 -    return;
 -  }
 -
 +  const int rem_as = ea_get_int(bgp, &ea_bgp_rem_as, 0);
 +  const int rem_id = ea_get_int(bgp, &ea_bgp_rem_id, 0);
    const bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
-   buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
 +
+   buffer payload = bmp_default_buffer(p);
    bmp_peer_up_notif_msg_serialize(&payload, is_global_instance_peer,
 -    bgp->remote_as, bgp->remote_id, 1,
 -    sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, tx_data_size,
 -    rx_data, rx_data_size);
 +    rem_as, rem_id, 1,
 +    sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, rx_data);
-   bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-   bmp_buffer_free(&payload);
+   bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
  }
  
  static void
- bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp)
+ bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, byte *data, size_t length, btime timestamp)
  {
-   struct bmp_data_node *upd_msg = mb_allocz(p->update_msg_mem_pool,
-                                sizeof (struct bmp_data_node));
-   upd_msg->data = mb_alloc(p->update_msg_mem_pool, length);
-   memcpy(upd_msg->data, data, length);
-   upd_msg->data_size = length;
-   add_tail(&p->update_msg_queue, &upd_msg->n);
-   /* Save some metadata */
 -  struct bgp_proto *bgp = bs->bgp;
 +  ea_list *bgp = bs->bgp;
-   upd_msg->remote_as = ea_get_int(bgp, &ea_bgp_rem_as, 0);
-   upd_msg->remote_id = ea_get_int(bgp, &ea_bgp_rem_id, 0);
-   upd_msg->remote_ip = ea_get_ip(bgp, &ea_bgp_rem_ip, IPA_NONE);
-   upd_msg->timestamp = timestamp;
-   upd_msg->global_peer = bmp_is_peer_global_instance(bgp);
-   upd_msg->policy = bmp_stream_policy(bs);
-   /* Kick the commit */
-   if (!ev_active(p->update_ev))
-     ev_send_loop(p->p.loop, p->update_ev);
+   const byte *start = bmp_route_monitor_msg_serialize(p,
+       bmp_is_peer_global_instance(bgp),
+       bmp_stream_policy(bs),
 -      bgp->remote_as,
 -      bgp->remote_id,
++      ea_get_int(bgp, &ea_bgp_rem_as, 0),
++      ea_get_int(bgp, &ea_bgp_rem_id, 0),
+       true,
 -      bgp->remote_ip,
++      ea_get_ip(bgp, &ea_bgp_rem_ip, IPA_NONE),
+       data,
+       length,
+       timestamp
+       );
+   bmp_schedule_tx_packet(p, start, (data - start) + length);
  }
  
  static void
 -bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
 -                       const net_addr *n, const struct rte *new, const struct rte_src *src)
 +bmp_route_monitor_notify(struct bmp_proto *p, struct bgp_proto *bgp_p, u32 afi, bool policy, const rte *new, ea_list *old)
  {
-   byte buf[BGP_MAX_EXT_MSG_LENGTH];
-   byte *end = bgp_bmp_encode_rte(bs->sender, bgp_p, buf, new);
 +  /* Idempotent update */
 +  if ((old == new->attrs) || old && new->attrs && ea_same(old, new->attrs))
 +    return;
 +
 +  /* No stream, probably flushed already */
 +  struct bmp_stream *bs = bmp_find_stream(p, bgp_p, afi, policy);
 +  if (!bs)
 +    return;
 +
 -  byte *end = bgp_bmp_encode_rte(bs->sender, begin, bufend, n, new, src);
+   byte *bufend = &p->msgbuf[sizeof p->msgbuf];
+   byte *begin = bufend - BGP_MAX_EXT_MSG_LENGTH;
++  byte *end = bgp_bmp_encode_rte(bs->sender, bgp_p, begin, bufend, new);
  
 -  btime delta_t = new ? current_time() - new->lastmod : 0;
 +  btime delta_t = new->attrs ? current_time() - new->lastmod : 0;
    btime timestamp = current_real_time() - delta_t;
  
    if (end)
-     bmp_route_monitor_put_update(p, bs, buf, end - buf, timestamp);
+     bmp_route_monitor_put_update(p, bs, begin, end - begin, timestamp);
    else
 -    log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
 +    log(L_WARN "%s: Cannot encode update for %N", p->p.name, new->net);
  }
  
- static void
- bmp_route_monitor_commit(void *p_)
- {
-   struct bmp_proto *p = p_;
-   if (!p->started)
-     return;
-   buffer payload
-     = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
-   struct bmp_data_node *data, *data_next;
-   WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
-   {
-     bmp_route_monitor_msg_serialize(&payload,
-       data->global_peer, data->policy,
-       data->remote_as, data->remote_id, true,
-       data->remote_ip, data->data, data->data_size,
-       data->timestamp);
-     bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-     bmp_buffer_flush(&payload);
-     mb_free(data->data);
-     rem_node(&data->n);
-     mb_free(data);
-   }
-   bmp_buffer_free(&payload);
- }
  static void
  bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
  {
 -  TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bs->bgp->p.name, bs->sender->c.name);
 +  TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", ea_get_adata(bs->bgp, &ea_name)->data, ea_get_adata(bs->sender, &ea_name)->data);
  
-   byte rx_end_payload[DEFAULT_MEM_BLOCK_SIZE];
+   byte *rx_end_payload = p->msgbuf + BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE;
 -  byte *pos = bgp_create_end_mark_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH);
 +  byte *pos = bgp_create_end_mark_ea_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH);
    memset(rx_end_payload + BGP_MSG_HDR_MARKER_POS, 0xff,
         BGP_MSG_HDR_MARKER_SIZE); // BGP UPDATE MSG marker
    put_u16(rx_end_payload + BGP_MSG_HDR_LENGTH_POS, pos - rx_end_payload);
  }
  
  static void
 -bmp_send_peer_down_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
 +bmp_send_peer_down_notif_msg(struct bmp_proto *p, ea_list *bgp,
-   const byte *data, const size_t data_size)
+                            const struct bmp_peer_down_info *info)
  {
    ASSERT(p->started);
  
 -  const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp);
 +  //const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp);
 +  int remote_caps = ea_get_int(bgp, &ea_bgp_as4_session, 0);
 +  int in_state = ea_get_int(bgp, &ea_bgp_in_conn_state, 0);
 +  int out_state = ea_get_int(bgp, &ea_bgp_out_conn_state, 0);
 +  int in_as4 = ea_get_int(bgp, &ea_bgp_as4_in_conn, 0);
 +  int out_as4 = ea_get_int(bgp, &ea_bgp_as4_out_conn, 0);
 +
 +  if (in_state && in_as4)
 +    remote_caps = in_as4;
 +  else if (out_state && out_as4)
 +    remote_caps = out_as4;
 +
    bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
-   buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+   buffer payload = bmp_default_buffer(p);
 -  bmp_peer_down_notif_msg_serialize(&payload, is_global_instance_peer,
 -    bgp->remote_as, bgp->remote_id,
 -    remote_caps ? remote_caps->as4_support : bgp->as4_session,
 -    bgp->remote_ip, info);
 +  bmp_peer_down_notif_msg_serialize(
 +      &payload,
 +      is_global_instance_peer,
 +      ea_get_int(bgp, &ea_bgp_rem_as, 0),
 +      ea_get_int(bgp, &ea_bgp_rem_id, 0),
 +      remote_caps,
 +      *((ip_addr *) ea_get_adata(bgp, &ea_bgp_rem_ip)->data),
-       data,
-       data_size
++      info
 +      );
-   bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-   bmp_buffer_free(&payload);
+   bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
  }
  
  static void
@@@ -958,15 -982,19 +1042,19 @@@ bmp_peer_down_(struct bmp_proto *p, ea_
    if (!bp)
      return;
  
 -  TRACE(D_STATES, "Peer down for %s", bgp->p.name);
 +  TRACE(D_STATES, "Peer down for %s", ea_find(bgp, &ea_name)->u.ad->data);
  
-   uint bmp_code = 0;
-   uint fsm_code = 0;
+   struct bmp_peer_down_info info = {
 -    .err_code = err_code,
 -    .err_subcode = err_subcode,
 -    .data = data,
 -    .length = length,
++    .err_code = bscad->notify_code,
++    .err_subcode = bscad->notify_subcode,
++    .data = bscad->data,
++    .length = bscad->ad.length - sizeof *bscad + sizeof bscad->ad,
+   };
  
 -  switch (err_class)
 +  switch (bscad->last_error_class)
    {
    case BE_BGP_RX:
-     bmp_code = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
+     info.reason = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
      break;
  
    case BE_BGP_TX:
@@@ -1020,51 -1038,52 +1090,52 @@@ bmp_send_termination_msg(struct bmp_pro
    bmp_put_u16(&stream, BMP_TERM_INFO_REASON);
    bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason
    bmp_put_u16(&stream, reason);
-   memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream));
-   IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
-     sk_send(p->sk, bmp_buffer_pos(&stream)) < 0,
-     "Failed to send BMP termination message"
-     );
  
-   bmp_buffer_free(&stream);
+   if (p->sk->tbuf)
+     bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf));
+   p->sk->tbuf = stream.start;
+   if (sk_send(p->sk, stream.pos - stream.start) < 0)
+     log(L_WARN "%s: Cannot send BMP termination message", p->p.name);
+   p->sk->tbuf = NULL;
  }
  
 -int
 -bmp_preexport(struct channel *C UNUSED, rte *e)
 +static void
 +bmp_split_policy(struct bmp_proto *p, const rte *new, const rte *old)
  {
 -  /* Reject non-direct routes */
 -  if (e->src->proto != e->sender->proto)
 -    return -1;
 +  rte loc = *(new ?: old);
  
 -  /* Reject non-BGP routes */
 -  if (e->sender->channel != &channel_bgp)
 -    return -1;
 +  struct proto *rte_proto = (struct proto*) SKIP_BACK(struct proto, sources, loc.src->owner);
 +  struct bgp_proto *bgp = (struct bgp_proto *) rte_proto;
 +  struct bgp_channel *src_ch = SKIP_BACK(struct bgp_channel, c.in_req, loc.sender->req);
  
 -  return 1;
 -}
 +  /* Ignore piped routes */
 +  if (src_ch->c.proto != rte_proto)
 +    return;
  
 -static void
 -bmp_rt_notify(struct proto *P, struct channel *c, struct network *net,
 -              struct rte *new, struct rte *old)
 -{
 -  struct bmp_proto *p = (void *) P;
 +  /* Ignore non-BGP routes */
 +  if (rte_proto->proto != &proto_bgp)
 +    return;
  
 -  struct bgp_channel *src = (void *) (new ?: old)->sender;
 -  struct bgp_proto *bgp = (void *) src->c.proto;
 -  bool policy = (c->table == src->c.table);
 +  /* Checking the pre policy */
 +  if (p->monitoring_rib.in_pre_policy)
 +  {
 +    /* Compute the pre policy attributes */
 +    loc.attrs = new ? ea_strip_to(new->attrs, BIT32_ALL(EALS_PREIMPORT)) : NULL;
 +    ea_list *old_attrs = old ? ea_strip_to(old->attrs, BIT32_ALL(EALS_PREIMPORT)) : NULL;
  
 -  /*
 -   * We assume that we receive peer_up before the first route and peer_down
 -   * synchronously with BGP session close. So if bmp_stream exists, the related
 -   * BGP session is up and could be accessed. That may not be true in
 -   * multithreaded setup.
 -   */
 +    bmp_route_monitor_notify(p, bgp, src_ch->afi, false, &loc, old_attrs);
 +  }
  
 -  struct bmp_stream *bs = bmp_find_stream(p, bgp, src->afi, policy);
 -  if (!bs)
 -    return;
 +  /* Checking the post policy */
 +  if (p->monitoring_rib.in_post_policy)
 +  {
 +    /* Compute the post policy attributes */
 +    loc.attrs = new ? ea_normalize(new->attrs, 0) : NULL;
 +    ea_list *old_attrs = old ? ea_normalize(old->attrs, 0) : NULL;
  
 -  bmp_route_monitor_notify(p, bs, net->n.addr, new, (new ?: old)->src);
 +    bmp_route_monitor_notify(p, bgp, src_ch->afi, true, &loc, old_attrs);
 +  }
  }
  
  static void
@@@ -1145,44 -1134,15 +1216,43 @@@ bmp_startup(struct bmp_proto *p
    proto_notify_state(&p->p, PS_UP);
  
    /* Send initiation message */
-   buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+   buffer payload = bmp_default_buffer(p);
    bmp_init_msg_serialize(&payload, p->sys_descr, p->sys_name);
-   bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-   bmp_buffer_free(&payload);
+   bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
  
    /* Send Peer Up messages */
 -  struct proto *peer;
 -  WALK_LIST(peer, proto_list)
 -    if ((peer->proto->class == PROTOCOL_BGP) && (peer->proto_state == PS_UP))
 -      bmp_peer_init(p, (struct bgp_proto *) peer);
 +  u32 length;
 +  PST_LOCKED(ts) /* The size of protos field will never decrease, the inconsistency caused by growing is not important */
 +    length = ts->length_states;
 +
 +  /* Subscribe to protocol state changes */
 +  p->proto_state_reader = (struct lfjour_recipient) {
 +    .event = &p->proto_state_changed,
 +    .target = proto_event_list(&p->p),
 +  };
 +
 +  p->proto_state_changed = (event) {
 +    .hook = bmp_proto_state_changed,
 +    .data = p,
 +  };
 +
 +  proto_states_subscribe(&p->proto_state_reader);
 +
 +  /* Load protocol states */
 +  for (u32 i = 0; i < length; i++)
 +  {
 +    ea_list *proto_attr = proto_get_state(i);
 +    if (proto_attr == NULL)
 +      continue;
 +
 +    struct protocol *proto = (struct protocol *) ea_get_ptr(proto_attr, &ea_protocol_type, 0);
 +    const int state = ea_get_int(proto_attr, &ea_state, 0);
 +
 +    if (proto != &proto_bgp || state != PS_UP)
 +      continue;
 +
 +    bmp_peer_up_inout(p, proto_attr, false);
 +  }
  }
  
  /**
@@@ -1282,12 -1239,31 +1352,32 @@@ bmp_sock_err(sock *sk, int err
      bmp_down(p);
  
    bmp_close_socket(p);
 -  tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME);
 +  tm_start_in(p->connect_retry_timer, CONNECT_RETRY_TIME, p->p.loop);
  
 -  proto_notify_state(&p->p, PS_START);
 +  if (p->p.proto_state == PS_UP)
 +    proto_notify_state(&p->p, PS_START);
  }
  
+ static void
+ bmp_tx_overflow(void *_p)
+ {
+   struct bmp_proto *p = _p;
+   if (p->tx_pending_count < p->tx_pending_limit)
+     return;
+   p->sock_err = 0;
+   log(L_ERR "%s: Connection stalled", p->p.name);
+   if (p->started)
+     bmp_down(p);
+   bmp_close_socket(p);
+   tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME);
+   proto_notify_state(&p->p, PS_START);
+ }
  /* BMP connect timeout event - switch from Idle/Connect state to Connect state */
  static void
  bmp_connection_retry(timer *t)
@@@ -1404,12 -1367,11 +1515,10 @@@ bmp_start(struct proto *P
    HASH_INIT(p->stream_map, P->pool, 4);
    HASH_INIT(p->table_map, P->pool, 4);
  
-   init_list(&p->tx_queue);
-   init_list(&p->update_msg_queue);
    p->started = false;
    p->sock_err = 0;
 -  add_tail(&bmp_proto_list, &p->bmp_node);
  
 -  tm_start(p->connect_retry_timer, CONNECT_INIT_TIME);
 +  tm_start_in(p->connect_retry_timer, CONNECT_INIT_TIME, p->p.loop);
  
    return PS_START;
  }
diff --cc proto/bmp/bmp.h
index 0c75181a8d959476ff9c5f77bc545d21007565cc,52f7212252afd988765a565f206f09bc61c02a52..9ab379221eb3fc00b1c5d39532aa2c5a2109b947
@@@ -63,17 -76,17 +75,20 @@@ struct bmp_proto 
    struct monitoring_rib monitoring_rib;
    // Below fields are for internal use
    // struct bmp_peer_map bgp_peers;   // Stores 'bgp_proto' structure per BGP peer
-   pool *buffer_mpool;              // Memory pool used for BMP buffer allocations
-   pool *tx_mem_pool;               // Memory pool used for packet allocations designated to BMP collector
-   pool *update_msg_mem_pool;       // Memory pool used for BPG UPDATE MSG allocations
-   list tx_queue;                   // Stores queued packets going to be sent
+   struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush
+   struct bmp_tx_buffer *tx_last;   // This buffer is the last to flush 
+   uint tx_pending_count;         // How many buffers waiting for flush
+   uint tx_pending_limit;         // Maximum on buffer count
+   u64 tx_sent;                           // Amount of data sent
+   u64 tx_sent_total;             // Amount of data sent accumulated over reconnections
+   event *tx_overflow_event;      // Too many buffers waiting for flush
    timer *connect_retry_timer;      // Timer for retrying connection to the BMP collector
-   list update_msg_queue;           // Stores all composed BGP UPDATE MSGs
    bool started;                    // Flag that stores running status of BMP instance
    int sock_err;                    // Last socket error code
 +
 +  struct lfjour_recipient proto_state_reader; // Reader of protocol states
 +  event proto_state_changed;
+   byte msgbuf[BMP_MSGBUF_LEN];     // Buffer for preparing the messages before sending them out
  };
  
  struct bmp_peer {
index 556339433da59213c918d74068ad1a5c7a109819,04540b26bf9b9c0c3580481241b6b5a976416536..f7e0eb46c1643afb5057e83ea0fac83f43ca6be3
@@@ -25,9 -25,9 +25,10 @@@ proto: bmp_proto '}' 
  
  bmp_proto_start: proto_start BMP {
       this_proto = proto_config_new(&proto_bmp, $1);
 +     this_proto->loop_order = DOMAIN_ORDER(proto);
       BMP_CFG->sys_descr = "Not defined";
       BMP_CFG->sys_name = "Not defined";
+      BMP_CFG->tx_pending_limit = (1 << 30) / page_size;
     }
   ;