From: Alberto Leiva Popper Date: Tue, 22 Apr 2025 22:03:30 +0000 (-0600) Subject: Update glue between main loop and rsync X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a98fafbb1ea714f9533114bf1ab507b7d2e53287;p=thirdparty%2FFORT-validator.git Update glue between main loop and rsync 1. Propagate EBUSY so the main loop suspends the task (and takes care of other tasks) while the rsync runs. 2. Spawner now responds rsync URL and path to parent, so the cache can update the download state. --- diff --git a/src/asn1/asn1c/OCTET_STRING.c b/src/asn1/asn1c/OCTET_STRING.c index 39a2746e..8ed51f41 100644 --- a/src/asn1/asn1c/OCTET_STRING.c +++ b/src/asn1/asn1c/OCTET_STRING.c @@ -947,6 +947,12 @@ OCTET_STRING_new_fromBuf(const asn_TYPE_descriptor_t *td, const char *str, return st; } +char * +OCTET_STRING_toString(OCTET_STRING_t *os) +{ + return pstrndup((char *)os->buf, os->size); +} + json_t * OCTET_STRING_to_json(const asn_TYPE_descriptor_t *td, OCTET_STRING_t const *ber) { diff --git a/src/asn1/asn1c/OCTET_STRING.h b/src/asn1/asn1c/OCTET_STRING.h index 1506cb92..bd0e8fc1 100644 --- a/src/asn1/asn1c/OCTET_STRING.h +++ b/src/asn1/asn1c/OCTET_STRING.h @@ -65,6 +65,7 @@ int OCTET_STRING_fromBuf(OCTET_STRING_t *s, const char *str, int size); OCTET_STRING_t *OCTET_STRING_new_fromBuf(const asn_TYPE_descriptor_t *td, const char *str, int size); +char *OCTET_STRING_toString(OCTET_STRING_t *); json_t *OCTET_STRING_to_json(const asn_TYPE_descriptor_t *, OCTET_STRING_t const *); diff --git a/src/cache.c b/src/cache.c index 6d150a54..816c9850 100644 --- a/src/cache.c +++ b/src/cache.c @@ -730,14 +730,8 @@ static int dl_rsync(struct cache_node *module) { int error; - - // XXX go pick some other task on zero error = rsync_queue(module->map.url, module->map.path); - if (error) - return error; - - module->success_ts = module->attempt_ts; - return 0; + return error ? error : EBUSY; } static int @@ -1178,6 +1172,32 @@ cache_commit_file(struct cache_mapping *map) mutex_unlock(&commits_lock); } +void +rsync_finished(char const *url, char const *path) +{ + struct cache_node *node; + + mutex_lock(&cache.rsync.lock); + + node = find_node(&cache.rsync, url, strlen(url)); + if (node == NULL) { + mutex_unlock(&cache.rsync.lock); + pr_op_err("rsync '%s -> %s' finished, but cache node does not exist.", + url, path); + return; + } + if (node->state != DLS_ONGOING) + pr_op_warn("rsync '%s -> %s' finished, but existing node was not in ONGOING state.", + url, path); + + node->state = DLS_FRESH; + node->dlerr = 0; + node->success_ts = node->attempt_ts; + mutex_unlock(&cache.rsync.lock); + + task_wakeup_dormants(); +} + char const * cage_rpkiNotify(struct cache_cage *cage) { diff --git a/src/log.h b/src/log.h index 1544a187..453899d2 100644 --- a/src/log.h +++ b/src/log.h @@ -117,9 +117,9 @@ int incidence(enum incidence_id, const char *, ...) CHECK_FORMAT(2, 3); #define DBG_COLOR_RESET "\x1B[0m" #define PR_DEBUG \ printf(DBG_COLOR "%s:%d (%s())" DBG_COLOR_RESET "\n", \ - __FILE__, __LINE__, __func__) + __FILE__, __LINE__, __func__) #define PR_DEBUG_MSG(msg, ...) \ printf(DBG_COLOR "%s:%d (%s()): " msg DBG_COLOR_RESET "\n", \ - __FILE__, __LINE__, __func__, ##__VA_ARGS__) + __FILE__, __LINE__, __func__, ##__VA_ARGS__) #endif /* SRC_LOG_H_ */ diff --git a/src/main.c b/src/main.c index 6b13e38a..0917288b 100644 --- a/src/main.c +++ b/src/main.c @@ -120,6 +120,7 @@ main(int argc, char **argv) int error; /* Initializations */ + /* (Do not start any threads until after rsync_setup() has forked.) */ error = log_setup(); if (error) @@ -128,7 +129,7 @@ main(int argc, char **argv) if (error) goto revert_log; - rsync_setup(NULL, NULL); /* Spawn rsync spawner ASAP */ + rsync_setup(NULL, NULL); /* Fork rsync spawner ASAP */ register_signal_handlers(); error = thvar_init(); diff --git a/src/object/tal.c b/src/object/tal.c index 85347444..8ec75d5d 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -170,16 +170,19 @@ try_urls(struct tal *tal, bool (*url_is_protocol)(char const *), { char **url; struct cache_mapping map; + int error; ARRAYLIST_FOREACH(&tal->urls, url) { map.url = *url; if (!url_is_protocol(map.url)) continue; - // XXX if this is rsync, it seems this will queue and fail map.path = get_path(*url); if (!map.path) continue; - if (validate_ta(tal, &map) != 0) + error = validate_ta(tal, &map); + if (error == EBUSY) + return EBUSY; + if (error) continue; cache_commit_file(&map); return 0; @@ -201,14 +204,18 @@ traverse_tal(char const *tal_path) goto end1; /* Online attempts */ - if (try_urls(&tal, url_is_https, cache_refresh_by_url) == 0) + error = try_urls(&tal, url_is_https, cache_refresh_by_url); + if (!error || error == EBUSY) goto end2; - if (try_urls(&tal, url_is_rsync, cache_refresh_by_url) == 0) + error = try_urls(&tal, url_is_rsync, cache_refresh_by_url); + if (!error || error == EBUSY) goto end2; /* Offline fallback attempts */ - if (try_urls(&tal, url_is_https, cache_get_fallback) == 0) + error = try_urls(&tal, url_is_https, cache_get_fallback); + if (!error || error == EBUSY) goto end2; - if (try_urls(&tal, url_is_rsync, cache_get_fallback) == 0) + error = try_urls(&tal, url_is_rsync, cache_get_fallback); + if (!error || error == EBUSY) goto end2; pr_op_err("None of the TAL URIs yielded a successful traversal."); @@ -233,8 +240,16 @@ pick_up_work(void *arg) } break; case VTT_TAL: - if (traverse_tal(task->u.tal) != 0) + switch (traverse_tal(task->u.tal)) { + case 0: + break; + case EBUSY: + task_requeue_dormant(task); + task = NULL; + break; + default: task_stop(); + } break; } diff --git a/src/rsync.c b/src/rsync.c index 772fc7a9..1bde288a 100644 --- a/src/rsync.c +++ b/src/rsync.c @@ -14,12 +14,14 @@ #include "config.h" #include "log.h" #include "types/array.h" +#include "types/map.h" #include "asn1/asn1c/ber_decoder.h" #include "asn1/asn1c/der_encoder.h" #include "asn1/asn1c/RsyncRequest.h" #define RSP /* rsync spawner prefix */ "[rsync spawner] " +#define SRTP "[spawner response thread] " static char const *rsync_args[20]; /* Last must be NULL */ @@ -33,24 +35,17 @@ static const int WRFD = 1; static pid_t spawner; /* The subprocess that spawns rsync runs */ -static int readfd; /* Parent's end of the spawner-to-parent pipe */ -static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER; - -static int writefd; /* Parent's end of the parent-to-spawner pipe */ -static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER; - /* - * "Spawner to parent" socket. - * Socket used by the spawner to communicate with the parent. + * "Parent-spawner socket." + * Used by both parent and spawner to speak with each other. */ -struct s2p_socket { - /* Spawner's end of the parent-to-spawner pipe */ - struct read_stream rd; - /* Spawner's end of the spawner-to-parent pipe */ - int wr; - /* Scratchpad buffer for stream read */ - struct RsyncRequest *rr; -}; +struct { + struct read_stream rd; /* Read fd and extra tools */ + int wr; /* Write fd */ + + struct RsyncRequest *rr; /* Scratchpad buffer for stream read */ + pthread_mutex_t wrlock; /* To sync writes */ +} pssk; struct rsync_task { int pid; @@ -81,32 +76,98 @@ struct rsync_tasks { ) #endif +/* Spawner response thread; The thread that listens to spawner responses. */ +static pthread_t srt; + static void -void_task(struct rsync_task *task, struct s2p_socket *s2p) +__spsk_init(int rdfd, int wrfd) { - static const unsigned char one = 1; + rstream_init(&pssk.rd, rdfd, 512); + pssk.wr = wrfd; + pssk.rr = NULL; + pthread_mutex_init(&pssk.wrlock, NULL); +} - free(task->url); - free(task->path); - free(task); +static void +spsk_init(int rdpipes[2], int wrpipes[2]) +{ + close(wrpipes[RDFD]); + close(rdpipes[WRFD]); + __spsk_init(rdpipes[RDFD], wrpipes[WRFD]); +} - if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) { - pr_op_err(RSP "Cannot message parent process: %s", strerror(errno)); +static void +spsk_cleanup(void) +{ + rstream_close(&pssk.rd, true); + if (pssk.wr != -1) { + close(pssk.wr); + pssk.wr = -1; + } + ASN_STRUCT_FREE(asn_DEF_RsyncRequest, pssk.rr); + pthread_mutex_destroy(&pssk.wrlock); +} + +static int +write_cb(const void *buffer, size_t size, void *arg) +{ + return stream_full_write(pssk.wr, buffer, size); +} + +static void +notify_parent(struct rsync_task *task) +{ + struct RsyncRequest req; + asn_enc_rval_t result; - close(s2p->wr); - s2p->wr = -1; - /* Can't signal finished rsyncs anymore; reject future ones. */ - rstream_close(&s2p->rd, true); + if (pssk.wr == -1) { + pr_op_err(RSP "Cannot message parent process: " + "The socket is closed."); + return; + } + + /* + * TODO (asn1) these error messages are too generic. + * The asn1 code needs better error reporting. + */ + + if (RsyncRequest_init(&req, task->url, task->path) < 0) { + pr_op_err(RSP "Cannot message parent process: " + "The request object cannot be created"); + return; + } + + result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL); + if (result.encoded == -1) { + pr_op_err(RSP "Cannot message parent process: Unknown error"); + /* TODO (asn1) Do this if the error was I/O: + * close(spsk.wr); + * spsk.wr = -1; + * // Can't signal finished rsyncs anymore; reject future ones. + * rstream_close(&spsk.rd, true); + */ } + + pr_op_debug(RSP "Parent notified; sent %zd bytes.", result.encoded); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req); +} + +static void +void_task(struct rsync_task *task) +{ + notify_parent(task); + + free(task->url); + free(task->path); + free(task); } static void -finish_task(struct rsync_tasks *tasks, struct rsync_task *task, - struct s2p_socket *s2p) +finish_task(struct rsync_tasks *tasks, struct rsync_task *task) { LIST_REMOVE(task, lh); tasks->a--; - void_task(task, s2p); + void_task(task); } static void @@ -248,12 +309,12 @@ fork_rsync(struct rsync_task *task) static void activate_task(struct rsync_tasks *tasks, struct rsync_task *task, - struct s2p_socket *s2p, struct timespec *now) + struct timespec *now) { ts_add(&task->expiration, now, 1000 * config_rsync_timeout()); if (fork_rsync(task) != 0) { - void_task(task, s2p); + void_task(task); return; } @@ -262,94 +323,92 @@ activate_task(struct rsync_tasks *tasks, struct rsync_task *task, } static void -post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks, +post_task(struct cache_mapping *map, struct rsync_tasks *tasks, struct timespec *now) { struct rsync_task *task; task = pzalloc(sizeof(struct rsync_task)); - task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size); - task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size); + task->url = map->url; + task->path = map->path; if (tasks->a >= config_rsync_max()) { LIST_INSERT_HEAD(&tasks->queued, task, lh); pr_op_debug(RSP "Queued new task."); } else { - activate_task(tasks, task, s2p, now); + activate_task(tasks, task, now); pr_op_debug(RSP "Got new task: %d", task->pid); } } -static void -read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, - struct timespec *now) +static int +next_task(struct cache_mapping *result) { - struct read_stream *in; - ssize_t consumed; - size_t offset; asn_dec_rval_t decres; + ssize_t consumed; int error; - in = &s2p->rd; - - do { - consumed = read(in->fd, in->buffer + in->len, - in->capacity - in->len); - if (consumed < 0) { - error = errno; - if (error != EAGAIN && error != EWOULDBLOCK) - rstream_close(in, true); - return; - } - if (consumed == 0) { /* EOS */ - rstream_close(in, true); - return; +again: if (pssk.rd.len > 0) { + decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&pssk.rr, + pssk.rd.buffer, pssk.rd.len); + + memmove(pssk.rd.buffer, pssk.rd.buffer + decres.consumed, + pssk.rd.len - decres.consumed); + pssk.rd.len -= decres.consumed; + + switch (decres.code) { + case RC_OK: + result->url = OCTET_STRING_toString(&pssk.rr->url); + result->path = OCTET_STRING_toString(&pssk.rr->path); + ASN_STRUCT_RESET(asn_DEF_RsyncRequest, pssk.rr); + return 0; + case RC_WMORE: + break; + case RC_FAIL: + rstream_close(&pssk.rd, true); + return EINVAL; } + } - in->len += consumed; - - for (offset = 0; offset < in->len;) { - decres = ber_decode(&asn_DEF_RsyncRequest, - (void **)&s2p->rr, - in->buffer + offset, in->len - offset); - offset += decres.consumed; - switch (decres.code) { - case RC_OK: - post_task(s2p, tasks, now); - ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr); - break; - case RC_WMORE: - goto break_for; - case RC_FAIL: - rstream_close(in, true); - return; - } - } + if (pssk.rd.fd == -1) + return EBADF; + consumed = read(pssk.rd.fd, pssk.rd.buffer + pssk.rd.len, + pssk.rd.capacity - pssk.rd.len); + if (consumed < 0) { + error = errno; + if (error != EAGAIN && error != EWOULDBLOCK) + rstream_close(&pssk.rd, true); + return error; + } + if (consumed == 0) { /* EOS */ + rstream_close(&pssk.rd, true); + return ENOENT; + } -break_for: if (offset > in->len) - pr_crit("read_tasks off:%zu len:%zu", offset, in->len); - in->len -= offset; - memmove(in->buffer, in->buffer + offset, in->len); - } while (true); + pssk.rd.len += consumed; + goto again; } static void -handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd, - struct rsync_tasks *tasks, struct timespec *now) +handle_parent_fd(struct pollfd *pfd, struct rsync_tasks *tasks, + struct timespec *now) { - if (s2p->rd.fd == -1) + struct cache_mapping map; + + if (pssk.rd.fd == -1) return; if (pfd->revents & POLLNVAL) { pr_op_err(RSP "bad parent fd: %i", pfd->fd); - rstream_close(&s2p->rd, false); + rstream_close(&pssk.rd, false); } else if (pfd->revents & POLLERR) { pr_op_err(RSP "Generic error during parent fd poll."); - rstream_close(&s2p->rd, true); + rstream_close(&pssk.rd, true); } else if (pfd->revents & (POLLIN | POLLHUP)) { - read_tasks(s2p, tasks, now); + while (next_task(&map) == 0) + post_task(&map, tasks, now); } } @@ -489,8 +548,7 @@ kill_subprocess(struct rsync_task *task) } static void -activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p, - struct timespec *now) +activate_queued(struct rsync_tasks *tasks, struct timespec *now) { struct rsync_task *task; @@ -501,13 +559,13 @@ activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p, pr_op_debug(RSP "Activating queued task %s -> %s.", task->url, task->path); LIST_REMOVE(task, lh); - activate_task(tasks, task, s2p, now); + activate_task(tasks, task, now); } /* Returns true if the task died. */ static bool maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task, - struct s2p_socket *s2p, struct timespec *now) + struct timespec *now) { struct timespec epoch; @@ -518,19 +576,15 @@ maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task, pr_op_debug(RSP "Task %d ran out of time.", task->pid); kill_subprocess(task); wait_subprocess("rsync", task->pid); - finish_task(tasks, task, s2p); - activate_queued(tasks, s2p, now); + finish_task(tasks, task); + activate_queued(tasks, now); return true; } static int -spawner_run( - int request_fd, /* Requests from parent to us (spawner) */ - int response_fd /* Responses from us (spawner) to parent */ -) { - struct s2p_socket s2p; /* Channel to parent */ - +spawner_run(void) +{ struct pollfd *pfds; /* Channels to children */ size_t p, pfds_count; struct timespec now, expiration; @@ -543,10 +597,6 @@ spawner_run( int error; - rstream_init(&s2p.rd, request_fd, 1024); - s2p.wr = response_fd; - s2p.rr = NULL; - LIST_INIT(&tasks.active); LIST_INIT(&tasks.queued); tasks.a = 0; @@ -560,7 +610,7 @@ spawner_run( * odd: stdouts * even > 0: stderrs */ - pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a); + pfds = create_pfds(pssk.rd.fd, &tasks.active, tasks.a); pfds_count = 2 * tasks.a + 1; expiration.tv_sec = now.tv_sec + 10; expiration.tv_nsec = now.tv_nsec; @@ -583,14 +633,14 @@ spawner_run( if (events == 0) { /* Timeout */ pr_op_debug(RSP "Woke up because of timeout."); LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) - maybe_expire(&tasks, task, &s2p, &now); + maybe_expire(&tasks, task, &now); goto cont; } pr_op_debug(RSP "Woke up because of input."); p = 1; LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) { - if (maybe_expire(&tasks, task, &s2p, &now)) + if (maybe_expire(&tasks, task, &now)) continue; if (handle_rsync_fd(&pfds[p], p)) { @@ -609,29 +659,27 @@ spawner_run( pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.", task->pid); wait_subprocess("rsync", task->pid); - finish_task(&tasks, task, &s2p); - activate_queued(&tasks, &s2p, &now); + finish_task(&tasks, task); + activate_queued(&tasks, &now); } } - handle_parent_fd(&s2p, &pfds[0], &tasks, &now); + handle_parent_fd(&pfds[0], &tasks, &now); cont: free(pfds); - } while ((s2p.rd.fd != -1 || tasks.a > 0)); + } while ((pssk.rd.fd != -1 || tasks.a > 0)); pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up..."); LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) { kill_subprocess(task); wait_subprocess("rsync", task->pid); - finish_task(&tasks, task, &s2p); + finish_task(&tasks, task); } LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) { LIST_REMOVE(task, lh); - void_task(task, &s2p); + void_task(task); } - rstream_close(&s2p.rd, true); - if (s2p.wr != -1) - close(s2p.wr); + spsk_cleanup(); free_rpki_config(); log_teardown(); @@ -646,19 +694,19 @@ nonblock_pipe(int *fds) if (pipe(fds) < 0) { error = errno; - pr_op_err("Cannot create pipe: %s", strerror(error)); + pr_op_warn("Cannot create pipe: %s", strerror(error)); return error; } flags = fcntl(fds[RDFD], F_GETFL); if (flags < 0) { error = errno; - pr_op_err("Cannot retrieve pipe flags: %s", strerror(error)); + pr_op_warn("Cannot retrieve pipe flags: %s", strerror(error)); goto cancel; } if (fcntl(fds[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) { error = errno; - pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(error)); + pr_op_warn("Cannot enable O_NONBLOCK: %s", strerror(error)); goto cancel; } @@ -669,6 +717,19 @@ cancel: close(fds[RDFD]); return error; } +static void * +rcv_spawner_responses(void *arg) +{ + struct cache_mapping map = { 0 }; + + while (next_task(&map) == 0) { + rsync_finished(map.url, map.path); + map_cleanup(&map); + } + + return NULL; +} + void rsync_setup(char const *program, ...) { @@ -678,6 +739,7 @@ rsync_setup(char const *program, ...) va_list args; array_index i; char const *arg; + int error; if (program != NULL) { rsync_args[0] = arg = program; @@ -712,29 +774,38 @@ rsync_setup(char const *program, ...) if (nonblock_pipe(parent2spawner) != 0) goto fail1; - if (nonblock_pipe(spawner2parent) != 0) + if (pipe(spawner2parent) < 0) { + pr_op_warn("Cannot create pipe: %s", strerror(errno)); goto fail2; + } fflush(stdout); fflush(stderr); spawner = fork(); if (spawner < 0) { - pr_op_err("Cannot fork rsync spawner: %s", strerror(errno)); + pr_op_warn("Cannot fork rsync spawner: %s", strerror(errno)); goto fail3; } if (spawner == 0) { /* Client code */ - close(parent2spawner[WRFD]); - close(spawner2parent[RDFD]); - exit(spawner_run(parent2spawner[RDFD], spawner2parent[WRFD])); + spsk_init(parent2spawner, spawner2parent); + exit(spawner_run()); } /* Parent code */ - close(parent2spawner[RDFD]); - close(spawner2parent[WRFD]); - readfd = spawner2parent[RDFD]; - writefd = parent2spawner[WRFD]; + /* (Threads can now be spawned.) */ + + spsk_init(spawner2parent, parent2spawner); + + error = pthread_create(&srt, NULL, rcv_spawner_responses, NULL); + if (error) { + pr_op_warn("Cannot start rsync spawner listener thread: %s", + strerror(error)); + spsk_cleanup(); + goto fail1; + } + return; fail3: close(spawner2parent[RDFD]); @@ -742,16 +813,15 @@ fail3: close(spawner2parent[RDFD]); fail2: close(parent2spawner[RDFD]); close(parent2spawner[WRFD]); fail1: pr_op_warn("rsync will not be available."); - readfd = writefd = -1; -} - -static int -send_to_spawner(const void *buffer, size_t size, void *arg) -{ - return stream_full_write(writefd, buffer, size); + pssk.rd.fd = pssk.wr = -1; } -/* Queues rsync; doesn't wait. Call rsync_finished() later. */ +/* + * Queues rsync; doesn't wait. + * + * Whenever at least one rsync is finished, the function rsync_finished() + * will be automatically called. + */ int rsync_queue(char const *url, char const *path) { @@ -762,48 +832,30 @@ rsync_queue(char const *url, char const *path) if (RsyncRequest_init(&req, url, path) < 0) return EINVAL; - mutex_lock(&writelock); + mutex_lock(&pssk.wrlock); - if (writefd == -1) { + if (pssk.wr == -1) { error = EIO; goto end; } - result = der_encode(&asn_DEF_RsyncRequest, &req, send_to_spawner, NULL); + result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL); if (result.encoded == -1) { - close(writefd); - writefd = -1; + close(pssk.wr); + pssk.wr = -1; error = EIO; goto end; } error = 0; -end: mutex_unlock(&writelock); +end: mutex_unlock(&pssk.wrlock); ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req); return error; } -/* Returns the number of rsyncs that have ended since the last query */ -unsigned int -rsync_finished(void) -{ - unsigned char buf[8]; - ssize_t result; - - mutex_lock(&readlock); - result = (readfd != -1) ? read(readfd, buf, sizeof(buf)) : 0; - mutex_unlock(&readlock); - - return (result >= 0) ? result : 0; -} - void rsync_teardown(void) { - if (readfd != -1) - close(readfd); - if (writefd != -1) - close(writefd); - readfd = writefd = -1; + spsk_cleanup(); wait_subprocess("rsync spawner", spawner); } diff --git a/src/rsync.h b/src/rsync.h index bdfc7df8..2d85f147 100644 --- a/src/rsync.h +++ b/src/rsync.h @@ -3,7 +3,7 @@ void rsync_setup(char const *, ...); int rsync_queue(char const *, char const *); -unsigned int rsync_finished(void); +void rsync_finished(char const *, char const *); void rsync_teardown(void); #endif /* SRC_RSYNC_RSYNC_H_ */ diff --git a/test/cache_test.c b/test/cache_test.c index d6007c37..35fee3b0 100644 --- a/test/cache_test.c +++ b/test/cache_test.c @@ -48,6 +48,9 @@ touch_file(char const *dir) ck_assert_int_eq(0, system(cmd)); } +static char *queued_url; +static char *queued_path; + int rsync_queue(char const *url, char const *path) { @@ -62,6 +65,9 @@ rsync_queue(char const *url, char const *path) ck_assert_int_eq(0, mkdir(path, CACHE_FILEMODE)); touch_file(path); + queued_url = pstrdup(url); + queued_path = pstrdup(path); + return 0; } @@ -116,6 +122,22 @@ run_dl_rsync(char *caRepository, int expected_err, unsigned int expected_calls) return cage; } +static void +finish_rsync(void) +{ + rsync_finished(queued_url, queued_path); + free(queued_url); + free(queued_path); +} + +static struct cache_cage * +rsync_dance(char *url) +{ + ck_assert_ptr_eq(NULL, run_dl_rsync(url, EBUSY, 1)); + finish_rsync(); + return run_dl_rsync(url, 0, 0); +} + static void run_dl_https(char const *url, unsigned int expected_calls, char const *expected_result) @@ -389,7 +411,7 @@ START_TEST(test_cache_download_rsync) setup_test(); printf("==== Startup ====\n"); - cage = run_dl_rsync("rsync://a.b.c/d", 0, 1); + cage = rsync_dance("rsync://a.b.c/d"); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL); ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL); @@ -424,7 +446,7 @@ START_TEST(test_cache_download_rsync) * and there would be consequences for violating it. */ printf("==== rsync truncated ====\n"); - cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1); + cage = rsync_dance("rsync://x.y.z/m/n/o"); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL); ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL); @@ -433,7 +455,7 @@ START_TEST(test_cache_download_rsync) free(cage); printf("==== Sibling ====\n"); - cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1); + cage = rsync_dance("rsync://a.b.c/e/f"); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL); ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL); @@ -456,7 +478,7 @@ START_TEST(test_cache_download_rsync_error) printf("==== Startup ====\n"); dl_error = 0; - free(run_dl_rsync("rsync://a.b.c/d", 0, 1)); + free(rsync_dance("rsync://a.b.c/d")); dl_error = EINVAL; ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1)); ck_cache_rsync(nodes); @@ -761,7 +783,11 @@ START_TEST(test_context) * but does not provide RRDP as an option. */ sias.rpkiNotify = NULL; + + ck_assert_int_eq(EBUSY, cache_refresh_by_sias(&sias, &cage)); + finish_rsync(); ck_assert_int_eq(0, cache_refresh_by_sias(&sias, &cage)); + ck_assert_ptr_eq(NULL, cage->rpkiNotify); ck_assert_str_eq(FILE_RSYNC_PATH, cage_map_file(cage, FILE_URL)); ck_assert_int_eq(false, cage_disable_refresh(cage)); diff --git a/test/mock.c b/test/mock.c index d214670e..f79e3199 100644 --- a/test/mock.c +++ b/test/mock.c @@ -81,7 +81,7 @@ void pr_crit(const char *format, ...) { va_list args; - fprintf(stderr, "pr_crit() called!\n"); + fprintf(stderr, "pr_crit() called! "); va_start(args, format); vfprintf(stderr, format, args); va_end(args); diff --git a/test/rsync_test.c b/test/rsync_test.c index 1676bbd9..1de30423 100644 --- a/test/rsync_test.c +++ b/test/rsync_test.c @@ -5,6 +5,7 @@ #include "mock.c" #include "rsync.c" #include "stream.c" +#include "types/map.c" #include "asn1/asn1c/ber_decoder.c" #include "asn1/asn1c/ber_tlv_length.c" @@ -42,6 +43,32 @@ __MOCK_ABORT(OPEN_TYPE_ber_get, asn_dec_rval_t, trash, MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td, const void *strt, asn_app_constraint_failed_f *cb, void *key) +static struct timespec rsync_request_time; +static int rsync_expected_duration = -1; +static int rsyncs_done = 0; + +void +rsync_finished(char const *url, char const *path) +{ + struct timespec now; + int delta; + + if (rsync_expected_duration == -1) + ck_abort_msg("rsync_finished() called, but duration not set."); + + ts_now(&now); + delta = ts_delta(&rsync_request_time, &now); + + printf("Callback! rsync finished after %dms.\n", delta); + if (rsync_expected_duration < 100) + ck_assert_int_le(0, delta); + else + ck_assert_int_le(rsync_expected_duration - 100, delta); + ck_assert_int_lt(delta, rsync_expected_duration + 100); + + rsyncs_done++; +} + /* Tests */ /* Test RsyncRequest decode, feeding as few bytes as possible every time. */ @@ -93,25 +120,25 @@ START_TEST(test_decode_extremely_fragmented) END_TEST static void -ck_no_tasks(struct rsync_tasks *tasks) +ck_no_tasks(void) { - ck_assert_int_eq(0, tasks->a); - ck_assert(LIST_EMPTY(&tasks->active)); - ck_assert(LIST_EMPTY(&tasks->queued)); + struct cache_mapping map; + int error; + + error = next_task(&map); + ck_assert(error == EAGAIN || error == EWOULDBLOCK); } static void -ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk, - char const *url, char const *path) +ck_next_task(char const *url, char const *path) { - struct rsync_task *task; - - task = LIST_FIRST(&tasks->active); - ck_assert_ptr_ne(NULL, task); - ck_assert_str_eq(url, task->url); - ck_assert_str_eq(path, task->path); - ck_assert(tasks->a > 0); - finish_task(tasks, task, sk); + struct cache_mapping map; + + ck_assert_int_eq(0, next_task(&map)); + ck_assert_str_eq(url, map.url); + ck_assert_str_eq(path, map.path); + + map_cleanup(&map); } static void @@ -130,90 +157,74 @@ encode_request(char const *url, char const *path, unsigned char *buffer) * Tests messy request queuing; in particular, when a single read() yields less * than one or multiple of them. */ -START_TEST(test_read_tasks) +START_TEST(test_next_task) { unsigned char bytes[BUFSIZE]; int fds[2]; - struct s2p_socket sk; - struct rsync_tasks tasks; - struct timespec now; ck_assert_int_eq(0, nonblock_pipe(fds)); - rstream_init(&sk.rd, fds[0], 256); - sk.wr = -1; - sk.rr = NULL; - LIST_INIT(&tasks.active); - LIST_INIT(&tasks.queued); - tasks.a = 0; - ts_now(&now); + __spsk_init(fds[RDFD], -1); printf("Read yields nothing\n"); - read_tasks(&sk, &tasks, &now); - ck_no_tasks(&tasks); + ck_no_tasks(); printf("Read yields less than 1 request\n"); encode_request("111", "2222", bytes); /* 13 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10)); - read_tasks(&sk, &tasks, &now); - ck_no_tasks(&tasks); - ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3)); - read_tasks(&sk, &tasks, &now); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10)); + ck_no_tasks(); - ck_1st_task(&tasks, &sk, "111", "2222"); - ck_no_tasks(&tasks); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 10, 3)); + ck_next_task("111", "2222"); + ck_no_tasks(); printf("Read yields 1 request\n"); encode_request("3333", "444", bytes); /* 13 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13)); - read_tasks(&sk, &tasks, &now); - ck_1st_task(&tasks, &sk, "3333", "444"); - ck_no_tasks(&tasks); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 13)); + ck_next_task("3333", "444"); + ck_no_tasks(); printf("Read yields 1.5 requests\n"); encode_request("55", "666", bytes); /* 11 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11)); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 11)); encode_request("777", "88", bytes); /* 11 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5)); - read_tasks(&sk, &tasks, &now); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 5)); - ck_1st_task(&tasks, &sk, "55", "666"); - ck_no_tasks(&tasks); + ck_next_task("55", "666"); + ck_no_tasks(); - ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6)); - read_tasks(&sk, &tasks, &now); - - ck_1st_task(&tasks, &sk, "777", "88"); - ck_no_tasks(&tasks); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 5, 6)); + ck_next_task("777", "88"); + ck_no_tasks(); printf("Read yields 2 requests\n"); encode_request("9999", "00", bytes); /* 12 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12)); encode_request("aa", "bbbb", bytes); /* 12 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); - read_tasks(&sk, &tasks, &now); - ck_1st_task(&tasks, &sk, "aa", "bbbb"); - ck_1st_task(&tasks, &sk, "9999", "00"); - ck_no_tasks(&tasks); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12)); + + ck_next_task("9999", "00"); + ck_next_task("aa", "bbbb"); + ck_no_tasks(); printf("Read yields 2.5 requests\n"); encode_request("cc", "dd", bytes); /* 10 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10)); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10)); encode_request("eeeee", "fffff", bytes); /* 16 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 16)); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 16)); encode_request("gggg", "hhhhh", bytes); /* 15 bytes */ - ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3)); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 3)); - read_tasks(&sk, &tasks, &now); - ck_1st_task(&tasks, &sk, "eeeee", "fffff"); - ck_1st_task(&tasks, &sk, "cc", "dd"); - ck_no_tasks(&tasks); + ck_next_task("cc", "dd"); + ck_next_task("eeeee", "fffff"); + ck_no_tasks(); - ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12)); - read_tasks(&sk, &tasks, &now); - ck_1st_task(&tasks, &sk, "gggg", "hhhhh"); - ck_no_tasks(&tasks); + ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 3, 12)); + ck_next_task("gggg", "hhhhh"); + ck_no_tasks(); + + spsk_cleanup(); } END_TEST @@ -223,25 +234,28 @@ wait_rsyncs(unsigned int count, unsigned int millis) { struct timespec req; - if (millis > 100) { - millis -= 100; - req.tv_sec = millis / 1000; - req.tv_nsec = (millis % 1000) * 1000000; - ck_assert_int_eq(0, nanosleep(&req, NULL)); - ck_assert_uint_eq(0, rsync_finished()); - } + printf("Waiting for %u rsyncs after %ums.\n", count, millis); - req.tv_sec = 0; - req.tv_nsec = 200000000; /* 200ms */ - ck_assert_int_eq(0, nanosleep(&req, NULL)); - ck_assert_uint_eq(count, rsync_finished()); + ts_now(&rsync_request_time); + rsync_expected_duration = millis; + rsyncs_done = 0; + + millis += 100; + req.tv_sec = millis / 1000; + req.tv_nsec = (millis % 1000) * 1000000; + ck_assert_int_eq(0, nanosleep(&req, NULL)); /* Wait for rsync_finished() */ + + ck_assert_int_eq(count, rsyncs_done); + rsync_expected_duration = -1; } START_TEST(test_fast_single_rsync) { + printf("-- test_fast_single_rsync() --\n"); + rsync_setup("resources/rsync/fast.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1, 0); @@ -252,9 +266,11 @@ END_TEST START_TEST(test_stalled_single_rsync) { + printf("-- test_stalled_single_rsync() --\n"); + rsync_setup("resources/rsync/stalled.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1, 3000); @@ -265,9 +281,11 @@ END_TEST START_TEST(test_stalled_single_rsync_timeout) { + printf("-- test_stalled_single_rsync_timeout() --\n"); + rsync_setup("resources/rsync/stalled-timeout.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1, 4000); /* 4000 = timeout */ @@ -278,9 +296,11 @@ END_TEST START_TEST(test_dripfeed_single_rsync) { + printf("-- test_dripfeed_single_rsync() --\n"); + rsync_setup("resources/rsync/drip-feed.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1, 3000); @@ -291,9 +311,11 @@ END_TEST START_TEST(test_dripfeed_single_rsync_timeout) { + printf("-- test_dripfeed_single_rsync_timeout() --\n"); + rsync_setup("resources/rsync/drip-feed-timeout.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1, 4000); /* 4000 = timeout */ @@ -304,12 +326,13 @@ END_TEST START_TEST(test_no_rsyncs) { + printf("-- test_no_rsyncs() --\n"); + rsync_setup("rsync", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); - sleep(2); - ck_assert_uint_eq(0, rsync_finished()); + wait_rsyncs(0, 2000); rsync_teardown(); } @@ -317,9 +340,11 @@ END_TEST START_TEST(test_simultaneous_rsyncs) { + printf("-- test_simultaneous_rsyncs() --\n"); + rsync_setup("resources/rsync/simultaneous.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); ck_assert_int_eq(0, rsync_queue("C", "D")); @@ -333,10 +358,11 @@ END_TEST START_TEST(test_queued_rsyncs) { - rsync_setup("resources/rsync/queued.sh", NULL); - ck_assert_int_ne(-1, readfd); - ck_assert_int_ne(-1, writefd); + printf("-- test_queued_rsyncs() --\n"); + rsync_setup("resources/rsync/queued.sh", NULL); + ck_assert_int_ne(-1, pssk.rd.fd); + ck_assert_int_ne(-1, pssk.wr); ck_assert_int_eq(0, rsync_queue("A", "B")); ck_assert_int_eq(0, rsync_queue("C", "D")); @@ -344,7 +370,8 @@ START_TEST(test_queued_rsyncs) ck_assert_int_eq(0, rsync_queue("G", "H")); wait_rsyncs(3, 2000); - wait_rsyncs(1, 2000); + /* 2k minus the 100 extra we slept during the previous wait_rsyncs() */ + wait_rsyncs(1, 1900); rsync_teardown(); } @@ -358,7 +385,7 @@ create_suite(void) p2s = tcase_create("parent-spawner channel"); tcase_add_test(p2s, test_decode_extremely_fragmented); - tcase_add_test(p2s, test_read_tasks); + tcase_add_test(p2s, test_next_task); s2r = tcase_create("spawner-rsync channel"); tcase_add_test(s2r, test_fast_single_rsync);