if (b == c->withdraw_bucket)
return 0;
- if (EMPTY_LIST(b->prefixes))
+ if (enlisted(&b->send_node) && EMPTY_LIST(b->prefixes))
rem_node(&b->send_node);
if (b->px_uc || !EMPTY_LIST(b->prefixes))
return 0;
return NULL;
+
+ c->tx_lock = DOMAIN_NEW(rtable);
}
static struct bgp_prefix *
atomic_store_explicit(&c->prefixes, nb, memory_order_release);
atomic_store_explicit(&c->prefixes_len, nlen, memory_order_release);
+ atomic_store_explicit(&c->prefix_exporter.max_feed_index, nlen, memory_order_release);
synchronize_rcu();
}
/* The new bucket is the same as we sent before */
- if ((px->last == b) || c->c.out_table && !px->last && IS_WITHDRAW_BUCKET(b))
+ if ((px->last == b) || c->tx_keep && !px->last && IS_WITHDRAW_BUCKET(b))
{
if (px->cur)
BPX_TRACE("reverted");
rem_node(&px->buck_node);
/* We may want to store the updates */
- if (c->c.out_table)
+ if (c->tx_keep)
{
/* Nothing to be sent right now */
px->cur = NULL;
/* Unref the previous sent version */
if (px->last)
- px->last->px_uc--;
+ if (!--px->last->px_uc)
+ bgp_done_bucket(c, px->last);
/* Ref the current sent version */
if (!IS_WITHDRAW_BUCKET(buck))
{
px->last = buck;
px->last->px_uc++;
return;
}
/* We have sent an explicit withdraw */
}
bgp_free_prefix(c, px);
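+/*
+ * Resend all kept routes: re-enqueue every prefix that has only a last-sent
+ * version (px->cur == NULL) back into its original bucket (needs tx_keep)
+ */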
+void
+bgp_tx_resend(struct bgp_proto *p, struct bgp_channel *c)
+{
+ LOCK_DOMAIN(rtable, c->tx_lock);
+
+ ASSERT_DIE(c->tx_keep);
+ uint seen = 0;
+
+ u32 len = atomic_load_explicit(&c->prefixes_len, memory_order_relaxed);
+ struct bgp_prefix * _Atomic * block =
+ atomic_load_explicit(&c->prefixes, memory_order_relaxed);
+
+ for (u32 i = 0; i < len; i++)
+ for (struct bgp_prefix * _Atomic *ppx = &block[i], *px;
+ px = atomic_load_explicit(ppx, memory_order_relaxed);
+ ppx = &px->next)
+ if (!px->cur)
+ {
+ ASSERT_DIE(px->last);
+ struct bgp_bucket *last = px->last;
+
+ /* Remove the last reference, we want to resend the route */
+ px->last->px_uc--;
+ px->last = NULL;
+
+ /* And send it once again */
+ seen += bgp_update_prefix(c, px, last);
+ }
+
+ if (c->c.debug & D_EVENTS)
+ log(L_TRACE "%s.%s: TX resending %u routes",
+ c->c.proto->name, c->c.name, seen);
+
+ UNLOCK_DOMAIN(rtable, c->tx_lock);
+
+ if (seen)
+ bgp_schedule_packet(p->conn, c, PKT_UPDATE);
+}
+
+/*
+ * Prefix hash table exporter
+ */
+
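+/* Nothing to clean up when an export journal item is done */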
+static void
+bgp_out_item_done(struct lfjour *j, struct lfjour_item *i)
+{}
+
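+/* Feed all stored versions of a single net to the reader;
+ * runs inside an RCU read section and may be retried */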
+static struct rt_export_feed *
+bgp_out_feed_net(struct rt_exporter *e, struct rcu_unwinder *u, const struct netindex *ni, const struct rt_export_item *_first)
+{
+ struct rt_export_feed *feed = NULL;
+ SKIP_BACK_DECLARE(struct bgp_channel, c, prefix_exporter, e);
+
+ u32 len = atomic_load_explicit(&c->prefixes_len, memory_order_relaxed);
+ if (ni->index >= len)
+ return NULL;
+
+ struct bgp_prefix * _Atomic * block =
+ atomic_load_explicit(&c->prefixes, memory_order_relaxed);
+
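+ /* First pass: count the stored routes to size the feed */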
+ uint count = 0;
+ for (struct bgp_prefix * _Atomic *ppx = &block[ni->index], *cpx;
+ cpx = atomic_load_explicit(ppx, memory_order_relaxed);
+ ppx = &cpx->next)
+ if (cpx->ni == ni)
+ count += !!cpx->cur + !!cpx->last;
+
+ if (count)
+ {
+ feed = rt_alloc_feed(count, 0);
+ feed->ni = ni;
+
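+ /* Second pass: fill the feed with the pending and last-sent versions */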
+ uint pos = 0;
+ for (struct bgp_prefix * _Atomic *ppx = &block[ni->index], *cpx;
+ cpx = atomic_load_explicit(ppx, memory_order_relaxed);
+ ppx = &cpx->next)
+ if (cpx->ni == ni)
+ {
+ if (cpx->cur)
+ if (pos >= count)
+ RCU_RETRY(u);
+ else
+ feed->block[pos++] = (rte) {
+ .attrs = cpx->cur->attrs ? ea_ref_tmp(cpx->cur->attrs) : NULL,
+ .net = ni->addr,
+ .src = cpx->src,
+ .lastmod = cpx->lastmod,
+ .flags = REF_PENDING,
+ };
+
+ if (cpx->last)
+ if (pos >= count)
+ RCU_RETRY(u);
+ else
+ feed->block[pos++] = (rte) {
+ .attrs = cpx->last->attrs ? ea_ref_tmp(cpx->last->attrs) : NULL,
+ .net = ni->addr,
+ .src = cpx->src,
+ .lastmod = cpx->lastmod,
+ .flags = REF_PENDING,
+ };
+ }
+
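+ /* Raced with a writer: the chain changed under us, retry the whole read */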
+ if (pos != count)
+ RCU_RETRY(u);
+ }
+
+ return feed;
+}
+
+/* TX structures init and free */
+
void
bgp_init_pending_tx(struct bgp_channel *c)
{
+ ASSERT_DIE(c->c.out_table == NULL);
+
bgp_init_bucket_table(c);
bgp_init_prefix_table(c);
+
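+ /* Expose the pending TX table as a route exporter, published as c->c.out_table */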
+ c->prefix_exporter = (struct rt_exporter) {
+ .journal = {
+ .loop = c->c.proto->loop,
+ .item_size = sizeof(struct rt_export_item),
+ .item_done = bgp_out_item_done,
+ },
+ .name = mb_sprintf(c->c.proto->pool, "%s.%s.export", c->c.proto->name, c->c.name),
+ .net_type = c->c.net_type,
+ .max_feed_index = atomic_load_explicit(&c->prefixes_len, memory_order_relaxed),
+ .netindex = c->tx_netindex,
+ .trace_routes = c->c.debug,
+ .feed_net = bgp_out_feed_net,
+ };
+
+ rt_exporter_init(&c->prefix_exporter, &c->cf->ptx_exporter_settle);
+
+ c->c.out_table = &c->prefix_exporter;
}
struct bgp_pending_tx_finisher {
LOCK_DOMAIN(rtable, c->tx_lock);
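+ /* Unpublish and shut down the exporter before tearing down the TX structures */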
+ c->c.out_table = NULL;
+ rt_exporter_shutdown(&c->prefix_exporter, NULL);
+
struct bgp_prefix *px;
+ u32 len = atomic_load_explicit(&c->prefixes_len, memory_order_relaxed);
+ struct bgp_prefix * _Atomic * block =
+ atomic_load_explicit(&c->prefixes, memory_order_relaxed);
+
+ if (c->tx_keep)
+ {
+ /* Move all kept prefixes to the withdraw bucket */
+ struct bgp_bucket *b = bgp_get_withdraw_bucket(c);
+ for (u32 i = 0; i < len; i++)
+ for (struct bgp_prefix * _Atomic *ppx = &block[i], *cpx;
+ cpx = atomic_load_explicit(ppx, memory_order_relaxed);
+ ppx = &cpx->next)
+ bgp_update_prefix(c, cpx, b);
+ }
+
+ /* Flush withdrawals */
if (c->withdraw_bucket)
WALK_LIST_FIRST(px, c->withdraw_bucket->prefixes)
bgp_done_prefix(c, px, c->withdraw_bucket);
+ /* Flush pending TX */
struct bgp_bucket *b;
WALK_LIST_FIRST(b, c->bucket_queue)
{
bgp_done_bucket(c, b);
}
- u32 len = atomic_load_explicit(&c->prefixes_len, memory_order_relaxed);
- struct bgp_prefix * _Atomic * block =
- atomic_load_explicit(&c->prefixes, memory_order_relaxed);
-
+ /* Consistency and resource leak checks */
for (u32 i = 0; i < len; i++)
if (atomic_load_explicit(&block[i], memory_order_relaxed))
bug("Stray prefix after cleanup");
atomic_store_explicit(&c->prefixes, NULL, memory_order_release);
atomic_store_explicit(&c->prefixes_len, 0, memory_order_release);
+ atomic_store_explicit(&c->prefix_exporter.max_feed_index, 0, memory_order_release);
synchronize_rcu();
mb_free(block);
channel_add_obstacle(&c->c);
netindex_hash_delete(c->tx_netindex, &bptf->e, proto_event_list(c->c.proto));
c->tx_netindex = NULL;
+ c->prefix_exporter.netindex = NULL;
UNLOCK_DOMAIN(rtable, c->tx_lock);
DOMAIN_FREE(rtable, c->tx_lock);
}
-#if 0
-/*
- * Prefix hash table exporter
- */
-
-struct bgp_out_export_hook {
- struct rt_export_hook h;
- u32 hash_iter; /* Iterator over hash */
-};
-
-static void
-bgp_out_table_feed(void *data)
-{
- struct bgp_out_export_hook *hook = data;
- SKIP_BACK_DECLARE(struct bgp_channel, c, prefix_exporter, hook->h.table);
-
- int max = 512;
-
- const net_addr *neq = (hook->h.req->prefilter.mode == TE_ADDR_EQUAL) ? hook->h.req->prefilter.addr : NULL;
- const net_addr *cand = NULL;
-
- do {
- HASH_WALK_ITER(c->prefix_hash, PXH, n, hook->hash_iter)
- {
- switch (hook->h.req->prefilter.mode)
- {
- case TE_ADDR_HOOK:
- case TE_ADDR_TRIE:
- case TE_ADDR_IN:
- if (!rt_prefilter_net(&hook->h.req->prefilter, n->net))
- continue;
- /* fall through */
- case TE_ADDR_NONE:
- /* Splitting only for multi-net exports */
- if (--max <= 0)
- HASH_WALK_ITER_PUT;
- break;
-
- case TE_ADDR_FOR:
- if (!neq)
- {
- if (net_in_netX(hook->h.req->prefilter.addr, n->net) && (!cand || (n->net->length > cand->length)))
- cand = n->net;
- continue;
- }
- /* fall through */
- case TE_ADDR_EQUAL:
- if (!net_equal(n->net, neq))
- continue;
- break;
- }
-
- struct bgp_bucket *buck = n->cur ?: n->last;
- ea_list *ea = NULL;
- if (buck == c->withdraw_bucket)
- ea_set_dest(&ea, 0, RTD_UNREACHABLE);
- else
- {
- ea = buck->eattrs;
- eattr *eanh = bgp_find_attr(ea, BA_NEXT_HOP);
- ASSERT_DIE(eanh);
- const ip_addr *nh = (const void *) eanh->u.ptr->data;
-
- struct nexthop_adata nhad = {
- .ad = { .length = sizeof (struct nexthop_adata) - sizeof (struct adata), },
- .nh = { .gw = nh[0], },
- };
-
- ea_set_attr(&ea, EA_LITERAL_DIRECT_ADATA(&ea_gen_nexthop, 0, tmp_copy_adata(&nhad.ad)));
- }
-
- struct rte es = {
- .attrs = ea,
- .net = n->net,
- .src = rt_find_source_global(n->path_id),
- .sender = NULL,
- .lastmod = n->lastmod,
- .flags = n->cur ? REF_PENDING : 0,
- };
-
- struct rt_pending_export rpe = {
- .new = &es, .new_best = &es,
- };
-
- if (hook->h.req->export_bulk)
- {
- const rte *feed = &es;
- hook->h.req->export_bulk(hook->h.req, n->net, &rpe, &rpe, &feed, 1);
- }
- else if (hook->h.req->export_one)
- hook->h.req->export_one(hook->h.req, n->net, &rpe);
- else
- bug("No export method in export request");
- }
- HASH_WALK_ITER_END;
-
- neq = cand;
- cand = NULL;
- } while (neq);
-
- if (hook->hash_iter)
- ev_schedule_work(&hook->h.event);
- else
- rt_set_export_state(&hook->h, BIT32_ALL(TES_FEEDING), TES_READY);
-}
-
-static void
-bgp_out_table_export_start(struct rt_exporter *re, struct rt_export_request *req)
-{
- req->hook = rt_alloc_export(re, req->pool, sizeof(struct bgp_out_export_hook));
- req->hook->req = req;
-
- SKIP_BACK_DECLARE(struct bgp_out_export_hook, hook, h, req->hook);
-
- hook->h.event.hook = bgp_out_table_feed;
- rt_init_export(re, req->hook);
-}
-
-static void
-bgp_out_table_export_stop(struct rt_export_hook *hook)
-{
- rt_set_export_state(hook, BIT32_ALL(TES_HUNGRY, TES_FEEDING, TES_READY), TES_STOP);
- rt_stop_export_common(hook);
-}
-
-static void
-bgp_out_table_export_done(void *data)
-{
- struct bgp_out_export_hook *hook = data;
- struct rt_export_request *req = hook->h.req;
- void (*stopped)(struct rt_export_request *) = hook->h.stopped;
-
- rt_export_stopped(&hook->h);
- CALL(stopped, req);
-}
-
-static const struct rt_exporter_class bgp_out_table_export_class = {
- .start = bgp_out_table_export_start,
- .stop = bgp_out_table_export_stop,
- .done = bgp_out_table_export_done,
-};
-
-void
-bgp_setup_out_table(struct bgp_channel *c)
-{
- ASSERT_DIE(c->c.out_table == NULL);
-
- c->prefix_exporter = (struct rt_exporter) {
- .class = &bgp_out_table_export_class,
- .addr_type = c->c.table->addr_type,
- .rp = c->c.proto->pool,
- };
-
- rt_exporter_init(&c->prefix_exporter);
-
- c->c.out_table = &c->prefix_exporter;
-}
-#else
-void
-bgp_setup_out_table(struct bgp_channel *c UNUSED)
-{}
-#endif
-
-
/*
* BGP protocol glue
*/