From 51066c131859997b85827cc871e039c3b222c9a9 Mon Sep 17 00:00:00 2001 From: pcarana Date: Fri, 15 Mar 2019 09:41:08 -0600 Subject: [PATCH] Calculate and get changes between serials. Use a base serial, the base will always be the last DB update. Calculate the difference between the latest version and the past, and store it as delta. Save pointers to deltas, increment last serial number when the update ops are complete. Avoid to send duplicate announcements/withdrawals. --- src/csv.c | 8 +- src/rtr/pdu_handler.c | 9 +-- src/rtr/pdu_sender.c | 20 +++-- src/vrps.c | 181 ++++++++++++++++++++++++++++++++---------- src/vrps.h | 4 +- 5 files changed, 160 insertions(+), 62 deletions(-) diff --git a/src/csv.c b/src/csv.c index 98720a0e..d9323a92 100644 --- a/src/csv.c +++ b/src/csv.c @@ -179,7 +179,7 @@ error: } static int -load_vrps(struct line_file *lfile) +load_vrps(struct line_file *lfile, bool is_update) { struct delta *delta; char *line; @@ -205,7 +205,7 @@ load_vrps(struct line_file *lfile) error = lfile_read(lfile, &line); if (error) { err(error, "Error reading line %d, stop processing file.", current_line); - delta_destroy(delta); + delta_destroy(&delta); goto end; } if (line == NULL) { @@ -219,7 +219,7 @@ load_vrps(struct line_file *lfile) error = add_vrp(line, delta); if (error) { - delta_destroy(delta); + delta_destroy(&delta); goto end; } } while (true); @@ -266,7 +266,7 @@ load_vrps_file(bool check_update, bool *updated) if (check_update && last_update <= get_vrps_last_modified_date()) goto end2; - error = load_vrps(lfile); + error = load_vrps(lfile, check_update); if (error) goto end2; diff --git a/src/rtr/pdu_handler.c b/src/rtr/pdu_handler.c index 0d17f215..741c7ef0 100644 --- a/src/rtr/pdu_handler.c +++ b/src/rtr/pdu_handler.c @@ -82,14 +82,7 @@ handle_serial_query_pdu(int fd, void *pdu) return send_cache_reset_pdu(&common); case DIFF_AVAILABLE: /* https://tools.ietf.org/html/rfc8210#section-8.2 */ - /* - * TODO The diff calculation between serials isn't quite ready yet, - * so always respond with a cache reset. When the implementation is - * ready use: - * - * return send_commmon_exchange(&common); - */ - return send_cache_reset_pdu(&common); + return send_commmon_exchange(&common); case NO_DIFF: /* Typical exchange with no Payloads */ error = send_cache_response_pdu(&common); diff --git a/src/rtr/pdu_sender.c b/src/rtr/pdu_sender.c index 49ac4914..ad173c1b 100644 --- a/src/rtr/pdu_sender.c +++ b/src/rtr/pdu_sender.c @@ -198,26 +198,30 @@ send_ipv6_prefix_pdu(struct sender_common *common, struct vrp *vrp) int send_payload_pdus(struct sender_common *common) { - struct vrp *vrps, *ptr; + struct vrp *vrps; unsigned int len, i; int error; - len = get_vrps_delta(common->start_serial, common->end_serial, &vrps); + vrps = malloc(sizeof(struct vrp)); + len = get_vrps_delta(common->start_serial, common->end_serial, vrps); if (len == 0) return 0; - ptr = vrps; for (i = 0; i < len; i++) { - if (ptr->in_addr_len == INET_ADDRSTRLEN) - error = send_ipv4_prefix_pdu(common, ptr); + if (vrps[i].in_addr_len == INET_ADDRSTRLEN) + error = send_ipv4_prefix_pdu(common, &vrps[i]); + else if (vrps[i].in_addr_len == INET6_ADDRSTRLEN) + error = send_ipv6_prefix_pdu(common, &vrps[i]); else - error = send_ipv6_prefix_pdu(common, ptr); + error = -EINVAL; - if (error) + if (error) { + free(vrps); return error; - ptr++; + } } + free(vrps); return 0; } diff --git a/src/vrps.c b/src/vrps.c index f4024532..c79d6bce 100644 --- a/src/vrps.c +++ b/src/vrps.c @@ -1,9 +1,12 @@ #include "vrps.h" +#include +#include #include "array_list.h" #define FLAG_WITHDRAWAL 0 #define FLAG_ANNOUNCEMENT 1 +#define START_SERIAL 0 ARRAY_LIST(vrps, struct vrp) @@ -12,9 +15,10 @@ struct delta { struct vrps vrps; }; -ARRAY_LIST(deltasdb, struct delta) +ARRAY_LIST(deltasdb, struct delta *) struct state { + struct delta *base_db; struct deltasdb *deltas_db; u_int32_t current_serial; u_int16_t v0_session_id; @@ -27,6 +31,12 @@ deltas_db_init(void) { int error; + state.base_db = create_delta(); + if (state.base_db == NULL){ + err(-ENOMEM, "Delta base DB couldn't be initialized"); + return -ENOMEM; + } + state.deltas_db = malloc(sizeof(struct deltasdb)); if (state.deltas_db == NULL){ err(-ENOMEM, "Deltas DB couldn't be allocated"); @@ -38,7 +48,7 @@ deltas_db_init(void) err(error, "Deltas DB couldn't be initialized"); return error; } - state.current_serial = 0; + state.current_serial = START_SERIAL; /* The downcast takes the LSBs */ state.v0_session_id = time(NULL); /* Minus 1 to prevent same ID */ @@ -78,6 +88,7 @@ create_vrp (u_int32_t asn, u_int8_t prefix_length, u_int8_t max_prefix_length) result->asn = asn; result->prefix_length = prefix_length; result->max_prefix_length = max_prefix_length; + /* Set as ANNOUNCEMENT by default */ result->flags = FLAG_ANNOUNCEMENT; return result; @@ -115,11 +126,88 @@ create_vrp6(u_int32_t asn, struct in6_addr ipv6_prefix, u_int8_t prefix_length, return result; } +static bool +vrp_equal(struct vrp *left, struct vrp *right) +{ + return left->asn == right->asn && left->in_addr_len == right->in_addr_len + && left->prefix_length == right->prefix_length + && left->max_prefix_length == right->max_prefix_length + && ((left->in_addr_len == INET_ADDRSTRLEN + && left->ipv4_prefix.s_addr == right->ipv4_prefix.s_addr) + || (left->in_addr_len == INET6_ADDRSTRLEN + && IN6_ARE_ADDR_EQUAL(left->ipv6_prefix.s6_addr32, + right->ipv6_prefix.s6_addr32))); +} + +static struct vrp * +vrp_locate(struct vrps *base, struct vrp *vrp) +{ + struct vrp *cursor; + + ARRAYLIST_FOREACH(base, cursor) + if (vrp_equal(cursor, vrp)) + return cursor; + + return NULL; +} + +static bool +vrp_is_new(struct vrps *base, struct vrp *vrp) +{ + return vrp_locate(base, vrp) == NULL; +} + +static struct delta * +delta_resume(struct delta *delta) +{ + struct delta *resume_delta; + struct vrps *base, *search_list; + struct vrp *cursor; + + resume_delta = create_delta(); + resume_delta->serial = delta->serial; + /* First check for announcements */ + base = &delta->vrps; + search_list = &state.base_db->vrps; + ARRAYLIST_FOREACH(base, cursor) + if (vrp_is_new(search_list, cursor)) { + cursor->flags = FLAG_ANNOUNCEMENT; + delta_add_vrp(resume_delta, cursor); + } + + /* Now for withdrawals */ + base = &state.base_db->vrps; + search_list = &delta->vrps; + ARRAYLIST_FOREACH(base, cursor) + if (vrp_is_new(search_list, cursor)) { + cursor->flags = FLAG_WITHDRAWAL; + delta_add_vrp(resume_delta, cursor); + } + + return resume_delta; +} + int deltas_db_add_delta(struct delta *delta) { - delta->serial = state.current_serial++; - return deltasdb_add(state.deltas_db, delta); + struct delta *resume; + int result; + + result = 0; + delta->serial = state.current_serial; + /* Store only updates */ + if (delta->serial != START_SERIAL) { + resume = delta_resume(delta); + result = deltasdb_add(state.deltas_db, &resume); + } + /* Don't set the base in case of error */ + if (result != 0) + return result; + + free(state.base_db); + state.base_db = delta; + state.current_serial++; + return result; } int @@ -135,10 +223,11 @@ vrp_destroy(struct vrp *vrp) } void -delta_destroy(struct delta *delta) +delta_destroy(struct delta **delta) { /* Nothing else to free yet */ - vrps_cleanup(&delta->vrps, vrp_destroy); + vrps_cleanup(&(*delta)->vrps, vrp_destroy); + free(*delta); } void @@ -146,15 +235,7 @@ deltas_db_destroy(void) { deltasdb_cleanup(state.deltas_db, delta_destroy); free(state.deltas_db); -} - -static unsigned int -get_delta_diff(struct delta *start_delta, struct delta *end_delta, - struct vrp **result) -{ - /* TODO Do some magic to get the diff */ - *result = state.deltas_db->array[state.deltas_db->len - 1].vrps.array; - return state.deltas_db->array[state.deltas_db->len - 1].vrps.len; + free(state.base_db); } /* @@ -173,11 +254,9 @@ get_delta_diff(struct delta *start_delta, struct delta *end_delta, int deltas_db_status(u_int32_t *serial) { - struct deltasdb *deltas_db; - struct delta *delta; + struct delta **delta; - deltas_db = state.deltas_db; - if (deltas_db->len == 0) + if (state.base_db->vrps.len == 0) return NO_DATA_AVAILABLE; // No serial to match, and there's data at DB @@ -185,19 +264,45 @@ deltas_db_status(u_int32_t *serial) return DIFF_AVAILABLE; /* Is the last version? */ - if (*serial == deltas_db->array[deltas_db->len-1].serial) + if (*serial == state.base_db->serial) return NO_DIFF; /* Get the delta corresponding to the serial */ - ARRAYLIST_FOREACH(deltas_db, delta) { - if (delta->serial == *serial) + ARRAYLIST_FOREACH(state.deltas_db, delta) { + if ((*delta)->serial == *serial) return DIFF_AVAILABLE; } + /* The first serial isn't at deltas */ + if (*serial == START_SERIAL) + return DIFF_AVAILABLE; + /* Reached end, diff can't be determined */ return DIFF_UNDETERMINED; } +static void +add_vrps_filtered(struct vrps *dst, struct vrps *src) +{ + int i; + for (i = 0; i < src->len; i++) + if (vrp_is_new(dst, &src->array[i])) + vrps_add(dst, &src->array[i]); +} + +static void +copy_vrps(struct vrp *dst, struct vrp *src, unsigned int len) +{ + struct vrp *tmp; + tmp = realloc(dst, len * sizeof(struct vrp)); + if (tmp == NULL) { + err(-ENOMEM, "Couldn't copy VRPs"); + return; + } + dst = tmp; + memcpy(dst, src, len * sizeof(struct vrp)); +} + /* * Get the number of updates from serial START_SERIAL to END_SERIAL, set them * at RESULT. @@ -207,21 +312,19 @@ deltas_db_status(u_int32_t *serial) */ unsigned int get_vrps_delta(u_int32_t *start_serial, u_int32_t *end_serial, - struct vrp **result) + struct vrp *result) { - struct deltasdb *deltas_db; - struct delta *delta0, *delta1; + struct delta **delta1; + struct vrps summary; - deltas_db = state.deltas_db; /* No data */ - if (deltas_db->len == 0) + if (state.base_db->vrps.len == 0) return 0; /* NULL start? Send the last version, there's no need to iterate DB */ if (start_serial == NULL) { - *result = deltas_db->array[deltas_db->len - 1].vrps.array; - return deltas_db->array[deltas_db->len - 1].vrps.len; - /* TODO Send all data as ANNOUNCEMENTS */ + copy_vrps(result, state.base_db->vrps.array, state.base_db->vrps.len); + return state.base_db->vrps.len; } /* Apparently nothing to return */ @@ -229,19 +332,17 @@ get_vrps_delta(u_int32_t *start_serial, u_int32_t *end_serial, return 0; /* Get the delta corresponding to the serials */ - delta0 = NULL; - ARRAYLIST_FOREACH(deltas_db, delta1) { - if (delta1->serial == *start_serial) - delta0 = delta1; - if (delta1->serial == *end_serial) + vrps_init(&summary); + ARRAYLIST_FOREACH(state.deltas_db, delta1) { + if ((*delta1)->serial > *start_serial) + add_vrps_filtered(&summary, &(*delta1)->vrps); + if ((*delta1)->serial == *end_serial) break; } - /* Reached end or no delta0 found, diff can't be determined, send error */ - if (delta1 == NULL || delta0 == NULL) - return 0; - - return get_delta_diff(delta0, delta1, result); + copy_vrps(result, summary.array, summary.len); + vrps_cleanup(&summary, vrp_destroy); + return summary.len; } void diff --git a/src/vrps.h b/src/vrps.h index 7931e57a..6546061b 100644 --- a/src/vrps.h +++ b/src/vrps.h @@ -33,10 +33,10 @@ int delta_add_vrp(struct delta *, struct vrp *); int deltas_db_add_delta(struct delta *); int deltas_db_status(u_int32_t *); -unsigned int get_vrps_delta(u_int32_t *, u_int32_t *, struct vrp **); +unsigned int get_vrps_delta(u_int32_t *, u_int32_t *, struct vrp *); void vrp_destroy(struct vrp *); -void delta_destroy(struct delta *); +void delta_destroy(struct delta **); void deltas_db_destroy(void); void set_vrps_last_modified_date(time_t); -- 2.47.3