*/
#include "proto/bmp/bmp.h"
-#include "proto/bmp/buffer.h"
#include "proto/bmp/map.h"
#include <sys/socket.h>
BMP_ROUTE_MIRROR_MSG = 6 // Route Mirroring Message
};
-// Total size of Common Header
-#define BMP_COMMON_HDR_SIZE 6
// Defines size of padding when IPv4 address is going to be put into field
// which can accept also IPv6 address
#define BMP_PADDING_IP4_ADDR_SIZE 12
-/* BMP Per-Peer Header [RFC 7854 - Section 4.2] */
-// Total size of Per-Peer Header
-#define BMP_PER_PEER_HDR_SIZE 42
-
enum bmp_peer_type {
BMP_PEER_TYPE_GLOBAL_INSTANCE = 0,
BMP_PEER_TYPE_RD_INSTANCE = 1,
BMP_PEER_DOWN_REASON_PEER_DE_CONFIGURED = 5
};
+struct bmp_peer_down_info {
+ u8 reason;
+ u8 fsm_code;
+ u8 err_code;
+ u8 err_subcode;
+ const byte *data;
+ int length;
+};
+
/* BMP Termination Message [RFC 7854 - Section 4.5] */
#define BMP_TERM_INFO_TYPE_SIZE 2
enum bmp_term_info_type {
#define IP4_MAX_TTL 255
-#define IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(expr, msg, rv...) \
- do { \
- if ((expr)) \
- { \
- log(L_WARN "[BMP] " msg); \
- return rv; \
- } \
- } while (0)
+#define bmp_buffer_need(b, sz) ASSERT_DIE((b)->pos + (sz) <= (b)->end)
+// Idea for following macros has been taken from |proto/mrt/mrt.c|
+#define BMP_DEFINE_PUT_FUNC(S, T) \
+ static inline void \
+ bmp_put_##S(buffer *b, const T x) \
+ { \
+ bmp_buffer_need(b, sizeof(T)); \
+ put_##S(b->pos, x); \
+ b->pos += sizeof(T); \
+ }
-#define IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(p, msg, rv...) \
- do { \
- IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(!(p), msg, rv); \
- } while (0)
+BMP_DEFINE_PUT_FUNC(u8, u8)
+BMP_DEFINE_PUT_FUNC(u16, u16)
+BMP_DEFINE_PUT_FUNC(u32, u32)
+BMP_DEFINE_PUT_FUNC(u64, u64)
+BMP_DEFINE_PUT_FUNC(ip4, ip4_addr)
+BMP_DEFINE_PUT_FUNC(ip6, ip6_addr)
+static inline void
+bmp_put_data(buffer *b, const void *src, const size_t len)
+{
+ ASSERT_DIE(b->pos + len <= b->end);
+ memcpy(b->pos, src, len);
+ b->pos += len;
+}
+
+static inline buffer
+bmp_default_buffer(struct bmp_proto *p)
+{
+ return (buffer) {
+ .start = p->msgbuf,
+ .pos = p->msgbuf,
+ .end = p->msgbuf + sizeof p->msgbuf,
+ };
+}
static void bmp_connected(struct birdsock *sk);
static void bmp_sock_err(sock *sk, int err);
static void bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs);
-// Stores necessary any data in list
-struct bmp_data_node {
- node n;
- byte *data;
- size_t data_size;
-
- u32 remote_as;
- u32 remote_id;
- ip_addr remote_ip;
- btime timestamp;
- bool global_peer;
- bool policy;
+// Stores TX data
+struct bmp_tx_buffer {
+ struct bmp_tx_buffer *next;
+ byte *pos;
+ byte data[];
+};
+
+#define bmp_tx_remains(b) (((byte *) (b) + page_size) - (b)->pos)
+
+/* A dummy resource to accurately show memory pages allocated for pending TX */
+struct bmp_tx_resource {
+ resource r;
+ struct bmp_proto *p;
+};
+
+static void
+bmp_tx_resource_free(resource *r UNUSED) {}
+
+static void
+bmp_tx_resource_dump(resource *r UNUSED) {}
+
+static struct resmem
+bmp_tx_resource_memsize(resource *r)
+{
+ struct bmp_proto *p = SKIP_BACK(struct bmp_tx_resource, r, r)->p;
+
+ return (struct resmem) {
+ .effective = p->tx_pending_count * page_size,
+ .overhead = sizeof(struct bmp_tx_resource),
+ };
+}
+
+static struct resclass bmp_tx_resource_class = {
+ .name = "BMP TX buffers",
+ .size = sizeof(struct bmp_tx_resource),
+ .free = bmp_tx_resource_free,
+ .dump = bmp_tx_resource_dump,
+ .memsize = bmp_tx_resource_memsize,
};
static void
// We include MIB-II sysDescr and sysName in BMP INIT MSG so that's why
// allocated 2x BMP_INFO_TLV_FIX_SIZE memory pool size
const size_t data_size = (2 * BMP_INFO_TLV_FIX_SIZE) + sys_descr_len + sys_name_len;
+
bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
bmp_common_hdr_serialize(stream, BMP_INIT_MSG, data_size);
bmp_info_tlv_hdr_serialize(stream, BMP_INFO_TLV_TYPE_SYS_DESCR, sys_descr);
}
static void
-bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, const size_t size)
+bmp_schedule_tx_packet(struct bmp_proto *p, const byte *payload, size_t size)
{
ASSERT(p->started);
- struct bmp_data_node *tx_data = mb_alloc(p->tx_mem_pool, sizeof (struct bmp_data_node));
- tx_data->data = mb_alloc(p->tx_mem_pool, size);
- memcpy(tx_data->data, payload, size);
- tx_data->data_size = size;
- add_tail(&p->tx_queue, &tx_data->n);
-
- if (sk_tx_buffer_empty(p->sk)
- && !ev_active(p->tx_ev))
+ while (size)
{
+ if (!p->tx_last || !bmp_tx_remains(p->tx_last))
+ {
+ if (p->tx_pending_count >= p->tx_pending_limit)
+ return ev_schedule(p->tx_overflow_event);
+
+ p->tx_pending_count++;
+
+ struct bmp_tx_buffer *btb = alloc_page();
+ btb->pos = btb->data;
+ btb->next = NULL;
+
+ if (p->tx_last)
+ {
+ ASSERT_DIE(!p->tx_last->next);
+ p->tx_last->next = btb;
+ }
+ else
+ ASSERT_DIE(p->tx_pending_count == 1);
+
+ p->tx_last = btb;
+
+ if (!p->tx_pending)
+ p->tx_pending = btb;
+ }
+
+ size_t cpylen = bmp_tx_remains(p->tx_last);
+ if (size < cpylen)
+ cpylen = size;
+
+ memcpy(p->tx_last->pos, payload, cpylen);
+ p->tx_last->pos += cpylen;
+
+ payload += cpylen;
+ size -= cpylen;
+ }
+
+ if (!p->sk->tbuf && !ev_active(p->tx_ev))
ev_schedule(p->tx_ev);
+}
+
+static void
+bmp_tx_buffer_free(struct bmp_proto *p, struct bmp_tx_buffer *btb)
+{
+ if (btb == p->tx_last)
+ {
+ p->tx_last = NULL;
+ ASSERT_DIE(!p->tx_pending_count);
}
+
+ free_page(btb);
}
static void
if (!p->started)
return;
- IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
- EMPTY_LIST(p->tx_queue),
- "Called BMP TX event handler when there is not any data to send"
- );
-
- size_t cnt = 0; // Counts max packets which we want to send per TX slot
- struct bmp_data_node *tx_data;
- struct bmp_data_node *tx_data_next;
- WALK_LIST_DELSAFE(tx_data, tx_data_next, p->tx_queue)
+ int cnt = 0;
+ for (struct bmp_tx_buffer *btb; btb = p->tx_pending; )
{
- if (tx_data->data_size > p->sk->tbsize)
- {
- sk_set_tbsize(p->sk, tx_data->data_size);
- }
+ ASSERT_DIE(!p->sk->tbuf);
+
+ p->sk->tbuf = btb->data;
+ u64 sz = btb->pos - btb->data;
+
+ p->tx_sent += sz;
+ p->tx_sent_total += sz;
+
+ if (p->tx_pending == p->tx_last)
+ p->tx_last = NULL;
- size_t data_size = tx_data->data_size;
- memcpy(p->sk->tbuf, tx_data->data, data_size);
- mb_free(tx_data->data);
- rem_node((node *) tx_data);
- mb_free(tx_data);
+ p->tx_pending = btb->next;
+ p->tx_pending_count--;
- if (sk_send(p->sk, data_size) <= 0)
+ if (sk_send(p->sk, sz) <= 0)
return;
- // BMP packets should be treat with lowest priority when scheduling sending
- // packets to target. That's why we want to send max. 32 packets per event
- // call
- if (++cnt > 32)
+ p->sk->tbuf = NULL;
+ bmp_tx_buffer_free(p, btb);
+
+ if (cnt++ > 1024)
{
if (!ev_active(p->tx_ev))
- {
- ev_schedule(p->tx_ev);
- }
-
+ ev_schedule(p->tx_ev);
return;
}
}
static void
bmp_tx(struct birdsock *sk)
{
+ struct bmp_proto *p = sk->data;
+
+ struct bmp_tx_buffer *btb = SKIP_BACK(struct bmp_tx_buffer, data, sk->tbuf);
+ bmp_tx_buffer_free(p, btb);
+
+ sk->tbuf = NULL;
+
bmp_fire_tx(sk->data);
}
}
/* [4.6] Route Monitoring */
-static void
-bmp_route_monitor_msg_serialize(buffer *stream, const bool is_peer_global,
+static byte *
+bmp_route_monitor_msg_serialize(struct bmp_proto *p, const bool is_peer_global,
const bool table_in_post_policy, const u32 peer_as, const u32 peer_bgp_id,
- const bool as4_support, const ip_addr remote_addr, const byte *update_msg,
+ const bool as4_support, const ip_addr remote_addr, byte *update_msg,
const size_t update_msg_size, btime timestamp)
{
+ ASSERT_DIE(update_msg < &p->msgbuf[sizeof p->msgbuf]);
+
+ buffer stream;
+ STACK_BUFFER_INIT(stream, BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE);
+
const size_t data_size = BMP_PER_PEER_HDR_SIZE + update_msg_size;
u32 ts_sec = timestamp TO_S;
u32 ts_usec = timestamp - (ts_sec S);
- bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
- bmp_common_hdr_serialize(stream, BMP_ROUTE_MONITOR, data_size);
- bmp_per_peer_hdr_serialize(stream, is_peer_global, table_in_post_policy,
+ bmp_common_hdr_serialize(&stream, BMP_ROUTE_MONITOR, data_size);
+ bmp_per_peer_hdr_serialize(&stream, is_peer_global, table_in_post_policy,
as4_support, remote_addr, peer_as, peer_bgp_id, ts_sec, ts_usec);
- bmp_put_data(stream, update_msg, update_msg_size);
+
+ size_t hdr_sz = stream.pos - stream.start;
+ ASSERT_DIE(update_msg >= &p->msgbuf[hdr_sz]);
+
+ byte *begin = &update_msg[-hdr_sz];
+ memcpy(begin, stream.start, hdr_sz);
+ return begin;
}
static void
bmp_per_peer_hdr_serialize(stream, is_peer_global,
false /* TODO: Hardcoded pre-policy Adj-RIB-In */, as4_support, remote_addr,
peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
+
bmp_put_ipa(stream, local_addr);
bmp_put_u16(stream, local_port);
bmp_put_u16(stream, remote_port);
static void
bmp_peer_down_notif_msg_serialize(buffer *stream, const bool is_peer_global,
const u32 peer_as, const u32 peer_bgp_id, const bool as4_support,
- const ip_addr remote_addr, const byte *data, const size_t data_size)
+ const ip_addr remote_addr, const struct bmp_peer_down_info *info)
{
- const size_t payload_size = BMP_PER_PEER_HDR_SIZE + data_size;
- bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + payload_size);
- bmp_common_hdr_serialize(stream, BMP_PEER_DOWN_NOTIF, payload_size);
+ const size_t data_size = BMP_PER_PEER_HDR_SIZE + 1 +
+ (((info->reason == BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION) ||
+ (info->reason == BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION)) ? (BGP_HEADER_LENGTH + 2 + info->length) :
+ (info->reason == BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION) ? 2 : 0);
+
+ bmp_buffer_need(stream, BMP_COMMON_HDR_SIZE + data_size);
+ bmp_common_hdr_serialize(stream, BMP_PEER_DOWN_NOTIF, data_size);
bmp_per_peer_hdr_serialize(stream, is_peer_global,
false /* TODO: Hardcoded pre-policy adj RIB IN */, as4_support, remote_addr,
peer_as, peer_bgp_id, 0, 0); // 0, 0 - No timestamp provided
- bmp_put_data(stream, data, data_size);
+
+ bmp_put_u8(stream, info->reason);
+
+ switch (info->reason)
+ {
+ case BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION:
+ case BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION:;
+ uint bgp_msg_length = BGP_HEADER_LENGTH + 2 + info->length;
+ bmp_buffer_need(stream, bgp_msg_length);
+ bmp_put_bgp_hdr(stream, bgp_msg_length, PKT_NOTIFICATION);
+ bmp_put_u8(stream, info->err_code);
+ bmp_put_u8(stream, info->err_subcode);
+ bmp_put_data(stream, info->data, info->length);
+ break;
+
+ case BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION:
+ bmp_put_u16(stream, info->fsm_code);
+ break;
+ }
}
ASSERT(p->started);
const struct birdsock *sk = bmp_get_birdsock_ext(bgp);
- IF_PTR_IS_NULL_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
- sk,
- "[BMP] No BGP socket"
- );
+ if (!sk)
+ {
+ log(L_WARN "%s: No BGP socket", p->p.name);
+ return;
+ }
const bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
- buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+ buffer payload = bmp_default_buffer(p);
bmp_peer_up_notif_msg_serialize(&payload, is_global_instance_peer,
bgp->remote_as, bgp->remote_id, 1,
sk->saddr, sk->daddr, sk->sport, sk->dport, tx_data, tx_data_size,
rx_data, rx_data_size);
- bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
- bmp_buffer_free(&payload);
+ bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
}
static void
-bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, const byte *data, size_t length, btime timestamp)
+bmp_route_monitor_put_update(struct bmp_proto *p, struct bmp_stream *bs, byte *data, size_t length, btime timestamp)
{
- struct bmp_data_node *upd_msg = mb_alloc(p->update_msg_mem_pool,
- sizeof (struct bmp_data_node));
- upd_msg->data = mb_alloc(p->update_msg_mem_pool, length);
- memcpy(upd_msg->data, data, length);
- upd_msg->data_size = length;
- add_tail(&p->update_msg_queue, &upd_msg->n);
-
- /* Save some metadata */
struct bgp_proto *bgp = bs->bgp;
- upd_msg->remote_as = bgp->remote_as;
- upd_msg->remote_id = bgp->remote_id;
- upd_msg->remote_ip = bgp->remote_ip;
- upd_msg->timestamp = timestamp;
- upd_msg->global_peer = bmp_is_peer_global_instance(bgp);
- upd_msg->policy = bmp_stream_policy(bs);
-
- /* Kick the commit */
- if (!ev_active(p->update_ev))
- ev_schedule(p->update_ev);
+ const byte *start = bmp_route_monitor_msg_serialize(p,
+ bmp_is_peer_global_instance(bgp),
+ bmp_stream_policy(bs),
+ bgp->remote_as,
+ bgp->remote_id,
+ true,
+ bgp->remote_ip,
+ data,
+ length,
+ timestamp
+ );
+
+ bmp_schedule_tx_packet(p, start, (data - start) + length);
}
static void
bmp_route_monitor_notify(struct bmp_proto *p, struct bmp_stream *bs,
const net_addr *n, const struct rte *new, const struct rte_src *src)
{
- byte buf[BGP_MAX_EXT_MSG_LENGTH];
- byte *end = bgp_bmp_encode_rte(bs->sender, buf, n, new, src);
+ byte *bufend = &p->msgbuf[sizeof p->msgbuf];
+ byte *begin = bufend - BGP_MAX_EXT_MSG_LENGTH;
+ byte *end = bgp_bmp_encode_rte(bs->sender, begin, bufend, n, new, src);
btime delta_t = new ? current_time() - new->lastmod : 0;
btime timestamp = current_real_time() - delta_t;
if (end)
- bmp_route_monitor_put_update(p, bs, buf, end - buf, timestamp);
+ bmp_route_monitor_put_update(p, bs, begin, end - begin, timestamp);
else
log(L_WARN "%s: Cannot encode update for %N", p->p.name, n);
}
-static void
-bmp_route_monitor_commit(void *p_)
-{
- struct bmp_proto *p = p_;
-
- if (!p->started)
- return;
-
- buffer payload
- = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
-
- struct bmp_data_node *data, *data_next;
- WALK_LIST_DELSAFE(data, data_next, p->update_msg_queue)
- {
- bmp_route_monitor_msg_serialize(&payload,
- data->global_peer, data->policy,
- data->remote_as, data->remote_id, true,
- data->remote_ip, data->data, data->data_size,
- data->timestamp);
-
- bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
- bmp_buffer_flush(&payload);
-
- mb_free(data->data);
- rem_node(&data->n);
- mb_free(data);
- }
-
- bmp_buffer_free(&payload);
-}
-
static void
bmp_route_monitor_end_of_rib(struct bmp_proto *p, struct bmp_stream *bs)
{
TRACE(D_PACKETS, "Sending END-OF-RIB for %s.%s", bs->bgp->p.name, bs->sender->c.name);
- byte rx_end_payload[DEFAULT_MEM_BLOCK_SIZE];
+ byte *rx_end_payload = p->msgbuf + BMP_PER_PEER_HDR_SIZE + BMP_COMMON_HDR_SIZE;
byte *pos = bgp_create_end_mark_(bs->sender, rx_end_payload + BGP_HEADER_LENGTH);
memset(rx_end_payload + BGP_MSG_HDR_MARKER_POS, 0xff,
BGP_MSG_HDR_MARKER_SIZE); // BGP UPDATE MSG marker
static void
bmp_send_peer_down_notif_msg(struct bmp_proto *p, const struct bgp_proto *bgp,
- const byte *data, const size_t data_size)
+ const struct bmp_peer_down_info *info)
{
ASSERT(p->started);
const struct bgp_caps *remote_caps = bmp_get_bgp_remote_caps_ext(bgp);
bool is_global_instance_peer = bmp_is_peer_global_instance(bgp);
- buffer payload
- = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+ buffer payload = bmp_default_buffer(p);
bmp_peer_down_notif_msg_serialize(&payload, is_global_instance_peer,
bgp->remote_as, bgp->remote_id,
remote_caps ? remote_caps->as4_support : bgp->as4_session,
- bgp->remote_ip, data, data_size);
- bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
- bmp_buffer_free(&payload);
+ bgp->remote_ip, info);
+ bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
}
static void
TRACE(D_STATES, "Peer down for %s", bgp->p.name);
- uint bmp_code = 0;
- uint fsm_code = 0;
+ struct bmp_peer_down_info info = {
+ .err_code = err_code,
+ .err_subcode = err_subcode,
+ .data = data,
+ .length = length,
+ };
switch (err_class)
{
case BE_BGP_RX:
- bmp_code = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
+ info.reason = BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION;
break;
case BE_BGP_TX:
case BE_AUTO_DOWN:
case BE_MAN_DOWN:
- bmp_code = BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION;
+ info.reason = BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION;
break;
default:
- bmp_code = BMP_PEER_DOWN_REASON_REMOTE_NO_NOTIFICATION;
- length = 0;
+ info.reason = BMP_PEER_DOWN_REASON_REMOTE_NO_NOTIFICATION;
+ info.length = 0;
break;
}
- buffer payload = bmp_buffer_alloc(p->buffer_mpool, 1 + BGP_HEADER_LENGTH + 2 + length);
- bmp_put_u8(&payload, bmp_code);
-
- switch (bmp_code)
- {
- case BMP_PEER_DOWN_REASON_LOCAL_BGP_NOTIFICATION:
- case BMP_PEER_DOWN_REASON_REMOTE_BGP_NOTIFICATION:
- bmp_put_bgp_hdr(&payload, BGP_HEADER_LENGTH + 2 + length, PKT_NOTIFICATION);
- bmp_put_u8(&payload, err_code);
- bmp_put_u8(&payload, err_subcode);
- bmp_put_data(&payload, data, length);
- break;
-
- case BMP_PEER_DOWN_REASON_LOCAL_NO_NOTIFICATION:
- bmp_put_u16(&payload, fsm_code);
- break;
- }
-
- bmp_send_peer_down_notif_msg(p, bgp, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
-
- bmp_buffer_free(&payload);
+ bmp_send_peer_down_notif_msg(p, bgp, &info);
bmp_remove_peer(p, bp);
}
+ BMP_TERM_INFO_LEN_FIELD_SIZE
+ BMP_TERM_REASON_CODE_SIZE;
const size_t term_msg_size = BMP_COMMON_HDR_SIZE + term_msg_hdr_size;
- buffer stream = bmp_buffer_alloc(p->buffer_mpool, term_msg_size);
+ buffer stream = bmp_default_buffer(p);
+ bmp_buffer_need(&stream, term_msg_size);
+
bmp_common_hdr_serialize(&stream, BMP_TERM_MSG, term_msg_hdr_size);
bmp_put_u16(&stream, BMP_TERM_INFO_REASON);
bmp_put_u16(&stream, BMP_TERM_REASON_CODE_SIZE); // 2-byte code indication the reason
bmp_put_u16(&stream, reason);
- memcpy(p->sk->tbuf, bmp_buffer_data(&stream), bmp_buffer_pos(&stream));
- IF_COND_TRUE_PRINT_ERR_MSG_AND_RETURN_OPT_VAL(
- sk_send(p->sk, bmp_buffer_pos(&stream)) < 0,
- "Failed to send BMP termination message"
- );
- bmp_buffer_free(&stream);
+ if (p->sk->tbuf)
+ bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf));
+
+ p->sk->tbuf = stream.start;
+ if (sk_send(p->sk, stream.pos - stream.start) < 0)
+ log(L_WARN "%s: Cannot send BMP termination message", p->p.name);
+ p->sk->tbuf = NULL;
}
int
proto_notify_state(&p->p, PS_UP);
/* Send initiation message */
- buffer payload = bmp_buffer_alloc(p->buffer_mpool, DEFAULT_MEM_BLOCK_SIZE);
+ buffer payload = bmp_default_buffer(p);
bmp_init_msg_serialize(&payload, p->sys_descr, p->sys_name);
- bmp_schedule_tx_packet(p, bmp_buffer_data(&payload), bmp_buffer_pos(&payload));
- bmp_buffer_free(&payload);
+ bmp_schedule_tx_packet(p, payload.start, payload.pos - payload.start);
/* Send Peer Up messages */
struct proto *peer;
{
ASSERT(p->started);
p->started = false;
+ p->tx_sent = 0;
TRACE(D_EVENTS, "BMP session closed");
sk->dport = p->station_port;
sk->ttl = IP4_MAX_TTL;
sk->tos = IP_PREC_INTERNET_CONTROL;
- sk->tbsize = BGP_TX_BUFFER_EXT_SIZE;
sk->tx_hook = bmp_connected;
sk->err_hook = bmp_sock_err;
proto_notify_state(&p->p, PS_START);
}
+static void
+bmp_tx_overflow(void *_p)
+{
+ struct bmp_proto *p = _p;
+ if (p->tx_pending_count < p->tx_pending_limit)
+ return;
+
+ p->sock_err = 0;
+
+ log(L_ERR "%s: Connection stalled", p->p.name);
+
+ if (p->started)
+ bmp_down(p);
+
+ bmp_close_socket(p);
+ tm_start(p->connect_retry_timer, CONNECT_RETRY_TIME);
+
+ proto_notify_state(&p->p, PS_START);
+}
+
/* BMP connect timeout event - switch from Idle/Connect state to Connect state */
static void
bmp_connection_retry(timer *t)
static void
bmp_close_socket(struct bmp_proto *p)
{
+ if (p->sk && p->sk->tbuf)
+ bmp_tx_buffer_free(p, SKIP_BACK(struct bmp_tx_buffer, data, p->sk->tbuf));
+
+ struct bmp_tx_buffer *btb = p->tx_pending;
+ while (btb)
+ {
+ p->tx_pending_count--;
+
+ struct bmp_tx_buffer *next = btb->next;
+ bmp_tx_buffer_free(p, btb);
+ btb = next;
+ }
+
+ p->tx_pending = NULL;
+
+ ASSERT_DIE(!p->tx_last);
+ ASSERT_DIE(!p->tx_pending_count);
+
rfree(p->sk);
p->sk = NULL;
}
strcpy(p->sys_name, cf->sys_name);
p->monitoring_rib.in_pre_policy = cf->monitoring_rib_in_pre_policy;
p->monitoring_rib.in_post_policy = cf->monitoring_rib_in_post_policy;
+ p->tx_pending_limit = cf->tx_pending_limit;
return P;
}
{
struct bmp_proto *p = (void *) P;
- p->buffer_mpool = rp_new(P->pool, "BMP Buffer");
- p->tx_mem_pool = rp_new(P->pool, "BMP Tx");
- p->update_msg_mem_pool = rp_new(P->pool, "BMP Update");
p->tx_ev = ev_new_init(p->p.pool, bmp_fire_tx, p);
- p->update_ev = ev_new_init(p->p.pool, bmp_route_monitor_commit, p);
+ p->tx_pending = NULL;
+ p->tx_pending_count = 0;
+ p->tx_overflow_event = ev_new_init(p->p.pool, bmp_tx_overflow, p);
p->connect_retry_timer = tm_new_init(p->p.pool, bmp_connection_retry, p, 0, 0);
p->sk = NULL;
+ resource *r = ralloc(P->pool, &bmp_tx_resource_class);
+ SKIP_BACK(struct bmp_tx_resource, r, r)->p = p;
+
HASH_INIT(p->peer_map, P->pool, 4);
HASH_INIT(p->stream_map, P->pool, 4);
HASH_INIT(p->table_map, P->pool, 4);
- init_list(&p->tx_queue);
- init_list(&p->update_msg_queue);
p->started = false;
p->sock_err = 0;
add_tail(&bmp_proto_list, &p->bmp_node);
{
bmp_send_termination_msg(p, BMP_TERM_REASON_ADM);
bmp_down(p);
+ bmp_close_socket(p);
}
p->sock_err = 0;
/* We must update our copy of configuration ptr */
p->cf = new;
+ /* Reconfigure tx buffer size limits */
+ p->tx_pending_limit = new->tx_pending_limit;
+
return 1;
}
if (p->sock_err)
cli_msg(-1006, " %-19s %M", "Last error:", p->sock_err);
+
+ cli_msg(-1006, " %-19s % 9sB (limit %sB)", "Pending TX:",
+ fmt_order(p->tx_pending_count * (u64) page_size, 1, 10000),
+ fmt_order(p->tx_pending_limit * (u64) page_size, 1, 10000));
+
+ cli_msg(-1006, " %-19s % 9sB", "Session TX:", fmt_order(p->tx_sent, 1, 10000));
+ cli_msg(-1006, " %-19s % 9sB", "Total TX:", fmt_order(p->tx_sent_total, 1, 10000));
}
}