From 36c60e17f6e48ff224ee49094d3623944fc93d2e Mon Sep 17 00:00:00 2001 From: Igor Putovny Date: Fri, 21 Feb 2025 12:00:14 +0100 Subject: [PATCH] Aggregator: Support prefix aggregation (ready for code review) --- conf/conf.h | 1 + proto/aggregator/aggregator.c | 1585 +++++++++++++++++++++++++++++---- proto/aggregator/aggregator.h | 74 +- proto/aggregator/config.Y | 35 +- 4 files changed, 1530 insertions(+), 165 deletions(-) diff --git a/conf/conf.h b/conf/conf.h index 56bde0395..19a115b64 100644 --- a/conf/conf.h +++ b/conf/conf.h @@ -14,6 +14,7 @@ #include "lib/hash.h" #include "lib/resource.h" #include "lib/timer.h" +#include "lib/settle.h" /* Configuration structure */ struct config { diff --git a/proto/aggregator/aggregator.c b/proto/aggregator/aggregator.c index 66c60166f..960eb7e08 100644 --- a/proto/aggregator/aggregator.c +++ b/proto/aggregator/aggregator.c @@ -1,37 +1,92 @@ /* * BIRD Internet Routing Daemon -- Route aggregation * - * (c) 2023--2023 Igor Putovny - * (c) 2023 CZ.NIC, z.s.p.o. + * (c) 2023--2025 Igor Putovny + * (c) 2025 CZ.NIC, z.s.p.o. * * Can be freely distributed and used under the terms of the GNU GPL. */ /** - * DOC: Route aggregation + * DOC: Aggregator protocol * - * This is an implementation of route aggregation functionality. - * It enables user to specify a set of route attributes in the configuarion file - * and then, for a given destination (net), aggregate routes with the same - * values of these attributes into a single multi-path route. + * The purpose of the aggregator protocol is to aggregate routes based on + * user-specified set of route attributes. It can be used for aggregating + * routes for a given destination (net) or for aggregating prefixes. * - * Structure &channel contains pointer to aggregation list which is represented - * by &aggr_list_linearized. 
Aggregation is performed by inserting routes into a hash + * table based on values of their attributes and generating new routes from + * the routes in the same bucket.
An edge from the parent node to its left child represents bit 0, and + * an edge from the parent node to its right child represents bit 1 as the + * prefix is traversed from the most to the least significant bit. Last node + * of every prefix contains pointer to @aggregator_bucket where the route for + * this prefix belongs. + * + * ORTC algorithm consists of three passes through the trie. + * + * The first pass adds new nodes to the trie so that every node has either two + * or zero children. During this pass, routing information is propagated to the + * leaves. + * + * The second pass finds the most prevalent buckets by pushing information from + * the leaves up towards the root. Each node is assigned a set of potential + * buckets. If there are any common buckets among the node's children, they + * are carried to the parent node. Otherwise, all of children's buckets are + * carried to the parent node. + * + * The third pass moves down the trie while deciding which prefixes will be + * exported to the FIB. The node inherits a bucket from the closest ancestor + * that has a bucket. If the inherited bucket is one of potential buckets of + * this node, then this node does not need a bucket and its prefix will not + * be in FIB. Otherwise the node does need a bucket and any of its potential + * buckets can be chosen. We always choose the bucket with the lowest ID, but + * other options are possible. + * + * The algorithm works with the assumption that there is a default route, that is, + * the null prefix at the root node has a bucket. + * + * Memory for the aggregator is allocated from three linpools: one for buckets, + * one for routes and one for trie used in prefix aggregation. Obviously, trie + * linpool is allocated only when aggregating prefixes. Linpools are flushed + * after prefix aggregation is finished, thus destroying all data structures + * used. + * + * Aggregator is capable of processing incremental updates. 
All new routes are + * exported immediately, however, all routes to be withdrawn are pushed on + * the stack and are removed after recomputing the trie.
*/ #undef LOCAL_DEBUG @@ -46,24 +101,1080 @@ #include "proto/aggregator/aggregator.h" #include -/* -#include "nest/route.h" -#include "nest/iface.h" -#include "lib/resource.h" -#include "lib/event.h" -#include "lib/timer.h" -#include "lib/string.h" -#include "conf/conf.h" -#include "filter/filter.h" -#include "filter/data.h" -#include "lib/hash.h" -#include "lib/string.h" -#include "lib/alloca.h" -#include "lib/flowspec.h" -*/ +#include extern linpool *rte_update_pool; +static const char *px_origin_str[] = { + [FILLER] = "filler", + [ORIGINAL] = "original", + [AGGREGATED] = "aggregated", +}; + +static const u32 ipa_shift[] = { + [NET_IP4] = IP6_MAX_PREFIX_LENGTH - IP4_MAX_PREFIX_LENGTH, + [NET_IP6] = 0, +}; + +static inline int +is_leaf(const struct trie_node *node) +{ + assert(node != NULL); + return !node->child[0] && !node->child[1]; +} + +/* + * Allocate new node in protocol linpool + */ +static inline struct trie_node * +create_new_node(linpool *trie_pool) +{ + struct trie_node *node = lp_allocz(trie_pool, sizeof(*node)); + return node; +} + +/* + * Unlink node from the trie by setting appropriate child of parent node to NULL + */ +static inline void +remove_node(struct trie_node *node) +{ + assert(node != NULL); + assert(node->child[0] == NULL && node->child[1] == NULL); + + if (!node->parent) + ; + else + { + if (node->parent->child[0] == node) + node->parent->child[0] = NULL; + else if (node->parent->child[1] == node) + node->parent->child[1] = NULL; + else + bug("Corrupted memory (node is not its parent's child)"); + } + + memset(node, 0, sizeof(*node)); +} + +/* + * Insert @bucket to the set of potential buckets in @node + */ +static inline void +node_add_potential_bucket(struct trie_node *node, const struct aggregator_bucket *bucket) +{ + assert(node->potential_buckets_count < MAX_POTENTIAL_BUCKETS_COUNT); + + /* + if (BIT32R_TEST(node->potential_buckets, bucket->id)) + return; + + BIT32R_SET(node->potential_buckets, bucket->id); + 
node->potential_buckets_count++; + */ + + /* + * If the bit is set, the result of TEST is 1 and is subtracted from + * the bucket count, decreasing it by one. + * Second statement has no effect since the bit is already set. + * Third statement increases count by one, returning it to its previous + * value. Nothing changed. + * + * If the bit is not set, the result of TEST is 0 and subtracting it + * from the total count doesn't change its value. + * Second statement sets the bit and third statement increases count by one. + * Bit is now set and the total count was increased by one. + */ + node->potential_buckets_count -= BIT32R_TEST(node->potential_buckets, bucket->id); + BIT32R_SET(node->potential_buckets, bucket->id); + node->potential_buckets_count++; +} + +/* + * Check if @bucket is one of potential buckets of @node + */ +static inline int +node_is_bucket_potential(const struct trie_node *node, const struct aggregator_bucket *bucket) +{ + assert(node != NULL); + assert(bucket != NULL); + + ASSERT_DIE(bucket->id < MAX_POTENTIAL_BUCKETS_COUNT); + return BIT32R_TEST(node->potential_buckets, bucket->id); +} + +/* + * Return pointer to bucket with ID @id. + * Protocol contains list of pointers to all buckets. Every pointer + * lies at position equal to bucket ID to enable fast lookup. 
+ */ +static inline struct aggregator_bucket * +get_bucket_ptr(const struct aggregator_proto *p, u32 id) +{ + ASSERT_DIE(id < p->bucket_list_size); + ASSERT_DIE(p->bucket_list[id] != NULL); + ASSERT_DIE(p->bucket_list[id]->id == id); + return p->bucket_list[id]; +} + +/* + * Allocate unique ID for bucket + */ +static inline u32 +get_new_bucket_id(struct aggregator_proto *p) +{ + u32 id = hmap_first_zero(&p->bucket_id_map); + hmap_set(&p->bucket_id_map, id); + return id; +} + +/* + * Select bucket with the lowest ID from the set of node's potential buckets + */ +static inline struct aggregator_bucket * +select_lowest_id_bucket(const struct aggregator_proto *p, const struct trie_node *node) +{ + assert(p != NULL); + assert(node != NULL); + + for (u32 i = 0; i < MAX_POTENTIAL_BUCKETS_COUNT; i++) + { + if (BIT32R_TEST(node->potential_buckets, i)) + { + struct aggregator_bucket *bucket = get_bucket_ptr(p, i); + assert(bucket != NULL); + assert(bucket->id == i); + return bucket; + } + } + + bug("No potential buckets to choose from"); +} + +/* + * If sets of potential buckets in @left and @right have non-empty intersection, + * computed as bitwise AND, save it to the target bucket. Otherwise compute + * their union as bitwise OR. Return whether the set of potential buckets in the + * target node has changed. 
+ */ +static int +merge_potential_buckets(struct trie_node *target, const struct trie_node *left, const struct trie_node *right) +{ + assert(target != NULL); + assert(left != NULL); + assert(right != NULL); + + int has_intersection = 0; + int has_changed = 0; + int buckets_count = 0; + + u32 old[ARRAY_SIZE(target->potential_buckets)] = { 0 }; + + for (int i = 0; i < POTENTIAL_BUCKETS_BITMAP_SIZE; i++) + { + /* Save current bitmap values */ + old[i] = target->potential_buckets[i]; + + /* Compute intersection */ + has_intersection |= !!(target->potential_buckets[i] = left->potential_buckets[i] & right->potential_buckets[i]); + buckets_count += u32_popcount(target->potential_buckets[i]); + + /* + * If old and new values are different, the result of their XOR will be + * non-zero, thus @changed will be set to non-zero - true, as well. + */ + has_changed |= !!(old[i] ^ target->potential_buckets[i]); + } + + /* Sets have an empty intersection, compute their union instead */ + if (!has_intersection) + { + buckets_count = 0; + has_changed = 0; + + for (int i = 0; i < POTENTIAL_BUCKETS_BITMAP_SIZE; i++) + { + target->potential_buckets[i] = left->potential_buckets[i] | right->potential_buckets[i]; + buckets_count += u32_popcount(target->potential_buckets[i]); + has_changed |= !!(old[i] ^ target->potential_buckets[i]); + } + } + + /* Update number of potential buckets */ + target->potential_buckets_count = buckets_count; + + return has_changed; +} + +/* + * Insert @bucket to the list of bucket pointers in @p to position @bucket-ID + */ +static void +agregator_insert_bucket(struct aggregator_proto *p, struct aggregator_bucket *bucket) +{ + assert(p != NULL); + assert(p->bucket_list != NULL); + assert(bucket != NULL); + + /* Bucket is already in the list */ + if (bucket->id < p->bucket_list_size && p->bucket_list[bucket->id]) + return; + + const size_t old_size = p->bucket_list_size; + + /* Reallocate if more space is needed */ + if (bucket->id >= p->bucket_list_size) + { + 
while (bucket->id >= p->bucket_list_size) + p->bucket_list_size *= 2; + + assert(old_size < p->bucket_list_size); + + p->bucket_list = mb_realloc(p->bucket_list, sizeof(p->bucket_list[0]) * p->bucket_list_size); + memset(&p->bucket_list[old_size], 0, sizeof(p->bucket_list[0]) * (p->bucket_list_size - old_size)); + } + + assert(bucket->id < p->bucket_list_size); + assert(p->bucket_list[bucket->id] == NULL); + + p->bucket_list[bucket->id] = bucket; + p->bucket_list_count++; + + assert(get_bucket_ptr(p, bucket->id) == bucket); +} + +/* + * Prepare to withdraw route for @prefix + */ +static void +aggregator_prepare_rte_withdrawal(struct aggregator_proto *p, ip_addr prefix, u32 pxlen, struct aggregator_bucket *bucket) +{ + assert(p != NULL); + assert(bucket != NULL); + + struct net_addr addr = { 0 }; + net_fill_ipa(&addr, prefix, pxlen); + + struct rte_withdrawal_item *node = lp_allocz(p->rte_withdrawal_pool, sizeof(*node)); + + *node = (struct rte_withdrawal_item) { + .next = p->rte_withdrawal_stack, + .bucket = bucket, + }; + + net_copy(&node->addr, &addr); + + p->rte_withdrawal_stack = node; + p->rte_withdrawal_count++; + + assert(p->rte_withdrawal_stack != NULL); +} + +/* + * Withdraw all routes that are on the stack + */ +static void +aggregator_withdraw_rte(struct aggregator_proto *p) +{ + if ((NET_IP4 == p->addr_type && p->rte_withdrawal_count > IP4_WITHDRAWAL_LIMIT) || + (NET_IP6 == p->addr_type && p->rte_withdrawal_count > IP6_WITHDRAWAL_LIMIT)) + log(L_WARN "This number of updates was not expected." 
+ "They will be processed, but please, contact the developers."); + + struct rte_withdrawal_item *node = NULL; + + while (node = p->rte_withdrawal_stack) + { + rte_update2(p->dst, &node->addr, NULL, node->bucket->last_src); + p->rte_withdrawal_stack = node->next; + p->rte_withdrawal_count--; + } + + assert(p->rte_withdrawal_stack == NULL); + assert(p->rte_withdrawal_count == 0); + + lp_flush(p->rte_withdrawal_pool); +} + +static void aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *bucket, struct network *net); + +static void +create_route(struct aggregator_proto *p, ip_addr prefix, u32 pxlen, struct aggregator_bucket *bucket) +{ + struct net_addr addr = { 0 }; + net_fill_ipa(&addr, prefix, pxlen); + + struct network *n = allocz(sizeof(*n) + sizeof(struct net_addr)); + net_copy(n->n.addr, &addr); + + aggregator_bucket_update(p, bucket, n); +} + +static void +print_prefixes_helper(const struct trie_node *node, ip_addr *prefix, u32 pxlen, u32 type) +{ + assert(node != NULL); + assert(prefix != NULL); + + if (IN_FIB == node->status) + { + struct net_addr addr = { 0 }; + net_fill_ipa(&addr, *prefix, pxlen); + log("%N selected bucket: %u", &addr, node->selected_bucket->id); + } + + if (node->child[0]) + { + assert((u32)node->depth == pxlen); + ipa_clrbit(prefix, node->depth + ipa_shift[type]); + print_prefixes_helper(node->child[0], prefix, pxlen + 1, type); + } + + if (node->child[1]) + { + assert((u32)node->depth == pxlen); + ipa_setbit(prefix, node->depth + ipa_shift[type]); + print_prefixes_helper(node->child[1], prefix, pxlen + 1, type); + ipa_clrbit(prefix, node->depth + ipa_shift[type]); + } +} + +static void +print_prefixes(const struct trie_node *node, u32 type) +{ + assert(node != NULL); + + ip_addr prefix = (NET_IP4 == type) ? 
ipa_from_ip4(IP4_NONE) : ipa_from_ip6(IP6_NONE); + print_prefixes_helper(node, &prefix, 0, type); +} + +static void +dump_trie_helper(const struct aggregator_proto *p, const struct trie_node *node, ip_addr *prefix, u32 pxlen, struct buffer *buf) +{ + assert(p != NULL); + assert(node != NULL); + assert(prefix != NULL); + + memset(buf->start, 0, buf->pos - buf->start); + buf->pos = buf->start; + + struct net_addr addr = { 0 }; + net_fill_ipa(&addr, *prefix, pxlen); + + buffer_print(buf, "%*s%s%N ", 2 * node->depth, "", (IN_FIB == node->status) ? "@" : " ", &addr); + + if (node->original_bucket) + buffer_print(buf, "[%u] ", node->original_bucket->id); + else + buffer_print(buf, "[] "); + + buffer_print(buf, "{"); + + int j = 0; + + for (size_t i = 0; i < MAX_POTENTIAL_BUCKETS_COUNT; i++) + { + if (BIT32R_TEST(node->potential_buckets, i)) + { + buffer_print(buf, "%u", i); + j++; + + if (j < node->potential_buckets_count) + buffer_print(buf, ", "); + } + } + + buffer_print(buf, "}"); + + if (node->selected_bucket) + buffer_print(buf, " -> [[%u]]", node->selected_bucket->id); + + buffer_print(buf, " %p %s", node, px_origin_str[node->px_origin]); + log("%s", buf->start); + + if (node->child[0]) + { + assert((u32)node->depth == pxlen); + ipa_clrbit(prefix, node->depth + ipa_shift[p->addr_type]); + dump_trie_helper(p, node->child[0], prefix, pxlen + 1, buf); + } + + if (node->child[1]) + { + assert((u32)node->depth == pxlen); + ipa_setbit(prefix, node->depth + ipa_shift[p->addr_type]); + dump_trie_helper(p, node->child[1], prefix, pxlen + 1, buf); + ipa_clrbit(prefix, node->depth + ipa_shift[p->addr_type]); + } +} + +static void +dump_trie(const struct aggregator_proto *p) +{ + ip_addr prefix = (NET_IP4 == p->addr_type) ? 
ipa_from_ip4(IP4_NONE) : ipa_from_ip6(IP6_NONE); + + struct buffer buf = { 0 }; + LOG_BUFFER_INIT(buf); + + log("==== TRIE BEGIN ===="); + dump_trie_helper(p, p->root, &prefix, 0, &buf); + log("==== TRIE END ===="); +} + +/* + * Insert @prefix to the trie and assign @bucket to this prefix. If the prefix + * is already in the trie, update its bucket to @bucket and return updated node. + */ +static struct trie_node * +aggregator_insert_prefix(struct aggregator_proto *p, ip_addr prefix, u32 pxlen, struct aggregator_bucket *bucket) +{ + assert(p != NULL); + assert(bucket != NULL); + + struct trie_node *node = p->root; + + for (u32 i = 0; i < pxlen; i++) + { + u32 bit = ipa_getbit(prefix, i + ipa_shift[p->addr_type]); + + if (!node->child[bit]) + { + struct trie_node *new = create_new_node(p->trie_pool); + + *new = (struct trie_node) { + .parent = node, + .status = NON_FIB, + .px_origin = FILLER, + .depth = node->depth + 1, + }; + + node->child[bit] = new; + } + + node = node->child[bit]; + } + + /* Assign bucket to the last node */ + node->original_bucket = bucket; + node->px_origin = ORIGINAL; + + return node; +} + +/* + * Remove @prefix from the trie and return the last affected node + */ +static struct trie_node * +aggregator_remove_prefix(struct aggregator_proto *p, ip_addr prefix, u32 pxlen) +{ + struct trie_node *node = p->root; + + for (u32 i = 0; i < pxlen; i++) + { + u32 bit = ipa_getbit(prefix, i + ipa_shift[p->addr_type]); + node = node->child[bit]; + assert(node != NULL); + } + + assert(node->px_origin == ORIGINAL); + assert(node->selected_bucket != NULL); + assert((u32)node->depth == pxlen); + + /* If this prefix was IN_FIB, remove its route */ + if (IN_FIB == node->status) + aggregator_prepare_rte_withdrawal(p, prefix, pxlen, node->selected_bucket); + + node->status = NON_FIB; + node->px_origin = FILLER; + node->ancestor = NULL; + node->original_bucket = NULL; + node->selected_bucket = NULL; + node->potential_buckets_count = 0; + 
memset(node->potential_buckets, 0, sizeof(node->potential_buckets)); + + /* + * If prefix node is a leaf, remove it with the branch it resides on, + * until non-leaf or prefix node is reached. + */ + for (struct trie_node *parent = node->parent; parent; node = parent, parent = node->parent) + { + if (FILLER == node->px_origin && is_leaf(node)) + { + remove_node(node); + assert(node != NULL); + assert(parent != NULL); + } + else + break; + } + + return node; +} + +/* + * Find prefix corresponding to the position of @target node in the trie + * and save result into @prefix and @pxlen. + */ +static void +find_subtree_prefix(const struct trie_node *target, ip_addr *prefix, u32 *pxlen, u32 type) +{ + assert(target != NULL); + assert(prefix != NULL); + assert(pxlen != NULL); + + int path[IP6_MAX_PREFIX_LENGTH] = { 0 }; + int pos = 0; + u32 len = 0; + + const struct trie_node *node = target; + const struct trie_node *parent = node->parent; + + /* Ascend to the root node */ + while (parent) + { + if (node == parent->child[0]) + path[pos++] = 0; + else if (node == parent->child[1]) + path[pos++] = 1; + else + bug("Corrupted memory (node is not its parent's child)"); + + assert(pos < IP6_MAX_PREFIX_LENGTH); + node = parent; + parent = node->parent; + } + + assert(node->parent == NULL); + + /* Descend to the target node */ + for (int i = pos - 1; i >= 0; i--) + { + if (path[i] == 0) + ipa_clrbit(prefix, node->depth + ipa_shift[type]); + else + ipa_setbit(prefix, node->depth + ipa_shift[type]); + + node = node->child[path[i]]; + len++; + assert((u32)node->depth == len); + } + + assert(node == target); + *pxlen = len; +} + +/* + * Second pass of Optimal Route Table Construction (ORTC) algorithm + */ +static void +second_pass(struct trie_node *node) +{ + assert(node != NULL); + assert(node->potential_buckets_count <= MAX_POTENTIAL_BUCKETS_COUNT); + + if (is_leaf(node)) + { + assert(node->original_bucket != NULL); + assert(node->status == NON_FIB); + 
assert(node->potential_buckets_count == 0); + node_add_potential_bucket(node, node->original_bucket); + return; + } + + /* Propagate original buckets */ + if (!node->original_bucket) + node->original_bucket = node->parent->original_bucket; + + /* Internal node */ + assert(node->potential_buckets_count == 0); + + struct trie_node *left = node->child[0]; + struct trie_node *right = node->child[1]; + + /* Postorder traversal */ + if (left) + second_pass(left); + + if (right) + second_pass(right); + + assert(node->original_bucket != NULL); + assert(node->selected_bucket == NULL); + + /* Imaginary node if this was a complete binary tree */ + struct trie_node imaginary_node = { + .parent = node, + }; + + /* + * Imaginary node is used only for computing sets of potential buckets + * of its parent node. It inherits parent's potential bucket. + */ + node_add_potential_bucket(&imaginary_node, node->original_bucket); + + /* Nodes with exactly one child */ + if ((left && !right) || (!left && right)) + { + if (left && !right) + right = &imaginary_node; + else + left = &imaginary_node; + } + + assert(left != NULL && right != NULL); + + /* + * If there are no common buckets among children's buckets, parent's + * buckets are computed as union of its children's buckets. + * Otherwise, parent's buckets are computed as intersection of its + * children's buckets. 
+ */ + merge_potential_buckets(node, left, right); +} + +static void +third_pass_helper(struct aggregator_proto *p, struct trie_node *node, ip_addr *prefix, u32 pxlen) +{ + assert(node != NULL); + assert(node->potential_buckets_count <= MAX_POTENTIAL_BUCKETS_COUNT); + + assert(node->original_bucket != NULL); + assert(node->parent->ancestor != NULL); + assert(node->parent->ancestor->selected_bucket != NULL); + + /* Bucket inherited from the closest ancestor with a non-null selected bucket */ + const struct aggregator_bucket * const inherited_bucket = node->parent->ancestor->selected_bucket; + + /* + * If the bucket inherited from the ancestor is one of the potential buckets + * of this node, then this node doesn't need a bucket because it inherits + * one, and is not needed in FIB. + */ + if (node_is_bucket_potential(node, inherited_bucket)) + { + /* + * Prefix status is changing from IN_FIB to NON_FIB, thus its route + * must be removed from the routing table. + */ + if (IN_FIB == node->status) + { + assert(node->px_origin == ORIGINAL || node->px_origin == AGGREGATED); + assert(node->selected_bucket != NULL); + + aggregator_prepare_rte_withdrawal(p, *prefix, pxlen, node->selected_bucket); + } + + node->selected_bucket = NULL; + node->status = NON_FIB; + node->ancestor = node->parent->ancestor; + + /* + * We have to keep information whether this prefix was original to enable + * processing of incremental updates. If it's not original, then it is + * a filler because it's not going to FIB. + */ + node->px_origin = (ORIGINAL == node->px_origin) ? ORIGINAL : FILLER; + } + else + { + assert(node->potential_buckets_count > 0); + + /* Assign bucket with the lowest ID to the node */ + node->selected_bucket = select_lowest_id_bucket(p, node); + assert(node->selected_bucket != NULL); + + /* + * Prefix status is changing from NON_FIB or UNASSIGNED (at newly created nodes) + * to IN_FIB, thus its route is exported to the routing table. 
+ */ + if (IN_FIB != node->status) + create_route(p, *prefix, pxlen, node->selected_bucket); + + /* + * Keep information whether this prefix was original. If not, then its origin + * is changed to aggregated, because algorithm decided it's going to FIB. + */ + node->px_origin = (ORIGINAL == node->px_origin) ? ORIGINAL : AGGREGATED; + node->status = IN_FIB; + node->ancestor = node; + } + + assert((node->selected_bucket != NULL && node->status == IN_FIB) || (node->selected_bucket == NULL && node->status == NON_FIB)); + assert(node->ancestor != NULL); + assert(node->ancestor->original_bucket != NULL); + assert(node->ancestor->selected_bucket != NULL); + + const struct trie_node * const left = node->child[0]; + const struct trie_node * const right = node->child[1]; + + /* Nodes with only one child */ + if ((left && !right) || (!left && right)) + { + /* + * Imaginary node that would have been added during first pass. + * This node inherits bucket from its parent (current node). + */ + struct trie_node imaginary_node = { + .parent = node, + .original_bucket = node->original_bucket, + .px_origin = AGGREGATED, + .depth = node->depth + 1, + }; + + node_add_potential_bucket(&imaginary_node, node->original_bucket); + + /* + * If the current node (parent of the imaginary node) has a bucket, + * then the imaginary node inherits this bucket. + * Otherwise it inherits bucket from the closest ancestor with + * a non-null bucket. + */ + const struct aggregator_bucket * const imaginary_node_inherited_bucket = node->selected_bucket ? node->selected_bucket : inherited_bucket; + + /* + * Original algorithm would normalize the trie during first pass, so that + * every node has either zero or two children. We skip this step to save + * both time and memory. + * On the other hand, nodes may be removed from the trie during third pass, + * if they do not have bucket on their own and inherit one from their + * ancestors instead (and thus are not needed in FIB). 
+ * Nodes get bucket if the bucket inherited from their ancestors is NOT + * one of their potential buckets. In this case, we need to add these nodes + * to the trie. + */ + if (!node_is_bucket_potential(&imaginary_node, imaginary_node_inherited_bucket)) + { + struct trie_node *new = create_new_node(p->trie_pool); + *new = imaginary_node; + + /* Connect new node to the trie */ + if (left && !right) + node->child[1] = new; + else + node->child[0] = new; + } + } + + /* Preorder traversal */ + if (node->child[0]) + { + assert((u32)node->depth == pxlen); + ipa_clrbit(prefix, node->depth + ipa_shift[p->addr_type]); + third_pass_helper(p, node->child[0], prefix, pxlen + 1); + } + + if (node->child[1]) + { + assert((u32)node->depth == pxlen); + ipa_setbit(prefix, node->depth + ipa_shift[p->addr_type]); + third_pass_helper(p, node->child[1], prefix, pxlen + 1); + ipa_clrbit(prefix, node->depth + ipa_shift[p->addr_type]); + } + + if (NON_FIB == node->status && is_leaf(node)) + assert(node->selected_bucket == NULL); +} + +/* + * Third pass of Optimal Route Table Construction (ORTC) algorithm + */ +static void +third_pass(struct aggregator_proto *p, struct trie_node *node) +{ + assert(node != NULL); + assert(node->potential_buckets_count <= MAX_POTENTIAL_BUCKETS_COUNT); + assert(node->potential_buckets_count > 0); + + ip_addr prefix = (NET_IP4 == p->addr_type) ? ipa_from_ip4(IP4_NONE) : ipa_from_ip6(IP6_NONE); + u32 pxlen = 0; + + /* + * If third pass runs on a subtree and not the whole trie, + * find prefix that covers this subtree. + */ + find_subtree_prefix(node, &prefix, &pxlen, p->addr_type); + + /* Select bucket with the lowest ID */ + node->selected_bucket = select_lowest_id_bucket(p, node); + assert(node->selected_bucket != NULL); + + /* + * Export new route if node status is changing from NON_FIB + * or UNASSIGNED to IN_FIB. 
+ */ + if (IN_FIB != node->status) + create_route(p, prefix, pxlen, node->selected_bucket); + + /* The closest ancestor of the IN_FIB node with a non-null bucket is the node itself */ + node->ancestor = node; + node->status = IN_FIB; + + if (node->child[0]) + { + assert((u32)node->depth == pxlen); + ipa_clrbit(&prefix, node->depth + ipa_shift[p->addr_type]); + third_pass_helper(p, node->child[0], &prefix, pxlen + 1); + } + + if (node->child[1]) + { + assert((u32)node->depth == pxlen); + ipa_setbit(&prefix, node->depth + ipa_shift[p->addr_type]); + third_pass_helper(p, node->child[1], &prefix, pxlen + 1); + ipa_clrbit(&prefix, node->depth + ipa_shift[p->addr_type]); + } +} + +static void +check_ancestors_after_aggregation(const struct trie_node *node) +{ + assert(node != NULL); + assert(node->ancestor != NULL); + + if (IN_FIB == node->status) + { + assert(node->selected_bucket != NULL); + assert(node->ancestor != NULL); + assert(node->ancestor == node); + } + else if (NON_FIB == node->status) + { + assert(node->selected_bucket == NULL); + assert(node->ancestor != NULL); + assert(node->ancestor != node); + assert(node->ancestor == node->parent->ancestor); + } + else + bug("Unknown node status"); + + if (node->child[0]) + check_ancestors_after_aggregation(node->child[0]); + + if (node->child[1]) + check_ancestors_after_aggregation(node->child[1]); +} + +/* + * Delete all information computed by aggregation algorithm in the subtree + * rooted at @node and propagate original buckets in the subtree. + */ +static void +deaggregate(struct trie_node * const node) +{ + assert(node != NULL); + + /* Delete results computed by aggregation algorithm */ + node->selected_bucket = NULL; + node->ancestor = NULL; + node->potential_buckets_count = 0; + memset(node->potential_buckets, 0, sizeof(node->potential_buckets)); + + /* + * Original prefixes already have their original bucket set, + * others inherit it from their parents. 
+ */ + if (ORIGINAL != node->px_origin) + { + assert(node->original_bucket == NULL); + node->original_bucket = node->parent->original_bucket; + } + + assert(node->original_bucket != NULL); + assert(node->potential_buckets_count == 0); + + /* As during the first pass, leaves get one potential bucket */ + if (is_leaf(node)) + { + assert(node->px_origin == ORIGINAL); + assert(node->potential_buckets_count == 0); + node_add_potential_bucket(node, node->original_bucket); + } + + if (node->child[0]) + deaggregate(node->child[0]); + + if (node->child[1]) + deaggregate(node->child[1]); +} + +/* + * Merge sets of potential buckets of node's children going from @node upwards. + * Stop when the node's set doesn't change and return the last updated node. + */ +static struct trie_node * +merge_buckets_above(struct trie_node *node) +{ + assert(node != NULL); + + struct trie_node *parent = node->parent; + + while (parent) + { + const struct trie_node *left = parent->child[0]; + const struct trie_node *right = parent->child[1]; + assert(left == node || right == node); + + struct trie_node imaginary_node = { 0 }; + node_add_potential_bucket(&imaginary_node, parent->original_bucket); + + /* Nodes with only one child */ + if (left && !right) + right = &imaginary_node; + else if (!left && right) + left = &imaginary_node; + + assert(left != NULL && right != NULL); + + /* The parent's set wasn't affected by merging, stop here */ + if (merge_potential_buckets(parent, left, right) == 0) + return node; + + node = parent; + parent = node->parent; + } + + return node; +} + +/* + * Incorporate announcement of new prefix into the trie + */ +static void +aggregator_update_prefix(struct aggregator_proto *p, struct aggregator_route *old UNUSED, struct aggregator_route *new) +{ + assert(p != NULL); + assert(new != NULL); + + const struct net_addr *addr = new->rte.net->n.addr; + + const ip_addr prefix = net_prefix(addr); + const u32 pxlen = net_pxlen(addr); + + struct trie_node * const updated_node 
= aggregator_insert_prefix(p, prefix, pxlen, new->bucket);
+  assert(updated_node != NULL);
+  assert(updated_node->original_bucket != NULL);
+  assert(updated_node->status == NON_FIB);
+  assert(updated_node->px_origin == ORIGINAL);
+
+  struct trie_node *node = updated_node;
+
+  /*
+   * Find the closest IN_FIB ancestor of the updated node and
+   * deaggregate the whole subtree rooted at this node.
+   * Then aggregate it once again, this time with incorporated update.
+   */
+  while (1)
+  {
+    if (IN_FIB == node->status && node != updated_node)
+      break;
+
+    node = node->parent;
+  }
+
+  deaggregate(node);
+  second_pass(node);
+  struct trie_node *highest_node = merge_buckets_above(node);
+  assert(highest_node != NULL);
+  third_pass(p, highest_node);
+}
+
+/*
+ * Incorporate prefix withdrawal into the trie
+ */
+static void
+aggregator_withdraw_prefix(struct aggregator_proto *p, struct aggregator_route *old)
+{
+  assert(p != NULL);
+  assert(old != NULL);
+
+  const struct net_addr *addr = old->rte.net->n.addr;
+
+  const ip_addr prefix = net_prefix(addr);
+  const u32 pxlen = net_pxlen(addr);
+
+  struct trie_node * const updated_node = aggregator_remove_prefix(p, prefix, pxlen);
+  assert(updated_node != NULL);
+
+  struct trie_node *node = updated_node;
+
+  /*
+   * Find the closest IN_FIB ancestor of the updated node and
+   * deaggregate the whole subtree rooted at this node.
+   * Then aggregate it once again, this time with received update. 
+ */ + while (1) + { + if (IN_FIB == node->status) + break; + + node = node->parent; + } + + deaggregate(node); + second_pass(node); + struct trie_node *highest_node = merge_buckets_above(node); + assert(highest_node != NULL); + third_pass(p, highest_node); +} + +static void +construct_trie(struct aggregator_proto *p) +{ + HASH_WALK(p->buckets, next_hash, bucket) + { + for (const struct rte *rte = bucket->rte; rte; rte = rte->next) + { + const struct net_addr *addr = rte->net->n.addr; + + const ip_addr prefix = net_prefix(addr); + const u32 pxlen = net_pxlen(addr); + + aggregator_insert_prefix(p, prefix, pxlen, bucket); + } + } + HASH_WALK_END; +} + +/* + * Run Optimal Routing Table Constructor (ORTC) algorithm + */ +static void +calculate_trie(struct aggregator_proto *p) +{ + assert(p->addr_type == NET_IP4 || p->addr_type == NET_IP6); + + second_pass(p->root); + third_pass(p, p->root); + + check_ancestors_after_aggregation(p->root); +} + +static void +run_aggregation(struct aggregator_proto *p) +{ + assert(p->root != NULL); + + construct_trie(p); + calculate_trie(p); +} + +static void aggregator_initialize_trie(struct aggregator_proto *p); + +static void +aggregate_on_feed_end(struct channel *C) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto); + + if (C != p->src) + return; + + assert(PREFIX_AGGR == p->aggr_mode); + assert(p->root == NULL); + + aggregator_initialize_trie(p); + run_aggregation(p); +} + /* * Set static attribute in @rta from static attribute in @old according to @sa. */ @@ -143,11 +1254,7 @@ same_val_list(const struct f_val *v1, const struct f_val *v2, uint len) } /* - * Create and export new merged route. 
- * @old: first route in a sequence of equivalent routes that are to be merged - * @rte_val: first element in a sequence of equivalent rte_val_list entries - * @length: number of equivalent routes that are to be merged (at least 1) - * @ail: aggregation list + * Create and export new merged route */ static void aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *bucket, struct network *net) @@ -166,7 +1273,7 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b rta->source = RTS_AGGREGATED; rta->scope = SCOPE_UNIVERSE; - struct ea_list *eal = allocz(sizeof(struct ea_list) + sizeof(struct eattr) * p->aggr_on_da_count); + struct ea_list *eal = allocz(sizeof(*eal) + sizeof(struct eattr) * p->aggr_on_da_count); eal->next = NULL; eal->count = 0; rta->eattrs = eal; @@ -186,13 +1293,15 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b rta_set_static_attr(rta, bucket->rte->attrs, p->aggr_on[i].sa); } - struct rte *new = rte_get_temp(rta, bucket->rte->src); + struct rte *new = rte_get_temp(rta, p->p.main_source); new->net = net; - log("=============== CREATE MERGED ROUTE ==============="); - log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name); - log("==================================================="); - + if (p->logging) + { + log("=============== CREATE MERGED ROUTE ==============="); + log("New route created: id = %d, protocol: %s", new->src->global_id, new->src->proto->name); + log("==================================================="); + } /* merge filter needs one argument called "routes" */ struct f_val val = { @@ -222,7 +1331,7 @@ aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b /* We actually don't want this route */ case F_REJECT: if (bucket->last_src) - rte_update2(p->dst, net->n.addr, NULL, bucket->last_src); + rte_update2(p->dst, net->n.addr, NULL, bucket->last_src); break; } @@ -231,6 +1340,7 @@ 
aggregator_bucket_update(struct aggregator_proto *p, struct aggregator_bucket *b { if (new_src) rt_lock_source(new_src); + if (bucket->last_src) rt_unlock_source(bucket->last_src); @@ -274,18 +1384,18 @@ eval_static_attr(const struct rte *rt1, struct f_static_attr sa, struct f_val *p switch (sa.sa_code) { - case SA_NET: RESULT(sa.f_type, net, rt1->net->n.addr); break; - case SA_FROM: RESULT(sa.f_type, ip, rta->from); break; - case SA_GW: RESULT(sa.f_type, ip, rta->nh.gw); break; - case SA_PROTO: RESULT(sa.f_type, s, rt1->src->proto->name); break; - case SA_SOURCE: RESULT(sa.f_type, i, rta->source); break; - case SA_SCOPE: RESULT(sa.f_type, i, rta->scope); break; - case SA_DEST: RESULT(sa.f_type, i, rta->dest); break; - case SA_IFNAME: RESULT(sa.f_type, s, rta->nh.iface ? rta->nh.iface->name : ""); break; - case SA_IFINDEX: RESULT(sa.f_type, i, rta->nh.iface ? rta->nh.iface->index : 0); break; - case SA_WEIGHT: RESULT(sa.f_type, i, rta->nh.weight + 1); break; - case SA_PREF: RESULT(sa.f_type, i, rta->pref); break; - case SA_GW_MPLS: RESULT(sa.f_type, i, rta->nh.labels ? rta->nh.label[0] : MPLS_NULL); break; + case SA_NET: RESULT(sa.f_type, net, rt1->net->n.addr); break; + case SA_FROM: RESULT(sa.f_type, ip, rta->from); break; + case SA_GW: RESULT(sa.f_type, ip, rta->nh.gw); break; + case SA_PROTO: RESULT(sa.f_type, s, rt1->src->proto->name); break; + case SA_SOURCE: RESULT(sa.f_type, i, rta->source); break; + case SA_SCOPE: RESULT(sa.f_type, i, rta->scope); break; + case SA_DEST: RESULT(sa.f_type, i, rta->dest); break; + case SA_IFNAME: RESULT(sa.f_type, s, rta->nh.iface ? rta->nh.iface->name : ""); break; + case SA_IFINDEX: RESULT(sa.f_type, i, rta->nh.iface ? rta->nh.iface->index : 0); break; + case SA_WEIGHT: RESULT(sa.f_type, i, rta->nh.weight + 1); break; + case SA_PREF: RESULT(sa.f_type, i, rta->pref); break; + case SA_GW_MPLS: RESULT(sa.f_type, i, rta->nh.labels ? 
rta->nh.label[0] : MPLS_NULL); break; default: bug("Invalid static attribute access (%u/%u)", sa.f_type, sa.sa_code); } @@ -386,7 +1496,8 @@ eval_dynamic_attr(const struct rte *rt1, struct f_dynamic_attr da, struct f_val #undef RESULT_VOID } -static inline u32 aggr_route_hash(const rte *e) +static inline u32 +aggr_route_hash(const rte *e) { struct { net *net; @@ -430,8 +1541,13 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new { struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); ASSERT_DIE(src_ch == p->src); + struct aggregator_bucket *new_bucket = NULL, *old_bucket = NULL; - struct aggregator_route *old_route = NULL; + struct aggregator_route *new_route = NULL, *old_route = NULL; + + /* Ignore all updates if protocol is not up */ + if (p->p.proto_state != PS_UP) + return; /* Find the objects for the old route */ if (old) @@ -448,7 +1564,8 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new return; /* Evaluate route attributes. 
*/ - struct aggregator_bucket *tmp_bucket = sl_allocz(p->bucket_slab); + struct aggregator_bucket *tmp_bucket = allocz(sizeof(*tmp_bucket) + sizeof(tmp_bucket->aggr_data[0]) * p->aggr_on_count); + assert(tmp_bucket->id == 0); for (uint val_idx = 0; val_idx < p->aggr_on_count; val_idx++) { @@ -493,6 +1610,7 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new /* Compute the hash */ u64 haux; mem_hash_init(&haux); + for (uint i = 0; i < p->aggr_on_count; i++) { mem_hash_mix_num(&haux, tmp_bucket->aggr_data[i].type); @@ -502,50 +1620,50 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new switch (tmp_bucket->aggr_data[i].type) { - case T_VOID: - break; - case T_INT: - case T_BOOL: - case T_PAIR: - case T_QUAD: - case T_ENUM: - MX(i); - break; - case T_EC: - case T_RD: - MX(ec); - break; - case T_LC: - MX(lc); - break; - case T_IP: - MX(ip); - break; - case T_NET: - mem_hash_mix_num(&haux, net_hash(IT(net))); - break; - case T_STRING: - mem_hash_mix_str(&haux, IT(s)); - break; - case T_PATH_MASK: - mem_hash_mix(&haux, IT(path_mask), sizeof(*IT(path_mask)) + IT(path_mask)->len * sizeof (IT(path_mask)->item)); - break; - case T_PATH: - case T_CLIST: - case T_ECLIST: - case T_LCLIST: - mem_hash_mix(&haux, IT(ad)->data, IT(ad)->length); - break; - case T_PATH_MASK_ITEM: - case T_ROUTE: - case T_ROUTES_BLOCK: - bug("Invalid type %s in hashing", f_type_name(tmp_bucket->aggr_data[i].type)); - case T_SET: - MX(t); - break; - case T_PREFIX_SET: - MX(ti); - break; + case T_VOID: + break; + case T_INT: + case T_BOOL: + case T_PAIR: + case T_QUAD: + case T_ENUM: + MX(i); + break; + case T_EC: + case T_RD: + MX(ec); + break; + case T_LC: + MX(lc); + break; + case T_IP: + MX(ip); + break; + case T_NET: + mem_hash_mix_num(&haux, net_hash(IT(net))); + break; + case T_STRING: + mem_hash_mix_str(&haux, IT(s)); + break; + case T_PATH_MASK: + mem_hash_mix(&haux, IT(path_mask), sizeof(*IT(path_mask)) + IT(path_mask)->len * 
sizeof (IT(path_mask)->item)); + break; + case T_PATH: + case T_CLIST: + case T_ECLIST: + case T_LCLIST: + mem_hash_mix(&haux, IT(ad)->data, IT(ad)->length); + break; + case T_PATH_MASK_ITEM: + case T_ROUTE: + case T_ROUTES_BLOCK: + bug("Invalid type %s in hashing", f_type_name(tmp_bucket->aggr_data[i].type)); + case T_SET: + MX(t); + break; + case T_PREFIX_SET: + MX(ti); + break; } } @@ -553,11 +1671,15 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new /* Find the existing bucket */ if (new_bucket = HASH_FIND(p->buckets, AGGR_BUCK, tmp_bucket)) - sl_free(tmp_bucket); + ; else { - new_bucket = tmp_bucket; + new_bucket = lp_allocz(p->bucket_pool, sizeof(*new_bucket) + sizeof(new_bucket->aggr_data[0]) * p->aggr_on_count); + memcpy(new_bucket, tmp_bucket, sizeof(*new_bucket) + sizeof(new_bucket->aggr_data[0]) * p->aggr_on_count); HASH_INSERT2(p->buckets, AGGR_BUCK, p->p.pool, new_bucket); + + new_bucket->id = get_new_bucket_id(p); + agregator_insert_bucket(p, new_bucket); } /* Store the route attributes */ @@ -566,47 +1688,77 @@ aggregator_rt_notify(struct proto *P, struct channel *src_ch, net *net, rte *new else new->attrs = rta_lookup(new->attrs); + if (p->logging) + log("New rte: %p, net: %p, src: %p, hash: %x", new, new->net, new->src, aggr_route_hash(new)); + /* Insert the new route into the bucket */ - struct aggregator_route *arte = sl_alloc(p->route_slab); + struct aggregator_route *arte = lp_allocz(p->route_pool, sizeof(*arte)); + *arte = (struct aggregator_route) { .bucket = new_bucket, .rte = *new, }; + arte->rte.next = new_bucket->rte, new_bucket->rte = &arte->rte; new_bucket->count++; HASH_INSERT2(p->routes, AGGR_RTE, p->p.pool, arte); + + /* New route */ + new_route = arte; + assert(new_route != NULL); + + if (p->logging) + log("Inserting rte: %p, arte: %p, net: %p, src: %p, hash: %x", &arte->rte, arte, arte->rte.net, arte->rte.src, aggr_route_hash(&arte->rte)); } /* Remove the old route from its bucket */ if 
(old_bucket) { for (struct rte **k = &old_bucket->rte; *k; k = &(*k)->next) + { if (*k == &old_route->rte) { - *k = (*k)->next; - break; + *k = (*k)->next; + break; } + } old_bucket->count--; HASH_REMOVE2(p->routes, AGGR_RTE, p->p.pool, old_route); rta_free(old_route->rte.attrs); - sl_free(old_route); } - /* Announce changes */ - if (old_bucket) - aggregator_bucket_update(p, old_bucket, net); + /* Aggregation within nets allows incremental updates */ + if (NET_AGGR == p->aggr_mode) + { + /* Announce changes */ + if (old_bucket) + aggregator_bucket_update(p, old_bucket, net); + + if (new_bucket && (new_bucket != old_bucket)) + aggregator_bucket_update(p, new_bucket, net); + } + else if (PREFIX_AGGR == p->aggr_mode) + { + if (p->root) + { + if (old && !new) + aggregator_withdraw_prefix(p, old_route); + else + aggregator_update_prefix(p, old_route, new_route); - if (new_bucket && (new_bucket != old_bucket)) - aggregator_bucket_update(p, new_bucket, net); + /* Process all route withdrawals which were caused by the update */ + aggregator_withdraw_rte(p); + } + } /* Cleanup the old bucket if empty */ if (old_bucket && (!old_bucket->rte || !old_bucket->count)) { ASSERT_DIE(!old_bucket->rte && !old_bucket->count); + hmap_clear(&p->bucket_id_map, old_bucket->id); HASH_REMOVE2(p->buckets, AGGR_BUCK, p->p.pool, old_bucket); - sl_free(old_bucket); } } @@ -614,6 +1766,7 @@ static int aggregator_preexport(struct channel *C, struct rte *new) { struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, C->proto); + /* Reject our own routes */ if (new->sender == p->dst) return -1; @@ -660,26 +1813,112 @@ aggregator_init(struct proto_config *CF) proto_configure_channel(P, &p->src, cf->src); proto_configure_channel(P, &p->dst, cf->dst); + p->aggr_mode = cf->aggr_mode; p->aggr_on_count = cf->aggr_on_count; p->aggr_on_da_count = cf->aggr_on_da_count; p->aggr_on = cf->aggr_on; p->merge_by = cf->merge_by; + p->logging = cf->logging; + p->bucket_list = NULL; + p->bucket_list_size 
= 0; + p->bucket_list_count = 0; P->rt_notify = aggregator_rt_notify; P->preexport = aggregator_preexport; + P->feed_end = aggregate_on_feed_end; return P; } +/* + * Initialize hash table and create default route + */ +static void +aggregator_initialize_trie(struct aggregator_proto *p) +{ + struct network *default_net = NULL; + + if (p->addr_type == NET_IP4) + { + default_net = mb_allocz(p->p.pool, sizeof(*default_net) + sizeof(struct net_addr_ip4)); + net_fill_ip4(default_net->n.addr, IP4_NONE, 0); + + if (p->logging) + log("Creating net %p for default route %N", default_net, default_net->n.addr); + } + else if (p->addr_type == NET_IP6) + { + default_net = mb_allocz(p->p.pool, sizeof(*default_net) + sizeof(struct net_addr_ip6)); + net_fill_ip6(default_net->n.addr, IP6_NONE, 0); + + if (p->logging) + log("Creating net %p for default route %N", default_net, default_net->n.addr); + } + + /* Create root node */ + p->root = create_new_node(p->trie_pool); + + /* Create route attributes with zero nexthop */ + struct rta rta = { 0 }; + + /* Allocate bucket for root node */ + struct aggregator_bucket *new_bucket = lp_allocz(p->bucket_pool, sizeof(*new_bucket)); + assert(new_bucket->id == 0); + + u64 haux = 0; + mem_hash_init(&haux); + new_bucket->hash = mem_hash_value(&haux); + + /* Assign ID to the root node bucket */ + new_bucket->id = get_new_bucket_id(p); + agregator_insert_bucket(p, new_bucket); + assert(get_bucket_ptr(p, new_bucket->id) == new_bucket); + + struct aggregator_route *arte = lp_allocz(p->route_pool, sizeof(*arte)); + + *arte = (struct aggregator_route) { + .bucket = new_bucket, + .rte = { .attrs = rta_lookup(&rta) }, + }; + + arte->rte.next = new_bucket->rte; + new_bucket->rte = &arte->rte; + new_bucket->count++; + + arte->rte.net = default_net; + default_net->routes = &arte->rte; + + HASH_INSERT2(p->routes, AGGR_RTE, p->p.pool, arte); + HASH_INSERT2(p->buckets, AGGR_BUCK, p->p.pool, new_bucket); + + /* + * Root node is initialized with NON_FIB status. 
+ * Default route will be exported during first aggregation run. + */ + *p->root = (struct trie_node) { + .original_bucket = new_bucket, + .status = NON_FIB, + .px_origin = ORIGINAL, + .depth = 0, + }; +} + static int aggregator_start(struct proto *P) { struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); - p->bucket_slab = sl_new(P->pool, sizeof(struct aggregator_bucket) + AGGR_DATA_MEMSIZE); + assert(p->bucket_pool == NULL); + assert(p->route_pool == NULL); + assert(p->trie_pool == NULL); + assert(p->root == NULL); + + p->addr_type = p->src->table->addr_type; + + p->bucket_pool = lp_new(P->pool); HASH_INIT(p->buckets, P->pool, AGGR_BUCK_ORDER); - p->route_slab = sl_new(P->pool, sizeof(struct aggregator_route)); + p->route_pool = lp_new(P->pool); HASH_INIT(p->routes, P->pool, AGGR_RTE_ORDER); p->reload_buckets = (event) { @@ -687,33 +1926,57 @@ aggregator_start(struct proto *P) .data = p, }; + if (PREFIX_AGGR == p->aggr_mode) + { + assert(p->trie_pool == NULL); + p->trie_pool = lp_new(P->pool); + + assert(p->bucket_list == NULL); + assert(p->bucket_list_size == 0); + assert(p->bucket_list_count == 0); + p->bucket_list_size = BUCKET_LIST_INIT_SIZE; + p->bucket_list = mb_allocz(p->p.pool, sizeof(p->bucket_list[0]) * p->bucket_list_size); + } + + hmap_init(&p->bucket_id_map, p->p.pool, 1024); + hmap_set(&p->bucket_id_map, 0); /* 0 is default value, do not use it as ID */ + + p->rte_withdrawal_pool = lp_new(P->pool); + p->rte_withdrawal_count = 0; + return PS_UP; } static int -aggregator_shutdown(struct proto *P) +aggregator_shutdown(struct proto *P UNUSED) +{ + return PS_DOWN; +} + +static void +aggregator_cleanup(struct proto *P) { struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); - HASH_WALK_DELSAFE(p->buckets, next_hash, b) - { - while (b->rte) - { - struct aggregator_route *arte = SKIP_BACK(struct aggregator_route, rte, b->rte); - b->rte = arte->rte.next; - b->count--; - HASH_REMOVE(p->routes, AGGR_RTE, arte); - 
rta_free(arte->rte.attrs); - sl_free(arte); - } + /* + * Linpools will be freed along with other protocol resources but pointers + * have to be set to NULL because protocol may be started again. + */ + p->bucket_pool = NULL; + p->route_pool = NULL; + p->trie_pool = NULL; + p->rte_withdrawal_pool = NULL; - ASSERT_DIE(b->count == 0); - HASH_REMOVE(p->buckets, AGGR_BUCK, b); - sl_free(b); - } - HASH_WALK_END; + p->root = NULL; - return PS_DOWN; + p->bucket_list = NULL; + p->bucket_list_size = 0; + p->bucket_list_count = 0; + + p->rte_withdrawal_stack = NULL; + p->rte_withdrawal_count = 0; + + p->bucket_id_map = (struct hmap) { 0 }; } static int @@ -733,23 +1996,25 @@ aggregator_reconfigure(struct proto *P, struct proto_config *CF) /* Compare aggregator rule */ for (uint i = 0; i < p->aggr_on_count; i++) + { switch (cf->aggr_on[i].type) { case AGGR_ITEM_TERM: - if (!f_same(cf->aggr_on[i].line, p->aggr_on[i].line)) - return 0; - break; + if (!f_same(cf->aggr_on[i].line, p->aggr_on[i].line)) + return 0; + break; case AGGR_ITEM_STATIC_ATTR: - if (memcmp(&cf->aggr_on[i].sa, &p->aggr_on[i].sa, sizeof(struct f_static_attr)) != 0) - return 0; - break; + if (memcmp(&cf->aggr_on[i].sa, &p->aggr_on[i].sa, sizeof(struct f_static_attr)) != 0) + return 0; + break; case AGGR_ITEM_DYNAMIC_ATTR: - if (memcmp(&cf->aggr_on[i].da, &p->aggr_on[i].da, sizeof(struct f_dynamic_attr)) != 0) - return 0; - break; + if (memcmp(&cf->aggr_on[i].da, &p->aggr_on[i].da, sizeof(struct f_dynamic_attr)) != 0) + return 0; + break; default: - bug("Broken aggregator rule"); + bug("Broken aggregator rule"); } + } /* Compare merge filter */ if (!f_same(cf->merge_by, p->merge_by)) @@ -761,6 +2026,22 @@ aggregator_reconfigure(struct proto *P, struct proto_config *CF) return 1; } +static void +aggregator_get_status(struct proto *P, byte *buf) +{ + struct aggregator_proto *p = SKIP_BACK(struct aggregator_proto, p, P); + + if (p->p.proto_state == PS_DOWN) + buf[0] = 0; + else + { + if (PREFIX_AGGR == 
p->aggr_mode) + strcpy(buf, "prefix aggregation"); + else + strcpy(buf, "net aggregation"); + } +} + struct protocol proto_aggregator = { .name = "Aggregator", .template = "aggregator%d", @@ -773,7 +2054,9 @@ struct protocol proto_aggregator = { .init = aggregator_init, .start = aggregator_start, .shutdown = aggregator_shutdown, + .cleanup = aggregator_cleanup, .reconfigure = aggregator_reconfigure, + .get_status = aggregator_get_status, }; void diff --git a/proto/aggregator/aggregator.h b/proto/aggregator/aggregator.h index 19459b1d0..7448fefdc 100644 --- a/proto/aggregator/aggregator.h +++ b/proto/aggregator/aggregator.h @@ -1,9 +1,9 @@ /* * BIRD -- Aggregator Pseudoprotocol * - * (c) 2023 Igor Putovny - * (c) 2023 Maria Matejka - * (c) 2023 CZ.NIC z.s.p.o. + * (c) 2023--2024 Igor Putovny + * (c) 2023--2024 Maria Matejka + * (c) 2024 CZ.NIC z.s.p.o. * * Can be freely distributed and used under the terms of the GNU GPL. * @@ -17,13 +17,26 @@ #include "nest/protocol.h" #include "lib/hash.h" +#define BUCKET_LIST_INIT_SIZE 16 +#define POTENTIAL_BUCKETS_BITMAP_SIZE 8 +#define MAX_POTENTIAL_BUCKETS_COUNT ((int)(sizeof(u32) * 8 * POTENTIAL_BUCKETS_BITMAP_SIZE)) + +#define IP4_WITHDRAWAL_LIMIT 100 +#define IP6_WITHDRAWAL_LIMIT 200 + +enum aggregation_mode { + NET_AGGR, PREFIX_AGGR, +}; + struct aggregator_config { struct proto_config c; struct channel_config *src, *dst; + enum aggregation_mode aggr_mode; uint aggr_on_count; uint aggr_on_da_count; struct aggr_item *aggr_on; const struct f_line *merge_by; + int logging; }; struct aggregator_route { @@ -38,20 +51,29 @@ struct aggregator_bucket { struct rte_src *last_src; /* Which src we announced the bucket last with */ u32 count; u32 hash; + u32 id; struct f_val aggr_data[0]; }; +/* Structure containing information needed for route withdrawal */ +struct rte_withdrawal_item { + struct rte_withdrawal_item *next; + struct aggregator_bucket *bucket; + struct net_addr addr; +}; + struct aggregator_proto { struct proto p; struct 
channel *src, *dst; + enum aggregation_mode aggr_mode; /* Buckets by aggregator rule */ HASH(struct aggregator_bucket) buckets; - slab *bucket_slab; + linpool *bucket_pool; /* Routes by net and src */ HASH(struct aggregator_route) routes; - slab *route_slab; + linpool *route_pool; /* Aggregator rule */ uint aggr_on_count; @@ -61,6 +83,23 @@ struct aggregator_proto { /* Merge filter */ const struct f_line *merge_by; event reload_buckets; + + /* Aggregation trie */ + uint addr_type; + linpool *trie_pool; + struct trie_node *root; + int logging; + + /* List of bucket pointers */ + struct aggregator_bucket **bucket_list; + size_t bucket_list_size; + size_t bucket_list_count; + + struct hmap bucket_id_map; + + linpool *rte_withdrawal_pool; + struct rte_withdrawal_item *rte_withdrawal_stack; + int rte_withdrawal_count; }; enum aggr_item_type { @@ -83,4 +122,29 @@ struct aggr_item_node { struct aggr_item i; }; +enum fib_status { + UNASSIGNED_FIB, + IN_FIB, + NON_FIB, +}; + +enum prefix_origin { + FILLER, + ORIGINAL, + AGGREGATED, +}; + +struct trie_node { + struct trie_node *parent; + struct trie_node *child[2]; + struct trie_node *ancestor; + struct aggregator_bucket *original_bucket; + struct aggregator_bucket *selected_bucket; + enum fib_status status; + enum prefix_origin px_origin; + u32 potential_buckets[POTENTIAL_BUCKETS_BITMAP_SIZE]; + int potential_buckets_count; + int depth; +}; + #endif diff --git a/proto/aggregator/config.Y b/proto/aggregator/config.Y index 07fdffd10..f73a6dfa2 100644 --- a/proto/aggregator/config.Y +++ b/proto/aggregator/config.Y @@ -20,7 +20,7 @@ CF_DEFINES CF_DECLS -CF_KEYWORDS(AGGREGATOR, AGGREGATE, ON, MERGE, BY) +CF_KEYWORDS(AGGREGATOR, AGGREGATE, ON, MERGE, BY, RELOAD, AFTER, LOG, ALL) %type aggr_item aggr_list @@ -36,7 +36,14 @@ aggregator_proto_start: proto_start AGGREGATOR this_channel = AGGREGATOR_CFG->src = channel_config_new(NULL, "source", 0, this_proto); AGGREGATOR_CFG->dst = channel_config_new(NULL, "destination", 0, 
this_proto); - AGGREGATOR_CFG->src->ra_mode = AGGREGATOR_CFG->dst->ra_mode = RA_ANY; + /* + * Aggregation mode is set to prefix aggregation by default, in which case we want to receive + * updates with the best routes. + */ + AGGREGATOR_CFG->aggr_mode = PREFIX_AGGR; + AGGREGATOR_CFG->src->ra_mode = RA_OPTIMAL; + AGGREGATOR_CFG->dst->ra_mode = RA_ANY; + AGGREGATOR_CFG->logging = 0; }; aggregator_proto_item: @@ -47,20 +54,23 @@ aggregator_proto_item: if (AGGREGATOR_CFG->aggr_on) cf_error("Only one aggregate on clause allowed"); - _Bool net_present = 0; int count = 0; for (const struct aggr_item_node *item = $3; item; item = item->next) { log(L_WARN "type %d sacode %d", item->i.type, item->i.sa.sa_code); - if (item->i.type == AGGR_ITEM_STATIC_ATTR && item->i.sa.sa_code == SA_NET) - net_present = 1; + + /* + * If NET attribute is present, aggregate routes within the same net + * and receive updates with any routes. + */ + if (item->i.type == AGGR_ITEM_STATIC_ATTR && item->i.sa.sa_code == SA_NET) { + AGGREGATOR_CFG->aggr_mode = NET_AGGR; + AGGREGATOR_CFG->src->ra_mode = RA_ANY; + } count++; } - if (!net_present) - cf_error("'NET' must be present"); - AGGREGATOR_CFG->aggr_on = cfg_alloc(sizeof(struct aggr_item) * count); int pos = 0; @@ -81,11 +91,18 @@ aggregator_proto_item: $4->args++; AGGREGATOR_CFG->merge_by = $4; } + | LOG ALL { AGGREGATOR_CFG->logging = 1; } ; aggregator_proto_opts: /* empty */ | aggregator_proto_opts aggregator_proto_item ';' ; -aggregator_proto: aggregator_proto_start proto_name '{' aggregator_proto_opts '}' ; +aggregator_proto: aggregator_proto_start proto_name '{' aggregator_proto_opts '}' { + if (AGGREGATOR_CFG->src->table->addr_type != AGGREGATOR_CFG->dst->table->addr_type) + cf_error("Both rtables in aggregator must have the same network type"); + if (PREFIX_AGGR == AGGREGATOR_CFG->aggr_mode) + if (AGGREGATOR_CFG->src->table->addr_type != NET_IP4 && AGGREGATOR_CFG->src->table->addr_type != NET_IP6) + cf_error("Trie aggregation is available 
only for IP4 or IPv6 networks"); +}; aggr_list: aggr_item -- 2.47.2