log("Aggregation is already finished");
}
+static void
+aggregate_on_feed_end(struct channel *C)
+{
+ struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto);
+
+ if (C == p->src)
+ {
+ run_aggregation(p);
+ flush_trie(p);
+ p->root = NULL;
+
+ if (p->first_run)
+ p->first_run = 0;
+ }
+}
+
/*
* Set static attribute in @rta from static attribute in @old according to @sa.
*/
#define AGGR_DATA_MEMSIZE (sizeof(struct f_val) * p->aggr_on_count)
+static void trie_init(struct aggregator_proto *p);
+
static void
aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new, rte *old)
{
if (!p->root)
trie_init(p);
+ channel_request_feeding(p->src);
+
/* Find the objects for the old route */
if (old)
old_route = HASH_FIND(p->routes, AGGR_RTE, old);
P->rt_notify = aggregator_rt_notify;
P->preexport = aggregator_preexport;
+ P->feed_end = aggregate_on_feed_end;
return P;
}
};
p->trie_pool = lp_new(P->pool);
+ p->first_run = 1;
p->aggr_done = 0;
settle_init(&p->aggr_timer, &p->aggr_timer_cf, aggregate_on_settle_timer, p);