}
static struct lfjour_item *
-lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
+lfjour_get_next(struct lfjour *j, const struct lfjour_item *last)
{
/* This is lockless, no domain checks. */
if (!last)
struct lfjour_item *
lfjour_get(struct lfjour_recipient *r)
{
- ASSERT_DIE(r->cur == NULL);
struct lfjour *j = lfjour_of_recipient(r);
- /* The last pointer may get cleaned up under our hands.
- * Indicating that we're using it, by RCU read. */
+ const struct lfjour_item *last = r->cur;
+ struct lfjour_item *next = NULL;
- rcu_read_lock();
- struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
- r->cur = lfjour_get_next(j, last);
- rcu_read_unlock();
+ if (last)
+ next = lfjour_get_next(j, last);
+ else
+ {
+ /* The last pointer may get cleaned up under our hands.
+ * Indicate that we're using it by holding the RCU read lock. */
+
+ rcu_read_lock();
+ last = atomic_load_explicit(&r->last, memory_order_acquire);
+ next = lfjour_get_next(j, last);
+ rcu_read_unlock();
+ }
if (last)
{
lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
- j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last);
+ j, r, next, next ? next->seq : 0ULL, last);
}
else
{
lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
- j, r, r->cur, r->cur ? r->cur->seq : 0ULL);
+ j, r, next, next ? next->seq : 0ULL);
}
- return r->cur;
+ if (!next)
+ return NULL;
+
+ if (!r->first_holding_seq)
+ r->first_holding_seq = next->seq;
+
+ return r->cur = next;
}
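For orientation, the consumer pattern this change enables is a plain drain loop. A minimal sketch, assuming an attached recipient r; process_item() is a hypothetical placeholder for the consumer's work:

    struct lfjour_item *it;
    while ((it = lfjour_get(r)))
    {
      process_item(it);        /* hypothetical consumer work */
      lfjour_release(r, it);   /* it == r->cur here, so this is a full release */
    }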
-void lfjour_release(struct lfjour_recipient *r)
+void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it)
{
- /* This is lockless, no domain checks. */
+ /* Find out what we actually released last */
+ rcu_read_lock();
+ const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+ struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL;
+ rcu_read_unlock();
+ /* This is lockless, no domain checks. */
ASSERT_DIE(r->cur);
+
+ /* Partial or full release? */
+ ASSERT_DIE(r->first_holding_seq);
+ ASSERT_DIE(it->seq >= r->first_holding_seq);
+ if (it->seq < r->cur->seq)
+ {
+ lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu",
+ j, r, it, it->seq);
+ r->first_holding_seq = it->seq + 1;
+ atomic_store_explicit(&r->last, it, memory_order_release);
+ return;
+ }
+
struct lfjour_block *block = PAGE_HEAD(r->cur);
u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
atomic_store_explicit(&r->last, r->cur, memory_order_release);
/* The last block may be available to free */
- if (pos + 1 == end)
+ if ((pos + 1 == end) || (last && (last_block != block)))
lfjour_schedule_cleanup(j);
+ r->first_holding_seq = 0;
r->cur = NULL;
}
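To illustrate the partial-release path: suppose (sequence numbers invented for the example) a recipient called lfjour_get() three times without releasing, so it holds items with seq 5, 6 and 7; then r->cur->seq == 7 and r->first_holding_seq == 5. The releases then go:

    lfjour_release(r, it5);   /* 5 < 7: partial, first_holding_seq -> 6, r->last -> it5 */
    lfjour_release(r, it6);   /* 6 < 7: partial, first_holding_seq -> 7, r->last -> it6 */
    lfjour_release(r, it7);   /* 7 == r->cur->seq: full release, r->cur = NULL, first_holding_seq = 0 */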
ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
if (r->cur)
- lfjour_release(r);
+ lfjour_release(r, r->cur);
lfjour_recipient_rem_node(&j->recipients, r);
lfjour_schedule_cleanup(j);
if (_locked) DG_LOCK(_locked);
u64 min_seq = ~((u64) 0);
- struct lfjour_item *last_item_to_free = NULL;
+ const struct lfjour_item *last_item_to_free = NULL;
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
if (!first)
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
- struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+ const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
if (!last)
/* No last export means that the channel has exported nothing since last cleanup */
WALK_TLIST(lfjour_recipient, r, &j->recipients)
{
- struct lfjour_item *last = last_item_to_free;
+ const struct lfjour_item *last = last_item_to_free;
/* This either succeeds if this item is the most-behind-one,
* or fails and gives us the actual last for debug output. */
if (atomic_compare_exchange_strong_explicit(
TLIST_DEFAULT_NODE;
event *event; /* Event running when something is in the journal */
event_list *target; /* Event target */
- struct lfjour_item * _Atomic last; /* Last item processed */
+ const struct lfjour_item * _Atomic last; /* Last item processed */
+ u64 first_holding_seq; /* First item not released yet */
struct lfjour_item *cur; /* Processing this now */
_Atomic u64 recipient_flags; /* LFJOUR_R_* */
};
void lfjour_push_commit(struct lfjour *);
struct lfjour_item *lfjour_get(struct lfjour_recipient *);
-void lfjour_release(struct lfjour_recipient *);
+void lfjour_release(struct lfjour_recipient *, const struct lfjour_item *);
static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r)
{
return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;
void (*refeed_hook)(struct channel *, struct rt_feeding_request *);
struct lfjour_recipient digest_recipient;
event update_event;
- struct rt_feeding_request rfr;
+};
+
+struct roa_reload_request {
+ struct rt_feeding_request req;
+ struct roa_subscription *s;
+ struct lfjour_item *item;
};
static void
channel_roa_reload_done(struct rt_feeding_request *req)
{
- SKIP_BACK_DECLARE(struct roa_subscription, s, rfr, req);
- ASSERT_DIE(s->c->channel_state == CS_UP);
+ SKIP_BACK_DECLARE(struct roa_reload_request, rrr, req, req);
+ ASSERT_DIE(rrr->s->c->channel_state == CS_UP);
- lfjour_release(&s->digest_recipient);
- ev_send(proto_work_list(s->c->proto), &s->update_event);
+ lfjour_release(&rrr->s->digest_recipient, rrr->item);
+ ev_send(proto_work_list(rrr->s->c->proto), &rrr->s->update_event);
+ mb_free(rrr);
/* FIXME: this should reset import/export filters if ACTION BLOCK */
}
{
struct roa_subscription *s = _s;
- if (s->digest_recipient.cur)
- return;
-
- if (!lfjour_get(&s->digest_recipient))
- return;
-
- SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
- s->rfr = (struct rt_feeding_request) {
- .prefilter = {
- .mode = TE_ADDR_TRIE,
- .trie = rd->trie,
- },
- .done = channel_roa_reload_done,
- };
+ for (struct lfjour_item *it; (it = lfjour_get(&s->digest_recipient)); )
+ {
+ SKIP_BACK_DECLARE(struct rt_digest, rd, li, it);
+ struct roa_reload_request *rrr = mb_alloc(s->c->proto->pool, sizeof *rrr);
+ *rrr = (struct roa_reload_request) {
+ .req = {
+ .prefilter = {
+ .mode = TE_ADDR_TRIE,
+ .trie = rd->trie,
+ },
+ .done = channel_roa_reload_done,
+ },
+ .s = s,
+ .item = it,
+ };
- s->refeed_hook(s->c, &s->rfr);
+ s->refeed_hook(s->c, &rrr->req);
+ }
}
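Design note: replacing the single embedded rfr with a heap-allocated roa_reload_request per journal item apparently allows several trie-filtered refeeds to be in flight at once. Ownership passes through refeed_hook and comes back in channel_roa_reload_done(); a sketch of the pairing (all names from this patch):

    /* rrr = mb_alloc(...)               -- one request per digest item
     * s->refeed_hook(s->c, &rrr->req)   -- hands the request to the refeed machinery
     * channel_roa_reload_done(&rrr->req)
     *   -> lfjour_release(&rrr->s->digest_recipient, rrr->item)
     *   -> mb_free(rrr)
     */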
static inline void (*channel_roa_reload_hook(int dir))(struct channel *, struct rt_feeding_request *)
channel_reset_limit(c, &c->rx_limit, PLD_RX);
channel_reset_limit(c, &c->in_limit, PLD_IN);
+ bmap_init(&c->imported_map, c->proto->pool, 16);
+
memset(&c->import_stats, 0, sizeof(struct channel_import_stats));
DBG("%s.%s: Channel start import req=%p\n", c->proto->name, c->name, &c->in_req);
mb_free(c->in_req.name);
c->in_req.name = NULL;
+ bmap_free(&c->imported_map);
+
channel_check_stopped(c);
}
+static u32
+channel_reimport_next_feed_index(struct rt_export_feeder *f, u32 try_this)
+{
+ SKIP_BACK_DECLARE(struct channel, c, reimporter, f);
+ while (!bmap_test(&c->imported_map, try_this))
+ if (!(try_this & (try_this - 1))) /* return at every power of two so the caller can check against the maximum index */
+ return try_this;
+ else
+ try_this++;
+
+ return try_this;
+}
+
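A worked example of the skip logic, with imported_map contents invented for illustration: suppose only bits 3 and 9 are set. Then channel_reimport_next_feed_index() behaves as:

    /* Illustrative only:
     *   (f, 1)  -> 1    1 is a power of two, returned so the caller can bounds-check
     *   (f, 3)  -> 3    bit 3 is set: this net was imported, feed it
     *   (f, 5)  -> 8    5, 6, 7 skipped; stops at the next power of two
     *   (f, 10) -> 16   10..15 skipped; stops at the next power of two
     */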
static void
channel_do_reload(void *_c)
{
RT_FEED_WALK(&c->reimporter, f)
{
+ _Bool seen = 0;
for (uint i = 0; i < f->count_routes; i++)
{
rte *r = &f->block[i];
/* And reload the route */
rte_update(c, r->net, &new, new.src);
+
+ seen = 1;
}
}
+ if (!seen)
+ bmap_clear(&c->imported_map, f->ni->index);
+
/* Local data needed no more */
tmp_flush();
c->reimporter = (struct rt_export_feeder) {
.name = mb_sprintf(c->proto->pool, "%s.%s.reimport", c->proto->name, c->name),
.trace_routes = c->debug,
+ .next_feed_index = channel_reimport_next_feed_index,
};
c->reimport_event = (event) {
.hook = channel_do_reload,
const struct filter *in_filter; /* Input filter */
const struct filter *out_filter; /* Output filter */
const net_addr *out_subprefix; /* Export only subprefixes of this net */
+ struct bmap imported_map; /* Which nets were touched by our import */
struct bmap export_accepted_map; /* Keeps track which routes were really exported */
struct bmap export_rejected_map; /* Keeps track which routes were rejected by export filter */
/* Feeding itself */
u32 feed_index; /* Index of the feed in progress */
+ u32 (*next_feed_index)(struct rt_export_feeder *, u32 try_this);
struct rt_feeding_request {
struct rt_feeding_request *next; /* Next in request chain */
void (*done)(struct rt_feeding_request *);/* Called when this refeed finishes */
} while (0)
#define NOT_THIS_UPDATE \
- lfjour_release(&r->r); \
+ lfjour_release(&r->r, &update->li); \
continue;
while (1)
case RT_EXPORT_UPDATE:
rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq);
- lfjour_release(&r->r);
+ lfjour_release(&r->r, &u->update->li);
break;
return NULL;
}
+#define NEXT_INDEX(f) (f)->feed_index = (f)->next_feed_index ? (f)->next_feed_index((f), (f)->feed_index + 1) : (f)->feed_index + 1
+
#define NOT_THIS_FEED(...) { \
rtex_trace(f, D_ROUTES, __VA_ARGS__); \
- f->feed_index++; \
+ NEXT_INDEX(f); \
continue; \
}
if (!feed)
NOT_THIS_FEED("Nothing found for index %u", f->feed_index);
- f->feed_index++;
+ NEXT_INDEX(f);
return feed;
}
f->feed_index = 0;
- if (f->feed_pending)
- {
- rtex_trace(f, D_STATES, "Feeding done, refeed request pending");
- f->feeding = f->feed_pending;
- f->feed_pending = NULL;
- return rt_export_next_feed(f);
- }
- else
- {
- rtex_trace(f, D_STATES, "Feeding done (%u)", f->feed_index);
+ uint count = 0;
+ for (struct rt_feeding_request *rfr = f->feed_pending; rfr; rfr = rfr->next)
+ count++;
+
+ rtex_trace(f, D_STATES, "Feeding done, %u refeed request%s pending",
+ count, (count == 1) ? "" : "s");
+
+ if (!f->feed_pending)
return NULL;
- }
+
+ f->feeding = f->feed_pending;
+ f->feed_pending = NULL;
+ return rt_export_next_feed(f);
}
static void
{
struct rt_roa_aggregator *rag = _rag;
- RT_EXPORT_WALK(&rag->src, u)
+ RT_EXPORT_WALK(&rag->src, u) TMP_SAVED
{
const net_addr *nroa = NULL;
struct rte_src *src = NULL;
SKIP_BACK_DECLARE(struct rt_roa_aggregated_adata, rad, ad, ea->u.ptr);
count = ROA_AGGR_COUNT(rad);
- rad_new = alloca(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
+ rad_new = tmp_alloc(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
/* Insertion into a sorted list */
uint p = 0;
else if (src)
{
count = 1;
- rad_new = alloca(sizeof *rad_new + sizeof rad_new->u[0]);
+ rad_new = tmp_alloc(sizeof *rad_new + sizeof rad_new->u[0]);
rad_new->u[0].asn = asn;
rad_new->u[0].max_pxlen = max_pxlen;
}
mpls_rte_preimport(new_in ? new : NULL, old_in ? old : NULL);
+ if (new)
+ bmap_set(&c->imported_map, NET_TO_INDEX(new->net)->index);
+
return verdict;
}
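Taken together, the imported_map lifecycle introduced by this patch:

    /* import  (preimport path above):             bmap_set() for every net this channel imported
     * reload  (channel_reimport_next_feed_index): feed only indices whose bit is set,
     *                                             returning at powers of two for bounds checks
     * cleanup (channel_do_reload):                bmap_clear() once a net's feed yields no routes
     */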