]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
BMP: Refactor route monitoring
authorOndrej Zajicek <santiago@crfreenet.org>
Fri, 18 Aug 2023 01:53:58 +0000 (03:53 +0200)
committerOndrej Zajicek <santiago@crfreenet.org>
Fri, 18 Aug 2023 01:53:58 +0000 (03:53 +0200)
 - Manage BMP state through bmp_peer, bmp_stream, bmp_table structures
 - Use channels and rt_notify() hook for route announcements
 - Add support for post-policy monitoring
 - Send End-of-RIB even when there is no routes
 - Remove rte_update_in_notify() hook from import tables
 - Update import tables to support channels
 - Add bmp_hack (no feed / no flush) flag to channels

nest/proto.c
nest/protocol.h
nest/rt-table.c
proto/bgp/bgp.c
proto/bmp/bmp.c
proto/bmp/bmp.h
proto/bmp/config.Y

index 885a0b7523fa1017001e8abd4cd125c816a04bfd..16245dcac9d0ce5dbcf113163ff95d8fff8b252b 100644 (file)
@@ -179,6 +179,7 @@ proto_add_channel(struct proto *p, struct channel_config *cf)
   c->merge_limit = cf->merge_limit;
   c->in_keep_filtered = cf->in_keep_filtered;
   c->rpki_reload = cf->rpki_reload;
+  c->bmp_hack = cf->bmp_hack;
 
   c->channel_state = CS_DOWN;
   c->export_state = ES_DOWN;
@@ -450,7 +451,10 @@ channel_start_export(struct channel *c)
   ASSERT(c->channel_state == CS_UP);
   ASSERT(c->export_state == ES_DOWN);
 
-  channel_schedule_feed(c, 1); /* Sets ES_FEEDING */
+  if (!c->bmp_hack)
+    channel_schedule_feed(c, 1);       /* Sets ES_FEEDING */
+  else
+    c->export_state = ES_READY;
 }
 
 static void
@@ -523,7 +527,7 @@ channel_setup_in_table(struct channel *c)
   cf->addr_type = c->net_type;
   cf->internal = 1;
 
-  c->in_table = rt_setup(c->proto->pool, cf);
+  c->in_table = cf->table = rt_setup(c->proto->pool, cf);
 
   c->reload_event = ev_new_init(c->proto->pool, channel_reload_loop, c);
 }
@@ -574,7 +578,8 @@ channel_do_up(struct channel *c)
 static void
 channel_do_flush(struct channel *c)
 {
-  rt_schedule_prune(c->table);
+  if (!c->bmp_hack)
+    rt_schedule_prune(c->table);
 
   c->gr_wait = 0;
   if (c->gr_lock)
index da6d434e6e1a5522da03bf6dcbef00c00468d118..d94a11bc03b1c874a89e95a12435f2a318c2db06 100644 (file)
@@ -214,7 +214,6 @@ struct proto {
   void (*if_notify)(struct proto *, unsigned flags, struct iface *i);
   void (*ifa_notify)(struct proto *, unsigned flags, struct ifa *a);
   void (*rt_notify)(struct proto *, struct channel *, struct network *net, struct rte *new, struct rte *old);
-  void (*rte_update_in_notify)(struct channel *, const net_addr *, const struct rte *, const struct rte_src *);
   void (*neigh_notify)(struct neighbor *neigh);
   int (*preexport)(struct channel *, struct rte *rt);
   void (*reload_routes)(struct channel *);
@@ -477,7 +476,8 @@ struct channel_class {
 #endif
 };
 
-extern struct channel_class channel_bgp;
+extern const struct channel_class channel_basic;
+extern const struct channel_class channel_bgp;
 
 struct channel_config {
   node n;
@@ -500,6 +500,7 @@ struct channel_config {
   u8 merge_limit;                      /* Maximal number of nexthops for RA_MERGED */
   u8 in_keep_filtered;                 /* Routes rejected in import filter are kept */
   u8 rpki_reload;                      /* RPKI changes trigger channel reload */
+  u8 bmp_hack;                         /* No feed, no flush */
 };
 
 struct channel {
@@ -552,6 +553,7 @@ struct channel {
   u8 reload_pending;                   /* Reloading and another reload is scheduled */
   u8 refeed_pending;                   /* Refeeding and another refeed is scheduled */
   u8 rpki_reload;                      /* RPKI changes trigger channel reload */
+  u8 bmp_hack;                         /* No feed, no flush */
 
   struct rtable *out_table;            /* Internal table for exported routes */
 
@@ -620,6 +622,7 @@ static inline struct channel_config *proto_cf_main_channel(struct proto_config *
 struct channel *proto_find_channel_by_table(struct proto *p, struct rtable *t);
 struct channel *proto_find_channel_by_name(struct proto *p, const char *n);
 struct channel *proto_add_channel(struct proto *p, struct channel_config *cf);
+void proto_remove_channel(struct proto *p, struct channel *c);
 int proto_configure_channel(struct proto *p, struct channel **c, struct channel_config *cf);
 
 void channel_set_state(struct channel *c, uint state);
index 2b065032e7cbe66062810e5f025d1038a4807c6e..e8478c36470299dbe03b9b723cef709bd88267af 100644 (file)
@@ -3063,6 +3063,23 @@ rt_feed_channel_abort(struct channel *c)
  *     Import table
  */
 
+static void
+rte_announce_in(struct rtable *tab, struct network *net, struct rte *new, struct rte *old)
+{
+  struct channel *c; node *n;
+  WALK_LIST2(c, n, tab->channels, table_node)
+  {
+    if (c->export_state == ES_DOWN)
+      continue;
+
+    if (c->ra_mode != RA_ANY)
+      continue;
+
+    struct proto *p = c->proto;
+    p->rt_notify(p, c, net, new, old);
+  }
+}
+
 int
 rte_update_in(struct channel *c, const net_addr *n, rte *new, struct rte_src *src)
 {
@@ -3096,9 +3113,6 @@ rte_update_in(struct channel *c, const net_addr *n, rte *new, struct rte_src *sr
        {
          old->flags &= ~(REF_STALE | REF_DISCARD | REF_MODIFY);
 
-         if (c->proto->rte_update_in_notify)
-           c->proto->rte_update_in_notify(c, n, old, src);
-
          return 1;
        }
 
@@ -3111,28 +3125,15 @@ rte_update_in(struct channel *c, const net_addr *n, rte *new, struct rte_src *sr
 
       /* Remove the old rte */
       *pos = old->next;
-      rte_free_quick(old);
       tab->rt_count--;
-
       break;
     }
 
-  if (!new)
-  {
-    if (!old)
-      goto drop_withdraw;
-
-    if (!net->routes)
-      fib_delete(&tab->fib, net);
-
-    if (c->proto->rte_update_in_notify)
-      c->proto->rte_update_in_notify(c, n, NULL, src);
-
-    return 1;
-  }
+  if (!old && !new)
+    goto drop_withdraw;
 
   struct channel_limit *l = &c->rx_limit;
-  if (l->action && !old)
+  if (l->action && !old && new)
   {
     if (tab->rt_count >= l->limit)
       channel_notify_limit(c, l, PLD_RX, tab->rt_count);
@@ -3147,18 +3148,26 @@ rte_update_in(struct channel *c, const net_addr *n, rte *new, struct rte_src *sr
     }
   }
 
-  /* Insert the new rte */
-  rte *e = rte_do_cow(new);
-  e->flags |= REF_COW;
-  e->net = net;
-  e->sender = c;
-  e->lastmod = current_time();
-  e->next = *pos;
-  *pos = e;
-  tab->rt_count++;
+  if (new)
+  {
+    /* Insert the new rte */
+    rte *e = rte_do_cow(new);
+    e->flags |= REF_COW;
+    e->net = net;
+    e->sender = c;
+    e->lastmod = current_time();
+    e->next = *pos;
+    *pos = new = e;
+    tab->rt_count++;
+  }
 
-  if (c->proto->rte_update_in_notify)
-    c->proto->rte_update_in_notify(c, n, e, src);
+  rte_announce_in(tab, net, new, old);
+
+  if (old)
+    rte_free_quick(old);
+
+  if (!net->routes)
+    fib_delete(&tab->fib, net);
 
   return 1;
 
index 54da225385c489886b72582d0366d22982187397..ccaa5067c1d1c892f1f81474c11917a417d8b7f8 100644 (file)
@@ -1714,10 +1714,6 @@ bgp_init(struct proto_config *CF)
   P->rte_modify = bgp_rte_modify_stale;
   P->rte_igp_metric = bgp_rte_igp_metric;
 
-#ifdef CONFIG_BMP
-  P->rte_update_in_notify = bmp_route_monitor_update_in_notify;
-#endif
-
   p->cf = cf;
   p->is_internal = (cf->local_as == cf->remote_as);
   p->is_interior = p->is_internal || cf->confederation_member;
@@ -2643,7 +2639,7 @@ bgp_show_proto_info(struct proto *P)
   }
 }
 
-struct channel_class channel_bgp = {
+const struct channel_class channel_bgp = {
   .channel_size =      sizeof(struct bgp_channel),
   .config_size =       sizeof(struct bgp_channel_config),
   .init =              bgp_channel_init,
index f8899863c1558c4c81baa5864287e330b348408d..ec1614f57d8d1df5bfd3b352490b9f468aaf640c 100644 (file)
 // List of BMP instances
 static list STATIC_LIST_INIT(bmp_proto_list);
 
+#define HASH_PEER_KEY(n)               n->bgp
+#define HASH_PEER_NEXT(n)              n->next
+#define HASH_PEER_EQ(b1,b2)            b1 == b2
+#define HASH_PEER_FN(b)                        ptr_hash(b)
+
+#define BMP_STREAM_KEY_POLICY          0x100
+
+#define HASH_STREAM_KEY(n)             n->bgp, n->key
+#define HASH_STREAM_NEXT(n)            n->next
+#define HASH_STREAM_EQ(b1,k1,b2,k2)    b1 == b2 && k1 == k2
+#define HASH_STREAM_FN(b,k)            ptr_hash(b) ^ u32_hash(k)
+
+#define HASH_TABLE_KEY(n)              n->table
+#define HASH_TABLE_NEXT(n)             n->next
+#define HASH_TABLE_EQ(t1,t2)           t1 == t2
+#define HASH_TABLE_FN(t)               ptr_hash(t)
+
 /* BMP Common Header [RFC 7854 - Section 4.1] */
 enum bmp_version {
   BMP_VER_UNUSED = 0, // Version 0 is reserved and MUST NOT be sent
@@ -217,10 +234,11 @@ struct bmp_data_node {
   ip_addr remote_ip;
   btime timestamp;
   bool global_peer;
+  bool policy;
 };
 
 static void
-bmp_route_monitor_pre_policy_table_in_snapshot(struct bmp_proto *p, struct bgp_channel *c);
+bmp_route_monitor_snapshot(struct bmp_proto *p, struct bmp_stream *bs);
 
 static void
 bmp_common_hdr_serialize(buffer *stream, const enum bmp_message_type type, const u32 data_size)
@@ -359,7 +377,7 @@ bmp_put_bgp_hdr(buffer *stream, const u8 msg_type, const u16 msg_length)
 /**
  * bmp_per_peer_hdr_serialize - serializes Per-Peer Header
  *
- * @is_pre_policy: indicate the message reflects the pre-policy Adj-RIB-In
+ * @is_post_policy: indicate the message reflects the post-policy Adj-RIB-In
  * @peer_addr: the remote IP address associated with the TCP session
  * @peer_as: the Autonomous System number of the peer
  * @peer_bgp_id: the BGP Identifier of the peer
@@ -368,7 +386,7 @@ bmp_put_bgp_hdr(buffer *stream, const u8 msg_type, const u16 msg_length)
  */
 static void
 bmp_per_peer_hdr_serialize(buffer *stream, const bool is_global_instance_peer,
-  const bool is_pre_policy, const bool is_as_path_4bytes,
+  const bool is_post_policy, const bool is_as_path_4bytes,
   const ip_addr peer_addr, const u32 peer_as, const u32 peer_bgp_id,
   const u32 ts_sec, const u32 ts_usec)
 {
@@ -379,9 +397,9 @@ bmp_per_peer_hdr_serialize(buffer *stream, const bool is_global_instance_peer,
   const u8 peer_flag_v = ipa_is_ip4(peer_addr)
                            ? BMP_PEER_HDR_FLAG_V_IP4
                            : BMP_PEER_HDR_FLAG_V_IP6;
-  const u8 peer_flag_l = is_pre_policy
-                           ? BMP_PEER_HDR_FLAG_L_PRE_POLICY_ADJ_RIB_IN
-                           : BMP_PEER_HDR_FLAG_L_POST_POLICY_ADJ_RIB_IN;
+  const u8 peer_flag_l = is_post_policy
+                           ? BMP_PEER_HDR_FLAG_L_POST_POLICY_ADJ_RIB_IN
+                           : BMP_PEER_HDR_FLAG_L_PRE_POLICY_ADJ_RIB_IN;
   const u8 peer_flag_a = is_as_path_4bytes
                            ? BMP_PEER_HDR_FLAG_A_AS_PATH_4B
                            : BMP_PEER_HDR_FLAG_A_AS_PATH_2B;
@@ -405,7 +423,7 @@ bmp_per_peer_hdr_serialize(buffer *stream, const bool is_global_instance_peer,
 /* [4.6] Route Monitoring */
 static void
 bmp_route_monitor_msg_serialize(buffer *stream, const bool is_peer_global,
-  const bool table_in_pre_policy, const u32 peer_as, const u32 peer_bgp_id,
+  const bool table_in_post_policy, const u32 peer_as, const u32 peer_bgp_id,
   const bool as4_support, const ip_addr remote_addr, const byte *update_msg,
   const size_t update_msg_size, btime timestamp)
 {
@@ -415,7 +433,7 @@ bmp_route_monitor_msg_serialize(buffer *stream, const bool is_peer_global,
 
   bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
   bmp_common_hdr_serialize(stream, BMP_ROUTE_MONITOR, data_size);
-  bmp_per_peer_hdr_serialize(stream, is_peer_global, table_in_pre_policy,
+  bmp_per_peer_hdr_serialize(stream, is_peer_global, table_in_post_policy,
     as4_support, remote_addr, peer_as, peer_bgp_id, ts_sec, ts_usec);
   bmp_put_data(stream, update_msg, update_msg_size);
 }
@@ -434,7 +452,7 @@ bmp_peer_up_notif_msg_serialize(buffer *stream, const bool is_peer_global,
   bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
   bmp_common_hdr_serialize(stream, BMP_PEER_UP_NOTIF, data_size);
   bmp_per_peer_hdr_serialize(stream, is_peer_global,
-    true /* TODO: Hardcoded pre-policy Adj-RIB-In */, as4_support, remote_addr,
+    false /* TODO: Hardcoded pre-policy Adj-RIB-In */, as4_support, remote_addr,
     peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
   bmp_put_ipa(stream, local_addr);
   bmp_put_u16(stream, local_port);
@@ -454,33 +472,192 @@ bmp_peer_down_notif_msg_serialize(buffer *stream, const bool is_peer_global,
   bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + payload_size);
   bmp_common_hdr_serialize(stream, BMP_PEER_DOWN_NOTIF, payload_size);
   bmp_per_peer_hdr_serialize(stream, is_peer_global,
-    true /* TODO: Hardcoded pre-policy adj RIB IN */,  as4_support, remote_addr,
+    false /* TODO: Hardcoded pre-policy adj RIB IN */,  as4_support, remote_addr,
     peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
   bmp_put_data(stream, data, data_size);
 }
 
+
+/*
+ *     BMP tables
+ */
+
+static struct bmp_table *
+bmp_find_table(struct bmp_proto *p, struct rtable *tab)
+{
+  return HASH_FIND(p->table_map, HASH_TABLE, tab);
+}
+
+static struct bmp_table *
+bmp_add_table(struct bmp_proto *p, struct rtable *tab)
+{
+  struct bmp_table *bt = mb_allocz(p->p.pool, sizeof(struct bmp_table));
+  bt->table = tab;
+  rt_lock_table(bt->table);
+
+  HASH_INSERT(p->table_map, HASH_TABLE, bt);
+
+  struct channel_config cc = {
+    .name = "monitor",
+    .channel = &channel_basic,
+    .table = tab->config,
+    .in_filter = FILTER_REJECT,
+    .net_type = tab->addr_type,
+    .ra_mode = RA_ANY,
+    .bmp_hack = 1,
+  };
+
+  bt->channel = proto_add_channel(&p->p, &cc);
+  channel_set_state(bt->channel, CS_UP);
+
+  return bt;
+}
+
 static void
-bmp_peer_up_(struct bmp_proto *p, const struct bgp_proto *bgp,
+bmp_remove_table(struct bmp_proto *p, struct bmp_table *bt)
+{
+  channel_set_state(bt->channel, CS_FLUSHING);
+  channel_set_state(bt->channel, CS_DOWN);
+  proto_remove_channel(&p->p, bt->channel);
+
+  HASH_REMOVE(p->table_map, HASH_TABLE, bt);
+
+  rt_unlock_table(bt->table);
+  bt->table = NULL;
+
+  mb_free(bt);
+}
+
+static inline struct bmp_table *bmp_get_table(struct bmp_proto *p, struct rtable *tab)
+{ return bmp_find_table(p, tab) ?: bmp_add_table(p, tab); }
+
+static inline void bmp_lock_table(struct bmp_proto *p UNUSED, struct bmp_table *bt)
+{ bt->uc++; }
+
+static inline void bmp_unlock_table(struct bmp_proto *p, struct bmp_table *bt)
+{ bt->uc--; if (!bt->uc) bmp_remove_table(p, bt); }
+
+
+/*
+ *     BMP streams
+ */
+
+static inline u32 bmp_stream_key(u32 afi, bool policy)
+{ return afi ^ (policy ? BMP_STREAM_KEY_POLICY : 0); }
+
+static inline u32 bmp_stream_afi(struct bmp_stream *bs)
+{ return bs->key & ~BMP_STREAM_KEY_POLICY; }
+
+static inline bool bmp_stream_policy(struct bmp_stream *bs)
+{ return !!(bs->key & BMP_STREAM_KEY_POLICY); }
+
+static struct bmp_stream *
+bmp_find_stream(struct bmp_proto *p, const struct bgp_proto *bgp, u32 afi, bool policy)
+{
+  return HASH_FIND(p->stream_map, HASH_STREAM, bgp, bmp_stream_key(afi, policy));
+}
+
+static struct bmp_stream *
+bmp_add_stream(struct bmp_proto *p, struct bmp_peer *bp, u32 afi, bool policy, struct rtable *tab, struct bgp_channel *sender)
+{
+  struct bmp_stream *bs = mb_allocz(p->p.pool, sizeof(struct bmp_stream));
+  bs->bgp = bp->bgp;
+  bs->key = bmp_stream_key(afi, policy);
+
+  add_tail(&bp->streams, &bs->n);
+  HASH_INSERT(p->stream_map, HASH_STREAM, bs);
+
+  bs->table = bmp_get_table(p, tab);
+  bmp_lock_table(p, bs->table);
+
+  bs->sender = sender;
+
+  return bs;
+}
+
+static void
+bmp_remove_stream(struct bmp_proto *p, struct bmp_stream *bs)
+{
+  rem_node(&bs->n);
+  HASH_REMOVE(p->stream_map, HASH_STREAM, bs);
+
+  bmp_unlock_table(p, bs->table);
+  bs->table = NULL;
+
+  mb_free(bs);
+}
+
+
+/*
+ *     BMP peers
+ */
+
+static struct bmp_peer *
+bmp_find_peer(struct bmp_proto *p, const struct bgp_proto *bgp)
+{
+  return HASH_FIND(p->peer_map, HASH_PEER, bgp);
+}
+
+static struct bmp_peer *
+bmp_add_peer(struct bmp_proto *p, struct bgp_proto *bgp)
+{
+  struct bmp_peer *bp = mb_allocz(p->p.pool, sizeof(struct bmp_peer));
+  bp->bgp = bgp;
+
+  init_list(&bp->streams);
+
+  HASH_INSERT(p->peer_map, HASH_PEER, bp);
+
+  struct bgp_channel *c;
+  BGP_WALK_CHANNELS(bgp, c)
+  {
+    if (p->monitoring_rib.in_pre_policy && c->c.in_table)
+      bmp_add_stream(p, bp, c->afi, false, c->c.in_table, c);
+
+    if (p->monitoring_rib.in_post_policy && c->c.table)
+      bmp_add_stream(p, bp, c->afi, true, c->c.table, c);
+  }
+
+  return bp;
+}
+
+static void
+bmp_remove_peer(struct bmp_proto *p, struct bmp_peer *bp)
+{
+  struct bmp_stream *bs, *bs_next;
+  WALK_LIST_DELSAFE(bs, bs_next, bp->streams)
+    bmp_remove_stream(p, bs);
+
+  HASH_REMOVE(p->peer_map, HASH_PEER, bp);
+
+  mb_free(bp);
+}
+
+static void
+bmp_peer_up_(struct bmp_proto *p, struct bgp_proto *bgp,
            const byte *tx_open_msg, uint tx_open_length,
            const byte *rx_open_msg, uint rx_open_length)
 {
   if (!p->started)
     return;
 
+  struct bmp_peer *bp = bmp_find_peer(p, bgp);
+  if (bp)
+    return;
+
   TRACE(D_STATES, "Peer up for %s", bgp->p.name);
 
-  // struct bmp_peer_map_key key = bmp_peer_map_key_create(bgp->remote_ip, bgp->remote_as);
-  // bmp_peer_map_insert(&p->bgp_peers, key, (const byte *) &bgp, sizeof (bgp));
+  bp = bmp_add_peer(p, bgp);
 
   bmp_send_peer_up_notif_msg(p, bgp, tx_open_msg, tx_open_length, rx_open_msg, rx_open_length);
 
-  struct bgp_channel *c;
-  BGP_WALK_CHANNELS(bgp, c)
-    bmp_route_monitor_pre_policy_table_in_snapshot(p, c);
+  struct bmp_stream *bs;
+  WALK_LIST(bs, bp->streams)
+    bmp_route_monitor_snapshot(p, bs);
 }
 
 void
-bmp_peer_up(const struct bgp_proto *bgp,
+bmp_peer_up(struct bgp_proto *bgp,
            const byte *tx_open_msg, uint tx_open_length,
            const byte *rx_open_msg, uint rx_open_length)
 {
@@ -490,7 +667,7 @@ bmp_peer_up(const struct bgp_proto *bgp,
 }
 
 static void
-bmp_peer_init(struct bmp_proto *p, const struct bgp_proto *bgp)
+bmp_peer_init(struct bmp_proto *p, struct bgp_proto *bgp)
 {
   struct bgp_conn *conn = bgp->conn;
 
@@ -502,6 +679,8 @@ bmp_peer_init(struct bmp_proto *p, const struct bgp_proto *bgp)
               conn->remote_open_msg, conn->remote_open_length);
 }
 
+
+
 static const struct birdsock *
 bmp_get_birdsock(const struct bgp_proto *bgp)
 {
@@ -591,7 +770,7 @@ bmp_send_peer_up_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
 }
 
 static void
-bmp_route_monitor_put_update(struct bmp_proto *p, const byte *data, size_t length, struct bgp_proto *bgp)
+bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length)
 {
   struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool,
                                sizeof (struct bmp_data_node));
@@ -601,11 +780,13 @@ bmp_route_monitor_put_update(struct bmp_proto *p, const byte *data, size_t lengt
   add_tail(&p->update_msg_queue, &upd_msg->n);
 
   /* Save some metadata */
+  struct bgp_proto *bgp = bs->bgp;
   upd_msg->remote_as = bgp->remote_as;
   upd_msg->remote_id = bgp->remote_id;
   upd_msg->remote_ip = bgp->remote_ip;
   upd_msg->timestamp = current_time();
   upd_msg->global_peer = bmp_is_peer_global_instance(bgp);
+  upd_msg->policy = bmp_stream_policy(bs);
 
   /* Kick the commit */
   if (!ev_active(p->update_ev))
@@ -613,31 +794,16 @@ bmp_route_monitor_put_update(struct bmp_proto *p, const byte *data, size_t lengt
 }
 
 static void
-bmp_route_monitor_update_in_notify_(struct bmp_proto *p, struct bgp_channel *c,
-                                       const net_addr *n, const struct rte *new, const struct rte_src *src)
+bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
+                        const net_addr *n, const struct rte *new, const struct rte_src *src)
 {
-  struct bgp_proto *bgp = (void *) c->c.proto;
-
-  if (!p->started)
-    return;
-
-  if (p->monitoring_rib.in_pre_policy == false)
-    return;
-
   byte buf[BGP_MAX_EXT_MSG_LENGTH];
-  byte *end = bgp_bmp_encode_rte(c, buf, n, new, src);
-  bmp_route_monitor_put_update(p, buf, end - buf, bgp);
-}
+  byte *end = bgp_bmp_encode_rte(bs->sender, buf, n, new, src);
 
-void
-bmp_route_monitor_update_in_notify(struct channel *C, const net_addr *n,
-                                  const struct rte *new, const struct rte_src *src)
-{
-  struct bgp_channel *c = (void *) C;
-
-  struct bmp_proto *p; node *nx;
-  WALK_LIST2(p, nx, bmp_proto_list, bmp_node)
-    bmp_route_monitor_update_in_notify_(p, c, n, new, src);
+  if (end)
+    bmp_route_monitor_put_update(p, bs, buf, end - buf);
+  else
+    log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
 }
 
 static void
@@ -648,9 +814,6 @@ bmp_route_monitor_commit(void *p_)
   if (!p->started)
     return;
 
-  if (p->monitoring_rib.in_pre_policy == false)
-    return;
-
   buffer payload
     = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
 
@@ -658,7 +821,7 @@ bmp_route_monitor_commit(void *p_)
   WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
   {
     bmp_route_monitor_msg_serialize(&payload,
-      data->global_peer, true /* TODO: Hardcoded pre-policy Adj-Rib-In */,
+      data->global_peer, data->policy,
       data->remote_as, data->remote_id, true,
       data->remote_ip, data->data, data->data_size,
       data->timestamp);
@@ -676,53 +839,37 @@ bmp_route_monitor_commit(void *p_)
 }
 
 static void
-bmp_route_monitor_end_of_rib_msg(struct bmp_proto *p, struct bgp_channel *c)
+bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
 {
-  struct bgp_proto *bgp = (void *) c->c.proto;
-
-  TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bgp->p.name, c->c.name);
+  TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bs->bgp->p.name, bs->sender->c.name);
 
   byte rx_end_payload[DEFAULT_MEM_BLOCK_SIZE];
-  byte *pos = bgp_create_end_mark_(c, rx_end_payload + BGP_HEADER_LENGTH);
+  byte *pos = bgp_create_end_mark_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH);
   memset(rx_end_payload + BGP_MSG_HDR_MARKER_POS, 0xff,
         BGP_MSG_HDR_MARKER_SIZE); // BGP UPDATE MSG marker
   put_u16(rx_end_payload + BGP_MSG_HDR_LENGTH_POS, pos - rx_end_payload);
   put_u8(rx_end_payload + BGP_MSG_HDR_TYPE_POS, PKT_UPDATE);
 
-  bmp_route_monitor_put_update(p, rx_end_payload, pos - rx_end_payload, bgp);
+  bmp_route_monitor_put_update(p, bs, rx_end_payload, pos - rx_end_payload);
 }
 
 static void
-bmp_route_monitor_pre_policy_table_in_snapshot(struct bmp_proto *p, struct bgp_channel *c)
+bmp_route_monitor_snapshot(struct bmp_proto *p, struct bmp_stream *bs)
 {
-  if (p->monitoring_rib.in_pre_policy == false)
-    return;
+  struct rtable *tab = bs->table->table;
 
-  struct rtable *tab = c->c.in_table;
-  if (!tab)
-    return;
-
-  size_t cnt = 0;
-  struct proto *P;
-  struct fib_iterator fit;
-  memset(&fit, 0x00, sizeof (fit));
+  struct fib_iterator fit = {};
   FIB_ITERATE_INIT(&fit, &tab->fib);
   FIB_ITERATE_START(&tab->fib, &fit, net, n)
   {
-    P = n->routes->sender->proto;
-    if (P->proto->class != PROTOCOL_BGP)
-      continue;
-
     rte *e;
     for (e = n->routes; e; e = e->next)
-      bmp_route_monitor_update_in_notify_(p, c, n->n.addr, e, e->src);
-
-    ++cnt;
+      if (e->sender == &bs->sender->c)
+       bmp_route_monitor_notify(p, bs, n->n.addr, e, e->src);
   }
   FIB_ITERATE_END;
 
-  if (cnt > 0)
-    bmp_route_monitor_end_of_rib_msg(p, c);
+  bmp_route_monitor_end_of_rib(p, bs);
 }
 
 static void
@@ -751,10 +898,11 @@ bmp_peer_down_(struct bmp_proto *p, const struct bgp_proto *bgp,
   if (!p->started)
     return;
 
-  TRACE(D_STATES, "Peer down for %s", bgp->p.name);
+  struct bmp_peer *bp = bmp_find_peer(p, bgp);
+  if (!bp)
+    return;
 
-  // struct bmp_peer_map_key key = bmp_peer_map_key_create(bgp->remote_ip, bgp->remote_as);
-  // bmp_peer_map_remove(&p->bgp_peers, key);
+  TRACE(D_STATES, "Peer down for %s", bgp->p.name);
 
   buffer payload = bmp_buffer_alloc(p->buffer_mpool, 1 + BGP_HEADER_LENGTH + msg_length);
 
@@ -786,6 +934,8 @@ bmp_peer_down_(struct bmp_proto *p, const struct bgp_proto *bgp,
   bmp_send_peer_down_notif_msg(p, bgp, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
 
   bmp_buffer_free(&payload);
+
+  bmp_remove_peer(p, bp);
 }
 
 void
@@ -819,6 +969,38 @@ bmp_send_termination_msg(struct bmp_proto *p,
   bmp_buffer_free(&stream);
 }
 
+int
+bmp_preexport(struct channel *C UNUSED, rte *e)
+{
+  /* Reject non-direct routes */
+  if (e->src->proto != e->sender->proto)
+    return -1;
+
+  /* Reject non-BGP routes */
+  if (e->sender->channel != &channel_bgp)
+    return -1;
+
+  return 1;
+}
+
+static void
+bmp_rt_notify(struct proto *P, struct channel *c, struct network *net,
+               struct rte *new, struct rte *old)
+{
+  struct bmp_proto *p = (void *) P;
+
+  struct bgp_channel *src = (void *) (new ?: old)->sender;
+  struct bgp_proto *bgp = (void *) src->c.proto;
+  bool policy = (c->table == src->c.table);
+
+  struct bmp_stream *bs = bmp_find_stream(p, bgp, src->afi, policy);
+  if (!bs)
+    return;
+
+  bmp_route_monitor_notify(p, bs, net->n.addr, new, (new ?: old)->src);
+}
+
+
 /**
  * bmp_startup - enter established state
  * @p: BMP instance
@@ -835,6 +1017,8 @@ bmp_startup(struct bmp_proto *p)
 
   TRACE(D_EVENTS, "BMP session established");
 
+  proto_notify_state(&p->p, PS_UP);
+
   /* Send initiation message */
   buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
   bmp_init_msg_serialize(&payload, p->sys_descr, p->sys_name);
@@ -846,15 +1030,14 @@ bmp_startup(struct bmp_proto *p)
   WALK_LIST(peer, proto_list)
     if ((peer->proto->class == PROTOCOL_BGP) && (peer->proto_state == PS_UP))
       bmp_peer_init(p, (struct bgp_proto *) peer);
-
-  proto_notify_state(&p->p, PS_UP);
 }
 
 /**
  * bmp_down - leave established state
  * @p: BMP instance
  *
- * The bgp_down() function is called when the BMP session fails.
+ * The bgp_down() function is called when the BMP session fails. The caller is
+ * responsible for changing protocol state.
  */
 static void
 bmp_down(struct bmp_proto *p)
@@ -864,7 +1047,15 @@ bmp_down(struct bmp_proto *p)
 
   TRACE(D_EVENTS, "BMP session closed");
 
-  proto_notify_state(&p->p, PS_START);
+  /* Unregister existing peer structures */
+  HASH_WALK_DELSAFE(p->peer_map, next, bp)
+  {
+    bmp_remove_peer(p, bp);
+  }
+  HASH_WALK_END;
+
+  /* Removing peers should also remove all streams and tables */
+  ASSERT(!p->peer_map.count && !p->stream_map.count && !p->table_map.count);
 }
 
 /**
@@ -936,6 +1127,8 @@ bmp_sock_err(sock *sk, int err)
 
   bmp_close_socket(p);
   tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME);
+
+  proto_notify_state(&p->p, PS_START);
 }
 
 /* BMP connect timeout event - switch from Idle/Connect state to Connect state */
@@ -983,6 +1176,9 @@ bmp_init(struct proto_config *CF)
   struct bmp_proto *p = (void *) P;
   struct bmp_config *cf = (void *) CF;
 
+  P->rt_notify = bmp_rt_notify;
+  P->preexport = bmp_preexport;
+
   p->cf = cf;
   p->local_addr = cf->local_addr;
   p->station_ip = cf->station_ip;
@@ -990,6 +1186,7 @@ bmp_init(struct proto_config *CF)
   strcpy(p->sys_descr, cf->sys_descr);
   strcpy(p->sys_name, cf->sys_name);
   p->monitoring_rib.in_pre_policy = cf->monitoring_rib_in_pre_policy;
+  p->monitoring_rib.in_post_policy = cf->monitoring_rib_in_post_policy;
 
   return P;
 }
@@ -1012,7 +1209,9 @@ bmp_start(struct proto *P)
   p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0);
   p->sk = NULL;
 
-  // bmp_peer_map_init(&p->bgp_peers, p->map_mem_pool);
+  HASH_INIT(p->peer_map, P->pool, 4);
+  HASH_INIT(p->stream_map, P->pool, 4);
+  HASH_INIT(p->table_map, P->pool, 4);
 
   init_list(&p->tx_queue);
   init_list(&p->update_msg_queue);
@@ -1033,7 +1232,7 @@ bmp_shutdown(struct proto *P)
   if (p->started)
   {
     bmp_send_termination_msg(p, BMP_TERM_REASON_ADM);
-    p->started = false;
+    bmp_down(p);
   }
 
   p->sock_err = 0;
@@ -1054,7 +1253,8 @@ bmp_reconfigure(struct proto *P, struct proto_config *CF)
     || !ipa_equal(new->local_addr, old->local_addr)
     || !ipa_equal(new->station_ip, old->station_ip)
     || (new->station_port != old->station_port)
-    || (new->monitoring_rib_in_pre_policy != old->monitoring_rib_in_pre_policy);
+    || (new->monitoring_rib_in_pre_policy != old->monitoring_rib_in_pre_policy)
+    || (new->monitoring_rib_in_post_policy != old->monitoring_rib_in_post_policy);
 
   /* If there is any change, restart the protocol */
   if (needs_restart)
index 0c35575453a1676e846372b39d6ec30a5e3da6ef..9b4e2a7356a00e394006db614d9966fb838e0ce8 100644 (file)
@@ -39,6 +39,7 @@ struct bmp_config {
   ip_addr station_ip;                 // Monitoring station address
   u16 station_port;                   // Monitoring station TCP port
   bool monitoring_rib_in_pre_policy;  // Route monitoring pre-policy Adj-Rib-In
+  bool monitoring_rib_in_post_policy;  // Route monitoring post-policy Adj-Rib-In
 };
 
 /* Forward declarations */
@@ -49,6 +50,11 @@ struct bmp_proto {
   struct proto p;                  // Parent proto
   const struct bmp_config *cf;     // Shortcut to BMP configuration
   node bmp_node;                   // Node in bmp_proto_list
+
+  HASH(struct bmp_peer) peer_map;
+  HASH(struct bmp_stream) stream_map;
+  HASH(struct bmp_table) table_map;
+
   sock *sk;                        // TCP connection
   event *tx_ev;                    // TX event
   event *update_ev;                // Update event
@@ -71,6 +77,28 @@ struct bmp_proto {
   int sock_err;                    // Last socket error code
 };
 
+struct bmp_peer {
+  struct bgp_proto *bgp;
+  struct bmp_peer *next;
+  list streams;
+};
+
+struct bmp_stream {
+  node n;
+  struct bgp_proto *bgp;
+  u32 key;
+  struct bmp_stream *next;
+  struct bmp_table *table;
+  struct bgp_channel *sender;
+};
+
+struct bmp_table {
+  struct rtable *table;
+  struct bmp_table *next;
+  struct channel *channel;
+  u32 uc;
+};
+
 
 #ifdef CONFIG_BMP
 
@@ -78,17 +106,10 @@ struct bmp_proto {
  * bmp_peer_up - send notification that BGP peer connection is established
  */
 void
-bmp_peer_up(const struct bgp_proto *bgp,
+bmp_peer_up(struct bgp_proto *bgp,
            const byte *tx_open_msg, uint tx_open_length,
            const byte *rx_open_msg, uint rx_open_length);
 
-/**
- * bmp_route_monitor_update_in_notify - send notification that rte vas received
- */
-void
-bmp_route_monitor_update_in_notify(struct channel *C, const net_addr *n,
-                                  const struct rte *new, const struct rte_src *src);
-
 /**
  * bmp_peer_down - send notification that BGP peer connection is not in
  * established state
index 5a5e08124e7bdd065e4d865773d5d835209dd5a4..acb0c4d9ad91f20d359469cedf62679026d5df1e 100644 (file)
@@ -27,7 +27,6 @@ bmp_proto_start: proto_start BMP {
      this_proto = proto_config_new(&proto_bmp, $1);
      BMP_CFG->sys_descr = "Not defined";
      BMP_CFG->sys_name = "Not defined";
-     BMP_CFG->monitoring_rib_in_pre_policy = false;
    }
  ;
 
@@ -69,6 +68,9 @@ bmp_proto:
  | bmp_proto MONITORING RIB IN PRE_POLICY bool ';' {
      BMP_CFG->monitoring_rib_in_pre_policy = $6;
    }
+ | bmp_proto MONITORING RIB IN POST_POLICY bool ';' {
+     BMP_CFG->monitoring_rib_in_post_policy = $6;
+   }
  ;
 
 CF_CODE