]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Conflating multiple partial ROA reload requests together
authorMaria Matejka <mq@ucw.cz>
Thu, 20 Jun 2024 09:58:23 +0000 (11:58 +0200)
committerMaria Matejka <mq@ucw.cz>
Wed, 26 Jun 2024 09:29:43 +0000 (11:29 +0200)
lib/lockfree.c
lib/lockfree.h
nest/proto.c
nest/protocol.h
nest/route.h
nest/rt-export.c
nest/rt-table.c

index 3a2ccab75fd5414d2d3cd52b7aa073a2790dafa7..17c17d1898f53a44020c30529bd3f6ced9547963 100644 (file)
@@ -117,7 +117,7 @@ lfjour_push_commit(struct lfjour *j)
 }
 
 static struct lfjour_item *
-lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
+lfjour_get_next(struct lfjour *j, const struct lfjour_item *last)
 {
   /* This is lockless, no domain checks. */
   if (!last)
@@ -158,36 +158,67 @@ lfjour_get_next(struct lfjour *j, struct lfjour_item *last)
 struct lfjour_item *
 lfjour_get(struct lfjour_recipient *r)
 {
-  ASSERT_DIE(r->cur == NULL);
   struct lfjour *j = lfjour_of_recipient(r);
 
-  /* The last pointer may get cleaned up under our hands.
-   * Indicating that we're using it, by RCU read. */
+  const struct lfjour_item *last = r->cur;
+  struct lfjour_item *next = NULL;
 
-  rcu_read_lock();
-  struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
-  r->cur = lfjour_get_next(j, last);
-  rcu_read_unlock();
+  if (last)
+    next = lfjour_get_next(j, r->cur);
+  else
+  {
+    /* The last pointer may get cleaned up under our hands.
+     * Indicating that we're using it, by RCU read. */
+
+    rcu_read_lock();
+    last = atomic_load_explicit(&r->last, memory_order_acquire);
+    next = lfjour_get_next(j, last);
+    rcu_read_unlock();
+  }
 
   if (last)
   {
     lfjour_debug_detailed("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, last %p",
-       j, r, r->cur, r->cur ? r->cur->seq : 0ULL, last);
+       j, r, next, next ? next->seq : 0ULL, last);
   }
   else
   {
     lfjour_debug("lfjour(%p)_get(recipient=%p) returns %p, seq=%lu, clean",
-       j, r, r->cur, r->cur ? r->cur->seq : 0ULL);
+       j, r, next, next ? next->seq : 0ULL);
   }
 
-  return r->cur;
+  if (!next)
+    return NULL;
+
+  if (!r->first_holding_seq)
+    r->first_holding_seq = next->seq;
+
+  return r->cur = next;
 }
 
-void lfjour_release(struct lfjour_recipient *r)
+void lfjour_release(struct lfjour_recipient *r, const struct lfjour_item *it)
 {
-  /* This is lockless, no domain checks. */
+  /* Find out what we actually released last */
+  rcu_read_lock();
+  const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+  struct lfjour_block *last_block = last ? PAGE_HEAD(last) : NULL;
+  rcu_read_unlock();
 
+  /* This is lockless, no domain checks. */
   ASSERT_DIE(r->cur);
+
+  /* Partial or full release? */
+  ASSERT_DIE(r->first_holding_seq);
+  ASSERT_DIE(it->seq >= r->first_holding_seq);
+  if (it->seq < r->cur->seq)
+  {
+    lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, partial upto seq=%lu",
+       j, r, it, it->seq);
+    r->first_holding_seq = it->seq + 1;
+    atomic_store_explicit(&r->last, it, memory_order_release);
+    return;
+  }
+
   struct lfjour_block *block = PAGE_HEAD(r->cur);
   u32 end = atomic_load_explicit(&block->end, memory_order_acquire);
 
@@ -210,9 +241,10 @@ void lfjour_release(struct lfjour_recipient *r)
   atomic_store_explicit(&r->last, r->cur, memory_order_release);
 
   /* The last block may be available to free */
-  if (pos + 1 == end)
+  if ((pos + 1 == end) || last && (last_block != block))
     lfjour_schedule_cleanup(j);
 
+  r->first_holding_seq = 0;
   r->cur = NULL;
 }
 
@@ -276,7 +308,7 @@ lfjour_unregister(struct lfjour_recipient *r)
   ASSERT_DIE(!j->domain || DG_IS_LOCKED(j->domain));
 
   if (r->cur)
-    lfjour_release(r);
+    lfjour_release(r, r->cur);
 
   lfjour_recipient_rem_node(&j->recipients, r);
   lfjour_schedule_cleanup(j);
@@ -297,7 +329,7 @@ lfjour_cleanup_hook(void *_j)
   if (_locked) DG_LOCK(_locked);
 
   u64 min_seq = ~((u64) 0);
-  struct lfjour_item *last_item_to_free = NULL;
+  const struct lfjour_item *last_item_to_free = NULL;
   struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
 
   if (!first)
@@ -310,7 +342,7 @@ lfjour_cleanup_hook(void *_j)
 
   WALK_TLIST(lfjour_recipient, r, &j->recipients)
   {
-    struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
+    const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
 
     if (!last)
       /* No last export means that the channel has exported nothing since last cleanup */
@@ -333,7 +365,7 @@ lfjour_cleanup_hook(void *_j)
 
   WALK_TLIST(lfjour_recipient, r, &j->recipients)
   {
-    struct lfjour_item *last = last_item_to_free;
+    const struct lfjour_item *last = last_item_to_free;
     /* This either succeeds if this item is the most-behind-one,
      * or fails and gives us the actual last for debug output. */
     if (atomic_compare_exchange_strong_explicit(
index 0553aac124e67ad8cfb62a403f3dec2c816ee498..f99704b366b26b6d1e555398bcb34526951442ae 100644 (file)
@@ -211,7 +211,8 @@ struct lfjour_recipient {
   TLIST_DEFAULT_NODE;
   event *event;                                        /* Event running when something is in the journal */
   event_list *target;                          /* Event target */
-  struct lfjour_item * _Atomic last;           /* Last item processed */
+  const struct lfjour_item * _Atomic last;     /* Last item processed */
+  u64 first_holding_seq;                       /* First item not released yet */
   struct lfjour_item *cur;                     /* Processing this now */
   _Atomic u64 recipient_flags;                 /* LFJOUR_R_* */
 };
@@ -248,7 +249,7 @@ struct lfjour_item *lfjour_push_prepare(struct lfjour *);
 void lfjour_push_commit(struct lfjour *);
 
 struct lfjour_item *lfjour_get(struct lfjour_recipient *);
-void lfjour_release(struct lfjour_recipient *);
+void lfjour_release(struct lfjour_recipient *, const struct lfjour_item *);
 static inline _Bool lfjour_reset_seqno(struct lfjour_recipient *r)
 {
   return atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_SEQ_RESET, memory_order_acq_rel) & LFJOUR_R_SEQ_RESET;
index f74e8063bd3bcd90e97cf5dadfb7a5c9500cfbf2..85344c04bc853352f881348300b9f4ff93bf4a1d 100644 (file)
@@ -394,17 +394,23 @@ struct roa_subscription {
   void (*refeed_hook)(struct channel *, struct rt_feeding_request *);
   struct lfjour_recipient digest_recipient;
   event update_event;
-  struct rt_feeding_request rfr;
+};
+
+struct roa_reload_request {
+  struct rt_feeding_request req;
+  struct roa_subscription *s;
+  struct lfjour_item *item;
 };
 
 static void
 channel_roa_reload_done(struct rt_feeding_request *req)
 {
-  SKIP_BACK_DECLARE(struct roa_subscription, s, rfr, req);
-  ASSERT_DIE(s->c->channel_state == CS_UP);
+  SKIP_BACK_DECLARE(struct roa_reload_request, rrr, req, req);
+  ASSERT_DIE(rrr->s->c->channel_state == CS_UP);
 
-  lfjour_release(&s->digest_recipient);
-  ev_send(proto_work_list(s->c->proto), &s->update_event);
+  lfjour_release(&rrr->s->digest_recipient, rrr->item);
+  ev_send(proto_work_list(rrr->s->c->proto), &rrr->s->update_event);
+  mb_free(rrr);
   /* FIXME: this should reset import/export filters if ACTION BLOCK */
 }
 
@@ -413,22 +419,24 @@ channel_roa_changed(void *_s)
 {
   struct roa_subscription *s = _s;
 
-  if (s->digest_recipient.cur)
-    return;
-
-  if (!lfjour_get(&s->digest_recipient))
-    return;
-
-  SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
-  s->rfr = (struct rt_feeding_request) {
-    .prefilter = {
-      .mode = TE_ADDR_TRIE,
-      .trie = rd->trie,
-    },
-    .done = channel_roa_reload_done,
-  };
+  for (struct lfjour_item *it; it = lfjour_get(&s->digest_recipient); )
+  {
+    SKIP_BACK_DECLARE(struct rt_digest, rd, li, s->digest_recipient.cur);
+    struct roa_reload_request *rrr = mb_alloc(s->c->proto->pool, sizeof *rrr);
+    *rrr = (struct roa_reload_request) {
+      .req = {
+       .prefilter = {
+         .mode = TE_ADDR_TRIE,
+         .trie = rd->trie,
+       },
+       .done = channel_roa_reload_done,
+      },
+      .s = s,
+      .item = it,
+    };
 
-  s->refeed_hook(s->c, &s->rfr);
+    s->refeed_hook(s->c, &rrr->req);
+  }
 }
 
 static inline void (*channel_roa_reload_hook(int dir))(struct channel *, struct rt_feeding_request *)
@@ -572,6 +580,8 @@ channel_start_import(struct channel *c)
   channel_reset_limit(c, &c->rx_limit, PLD_RX);
   channel_reset_limit(c, &c->in_limit, PLD_IN);
 
+  bmap_init(&c->imported_map, c->proto->pool, 16);
+
   memset(&c->import_stats, 0, sizeof(struct channel_import_stats));
 
   DBG("%s.%s: Channel start import req=%p\n", c->proto->name, c->name, &c->in_req);
@@ -694,9 +704,24 @@ channel_import_stopped(struct rt_import_request *req)
   mb_free(c->in_req.name);
   c->in_req.name = NULL;
 
+  bmap_free(&c->imported_map);
+
   channel_check_stopped(c);
 }
 
+static u32
+channel_reimport_next_feed_index(struct rt_export_feeder *f, u32 try_this)
+{
+  SKIP_BACK_DECLARE(struct channel, c, reimporter, f);
+  while (!bmap_test(&c->imported_map, try_this))
+    if (!(try_this & (try_this - 1))) /* return every power of two to check for maximum */
+      return try_this;
+    else
+      try_this++;
+
+  return try_this;
+}
+
 static void
 channel_do_reload(void *_c)
 {
@@ -704,6 +729,7 @@ channel_do_reload(void *_c)
 
   RT_FEED_WALK(&c->reimporter, f)
   {
+    _Bool seen = 0;
     for (uint i = 0; i < f->count_routes; i++)
     {
       rte *r = &f->block[i];
@@ -721,9 +747,14 @@ channel_do_reload(void *_c)
 
        /* And reload the route */
        rte_update(c, r->net, &new, new.src);
+
+       seen = 1;
       }
     }
 
+    if (!seen)
+      bmap_clear(&c->imported_map, f->ni->index);
+
     /* Local data needed no more */
     tmp_flush();
 
@@ -739,6 +770,7 @@ channel_setup_in_table(struct channel *c)
   c->reimporter = (struct rt_export_feeder) {
     .name = mb_sprintf(c->proto->pool, "%s.%s.reimport", c->proto->name, c->name),
     .trace_routes = c->debug,
+    .next_feed_index = channel_reimport_next_feed_index,
   };
   c->reimport_event = (event) {
     .hook = channel_do_reload,
index ad43e9d97108ee599b3aff29e877c2af7d1173c5..bbb76a8a10ca5f8b98a8e496b468faebb653b2b7 100644 (file)
@@ -533,6 +533,7 @@ struct channel {
   const struct filter *in_filter;      /* Input filter */
   const struct filter *out_filter;     /* Output filter */
   const net_addr *out_subprefix;       /* Export only subprefixes of this net */
+  struct bmap imported_map;            /* Which nets were touched by our import */
   struct bmap export_accepted_map;     /* Keeps track which routes were really exported */
   struct bmap export_rejected_map;     /* Keeps track which routes were rejected by export filter */
 
index e64c8d8b664231a815efd8ee69791434584e964a..c640a8a210e6b2813d78d4e2539f8111bcf7029f 100644 (file)
@@ -158,6 +158,7 @@ struct rt_export_request {
 
     /* Feeding itself */
     u32 feed_index;                            /* Index of the feed in progress */
+    u32 (*next_feed_index)(struct rt_export_feeder *, u32 try_this);
     struct rt_feeding_request {
       struct rt_feeding_request *next;         /* Next in request chain */
       void (*done)(struct rt_feeding_request *);/* Called when this refeed finishes */
index 1dd536a5c80be55f81eafbb6917bb0498ffff3f1..b991b975032c685222bb00a1e95db28f44f32d59 100644 (file)
@@ -52,7 +52,7 @@ rt_export_get(struct rt_export_request *r)
 } while (0)
 
 #define NOT_THIS_UPDATE        \
-  lfjour_release(&r->r); \
+  lfjour_release(&r->r, &update->li); \
   continue;
 
   while (1)
@@ -200,7 +200,7 @@ rt_export_release(const struct rt_export_union *u)
 
     case RT_EXPORT_UPDATE:
       rtex_trace(r, D_ROUTES, "Export %lu released", u->update->seq);
-      lfjour_release(&r->r);
+      lfjour_release(&r->r, &u->update->li);
 
       break;
 
@@ -272,16 +272,18 @@ rt_export_get_next_feed(struct rt_export_feeder *f, struct rcu_unwinder *u)
       return NULL;
     }
 
+#define NEXT_INDEX(f) f->feed_index = f->next_feed_index ? f->next_feed_index(f, f->feed_index + 1) : f->feed_index + 1
+
 #define NOT_THIS_FEED(...) {           \
   rtex_trace(f, D_ROUTES, __VA_ARGS__);        \
-  f->feed_index++;                     \
+  NEXT_INDEX(f);                       \
   continue;                            \
 }
 
     if (!feed)
       NOT_THIS_FEED("Nothing found for index %u", f->feed_index);
 
-    f->feed_index++;
+    NEXT_INDEX(f);
     return feed;
   }
 
@@ -319,18 +321,19 @@ rt_export_next_feed(struct rt_export_feeder *f)
 
   f->feed_index = 0;
 
-  if (f->feed_pending)
-  {
-    rtex_trace(f, D_STATES, "Feeding done, refeed request pending");
-    f->feeding = f->feed_pending;
-    f->feed_pending = NULL;
-    return rt_export_next_feed(f);
-  }
-  else
-  {
-    rtex_trace(f, D_STATES, "Feeding done (%u)", f->feed_index);
+  uint count = 0;
+  for (struct rt_feeding_request *rfr = f->feed_pending; rfr; rfr = rfr->next)
+    count++;
+
+  rtex_trace(f, D_STATES, "Feeding done, %u refeed request%s pending",
+      count, (count == 1) ? "" : "s");
+
+  if (!f->feed_pending)
     return NULL;
-  }
+
+  f->feeding = f->feed_pending;
+  f->feed_pending = NULL;
+  return rt_export_next_feed(f);
 }
 
 static void
index 276f7b4705ba11a92690da4db1385dc488612b78..4a27bc3a2c5dcc394f2e412c4b255bc96317ac79 100644 (file)
@@ -477,7 +477,7 @@ rt_aggregate_roa(void *_rag)
 {
   struct rt_roa_aggregator *rag = _rag;
 
-  RT_EXPORT_WALK(&rag->src, u)
+  RT_EXPORT_WALK(&rag->src, u) TMP_SAVED
   {
     const net_addr *nroa = NULL;
     struct rte_src *src = NULL;
@@ -529,7 +529,7 @@ rt_aggregate_roa(void *_rag)
       SKIP_BACK_DECLARE(struct rt_roa_aggregated_adata, rad, ad, ea->u.ptr);
 
       count = ROA_AGGR_COUNT(rad);
-      rad_new = alloca(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
+      rad_new = tmp_alloc(sizeof *rad_new + (count + 1) * sizeof rad_new->u[0]);
 
       /* Insertion into a sorted list */
       uint p = 0;
@@ -559,7 +559,7 @@ rt_aggregate_roa(void *_rag)
     else if (src)
     {
       count = 1;
-      rad_new = alloca(sizeof *rad_new + sizeof rad_new->u[0]);
+      rad_new = tmp_alloc(sizeof *rad_new + sizeof rad_new->u[0]);
       rad_new->u[0].asn = asn;
       rad_new->u[0].max_pxlen = max_pxlen;
     }
@@ -1988,6 +1988,9 @@ channel_preimport(struct rt_import_request *req, rte *new, const rte *old)
 
   mpls_rte_preimport(new_in ? new : NULL, old_in ? old : NULL);
 
+  if (new)
+    bmap_set(&c->imported_map, NET_TO_INDEX(new->net)->index);
+
   return verdict;
 }