From c103b51fcae0181595b49cdcda4544cfe0603f55 Mon Sep 17 00:00:00 2001 From: Maria Matejka Date: Tue, 31 Oct 2023 11:37:54 +0100 Subject: [PATCH] Aggregator: Expressed most of the attribute logic in filter language --- proto/aggregator/aggregator.c | 398 +++------------------------------- proto/aggregator/aggregator.h | 32 +-- proto/aggregator/config.Y | 124 ++++++----- proto/aggregator/test.conf | 2 +- 4 files changed, 102 insertions(+), 454 deletions(-) diff --git a/proto/aggregator/aggregator.c b/proto/aggregator/aggregator.c index 944d2f099..03d7bbf9b 100644 --- a/proto/aggregator/aggregator.c +++ b/proto/aggregator/aggregator.c @@ -46,85 +46,6 @@ #include "aggregator.h" #include -/* -#include "nest/route.h" -#include "nest/iface.h" -#include "lib/resource.h" -#include "lib/event.h" -#include "lib/timer.h" -#include "lib/string.h" -#include "conf/conf.h" -#include "filter/filter.h" -#include "filter/data.h" -#include "lib/hash.h" -#include "lib/string.h" -#include "lib/alloca.h" -#include "lib/flowspec.h" -*/ - -/* - * Set static attribute in @rta from static attribute in @old according to @sa. - */ -static void -rta_set_static_attr(struct rta *rta, const struct rta *old, struct f_static_attr sa) -{ - switch (sa.sa_code) - { - case SA_NET: - break; - - case SA_FROM: - rta->from = old->from; - break; - - case SA_GW: - rta->dest = RTD_UNICAST; - rta->nh.gw = old->nh.gw; - rta->nh.iface = old->nh.iface; - rta->nh.next = NULL; - rta->hostentry = NULL; - rta->nh.labels = 0; - break; - - case SA_SCOPE: - rta->scope = old->scope; - break; - - case SA_DEST: - rta->dest = old->dest; - rta->nh.gw = IPA_NONE; - rta->nh.iface = NULL; - rta->nh.next = NULL; - rta->hostentry = NULL; - rta->nh.labels = 0; - break; - - case SA_IFNAME: - rta->dest = RTD_UNICAST; - rta->nh.gw = IPA_NONE; - rta->nh.iface = old->nh.iface; - rta->nh.next = NULL; - rta->hostentry = NULL; - rta->nh.labels = 0; - break; - - case SA_GW_MPLS: - rta->nh.labels = old->nh.labels; - memcpy(&rta->nh.label, &old->nh.label, sizeof(u32) * old->nh.labels); - break; - - case SA_WEIGHT: - rta->nh.weight = old->nh.weight; - break; - - case SA_PREF: - rta->pref = old->pref; - break; - - default: - bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code); - } -} /* * Compare list of &f_val entries. @@ -158,35 +79,23 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b return; } - /* Allocate RTA and EA list */ + /* Store TMP linpool state */ + struct lp_state tmp_state; + lp_save(tmp_linpool, &tmp_state); + + /* Allocate RTA */ struct rta *rta = allocz(rta_size(bucket->rte->attrs)); rta->dest = RTD_UNREACHABLE; rta->source = RTS_AGGREGATED; rta->scope = SCOPE_UNIVERSE; - struct ea_list *eal = allocz(sizeof(struct ea_list) + sizeof(struct eattr) * p->aggr_on_da_count); - eal->next = NULL; - eal->count = 0; - rta->eattrs = eal; - - /* Seed the attributes from aggregator rule */ - for (uint i = 0; i < p->aggr_on_count; i++) - { - if (p->aggr_on[i].type == AGGR_ITEM_DYNAMIC_ATTR) - { - u32 ea_code = p->aggr_on[i].da.ea_code; - const struct eattr *e = ea_find(bucket->rte->attrs->eattrs, ea_code); - - if (e) - eal->attrs[eal->count++] = *e; - } - else if (p->aggr_on[i].type == AGGR_ITEM_STATIC_ATTR) - rta_set_static_attr(rta, bucket->rte->attrs, p->aggr_on[i].sa); - } - + /* Allocate route */ struct rte *new = rte_get_temp(rta, bucket->rte->src); new->net = net; + /* Seed the attributes from aggregator rule */ + f_eval_rte(p->premerge, &new, tmp_linpool, p->aggr_on_count, bucket->aggr_data, 0, NULL); + /* log("=============== CREATE MERGED ROUTE ==============="); log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name); @@ -199,11 +108,8 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b .val.rte = bucket->rte, }; - struct lp_state tmp_state; - lp_save(tmp_linpool, &tmp_state); - - /* Actually run the filter */ - enum filter_return fret = f_eval_rte(p->merge_by, &new, tmp_linpool, 1, &val, 0); + /* Actually run the merge rule */ + enum filter_return fret = f_eval_rte(p->merge_by, &new, tmp_linpool, 1, &val, 0, NULL); /* Src must be stored now, rte_update2() may return new */ struct rte_src *new_src = new ? new->src : NULL; @@ -257,136 +163,6 @@ aggregator_reload_buckets(void *data) HASH_WALK_END; } - -/* - * Evaluate static attribute of @rt1 according to @sa - * and store result in @pos. - */ -static void -eval_static_attr(const struct rte *rt1, struct f_static_attr sa, struct f_val *pos) -{ - const struct rta *rta = rt1->attrs; - -#define RESULT(_type, value, result) \ - do { \ - pos->type = _type; \ - pos->val.value = result; \ - } while (0) - - switch (sa.sa_code) - { - case SA_NET: RESULT(sa.f_type, net, rt1->net->n.addr); break; - case SA_FROM: RESULT(sa.f_type, ip, rta->from); break; - case SA_GW: RESULT(sa.f_type, ip, rta->nh.gw); break; - case SA_PROTO: RESULT(sa.f_type, s, rt1->src->proto->name); break; - case SA_SOURCE: RESULT(sa.f_type, i, rta->source); break; - case SA_SCOPE: RESULT(sa.f_type, i, rta->scope); break; - case SA_DEST: RESULT(sa.f_type, i, rta->dest); break; - case SA_IFNAME: RESULT(sa.f_type, s, rta->nh.iface ? rta->nh.iface->name : ""); break; - case SA_IFINDEX: RESULT(sa.f_type, i, rta->nh.iface ? rta->nh.iface->index : 0); break; - case SA_WEIGHT: RESULT(sa.f_type, i, rta->nh.weight + 1); break; - case SA_PREF: RESULT(sa.f_type, i, rta->pref); break; - case SA_GW_MPLS: RESULT(sa.f_type, i, rta->nh.labels ? rta->nh.label[0] : MPLS_NULL); break; - default: - bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code); - } - -#undef RESULT -} - -/* - * Evaluate dynamic attribute of @rt1 according to @da - * and store result in @pos. - */ -static void -eval_dynamic_attr(const struct rte *rt1, struct f_dynamic_attr da, struct f_val *pos) -{ - const struct rta *rta = rt1->attrs; - const struct eattr *e = ea_find(rta->eattrs, da.ea_code); - -#define RESULT(_type, value, result) \ - do { \ - pos->type = _type; \ - pos->val.value = result; \ - } while (0) - -#define RESULT_VOID \ - do { \ - pos->type = T_VOID; \ - } while (0) - - if (!e) - { - /* A special case: undefined as_path looks like empty as_path */ - if (da.type == EAF_TYPE_AS_PATH) - { - RESULT(T_PATH, ad, &null_adata); - return; - } - - /* The same special case for int_set */ - if (da.type == EAF_TYPE_INT_SET) - { - RESULT(T_CLIST, ad, &null_adata); - return; - } - - /* The same special case for ec_set */ - if (da.type == EAF_TYPE_EC_SET) - { - RESULT(T_ECLIST, ad, &null_adata); - return; - } - - /* The same special case for lc_set */ - if (da.type == EAF_TYPE_LC_SET) - { - RESULT(T_LCLIST, ad, &null_adata); - return; - } - - /* Undefined value */ - RESULT_VOID; - return; - } - - switch (e->type & EAF_TYPE_MASK) - { - case EAF_TYPE_INT: - RESULT(da.f_type, i, e->u.data); - break; - case EAF_TYPE_ROUTER_ID: - RESULT(T_QUAD, i, e->u.data); - break; - case EAF_TYPE_OPAQUE: - RESULT(T_ENUM_EMPTY, i, 0); - break; - case EAF_TYPE_IP_ADDRESS: - RESULT(T_IP, ip, *((ip_addr *) e->u.ptr->data)); - break; - case EAF_TYPE_AS_PATH: - RESULT(T_PATH, ad, e->u.ptr); - break; - case EAF_TYPE_BITFIELD: - RESULT(T_BOOL, i, !!(e->u.data & (1u << da.bit))); - break; - case EAF_TYPE_INT_SET: - RESULT(T_CLIST, ad, e->u.ptr); - break; - case EAF_TYPE_EC_SET: - RESULT(T_ECLIST, ad, e->u.ptr); - break; - case EAF_TYPE_LC_SET: - RESULT(T_LCLIST, ad, e->u.ptr); - break; - default: - bug("Unknown dynamic attribute type"); - } - -#undef RESULT -#undef RESULT_VOID -} - static inline u32 aggr_route_hash(const rte *e) { struct { @@ -453,125 +229,29 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new struct lp_state tmp_state; lp_save(tmp_linpool, &tmp_state); - for (uint val_idx = 0; val_idx < p->aggr_on_count; val_idx++) + struct rte *rt1 = new; + enum filter_return fret = f_eval_rte(p->aggr_on, &new, tmp_linpool, 0, NULL, p->aggr_on_count, tmp_bucket->aggr_data); + + if (rt1 != new) { - int type = p->aggr_on[val_idx].type; - struct f_val *pos = &tmp_bucket->aggr_data[val_idx]; + rte_free(rt1); + log(L_WARN "Aggregator rule modifies the route, reverting"); + } - switch (type) - { - case AGGR_ITEM_TERM: { - const struct f_line *line = p->aggr_on[val_idx].line; - struct rte *rt1 = new; - enum filter_return fret = f_eval_rte(line, &new, tmp_linpool, 0, NULL, pos); - - if (rt1 != new) - { - rte_free(rt1); - log(L_WARN "Aggregator rule modifies the route, reverting"); - } - - if (fret > F_RETURN) - log(L_WARN "%s.%s: Wrong number of items left on stack after evaluation of aggregation list", rt1->src->proto->name, rt1->sender->name); - - switch (pos->type) { - case T_VOID: - case T_INT: - case T_BOOL: - case T_PAIR: - case T_QUAD: - case T_ENUM: - case T_IP: - case T_EC: - case T_LC: - case T_RD: - /* Fits, OK */ - break; - - default: - log(L_WARN "%s.%s: Expression evaluated to type %s unsupported by aggregator. Store this value as a custom attribute instead", new->src->proto->name, new->sender->name, f_type_name(pos->type)); - *pos = (struct f_val) { .type = T_INT, .val.i = 0 }; - } - - break; - } - - case AGGR_ITEM_STATIC_ATTR: { - eval_static_attr(new, p->aggr_on[val_idx].sa, pos); - break; - } - - case AGGR_ITEM_DYNAMIC_ATTR: { - eval_dynamic_attr(new, p->aggr_on[val_idx].da, pos); - break; - } - - default: - break; - } + /* Check filter return value */ + if (fret > F_RETURN) + { + sl_free(tmp_bucket); + lp_restore(tmp_linpool, &tmp_state); + + return; } /* Compute the hash */ u64 haux; mem_hash_init(&haux); for (uint i = 0; i < p->aggr_on_count; i++) - { - mem_hash_mix_num(&haux, tmp_bucket->aggr_data[i].type); - -#define MX(k) mem_hash_mix(&haux, &IT(k), sizeof IT(k)); -#define IT(k) tmp_bucket->aggr_data[i].val.k - - switch (tmp_bucket->aggr_data[i].type) - { - case T_VOID: - break; - case T_INT: - case T_BOOL: - case T_PAIR: - case T_QUAD: - case T_ENUM: - MX(i); - break; - case T_EC: - case T_RD: - MX(ec); - break; - case T_LC: - MX(lc); - break; - case T_IP: - MX(ip); - break; - case T_NET: - mem_hash_mix_num(&haux, net_hash(IT(net))); - break; - case T_STRING: - mem_hash_mix_str(&haux, IT(s)); - break; - case T_PATH_MASK: - mem_hash_mix(&haux, IT(path_mask), sizeof(*IT(path_mask)) + IT(path_mask)->len * sizeof (IT(path_mask)->item)); - break; - case T_PATH: - case T_CLIST: - case T_ECLIST: - case T_LCLIST: - case T_BYTESTRING: - mem_hash_mix(&haux, IT(ad)->data, IT(ad)->length); - break; - case T_NONE: - case T_PATH_MASK_ITEM: - case T_ROUTE: - case T_ROUTES_BLOCK: - bug("Invalid type %s in hashing", f_type_name(tmp_bucket->aggr_data[i].type)); - case T_SET: - MX(t); - break; - case T_PREFIX_SET: - MX(ti); - break; - } - } - + mem_hash_mix_f_val(&haux, &tmp_bucket->aggr_data[i]); tmp_bucket->hash = mem_hash_value(&haux); /* Find the existing bucket */ @@ -686,8 +366,8 @@ aggregator_init(struct proto_config *CF) proto_configure_channel(P, &p->dst, cf->dst); p->aggr_on_count = cf->aggr_on_count; - p->aggr_on_da_count = cf->aggr_on_da_count; p->aggr_on = cf->aggr_on; + p->premerge = cf->premerge; p->merge_by = cf->merge_by; P->rt_notify = aggregator_rt_notify; @@ -753,34 +433,16 @@ aggregator_reconfigure(struct proto *P, struct proto_config *CF) if (cf->aggr_on_count != p->aggr_on_count) return 0; - if (cf->aggr_on_da_count != p->aggr_on_da_count) - return 0; - /* Compare aggregator rule */ - for (uint i = 0; i < p->aggr_on_count; i++) - switch (cf->aggr_on[i].type) - { - case AGGR_ITEM_TERM: - if (!f_same(cf->aggr_on[i].line, p->aggr_on[i].line)) - return 0; - break; - case AGGR_ITEM_STATIC_ATTR: - if (memcmp(&cf->aggr_on[i].sa, &p->aggr_on[i].sa, sizeof(struct f_static_attr)) != 0) - return 0; - break; - case AGGR_ITEM_DYNAMIC_ATTR: - if (memcmp(&cf->aggr_on[i].da, &p->aggr_on[i].da, sizeof(struct f_dynamic_attr)) != 0) - return 0; - break; - default: - bug("Broken aggregator rule"); - } + if (!f_same(cf->aggr_on, p->aggr_on) || !f_same(cf->premerge, p->premerge)) + return 0; /* Compare merge filter */ if (!f_same(cf->merge_by, p->merge_by)) ev_schedule(&p->reload_buckets); p->aggr_on = cf->aggr_on; + p->premerge = cf->premerge; p->merge_by = cf->merge_by; return 1; diff --git a/proto/aggregator/aggregator.h b/proto/aggregator/aggregator.h index 19459b1d0..7155b19b6 100644 --- a/proto/aggregator/aggregator.h +++ b/proto/aggregator/aggregator.h @@ -20,10 +20,11 @@ struct aggregator_config { struct proto_config c; struct channel_config *src, *dst; - uint aggr_on_count; - uint aggr_on_da_count; - struct aggr_item *aggr_on; + const struct f_line *aggr_on; + const struct f_line *premerge; const struct f_line *merge_by; + uint aggr_on_count; + u8 aggr_on_net; }; struct aggregator_route { @@ -54,33 +55,14 @@ struct aggregator_proto { slab *route_slab; /* Aggregator rule */ + const struct f_line *aggr_on; uint aggr_on_count; - uint aggr_on_da_count; - struct aggr_item *aggr_on; + u8 aggr_on_net; /* Merge filter */ + const struct f_line *premerge; const struct f_line *merge_by; event reload_buckets; }; -enum aggr_item_type { - AGGR_ITEM_TERM, - AGGR_ITEM_STATIC_ATTR, - AGGR_ITEM_DYNAMIC_ATTR, -}; - -struct aggr_item { - enum aggr_item_type type; - union { - struct f_static_attr sa; - struct f_dynamic_attr da; - const struct f_line *line; - }; -}; - -struct aggr_item_node { - const struct aggr_item_node *next; - struct aggr_item i; -}; - #endif diff --git a/proto/aggregator/config.Y b/proto/aggregator/config.Y index d7d9ae53e..ba4eb9c45 100644 --- a/proto/aggregator/config.Y +++ b/proto/aggregator/config.Y @@ -17,12 +17,11 @@ CF_DEFINES #define AGGREGATOR_CFG ((struct aggregator_config *) this_proto) #define AGGR_ITEM_ALLOC ((struct aggr_item_node *) cfg_allocz(sizeof(struct aggr_item_node))) - CF_DECLS CF_KEYWORDS(AGGREGATOR, AGGREGATE, ON, MERGE, BY) -%type aggr_item aggr_list +%type aggr_item aggr_list CF_GRAMMAR @@ -41,42 +40,31 @@ aggregator_proto_item: proto_item | channel_item_ | PEER TABLE rtable { AGGREGATOR_CFG->dst->table = $3; } - | AGGREGATE ON aggr_list { + | AGGREGATE ON { if (AGGREGATOR_CFG->aggr_on) cf_error("Only one aggregate on clause allowed"); - _Bool net_present = 0; - int count = 0; - - for (const struct aggr_item_node *item = $3; item; item = item->next) { -// log(L_WARN "type %d sacode %d", item->i.type, item->i.sa.sa_code); - if (item->i.type == AGGR_ITEM_STATIC_ATTR && item->i.sa.sa_code == SA_NET) - net_present = 1; - - count++; - } + cf_push_block_scope(new_config); + } aggr_list { + int count = new_config->current_scope->slots; + cf_pop_block_scope(new_config); - if (!net_present) - cf_error("'NET' must be present"); + if (!AGGREGATOR_CFG->aggr_on_net) + cf_error("aggregate on must be always include 'net'."); - AGGREGATOR_CFG->aggr_on = cfg_alloc(sizeof(struct aggr_item) * count); - - int pos = 0; - for (const struct aggr_item_node *item = $3; item; item = item->next) { - if (item->i.type == AGGR_ITEM_DYNAMIC_ATTR) - AGGREGATOR_CFG->aggr_on_da_count++; - - AGGREGATOR_CFG->aggr_on[pos++] = item->i; - } + AGGREGATOR_CFG->aggr_on_count = count; + AGGREGATOR_CFG->aggr_on = f_linearize($4.begin, count); - AGGREGATOR_CFG->aggr_on_count = pos; + struct f_line *premerge = f_linearize($4.end, 0); + premerge->args = count; + AGGREGATOR_CFG->premerge = premerge; } | MERGE BY { cf_push_block_scope(new_config); f_predefined_variable(new_config, "routes", T_ROUTES_BLOCK); } function_body { cf_pop_block_scope(new_config); - $4->args++; /* The predefined "routes" variable */ + $4->args++; AGGREGATOR_CFG->merge_by = $4; } ; @@ -88,45 +76,61 @@ aggregator_proto: aggregator_proto_start proto_name '{' aggregator_proto_opts '} aggr_list: aggr_item | aggr_list ',' aggr_item { - if ($3 == NULL) { - $$ = $1; - } else { - $$ = $3; - $$->next = $1; - } - } + if ($$.begin = $3.begin) + $$.begin->next = $1.begin; + else + $$.begin = $1.begin; + + if ($$.end = $3.end) + $$.end->next = $1.end; + else + $$.end = $1.end; + } ; aggr_item: '(' term ')' { - $$ = AGGR_ITEM_ALLOC; - $$->i.type = AGGR_ITEM_TERM; - $$->i.line = f_linearize($2, 1); - } - | CF_SYM_KNOWN { - switch ($1->class) { - case SYM_ATTRIBUTE: - $$ = AGGR_ITEM_ALLOC; - $$->i.type = AGGR_ITEM_DYNAMIC_ATTR; - $$->i.da = *$1->attribute; - break; - case SYM_CONSTANT_RANGE: - $$ = NULL; - break; - default: - cf_error("Can't aggregate on symbol type %s.", cf_symbol_class_name($1)); - } - } - | dynamic_attr { - $$ = AGGR_ITEM_ALLOC; - $$->i.type = AGGR_ITEM_DYNAMIC_ATTR; - $$->i.da = $1; - } - | static_attr { - $$ = AGGR_ITEM_ALLOC; - $$->i.type = AGGR_ITEM_STATIC_ATTR; - $$->i.sa = $1; + switch ($2->type) { + case T_INT: + case T_BOOL: + case T_PAIR: + case T_QUAD: + case T_ENUM: + case T_IP: + case T_EC: + case T_LC: + case T_RD: + /* Fits, OK */ + break; + + default: + cf_error("Expression evaluated to type %s unsupported by aggregator. Store this value as a custom attribute instead", f_type_name($2->type)); + } + + $$.begin = $2; + $$.end = NULL; + f_new_var(new_config->current_scope); + } + | lvalue { + $$.begin = f_lval_getter(&$1); + int vari = f_new_var(new_config->current_scope); + + if ($1.type == F_LVAL_SA && $1.sa.sa_code == SA_NET) + AGGREGATOR_CFG->aggr_on_net = 1; + if (($1.type == F_LVAL_CONSTANT) || + ($1.type == F_LVAL_SA && $1.sa.readonly)) + $$.end = NULL; + else + { + char varname[12]; + bsnprintf(varname, 12, "!aggr%d", vari); + $$.end = f_lval_setter(&$1, + f_new_inst(FI_VAR_GET, cf_define_symbol( + new_config, cf_get_symbol(new_config, varname), + SYM_VARIABLE | $$.begin->type, offset, vari + ))); } + } ; CF_CODE diff --git a/proto/aggregator/test.conf b/proto/aggregator/test.conf index e5e1e2672..ec7afc4fe 100644 --- a/proto/aggregator/test.conf +++ b/proto/aggregator/test.conf @@ -100,7 +100,7 @@ protocol aggregator { table master6; peer table agr_result; export all; - aggregate on net,(defined(bgp_med)); + aggregate on net,(defined(bgp_med)), (1 + 3 + 5 + 7), preference, dest; merge by { print "Merging all these: ", routes; bgp_med = 0; -- 2.47.2