]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Run aggregation on feed end from src channel and request feeding after receiving...
authorIgor Putovny <igor.putovny@nic.cz>
Tue, 11 Jun 2024 11:47:07 +0000 (13:47 +0200)
committerIgor Putovny <igor.putovny@nic.cz>
Tue, 11 Jun 2024 11:47:07 +0000 (13:47 +0200)
proto/aggregator/aggregator.c

index 8cba48b9509e7e41f6d3bfc95a6a239a5682d0fd..a46c78274ef225459481a27d220494800ec14e84 100644 (file)
@@ -921,6 +921,22 @@ aggregate_on_settle_timer(struct settle *s)
     log("Aggregation is already finished");
 }
 
+static void
+aggregate_on_feed_end(struct channel *C)
+{
+  struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto);
+
+  if (C == p->src)
+  {
+    run_aggregation(p);
+    flush_trie(p);
+    p->root = NULL;
+
+    if (p->first_run)
+      p->first_run = 0;
+  }
+}
+
 /*
  * Set static attribute in @rta from static attribute in @old according to @sa.
  */
@@ -1281,6 +1297,8 @@ HASH_DEFINE_REHASH_FN(AGGR_BUCK, struct aggregator_bucket);
 
 #define AGGR_DATA_MEMSIZE      (sizeof(struct f_val) * p->aggr_on_count)
 
+static void trie_init(struct aggregator_proto *p);
+
 static void
 aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new, rte *old)
 {
@@ -1296,6 +1314,8 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new
   if (!p->root)
     trie_init(p);
 
+  channel_request_feeding(p->src);
+
   /* Find the objects for the old route */
   if (old)
     old_route = HASH_FIND(p->routes, AGGR_RTE, old);
@@ -1539,6 +1559,7 @@ aggregator_init(struct proto_config *CF)
 
   P->rt_notify = aggregator_rt_notify;
   P->preexport = aggregator_preexport;
+  P->feed_end = aggregate_on_feed_end;
 
   return P;
 }
@@ -1628,6 +1649,7 @@ aggregator_start(struct proto *P)
   };
 
   p->trie_pool = lp_new(P->pool);
+  p->first_run = 1;
   p->aggr_done = 0;
 
   settle_init(&p->aggr_timer, &p->aggr_timer_cf, aggregate_on_settle_timer, p);