From: Alberto Leiva Popper Date: Wed, 7 Jul 2021 19:42:39 +0000 (-0500) Subject: RTR Server: thread-pool.server.max now refers to RTR requests X-Git-Tag: v1.5.1~5 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=23478fdff80e8ceeaeeaffc71880f950b3c71104;p=thirdparty%2FFORT-validator.git RTR Server: thread-pool.server.max now refers to RTR requests Apparently, there was a huge misunderstanding when the thread pool was implemented. The intended model was > When the RTR server receives a request, it borrows a thread from the > thread pool, and tasks it with the request. Which is logical and a typical thread pool use case. However, what was actually implemented was > When the RTR server opens a connection, it borrows a thread from the > thread pool, and tasks it with the whole connection. So `thread-pool.server.max` was a hard limit for simultaneous RTR clients (routers), but now it's just a limit to simultaneous RTR requests. (Surplus requests will queue.) This is much less taxing to the CPU when there are hundreds of clients. Thanks to Mark Tinka for basically spelling this out to me. ----------------------- Actually, this commit is an almost entire rewrite of the RTR server core. Here's a (possibly incomplete) list of other problems I had to fix in the process: == Problem 1 == sockaddr2str() was returning a pointer to invalid memory on success. This was due to a naive attempt of a bugfix from 1ff403a0c7f61d443cbc4e2e512b8d0324547856. == Problem 2 == Changed the delta expiration conditional. Was "keep track of the clients, expire deltas when all clients outgrow them." I see two problems with that: 1. It'll lead to bad performance if a client misbehaves by not maintaining the connection. (ie. the server will have to fall back to too many cache resets.) 2. It might keep the deltas forever if a client bugs out without killing the connection. New conditional is "keep deltas for server.deltas.lifetime iterations." "server.deltas.lifetime" is a new configuration argument. == Problem 3 == Serials weren't being compared according to RFC 1982 serial arithmetic. This was going to cause mayhem when the integer wrapped. (Though Fort always starts at 1, and serials are 32-bit unsigned integers, so this wasn't going to be a problem for a very long time.) == Problem 4 == The thread pool had an awkward termination bug. When threads were suspended, they were meant to be ended through a pthread signal, but when they were running, they were supposed to be terminated through pthread_cancel(). (Because, since each client was assigned a thread, they would spend most of their time sleeping.) These termination methods don't play well with each other. Apparently, threads waiting on a signal cannot be canceled, because of this strange quirk from man 3 pthread_cond_wait: > a side effect of acting upon a cancellation request while in a > condition wait is that the mutex is (in effect) re-acquired before > calling the first cancellation cleanup handler. (So the first thread dies with the mutex locked, and no other threads can be canceled because no one can ever lock the mutex again.) And of course, you can't stop a server thread through a signal, because they aren't listening to it; they're sleeping in wait for a request. I still don't really know how would I fix this, but luckily, the problem no longer exists since working threads are mapped to single requests, and therefore no longer sleep. (For long periods of time, anyway.) So always using the signal works fine. --- diff --git a/src/Makefile.am b/src/Makefile.am index ef6dfa1f..0cb383be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -10,7 +10,7 @@ fort_SOURCES += address.h address.c fort_SOURCES += algorithm.h algorithm.c fort_SOURCES += certificate_refs.h certificate_refs.c fort_SOURCES += cert_stack.h cert_stack.c -fort_SOURCES += clients.c clients.h +fort_SOURCES += clients.h fort_SOURCES += common.c common.h fort_SOURCES += config.h config.c fort_SOURCES += daemon.h daemon.c @@ -29,6 +29,7 @@ fort_SOURCES += random.h random.c fort_SOURCES += reqs_errors.h reqs_errors.c fort_SOURCES += resource.h resource.c fort_SOURCES += rpp.h rpp.c +fort_SOURCES += serial.h serial.c fort_SOURCES += sorted_array.h sorted_array.c fort_SOURCES += state.h state.c fort_SOURCES += str_token.h str_token.c @@ -109,6 +110,7 @@ fort_SOURCES += rtr/rtr.c rtr/rtr.h fort_SOURCES += rtr/db/db_table.c rtr/db/db_table.h fort_SOURCES += rtr/db/delta.c rtr/db/delta.h +fort_SOURCES += rtr/db/deltas_array.c rtr/db/deltas_array.h fort_SOURCES += rtr/db/roa.c rtr/db/roa.h fort_SOURCES += rtr/db/vrp.h fort_SOURCES += rtr/db/vrps.c rtr/db/vrps.h diff --git a/src/address.c b/src/address.c index acf96a62..6b5cf8b6 100644 --- a/src/address.c +++ b/src/address.c @@ -518,15 +518,16 @@ ipv6_covered(struct in6_addr *f_addr, uint8_t f_len, struct in6_addr *son_addr) /** * buffer must length INET6_ADDRSTRLEN. */ -char const * -sockaddr2str(struct sockaddr_storage *sockaddr) +void +sockaddr2str(struct sockaddr_storage *sockaddr, char *buffer) { void *addr = NULL; - char const *addr_str; - char buffer[INET6_ADDRSTRLEN]; + char const *str; - if (sockaddr == NULL) - return "(null)"; + if (sockaddr == NULL) { + strcpy(buffer, "(null)"); + return; + } switch (sockaddr->ss_family) { case AF_INET: @@ -536,11 +537,12 @@ sockaddr2str(struct sockaddr_storage *sockaddr) addr = &((struct sockaddr_in6 *) sockaddr)->sin6_addr; break; default: - return "(protocol unknown)"; + strcpy(buffer, "(protocol unknown)"); + return; } - addr_str = inet_ntop(sockaddr->ss_family, addr, buffer, - INET6_ADDRSTRLEN); - return (addr_str != NULL) ? addr_str : "(unprintable address)"; + str = inet_ntop(sockaddr->ss_family, addr, buffer, INET6_ADDRSTRLEN); + if (str == NULL) + strcpy(buffer, "(unprintable address)"); } diff --git a/src/address.h b/src/address.h index a69b8308..53585ec4 100644 --- a/src/address.h +++ b/src/address.h @@ -51,6 +51,6 @@ int ipv6_prefix_validate(struct ipv6_prefix *); bool ipv4_covered(struct in_addr *, uint8_t, struct in_addr *); bool ipv6_covered(struct in6_addr *, uint8_t, struct in6_addr *); -char const *sockaddr2str(struct sockaddr_storage *); +void sockaddr2str(struct sockaddr_storage *, char *buffer); #endif /* SRC_ADDRESS_H_ */ diff --git a/src/asn1/asn1c/INTEGER.c b/src/asn1/asn1c/INTEGER.c index 254e78fb..d64c458e 100644 --- a/src/asn1/asn1c/INTEGER.c +++ b/src/asn1/asn1c/INTEGER.c @@ -177,7 +177,7 @@ INTEGER__dump(const asn_TYPE_descriptor_t *td, const INTEGER_t *st, asn_app_cons } /* Output in the long xx:yy:zz... format */ - /* TODO: replace with generic algorithm (Knuth TAOCP Vol 2, 4.3.1) */ + /* TODO (asn1c) replace with generic algorithm (Knuth TAOCP Vol 2, 4.3.1) */ for(p = scratch; buf < buf_end; buf++) { const char * const h2c = "0123456789ABCDEF"; if((p - scratch) >= (ssize_t)(sizeof(scratch) - 4)) { @@ -684,7 +684,7 @@ INTEGER_decode_uper(const asn_codec_ctx_t *opt_codec_ctx, /* #12.2.3 */ if(ct && ct->lower_bound) { /* - * TODO: replace by in-place arithmetics. + * TODO (asn1c) replace by in-place arithmetics. */ long value = 0; if(asn_INTEGER2long(st, &value)) @@ -779,7 +779,7 @@ INTEGER_encode_uper(const asn_TYPE_descriptor_t *td, if(ct && ct->lower_bound) { ASN_DEBUG("Adjust lower bound to %ld", ct->lower_bound); - /* TODO: adjust lower bound */ + /* TODO (asn1c) adjust lower bound */ ASN__ENCODE_FAILED; } diff --git a/src/asn1/asn1c/OCTET_STRING.c b/src/asn1/asn1c/OCTET_STRING.c index 88589b09..eedc420e 100644 --- a/src/asn1/asn1c/OCTET_STRING.c +++ b/src/asn1/asn1c/OCTET_STRING.c @@ -711,7 +711,7 @@ OS__check_escaped_control_char(const void *buf, int size) { /* * Inefficient algorithm which translates the escape sequences * defined above into characters. Returns -1 if not found. - * TODO: replace by a faster algorithm (bsearch(), hash or + * TODO (asn1c) replace by a faster algorithm (bsearch(), hash or * nested table lookups). */ for(i = 0; i < 32 /* Don't spend time on the bottom half */; i++) { diff --git a/src/asn1/asn1c/constr_SEQUENCE.c b/src/asn1/asn1c/constr_SEQUENCE.c index 67efb0de..4a95dbce 100644 --- a/src/asn1/asn1c/constr_SEQUENCE.c +++ b/src/asn1/asn1c/constr_SEQUENCE.c @@ -1472,7 +1472,7 @@ SEQUENCE_encode_uper(const asn_TYPE_descriptor_t *td, ASN_DEBUG("Bit-map of %d elements", n_extensions); /* #18.7. Encoding the extensions presence bit-map. */ - /* TODO: act upon NOTE in #18.7 for canonical PER */ + /* TODO (asn1c) act upon NOTE in #18.7 for canonical PER */ if(SEQUENCE__handle_extensions(td, sptr, po, 0) != n_extensions) ASN__ENCODE_FAILED; diff --git a/src/clients.c b/src/clients.c deleted file mode 100644 index 9ce2f3fb..00000000 --- a/src/clients.c +++ /dev/null @@ -1,260 +0,0 @@ -#include "clients.h" - -#include "common.h" -#include "log.h" -#include "data_structure/uthash_nonfatal.h" -#include "rtr/pdu.h" - -struct hashable_client { - struct client meat; - UT_hash_handle hh; -}; - -/** Hash table of clients */ -static struct clients_table { - struct hashable_client *clients; -} db; - -/** Read/write lock, which protects @table and its inhabitants. */ -static pthread_rwlock_t lock; - -int -clients_db_init(void) -{ - int error; - - db.clients = NULL; - - error = pthread_rwlock_init(&lock, NULL); - if (error) - return pr_op_errno(error, "pthread_rwlock_init() errored"); - return 0; -} - -static struct hashable_client * -create_client(int fd, struct sockaddr_storage addr) -{ - struct hashable_client *client; - - client = malloc(sizeof(struct hashable_client)); - if (client == NULL) - return NULL; - /* Needed by uthash */ - memset(client, 0, sizeof(struct hashable_client)); - - client->meat.fd = fd; - client->meat.serial_number_set = false; - client->meat.rtr_version_set = false; - client->meat.addr = addr; - - return client; -} - -/* - * If the client whose file descriptor is @fd isn't already stored, store it. - */ -int -clients_add(int fd, struct sockaddr_storage addr) -{ - struct hashable_client *new_client; - struct hashable_client *old_client; - - new_client = create_client(fd, addr); - if (new_client == NULL) - return pr_enomem(); - - rwlock_write_lock(&lock); - - errno = 0; - HASH_REPLACE(hh, db.clients, meat.fd, sizeof(new_client->meat.fd), - new_client, old_client); - if (errno) { - rwlock_unlock(&lock); - free(new_client); - return -pr_op_errno(errno, "Client couldn't be stored"); - } - if (old_client != NULL) - free(old_client); - - rwlock_unlock(&lock); - - return 0; -} - -void -clients_update_serial(int fd, serial_t serial) -{ - struct hashable_client *cur_client; - - rwlock_write_lock(&lock); - HASH_FIND_INT(db.clients, &fd, cur_client); - if (cur_client == NULL) - goto unlock; - - cur_client->meat.serial_number = serial; - cur_client->meat.serial_number_set = true; - -unlock: - rwlock_unlock(&lock); -} - -int -clients_get_min_serial(serial_t *result) -{ - struct hashable_client *current, *tmp; - int retval; - - retval = -ENOENT; - rwlock_read_lock(&lock); - if (HASH_COUNT(db.clients) == 0) - goto unlock; - - HASH_ITER(hh, db.clients, current, tmp) { - if (!current->meat.serial_number_set) - continue; - if (retval) { - *result = current->meat.serial_number; - retval = 0; - } else if (current->meat.serial_number < *result) - *result = current->meat.serial_number; - } - -unlock: - rwlock_unlock(&lock); - return retval; -} - -int -clients_set_rtr_version(int fd, uint8_t rtr_version) -{ - struct hashable_client *client; - int result; - - result = -ENOENT; - rwlock_write_lock(&lock); - - HASH_FIND_INT(db.clients, &fd, client); - if (client == NULL) - goto unlock; - - if (client->meat.rtr_version_set) { - result = -EINVAL; /* Can't be modified */ - goto unlock; - } - - client->meat.rtr_version = rtr_version; - client->meat.rtr_version_set = true; - result = 0; -unlock: - rwlock_unlock(&lock); - - return result; -} - -int -clients_get_rtr_version_set(int fd, bool *is_set, uint8_t *rtr_version) -{ - struct hashable_client *client; - int result; - - result = -ENOENT; - rwlock_read_lock(&lock); - - HASH_FIND_INT(db.clients, &fd, client); - if (client != NULL) { - (*is_set) = client->meat.rtr_version_set; - (*rtr_version) = client->meat.rtr_version; - result = 0; - } - - rwlock_unlock(&lock); - - return result; -} - -/* - * Remove the client with ID @fd from the DB. If a @cb is set, it will be - * called before deleting the client from the DB. - */ -void -clients_forget(int fd, clients_foreach_cb cb, void *arg) -{ - struct hashable_client *client; - - rwlock_write_lock(&lock); - - HASH_FIND_INT(db.clients, &fd, client); - if (client != NULL) { - HASH_DEL(db.clients, client); - /* Nothing to do at errors */ - if (cb != NULL) - cb(&client->meat, arg); - free(client); - } - - rwlock_unlock(&lock); -} - -int -clients_foreach(clients_foreach_cb cb, void *arg) -{ - struct hashable_client *client, *tmp; - int error; - - error = rwlock_read_lock(&lock); - if (error) - return error; - - HASH_ITER(hh, db.clients, client, tmp) { - error = cb(&client->meat, arg); - if (error) - break; - } - - rwlock_unlock(&lock); - - return error; -} - -/* - * Terminate all clients and remove them from DB. - * - * @cb (with @arg) will be called before deleting the corresponding client from - * the DB. - */ -int -clients_terminate_all(clients_foreach_cb cb, void *arg) -{ - struct hashable_client *node, *tmp; - int error; - - rwlock_write_lock(&lock); - - HASH_ITER(hh, db.clients, node, tmp) { - error = cb(&node->meat, arg); - if (error) - break; - HASH_DEL(db.clients, node); - free(node); - } - - rwlock_unlock(&lock); - - return error; -} - -/* - * Destroy the clients DB. - */ -void -clients_db_destroy(void) -{ - struct hashable_client *node, *tmp; - - HASH_ITER(hh, db.clients, node, tmp) { - HASH_DEL(db.clients, node); - free(node); - } - - pthread_rwlock_destroy(&lock); /* Nothing to do with error code */ -} diff --git a/src/clients.h b/src/clients.h deleted file mode 100644 index b22792ab..00000000 --- a/src/clients.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef SRC_CLIENTS_H_ -#define SRC_CLIENTS_H_ - -#include -#include -#include "rtr/pdu.h" -#include "rtr/db/vrp.h" - -struct client { - int fd; - struct sockaddr_storage addr; - - serial_t serial_number; - bool serial_number_set; - - uint8_t rtr_version; - bool rtr_version_set; -}; - -int clients_db_init(void); - -int clients_add(int, struct sockaddr_storage); -void clients_update_serial(int, serial_t); - -typedef int (*clients_foreach_cb)(struct client *, void *); -void clients_forget(int, clients_foreach_cb, void *); -int clients_foreach(clients_foreach_cb, void *); - -int clients_get_min_serial(serial_t *); - -int clients_set_rtr_version(int, uint8_t); -int clients_get_rtr_version_set(int, bool *, uint8_t *); - -int clients_terminate_all(clients_foreach_cb, void *); -void clients_db_destroy(void); - -#endif /* SRC_CLIENTS_H_ */ diff --git a/src/config.c b/src/config.c index cdd364dc..fe0e4b78 100644 --- a/src/config.c +++ b/src/config.c @@ -81,6 +81,8 @@ struct rpki_config { unsigned int retry; unsigned int expire; } interval; + /** Number of iterations the deltas will be stored. */ + unsigned int deltas_lifetime; } server; struct { @@ -435,6 +437,14 @@ static const struct option_field options[] = { */ .min = 600, .max = 172800, + }, { + .id = 5007, + .name = "server.deltas.lifetime", + .type = >_uint, + .offset = offsetof(struct rpki_config, server.deltas_lifetime), + .doc = "Number of iterations the deltas will be stored.", + .min = 0, + .max = UINT_MAX, }, /* RSYNC fields */ @@ -977,6 +987,7 @@ set_default_values(void) rpki_config.server.interval.refresh = 3600; rpki_config.server.interval.retry = 600; rpki_config.server.interval.expire = 7200; + rpki_config.server.deltas_lifetime = 4; rpki_config.tal = NULL; rpki_config.slurm = NULL; @@ -1332,6 +1343,12 @@ config_get_interval_expire(void) return rpki_config.server.interval.expire; } +unsigned int +config_get_deltas_lifetime(void) +{ + return rpki_config.server.deltas_lifetime; +} + char const * config_get_slurm(void) { diff --git a/src/config.h b/src/config.h index cdb07208..d072db2b 100644 --- a/src/config.h +++ b/src/config.h @@ -24,6 +24,7 @@ unsigned int config_get_validation_interval(void); unsigned int config_get_interval_refresh(void); unsigned int config_get_interval_retry(void); unsigned int config_get_interval_expire(void); +unsigned int config_get_deltas_lifetime(void); char const *config_get_slurm(void); char const *config_get_tal(void); diff --git a/src/data_structure/array_list.h b/src/data_structure/array_list.h index 716dbe62..01808b47 100644 --- a/src/data_structure/array_list.h +++ b/src/data_structure/array_list.h @@ -30,6 +30,7 @@ list->capacity = 0; \ } \ \ + /* Call name##_init() again if you want to reuse the list. */ \ modifiers void \ name##_cleanup(struct name *list, void (*cb)(elem_type *)) \ { \ diff --git a/src/log.c b/src/log.c index 1c54a10d..48658bd7 100644 --- a/src/log.c +++ b/src/log.c @@ -17,11 +17,11 @@ struct level { FILE *stream; }; -static struct level DBG = { "DBG", "\x1B[36m" }; -static struct level INF = { "INF", "\x1B[37m" }; -static struct level WRN = { "WRN", "\x1B[33m" }; -static struct level ERR = { "ERR", "\x1B[31m" }; -static struct level CRT = { "CRT", "\x1B[35m" }; +static struct level DBG = { "DBG", "\x1B[36m" }; /* Cyan */ +static struct level INF = { "INF", "\x1B[37m" }; /* White */ +static struct level WRN = { "WRN", "\x1B[33m" }; /* Yellow */ +static struct level ERR = { "ERR", "\x1B[31m" }; /* Red */ +static struct level CRT = { "CRT", "\x1B[35m" }; /* Purple */ static struct level UNK = { "UNK", "" }; #define COLOR_RESET "\x1B[0m" @@ -98,10 +98,10 @@ print_stack_trace(char const *title) free(strings); } -static void init_config(struct log_config *cfg) +static void init_config(struct log_config *cfg, bool unit_tests) { cfg->fprintf_enabled = true; - cfg->syslog_enabled = true; + cfg->syslog_enabled = !unit_tests; cfg->level = LOG_DEBUG; cfg->prefix = NULL; cfg->color = false; @@ -166,7 +166,8 @@ register_signal_handlers(void) } /* - * SIGPIPE is sometimes triggered by libcurl: + * SIGPIPE can be triggered by any I/O function. libcurl is particularly + * tricky: * * > libcurl makes an effort to never cause such SIGPIPEs to trigger, * > but some operating systems have no way to avoid them and even on @@ -174,8 +175,8 @@ register_signal_handlers(void) * > happen * (Documentation of CURLOPT_NOSIGNAL) * - * All SIGPIPE usually means is "the server closed the connection for - * some reason, fuck you." + * All SIGPIPE means is "the peer closed the connection for some reason, + * fuck you." * Which is a normal I/O error, and should be handled by the normal * error propagation logic, not by a signal handler. * So, ignore SIGPIPE. @@ -193,7 +194,7 @@ register_signal_handlers(void) } int -log_setup(void) +log_setup(bool unit_tests) { /* * Remember not to use any actual logging functions until logging has @@ -209,22 +210,25 @@ log_setup(void) CRT.stream = stderr; UNK.stream = stdout; - openlog("fort", LOG_CONS | LOG_PID, LOG_DAEMON); + if (unit_tests) + openlog("fort", LOG_CONS | LOG_PID, LOG_DAEMON); - init_config(&op_config); - init_config(&val_config); + init_config(&op_config, unit_tests); + init_config(&val_config, unit_tests); error = pthread_mutex_init(&logck, NULL); if (error) { fprintf(ERR.stream, "pthread_mutex_init() returned %d: %s\n", error, strerror(abs(error))); - syslog(LOG_ERR | op_config.facility, - "pthread_mutex_init() returned %d: %s", - error, strerror(abs(error))); + if (!unit_tests) + syslog(LOG_ERR | op_config.facility, + "pthread_mutex_init() returned %d: %s", + error, strerror(abs(error))); return error; } - register_signal_handlers(); + if (!unit_tests) + register_signal_handlers(); return 0; } diff --git a/src/log.h b/src/log.h index 22fce170..4155638d 100644 --- a/src/log.h +++ b/src/log.h @@ -48,7 +48,7 @@ * log_init() finishes initialization by loading the user's intended config. * log_teardown() reverts initialization. */ -int log_setup(void); +int log_setup(bool); void log_start(void); void log_teardown(void); @@ -121,8 +121,13 @@ int incidence(enum incidence_id, const char *, ...) CHECK_FORMAT(2, 3); * These are not meant to be uploaded; remember to delete them once the bug has * been found. */ -#define PR_DEBUG printf("%s:%d (%s())\n", __FILE__, __LINE__, __func__) -#define PR_DEBUG_MSG(msg, ...) printf("%s:%d (%s()): " msg "\n", \ +#define DBG_COLOR "\x1B[32m" /* Green */ +#define DBG_COLOR_RESET "\x1B[0m" +#define PR_DEBUG \ + printf(DBG_COLOR "%s:%d (%s())" DBG_COLOR_RESET "\n", \ + __FILE__, __LINE__, __func__) +#define PR_DEBUG_MSG(msg, ...) \ + printf(DBG_COLOR "%s:%d (%s()): " msg DBG_COLOR_RESET "\n", \ __FILE__, __LINE__, __func__, ##__VA_ARGS__) #endif /* SRC_LOG_H_ */ diff --git a/src/main.c b/src/main.c index 790f2a37..ebbf2af1 100644 --- a/src/main.c +++ b/src/main.c @@ -1,4 +1,3 @@ -#include "clients.h" #include "config.h" #include "extension.h" #include "internal_pool.h" @@ -69,7 +68,7 @@ main(int argc, char **argv) /* Initializations */ - error = log_setup(); + error = log_setup(false); if (error) goto just_quit; @@ -93,8 +92,8 @@ main(int argc, char **argv) goto revert_nid; /* - * TODO this looks like a lot of overhead. Is it really necessary - * when mode is STANDALONE? + * TODO (performance) this looks like a lot of overhead. Is it really + * necessary when mode is STANDALONE? */ error = internal_pool_init(); if (error) @@ -112,9 +111,6 @@ main(int argc, char **argv) error = reqs_errors_init(); if (error) goto db_rrdp_cleanup; - error = clients_db_init(); - if (error) - goto revert_reqs_errors; /* Do stuff */ switch (config_get_mode()) { @@ -128,8 +124,6 @@ main(int argc, char **argv) /* End */ - clients_db_destroy(); -revert_reqs_errors: reqs_errors_cleanup(); db_rrdp_cleanup: db_rrdp_cleanup(); diff --git a/src/notify.c b/src/notify.c index 6d8ee3c5..9bfa4a7c 100644 --- a/src/notify.c +++ b/src/notify.c @@ -2,19 +2,17 @@ #include #include -#include "clients.h" #include "log.h" +#include "rtr/rtr.h" #include "rtr/pdu_sender.h" #include "rtr/db/vrps.h" static int -send_notify(struct client *client, void *arg) +send_notify(struct rtr_client const *client, void *arg) { serial_t *serial = arg; - /* Send Serial Notify PDU */ - send_serial_notify_pdu(client->fd, client->rtr_version, - *serial); + send_serial_notify_pdu(client->fd, client->rtr_version, *serial); /* Errors already logged, do not interrupt notify to other clients */ return 0; @@ -30,5 +28,5 @@ notify_clients(void) if (error) return error; - return clients_foreach(send_notify, &serial); + return rtr_foreach_client(send_notify, &serial); } diff --git a/src/rtr/db/delta.c b/src/rtr/db/delta.c index 45a31cf2..e6cd3f9f 100644 --- a/src/rtr/db/delta.c +++ b/src/rtr/db/delta.c @@ -90,8 +90,8 @@ deltas_refput(struct deltas *deltas) } int -deltas_add_roa_v4(struct deltas *deltas, uint32_t as, struct v4_address *addr, - int op) +deltas_add_roa_v4(struct deltas *deltas, uint32_t as, + struct v4_address const *addr, int op) { struct delta_v4 delta = { .as = as, @@ -110,8 +110,8 @@ deltas_add_roa_v4(struct deltas *deltas, uint32_t as, struct v4_address *addr, } int -deltas_add_roa_v6(struct deltas *deltas, uint32_t as, struct v6_address *addr, - int op) +deltas_add_roa_v6(struct deltas *deltas, uint32_t as, + struct v6_address const *addr, int op) { struct delta_v6 delta = { .as = as, @@ -130,7 +130,8 @@ deltas_add_roa_v6(struct deltas *deltas, uint32_t as, struct v6_address *addr, } int -deltas_add_router_key(struct deltas *deltas, struct router_key *key, int op) +deltas_add_router_key(struct deltas *deltas, struct router_key const *key, + int op) { struct delta_rk delta = { .as = key->as, @@ -161,14 +162,13 @@ deltas_is_empty(struct deltas *deltas) static int __foreach_v4(struct deltas_v4 *array, delta_vrp_foreach_cb cb, void *arg, - serial_t serial, uint8_t flags) + uint8_t flags) { struct delta_vrp delta; struct delta_v4 *d; array_index i; int error; - delta.serial = serial; delta.vrp.addr_fam = AF_INET; delta.flags = flags; @@ -187,14 +187,13 @@ __foreach_v4(struct deltas_v4 *array, delta_vrp_foreach_cb cb, void *arg, static int __foreach_v6(struct deltas_v6 *array, delta_vrp_foreach_cb cb, void *arg, - serial_t serial, uint8_t flags) + uint8_t flags) { struct delta_vrp delta; struct delta_v6 *d; array_index i; int error; - delta.serial = serial; delta.vrp.addr_fam = AF_INET6; delta.flags = flags; @@ -213,14 +212,13 @@ __foreach_v6(struct deltas_v6 *array, delta_vrp_foreach_cb cb, void *arg, static int __foreach_rk(struct deltas_rk *array, delta_router_key_foreach_cb cb, - void *arg, serial_t serial, uint8_t flags) + void *arg, uint8_t flags) { struct delta_router_key delta; struct delta_rk *d; array_index i; int error; - delta.serial = serial; delta.flags = flags; ARRAYLIST_FOREACH(array, d, i) { @@ -236,36 +234,30 @@ __foreach_rk(struct deltas_rk *array, delta_router_key_foreach_cb cb, } int -deltas_foreach(serial_t serial, struct deltas *deltas, - delta_vrp_foreach_cb cb_vrp, delta_router_key_foreach_cb cb_rk, void *arg) +deltas_foreach(struct deltas *deltas, delta_vrp_foreach_cb cb_vrp, + delta_router_key_foreach_cb cb_rk, void *arg) { int error; - error = __foreach_v4(&deltas->v4.adds, cb_vrp, arg, serial, - FLAG_ANNOUNCEMENT); + error = __foreach_v4(&deltas->v4.adds, cb_vrp, arg, FLAG_ANNOUNCEMENT); if (error) return error; - error = __foreach_v4(&deltas->v4.removes, cb_vrp, arg, serial, - FLAG_WITHDRAWAL); + error = __foreach_v4(&deltas->v4.removes, cb_vrp, arg, FLAG_WITHDRAWAL); if (error) return error; - error = __foreach_v6(&deltas->v6.adds, cb_vrp, arg, serial, - FLAG_ANNOUNCEMENT); + error = __foreach_v6(&deltas->v6.adds, cb_vrp, arg, FLAG_ANNOUNCEMENT); if (error) return error; - error = __foreach_v6(&deltas->v6.removes, cb_vrp, arg, serial, - FLAG_WITHDRAWAL); + error = __foreach_v6(&deltas->v6.removes, cb_vrp, arg, FLAG_WITHDRAWAL); if (error) return error; - error = __foreach_rk(&deltas->rk.adds, cb_rk, arg, serial, - FLAG_ANNOUNCEMENT); + error = __foreach_rk(&deltas->rk.adds, cb_rk, arg, FLAG_ANNOUNCEMENT); if (error) return error; - return __foreach_rk(&deltas->rk.removes, cb_rk, arg, serial, - FLAG_WITHDRAWAL); + return __foreach_rk(&deltas->rk.removes, cb_rk, arg, FLAG_WITHDRAWAL); } diff --git a/src/rtr/db/delta.h b/src/rtr/db/delta.h index cdd44a03..a0878f14 100644 --- a/src/rtr/db/delta.h +++ b/src/rtr/db/delta.h @@ -11,12 +11,12 @@ int deltas_create(struct deltas **); void deltas_refget(struct deltas *); void deltas_refput(struct deltas *); -int deltas_add_roa_v4(struct deltas *, uint32_t, struct v4_address *, int); -int deltas_add_roa_v6(struct deltas *, uint32_t, struct v6_address *, int); -int deltas_add_router_key(struct deltas *, struct router_key *, int); +int deltas_add_roa_v4(struct deltas *, uint32_t, struct v4_address const *, int); +int deltas_add_roa_v6(struct deltas *, uint32_t, struct v6_address const *, int); +int deltas_add_router_key(struct deltas *, struct router_key const *, int); bool deltas_is_empty(struct deltas *); -int deltas_foreach(serial_t, struct deltas *, delta_vrp_foreach_cb, +int deltas_foreach(struct deltas *, delta_vrp_foreach_cb, delta_router_key_foreach_cb, void *); #endif /* SRC_DELTA_H_ */ diff --git a/src/rtr/db/deltas_array.c b/src/rtr/db/deltas_array.c new file mode 100644 index 00000000..ce035b2d --- /dev/null +++ b/src/rtr/db/deltas_array.c @@ -0,0 +1,98 @@ +#include "rtr/db/deltas_array.h" + +struct deltas_array { + struct deltas **array; /* It's a circular array. */ + unsigned int len; /* Occupied slots. */ + unsigned int last; /* Index of last added element. */ +}; + +struct deltas_array * +darray_create(void) +{ + struct deltas_array *result; + + result = malloc(sizeof(struct deltas_array)); + if (result == NULL) + return NULL; + + result->array = calloc(config_get_deltas_lifetime(), + sizeof(struct deltas *)); + if (result->array == NULL) { + free(result); + return NULL; + } + + result->len = 0; + result->last = UINT_MAX; + return result; +} + +void +darray_destroy(struct deltas_array *darray) +{ + darray_clear(darray); + free(darray->array); + free(darray); +} + +unsigned int +darray_len(struct deltas_array *darray) +{ + return darray->len; +} + +void +darray_add(struct deltas_array *darray, struct deltas *addend) +{ + unsigned int first; + + if (darray->len < config_get_deltas_lifetime()) { + darray->array[darray->len] = addend; + darray->last = darray->len; + darray->len++; + } else { + first = (darray->last == darray->len - 1) + ? 0 : (darray->last + 1); + deltas_refput(darray->array[first]); + darray->array[first] = addend; + darray->last = first; + } +} + +void +darray_clear(struct deltas_array *darray) +{ + unsigned int i; + for (i = 0; i < darray->len; i++) + deltas_refput(darray->array[i]); + darray->len = 0; +} + +int +darray_foreach_since(struct deltas_array *darray, unsigned int from, + darray_foreach_cb cb, void *arg) +{ + unsigned int i; + unsigned int j; + int error; + + if (from == 0) + return 0; + if (from > darray->len) + return -EINVAL; + + i = darray->last - from + 1; + if (i > darray->len) + i += darray->len; + + from += i; + for (; i < from; i++) { + j = (i >= darray->len) ? (i - darray->len) : i; + error = cb(darray->array[j], arg); + if (error) + return error; + } + + return 0; + +} diff --git a/src/rtr/db/deltas_array.h b/src/rtr/db/deltas_array.h new file mode 100644 index 00000000..a9bcf576 --- /dev/null +++ b/src/rtr/db/deltas_array.h @@ -0,0 +1,19 @@ +#ifndef SRC_RTR_DB_DELTAS_ARRAY_H_ +#define SRC_RTR_DB_DELTAS_ARRAY_H_ + +#include "rtr/db/delta.h" + +struct deltas_array; + +struct deltas_array *darray_create(void); +void darray_destroy(struct deltas_array *); + +unsigned int darray_len(struct deltas_array *); +void darray_add(struct deltas_array *, struct deltas *); +void darray_clear(struct deltas_array *); + +typedef int (*darray_foreach_cb)(struct deltas *, void *); +int darray_foreach_since(struct deltas_array *, serial_t from, + darray_foreach_cb, void *); + +#endif /* SRC_RTR_DB_DELTAS_ARRAY_H_ */ diff --git a/src/rtr/db/vrp.h b/src/rtr/db/vrp.h index 59c05528..a1f78711 100644 --- a/src/rtr/db/vrp.h +++ b/src/rtr/db/vrp.h @@ -1,9 +1,9 @@ #ifndef SRC_RTR_DB_VRP_H_ #define SRC_RTR_DB_VRP_H_ -#include #include #include "address.h" +#include "serial.h" #include "object/router_key.h" #define FLAG_WITHDRAWAL 0 @@ -51,8 +51,6 @@ #define VRP_EQ(a, b) \ (VRP_ASN_EQ(a, b) && VRP_PREFIX_EQ(a, b) && VRP_MAX_PREFIX_LEN_EQ(a, b)) -typedef uint32_t serial_t; - struct vrp { uint32_t asn; union { @@ -65,7 +63,6 @@ struct vrp { }; struct delta_vrp { - serial_t serial; struct vrp vrp; uint8_t flags; }; diff --git a/src/rtr/db/vrps.c b/src/rtr/db/vrps.c index 87d83326..a5006623 100644 --- a/src/rtr/db/vrps.c +++ b/src/rtr/db/vrps.c @@ -6,19 +6,17 @@ #include #include -#include "clients.h" #include "common.h" #include "output_printer.h" #include "validation_handler.h" #include "data_structure/array_list.h" #include "object/router_key.h" #include "object/tal.h" +#include "rtr/rtr.h" #include "rtr/db/db_table.h" #include "slurm/slurm_loader.h" #include "thread/thread_pool.h" -DEFINE_ARRAY_LIST_FUNCTIONS(deltas_db, struct delta_group, ) - struct vrp_node { struct delta_vrp delta; SLIST_ENTRY(vrp_node) next; @@ -48,9 +46,14 @@ struct state { */ struct db_table *base; /** DB changes to @base over time. */ - struct deltas_db deltas; + struct deltas_array *deltas; - /* Last valid SLURM applied to base */ + /* + * Last valid SLURM applied to base. + * + * Doesn't need locking, because the only writer is also the only + * reader. + */ struct db_slurm *slurm; /* @@ -65,6 +68,8 @@ struct state { * Do not use this value to check whether we already finished our first * validation. (Use base != NULL for that.) Zero is totally a valid * serial, particularly when the integer wraps. + * + * TODO (fine) this should probably be moved to struct db_table. */ serial_t serial; uint16_t v0_session_id; @@ -76,18 +81,12 @@ static struct state state; /* Thread pool to use when the TALs will be processed */ static struct thread_pool *pool; -/** Read/write lock, which protects @state and its inhabitants. */ +/** Protects @state.base, @state.deltas and @state.serial. */ static pthread_rwlock_t state_lock; /** Lock to protect ROA table during construction. */ static pthread_rwlock_t table_lock; -void -deltagroup_cleanup(struct delta_group *group) -{ - deltas_refput(group->deltas); -} - int vrps_init(void) { @@ -101,8 +100,11 @@ vrps_init(void) return error; state.base = NULL; - - deltas_db_init(&state.deltas); + state.deltas = darray_create(); + if (state.deltas == NULL) { + error = pr_enomem(); + goto revert_thread_pool; + } /* * Use the same start serial, the session ID will avoid @@ -115,7 +117,7 @@ vrps_init(void) now = 0; error = get_current_time(&now); if (error) - goto release_deltas; + goto revert_deltas; state.v0_session_id = now & 0xFFFF; /* Minus 1 to prevent same ID */ @@ -128,20 +130,22 @@ vrps_init(void) error = pthread_rwlock_init(&state_lock, NULL); if (error) { error = pr_op_errno(error, "state pthread_rwlock_init() errored"); - goto release_deltas; + goto revert_deltas; } error = pthread_rwlock_init(&table_lock, NULL); if (error) { error = pr_op_errno(error, "table pthread_rwlock_init() errored"); - goto release_state_lock; + goto revert_state_lock; } return 0; -release_state_lock: + +revert_state_lock: pthread_rwlock_destroy(&state_lock); -release_deltas: - deltas_db_cleanup(&state.deltas, deltagroup_cleanup); +revert_deltas: + darray_destroy(state.deltas); +revert_thread_pool: thread_pool_destroy(pool); return error; } @@ -149,15 +153,17 @@ release_deltas: void vrps_destroy(void) { - if (state.base != NULL) - db_table_destroy(state.base); - if (state.slurm != NULL) - db_slurm_destroy(state.slurm); - deltas_db_cleanup(&state.deltas, deltagroup_cleanup); - /* Nothing to do with error codes from now on */ + thread_pool_destroy(pool); + pthread_rwlock_destroy(&state_lock); pthread_rwlock_destroy(&table_lock); - thread_pool_destroy(pool); + + if (state.slurm != NULL) + db_slurm_destroy(state.slurm); + + darray_destroy(state.deltas); + if (state.base != NULL) + db_table_destroy(state.base); } #define WLOCK_HANDLER(lock, cb) \ @@ -218,101 +224,51 @@ __perform_standalone_validation(struct db_table **result) return 0; } -/* - * Remove unnecessary deltas from the database. - * Unnecessary deltas = those whose serial < min_serial. - */ -static void -cleanup_deltas(serial_t min_serial) -{ - struct delta_group *initial; - struct delta_group *rm; - array_index i; - - /* - * TODO the array is sorted by serial, but it's supposed to employ - * serial arithmetic. > is incorrect. - */ - ARRAYLIST_FOREACH(&state.deltas, initial, i) - if (initial->serial > min_serial) - break; - - for (rm = state.deltas.array; rm < initial; rm++) - deltas_refput(rm->deltas); - - state.deltas.len -= (initial - state.deltas.array); - memmove(state.deltas.array, initial, - state.deltas.len * sizeof(struct delta_group)); -} - static int __compute_deltas(struct db_table *old_base, struct db_table *new_base, - bool *notify_clients) + bool *notify_clients, struct deltas **result) { - struct deltas *deltas; /* Deltas in raw form */ - struct delta_group deltas_node; /* Deltas in database node form */ - serial_t min_serial; int error; - error = 0; - - /* No clients listening = no need for deltas */ - if (clients_get_min_serial(&min_serial) == -ENOENT) - goto purge_deltas; - + *result = NULL; if (notify_clients != NULL) *notify_clients = true; /* First version of the database = No deltas */ if (old_base == NULL) - goto purge_deltas; + return 0; - /* - * Failure on computing deltas = latest database version lacks deltas, - * which renders all previous deltas useless. (Because clients always - * want the latest.) - */ - error = compute_deltas(old_base, new_base, &deltas); + error = compute_deltas(old_base, new_base, result); if (error) - goto purge_deltas; + return error; - if (deltas_is_empty(deltas)) { + if (deltas_is_empty(*result)) { if (notify_clients != NULL) *notify_clients = false; - deltas_refput(deltas); - goto success; /* Happy path when the DB doesn't change. */ - } - - deltas_node.serial = state.serial; - deltas_node.deltas = deltas; - /* On success, ownership of deltas is transferred to state.deltas. */ - error = deltas_db_add(&state.deltas, &deltas_node); - if (error) { - deltas_refput(deltas); - goto purge_deltas; + deltas_refput(*result); + *result = NULL; } - /* Happy path when the DB changes. (Fall through) */ - -success: - cleanup_deltas(min_serial); return 0; - -purge_deltas: - deltas_db_cleanup(&state.deltas, deltagroup_cleanup); - return error; } static int __vrps_update(bool *notify_clients) { + /* + * This function is the only writer, and it runs once at a time. + * Therefore, it's going to worry about write locking, but not read + * locking. + */ + struct db_table *old_base; struct db_table *new_base; + struct deltas *new_deltas; int error; if (notify_clients) *notify_clients = false; - old_base = NULL; + old_base = state.base; new_base = NULL; error = __perform_standalone_validation(&new_base); @@ -333,18 +289,8 @@ __vrps_update(bool *notify_clients) */ output_print_data(new_base); - rwlock_write_lock(&state_lock); - - old_base = state.base; /* Postpone destruction, to release lock ASAP. */ - state.base = new_base; - state.serial++; - - /* - * TODO after refactoring vrps_foreach_filtered_delta(), move this out - * of the mutex. You don't really need the mutex to compute the deltas; - * vrps_update() is supposed to be the only writer. - */ - error = __compute_deltas(old_base, new_base, notify_clients); + error = __compute_deltas(old_base, new_base, notify_clients, + &new_deltas); if (error) { /* * Deltas are nice-to haves. As long as state.base is correct, @@ -355,6 +301,22 @@ __vrps_update(bool *notify_clients) pr_op_warn("Deltas could not be computed: %s", strerror(error)); } + rwlock_write_lock(&state_lock); + + state.base = new_base; + state.serial++; + if (new_deltas != NULL) { + /* Ownership transferred */ + darray_add(state.deltas, new_deltas); + } else { + /* + * If the latest base has no deltas, all existing deltas are + * rendered useless. This is because clients always want to + * reach the latest serial, no matter where they are. + */ + darray_clear(state.deltas); + } + rwlock_unlock(&state_lock); if (old_base != NULL) @@ -505,21 +467,64 @@ router_key_ovrd_remove(struct delta_router_key const *delta, void *arg) return 0; } -/* - * Remove all operations on @deltas that override each other, and do @cb (with - * @arg) on each element of the resultant delta. +static int +__deltas_foreach(struct deltas *deltas, void *arg) +{ + return deltas_foreach(deltas, vrp_ovrd_remove, router_key_ovrd_remove, + arg); +} + +/** + * Runs @vrp_cb and @rk_cb on all the deltas from the database whose + * serial > @from, excluding those that cancel each other. + * + * Please keep in mind that there is at least one errcode-aware caller. The most + * important ones are + * 1. 0: No errors. + * 2. -EAGAIN: No data available; database still under construction. + * 3. -ESRCH: @from was not found. */ int -vrps_foreach_filtered_delta(struct deltas_db *deltas, - delta_vrp_foreach_cb cb_prefix, delta_router_key_foreach_cb cb_rk, +vrps_foreach_delta_since(serial_t from, serial_t *to, + delta_vrp_foreach_cb vrp_cb, delta_router_key_foreach_cb rk_cb, void *arg) { struct sorted_lists filtered_lists; - struct delta_group *group; struct vrp_node *vnode; struct rk_node *rnode; - array_index i; - int error = 0; + int error; + + error = rwlock_read_lock(&state_lock); + if (error) + return error; + + if (state.base == NULL) { + /* Database still under construction. */ + rwlock_unlock(&state_lock); + return -EAGAIN; + } + + if (from == state.serial) { + /* Client already has the latest serial. */ + rwlock_unlock(&state_lock); + *to = from; + return 0; + } + + /* if from < first serial */ + if (serial_lt(from, state.serial - darray_len(state.deltas))) + goto cache_reset; /* Delta was already deleted. */ + /* if from > last serial */ + if (serial_lt(state.serial, from)) + goto cache_reset; /* Serial is invalid. */ + + /* + * TODO (performance) this implementation is naive. + * Either use a hash set, or sort the resources. + * Also, deltas that share a serial do not need to be compared to each + * other. (Corollary: If there's one serial, no comparisons whatsoever + * need to be made.) + */ /* * Filter: Remove entries that cancel each other. @@ -528,21 +533,20 @@ vrps_foreach_filtered_delta(struct deltas_db *deltas, */ SLIST_INIT(&filtered_lists.prefixes); SLIST_INIT(&filtered_lists.router_keys); - ARRAYLIST_FOREACH(deltas, group, i) { - error = deltas_foreach(group->serial, group->deltas, - vrp_ovrd_remove, router_key_ovrd_remove, &filtered_lists); - if (error) - goto release_list; - } + + error = darray_foreach_since(state.deltas, state.serial - from, + __deltas_foreach, &filtered_lists); + if (error) + goto release_list; /* Now do the corresponding callback on the filtered deltas */ SLIST_FOREACH(vnode, &filtered_lists.prefixes, next) { - error = cb_prefix(&vnode->delta, arg); + error = vrp_cb(&vnode->delta, arg); if (error) break; } SLIST_FOREACH(rnode, &filtered_lists.router_keys, next) { - error = cb_rk(&rnode->delta, arg); + error = rk_cb(&rnode->delta, arg); if (error) break; } @@ -559,75 +563,11 @@ release_list: free(rnode); } - return error; -} - -/** - * Adds to @result the deltas whose serial > @from. - * - * Please keep in mind that there is at least one errcode-aware caller. The most - * important ones are - * 1. 0: No errors. - * 2. -EAGAIN: No data available; database still under construction. - * 3. -ESRCH: @from was not found. - * - * As usual, only 0 guarantees valid out parameters. (@to and @result.) - * (But note that @result is supposed to be already initialized, so caller will - * have to clean it up regardless of error.) - */ -int -vrps_get_deltas_from(serial_t from, serial_t *to, struct deltas_db *result) -{ - struct delta_group *group; - serial_t first_serial; - serial_t last_serial; - array_index i; - int error; - - error = rwlock_read_lock(&state_lock); - if (error) - return error; - - if (state.base == NULL) - goto try_again; /* Database still under construction. */ - if (from == state.serial) { - /* Client already has the latest serial. */ - rwlock_unlock(&state_lock); - *to = from; - return 0; - } - if (state.deltas.len == 0) - goto reset_database; /* No deltas available. */ - - first_serial = state.deltas.array[0].serial - 1; - last_serial = state.deltas.array[state.deltas.len - 1].serial; - - if (from < first_serial) - goto reset_database; /* Delta was already deleted. */ - if (from > last_serial) - goto reset_database; /* Serial is invalid. */ - - for (i = from - first_serial; i < state.deltas.len; i++) { - group = &state.deltas.array[i]; - - error = deltas_db_add(result, group); - if (error) { - rwlock_unlock(&state_lock); - return error; - } - - deltas_refget(group->deltas); - } - + *to = state.serial; rwlock_unlock(&state_lock); - *to = last_serial; return 0; -try_again: - rwlock_unlock(&state_lock); - return -EAGAIN; - -reset_database: +cache_reset: rwlock_unlock(&state_lock); return -ESRCH; } @@ -654,7 +594,10 @@ get_last_serial_number(serial_t *result) uint16_t get_current_session_id(uint8_t rtr_version) { - /* Semaphore isn't needed since this value is set at initialization */ + /* + * These values are constant after initialization, + * so locking isn't needed. + */ if (rtr_version == 1) return state.v1_session_id; return state.v0_session_id; diff --git a/src/rtr/db/vrps.h b/src/rtr/db/vrps.h index 3d45e15b..31b0ab07 100644 --- a/src/rtr/db/vrps.h +++ b/src/rtr/db/vrps.h @@ -8,19 +8,8 @@ */ #include -#include "data_structure/array_list.h" -#include "rtr/db/delta.h" -struct delta_group { - serial_t serial; - /* snapshot(serial - 1) + deltas = snapshot(serial) */ - struct deltas *deltas; -}; - -void deltagroup_cleanup(struct delta_group *); - -DEFINE_ARRAY_LIST_STRUCT(deltas_db, struct delta_group); -DECLARE_ARRAY_LIST_FUNCTIONS(deltas_db, struct delta_group) +#include "rtr/db/deltas_array.h" int vrps_init(void); void vrps_destroy(void); @@ -34,11 +23,9 @@ int vrps_update(bool *); */ int vrps_foreach_base(vrp_foreach_cb, router_key_foreach_cb, void *); -int vrps_get_deltas_from(serial_t, serial_t *, struct deltas_db *); -int get_last_serial_number(serial_t *); - -int vrps_foreach_filtered_delta(struct deltas_db *, delta_vrp_foreach_cb, +int vrps_foreach_delta_since(serial_t, serial_t *, delta_vrp_foreach_cb, delta_router_key_foreach_cb, void *); +int get_last_serial_number(serial_t *); int handle_roa_v4(uint32_t, struct ipv4_prefix const *, uint8_t, void *); int handle_roa_v6(uint32_t, struct ipv6_prefix const *, uint8_t, void *); diff --git a/src/rtr/pdu.c b/src/rtr/pdu.c index 4ba76c0c..0aa29114 100644 --- a/src/rtr/pdu.c +++ b/src/rtr/pdu.c @@ -6,7 +6,6 @@ #include #include "address.h" -#include "clients.h" #include "common.h" #include "log.h" #include "rtr/err_pdu.h" @@ -44,111 +43,106 @@ pdutype2str(enum pdu_type type) static int pdu_header_from_reader(struct pdu_reader *reader, struct pdu_header *header) { - return read_int8(reader, &header->protocol_version) - || read_int8(reader, &header->pdu_type) - || read_int16(reader, &header->m.session_id) - || read_int32(reader, &header->length); -} + int error; -/* Do not use this macro before @header has been initialized, obviously. */ -#define RESPOND_ERROR(report_cb) \ - ((header.pdu_type != PDU_TYPE_ERROR_REPORT) ? (report_cb) : -EINVAL); + error = read_int8(reader, &header->protocol_version); + if (error) + return error; + error = read_int8(reader, &header->pdu_type); + if (error) + return error; + error = read_int16(reader, &header->m.session_id); + if (error) + return error; + return read_int32(reader, &header->length); +} static int -validate_rtr_version(int fd, struct pdu_header *header, +validate_rtr_version(struct rtr_client *client, struct pdu_header *header, unsigned char *hdr_bytes) { - uint8_t curr_version; - bool version_set; - int error; - - error = clients_get_rtr_version_set(fd, &version_set, &curr_version); - if (error) - /* Unlikely, but close connection with min version */ - return (header->pdu_type != PDU_TYPE_ERROR_REPORT) ? - err_pdu_send_internal_error(fd, RTR_V0) : -EINVAL; - - if (version_set) { - if (header->protocol_version == curr_version) + if (client->rtr_version != -1) { + if (header->protocol_version == client->rtr_version) return 0; /* Don't send error on a rcvd error! */ if (header->pdu_type == PDU_TYPE_ERROR_REPORT) return -EINVAL; - switch (curr_version) { + switch (client->rtr_version) { case RTR_V1: /* Rcvd version is valid, but unexpected */ if (header->protocol_version == RTR_V0) return err_pdu_send_unexpected_proto_version( - fd, curr_version, hdr_bytes, + client->fd, client->rtr_version, hdr_bytes, "RTR version 0 was expected"); /* Send common error */ case RTR_V0: - return err_pdu_send_unsupported_proto_version(fd, - curr_version, hdr_bytes, + return err_pdu_send_unsupported_proto_version( + client->fd, client->rtr_version, hdr_bytes, "RTR version received is unknown."); default: - pr_crit("Unknown RTR version %u", curr_version); - break; + pr_crit("Unknown RTR version %u", client->rtr_version); } } /* Unsigned and incremental values, so compare against major version */ if (header->protocol_version > RTR_V1) /* ...and send error with min version */ - return (header->pdu_type != PDU_TYPE_ERROR_REPORT) ? - err_pdu_send_unsupported_proto_version(fd, RTR_V0, - hdr_bytes, "RTR version received is unknown.") + return (header->pdu_type != PDU_TYPE_ERROR_REPORT) + ? err_pdu_send_unsupported_proto_version(client->fd, RTR_V0, + hdr_bytes, "RTR version received is unknown.") : -EINVAL; - return clients_set_rtr_version(fd, header->protocol_version); + client->rtr_version = header->protocol_version; + return 0; } +/* Do not use this macro before @header has been initialized, obviously. */ +#define RESPOND_ERROR(report_cb) \ + ((header.pdu_type != PDU_TYPE_ERROR_REPORT) ? (report_cb) : -EINVAL); + +/* + * Reads the next PDU from @reader. Returns the PDU in @request, and its + * metadata in @metadata. + */ int -pdu_load(int fd, struct sockaddr_storage *client_addr, +pdu_load(struct pdu_reader *reader, struct rtr_client *client, struct rtr_request *request, struct pdu_metadata const **metadata) { - unsigned char hdr_bytes[RTRPDU_HDR_LEN]; - struct pdu_reader reader; struct pdu_header header; struct pdu_metadata const *meta; - uint8_t version; int error; - /* Read the header into its buffer. */ - error = pdu_reader_init(&reader, fd, hdr_bytes, RTRPDU_HDR_LEN, true); - if (error) - /* Communication interrupted; omit error response */ - return error; - error = pdu_header_from_reader(&reader, &header); + if (reader->size == 0) { + pr_op_debug("Client packet contains no more PDUs."); + return ENOENT; + } + + request->bytes = reader->buffer; + request->bytes_len = RTRPDU_HDR_LEN; + + error = pdu_header_from_reader(reader, &header); if (error) /* No error response because the PDU might have been an error */ return error; + pr_op_debug("PDU '%s' received from client '%s'", + pdutype2str(header.pdu_type), client->addr); - if (log_op_enabled(LOG_DEBUG)) - pr_op_debug("Received a %s from %s.", - pdutype2str(header.pdu_type), - sockaddr2str(client_addr)); - - error = validate_rtr_version(fd, &header, hdr_bytes); + error = validate_rtr_version(client, &header, request->bytes); if (error) return error; /* Error response PDU already sent */ - /* RTR version ok, keep it for later use */ - version = header.protocol_version; - /* * DO NOT USE THE err_pdu_* functions directly. Wrap them with * RESPOND_ERROR() INSTEAD. - * TODO I think this comment should be above validate_rtr_version(), - * and validate_rtr_version() is buggy. */ if (header.length < RTRPDU_HDR_LEN) - return RESPOND_ERROR(err_pdu_send_invalid_request_truncated(fd, - version, hdr_bytes, "Invalid header length. (< 8 bytes)")); + return RESPOND_ERROR(err_pdu_send_invalid_request_truncated( + client->fd, client->rtr_version, request->bytes, + "Invalid header length. (< 8 bytes)")); /* * Error messages can be quite large. @@ -158,60 +152,33 @@ pdu_load(int fd, struct sockaddr_storage *client_addr, * (Warning: I'm assuming english tho.) */ if (header.length > 512) - return RESPOND_ERROR(err_pdu_send_invalid_request_truncated(fd, - version, hdr_bytes, "PDU is too large. (> 512 bytes)")); + return RESPOND_ERROR(err_pdu_send_invalid_request_truncated( + client->fd, client->rtr_version, request->bytes, + "PDU is too large. (> 512 bytes)")); - /* Read the rest of the PDU into its buffer. */ request->bytes_len = header.length; - request->bytes = malloc(header.length); - if (request->bytes == NULL) - /* No error report PDU on allocation failures. */ - return pr_enomem(); - - memcpy(request->bytes, hdr_bytes, RTRPDU_HDR_LEN); - error = pdu_reader_init(&reader, fd, - request->bytes + RTRPDU_HDR_LEN, - header.length - RTRPDU_HDR_LEN, - false); - if (error) - /* Communication interrupted; no error PDU. */ - goto revert_bytes; /* Deserialize the PDU. */ meta = pdu_get_metadata(header.pdu_type); - if (!meta) { - error = RESPOND_ERROR(err_pdu_send_unsupported_pdu_type(fd, - version, request)); - goto revert_bytes; - } + if (!meta) + return RESPOND_ERROR(err_pdu_send_unsupported_pdu_type( + client->fd, client->rtr_version, request)); request->pdu = malloc(meta->length); - if (request->pdu == NULL) { - error = pr_enomem(); - goto revert_bytes; - } + if (request->pdu == NULL) + /* No error report PDU on allocation failures. */ + return pr_enomem(); - error = meta->from_stream(&header, &reader, request->pdu); - if (reader.size != 0) { - error = RESPOND_ERROR(err_pdu_send_invalid_request(fd, version, - request, - "The PDU length sent doesn't match the real PDU length")); - goto revert_pdu; - } + error = meta->from_stream(&header, reader, request->pdu); if (error) { - RESPOND_ERROR(err_pdu_send_internal_error(fd, version)); - goto revert_pdu; + /* Communication interrupted; no error PDU. */ + free(request->pdu); + return error; } /* Happy path. */ *metadata = meta; return 0; - -revert_pdu: - free(request->pdu); -revert_bytes: - free(request->bytes); - return error; } static int @@ -344,7 +311,7 @@ error_report_destroy(void *pdu_void) } DEFINE_METADATA(serial_notify, free); -DEFINE_METADATA(serial_query, free); +DEFINE_METADATA(serial_query, free); /* handle_serial_query_pdu */ DEFINE_METADATA(reset_query, free); DEFINE_METADATA(cache_response, free); DEFINE_METADATA(ipv4_prefix, free); diff --git a/src/rtr/pdu.h b/src/rtr/pdu.h index 5afdfcb8..b5c9b6aa 100644 --- a/src/rtr/pdu.h +++ b/src/rtr/pdu.h @@ -7,6 +7,7 @@ #include "common.h" #include "object/router_key.h" #include "rtr/primitive_reader.h" +#include "rtr/rtr.h" #define RTR_V0 0 #define RTR_V1 1 @@ -157,7 +158,7 @@ struct pdu_metadata { void (*destructor)(void *); }; -int pdu_load(int, struct sockaddr_storage *, struct rtr_request *, +int pdu_load(struct pdu_reader *, struct rtr_client *, struct rtr_request *, struct pdu_metadata const **); struct pdu_metadata const *pdu_get_metadata(uint8_t); struct pdu_header *pdu_get_header(void *); diff --git a/src/rtr/pdu_handler.c b/src/rtr/pdu_handler.c index 1ed8532b..d4110a5b 100644 --- a/src/rtr/pdu_handler.c +++ b/src/rtr/pdu_handler.c @@ -22,13 +22,61 @@ handle_serial_notify_pdu(int fd, struct rtr_request const *request) WARN_UNEXPECTED_PDU(serial_notify, fd, request, "Serial Notify"); } +struct send_delta_args { + int fd; + uint8_t rtr_version; + bool cache_response_sent; +}; + +static int +send_cache_response_maybe(struct send_delta_args *args) +{ + int error; + + if (!args->cache_response_sent) { + error = send_cache_response_pdu(args->fd, args->rtr_version); + if (error) + return error; + args->cache_response_sent = true; + } + + return 0; +} + +static int +send_delta_vrp(struct delta_vrp const *delta, void *arg) +{ + struct send_delta_args *args = arg; + int error; + + error = send_cache_response_maybe(args); + if (error) + return error; + + return send_prefix_pdu(args->fd, args->rtr_version, &delta->vrp, + delta->flags); +} + +static int +send_delta_rk(struct delta_router_key const *delta, void *arg) +{ + struct send_delta_args *args = arg; + int error; + + error = send_cache_response_maybe(args); + if (error) + return error; + + return send_router_key_pdu(args->fd, args->rtr_version, + &delta->router_key, delta->flags); +} + int handle_serial_query_pdu(int fd, struct rtr_request const *request) { struct serial_query_pdu *query = request->pdu; - struct deltas_db deltas; + struct send_delta_args args; serial_t final_serial; - uint8_t version; int error; /* @@ -38,12 +86,15 @@ handle_serial_query_pdu(int fd, struct rtr_request const *request) * the mismatch MUST immediately terminate the session with an Error * Report PDU with code 0 ("Corrupt Data")" */ - version = query->header.protocol_version; + args.rtr_version = query->header.protocol_version; if (query->header.m.session_id != - get_current_session_id(version)) - return err_pdu_send_corrupt_data(fd, version, request, + get_current_session_id(args.rtr_version)) + return err_pdu_send_corrupt_data(fd, args.rtr_version, request, "Session ID doesn't match."); + args.fd = fd; + args.cache_response_sent = false; + /* * For the record, there are two reasons why we want to work on a * (shallow) copy of the deltas (as opposed to eg. a foreach): @@ -53,50 +104,39 @@ handle_serial_query_pdu(int fd, struct rtr_request const *request) * PDUs, to minimize writer stagnation. */ - deltas_db_init(&deltas); - error = vrps_get_deltas_from(query->serial_number, &final_serial, - &deltas); + error = vrps_foreach_delta_since(query->serial_number, &final_serial, + send_delta_vrp, send_delta_rk, &args); switch (error) { case 0: - break; + /* + * https://tools.ietf.org/html/rfc6810#section-6.2 + * + * These functions presently only fail on writes, allocations + * and programming errors. Best avoid error PDUs. + */ + if (!args.cache_response_sent) { + error = send_cache_response_pdu(fd, args.rtr_version); + if (error) + return error; + } + return send_end_of_data_pdu(fd, args.rtr_version, final_serial); case -EAGAIN: /* Database still under construction */ - error = err_pdu_send_no_data_available(fd, version); - goto end; + error = err_pdu_send_no_data_available(fd, args.rtr_version); + return error; case -ESRCH: /* Invalid serial */ /* https://tools.ietf.org/html/rfc6810#section-6.3 */ - error = send_cache_reset_pdu(fd, version); - goto end; + return send_cache_reset_pdu(fd, args.rtr_version); case -ENOMEM: /* Memory allocation failure */ - error = pr_enomem(); - goto end; + return pr_enomem(); case EAGAIN: /* Too many threads */ /* * I think this should be more of a "try again" thing, but * RTR does not provide a code for that. Just fall through. */ - default: - error = err_pdu_send_internal_error(fd, version); - goto end; + break; } - /* - * https://tools.ietf.org/html/rfc6810#section-6.2 - * - * These functions presently only fail on writes, allocations and - * programming errors. Best avoid error PDUs. - */ - - error = send_cache_response_pdu(fd, version); - if (error) - goto end; - error = send_delta_pdus(fd, version, &deltas); - if (error) - goto end; - error = send_end_of_data_pdu(fd, version, final_serial); - -end: - deltas_db_cleanup(&deltas, deltagroup_cleanup); - return error; + return err_pdu_send_internal_error(fd, args.rtr_version); } struct base_roa_args { diff --git a/src/rtr/pdu_sender.c b/src/rtr/pdu_sender.c index 1cd11317..bee37a79 100644 --- a/src/rtr/pdu_sender.c +++ b/src/rtr/pdu_sender.c @@ -8,7 +8,6 @@ #include #include /* INET_ADDRSTRLEN */ -#include "clients.h" #include "common.h" #include "config.h" #include "log.h" @@ -217,52 +216,6 @@ send_router_key_pdu(int fd, uint8_t version, return send_response(fd, pdu.header.pdu_type, data, len); } -struct simple_param { - int fd; - uint8_t version; -}; - -static int -vrp_simply_send(struct delta_vrp const *delta, void *arg) -{ - struct simple_param *param = arg; - - return send_prefix_pdu(param->fd, param->version, &delta->vrp, - delta->flags); -} - -static int -router_key_simply_send(struct delta_router_key const *delta, void *arg) -{ - struct simple_param *param = arg; - - return send_router_key_pdu(param->fd, param->version, - &delta->router_key, delta->flags); -} - -int -send_delta_pdus(int fd, uint8_t version, struct deltas_db *deltas) -{ - struct delta_group *group; - struct simple_param param; - - param.fd = fd; - param.version = version; - - /* - * Short circuit: Entries that share serial are already guaranteed to - * not contradict each other, so no filtering required. - */ - if (deltas->len == 1) { - group = &deltas->array[0]; - return deltas_foreach(group->serial, group->deltas, - vrp_simply_send, router_key_simply_send, ¶m); - } - - return vrps_foreach_filtered_delta(deltas, vrp_simply_send, - router_key_simply_send, ¶m); -} - #define GET_END_OF_DATA_LENGTH(version) \ ((version == RTR_V1) ? \ RTRPDU_END_OF_DATA_V1_LEN : RTRPDU_END_OF_DATA_V0_LEN) @@ -273,7 +226,6 @@ send_end_of_data_pdu(int fd, uint8_t version, serial_t end_serial) struct end_of_data_pdu pdu; unsigned char data[GET_END_OF_DATA_LENGTH(version)]; size_t len; - int error; set_header_values(&pdu.header, version, PDU_TYPE_END_OF_DATA, get_current_session_id(version)); @@ -290,12 +242,7 @@ send_end_of_data_pdu(int fd, uint8_t version, serial_t end_serial) if (len != GET_END_OF_DATA_LENGTH(version)) pr_crit("Serialized End of Data is %zu bytes.", len); - error = send_response(fd, pdu.header.pdu_type, data, len); - if (error) - return error; - - clients_update_serial(fd, pdu.serial_number); - return 0; + return send_response(fd, pdu.header.pdu_type, data, len); } int diff --git a/src/rtr/pdu_sender.h b/src/rtr/pdu_sender.h index 4f2cb33e..113b3a91 100644 --- a/src/rtr/pdu_sender.h +++ b/src/rtr/pdu_sender.h @@ -10,7 +10,6 @@ int send_cache_reset_pdu(int, uint8_t); int send_cache_response_pdu(int, uint8_t); int send_prefix_pdu(int, uint8_t, struct vrp const *, uint8_t); int send_router_key_pdu(int, uint8_t, struct router_key const *, uint8_t); -int send_delta_pdus(int, uint8_t, struct deltas_db *); int send_end_of_data_pdu(int, uint8_t, serial_t); int send_error_report_pdu(int, uint8_t, uint16_t, struct rtr_request const *, char *); diff --git a/src/rtr/primitive_reader.c b/src/rtr/primitive_reader.c index 00943f35..2979b5ea 100644 --- a/src/rtr/primitive_reader.c +++ b/src/rtr/primitive_reader.c @@ -13,59 +13,22 @@ static int get_octets(unsigned char); static void place_null_character(rtr_char *, size_t); -/** - * Reads exactly @buffer_len bytes from @buffer, erroring if this goal cannot be - * met. - * - * If @allow_eof is true, will allow immediate EOF in place of the buffer bytes. - * (Though all this really means is that the corresponding warning message will - * not be printed, which is perfectly fine as far as the only current caller is - * concerned.) - * - * Returns 0 if exactly @buffer_len bytes could be read. - */ -static int -read_exact(int fd, unsigned char *buffer, size_t buffer_len, bool allow_eof) -{ - ssize_t read_result; - size_t offset; - - for (offset = 0; offset < buffer_len; offset += read_result) { - read_result = read(fd, &buffer[offset], buffer_len - offset); - if (read_result == -1) - return -pr_op_errno(errno, - "Client socket read interrupted"); - - if (read_result == 0) { - if (!allow_eof) - pr_op_warn("Stream ended mid-PDU."); - return -EPIPE; - } - - allow_eof = false; - } - - return 0; -} - /** * BTW: I think it's best not to use sizeof for @size, because it risks * including padding. */ -int -pdu_reader_init(struct pdu_reader *reader, int fd, unsigned char *buffer, - size_t size, bool allow_eof) +void +pdu_reader_init(struct pdu_reader *reader, unsigned char *buffer, size_t size) { reader->buffer = buffer; reader->size = size; - return read_exact(fd, reader->buffer, size, allow_eof); } static int insufficient_bytes(void) { pr_op_debug("Attempted to read past the end of a PDU Reader."); - return -EINVAL; + return -EPIPE; } int diff --git a/src/rtr/primitive_reader.h b/src/rtr/primitive_reader.h index a66857ea..803adac2 100644 --- a/src/rtr/primitive_reader.h +++ b/src/rtr/primitive_reader.h @@ -1,7 +1,6 @@ #ifndef RTR_PRIMITIVE_READER_H_ #define RTR_PRIMITIVE_READER_H_ -#include #include /* in_addr, in6_addr */ #include "common.h" @@ -13,8 +12,7 @@ struct pdu_reader { size_t size; }; -int pdu_reader_init(struct pdu_reader *, int, unsigned char *, size_t size, - bool); +void pdu_reader_init(struct pdu_reader *, unsigned char *, size_t size); int read_int8(struct pdu_reader *, uint8_t *); int read_int16(struct pdu_reader *, uint16_t *); diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index 4161f963..c5ff459b 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -2,50 +2,154 @@ #include #include -#include +#include +#include +#include +#include #include -#include -#include -#include #include -#include -#include +#include +#include +#include +#include "address.h" #include "config.h" -#include "clients.h" -#include "internal_pool.h" -#include "log.h" -#include "rtr/err_pdu.h" +#include "data_structure/array_list.h" #include "rtr/pdu.h" -#include "rtr/db/vrps.h" #include "thread/thread_pool.h" -/* Constant messages regarding a client status */ -#define CL_ACCEPTED "accepted" -#define CL_CLOSED "closed" -#define CL_TERMINATED "terminated" -#define CL_REJECTED "rejected" +static pthread_t server_thread; +static volatile bool stop_server_thread; -/* Parameters for each thread that handles client connections */ -struct thread_param { - int fd; - struct sockaddr_storage addr; +STATIC_ARRAY_LIST(server_arraylist, struct rtr_server); +STATIC_ARRAY_LIST(client_arraylist, struct rtr_client); +STATIC_ARRAY_LIST(pollfd_arraylist, struct pollfd); + +static struct server_arraylist servers; +static struct client_arraylist clients; +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + +struct thread_pool *request_handlers; + +#define REQUEST_BUFFER_LEN 1024 + +struct client_request { + struct rtr_client *client; + unsigned char buffer[REQUEST_BUFFER_LEN]; + size_t nread; }; -/* Parameters for each file descriptor that binds to a server address/socket */ -struct fd_node { - int id; - SLIST_ENTRY(fd_node) next; +enum poll_verdict { + PV_CONTINUE, + PV_RETRY, /* Pause for a while, then continue */ + PV_STOP, }; -/* List of server sockets */ -SLIST_HEAD(server_fds, fd_node); +static void +panic_on_fail(int error, char const *function_name) +{ + if (error) + pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.", + function_name, error); +} + +static void +lock_mutex(void) +{ + panic_on_fail(pthread_mutex_lock(&lock), "pthread_mutex_lock"); +} + +static void +unlock_mutex(void) +{ + panic_on_fail(pthread_mutex_unlock(&lock), "pthread_mutex_unlock"); +} + +static void +cleanup_server(struct rtr_server *server) +{ + if (server->fd != -1) + close(server->fd); + free(server->addr); +} + +static void +cleanup_client(struct rtr_client *client) +{ + if (client->fd != -1) { + shutdown(client->fd, SHUT_RDWR); + close(client->fd); + } +} + +static void +destroy_db(void) +{ + server_arraylist_cleanup(&servers, cleanup_server); + client_arraylist_cleanup(&clients, cleanup_client); +} + +/* + * Extracts from @full_address ("IP#[port]") the address and port, and returns + * them in @address and @service, respectively. + * + * The default port is config_get_server_port(). + */ +static int +parse_address(char const *full_address, char **address, char **service) +{ + char *ptr; + char *tmp_addr; + char *tmp_serv; + size_t tmp_addr_len; + + if (full_address == NULL) { + tmp_addr = NULL; + tmp_serv = strdup(config_get_server_port()); + if (tmp_serv == NULL) + return pr_enomem(); + goto done; + } + + ptr = strrchr(full_address, '#'); + if (ptr == NULL) { + tmp_addr = strdup(full_address); + if (tmp_addr == NULL) + return pr_enomem(); + + tmp_serv = strdup(config_get_server_port()); + if (tmp_serv == NULL) { + free(tmp_addr); + return pr_enomem(); + } + + goto done; + } + + if (*(ptr + 1) == '\0') + return pr_op_err("Invalid server address '%s', can't end with '#'", + full_address); + + tmp_addr_len = strlen(full_address) - strlen(ptr); + tmp_addr = malloc(tmp_addr_len + 1); + if (tmp_addr == NULL) + return pr_enomem(); + + memcpy(tmp_addr, full_address, tmp_addr_len); + tmp_addr[tmp_addr_len] = '\0'; + + tmp_serv = strdup(ptr + 1); + if (tmp_serv == NULL) { + free(tmp_addr); + return pr_enomem(); + } -/* "file descriptors" */ -static struct server_fds fds; -static struct thread_pool *threads; -/* Does the server needs to be stopped? */ -static volatile bool server_stop; + /* Fall through */ +done: + *address = tmp_addr; + *service = tmp_serv; + return 0; +} static int init_addrinfo(char const *hostname, char const *service, @@ -96,7 +200,7 @@ init_addrinfo(char const *hostname, char const *service, } static int -set_nonblock(int fd, bool value) +set_nonblock(int fd) { int flags; int error; @@ -108,11 +212,7 @@ set_nonblock(int fd, bool value) return error; } - /* Non-block to allow listening on all server sockets */ - if (value) - flags |= O_NONBLOCK; - else - flags &= ~O_NONBLOCK; + flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) == -1) { error = errno; @@ -128,16 +228,17 @@ set_nonblock(int fd, bool value) * from the clients. */ static int -create_server_socket(char const *hostname, char const *service, int *result) +create_server_socket(char const *input_addr, char const *hostname, + char const *service) { struct addrinfo *addrs; struct addrinfo *addr; unsigned long port; int reuse; int fd; + struct rtr_server server; int error; - *result = 0; /* Shuts up gcc */ reuse = 1; error = init_addrinfo(hostname, service, &addrs); @@ -157,8 +258,11 @@ create_server_socket(char const *hostname, char const *service, int *result) continue; } - /* Non-block to allow listening on all server sockets */ - if (set_nonblock(fd, true) != 0) { + /* + * We want to listen to all sockets in one thread, + * so don't block. + */ + if (set_nonblock(fd) != 0) { close(fd); continue; } @@ -198,7 +302,21 @@ create_server_socket(char const *hostname, char const *service, int *result) (addr->ai_canonname != NULL) ? addr->ai_canonname : "any", port); freeaddrinfo(addrs); - *result = fd; + + if (listen(fd, config_get_server_queue()) != 0) { + close(fd); + return pr_op_errno(errno, "listen() failure"); + } + + server.fd = fd; + /* Ignore failure; this is just a nice-to-have. */ + server.addr = strdup(input_addr); + error = server_arraylist_add(&servers, &server); + if (error) { + close(fd); + return error; + } + return 0; /* Happy path */ } @@ -207,155 +325,89 @@ create_server_socket(char const *hostname, char const *service, int *result) } static int -fd_node_create(struct fd_node **result) +init_server_fd(char const *input_addr) { - struct fd_node *node; - - node = malloc(sizeof(struct fd_node)); - if (node == NULL) - return pr_enomem(); - - node->id = -1; - - *result = node; - return 0; -} - -static int -server_fd_add(char const *address, char const *service) -{ - struct fd_node *node; + char *address; + char *service; int error; - node = NULL; - error = fd_node_create(&node); + address = NULL; + service = NULL; + + error = parse_address(input_addr, &address, &service); if (error) return error; - error = create_server_socket(address, service, &node->id); - if (error) { - free(node); - return error; - } + error = create_server_socket(input_addr, address, service); - SLIST_INSERT_HEAD(&fds, node, next); - pr_op_debug("Created server socket with FD %d.", node->id); - return 0; -} + free(address); + free(service); -static void -destroy_fds(void) -{ - struct fd_node *fd; - - while (!SLIST_EMPTY(&fds)) { - fd = fds.slh_first; - SLIST_REMOVE_HEAD(&fds, next); - close(fd->id); - free(fd); - } + return error; } static int -parse_address(char const *full_address, char const *default_service, - char **address, char **service) +init_server_fds(void) { - char *ptr; - char *tmp_addr; - char *tmp_serv; - size_t tmp_addr_len; - - ptr = strrchr(full_address, '#'); - if (ptr == NULL) { - tmp_addr = strdup(full_address); - if (tmp_addr == NULL) - return pr_enomem(); - - tmp_serv = strdup(default_service); - if (tmp_serv == NULL) { - free(tmp_addr); - return pr_enomem(); - } - *address = tmp_addr; - *service = tmp_serv; - return 0; - } + struct string_array const *conf_addrs; + unsigned int i; + int error; - if (*(ptr + 1) == '\0') - return pr_op_err("Invalid server address '%s', can't end with '#'", - full_address); + conf_addrs = config_get_server_address(); - tmp_addr_len = strlen(full_address) - strlen(ptr); - tmp_addr = malloc(tmp_addr_len + 1); - if (tmp_addr == NULL) - return pr_enomem(); - - memcpy(tmp_addr, full_address, tmp_addr_len); - tmp_addr[tmp_addr_len] = '\0'; + if (conf_addrs->length == 0) + return init_server_fd(NULL); - tmp_serv = strdup(ptr + 1); - if (tmp_serv == NULL) { - free(tmp_addr); - return pr_enomem(); + for (i = 0; i < conf_addrs->length; i++) { + error = init_server_fd(conf_addrs->array[i]); + if (error) + return error; /* Cleanup happens outside */ } - *address = tmp_addr; - *service = tmp_serv; return 0; } -static int -init_fds(void) +static void +handle_client_request(void *arg) { - struct string_array const *addresses; - char const *default_service; - char *address; - char *service; - unsigned int i; - int error; - - default_service = config_get_server_port(); - addresses = config_get_server_address(); - if (addresses->length == 0) - return server_fd_add(NULL, default_service); + struct client_request *crequest = arg; + struct pdu_reader reader; + struct rtr_request rrequest; + struct pdu_metadata const *meta; - for (i = 0; i < addresses->length; i++) { - address = NULL; - service = NULL; - error = parse_address(addresses->array[i], default_service, - &address, &service); - if (error) - return error; + pdu_reader_init(&reader, crequest->buffer, crequest->nread); - error = server_fd_add(address, service); - /* Always release them */ - free(address); - free(service); - if (error) - return error; + while (pdu_load(&reader, crequest->client, &rrequest, &meta) == 0) { + meta->handle(crequest->client->fd, &rrequest); + meta->destructor(rrequest.pdu); } - return 0; + free(crequest); +} + +static void +init_pollfd(struct pollfd *pfd, int fd) +{ + pfd->fd = fd; + pfd->events = POLLIN; + pfd->revents = 0; } -enum verdict { - /* No errors; continue happily. */ - VERDICT_SUCCESS, - /* A temporal error just happened. Try again. */ - VERDICT_RETRY, - /* "Stop whatever you're doing and return." */ - VERDICT_EXIT, +enum accept_verdict { + AV_SUCCESS, + AV_CLIENT_ERROR, + AV_SERVER_ERROR, }; /* * Converts an error code to a verdict. * The error code is assumed to have been spewed by the `accept()` function. */ -static enum verdict +static enum accept_verdict handle_accept_result(int client_fd, int err) { if (client_fd >= 0) - return VERDICT_SUCCESS; + return AV_SUCCESS; /* * Note: I can't just use a single nice switch because EAGAIN and @@ -387,279 +439,360 @@ handle_accept_result(int client_fd, int err) pr_op_info("Client connection attempt not accepted: %s. Quitting...", strerror(err)); - return VERDICT_EXIT; + return AV_SERVER_ERROR; retry: pr_op_info("Client connection attempt not accepted: %s. Retrying...", strerror(err)); - return VERDICT_RETRY; + return AV_CLIENT_ERROR; } -static void -clean_request(struct rtr_request *request, const struct pdu_metadata *meta) +static enum accept_verdict +accept_new_client(struct pollfd const *server_fd) { - free(request->bytes); - meta->destructor(request->pdu); -} + struct sockaddr_storage client_addr; + socklen_t sizeof_client_addr; + struct rtr_client client; + enum accept_verdict result; -static int -print_close_failure(int error, struct sockaddr_storage *sockaddr) -{ - return pr_op_errno(error, "close() failed on socket of client %s", - sockaddr2str(sockaddr)); + sizeof_client_addr = sizeof(client_addr); + + /* Accept the connection */ + client.fd = accept(server_fd->fd, (struct sockaddr *) &client_addr, + &sizeof_client_addr); + + result = handle_accept_result(client.fd, errno); + if (result != AV_SUCCESS) + return result; + + if (set_nonblock(client.fd) != 0) { + close(client.fd); + return AV_CLIENT_ERROR; + } + + client.rtr_version = -1; + sockaddr2str(&client_addr, client.addr); + if (client_arraylist_add(&clients, &client) != 0) { + close(client.fd); + return AV_CLIENT_ERROR; + } + + pr_op_info("Client accepted [FD: %d]: %s", client.fd, client.addr); + return AV_SUCCESS; } -static int -end_client(struct client *client, void *arg) +/* + * true: success. + * false: oh noes; close socket. + */ +static bool +read_until_block(int fd, struct client_request *request) { - char const *action = arg; - bool rejected; + ssize_t read_result; + size_t offset; - /* When we (server) are closing the connection */ - rejected = (strcmp(action, CL_REJECTED) == 0); - if (arg != NULL && (strcmp(arg, CL_TERMINATED) == 0 || rejected)) - shutdown(client->fd, SHUT_RDWR); + request->nread = 0; - if (close(client->fd) != 0) - return print_close_failure(errno, &client->addr); + for (offset = 0; offset < REQUEST_BUFFER_LEN; offset += read_result) { + read_result = read(fd, &request->buffer[offset], + REQUEST_BUFFER_LEN - offset); + if (read_result == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return true; /* Ok, we have the full packet. */ + + pr_op_errno(errno, "Client socket read interrupted"); + return false; + } + + if (read_result == 0) { + if (offset == 0) { + pr_op_debug("Client closed the socket."); + return false; + } + + return true; /* Ok, we have the last packet. */ + } - if (rejected) { - pr_op_warn("Client %s [ID %d]: %s", action, client->fd, - sockaddr2str(&client->addr)); - pr_op_warn("Use a greater value at 'thread-pool.server.max' if you wish to accept more than %u clients.", - config_get_thread_pool_server_max()); - return 0; + request->nread += read_result; } - pr_op_info("Client %s [ID %d]: %s", action, client->fd, - sockaddr2str(&client->addr)); - return 0; + pr_op_warn("Peer's request is too big (>= %u bytes). Peer does not look like an RTR client; closing connection.", + REQUEST_BUFFER_LEN); + return false; } -static void -reject_client(int fd, struct sockaddr_storage *addr) +static bool +__handle_client_request(struct rtr_client *client) { - struct client client; + struct client_request *request; + int error; + + request = malloc(sizeof(struct client_request)); + if (request == NULL) { + pr_enomem(); + return false; + } - client.fd = fd; - client.addr = *addr; + request->client = client; + if (!read_until_block(client->fd, request)) + goto cancel; - /* Try to be polite notifying there was an error */ - err_pdu_send_internal_error(fd, RTR_V0); - end_client(&client, CL_REJECTED); + pr_op_debug("Client sent %zu bytes.", request->nread); + error = thread_pool_push(request_handlers, "RTR request", + handle_client_request, request); + if (error) + goto cancel; + + return true; + +cancel: + free(request); + return false; } -/* - * The client socket threads' entry routine. - * @arg must be released. - */ static void -client_thread_cb(void *arg) +print_poll_failure(struct pollfd *pfd, char const *what, char const *addr) { - struct pdu_metadata const *meta; - struct rtr_request request; - struct thread_param param; - int error; + if (pfd->revents & POLLHUP) + pr_op_err("%s '%s' down: POLLHUP (Peer hung up)", what, addr); + if (pfd->revents & POLLERR) + pr_op_err("%s '%s' down: POLLERR (Generic error)", what, addr); + if (pfd->revents & POLLNVAL) + pr_op_err("%s '%s' down: POLLNVAL (fd not open)", what, addr); +} - memcpy(¶m, arg, sizeof(param)); - free(arg); +static void +apply_pollfds(struct pollfd_arraylist *pollfds, unsigned int nclients) +{ + struct pollfd *pfd; + struct rtr_server *server; + struct rtr_client *client; + unsigned int i; - error = clients_add(param.fd, param.addr); - if (error) { - close(param.fd); - return; + for (i = 0; i < servers.len; i++) { + pfd = &pollfds->array[i]; + server = &servers.array[i]; + + /* PR_DEBUG_MSG("pfd:%d server:%d", pfd->fd, server->fd); */ + + if ((pfd->fd == -1) && (server->fd != -1)) { + close(server->fd); + server->fd = -1; + print_poll_failure(pfd, "Server", server->addr); + } } - while (true) { /* For each PDU... */ - error = pdu_load(param.fd, ¶m.addr, &request, &meta); - if (error) - break; + for (i = 0; i < nclients; i++) { + pfd = &pollfds->array[servers.len + i]; + client = &clients.array[i]; - error = meta->handle(param.fd, &request); - clean_request(&request, meta); - if (error) - break; + /* PR_DEBUG_MSG("pfd:%d client:%d", pfd->fd, client->fd); */ + + if ((pfd->fd == -1) && (client->fd != -1)) { + close(client->fd); + client->fd = -1; + print_poll_failure(pfd, "Client", client->addr); + } } - clients_forget(param.fd, end_client, CL_CLOSED); + /* TODO clean up client array */ } -static void -init_fdset(struct server_fds *fds, fd_set *fdset) +static enum poll_verdict +fddb_poll(void) { - struct fd_node *node; + struct pollfd_arraylist pollfds; - FD_ZERO(fdset); - SLIST_FOREACH(node, fds, next) - FD_SET(node->id, fdset); -} + struct rtr_server *server; + struct rtr_client *client; + struct pollfd *fd; -/* - * Waits for client connections and spawns threads to handle them. - */ -static void -handle_client_connections(void *arg) -{ - struct sockaddr_storage client_addr; - struct thread_param *param; - struct timeval select_time; - socklen_t sizeof_client_addr; - fd_set readfds; - int last_server_fd; - int client_fd; - int fd; + unsigned int nclients; + unsigned int i; int error; - last_server_fd = SLIST_FIRST(&fds)->id; - sizeof_client_addr = sizeof(client_addr); + pollfd_arraylist_init(&pollfds); - /* I'm alive! */ - server_stop = false; + pollfds.len = servers.len + clients.len; + pollfds.capacity = pollfds.len; + pollfds.array = calloc(pollfds.len, sizeof(struct pollfd)); + if (pollfds.array == NULL) { + pr_enomem(); + return PV_RETRY; + } - pr_op_debug("Waiting for client connections at server..."); - do { - /* Query server_stop every second. */ - select_time.tv_sec = 1; - select_time.tv_usec = 0; + ARRAYLIST_FOREACH(&servers, server, i) + init_pollfd(&pollfds.array[i], server->fd); + ARRAYLIST_FOREACH(&clients, client, i) + init_pollfd(&pollfds.array[servers.len + i], client->fd); - /* Am I still alive? */ - if (server_stop) - break; + error = poll(pollfds.array, pollfds.len, 1000); - init_fdset(&fds, &readfds); + if (stop_server_thread) + goto stop; - if (select(last_server_fd + 1, &readfds, NULL, NULL, - &select_time) == -1) { - pr_op_errno(errno, "Monitoring server sockets"); - continue; + if (error == 0) + goto success; + + if (error < 0) { + error = errno; + switch (error) { + case EINTR: + pr_op_info("poll() was interrupted by some signal."); + goto stop; + case ENOMEM: + pr_enomem(); + /* Fall through */ + case EAGAIN: + goto retry; + case EFAULT: + case EINVAL: + pr_crit("poll() returned %d.", error); } + } - for (fd = 0; fd < (last_server_fd + 1); fd++) { - if (!FD_ISSET(fd, &readfds)) - continue; + /* The servers might change this number, so store a backup. */ + nclients = clients.len; - /* Accept the connection */ - client_fd = accept(fd, (struct sockaddr *) &client_addr, - &sizeof_client_addr); - switch (handle_accept_result(client_fd, errno)) { - case VERDICT_SUCCESS: - break; - case VERDICT_RETRY: - continue; - case VERDICT_EXIT: - return; - } + /* New connections */ + for (i = 0; i < servers.len; i++) { + /* This fd is a listening socket. */ + fd = &pollfds.array[i]; - /* - * It's very likely that the clients won't release their - * sessions once established; so, don't let any new - * client at the thread pool queue since it's probable - * that it'll remain there forever. - */ - if (!thread_pool_avail_threads(threads)) { - reject_client(client_fd, &client_addr); - continue; - } + /* PR_DEBUG_MSG("Server %u: fd:%d revents:%x", + i, fd->fd, fd->revents); */ - pr_op_info("Client %s [ID %d]: %s", CL_ACCEPTED, - client_fd, sockaddr2str(&client_addr)); - - /* - * Note: My gut says that errors from now on (even the - * unknown ones) should be treated as temporary; maybe - * the next accept() will work. - * So don't interrupt the thread when this happens. - */ - - /* - * On some systems, O_NONBLOCK is inherited. - * We very much don't want O_NONBLOCK on the client - * socket. - */ - if (set_nonblock(client_fd, false) != 0) { - close(client_fd); - continue; - } + if (fd->fd == -1) + continue; - param = malloc(sizeof(struct thread_param)); - if (param == NULL) { - /* No error PDU on memory allocation. */ - pr_enomem(); - close(client_fd); - continue; - } - param->fd = client_fd; - param->addr = client_addr; - - error = thread_pool_push(threads, "Client thread", - client_thread_cb, param); - if (error) { - pr_op_err("Couldn't push a thread to attend incoming RTR client"); - /* Error with min RTR version */ - err_pdu_send_internal_error(client_fd, RTR_V0); - close(client_fd); - free(param); + if (fd->revents & (POLLHUP | POLLERR | POLLNVAL)) { + fd->fd = -1; + + } else if (fd->revents & POLLIN) { + switch (accept_new_client(fd)) { + case AV_SUCCESS: + case AV_CLIENT_ERROR: + break; + case AV_SERVER_ERROR: + fd->fd = -1; } } - } while (true); -} + } -static int -start_server_thread(void) -{ - struct fd_node *node; - int error; + /* Client requests */ + for (i = 0; i < nclients; i++) { + /* This fd is a client handler socket. */ + fd = &pollfds.array[servers.len + i]; - SLIST_FOREACH(node, &fds, next) { - error = listen(node->id, config_get_server_queue()); - if (error) - return pr_op_errno(errno, - "Couldn't listen on server socket"); + /* PR_DEBUG_MSG("Client %u: fd:%d revents:%x", i, fd->fd, + fd->revents); */ + + if (fd->fd == -1) + continue; + + if (fd->revents & (POLLHUP | POLLERR | POLLNVAL)) { + fd->fd = -1; + } else if (fd->revents & POLLIN) { + if (!__handle_client_request(&clients.array[i])) + fd->fd = -1; + } } - return internal_pool_push("Server thread", handle_client_connections, - NULL); + lock_mutex(); + apply_pollfds(&pollfds, nclients); + unlock_mutex(); + /* Fall through */ + +success: + pollfd_arraylist_cleanup(&pollfds, NULL); + return PV_CONTINUE; +retry: + pollfd_arraylist_cleanup(&pollfds, NULL); + return PV_RETRY; +stop: + pollfd_arraylist_cleanup(&pollfds, NULL); + return PV_STOP; +} + +static void * +server_cb(void *arg) +{ + do { + switch (fddb_poll()) { + case PV_CONTINUE: + break; + case PV_RETRY: + sleep(1); + break; + case PV_STOP: + return NULL; + } + } while (true); } -/* - * Starts the RTR server. - */ int rtr_start(void) { int error; - SLIST_INIT(&fds); - threads = NULL; - server_stop = true; + server_arraylist_init(&servers); + client_arraylist_init(&clients); - error = init_fds(); + error = init_server_fds(); if (error) - goto revert_server_fds; + goto revert_fds; error = thread_pool_create("Server", - config_get_thread_pool_server_max(), &threads); + config_get_thread_pool_server_max(), + &request_handlers); if (error) - goto revert_server_fds; + goto revert_fds; - error = start_server_thread(); - if (error) - goto revert_thread_pool; + error = pthread_create(&server_thread, NULL, server_cb, NULL); + if (error) { + thread_pool_destroy(request_handlers); + goto revert_fds; + } return 0; -revert_thread_pool: - thread_pool_destroy(threads); -revert_server_fds: - destroy_fds(); +revert_fds: + destroy_db(); return error; } -void -rtr_stop(void) +void rtr_stop(void) { - server_stop = true; - clients_terminate_all(end_client, CL_TERMINATED); - thread_pool_destroy(threads); - destroy_fds(); + int error; + + stop_server_thread = true; + error = pthread_join(server_thread, NULL); + if (error) + pr_op_errno(error, "pthread_join() returned %d", error); + + thread_pool_destroy(request_handlers); + + destroy_db(); +} + +int +rtr_foreach_client(rtr_foreach_client_cb cb, void *arg) +{ + struct rtr_client *client; + unsigned int i; + int error = 0; + + lock_mutex(); + + ARRAYLIST_FOREACH(&clients, client, i) { + if (client->fd != -1) { + error = cb(client, arg); + if (error) + break; + } + } + + unlock_mutex(); + + return error; } diff --git a/src/rtr/rtr.h b/src/rtr/rtr.h index 7922af52..51e69546 100644 --- a/src/rtr/rtr.h +++ b/src/rtr/rtr.h @@ -1,7 +1,24 @@ #ifndef RTR_RTR_H_ #define RTR_RTR_H_ +#include + +struct rtr_server { + int fd; + /* Printable address to which the server was bound. */ + char *addr; +}; + +struct rtr_client { + int fd; + char addr[INET6_ADDRSTRLEN]; /* Printable address of the client. */ + int rtr_version; /* -1: unset; > 0: version number */ +}; + int rtr_start(void); void rtr_stop(void); +typedef int (*rtr_foreach_client_cb)(struct rtr_client const *, void *arg); +int rtr_foreach_client(rtr_foreach_client_cb, void *); + #endif /* RTR_RTR_H_ */ diff --git a/src/serial.c b/src/serial.c new file mode 100644 index 00000000..b364ec90 --- /dev/null +++ b/src/serial.c @@ -0,0 +1,14 @@ +#include "serial.h" + +/* + * Returns s1 < s2 , according to RFC 1982 serial arithmetic. + */ +bool +serial_lt(serial_t s1, serial_t s2) +{ + if (s1 == s2) + return false; + + return ((s1 < s2) && ((s2 - s1) < 0x80000000u)) || + ((s1 > s2) && ((s1 - s2) > 0x80000000u)); +} diff --git a/src/serial.h b/src/serial.h new file mode 100644 index 00000000..32e421a2 --- /dev/null +++ b/src/serial.h @@ -0,0 +1,11 @@ +#ifndef SRC_SERIAL_H_ +#define SRC_SERIAL_H_ + +#include +#include + +typedef uint32_t serial_t; + +bool serial_lt(serial_t s1, serial_t s2); + +#endif /* SRC_SERIAL_H_ */ diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index 9ee8eb8a..a58c6798 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -68,7 +68,7 @@ struct thread_pool { * Enable to signal all threads to stop. * (But all ongoing tasks will be completed first.) */ - bool stop; + volatile bool stop; /* * Tasks registered by the Parent Thread, currently waiting for a * Worker Thread to claim them. diff --git a/src/thread/thread_pool.h b/src/thread/thread_pool.h index b714cf41..921cfcf7 100644 --- a/src/thread/thread_pool.h +++ b/src/thread/thread_pool.h @@ -3,6 +3,10 @@ #include +/* + * THREAD POOL THREADS ARE NOT ALLOWED TO SLEEP FOR LONG PERIODS OF TIME. + */ + /* Thread pool base struct */ struct thread_pool; diff --git a/test/Makefile.am b/test/Makefile.am index acba67a6..54ed1f0b 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -20,11 +20,12 @@ AM_CFLAGS = -pedantic -Wall -std=gnu11 -I../src -DUNIT_TESTING ${CHECK_CFLAGS} $ MY_LDADD = ${CHECK_LIBS} check_PROGRAMS = address.test -check_PROGRAMS += clients.test +check_PROGRAMS += deltas_array.test check_PROGRAMS += db_table.test check_PROGRAMS += line_file.test check_PROGRAMS += pdu_handler.test check_PROGRAMS += rsync.test +check_PROGRAMS += serial.test check_PROGRAMS += tal.test check_PROGRAMS += thread_pool.test check_PROGRAMS += vcard.test @@ -37,8 +38,8 @@ TESTS = ${check_PROGRAMS} address_test_SOURCES = address_test.c address_test_LDADD = ${MY_LDADD} -clients_test_SOURCES = client_test.c -clients_test_LDADD = ${MY_LDADD} +deltas_array_test_SOURCES = rtr/db/deltas_array_test.c +deltas_array_test_LDADD = ${MY_LDADD} db_table_test_SOURCES = rtr/db/db_table_test.c db_table_test_LDADD = ${MY_LDADD} @@ -52,6 +53,9 @@ pdu_handler_test_LDADD = ${MY_LDADD} ${JANSSON_LIBS} rsync_test_SOURCES = rsync_test.c rsync_test_LDADD = ${MY_LDADD} +serial_test_SOURCES = serial_test.c +serial_test_LDADD = ${MY_LDADD} + tal_test_SOURCES = tal_test.c tal_test_LDADD = ${MY_LDADD} diff --git a/test/client_test.c b/test/client_test.c deleted file mode 100644 index efa4a0da..00000000 --- a/test/client_test.c +++ /dev/null @@ -1,97 +0,0 @@ -#include -#include - -#include "clients.c" -#include "common.c" -#include "log.c" -#include "impersonator.c" - -static int -handle_foreach(struct client *client, void *arg) -{ - unsigned int *state = arg; - - switch (*state) { - case 0: - ck_assert_int_eq(1, client->fd); - break; - case 1: - ck_assert_int_eq(2, client->fd); - break; - case 2: - ck_assert_int_eq(4, client->fd); - break; - default: - ck_abort_msg("Invalid state: %u", *state); - } - - (*state)++; - return 0; -} - -START_TEST(basic_test) -{ - /* - * The module is pretty simple; there's not much to test. - * I'm mostly just concerned about uthash usage; I've never used uthash - * before. - */ - struct sockaddr_storage addr; - unsigned int i; - unsigned int state; - - memset(&addr, 0, sizeof(addr)); - addr.ss_family = AF_INET; - - ck_assert_int_eq(0, clients_db_init()); - - /* - * The address is actually supposed to be unique, but this is rather - * enforced by calling code, so whatever. - */ - - for (i = 0; i < 4; i++) { - ck_assert_int_eq(0, clients_add(1, addr)); - ck_assert_int_eq(0, clients_add(2, addr)); - ck_assert_int_eq(0, clients_add(3, addr)); - ck_assert_int_eq(0, clients_add(4, addr)); - } - - clients_forget(3, NULL, NULL); - - state = 0; - ck_assert_int_eq(0, clients_foreach(handle_foreach, &state)); - ck_assert_uint_eq(3, state); - - clients_db_destroy(); -} -END_TEST - -Suite *clients_load_suite(void) -{ - Suite *suite; - TCase *core; - - core = tcase_create("Core"); - tcase_add_test(core, basic_test); - - suite = suite_create("Clients suite"); - suite_add_tcase(suite, core); - return suite; -} - -int main(void) -{ - Suite *suite; - SRunner *runner; - int tests_failed; - - suite = clients_load_suite(); - - runner = srunner_create(suite); - srunner_run_all(runner, CK_NORMAL); - tests_failed = srunner_ntests_failed(runner); - srunner_free(runner); - - return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; -} diff --git a/test/rtr/db/deltas_array_test.c b/test/rtr/db/deltas_array_test.c new file mode 100644 index 00000000..cce227d6 --- /dev/null +++ b/test/rtr/db/deltas_array_test.c @@ -0,0 +1,106 @@ +#include +#include + +#include "log.c" +#include "impersonator.c" +#include "rtr/db/delta.c" +#include "rtr/db/deltas_array.c" + +#define TOTAL_CREATED 15 +struct deltas *created[TOTAL_CREATED]; + +unsigned int +config_get_deltas_lifetime(void) +{ + return 5; +} + +static int +foreach_cb(struct deltas *deltas, void *arg) +{ + unsigned int *next_index = arg; + + ck_assert_ptr_eq(created[*next_index], deltas); + (*next_index)++; + + return 0; +} + +static void +test_foreach(struct deltas_array *darray, unsigned int total, + unsigned int offset) +{ + unsigned int next_index; + unsigned int i; + + for (i = 0; i <= total; i++) { + next_index = total - i + offset; + ck_assert_int_eq(0, darray_foreach_since(darray, i, + foreach_cb, &next_index)); + ck_assert_uint_eq(total + offset, next_index); + } + + ck_assert_int_eq(-EINVAL, darray_foreach_since(darray, total + 1, + foreach_cb, &next_index)); +} + +START_TEST(add_only) +{ + struct deltas_array *darray; + unsigned int i; + + darray = darray_create(); + ck_assert_ptr_ne(NULL, darray); + + for (i = 0; i < TOTAL_CREATED; i++) + ck_assert_int_eq(0, deltas_create(&created[i])); + + test_foreach(darray, 0, 0); + + darray_add(darray, created[0]); + test_foreach(darray, 1, 0); + + darray_add(darray, created[1]); + test_foreach(darray, 2, 0); + + darray_add(darray, created[2]); + test_foreach(darray, 3, 0); + + darray_add(darray, created[3]); + test_foreach(darray, 4, 0); + + for (i = 4; i < TOTAL_CREATED; i++) { + darray_add(darray, created[i]); + test_foreach(darray, 5, i - 4); + } +} +END_TEST + +Suite *address_load_suite(void) +{ + Suite *suite; + TCase *core; + + core = tcase_create("Core"); + tcase_add_test(core, add_only); + + suite = suite_create("Deltas Array"); + suite_add_tcase(suite, core); + return suite; +} + +int main(void) +{ + Suite *suite; + SRunner *runner; + int tests_failed; + + suite = address_load_suite(); + + runner = srunner_create(suite); + srunner_run_all(runner, CK_NORMAL); + tests_failed = srunner_ntests_failed(runner); + srunner_free(runner); + + return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/test/rtr/db/rtr_db_impersonator.c b/test/rtr/db/rtr_db_impersonator.c index 9b8d930e..2abe23fe 100644 --- a/test/rtr/db/rtr_db_impersonator.c +++ b/test/rtr/db/rtr_db_impersonator.c @@ -19,7 +19,7 @@ static unsigned char db_imp_spk[] = { 0x2f, 0x7b, 0x39, 0x9f, 0x70, 0x42, 0xd4, 0x07, 0xce, 0xde, 0x04 }; -static int iteration = 0; +static int serial = 1; static void add_v4(struct validation_handler *handler, uint32_t as) @@ -79,13 +79,13 @@ perform_standalone_validation(struct thread_pool *pool, struct db_table *table) handler.handle_router_key = __handle_router_key; handler.arg = table; - switch (iteration) { - case 0: + switch (serial) { + case 1: add_v4(&handler, 0); add_v6(&handler, 0); add_rk(&handler, 0); break; - case 1: + case 2: add_v4(&handler, 0); add_v6(&handler, 0); add_rk(&handler, 0); @@ -93,21 +93,21 @@ perform_standalone_validation(struct thread_pool *pool, struct db_table *table) add_v6(&handler, 1); add_rk(&handler, 1); break; - case 2: + case 3: add_v4(&handler, 1); add_v6(&handler, 1); add_rk(&handler, 1); break; - case 3: + case 4: add_v4(&handler, 0); add_v6(&handler, 0); add_rk(&handler, 0); break; default: ck_abort_msg("perform_standalone_validation() was called too many times (%d).", - iteration); + serial); } - iteration++; + serial++; return 0; } diff --git a/test/rtr/db/vrps_test.c b/test/rtr/db/vrps_test.c index 34e79124..50188eb9 100644 --- a/test/rtr/db/vrps_test.c +++ b/test/rtr/db/vrps_test.c @@ -10,8 +10,10 @@ #include "json_parser.c" #include "log.c" #include "output_printer.c" +#include "serial.c" #include "object/router_key.c" #include "rtr/db/delta.c" +#include "rtr/db/deltas_array.c" #include "rtr/db/db_table.c" #include "rtr/db/rtr_db_impersonator.c" #include "rtr/db/vrps.c" @@ -55,56 +57,66 @@ static const bool deltas_1to3[] = { 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, }; static const bool deltas_2to3[] = { 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, }; static const bool deltas_3to3[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; -/* Deltas with rules that override each other */ -static const bool deltas_1to4_ovrd[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, }; -static const bool deltas_2to4_ovrd[] = { 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, }; -static const bool deltas_3to4_ovrd[] = { 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, }; -static const bool deltas_4to4_ovrd[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; - -/* Deltas cleaned up */ -static const bool deltas_1to4_clean[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; -static const bool deltas_2to4_clean[] = { 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, }; -static const bool deltas_3to4_clean[] = { 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, }; -static const bool deltas_4to4_clean[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; +static const bool deltas_1to4[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; +static const bool deltas_2to4[] = { 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, }; +static const bool deltas_3to4[] = { 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, }; +static const bool deltas_4to4[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; /* Impersonator functions */ -serial_t current_min_serial = 0; +unsigned int deltas_lifetime = 5; -int -clients_get_min_serial(serial_t *result) +unsigned int +config_get_deltas_lifetime(void) { - *result = current_min_serial; - return 0; + return deltas_lifetime; } /* Test functions */ -static int -vrp_fail(struct vrp const *vrp, void *arg) +static char const * +vrpaddr2str(struct vrp const *vrp) { - char const *addr; - switch (vrp->addr_fam) { case AF_INET: - addr = v4addr2str(&vrp->prefix.v4); - break; + return v4addr2str(&vrp->prefix.v4); case AF_INET6: - addr = v6addr2str(&vrp->prefix.v6); - break; - default: - addr = "unknown"; + return v6addr2str(&vrp->prefix.v6); } + return "unknown"; +} + +static int +vrp_fail(struct vrp const *vrp, void *arg) +{ ck_abort_msg("Expected no callbacks, got VRP %u/%s/%u/%u.", - vrp->asn, addr, vrp->prefix_length, vrp->max_prefix_length); + vrp->asn, vrpaddr2str(vrp), vrp->prefix_length, + vrp->max_prefix_length); return -EINVAL; } static int rk_fail(struct router_key const *key, void *arg) { - ck_abort_msg("Expected no callbacks, got from RK ASN %u.", key->as); + ck_abort_msg("Expected no callbacks, got RK %u.", key->as); + return -EINVAL; +} + +static int +dvrp_fail(struct delta_vrp const *delta, void *arg) +{ + ck_abort_msg("Expected no callbacks, got Delta VRP %u/%s/%u/%u/%u.", + delta->vrp.asn, vrpaddr2str(&delta->vrp), delta->vrp.prefix_length, + delta->vrp.max_prefix_length, delta->flags); + return -EINVAL; +} + +static int +drk_fail(struct delta_router_key const *delta, void *arg) +{ + ck_abort_msg("Expected no callbacks, got Delta RK %u/%u.", + delta->router_key.as, delta->flags); return -EINVAL; } @@ -258,82 +270,52 @@ check_base(serial_t expected_serial, bool const *expected_base) static int vrp_add(struct delta_vrp const *delta, void *arg) { - struct deltas *deltas = arg; - struct vrp const *vrp; - struct v4_address v4; - struct v6_address v6; + union { + struct v4_address v4; + struct v6_address v6; + } addr; - vrp = &delta->vrp; - switch (vrp->addr_fam) { + switch (delta->vrp.addr_fam) { case AF_INET: - v4.prefix.len = vrp->prefix_length; - v4.prefix.addr = vrp->prefix.v4; - v4.max_length = vrp->max_prefix_length; - deltas_add_roa_v4(deltas, vrp->asn, &v4, delta->flags); - break; + addr.v4.prefix.len = delta->vrp.prefix_length; + addr.v4.prefix.addr = delta->vrp.prefix.v4; + addr.v4.max_length = delta->vrp.max_prefix_length; + return deltas_add_roa_v4(arg, delta->vrp.asn, &addr.v4, + delta->flags); case AF_INET6: - v6.prefix.len = vrp->prefix_length; - v6.prefix.addr = vrp->prefix.v6; - v6.max_length = vrp->max_prefix_length; - deltas_add_roa_v6(deltas, vrp->asn, &v6, delta->flags); - break; - default: - ck_abort_msg("Unknown addr family"); + addr.v6.prefix.len = delta->vrp.prefix_length; + addr.v6.prefix.addr = delta->vrp.prefix.v6; + addr.v6.max_length = delta->vrp.max_prefix_length; + return deltas_add_roa_v6(arg, delta->vrp.asn, &addr.v6, + delta->flags); } - return 0; + + ck_abort_msg("Unknown family: %u", delta->vrp.addr_fam); } static int rk_add(struct delta_router_key const *delta, void *arg) { - struct deltas *deltas = arg; - struct router_key key; - - key = delta->router_key; - deltas_add_router_key(deltas, &key, delta->flags); - return 0; + return deltas_add_router_key(arg, &delta->router_key, delta->flags); } static void -filter_deltas(struct deltas_db *db) +check_deltas(serial_t from, serial_t to, bool const *expected_deltas) { - struct deltas_db tmp; - struct delta_group group; struct deltas *deltas; - - group.serial = 0; - ck_assert_int_eq(0, deltas_create(&deltas)); - group.deltas = deltas; - ck_assert_int_eq(0, vrps_foreach_filtered_delta(db, vrp_add, - rk_add, group.deltas)); - deltas_db_init(&tmp); - ck_assert_int_eq(0, deltas_db_add(&tmp, &group)); - - *db = tmp; -} - -static void -check_deltas(serial_t from, serial_t to, bool const *expected_deltas, - bool filter) -{ serial_t actual_serial; bool actual_deltas[12]; - struct deltas_db deltas; - struct delta_group *group; array_index i; - deltas_db_init(&deltas); - ck_assert_int_eq(0, vrps_get_deltas_from(from, &actual_serial, - &deltas)); - ck_assert_uint_eq(to, actual_serial); + ck_assert_int_eq(0, deltas_create(&deltas)); - if (filter) - filter_deltas(&deltas); + ck_assert_int_eq(0, vrps_foreach_delta_since(from, &actual_serial, + vrp_add, rk_add, deltas)); + ck_assert_uint_eq(to, actual_serial); memset(actual_deltas, 0, sizeof(actual_deltas)); - ARRAYLIST_FOREACH(&deltas, group, i) - ck_assert_int_eq(0, deltas_foreach(group->serial, group->deltas, - delta_vrp_check, delta_rk_check, actual_deltas)); + ck_assert_int_eq(0, deltas_foreach(deltas, delta_vrp_check, + delta_rk_check, actual_deltas)); for (i = 0; i < ARRAY_LEN(actual_deltas); i++) ck_assert_uint_eq(expected_deltas[i], actual_deltas[i]); } @@ -342,59 +324,53 @@ static void check_no_deltas(serial_t from) { serial_t actual_to; - struct deltas_db deltas; - - deltas_db_init(&deltas); - ck_assert_int_eq(-ESRCH, vrps_get_deltas_from(from, &actual_to, - &deltas)); + ck_assert_int_eq(-ESRCH, vrps_foreach_delta_since(from, &actual_to, + dvrp_fail, drk_fail, NULL)); } static void -create_deltas_1to2(struct deltas_db *deltas, serial_t *serial, bool *changed, - bool *iterated_entries) +create_deltas_1to2(serial_t *serial, bool *changed, bool *iterated_entries) { - current_min_serial = 0; - - deltas_db_init(deltas); - ck_assert_int_eq(0, vrps_init()); /* First validation not yet performed: Tell routers to wait */ ck_assert_int_eq(-EAGAIN, get_last_serial_number(serial)); ck_assert_int_eq(-EAGAIN, vrps_foreach_base(vrp_fail, rk_fail, iterated_entries)); - ck_assert_int_eq(-EAGAIN, vrps_get_deltas_from(0, serial, deltas)); + ck_assert_int_eq(-EAGAIN, vrps_foreach_delta_since(0, serial, dvrp_fail, + drk_fail, NULL)); /* First validation: One tree, no deltas */ ck_assert_int_eq(0, vrps_update(changed)); check_serial(1); check_base(1, iteration1_base); - check_deltas(1, 1, deltas_1to1, false); + check_deltas(1, 1, deltas_1to1); /* Second validation: One tree, added deltas */ ck_assert_int_eq(0, vrps_update(changed)); check_serial(2); check_base(2, iteration2_base); - check_deltas(1, 2, deltas_1to2, false); - check_deltas(2, 2, deltas_2to2, false); + check_deltas(1, 2, deltas_1to2); + check_deltas(2, 2, deltas_2to2); } START_TEST(test_basic) { - struct deltas_db deltas; serial_t serial; bool changed; bool iterated_entries[12]; - create_deltas_1to2(&deltas, &serial, &changed, iterated_entries); + deltas_lifetime = 5; + + create_deltas_1to2(&serial, &changed, iterated_entries); /* Third validation: One tree, removed deltas */ ck_assert_int_eq(0, vrps_update(&changed)); check_serial(3); check_base(3, iteration3_base); - check_deltas(1, 3, deltas_1to3, false); - check_deltas(2, 3, deltas_2to3, false); - check_deltas(3, 3, deltas_3to3, false); + check_deltas(1, 3, deltas_1to3); + check_deltas(2, 3, deltas_2to3); + check_deltas(3, 3, deltas_3to3); vrps_destroy(); } @@ -402,70 +378,54 @@ END_TEST START_TEST(test_delta_forget) { - struct deltas_db deltas; serial_t serial; bool changed; bool iterated_entries[12]; - create_deltas_1to2(&deltas, &serial, &changed, iterated_entries); + deltas_lifetime = 1; - /* - * Assume that the client(s) already have serial 2 (serial 3 will be - * created) so serial 1 isn't needed anymore. - */ - current_min_serial = 2; + create_deltas_1to2(&serial, &changed, iterated_entries); /* Third validation: One tree, removed deltas and delta 1 removed */ ck_assert_int_eq(0, vrps_update(&changed)); check_serial(3); check_base(3, iteration3_base); check_no_deltas(1); - check_deltas(2, 3, deltas_2to3, false); - check_deltas(3, 3, deltas_3to3, false); + check_deltas(2, 3, deltas_2to3); + check_deltas(3, 3, deltas_3to3); vrps_destroy(); - - /* Return to its initial value */ - current_min_serial = 0; } END_TEST START_TEST(test_delta_ovrd) { - struct deltas_db deltas; serial_t serial; bool changed; bool iterated_entries[12]; - create_deltas_1to2(&deltas, &serial, &changed, iterated_entries); + deltas_lifetime = 3; + + create_deltas_1to2(&serial, &changed, iterated_entries); /* Third validation: One tree, removed deltas */ ck_assert_int_eq(0, vrps_update(&changed)); check_serial(3); check_base(3, iteration3_base); - check_deltas(1, 3, deltas_1to3, false); - check_deltas(2, 3, deltas_2to3, false); - check_deltas(3, 3, deltas_3to3, false); + check_deltas(1, 3, deltas_1to3); + check_deltas(2, 3, deltas_2to3); + check_deltas(3, 3, deltas_3to3); /* Fourth validation with deltas that override each other */ ck_assert_int_eq(0, vrps_update(&changed)); check_serial(4); check_base(4, iteration4_base); - check_deltas(1, 4, deltas_1to4_ovrd, false); - check_deltas(2, 4, deltas_2to4_ovrd, false); - check_deltas(3, 4, deltas_3to4_ovrd, false); - check_deltas(4, 4, deltas_4to4_ovrd, false); - - /* Check "cleaned up" deltas */ - check_deltas(1, 4, deltas_1to4_clean, true); - check_deltas(2, 4, deltas_2to4_clean, true); - check_deltas(3, 4, deltas_3to4_clean, true); - check_deltas(4, 4, deltas_4to4_clean, true); + check_deltas(1, 4, deltas_1to4); + check_deltas(2, 4, deltas_2to4); + check_deltas(3, 4, deltas_3to4); + check_deltas(4, 4, deltas_4to4); vrps_destroy(); - - /* Return to its initial value */ - current_min_serial = 0; } END_TEST diff --git a/test/rtr/pdu_handler_test.c b/test/rtr/pdu_handler_test.c index 94148e20..70479e29 100644 --- a/test/rtr/pdu_handler_test.c +++ b/test/rtr/pdu_handler_test.c @@ -10,6 +10,7 @@ #include "json_parser.c" #include "log.c" #include "output_printer.c" +#include "serial.c" #include "crypto/base64.c" #include "object/router_key.c" #include "rtr/pdu.c" @@ -17,8 +18,8 @@ #include "rtr/primitive_reader.c" #include "rtr/primitive_writer.c" #include "rtr/err_pdu.c" -#include "rtr/stream.c" #include "rtr/db/delta.c" +#include "rtr/db/deltas_array.c" #include "rtr/db/db_table.c" #include "rtr/db/rtr_db_impersonator.c" #include "rtr/db/vrps.c" @@ -69,6 +70,10 @@ has_expected_pdus(void) return !STAILQ_EMPTY(&expected_pdus); } +/* + * This initializes the database using the test values from + * db/rtr_db_impersonator.c. + */ static void init_db_full(void) { @@ -108,11 +113,10 @@ init_serial_query(struct rtr_request *request, struct serial_query_pdu *query, /* Impersonator functions */ -int -clients_get_min_serial(serial_t *result) +unsigned int +config_get_deltas_lifetime(void) { - *result = 0; - return 0; + return 5; } int @@ -145,6 +149,18 @@ send_cache_response_pdu(int fd, uint8_t version) return 0; } +static char const * +flags2str(uint8_t flags) +{ + switch (flags) { + case FLAG_ANNOUNCEMENT: + return "add"; + case FLAG_WITHDRAWAL: + return "rm"; + } + return "unk"; +} + int send_prefix_pdu(int fd, uint8_t version, struct vrp const *vrp, uint8_t flags) { @@ -155,9 +171,22 @@ send_prefix_pdu(int fd, uint8_t version, struct vrp const *vrp, uint8_t flags) */ uint8_t pdu_type = pop_expected_pdu(); pr_op_info(" Server sent Prefix PDU."); + + switch (vrp->addr_fam) { + case AF_INET: + PR_DEBUG_MSG("%s asn%u IPv4", flags2str(flags), vrp->asn); + break; + case AF_INET6: + PR_DEBUG_MSG("%s asn%u IPv6", flags2str(flags), vrp->asn); + break; + default: + PR_DEBUG_MSG("%s asn%u Unknown", flags2str(flags), vrp->asn); + break; + } + ck_assert_msg(pdu_type == PDU_TYPE_IPV4_PREFIX || pdu_type == PDU_TYPE_IPV6_PREFIX, - "Server's PDU type is %d, not one of the IP Prefixes.", pdu_type); + "Server sent a prefix. Expected PDU type was %d.", pdu_type); return 0; } @@ -172,39 +201,9 @@ send_router_key_pdu(int fd, uint8_t version, */ uint8_t pdu_type = pop_expected_pdu(); pr_op_info(" Server sent Router Key PDU."); + PR_DEBUG_MSG("%s asn%u RK", flags2str(flags), router_key->as); ck_assert_msg(pdu_type == PDU_TYPE_ROUTER_KEY, - "Server's PDU type is %d, not Router Key type.", pdu_type); - return 0; -} - -static int -handle_delta(struct delta_vrp const *delta, void *arg) -{ - int *fd = arg; - ck_assert_int_eq(0, send_prefix_pdu(*fd, RTR_V1, &delta->vrp, - delta->flags)); - return 0; -} - -static int -handle_delta_router_key(struct delta_router_key const *delta, void *arg) -{ - int *fd = arg; - ck_assert_int_eq(0, send_router_key_pdu(*fd, RTR_V1, &delta->router_key, - delta->flags)); - return 0; -} - -int -send_delta_pdus(int fd, uint8_t version, struct deltas_db *deltas) -{ - struct delta_group *group; - array_index i; - - ARRAYLIST_FOREACH(deltas, group, i) - ck_assert_int_eq(0, deltas_foreach(group->serial, group->deltas, - handle_delta, handle_delta_router_key, &fd)); - + "Server sent a Router Key. Expected PDU type was %d.", pdu_type); return 0; } @@ -286,14 +285,15 @@ START_TEST(test_typical_exchange) expected_pdu_add(PDU_TYPE_CACHE_RESPONSE); expected_pdu_add(PDU_TYPE_IPV4_PREFIX); expected_pdu_add(PDU_TYPE_IPV6_PREFIX); - expected_pdu_add(PDU_TYPE_ROUTER_KEY); expected_pdu_add(PDU_TYPE_IPV4_PREFIX); expected_pdu_add(PDU_TYPE_IPV6_PREFIX); expected_pdu_add(PDU_TYPE_ROUTER_KEY); + expected_pdu_add(PDU_TYPE_ROUTER_KEY); expected_pdu_add(PDU_TYPE_END_OF_DATA); /* From serial 1: Run and validate */ ck_assert_int_eq(0, handle_serial_query_pdu(0, &request)); + PR_DEBUG; ck_assert_uint_eq(false, has_expected_pdus()); /* From serial 2: Init client request */ @@ -432,43 +432,6 @@ serialize_serial_query_pdu(struct serial_query_pdu *pdu, unsigned char *buf) return ptr - buf; } -START_TEST(test_bad_length) -{ -#define BUF_SIZE 13 /* Max expected length */ - struct rtr_request request; - struct serial_query_pdu client_pdu; - struct pdu_metadata const *meta; - unsigned char buf[BUF_SIZE]; - int fd; - - pr_op_info("-- Bad Length --"); - - /* Prepare DB */ - init_db_full(); - - /* From serial 0: Init client request */ - init_serial_query(&request, &client_pdu, 0); - /* Less than what's specified */ - client_pdu.header.length--; - - ck_assert_int_gt(serialize_serial_query_pdu(&client_pdu, buf), 0); - fd = buffer2fd(buf, BUF_SIZE); - ck_assert_int_ge(fd, 0); - - /* Define expected server response */ - expected_pdu_add(PDU_TYPE_ERROR_REPORT); - - /* Run and validate, before handling */ - ck_assert_int_eq(-EINVAL, pdu_load(fd, NULL, &request, &meta)); - ck_assert_uint_eq(false, has_expected_pdus()); - - /* Clean up */ - vrps_destroy(); - close(fd); -#undef BUF_SIZE -} -END_TEST - Suite *pdu_suite(void) { Suite *suite; @@ -482,7 +445,6 @@ Suite *pdu_suite(void) error = tcase_create("Unhappy path cases"); tcase_add_test(error, test_bad_session_id); - tcase_add_test(error, test_bad_length); suite = suite_create("PDU Handler"); suite_add_tcase(suite, core); @@ -496,6 +458,8 @@ int main(void) SRunner *runner; int tests_failed; + log_setup(true); + suite = pdu_suite(); runner = srunner_create(suite); diff --git a/test/rtr/pdu_test.c b/test/rtr/pdu_test.c index 14606be1..284cab26 100644 --- a/test/rtr/pdu_test.c +++ b/test/rtr/pdu_test.c @@ -5,29 +5,24 @@ #include "common.c" #include "log.c" #include "impersonator.c" -#include "rtr/stream.c" #include "rtr/err_pdu.c" #include "rtr/pdu.c" #include "rtr/primitive_reader.c" #include "rtr/db/rtr_db_impersonator.c" /* - * Just a wrapper for `buffer2fd()`. Boilerplate one-liner. + * Used to be a wrapper for `buffer2fd()`, but that's no longer necessary. + * + * Converts the @buffer buffer into PDU @obj, using the @cb function. + * Also takes care of the header validation. */ #define BUFFER2FD(buffer, cb, obj) { \ struct pdu_header header; \ struct pdu_reader reader; \ - unsigned char read[sizeof(buffer)]; \ - int fd, err; \ \ - fd = buffer2fd(buffer, sizeof(buffer)); \ - ck_assert_int_ge(fd, 0); \ - ck_assert_int_eq(pdu_reader_init(&reader, fd, read, \ - sizeof(buffer), true), 0); \ - close(fd); \ + pdu_reader_init(&reader, buffer, sizeof(buffer)); \ init_pdu_header(&header); \ - err = cb(&header, &reader, obj); \ - ck_assert_int_eq(err, 0); \ + ck_assert_int_eq(0, cb(&header, &reader, obj)); \ assert_pdu_header(&(obj)->header); \ } @@ -124,20 +119,12 @@ assert_pdu_header(struct pdu_header *header) START_TEST(test_pdu_header_from_stream) { unsigned char input[] = { 0, 1, 2, 3, 4, 5, 6, 7 }; - unsigned char read[RTRPDU_HDR_LEN]; struct pdu_reader reader; struct pdu_header header; - int fd; - int err; - - fd = buffer2fd(input, sizeof(input)); - ck_assert_int_ge(fd, 0); - ck_assert_int_eq(pdu_reader_init(&reader, fd, read, RTRPDU_HDR_LEN, - true), 0); - close(fd); + + pdu_reader_init(&reader, input, ARRAY_LEN(input)); /* Read the header into its buffer. */ - err = pdu_header_from_reader(&reader, &header); - ck_assert_int_eq(err, 0); + ck_assert_int_eq(0, pdu_header_from_reader(&reader, &header)); ck_assert_uint_eq(header.protocol_version, 0); ck_assert_uint_eq(header.pdu_type, 1); @@ -243,7 +230,7 @@ START_TEST(test_error_report_from_stream) unsigned char input[] = { /* Sub-pdu length */ 0, 0, 0, 12, - /* Sub-pdu w header*/ + /* Sub-pdu with header*/ 1, 0, 2, 3, 0, 0, 0, 12, 1, 2, 3, 4, /* Error msg length */ 0, 0, 0, 5, @@ -252,49 +239,28 @@ START_TEST(test_error_report_from_stream) /* Garbage */ 1, 2, 3, 4, }; - struct error_report_pdu *pdu; - struct serial_notify_pdu *sub_pdu; + struct error_report_pdu pdu; + struct serial_notify_pdu sub_pdu; struct pdu_header sub_pdu_header; struct pdu_reader reader; - unsigned char sub_pdu_read[12]; - int fd, err; - pdu = malloc(sizeof(struct error_report_pdu)); - if (!pdu) - ck_abort_msg("PDU allocation failure"); + BUFFER2FD(input, error_report_from_stream, &pdu); - sub_pdu = malloc(sizeof(struct serial_notify_pdu)); - if (!sub_pdu) { - ck_abort_msg("SUB PDU allocation failure"); - free(pdu); - } + /* Get the erroneous PDU as a serial notify */ + pdu_reader_init(&reader, pdu.erroneous_pdu, pdu.error_pdu_length); - BUFFER2FD(input, error_report_from_stream, pdu); + ck_assert_int_eq(0, pdu_header_from_reader(&reader, &sub_pdu_header)); + ck_assert_int_eq(0, serial_notify_from_stream(&sub_pdu_header, &reader, + &sub_pdu)); - /* Get the erroneous PDU as a serial notify */ - fd = buffer2fd(pdu->erroneous_pdu, pdu->error_pdu_length); - ck_assert_int_ge(fd, 0); - ck_assert_int_eq(pdu_reader_init(&reader, fd, sub_pdu_read, - pdu->error_pdu_length, true), 0); - close(fd); - - ck_assert_int_eq(pdu_header_from_reader(&reader, &sub_pdu_header), 0); - err = serial_notify_from_stream(&sub_pdu_header, &reader, sub_pdu); - ck_assert_int_eq(err, 0); - - ck_assert_uint_eq(sub_pdu->header.protocol_version, 1); - ck_assert_uint_eq(sub_pdu->header.pdu_type, 0); - ck_assert_uint_eq(sub_pdu->header.m.reserved, 0x0203); - ck_assert_uint_eq(sub_pdu->header.length, 12); - ck_assert_uint_eq(sub_pdu->serial_number, 0x01020304); - ck_assert_str_eq(pdu->error_message, "hello"); - - /* - * Yes, this test memory leaks on failure. - * Not sure how to fix it without making a huge mess. - */ - error_report_destroy(pdu); - free(sub_pdu); + ck_assert_uint_eq(sub_pdu.header.protocol_version, 1); + ck_assert_uint_eq(sub_pdu.header.pdu_type, 0); + ck_assert_uint_eq(sub_pdu.header.m.reserved, 0x0203); + ck_assert_uint_eq(sub_pdu.header.length, 12); + ck_assert_uint_eq(sub_pdu.serial_number, 0x01020304); + ck_assert_str_eq(pdu.error_message, "hello"); + + free(pdu.error_message); } END_TEST @@ -302,14 +268,10 @@ START_TEST(test_interrupted) { unsigned char input[] = { 0, 1 }; struct pdu_reader reader; - unsigned char read[4]; - int fd, err; - - fd = buffer2fd(input, sizeof(input)); - ck_assert_int_ge(fd, 0); - err = pdu_reader_init(&reader, fd, read, 4, true); - close(fd); - ck_assert_int_eq(err, -EPIPE); + struct pdu_header header; + + pdu_reader_init(&reader, input, ARRAY_LEN(input)); + ck_assert_int_eq(-EPIPE, pdu_header_from_reader(&reader, &header)); } END_TEST diff --git a/test/rtr/primitive_reader_test.c b/test/rtr/primitive_reader_test.c index c60d789b..969e7712 100644 --- a/test/rtr/primitive_reader_test.c +++ b/test/rtr/primitive_reader_test.c @@ -7,7 +7,6 @@ #include "impersonator.c" #include "log.c" -#include "rtr/stream.c" #include "rtr/primitive_reader.c" /* @@ -17,24 +16,8 @@ static int __read_string(unsigned char *input, size_t size, rtr_char **result) { struct pdu_reader reader; - unsigned char read_bytes[size]; - int fd; - int err; - uint32_t usize; - - fd = buffer2fd(input, size); - if (fd < 0) - return fd; - - err = pdu_reader_init(&reader, fd, read_bytes, size, false); - if (err) - goto close; - - usize = size & 0xFFFF; - err = read_string(&reader, usize, result); -close: - close(fd); - return err; + pdu_reader_init(&reader, input, size); + return read_string(&reader, size & 0xFFFF, result); } static void @@ -52,19 +35,6 @@ test_read_string_success(unsigned char *input, size_t length, } } -static void -test_read_string_fail(unsigned char *input, size_t length, int expected) -{ - rtr_char *result; - int err; - - err = __read_string(input, length, &result); - ck_assert_int_eq(expected, err); - - if (!err) - free(result); -} - START_TEST(read_string_ascii) { unsigned char input[] = { 'a', 'b', 'c', 'd' }; @@ -102,8 +72,8 @@ END_TEST START_TEST(read_string_empty) { - unsigned char *input = { '\0' }; - test_read_string_fail(input, sizeof(input), -EFAULT); + unsigned char input[] = { 0, 0, 0, 0 }; + test_read_string_success(input, sizeof(input), ""); } END_TEST @@ -115,30 +85,6 @@ struct thread_param { #define WRITER_PATTERN "abcdefghijklmnopqrstuvwxyz0123456789" -/* - * Writes a @param_void->msg_size-sized RTR string in @param_void->fd. - */ -static void * -writer_thread_cb(void *param_void) -{ - struct thread_param *param; - rtr_char *pattern; - size_t pattern_len; - - param = param_void; - pattern = WRITER_PATTERN; - pattern_len = strlen(pattern); - - /* Write the string */ - for (; param->msg_size > pattern_len; param->msg_size -= pattern_len) { - param->err = write_exact(param->fd, UCHAR(pattern), pattern_len); - if (param->err) - return param; - } - param->err = write_exact(param->fd, UCHAR(pattern), param->msg_size); - return param; -} - /* * Checks that the string @str is made up of @expected_len characters composed * of the @WRITER_PATTERN pattern repeatedly. @@ -185,66 +131,36 @@ validate_massive_string(uint32_t expected_len, rtr_char *str) static void test_massive_string(uint32_t return_length, uint32_t full_string_length) { - int fd[2]; - pthread_t writer_thread; - struct thread_param *arg; + unsigned char *buffer; + rtr_char *pattern; + size_t pattern_len; + + size_t written; + size_t w; + struct pdu_reader reader; - unsigned char *read_bytes; rtr_char *result_string; - int err, err2, err3; - if (pipe(fd) == -1) - ck_abort_msg("pipe(fd) threw errcode %d", errno); - /* Need to close @fd[0] and @fd[1] from now on */ + buffer = malloc(full_string_length); + if (buffer == NULL) + ck_abort_msg("Out of memory."); - arg = malloc(sizeof(struct thread_param)); - if (!arg) { - close(fd[0]); - close(fd[1]); - ck_abort_msg("Thread parameter allocation failure"); - } - /* Need to free @arg from now on */ - - arg->fd = fd[1]; - arg->msg_size = full_string_length; - arg->err = 0; - - err = pthread_create(&writer_thread, NULL, writer_thread_cb, arg); - if (err) { - close(fd[0]); - close(fd[1]); - free(arg); - ck_abort_msg("pthread_create() threw errcode %d", err); - } - /* The writer thread owns @arg now; do not touch it until retrieved */ - do { - read_bytes = malloc(full_string_length); - err = pdu_reader_init(&reader, fd[0], read_bytes, - full_string_length, false); - if (err) - break; - err = read_string(&reader, full_string_length, &result_string); - } while(0); - - /* Need to free @result_string from now on */ - err2 = pthread_join(writer_thread, (void **)&arg); - /* @arg is now retrieved. */ - err3 = arg->err; - - close(fd[0]); - close(fd[1]); - free(arg); - free(read_bytes); - /* Don't need to close @fd[0], @fd[1] nor free @arg from now on */ - - if (err || err2 || err3) { - free(result_string); - ck_abort_msg("read_string:%d pthread_join:%d write_exact:%d", - err, err2, err3); + pattern = WRITER_PATTERN; + pattern_len = strlen(pattern); + for (written = 0; written < full_string_length; written += w) { + w = (full_string_length - written > pattern_len) + ? pattern_len + : (full_string_length - written); + memcpy(&buffer[written], pattern, w); } - /* This function now owns @result_string */ + pdu_reader_init(&reader, buffer, full_string_length); + ck_assert_int_eq(0, read_string(&reader, full_string_length, + &result_string)); + validate_massive_string(return_length, result_string); + + free(buffer); } START_TEST(read_string_massive) @@ -257,7 +173,6 @@ START_TEST(read_string_massive) test_massive_string(4097, 4097); test_massive_string(8000, 8000); test_massive_string(16000, 16000); - test_massive_string(786432000, 786432000); /* 750MB */ } END_TEST @@ -308,7 +223,6 @@ Suite *read_string_suite(void) limits = tcase_create("Limits"); tcase_add_test(limits, read_string_empty); tcase_add_test(limits, read_string_massive); - tcase_set_timeout(limits, 60); errors = tcase_create("Errors"); tcase_add_test(errors, read_string_null); diff --git a/test/rtr/stream.c b/test/rtr/stream.c deleted file mode 100644 index 7526aa88..00000000 --- a/test/rtr/stream.c +++ /dev/null @@ -1,62 +0,0 @@ -#include "stream.h" - -#include -#include -#include -#include - -#include "common.h" - -/* - * Writes exactly @length bytes from @buffer to the file descriptor @fd. - * All or nothing. - * - * The result is zero on success, nonzero on failure. - */ -int -write_exact(int fd, unsigned char *buffer, size_t length) -{ - size_t written; - int written_now; - - for (written = 0; written < length; written += written_now) { - written_now = write(fd, buffer + written, length - written); - if (written_now == -1) - return errno; - } - - return 0; -} - -/* - * "Converts" the buffer @buffer (sized @size) to a file descriptor (FD). - * You will get @buffer if you `read()` the FD. - * - * If the result is not negative, then you're receiving the resulting FD. - * If the result is negative, it's an error code. - * - * Note that you need to close the FD when you're done reading it. - */ -int -buffer2fd(unsigned char *buffer, size_t size) -{ - int fd[2]; - int err; - - if (pipe(fd) == -1) { - err = errno; - warn("Pipe creation failed"); - return ENSURE_NEGATIVE(err); - } - - err = write_exact(fd[1], buffer, size); - close(fd[1]); - if (err) { - errno = err; - warn("Pipe write failed"); - close(fd[0]); - return ENSURE_NEGATIVE(err); - } - - return fd[0]; -} diff --git a/test/rtr/stream.h b/test/rtr/stream.h deleted file mode 100644 index a3fc2c8f..00000000 --- a/test/rtr/stream.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef TEST_RTR_STREAM_H_ -#define TEST_RTR_STREAM_H_ - -#include - -int write_exact(int, unsigned char *, size_t); -int buffer2fd(unsigned char *, size_t); - -#endif /* TEST_RTR_STREAM_H_ */ diff --git a/test/serial_test.c b/test/serial_test.c new file mode 100644 index 00000000..e3c253e3 --- /dev/null +++ b/test/serial_test.c @@ -0,0 +1,153 @@ +#include +#include + +#include "serial.c" + +START_TEST(pivot_0) +{ + /* Pivot: Zero */ + ck_assert_int_eq(false, serial_lt(0, 0)); + + ck_assert_int_eq(true, serial_lt(0, 1)); + ck_assert_int_eq(true, serial_lt(0, 2)); + ck_assert_int_eq(true, serial_lt(0, 3)); + ck_assert_int_eq(true, serial_lt(0, 4)); + + ck_assert_int_eq(true, serial_lt(0, 0x7FFFFFFCu)); + ck_assert_int_eq(true, serial_lt(0, 0x7FFFFFFDu)); + ck_assert_int_eq(true, serial_lt(0, 0x7FFFFFFEu)); + ck_assert_int_eq(true, serial_lt(0, 0x7FFFFFFFu)); + + ck_assert_int_eq(false, serial_lt(0, 0x80000001u)); + ck_assert_int_eq(false, serial_lt(0, 0x80000002u)); + ck_assert_int_eq(false, serial_lt(0, 0x80000003u)); + ck_assert_int_eq(false, serial_lt(0, 0x80000004u)); + + ck_assert_int_eq(false, serial_lt(0, 0xFFFFFFFCu)); + ck_assert_int_eq(false, serial_lt(0, 0xFFFFFFFDu)); + ck_assert_int_eq(false, serial_lt(0, 0xFFFFFFFEu)); + ck_assert_int_eq(false, serial_lt(0, 0xFFFFFFFFu)); +} +END_TEST + +START_TEST(pivot_mid) +{ + ck_assert_int_eq(false, serial_lt(0x80000000u, 0x80000000u)); + + ck_assert_int_eq(true, serial_lt(0x80000000u, 0x80000001u)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0x80000002u)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0x80000003u)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0x80000004u)); + + ck_assert_int_eq(true, serial_lt(0x80000000u, 0xFFFFFFFCu)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0xFFFFFFFDu)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0xFFFFFFFEu)); + ck_assert_int_eq(true, serial_lt(0x80000000u, 0xFFFFFFFFu)); + + ck_assert_int_eq(false, serial_lt(0x80000000u, 1)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 2)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 3)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 4)); + + ck_assert_int_eq(false, serial_lt(0x80000000u, 0x7FFFFFFCu)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 0x7FFFFFFDu)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 0x7FFFFFFEu)); + ck_assert_int_eq(false, serial_lt(0x80000000u, 0x7FFFFFFFu)); +} +END_TEST + +START_TEST(pivot_max) +{ + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0xFFFFFFFFu)); + + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 1)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 2)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 3)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 4)); + + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 0x7FFFFFFBu)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 0x7FFFFFFCu)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 0x7FFFFFFDu)); + ck_assert_int_eq(true, serial_lt(0xFFFFFFFFu, 0x7FFFFFFEu)); + + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0x80000000u)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0x80000001u)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0x80000002u)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0x80000003u)); + + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0xFFFFFFFBu)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0xFFFFFFFCu)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0xFFFFFFFDu)); + ck_assert_int_eq(false, serial_lt(0xFFFFFFFFu, 0xFFFFFFFEu)); +} +END_TEST + +START_TEST(rfc1982_section_5_2) +{ + const serial_t multiplier = 0xFFFFFFFFu / 256; + /* + * These are the examples from rfc1982#section-5.2, adjusted to the + * larger SERIAL_BITS. + */ + const serial_t N0 = 0; + const serial_t N1 = 1 * multiplier; + const serial_t N44 = 44 * multiplier; + const serial_t N100 = 100 * multiplier; + const serial_t N200 = 200 * multiplier; + const serial_t N255 = 255 * multiplier; + + ck_assert_int_eq(true, serial_lt(N0, N1)); + ck_assert_int_eq(true, serial_lt(N0, N44)); + ck_assert_int_eq(true, serial_lt(N0, N100)); + ck_assert_int_eq(true, serial_lt(N44, N100)); + ck_assert_int_eq(true, serial_lt(N100, N200)); + ck_assert_int_eq(true, serial_lt(N200, N255)); + ck_assert_int_eq(true, serial_lt(N255, N0)); + ck_assert_int_eq(true, serial_lt(N255, N100)); + ck_assert_int_eq(true, serial_lt(N200, N0)); + ck_assert_int_eq(true, serial_lt(N200, N44)); + + ck_assert_int_eq(false, serial_lt(N1, N0)); + ck_assert_int_eq(false, serial_lt(N44, N0)); + ck_assert_int_eq(false, serial_lt(N100, N0)); + ck_assert_int_eq(false, serial_lt(N100, N44)); + ck_assert_int_eq(false, serial_lt(N200, N100)); + ck_assert_int_eq(false, serial_lt(N255, N200)); + ck_assert_int_eq(false, serial_lt(N0, N255)); + ck_assert_int_eq(false, serial_lt(N100, N255)); + ck_assert_int_eq(false, serial_lt(N0, N200)); + ck_assert_int_eq(false, serial_lt(N44, N200)); +} +END_TEST + +Suite *serial_suite(void) +{ + Suite *suite; + TCase *core; + + core = tcase_create("Core"); + tcase_add_test(core, pivot_0); + tcase_add_test(core, pivot_mid); + tcase_add_test(core, pivot_max); + tcase_add_test(core, rfc1982_section_5_2); + + suite = suite_create("serial"); + suite_add_tcase(suite, core); + return suite; +} + +int main(void) +{ + Suite *suite; + SRunner *runner; + int tests_failed; + + suite = serial_suite(); + + runner = srunner_create(suite); + srunner_run_all(runner, CK_NORMAL); + tests_failed = srunner_ntests_failed(runner); + srunner_free(runner); + + return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +}