From: Ondrej Zajicek Date: Tue, 3 Dec 2024 17:22:14 +0000 (+0100) Subject: Merge commit 'e6a100b31a7637ee739338e4b933367707ec931f' into thread-next X-Git-Tag: v3.0.0~39 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7cbcb7b23092b8552744de9aa8b73c318c14a060;p=thirdparty%2Fbird.git Merge commit 'e6a100b31a7637ee739338e4b933367707ec931f' into thread-next --- 7cbcb7b23092b8552744de9aa8b73c318c14a060 diff --cc proto/bgp/bgp.h index 0d06065fd,4cef8caf4..05aae612b --- a/proto/bgp/bgp.h +++ b/proto/bgp/bgp.h @@@ -674,36 -627,30 +674,36 @@@ int bgp_encode_mp_reach_mrt(struct bgp_ const char * bgp_attr_name(uint code); int bgp_encode_attrs(struct bgp_write_state *s, ea_list *attrs, byte *buf, byte *end); ea_list * bgp_decode_attrs(struct bgp_parse_state *s, byte *data, uint len); -void bgp_finish_attrs(struct bgp_parse_state *s, rta *a); - -void bgp_init_bucket_table(struct bgp_channel *c); -void bgp_free_bucket_table(struct bgp_channel *c); -void bgp_free_bucket(struct bgp_channel *c, struct bgp_bucket *b); -void bgp_defer_bucket(struct bgp_channel *c, struct bgp_bucket *b); -void bgp_withdraw_bucket(struct bgp_channel *c, struct bgp_bucket *b); - -void bgp_init_prefix_table(struct bgp_channel *c); -void bgp_free_prefix_table(struct bgp_channel *c); -void bgp_free_prefix(struct bgp_channel *c, struct bgp_prefix *bp); - -int bgp_rte_better(struct rte *, struct rte *); -int bgp_rte_mergable(rte *pri, rte *sec); -int bgp_rte_recalculate(rtable *table, net *net, rte *new, rte *old, rte *old_best); -struct rte *bgp_rte_modify_stale(struct rte *r, struct linpool *pool); -u32 bgp_rte_igp_metric(struct rte *); -void bgp_rt_notify(struct proto *P, struct channel *C, net *n, rte *new, rte *old); +void bgp_finish_attrs(struct bgp_parse_state *s, ea_list **to); + +void bgp_setup_out_table(struct bgp_channel *c); + +void bgp_init_pending_tx(struct bgp_channel *c); +void bgp_free_pending_tx(struct bgp_channel *c); +void bgp_tx_resend(struct bgp_proto *p, struct bgp_channel *c); + +void bgp_withdraw_bucket(struct bgp_ptx_private *c, struct bgp_bucket *b); +int bgp_done_bucket(struct bgp_ptx_private *c, struct bgp_bucket *b); + +void bgp_done_prefix(struct bgp_ptx_private *c, struct bgp_prefix *px, struct bgp_bucket *buck); + +int bgp_rte_better(const rte *, const rte *); +int bgp_rte_mergable(const rte *pri, const rte *sec); +int bgp_rte_recalculate(struct rtable_private *table, net *net, struct rte_storage *new, struct rte_storage *old, struct rte_storage *old_best); +void bgp_rte_modify_stale(void *bgp_channel); +u32 bgp_rte_igp_metric(const rte *); +void bgp_rt_notify(struct proto *P, struct channel *C, const net_addr *n, rte *new, const rte *old); int bgp_preexport(struct channel *, struct rte *); -int bgp_get_attr(const struct eattr *e, byte *buf, int buflen); -void bgp_get_route_info(struct rte *, byte *buf); -int bgp_total_aigp_metric_(rte *e, u64 *metric, const struct adata **ad); +void bgp_get_route_info(const rte *, byte *); +int bgp_total_aigp_metric_(const rte *e, u64 *metric, const struct adata **ad); + +static inline struct bgp_proto *bgp_rte_proto(const rte *rte) +{ + return (rte->src->owner->class == &bgp_rte_owner_class) ? + SKIP_BACK(struct bgp_proto, p.sources, rte->src->owner) : NULL; +} - byte * bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, const struct rte *new); -byte * bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n, const struct rte *new, const struct rte_src *src); ++byte * bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, byte *end, const struct rte *new); #define BGP_AIGP_METRIC 1 #define BGP_AIGP_MAX U64(0xffffffffffffffff) diff --cc proto/bgp/packets.c index 372b775d4,ec648231a..33f46a8fd --- a/proto/bgp/packets.c +++ b/proto/bgp/packets.c @@@ -2480,29 -2482,20 +2480,27 @@@ bgp_create_mp_unreach(struct bgp_write_ #ifdef CONFIG_BMP static byte * - bgp_create_update_bmp(ea_list *channel_ea, struct bgp_proto *bgp_p, byte *buf, struct bgp_bucket *buck, bool update) -bgp_create_update_bmp(struct bgp_channel *c, byte *buf, byte *end, struct bgp_bucket *buck, bool update) ++bgp_create_update_bmp(ea_list *channel_ea, struct bgp_proto *bgp_p, byte *buf, byte *end, struct bgp_bucket *buck, bool update) { - struct bgp_proto *p = (void *) c->c.proto; - byte *res = NULL; + struct bgp_channel *c; + u32 c_id = ea_get_int(channel_ea, &ea_channel_id, 0); + BGP_WALK_CHANNELS(bgp_p, c) + if (c->c.id == c_id) + break; - byte *end = buf + (BGP_MAX_EXT_MSG_LENGTH - BGP_HEADER_LENGTH); - struct lp_state tmpp; - lp_save(tmp_linpool, &tmpp); + byte *res = NULL; - /* FIXME: must be a bit shorter */ - struct bgp_caps *peer = p->conn->remote_caps; + struct bgp_caps *peer = bgp_p->conn->remote_caps; const struct bgp_af_caps *rem = bgp_find_af_caps(peer, c->afi); + struct bgp_ptx_private ptx = { + .bmp = 1, + .c = c, + }; + struct bgp_write_state s = { - .proto = p, - .channel = c, + .proto = bgp_p, + .ptx = &ptx, .pool = tmp_linpool, .mp_reach = (c->afi != BGP_AF_IPV4) || rem->ext_next_hop, .as4_session = 1, @@@ -2538,31 -2533,33 +2536,31 @@@ bgp_bmp_prepare_bgp_hdr(byte *buf, cons } byte * - bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, const struct rte *new) -bgp_bmp_encode_rte(struct bgp_channel *c, byte *buf, byte *end, const net_addr *n, - const struct rte *new, const struct rte_src *src) ++bgp_bmp_encode_rte(ea_list *c, struct bgp_proto *bgp_p, byte *buf, byte *end, const struct rte *new) { -// struct bgp_proto *p = (void *) c->c.proto; byte *pkt = buf + BGP_HEADER_LENGTH; - ea_list *attrs = new ? new->attrs->eattrs : NULL; - uint ea_size = new ? (sizeof(ea_list) + attrs->count * sizeof(eattr)) : 0; - uint bucket_size = sizeof(struct bgp_bucket) + ea_size; - uint prefix_size = sizeof(struct bgp_prefix) + n->length; + uint ea_size = new->attrs ? (sizeof(ea_list) + new->attrs->count * sizeof(eattr)) : 0; + uint prefix_size = sizeof(struct bgp_prefix) + new->net->length; + + struct lp_state *tmpp = lp_save(tmp_linpool); - /* Sham bucket */ - struct bgp_bucket *b = alloca(bucket_size); - *b = (struct bgp_bucket) { }; + /* Temporary bucket */ + struct bgp_bucket *b = tmp_allocz(sizeof(struct bgp_bucket) + ea_size); + b->bmp = 1; init_list(&b->prefixes); - if (attrs) - memcpy(b->eattrs, attrs, ea_size); + if (new->attrs) + memcpy(b->eattrs, new->attrs, ea_size); - /* Sham prefix */ - struct bgp_prefix *px = alloca(prefix_size); - *px = (struct bgp_prefix) { }; - px->path_id = (u32) src->private_id; - net_copy(px->net, n); + /* Temporary prefix */ + struct bgp_prefix *px = tmp_allocz(prefix_size); + px->src = tmp_allocz(sizeof(struct rte_src)); + memcpy(px->src, new->src, sizeof(struct rte_src)); + px->ni = NET_TO_INDEX(new->net); add_tail(&b->prefixes, &px->buck_node); - byte *end = bgp_create_update_bmp(c, bgp_p, pkt, b, !!new->attrs); - end = bgp_create_update_bmp(c, pkt, end, b, !!new); ++ end = bgp_create_update_bmp(c, bgp_p, pkt, end, b, !!new->attrs); if (end) bgp_bmp_prepare_bgp_hdr(buf, end - buf, PKT_UPDATE); diff --cc proto/bmp/bmp.c index 011b276e7,6dcf1562a..05c718254 --- a/proto/bmp/bmp.c +++ b/proto/bmp/bmp.c @@@ -197,23 -200,43 +199,45 @@@ enum bmp_term_reason #define IP4_MAX_TTL 255 - #define IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(expr, msg, rv...) \ - do { \ - if ((expr)) \ - { \ - log(L_WARN "[BMP] " msg); \ - return rv; \ - } \ - } while (0) + #define bmp_buffer_need(b, sz) ASSERT_DIE((b)->pos + (sz) <= (b)->end) + + // Idea for following macros has been taken from |proto/mrt/mrt.c| + #define BMP_DEFINE_PUT_FUNC(S, T) \ + static inline void \ + bmp_put_##S(buffer *b, const T x) \ + { \ + bmp_buffer_need(b, sizeof(T)); \ + put_##S(b->pos, x); \ + b->pos += sizeof(T); \ + } + BMP_DEFINE_PUT_FUNC(u8, u8) + BMP_DEFINE_PUT_FUNC(u16, u16) + BMP_DEFINE_PUT_FUNC(u32, u32) + BMP_DEFINE_PUT_FUNC(u64, u64) + BMP_DEFINE_PUT_FUNC(ip4, ip4_addr) + BMP_DEFINE_PUT_FUNC(ip6, ip6_addr) - #define IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(p, msg, rv...) \ - do { \ - IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(!(p), msg, rv); \ - } while (0) + static inline void + bmp_put_data(buffer *b, const void *src, const size_t len) + { + ASSERT_DIE(b->pos + len <= b->end); + memcpy(b->pos, src, len); + b->pos += len; + } + + static inline buffer + bmp_default_buffer(struct bmp_proto *p) + { + return (buffer) { + .start = p->msgbuf, + .pos = p->msgbuf, + .end = p->msgbuf + sizeof p->msgbuf, + }; + } +static const struct ea_class *bgp_next_hop_ea_class = NULL; + static void bmp_connected(struct birdsock *sk); static void bmp_sock_err(sock *sk, int err); static void bmp_close_socket(struct bmp_proto *p); @@@ -228,18 -248,44 +252,44 @@@ bmp_send_peer_up_notif_msg(struct bmp_p static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs); - // Stores necessary any data in list - struct bmp_data_node { - node n; - byte *data; - size_t data_size; - - u32 remote_as; - u32 remote_id; - ip_addr remote_ip; - btime timestamp; - bool global_peer; - bool policy; + // Stores TX data + struct bmp_tx_buffer { + struct bmp_tx_buffer *next; + byte *pos; + byte data[]; + }; + + #define bmp_tx_remains(b) (((byte *) (b) + page_size) - (b)->pos) + + /* A dummy resource to accurately show memory pages allocated for pending TX */ + struct bmp_tx_resource { + resource r; + struct bmp_proto *p; + }; + + static void + bmp_tx_resource_free(resource *r UNUSED) {} + + static void -bmp_tx_resource_dump(resource *r UNUSED) {} ++bmp_tx_resource_dump(resource *r UNUSED, uint indent UNUSED) {} + + static struct resmem + bmp_tx_resource_memsize(resource *r) + { + struct bmp_proto *p = SKIP_BACK(struct bmp_tx_resource, r, r)->p; + + return (struct resmem) { + .effective = p->tx_pending_count * page_size, + .overhead = sizeof(struct bmp_tx_resource), + }; + } + + static struct resclass bmp_tx_resource_class = { + .name = "BMP TX buffers", + .size = sizeof(struct bmp_tx_resource), + .free = bmp_tx_resource_free, + .dump = bmp_tx_resource_dump, + .memsize = bmp_tx_resource_memsize, }; static void @@@ -282,17 -329,58 +333,58 @@@ bmp_schedule_tx_packet(struct bmp_prot { ASSERT(p->started); - struct bmp_data_node *tx_data = mb_allocz(p->tx_mem_pool, sizeof (struct bmp_data_node)); - tx_data->data = mb_allocz(p->tx_mem_pool, size); - memcpy(tx_data->data, payload, size); - tx_data->data_size = size; - add_tail(&p->tx_queue, &tx_data->n); - - if (sk_tx_buffer_empty(p->sk) - && !ev_active(p->tx_ev)) + while (size) { + if (!p->tx_last || !bmp_tx_remains(p->tx_last)) + { + if (p->tx_pending_count >= p->tx_pending_limit) - return ev_schedule(p->tx_overflow_event); ++ return ev_send_loop(p->p.loop, p->tx_overflow_event); + + p->tx_pending_count++; + + struct bmp_tx_buffer *btb = alloc_page(); + btb->pos = btb->data; + btb->next = NULL; + + if (p->tx_last) + { + ASSERT_DIE(!p->tx_last->next); + p->tx_last->next = btb; + } + else + ASSERT_DIE(p->tx_pending_count == 1); + + p->tx_last = btb; + + if (!p->tx_pending) + p->tx_pending = btb; + } + + size_t cpylen = bmp_tx_remains(p->tx_last); + if (size < cpylen) + cpylen = size; + + memcpy(p->tx_last->pos, payload, cpylen); + p->tx_last->pos += cpylen; + + payload += cpylen; + size -= cpylen; + } + + if (!p->sk->tbuf && !ev_active(p->tx_ev)) - ev_schedule(p->tx_ev); + ev_send_loop(p->p.loop, p->tx_ev); + } + + static void + bmp_tx_buffer_free(struct bmp_proto *p, struct bmp_tx_buffer *btb) + { + if (btb == p->tx_last) + { + p->tx_last = NULL; + ASSERT_DIE(!p->tx_pending_count); } + + free_page(btb); } static void @@@ -303,40 -391,33 +395,33 @@@ bmp_fire_tx(void *p_ if (!p->started) return; - IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL( - EMPTY_LIST(p->tx_queue), - "Called BMP TX event handler when there is not any data to send" - ); - - size_t cnt = 0; // Counts max packets which we want to send per TX slot - struct bmp_data_node *tx_data; - struct bmp_data_node *tx_data_next; - WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue) + int cnt = 0; + for (struct bmp_tx_buffer *btb; btb = p->tx_pending; ) { - if (tx_data->data_size > p->sk->tbsize) - { - sk_set_tbsize(p->sk, tx_data->data_size); - } + ASSERT_DIE(!p->sk->tbuf); + + p->sk->tbuf = btb->data; + u64 sz = btb->pos - btb->data; + + p->tx_sent += sz; + p->tx_sent_total += sz; + + if (p->tx_pending == p->tx_last) + p->tx_last = NULL; - size_t data_size = tx_data->data_size; - memcpy(p->sk->tbuf, tx_data->data, data_size); - mb_free(tx_data->data); - rem_node((node *) tx_data); - mb_free(tx_data); + p->tx_pending = btb->next; + p->tx_pending_count--; - if (sk_send(p->sk, data_size) <= 0) + if (sk_send(p->sk, sz) <= 0) return; - // BMP packets should be treat with lowest priority when scheduling sending - // packets to target. That's why we want to send max. 32 packets per event - // call - if (++cnt > 32) + p->sk->tbuf = NULL; + bmp_tx_buffer_free(p, btb); + + if (cnt++ > 1024) { if (!ev_active(p->tx_ev)) - { - ev_schedule(p->tx_ev); + ev_send_loop(p->p.loop, p->tx_ev); - } - return; } } @@@ -805,106 -888,65 +930,68 @@@ bmp_send_peer_up_notif_msg(struct bmp_p { ASSERT(p->started); - const struct birdsock *sk = bmp_get_birdsock_ext(bgp); - if (!sk) - { - log(L_WARN "%s: No BGP socket", p->p.name); - return; - } - + const int rem_as = ea_get_int(bgp, &ea_bgp_rem_as, 0); + const int rem_id = ea_get_int(bgp, &ea_bgp_rem_id, 0); const bool is_global_instance_peer = bmp_is_peer_global_instance(bgp); - buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE); + + buffer payload = bmp_default_buffer(p); bmp_peer_up_notif_msg_serialize(&payload, is_global_instance_peer, - bgp->remote_as, bgp->remote_id, 1, - sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, tx_data_size, - rx_data, rx_data_size); + rem_as, rem_id, 1, + sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, rx_data); - bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload)); - bmp_buffer_free(&payload); + bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start); } static void - bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp) + bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, byte *data, size_t length, btime timestamp) { - struct bmp_data_node *upd_msg = mb_allocz(p->update_msg_mem_pool, - sizeof (struct bmp_data_node)); - upd_msg->data = mb_alloc(p->update_msg_mem_pool, length); - memcpy(upd_msg->data, data, length); - upd_msg->data_size = length; - - add_tail(&p->update_msg_queue, &upd_msg->n); - - /* Save some metadata */ - struct bgp_proto *bgp = bs->bgp; + ea_list *bgp = bs->bgp; - upd_msg->remote_as = ea_get_int(bgp, &ea_bgp_rem_as, 0); - upd_msg->remote_id = ea_get_int(bgp, &ea_bgp_rem_id, 0); - upd_msg->remote_ip = ea_get_ip(bgp, &ea_bgp_rem_ip, IPA_NONE); - upd_msg->timestamp = timestamp; - upd_msg->global_peer = bmp_is_peer_global_instance(bgp); - upd_msg->policy = bmp_stream_policy(bs); - - /* Kick the commit */ - if (!ev_active(p->update_ev)) - ev_send_loop(p->p.loop, p->update_ev); + const byte *start = bmp_route_monitor_msg_serialize(p, + bmp_is_peer_global_instance(bgp), + bmp_stream_policy(bs), - bgp->remote_as, - bgp->remote_id, ++ ea_get_int(bgp, &ea_bgp_rem_as, 0), ++ ea_get_int(bgp, &ea_bgp_rem_id, 0), + true, - bgp->remote_ip, ++ ea_get_ip(bgp, &ea_bgp_rem_ip, IPA_NONE), + data, + length, + timestamp + ); + + bmp_schedule_tx_packet(p, start, (data - start) + length); } static void -bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs, - const net_addr *n, const struct rte *new, const struct rte_src *src) +bmp_route_monitor_notify(struct bmp_proto *p, struct bgp_proto *bgp_p, u32 afi, bool policy, const rte *new, ea_list *old) { + /* Idempotent update */ + if ((old == new->attrs) || old && new->attrs && ea_same(old, new->attrs)) + return; + + /* No stream, probably flushed already */ + struct bmp_stream *bs = bmp_find_stream(p, bgp_p, afi, policy); + if (!bs) + return; + - byte buf[BGP_MAX_EXT_MSG_LENGTH]; - byte *end = bgp_bmp_encode_rte(bs->sender, bgp_p, buf, new); + byte *bufend = &p->msgbuf[sizeof p->msgbuf]; + byte *begin = bufend - BGP_MAX_EXT_MSG_LENGTH; - byte *end = bgp_bmp_encode_rte(bs->sender, begin, bufend, n, new, src); ++ byte *end = bgp_bmp_encode_rte(bs->sender, bgp_p, begin, bufend, new); - btime delta_t = new ? current_time() - new->lastmod : 0; + btime delta_t = new->attrs ? current_time() - new->lastmod : 0; btime timestamp = current_real_time() - delta_t; if (end) - bmp_route_monitor_put_update(p, bs, buf, end - buf, timestamp); + bmp_route_monitor_put_update(p, bs, begin, end - begin, timestamp); else - log(L_WARN "%s: Cannot encode update for %N", p->p.name, n); + log(L_WARN "%s: Cannot encode update for %N", p->p.name, new->net); } - static void - bmp_route_monitor_commit(void *p_) - { - struct bmp_proto *p = p_; - - if (!p->started) - return; - - buffer payload - = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE); - - struct bmp_data_node *data, *data_next; - WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue) - { - bmp_route_monitor_msg_serialize(&payload, - data->global_peer, data->policy, - data->remote_as, data->remote_id, true, - data->remote_ip, data->data, data->data_size, - data->timestamp); - - bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload)); - - bmp_buffer_flush(&payload); - - mb_free(data->data); - rem_node(&data->n); - mb_free(data); - } - - bmp_buffer_free(&payload); - } - static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs) { - TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bs->bgp->p.name, bs->sender->c.name); + TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", ea_get_adata(bs->bgp, &ea_name)->data, ea_get_adata(bs->sender, &ea_name)->data); - byte rx_end_payload[DEFAULT_MEM_BLOCK_SIZE]; + byte *rx_end_payload = p->msgbuf + BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE; - byte *pos = bgp_create_end_mark_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH); + byte *pos = bgp_create_end_mark_ea_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH); memset(rx_end_payload + BGP_MSG_HDR_MARKER_POS, 0xff, BGP_MSG_HDR_MARKER_SIZE); // BGP UPDATE MSG marker put_u16(rx_end_payload + BGP_MSG_HDR_LENGTH_POS, pos - rx_end_payload); @@@ -914,38 -956,19 +1001,35 @@@ } static void -bmp_send_peer_down_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp, +bmp_send_peer_down_notif_msg(struct bmp_proto *p, ea_list *bgp, - const byte *data, const size_t data_size) + const struct bmp_peer_down_info *info) { ASSERT(p->started); - const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp); + //const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp); + int remote_caps = ea_get_int(bgp, &ea_bgp_as4_session, 0); + int in_state = ea_get_int(bgp, &ea_bgp_in_conn_state, 0); + int out_state = ea_get_int(bgp, &ea_bgp_out_conn_state, 0); + int in_as4 = ea_get_int(bgp, &ea_bgp_as4_in_conn, 0); + int out_as4 = ea_get_int(bgp, &ea_bgp_as4_out_conn, 0); + + if (in_state && in_as4) + remote_caps = in_as4; + else if (out_state && out_as4) + remote_caps = out_as4; + bool is_global_instance_peer = bmp_is_peer_global_instance(bgp); - buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE); + buffer payload = bmp_default_buffer(p); - bmp_peer_down_notif_msg_serialize(&payload, is_global_instance_peer, - bgp->remote_as, bgp->remote_id, - remote_caps ? remote_caps->as4_support : bgp->as4_session, - bgp->remote_ip, info); + bmp_peer_down_notif_msg_serialize( + &payload, + is_global_instance_peer, + ea_get_int(bgp, &ea_bgp_rem_as, 0), + ea_get_int(bgp, &ea_bgp_rem_id, 0), + remote_caps, + *((ip_addr *) ea_get_adata(bgp, &ea_bgp_rem_ip)->data), - data, - data_size ++ info + ); - bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload)); - - bmp_buffer_free(&payload); + bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start); } static void @@@ -958,15 -982,19 +1042,19 @@@ bmp_peer_down_(struct bmp_proto *p, ea_ if (!bp) return; - TRACE(D_STATES, "Peer down for %s", bgp->p.name); + TRACE(D_STATES, "Peer down for %s", ea_find(bgp, &ea_name)->u.ad->data); - uint bmp_code = 0; - uint fsm_code = 0; + struct bmp_peer_down_info info = { - .err_code = err_code, - .err_subcode = err_subcode, - .data = data, - .length = length, ++ .err_code = bscad->notify_code, ++ .err_subcode = bscad->notify_subcode, ++ .data = bscad->data, ++ .length = bscad->ad.length - sizeof *bscad + sizeof bscad->ad, + }; - switch (err_class) + switch (bscad->last_error_class) { case BE_BGP_RX: - bmp_code = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION; + info.reason = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION; break; case BE_BGP_TX: @@@ -1020,51 -1038,52 +1090,52 @@@ bmp_send_termination_msg(struct bmp_pro bmp_put_u16(&stream, BMP_TERM_INFO_REASON); bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason bmp_put_u16(&stream, reason); - memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream)); - IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL( - sk_send(p->sk, bmp_buffer_pos(&stream)) < 0, - "Failed to send BMP termination message" - ); - bmp_buffer_free(&stream); + if (p->sk->tbuf) + bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf)); + + p->sk->tbuf = stream.start; + if (sk_send(p->sk, stream.pos - stream.start) < 0) + log(L_WARN "%s: Cannot send BMP termination message", p->p.name); + p->sk->tbuf = NULL; } -int -bmp_preexport(struct channel *C UNUSED, rte *e) +static void +bmp_split_policy(struct bmp_proto *p, const rte *new, const rte *old) { - /* Reject non-direct routes */ - if (e->src->proto != e->sender->proto) - return -1; + rte loc = *(new ?: old); - /* Reject non-BGP routes */ - if (e->sender->channel != &channel_bgp) - return -1; + struct proto *rte_proto = (struct proto*) SKIP_BACK(struct proto, sources, loc.src->owner); + struct bgp_proto *bgp = (struct bgp_proto *) rte_proto; + struct bgp_channel *src_ch = SKIP_BACK(struct bgp_channel, c.in_req, loc.sender->req); - return 1; -} + /* Ignore piped routes */ + if (src_ch->c.proto != rte_proto) + return; -static void -bmp_rt_notify(struct proto *P, struct channel *c, struct network *net, - struct rte *new, struct rte *old) -{ - struct bmp_proto *p = (void *) P; + /* Ignore non-BGP routes */ + if (rte_proto->proto != &proto_bgp) + return; - struct bgp_channel *src = (void *) (new ?: old)->sender; - struct bgp_proto *bgp = (void *) src->c.proto; - bool policy = (c->table == src->c.table); + /* Checking the pre policy */ + if (p->monitoring_rib.in_pre_policy) + { + /* Compute the pre policy attributes */ + loc.attrs = new ? ea_strip_to(new->attrs, BIT32_ALL(EALS_PREIMPORT)) : NULL; + ea_list *old_attrs = old ? ea_strip_to(old->attrs, BIT32_ALL(EALS_PREIMPORT)) : NULL; - /* - * We assume that we receive peer_up before the first route and peer_down - * synchronously with BGP session close. So if bmp_stream exists, the related - * BGP session is up and could be accessed. That may not be true in - * multithreaded setup. - */ + bmp_route_monitor_notify(p, bgp, src_ch->afi, false, &loc, old_attrs); + } - struct bmp_stream *bs = bmp_find_stream(p, bgp, src->afi, policy); - if (!bs) - return; + /* Checking the post policy */ + if (p->monitoring_rib.in_post_policy) + { + /* Compute the post policy attributes */ + loc.attrs = new ? ea_normalize(new->attrs, 0) : NULL; + ea_list *old_attrs = old ? ea_normalize(old->attrs, 0) : NULL; - bmp_route_monitor_notify(p, bs, net->n.addr, new, (new ?: old)->src); + bmp_route_monitor_notify(p, bgp, src_ch->afi, true, &loc, old_attrs); + } } static void @@@ -1145,44 -1134,15 +1216,43 @@@ bmp_startup(struct bmp_proto *p proto_notify_state(&p->p, PS_UP); /* Send initiation message */ - buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE); + buffer payload = bmp_default_buffer(p); bmp_init_msg_serialize(&payload, p->sys_descr, p->sys_name); - bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload)); - bmp_buffer_free(&payload); + bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start); /* Send Peer Up messages */ - struct proto *peer; - WALK_LIST(peer, proto_list) - if ((peer->proto->class == PROTOCOL_BGP) && (peer->proto_state == PS_UP)) - bmp_peer_init(p, (struct bgp_proto *) peer); + u32 length; + PST_LOCKED(ts) /* The size of protos field will never decrease, the inconsistency caused by growing is not important */ + length = ts->length_states; + + /* Subscribe to protocol state changes */ + p->proto_state_reader = (struct lfjour_recipient) { + .event = &p->proto_state_changed, + .target = proto_event_list(&p->p), + }; + + p->proto_state_changed = (event) { + .hook = bmp_proto_state_changed, + .data = p, + }; + + proto_states_subscribe(&p->proto_state_reader); + + /* Load protocol states */ + for (u32 i = 0; i < length; i++) + { + ea_list *proto_attr = proto_get_state(i); + if (proto_attr == NULL) + continue; + + struct protocol *proto = (struct protocol *) ea_get_ptr(proto_attr, &ea_protocol_type, 0); + const int state = ea_get_int(proto_attr, &ea_state, 0); + + if (proto != &proto_bgp || state != PS_UP) + continue; + + bmp_peer_up_inout(p, proto_attr, false); + } } /** @@@ -1282,12 -1239,31 +1352,32 @@@ bmp_sock_err(sock *sk, int err bmp_down(p); bmp_close_socket(p); - tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME); + tm_start_in(p->connect_retry_timer, CONNECT_RETRY_TIME, p->p.loop); - proto_notify_state(&p->p, PS_START); + if (p->p.proto_state == PS_UP) + proto_notify_state(&p->p, PS_START); } + static void + bmp_tx_overflow(void *_p) + { + struct bmp_proto *p = _p; + if (p->tx_pending_count < p->tx_pending_limit) + return; + + p->sock_err = 0; + + log(L_ERR "%s: Connection stalled", p->p.name); + + if (p->started) + bmp_down(p); + + bmp_close_socket(p); + tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME); + + proto_notify_state(&p->p, PS_START); + } + /* BMP connect timeout event - switch from Idle/Connect state to Connect state */ static void bmp_connection_retry(timer *t) @@@ -1404,12 -1367,11 +1515,10 @@@ bmp_start(struct proto *P HASH_INIT(p->stream_map, P->pool, 4); HASH_INIT(p->table_map, P->pool, 4); - init_list(&p->tx_queue); - init_list(&p->update_msg_queue); p->started = false; p->sock_err = 0; - add_tail(&bmp_proto_list, &p->bmp_node); - tm_start(p->connect_retry_timer, CONNECT_INIT_TIME); + tm_start_in(p->connect_retry_timer, CONNECT_INIT_TIME, p->p.loop); return PS_START; } diff --cc proto/bmp/bmp.h index 0c75181a8,52f721225..9ab379221 --- a/proto/bmp/bmp.h +++ b/proto/bmp/bmp.h @@@ -63,17 -76,17 +75,20 @@@ struct bmp_proto struct monitoring_rib monitoring_rib; // Below fields are for internal use // struct bmp_peer_map bgp_peers; // Stores 'bgp_proto' structure per BGP peer - pool *buffer_mpool; // Memory pool used for BMP buffer allocations - pool *tx_mem_pool; // Memory pool used for packet allocations designated to BMP collector - pool *update_msg_mem_pool; // Memory pool used for BPG UPDATE MSG allocations - list tx_queue; // Stores queued packets going to be sent + struct bmp_tx_buffer *tx_pending;// This buffer waits for socket to flush + struct bmp_tx_buffer *tx_last; // This buffer is the last to flush + uint tx_pending_count; // How many buffers waiting for flush + uint tx_pending_limit; // Maximum on buffer count + u64 tx_sent; // Amount of data sent + u64 tx_sent_total; // Amount of data sent accumulated over reconnections + event *tx_overflow_event; // Too many buffers waiting for flush timer *connect_retry_timer; // Timer for retrying connection to the BMP collector - list update_msg_queue; // Stores all composed BGP UPDATE MSGs bool started; // Flag that stores running status of BMP instance int sock_err; // Last socket error code + + struct lfjour_recipient proto_state_reader; // Reader of protocol states + event proto_state_changed; + byte msgbuf[BMP_MSGBUF_LEN]; // Buffer for preparing the messages before sending them out }; struct bmp_peer { diff --cc proto/bmp/config.Y index 556339433,04540b26b..f7e0eb46c --- a/proto/bmp/config.Y +++ b/proto/bmp/config.Y @@@ -25,9 -25,9 +25,10 @@@ proto: bmp_proto '}' bmp_proto_start: proto_start BMP { this_proto = proto_config_new(&proto_bmp, $1); + this_proto->loop_order = DOMAIN_ORDER(proto); BMP_CFG->sys_descr = "Not defined"; BMP_CFG->sys_name = "Not defined"; + BMP_CFG->tx_pending_limit = (1 << 30) / page_size; } ;