From: Alberto Leiva Popper Date: Tue, 8 Apr 2025 17:33:36 +0000 (-0600) Subject: rsync spawner: Poll all read file descriptors X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=29fe78d9d1be17e0fa2ef1f7dbbf184280ed7c7b;p=thirdparty%2FFORT-validator.git rsync spawner: Poll all read file descriptors The single thread requirement and lack of polling was preventing the spawner from running multiple rsyncs at the same time (as their output needs to be exhausted for them to end), and more importantly, from consuming the request stream while the one rsync was running. (The latter might result in dropped requests if too many rsyncs are queued.) Therefore, poll both the request stream and the rsync pipes. All input is now consumed immediately, and multiple rsyncs can be forked at the same time. (Still needs a limit.) --- diff --git a/src/asn1/asn1c/Makefile.include b/src/asn1/asn1c/Makefile.include index 9b602c27..7f6dcb83 100644 --- a/src/asn1/asn1c/Makefile.include +++ b/src/asn1/asn1c/Makefile.include @@ -58,6 +58,7 @@ ASN_MODULE_SRCS= \ asn1/asn1c/RouteOriginAttestation.c \ asn1/asn1c/ROAIPAddressFamily.c \ asn1/asn1c/ROAIPAddress.c \ + asn1/asn1c/RsyncRequest.c \ asn1/asn1c/Manifest.c \ asn1/asn1c/FileAndHash.c \ asn1/asn1c/BinarySigningTime.c \ @@ -123,6 +124,7 @@ ASN_MODULE_HDRS= \ asn1/asn1c/RouteOriginAttestation.h \ asn1/asn1c/ROAIPAddressFamily.h \ asn1/asn1c/ROAIPAddress.h \ + asn1/asn1c/RsyncRequest.h \ asn1/asn1c/Manifest.h \ asn1/asn1c/FileAndHash.h \ asn1/asn1c/BinarySigningTime.h \ diff --git a/src/asn1/asn1c/OCTET_STRING.h b/src/asn1/asn1c/OCTET_STRING.h index 5f834766..1506cb92 100644 --- a/src/asn1/asn1c/OCTET_STRING.h +++ b/src/asn1/asn1c/OCTET_STRING.h @@ -37,6 +37,7 @@ xer_type_encoder_f OCTET_STRING_encode_xer; xer_type_encoder_f OCTET_STRING_encode_xer_utf8; #define OCTET_STRING_constraint asn_generic_no_constraint +#define OCTET_STRING_cmp(a, b) OCTET_STRING_compare(&asn_DEF_OCTET_STRING, a, b) /****************************** * Handy conversion routines. * diff --git a/src/asn1/asn1c/RsyncRequest.c b/src/asn1/asn1c/RsyncRequest.c new file mode 100644 index 00000000..4349ad2b --- /dev/null +++ b/src/asn1/asn1c/RsyncRequest.c @@ -0,0 +1,69 @@ +/* + * Generated by asn1c-0.9.29 (http://lionet.info/asn1c) + * From ASN.1 module "RsyncSpawnerChannel" + * found in "rsync.asn1" + * `asn1c -Werror -fcompound-names -fwide-types -D asn1/asn1c -no-gen-PER -no-gen-example` + */ + +#include "asn1/asn1c/RsyncRequest.h" + +static asn_TYPE_member_t asn_MBR_RsyncRequest_1[] = { + { ATF_NOFLAGS, 0, offsetof(struct RsyncRequest, url), + (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), + 0, + &asn_DEF_OCTET_STRING, + 0, + { 0, 0, 0 }, + 0, 0, /* No default value */ + "url" + }, + { ATF_NOFLAGS, 0, offsetof(struct RsyncRequest, path), + (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), + 0, + &asn_DEF_OCTET_STRING, + 0, + { 0, 0, 0 }, + 0, 0, /* No default value */ + "path" + }, +}; +static const ber_tlv_tag_t asn_DEF_RsyncRequest_tags_1[] = { + (ASN_TAG_CLASS_UNIVERSAL | (16 << 2)) +}; +static const asn_TYPE_tag2member_t asn_MAP_RsyncRequest_tag2el_1[] = { + { (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), 0, 0, 1 }, /* url */ + { (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), 1, -1, 0 } /* path */ +}; +static asn_SEQUENCE_specifics_t asn_SPC_RsyncRequest_specs_1 = { + sizeof(struct RsyncRequest), + offsetof(struct RsyncRequest, _asn_ctx), + asn_MAP_RsyncRequest_tag2el_1, + 2, /* Count of tags in the map */ + -1, /* First extension addition */ +}; +asn_TYPE_descriptor_t asn_DEF_RsyncRequest = { + "RsyncRequest", + "RsyncRequest", + &asn_OP_SEQUENCE, + asn_DEF_RsyncRequest_tags_1, + sizeof(asn_DEF_RsyncRequest_tags_1) + /sizeof(asn_DEF_RsyncRequest_tags_1[0]), /* 1 */ + asn_DEF_RsyncRequest_tags_1, /* Same as above */ + sizeof(asn_DEF_RsyncRequest_tags_1) + /sizeof(asn_DEF_RsyncRequest_tags_1[0]), /* 1 */ + { 0, 0, SEQUENCE_constraint }, + asn_MBR_RsyncRequest_1, + 2, /* Elements count */ + &asn_SPC_RsyncRequest_specs_1 /* Additional specs */ +}; + +int +RsyncRequest_init(struct RsyncRequest *req, char const *url, char const *path) +{ + memset(req, 0, sizeof(*req)); + if (OCTET_STRING_fromString(&req->url, url) < 0) + return -1; + if (OCTET_STRING_fromString(&req->path, path) < 0) + return -1; + return 0; +} diff --git a/src/asn1/asn1c/RsyncRequest.h b/src/asn1/asn1c/RsyncRequest.h new file mode 100644 index 00000000..13ec8329 --- /dev/null +++ b/src/asn1/asn1c/RsyncRequest.h @@ -0,0 +1,38 @@ +/* + * Generated by asn1c-0.9.29 (http://lionet.info/asn1c) + * From ASN.1 module "RsyncSpawnerChannel" + * found in "rsync.asn1" + * `asn1c -Werror -fcompound-names -fwide-types -D asn1/asn1c -no-gen-PER -no-gen-example` + */ + +#ifndef _RsyncRequest_H_ +#define _RsyncRequest_H_ + +/* Including external dependencies */ +#include "asn1/asn1c/OCTET_STRING.h" +#include "asn1/asn1c/constr_SEQUENCE.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* RsyncRequest */ +typedef struct RsyncRequest { + OCTET_STRING_t url; + OCTET_STRING_t path; + + /* Context for parsing across buffer boundaries */ + asn_struct_ctx_t _asn_ctx; +} RsyncRequest_t; + +/* Implementation */ +extern asn_TYPE_descriptor_t asn_DEF_RsyncRequest; + +int RsyncRequest_init(struct RsyncRequest *, char const *, char const *); + +#ifdef __cplusplus +} +#endif + +#endif /* _RsyncRequest_H_ */ +#include "asn1/asn1c/asn_internal.h" diff --git a/src/cache.c b/src/cache.c index cabd0269..203b81e4 100644 --- a/src/cache.c +++ b/src/cache.c @@ -727,7 +727,8 @@ dl_rsync(struct cache_node *module) { int error; - error = rsync_download(module->map.url, module->map.path); + // XXX go pick some other task on zero + error = rsync_queue(module->map.url, module->map.path); if (error) return error; @@ -880,6 +881,8 @@ do_refresh(struct cache_table *tbl, char const *uri, struct cache_node **result) node->attempt_ts = time_fatal(); rm_metadata(node); node->dlerr = tbl->download(node); + if (node->dlerr == EBUSY) + goto ongoing; write_metadata(node); downloaded = true; @@ -887,7 +890,7 @@ do_refresh(struct cache_table *tbl, char const *uri, struct cache_node **result) node->state = DLS_FRESH; break; case DLS_ONGOING: - mutex_unlock(&tbl->lock); +ongoing: mutex_unlock(&tbl->lock); pr_val_debug("Refresh ongoing."); return EBUSY; case DLS_FRESH: diff --git a/src/common.c b/src/common.c index f5e3e4dc..e07f2e6e 100644 --- a/src/common.c +++ b/src/common.c @@ -309,7 +309,60 @@ str2time(char const *str, time_t *tt) return 0; } -char *hex2str(uint8_t const *hex, size_t hexlen) +static void +ts_normalize(struct timespec *ts) +{ + if (ts->tv_nsec >= 1000000000L) { + ts->tv_sec += ts->tv_nsec / 1000000000L; + ts->tv_nsec %= 1000000000L; + } + while (ts->tv_nsec < 0) { + ts->tv_sec--; + ts->tv_nsec += 1000000000L; + } +} + +void +ts_now(struct timespec *now) +{ + if (clock_gettime(CLOCK_MONOTONIC, now) < 0) + pr_crit("clock_gettime() returned '%s'", strerror(errno)); + ts_normalize(now); /* Probably not needed, but I can't find contracts */ +} + +int +ts_cmp(struct timespec *ts1, struct timespec *ts2) +{ + if (ts1->tv_sec < ts2->tv_sec) + return -1; + if (ts1->tv_sec > ts2->tv_sec) + return 1; + if (ts1->tv_nsec < ts2->tv_nsec) + return -1; + if (ts1->tv_nsec > ts2->tv_nsec) + return 1; + return 0; +} + +/* Result in milliseconds */ +int +ts_delta(struct timespec *before, struct timespec *after) +{ + return (1000 * (after->tv_sec - before->tv_sec)) + + ((after->tv_nsec - before->tv_nsec) / 1000000L); +} + +/* dst = src + millis */ +void +ts_add(struct timespec *dst, struct timespec *src, long millis) +{ + dst->tv_sec = src->tv_sec; + dst->tv_nsec = src->tv_nsec + (1000000L * millis); + ts_normalize(dst); + +} +char * +hex2str(uint8_t const *hex, size_t hexlen) { static const char * const H2C = "0123456789ABCDEF"; char *str; @@ -341,7 +394,8 @@ c2h(char c) } /* @hex needs to be already allocated. */ -int str2hex(char const *str, uint8_t *hex) +int +str2hex(char const *str, uint8_t *hex) { size_t h; int digit; diff --git a/src/common.h b/src/common.h index ae32d407..b93f622e 100644 --- a/src/common.h +++ b/src/common.h @@ -60,6 +60,11 @@ time_t time_fatal(void); int time2str(time_t, char *); int str2time(char const *, time_t *); +void ts_now(struct timespec *); +int ts_cmp(struct timespec *, struct timespec *); +int ts_delta(struct timespec *, struct timespec *); +void ts_add(struct timespec *, struct timespec *, long); + char *hex2str(uint8_t const *, size_t); int str2hex(char const *, uint8_t *); diff --git a/src/file.c b/src/file.c index 50df56f0..5ea5dadd 100644 --- a/src/file.c +++ b/src/file.c @@ -209,6 +209,7 @@ file_rm_rf(char const *path) pr_op_debug("rm -rf %s", path); /* TODO (performance) optimize that 32 */ + // XXX In MacOS, this breaks if path is a file. if (nftw(path, rm, 32, FTW_DEPTH | FTW_PHYS) < 0) { error = errno; // XXX This msg is sometimes annoying; maybe defer it diff --git a/src/main.c b/src/main.c index b40d011d..6b13e38a 100644 --- a/src/main.c +++ b/src/main.c @@ -128,7 +128,7 @@ main(int argc, char **argv) if (error) goto revert_log; - rsync_setup(); /* Spawn rsync spawner ASAP */ + rsync_setup(NULL, NULL); /* Spawn rsync spawner ASAP */ register_signal_handlers(); error = thvar_init(); diff --git a/src/object/tal.c b/src/object/tal.c index 79e2b1be..424c9784 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -252,6 +252,12 @@ perform_standalone_validation(void) if (error) goto end; + /* + * From now on, the trees should be considered valid, even if subsequent + * certificates fail. + * (The roots validated successfully; subtrees are isolated problems.) + */ + for (t = 0; t < 5; t++) { error = pthread_create(&threads[t], NULL, pick_up_work, NULL); if (error) diff --git a/src/print_file.c b/src/print_file.c index ac3ec81f..4183f5c6 100644 --- a/src/print_file.c +++ b/src/print_file.c @@ -31,9 +31,9 @@ __rsync2bio(char const *src, char const *dst) // XXX use the cache - error = rsync_download(src, dst); + error = rsync_queue(src, dst); if (error) { - pr_op_err("rysnc download failed: %s", strerror(abs(error))); + pr_op_err("rsync download failed: %s", strerror(abs(error))); return NULL; } diff --git a/src/rsync.c b/src/rsync.c index 25f15846..d50d01fd 100644 --- a/src/rsync.c +++ b/src/rsync.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -12,205 +13,157 @@ #include "common.h" #include "config.h" #include "log.h" +#include "types/array.h" + +#include "asn1/asn1c/ber_decoder.h" +#include "asn1/asn1c/der_encoder.h" +#include "asn1/asn1c/RsyncRequest.h" + +#define RSP /* rsync spawner prefix */ "[rsync spawner] " + +static char const *rsync_args[20]; /* Last must be NULL */ + +static const int RDFD = 0; +static const int WRFD = 1; #define STDERR_WRITE(fds) fds[0][1] #define STDOUT_WRITE(fds) fds[1][1] #define STDERR_READ(fds) fds[0][0] #define STDOUT_READ(fds) fds[1][0] -static pid_t spawner; /* The process that spawns rsync runs */ +static pid_t spawner; /* The subprocess that spawns rsync runs */ -static int readfd; /* Our end of the spawner-to-parent pipe */ +static int readfd; /* Parent's end of the spawner-to-parent pipe */ static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER; -static int writefd; /* Our end of the parent-to-spawner pipe */ +static int writefd; /* Parent's end of the parent-to-spawner pipe */ static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER; -static int rsync(char const *, char const *); +/* + * "Spawner to parent" socket. + * Socket used by the spawner to communicate with the parent. + */ +struct s2p_socket { + /* Spawner's end of the parent-to-spawner pipe */ + struct read_stream rd; + /* Spawner's end of the spawner-to-parent pipe */ + int wr; + /* Scratchpad buffer for stream read */ + struct RsyncRequest *rr; +}; + +struct rsync_task { + int pid; + char *url; + char *path; + int stdoutfd; /* Child rsync's standard output */ + int stderrfd; /* Child rsync's standard error */ + struct timespec expiration; + + LIST_ENTRY(rsync_task) lh; +}; + +LIST_HEAD(rsync_tasks, rsync_task); + +#ifndef LIST_FOREACH_SAFE +#define LIST_FOREACH_SAFE(var, ls, lh, tmp) \ + for ( \ + var = LIST_FIRST(ls), tmp = (var ? LIST_NEXT(var, lh) : NULL); \ + var != NULL; \ + var = tmp, tmp = (var ? LIST_NEXT(var, lh) : NULL) \ + ) +#endif -static int -run_child(int readfd, int writefd) +static void +void_task(struct rsync_task *task, struct s2p_socket *s2p) { - unsigned char zero = 0; - struct read_stream stream; - char *url, *path; - int error; + static const unsigned char one = 1; - read_stream_init(&stream, readfd); + free(task->url); + free(task->path); + free(task); - do { - error = read_string(&stream, &url); - if (error || url == NULL) - break; - error = read_string(&stream, &path); - if (error || path == NULL) { - free(url); - break; - } - - error = rsync(url, path); - - free(url); - free(path); + if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) { + pr_op_err("Cannot message parent process: %s", strerror(errno)); - error = full_write(writefd, &zero, 1); - } while (!error); - - read_stream_close(&stream); - close(writefd); - free_rpki_config(); - log_teardown(); - return error; + close(s2p->wr); + s2p->wr = -1; + /* Can't signal finished rsyncs anymore; reject future ones. */ + rstream_close(&s2p->rd, true); + } } -void -rsync_setup(void) +static void +finish_task(struct rsync_task *task, struct s2p_socket *s2p) { - static const int RDFD = 0; - static const int WRFD = 1; - int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */ - int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */ - int flags; - - if (pipe(parent2spawner) < 0) { - pr_op_err("Cannot create pipe: %s", strerror(errno)); - goto fail1; - } - if (pipe(spawner2parent) < 0) { - pr_op_err("Cannot create pipe: %s", strerror(errno)); - goto fail2; - } - - flags = fcntl(spawner2parent[RDFD], F_GETFL); - if (flags < 0) { - pr_op_err("Cannot retrieve pipe flags: %s", strerror(errno)); - goto fail3; - } - if (fcntl(spawner2parent[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) { - pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(errno)); - goto fail3; - } - - fflush(stdout); - fflush(stderr); - - spawner = fork(); - if (spawner < 0) { - pr_op_err("Cannot fork rsync spawner: %s", strerror(errno)); - goto fail3; - } - - if (spawner == 0) { /* Client code */ - close(parent2spawner[WRFD]); - close(spawner2parent[RDFD]); - exit(run_child(parent2spawner[RDFD], spawner2parent[WRFD])); - } - - /* Parent code */ - close(parent2spawner[RDFD]); - close(spawner2parent[WRFD]); - readfd = spawner2parent[RDFD]; - writefd = parent2spawner[WRFD]; - return; - -fail3: close(spawner2parent[RDFD]); - close(spawner2parent[WRFD]); -fail2: close(parent2spawner[RDFD]); - close(parent2spawner[WRFD]); -fail1: pr_op_warn("rsync will not be available."); - readfd = writefd = 0; + LIST_REMOVE(task, lh); + void_task(task, s2p); } -int -rsync_download(char const *url, char const *path) +static void +init_pfd(struct pollfd *pfd, int fd) { - mutex_lock(&writelock); - - if (writefd == 0) - goto fail1; - if (write_string(writefd, url) != 0) - goto fail2; - if (write_string(writefd, path) != 0) - goto fail2; - - mutex_unlock(&writelock); - return 0; // XXX go pick some other task - -fail2: close(readfd); - close(writefd); - readfd = writefd = 0; -fail1: mutex_unlock(&writelock); - return EIO; + pfd->fd = fd; + pfd->events = POLLIN; + pfd->revents = 0; } -/* Returns the number of rsyncs that have ended since the last query */ -unsigned int -rsync_finished(void) +static struct pollfd * +create_pfds(int request_fd, struct rsync_tasks *tasks, size_t tn) { - unsigned char buf[8]; - ssize_t result; + struct pollfd *pfds; + struct rsync_task *task; + size_t p; - mutex_lock(&readlock); - result = (readfd != 0) ? read(readfd, buf, sizeof(buf)) : 0; - mutex_unlock(&readlock); + pfds = pmalloc((2 * tn + 1) * sizeof(struct pollfd)); + p = 0; - return (result >= 0) ? result : 0; + init_pfd(&pfds[p++], request_fd); + LIST_FOREACH(task, tasks, lh) { + init_pfd(&pfds[p++], task->stdoutfd); + init_pfd(&pfds[p++], task->stderrfd); + } + + return pfds; } static int -wait_child(char const *name, pid_t pid) +create_pipes(int fds[2][2]) { - int status; int error; -again: status = 0; - if (waitpid(pid, &status, 0) < 0) { + if (pipe(fds[0]) < 0) { error = errno; - pr_op_err("Could not wait for %s: %s", name, strerror(error)); + pr_op_err_st("Piping rsync stderr: %s", strerror(error)); return error; } - if (WIFEXITED(status)) { - /* Happy path (but also sad path sometimes) */ - error = WEXITSTATUS(status); - pr_val_debug("%s ended. Result: %d", name, error); - return error ? EIO : 0; - } - - if (WIFSIGNALED(status)) { - pr_op_warn("%s interrupted by signal %d.", - name, WTERMSIG(status)); - return EINTR; - } - - if (WIFCONTINUED(status)) { - /* - * Testing warning: - * I can't trigger this branch. It always exits or signals; - * SIGSTOP then SIGCONT doesn't seem to wake up waitpid(). - * It's concerning because every sample code I've found assumes - * waitpid() returning always means the subprocess ended, so - * they never retry. But that contradicts all documentation, - * yet seems to be accurate to reality. - */ - pr_op_debug("%s has resumed.", name); - goto again; + if (pipe(fds[1]) < 0) { + error = errno; + pr_op_err_st("Piping rsync stdout: %s", strerror(error)); + close(fds[0][0]); + close(fds[0][1]); + return error; } - /* Dead code */ - pr_op_err("Unknown waitpid() status; giving up %s.", name); - return EINVAL; + return 0; } -void -rsync_teardown(void) +static void +prepare_rsync_args(char **args, char const *url, char const *path) { - if (readfd) - close(readfd); - if (writefd) - close(writefd); - readfd = writefd = 0; - wait_child("rsync spawner", spawner); + size_t i; + + /* + * execvp() is not going to tweak these strings; + * stop angsting over the const-to-raw conversion. + */ + + for (i = 0; rsync_args[i] != NULL; i++) + args[i] = (char *)rsync_args[i]; + args[i++] = (char *)url; + args[i++] = (char *)path; + args[i++] = NULL; } /* @@ -233,44 +186,6 @@ duplicate_fds(int fds[2][2]) close(STDOUT_READ(fds)); } -static void -prepare_rsync_args(char **args, char const *url, char const *path) -{ - size_t i = 0; - - /* - * execvp() is not going to tweak these strings; - * stop angsting over the const-to-raw conversion. - */ - - /* XXX review */ - args[i++] = (char *)config_get_rsync_program(); -#ifdef UNIT_TESTING - /* Note... --bwlimit does not seem to exist in openrsync */ - args[i++] = "--bwlimit=1K"; - args[i++] = "-vvv"; -#else - args[i++] = "-rtz"; - args[i++] = "--omit-dir-times"; - args[i++] = "--contimeout"; - args[i++] = "20"; - args[i++] = "--max-size"; - args[i++] = "20MB"; - args[i++] = "--timeout"; - args[i++] = "15"; - args[i++] = "--include=*/"; - args[i++] = "--include=*.cer"; - args[i++] = "--include=*.crl"; - args[i++] = "--include=*.gbr"; - args[i++] = "--include=*.mft"; - args[i++] = "--include=*.roa"; - args[i++] = "--exclude=*"; -#endif - args[i++] = (char *)url; - args[i++] = (char *)path; - args[i++] = NULL; -} - static int execvp_rsync(char const *url, char const *path, int fds[2][2]) { @@ -286,37 +201,142 @@ execvp_rsync(char const *url, char const *path, int fds[2][2]) } static int -create_pipes(int fds[2][2]) +fork_rsync(struct rsync_task *task) { + int fork_fds[2][2]; int error; - if (pipe(fds[0]) == -1) { + error = create_pipes(fork_fds); + if (error) + return error; + + fflush(stdout); + fflush(stderr); + + task->pid = fork(); + if (task->pid < 0) { error = errno; - pr_op_err_st("Piping rsync stderr: %s", strerror(error)); - return -error; + pr_op_err_st("Couldn't spawn the rsync process: %s", + strerror(error)); + close(STDERR_READ(fork_fds)); + close(STDOUT_READ(fork_fds)); + close(STDERR_WRITE(fork_fds)); + close(STDOUT_WRITE(fork_fds)); + return error; } - if (pipe(fds[1]) == -1) { - error = errno; + if (task->pid == 0) /* Child code */ + exit(execvp_rsync(task->url, task->path, fork_fds)); - /* Close pipe previously created */ - close(fds[0][0]); - close(fds[0][1]); + /* Parent code */ - pr_op_err_st("Piping rsync stdout: %s", strerror(error)); - return -error; + close(STDERR_WRITE(fork_fds)); + close(STDOUT_WRITE(fork_fds)); + task->stderrfd = STDERR_READ(fork_fds); + task->stdoutfd = STDOUT_READ(fork_fds); + return 0; +} + +static unsigned int +start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks, + struct timespec *now) +{ + struct rsync_task *task; + + task = pzalloc(sizeof(struct rsync_task)); + task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size); + task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size); + ts_add(&task->expiration, now, 1000 * config_get_rsync_transfer_timeout()); + + if (fork_rsync(task) != 0) { + void_task(task, s2p); + return 0; } - return 0; + LIST_INSERT_HEAD(tasks, task, lh); + pr_val_debug(RSP "Got new task: %d", task->pid); + return 1; +} + +static unsigned int +read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, + struct timespec *now) +{ + struct read_stream *in; + ssize_t consumed; + size_t offset; + asn_dec_rval_t decres; + unsigned int t; + int error; + + in = &s2p->rd; + t = 0; + + do { + consumed = read(in->fd, in->buffer + in->len, + in->capacity - in->len); + if (consumed < 0) { + error = errno; + if (error != EAGAIN && error != EWOULDBLOCK) + rstream_close(in, true); + return t; + } + if (consumed == 0) { /* EOS */ + rstream_close(in, true); + return t; + } + + in->len += consumed; + + for (offset = 0; offset < in->len;) { + decres = ber_decode(&asn_DEF_RsyncRequest, + (void **)&s2p->rr, + in->buffer + offset, in->len - offset); + offset += decres.consumed; + switch (decres.code) { + case RC_OK: + t += start_task(s2p, tasks, now); + ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr); + break; + case RC_WMORE: + goto break_for; + case RC_FAIL: + rstream_close(in, true); + return t; + } + } + +break_for: if (offset > in->len) + pr_crit("read_tasks off:%zu len:%zu", offset, in->len); + in->len -= offset; + memmove(in->buffer, in->buffer + offset, in->len); + } while (true); } -static long -get_current_millis(void) +/* Returns number of tasks created */ +static int +handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd, + struct rsync_tasks *tasks, struct timespec *now) { - struct timespec now; - if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) - pr_crit("clock_gettime() returned %d", errno); - return 1000L * now.tv_sec + now.tv_nsec / 1000000L; + if (s2p->rd.fd == -1) + return 0; + + if (pfd->revents & POLLNVAL) { + pr_val_err(RSP "bad parent fd: %i", pfd->fd); + rstream_close(&s2p->rd, false); + return 0; + } + + if (pfd->revents & POLLERR) { + pr_val_err(RSP "Generic error during parent fd poll."); + rstream_close(&s2p->rd, true); + return 0; + } + + if (pfd->revents & (POLLIN | POLLHUP)) + return read_tasks(s2p, tasks, now); + + return 0; } static void @@ -333,7 +353,7 @@ log_buffer(char const *buffer, ssize_t read, bool is_error) cur = cpy; while ((tmp = strchr(cur, '\n')) != NULL) { *tmp = '\0'; - if(strlen(cur) == 0) { + if (strlen(cur) == 0) { cur = tmp + 1; continue; } @@ -346,162 +366,410 @@ log_buffer(char const *buffer, ssize_t read, bool is_error) free(cpy); } -#define DROP_FD(f, fail) \ - do { \ - pfd[f].fd = -1; \ - error |= fail; \ - } while (0) -#define CLOSE_FD(f, fail) \ - do { \ - close(pfd[f].fd); \ - DROP_FD(f, fail); \ - } while (0) +/* 0 = still more to read; 1 = stream down */ +static int +log_rsync_output(struct pollfd *pfd, size_t p) +{ + char buffer[1024]; + ssize_t count; + int error; + + count = read(pfd->fd, buffer, sizeof(buffer)); + if (count == 0) + goto down; /* EOF */ + if (count == -1) { + error = errno; + if (error == EINTR) + return 0; /* Dunno; retry */ + pr_val_err("rsync buffer read error: %s", strerror(error)); + goto down; /* Error */ + } + + log_buffer(buffer, count, (p & 1) == 0); + return 0; /* Keep going */ + +down: close(pfd->fd); + pfd->fd = -1; + return 1; +} + +/* Returns 1 if the stream ended */ +static int +handle_rsync_fd(struct pollfd *pfd, size_t p) +{ + if (pfd->fd == -1) { + pr_val_debug(RSP "File descriptor already closed."); + return 1; + } + + if (pfd->revents & POLLNVAL) { + pr_val_err(RSP "rsync bad fd: %i", pfd->fd); + return 1; + } + + if (pfd->revents & POLLERR) { + pr_val_err(RSP "Generic error during rsync poll."); + close(pfd->fd); + return 1; + } + + if (pfd->revents & (POLLIN | POLLHUP)) + return log_rsync_output(pfd, p); + + return 0; +} -/* - * Consumes (and throws away) all the bytes in read streams @fderr and @fdout, - * then closes them once they reach end of stream. - * - * Returns: ok -> 0, error -> 1, timeout -> 2. - */ static int -exhaust_read_fds(int fderr, int fdout) +wait_subprocess(char const *name, pid_t pid) +{ + int status; + int error; + +again: status = 0; + if (waitpid(pid, &status, 0) < 0) { + error = errno; + pr_op_err("Could not wait for %s: %s", name, strerror(error)); + return error; + } + + if (WIFEXITED(status)) { + /* Happy path (but also sad path sometimes) */ + error = WEXITSTATUS(status); + pr_val_debug("%s ended. Result: %d", name, error); + return error ? EIO : 0; + } + + if (WIFSIGNALED(status)) { + pr_op_warn("%s interrupted by signal %d (%s).", + name, WTERMSIG(status), strsignal(WTERMSIG(status))); + return EINTR; + } + + if (WIFCONTINUED(status)) { + /* + * Testing warning: + * I can't trigger this branch. It always exits or signals; + * SIGSTOP then SIGCONT doesn't seem to wake up waitpid(). + * It's concerning because every sample code I've found assumes + * waitpid() returning always means the subprocess ended, so + * they never retry. But that contradicts all documentation, + * yet seems to be accurate to reality. + */ + pr_op_debug("%s has resumed.", name); + goto again; + } + + /* Dead code */ + pr_op_err("Unknown waitpid() status; giving up %s.", name); + return EINVAL; +} + +static void +kill_subprocess(struct rsync_task *task) +{ + if (task->stdoutfd != -1) + close(task->stdoutfd); + if (task->stderrfd != -1) + close(task->stderrfd); + kill(task->pid, SIGTERM); +} + +/* Returns true if the task died. */ +static bool +maybe_expire(struct timespec *now, struct rsync_task *task, + struct s2p_socket *s2p) { - struct pollfd pfd[2]; - int error, nready, f; - long epoch, delta, timeout; + struct timespec epoch; + + ts_add(&epoch, now, 100); + if (ts_cmp(&epoch, &task->expiration) < 0) + return false; + + pr_op_debug(RSP "Task %d ran out of time.", task->pid); + kill_subprocess(task); + wait_subprocess("rsync", task->pid); + finish_task(task, s2p); + return true; +} + +static int +spawner_run( + int request_fd, /* Requests from parent to us (spawner) */ + int response_fd /* Responses from us (spawner) to parent */ +) { + struct s2p_socket s2p; /* Channel to parent */ + + struct pollfd *pfds; /* Channels to children */ + size_t p, pfds_count; + struct timespec now, expiration; + int timeout; - memset(&pfd, 0, sizeof(pfd)); - pfd[0].fd = fderr; - pfd[0].events = POLLIN; - pfd[1].fd = fdout; - pfd[1].events = POLLIN; + int events; + struct rsync_tasks tasks; + struct rsync_task *task, *tmp; + int task_count; + + int error; + + rstream_init(&s2p.rd, request_fd, 1024); + s2p.wr = response_fd; + s2p.rr = NULL; + + LIST_INIT(&tasks); + task_count = 0; error = 0; - epoch = get_current_millis(); - delta = 0; - timeout = 1000 * config_get_rsync_transfer_timeout(); + ts_now(&now); - while (1) { - nready = poll(pfd, 2, timeout - delta); - if (nready == 0) - goto timed_out; - if (nready == -1) { + do { + /* + * 0: request pipe + * odd: stdouts + * even > 0: stderrs + */ + pfds = create_pfds(s2p.rd.fd, &tasks, task_count); + pfds_count = 2 * task_count + 1; + expiration.tv_sec = now.tv_sec + 10; + expiration.tv_nsec = now.tv_nsec; + LIST_FOREACH(task, &tasks, lh) + if ((ts_cmp(&now, &task->expiration) < 0) && + (ts_cmp(&task->expiration, &expiration) < 0)) + expiration = task->expiration; + + timeout = ts_delta(&now, &expiration); + pr_val_debug(RSP "Timeout decided: %dms", timeout); + events = poll(pfds, pfds_count, timeout); + if (events < 0) { error = errno; - if (error == EINTR) - continue; - pr_val_err("rsync bad poll: %s", strerror(error)); - error = 1; - goto fail; + free(pfds); + break; } - for (f = 0; f < 2; f++) { - if (pfd[f].revents & POLLNVAL) { - pr_val_err("rsync bad fd: %i", pfd[f].fd); - DROP_FD(f, 1); - - } else if (pfd[f].revents & POLLERR) { - pr_val_err("Generic error during rsync poll."); - CLOSE_FD(f, 1); - - } else if (pfd[f].revents & (POLLIN|POLLHUP)) { - char buffer[4096]; - ssize_t count; - - count = read(pfd[f].fd, buffer, sizeof(buffer)); - if (count == -1) { - error = errno; - if (error == EINTR) - continue; - pr_val_err("rsync buffer read error: %s", - strerror(error)); - CLOSE_FD(f, 1); - continue; - } - - if (count == 0) - CLOSE_FD(f, 0); - log_buffer(buffer, count, pfd[f].fd == fderr); - } + ts_now(&now); + + if (events == 0) { /* Timeout */ + pr_val_debug(RSP "Woke up because of timeout."); + LIST_FOREACH_SAFE(task, &tasks, lh, tmp) + task_count -= maybe_expire(&now, task, &s2p); + goto cont; } - if (pfd[0].fd == -1 && pfd[1].fd == -1) - return error; /* Happy path! */ + pr_val_debug(RSP "Woke up because of input."); + p = 1; + LIST_FOREACH_SAFE(task, &tasks, lh, tmp) { + if (maybe_expire(&now, task, &s2p)) { + task_count--; + continue; + } - delta = get_current_millis() - epoch; - if (delta < 0) { - pr_val_err("This clock does not seem monotonic. " - "I'm going to have to give up this rsync."); - error = 1; - goto fail; + if (handle_rsync_fd(&pfds[p], p)) { + pr_val_debug(RSP "Task %d: Stdout closed.", + task->pid); + task->stdoutfd = -1; + } + p++; + if (handle_rsync_fd(&pfds[p], p)) { + pr_val_debug(RSP "Task %d: Stderr closed.", + task->pid); + task->stderrfd = -1; + } + p++; + if (task->stdoutfd == -1 && task->stderrfd == -1) { + pr_val_debug(RSP "Both stdout & stderr are closed; ending task %d.", + task->pid); + wait_subprocess("rsync", task->pid); + finish_task(task, &s2p); + task_count--; + } } - if (delta >= timeout) - goto timed_out; /* Read took too long */ + task_count += handle_parent_fd(&s2p, &pfds[0], &tasks, &now); + +cont: free(pfds); + } while ((s2p.rd.fd != -1 || task_count > 0)); + pr_val_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up..."); + + LIST_FOREACH_SAFE(task, &tasks, lh, tmp) { + kill_subprocess(task); + wait_subprocess("rsync", task->pid); + finish_task(task, &s2p); } -timed_out: - pr_val_err("rsync transfer timeout exhausted"); - error = 2; -fail: for (f = 0; f < 2; f++) - if (pfd[f].fd != -1) - close(pfd[f].fd); + rstream_close(&s2p.rd, true); + if (s2p.wr != -1) + close(s2p.wr); + + free_rpki_config(); + log_teardown(); return error; } -/* - * Completely consumes @fds' streams, and closes them. - * - * Originally, this was meant to redirect rsync's output to syslog: - * ac56d70c954caf49382f5f28ff4a017e859e2e0a - * (ie. we need to exhaust the streams because we dup2()'d them.) - * - * Later, @job repurposed this code to fix #74. - */ static int -exhaust_pipes(int fds[2][2]) +nonblock_pipe(int *fds) { - close(STDERR_WRITE(fds)); - close(STDOUT_WRITE(fds)); - return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)); + int error; + int flags; + + if (pipe(fds) < 0) { + error = errno; + pr_op_err("Cannot create pipe: %s", strerror(error)); + return error; + } + + flags = fcntl(fds[RDFD], F_GETFL); + if (flags < 0) { + error = errno; + pr_op_err("Cannot retrieve pipe flags: %s", strerror(error)); + goto cancel; + } + if (fcntl(fds[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) { + error = errno; + pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(error)); + goto cancel; + } + + return 0; + +cancel: close(fds[RDFD]); + close(fds[WRFD]); + return error; } -/* rsync @url @path */ -static int -rsync(char const *url, char const *path) +void +rsync_setup(char const *program, ...) { - /* Descriptors to pipe stderr (first element) and stdout (second) */ - int fork_fds[2][2]; - pid_t child_pid; - int error; + int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */ + int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */ - error = create_pipes(fork_fds); - if (error) - return error; + va_list args; + array_index i; + char const *arg; + + if (program != NULL) { + rsync_args[0] = arg = program; + va_start(args, program); + for (i = 1; arg != NULL; i++) { + arg = va_arg(args, char const *); + rsync_args[i] = arg; + } + va_end(args); + } else { + /* XXX review */ + /* XXX Where is --delete? */ + i = 0; + rsync_args[i++] = config_get_rsync_program(); + rsync_args[i++] = "-rtz"; + rsync_args[i++] = "--omit-dir-times"; + rsync_args[i++] = "--contimeout"; + rsync_args[i++] = "20"; + rsync_args[i++] = "--max-size"; + rsync_args[i++] = "20MB"; + rsync_args[i++] = "--timeout"; + rsync_args[i++] = "15"; + rsync_args[i++] = "--include=*/"; + rsync_args[i++] = "--include=*.cer"; + rsync_args[i++] = "--include=*.crl"; + rsync_args[i++] = "--include=*.gbr"; + rsync_args[i++] = "--include=*.mft"; + rsync_args[i++] = "--include=*.roa"; + rsync_args[i++] = "--exclude=*"; + rsync_args[i++] = NULL; + } + + if (nonblock_pipe(parent2spawner) != 0) + goto fail1; + if (nonblock_pipe(spawner2parent) != 0) + goto fail2; fflush(stdout); fflush(stderr); - child_pid = fork(); - if (child_pid < 0) { - error = errno; - pr_op_err_st("Couldn't spawn the rsync process: %s", - strerror(error)); - /* Close all ends from the created pipes */ - close(STDERR_READ(fork_fds)); - close(STDOUT_READ(fork_fds)); - close(STDERR_WRITE(fork_fds)); - close(STDOUT_WRITE(fork_fds)); - return error; + spawner = fork(); + if (spawner < 0) { + pr_op_err("Cannot fork rsync spawner: %s", strerror(errno)); + goto fail3; } - if (child_pid == 0) - exit(execvp_rsync(url, path, fork_fds)); /* Child code */ + if (spawner == 0) { /* Client code */ + close(parent2spawner[WRFD]); + close(spawner2parent[RDFD]); + exit(spawner_run(parent2spawner[RDFD], spawner2parent[WRFD])); + } /* Parent code */ + close(parent2spawner[RDFD]); + close(spawner2parent[WRFD]); + readfd = spawner2parent[RDFD]; + writefd = parent2spawner[WRFD]; + return; - error = exhaust_pipes(fork_fds); - if (error) - kill(child_pid, SIGTERM); /* Stop the child */ +fail3: close(spawner2parent[RDFD]); + close(spawner2parent[WRFD]); +fail2: close(parent2spawner[RDFD]); + close(parent2spawner[WRFD]); +fail1: pr_op_warn("rsync will not be available."); + readfd = writefd = -1; +} - return wait_child("rsync", child_pid); +static int +send_to_spawner(const void *buffer, size_t size, void *arg) +{ + return stream_full_write(writefd, buffer, size); +} + +/* Queues rsync; doesn't wait. Call rsync_finished() later. */ +int +rsync_queue(char const *url, char const *path) +{ + struct RsyncRequest req; + asn_enc_rval_t result; + int error; + + if (RsyncRequest_init(&req, url, path) < 0) + return EINVAL; + + mutex_lock(&writelock); + + if (writefd == -1) { + error = EIO; + goto end; + } + + result = der_encode(&asn_DEF_RsyncRequest, &req, send_to_spawner, NULL); + if (result.encoded == -1) { + close(writefd); + writefd = -1; + error = EIO; + goto end; + } + + error = 0; +end: mutex_unlock(&writelock); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req); + return error; +} + +/* Returns the number of rsyncs that have ended since the last query */ +unsigned int +rsync_finished(void) +{ + unsigned char buf[8]; + ssize_t result; + + mutex_lock(&readlock); + result = (readfd != -1) ? read(readfd, buf, sizeof(buf)) : 0; + mutex_unlock(&readlock); + + return (result >= 0) ? result : 0; +} + +void +rsync_teardown(void) +{ + if (readfd != -1) + close(readfd); + if (writefd != -1) + close(writefd); + readfd = writefd = -1; + wait_subprocess("rsync spawner", spawner); } diff --git a/src/rsync.h b/src/rsync.h index 943ece5e..bdfc7df8 100644 --- a/src/rsync.h +++ b/src/rsync.h @@ -1,8 +1,8 @@ #ifndef SRC_RSYNC_RSYNC_H_ #define SRC_RSYNC_RSYNC_H_ -void rsync_setup(void); -int rsync_download(char const *, char const *); +void rsync_setup(char const *, ...); +int rsync_queue(char const *, char const *); unsigned int rsync_finished(void); void rsync_teardown(void); diff --git a/src/stream.c b/src/stream.c index 8ffd905c..6400704a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -8,60 +8,57 @@ #include "log.h" void -read_stream_init(struct read_stream *stream, int fd) +rstream_init(struct read_stream *stream, int fd, size_t initial_capacity) { stream->fd = fd; - stream->buffer = pmalloc(256); - stream->capacity = 256; + stream->buffer = pmalloc(initial_capacity); + stream->len = 0; + stream->capacity = initial_capacity; } void -read_stream_close(struct read_stream *stream) +rstream_close(struct read_stream *stream, bool do_close) { - close(stream->fd); + if (stream->fd == -1) + return; + + if (do_close) + close(stream->fd); free(stream->buffer); + stream->fd = -1; } /* - * Full read or error. + * Reads until exactly @len bytes (sleeping if necessary), EOS or error. + * @stream's capacity must be >= len. * - * Nonzero: error - * 0, stream->buffer != NULL: Success - * 0, stream->buffer == NULL: EOF + * Returns: + * - >= 0: Number of bytes read (< @len only if EOS reached). + * - < 0: error; remove sign for proper code. */ -static int -full_read(struct read_stream *stream, size_t len) +int +rstream_full_read(struct read_stream *stream, size_t len) { - ssize_t rd; size_t offset; + ssize_t rd; - if (stream->buffer == NULL) - return 0; + if (stream->buffer == NULL || stream->capacity < len) + return -ENOSPC; - offset = 0; - do { - rd = read(stream->fd, stream->buffer + offset, len); + for (offset = 0; offset < len; offset += rd) { + rd = read(stream->fd, stream->buffer + offset, len - offset); if (rd < 0) - return errno; - if (rd == 0) { - free(stream->buffer); - stream->buffer = NULL; - stream->capacity = 0; - return 0; - } - if (rd == len) - return 0; - if (rd > len) - pr_crit("rd > len: %zd > %zu", rd, len); + return -errno; + if (rd == 0) + break; + } - len -= rd; - offset += rd; - } while (true); + return offset; } -/* Full write or error. */ +/* Full write (sleeps if necessary) or error. */ int -full_write(int fd, unsigned char const *buf, size_t len) +stream_full_write(int fd, unsigned char const *buf, size_t len) { ssize_t wr; size_t offset; @@ -79,76 +76,3 @@ full_write(int fd, unsigned char const *buf, size_t len) return 0; } - -/* @value -1 means "EOF". */ -static int -read_ssize_t(struct read_stream *stream, ssize_t *value) -{ - int error; - - error = full_read(stream, 2); - if (error) - return error; - - *value = stream->buffer - ? ((stream->buffer[0] << 8) | stream->buffer[1]) - : -1; - return 0; -} - -static int -write_size_t(int fd, size_t value) -{ - unsigned char buf[2]; - - if (value > 1024) - return ENOSPC; - - buf[0] = (value >> 8) & 0xFF; - buf[1] = value & 0xFF; - - return full_write(fd, buf, 2); -} - -int -read_string(struct read_stream *stream, char **result) -{ - ssize_t len; - int error; - - error = read_ssize_t(stream, &len); - if (error) - return error; - if (len == -1) { - *result = NULL; - return 0; - } - - if (len > stream->capacity) { - do { - stream->capacity *= 2; - } while (len > stream->capacity); - stream->buffer = prealloc(stream->buffer, stream->capacity); - } - - error = full_read(stream, len); - if (error) - return error; - *result = stream->buffer ? pstrdup((char *)stream->buffer) : NULL; - return 0; -} - -int -write_string(int fd, char const *str) -{ - size_t len; - int error; - - len = strlen(str) + 1; - - error = write_size_t(fd, len); - if (error) - return error; - - return full_write(fd, (unsigned char const *) str, len); -} diff --git a/src/stream.h b/src/stream.h index 4c9ed95e..c65ab7e7 100644 --- a/src/stream.h +++ b/src/stream.h @@ -1,21 +1,20 @@ #ifndef SRC_STREAM_H_ #define SRC_STREAM_H_ +#include #include struct read_stream { int fd; unsigned char *buffer; + size_t len; size_t capacity; }; -void read_stream_init(struct read_stream *, int); -void read_stream_close(struct read_stream *); +void rstream_init(struct read_stream *, int, size_t); +int rstream_full_read(struct read_stream *, size_t); +void rstream_close(struct read_stream *, bool); -int full_write(int, unsigned char const *, size_t); - -/* NULL means "EOF". */ -int read_string(struct read_stream *, char **); -int write_string(int, char const *); +int stream_full_write(int, unsigned char const *, size_t); #endif /* SRC_STREAM_H_ */ diff --git a/test/Makefile.am b/test/Makefile.am index b220fecf..82fc5307 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -34,6 +34,10 @@ check_PROGRAMS += asn1_int.test asn1_int_test_SOURCES = asn1/asn1c/INTEGER_t_test.c asn1_int_test_LDADD = ${CHECK_LIBS} +check_PROGRAMS += asn1_stream.test +asn1_stream_test_SOURCES = asn1/asn1c/asn1stream_test.c +asn1_stream_test_LDADD = ${CHECK_LIBS} + check_PROGRAMS += base64.test base64_test_SOURCES = base64_test.c base64_test_LDADD = ${CHECK_LIBS} @@ -92,6 +96,10 @@ check_PROGRAMS += serial.test serial_test_SOURCES = types/serial_test.c serial_test_LDADD = ${CHECK_LIBS} +#check_PROGRAMS += stream.test +#stream_test_SOURCES = stream_test.c +#stream_test_LDADD = ${CHECK_LIBS} + check_PROGRAMS += tal.test tal_test_SOURCES = object/tal_test.c tal_test_LDADD = ${CHECK_LIBS} diff --git a/test/asn1/asn1c/asn1stream_test.c b/test/asn1/asn1c/asn1stream_test.c new file mode 100644 index 00000000..f2c2037a --- /dev/null +++ b/test/asn1/asn1c/asn1stream_test.c @@ -0,0 +1,93 @@ +#include + +#include "alloc.c" +#include "mock.c" +#include "asn1/asn1c/ber_decoder.c" +#include "asn1/asn1c/ber_tlv_length.c" +#include "asn1/asn1c/ber_tlv_tag.c" +#include "asn1/asn1c/constr_CHOICE.c" +#include "asn1/asn1c/constr_SEQUENCE.c" +#include "asn1/asn1c/constr_TYPE.c" +#include "asn1/asn1c/constraints.c" +#include "asn1/asn1c/der_encoder.c" +#include "asn1/asn1c/OCTET_STRING.c" +#include "asn1/asn1c/OPEN_TYPE.c" +#include "asn1/asn1c/RsyncRequest.c" + +MOCK_ABORT_PTR(json_strn_new, json_t, const char *value, size_t len) +MOCK_ABORT_PTR(json_obj_new, json_t, void) +MOCK_ABORT_PTR(json_null, json_t, void) +MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void) + +START_TEST(test_multiple) +{ + struct RsyncRequest src = { 0 }; + struct RsyncRequest *dst = NULL; + unsigned char buf[64] = { 0 }; + asn_enc_rval_t encres; + asn_dec_rval_t decres; + + ck_assert_int_eq(0, OCTET_STRING_fromString(&src.url, "url")); + ck_assert_int_eq(0, OCTET_STRING_fromString(&src.path, "path")); + encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src, buf, sizeof(buf)); + ck_assert_int_eq(13, encres.encoded); + + ck_assert_int_eq(0, OCTET_STRING_fromString(&src.url, "https://a.b.c/d/e.cer")); + ck_assert_int_eq(0, OCTET_STRING_fromString(&src.path, "tmp/http/a.b.c/d/e.cer")); + encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src, buf + 13, sizeof(buf) - 13); + ck_assert_int_eq(49, encres.encoded); + + decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf, sizeof(buf)); + ck_assert_int_eq(RC_OK, decres.code); + ck_assert_int_eq(13, decres.consumed); + ck_assert_uint_eq(3, dst->url.size); + ck_assert_mem_eq("url", dst->url.buf, 3); + ck_assert_uint_eq(4, dst->path.size); + ck_assert_mem_eq("path", dst->path.buf, 4); + + dst = NULL; + + /* Fragment */ + decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf + 13, 13); + ck_assert_int_eq(RC_WMORE, decres.code); + ck_assert_int_eq(13, decres.consumed); + ck_assert_ptr_ne(NULL, dst); + + decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf + 26, sizeof(buf) - 26); + ck_assert_int_eq(RC_OK, decres.code); + ck_assert_int_eq(36, decres.consumed); + ck_assert_uint_eq(21, dst->url.size); + ck_assert_mem_eq("https://a.b.c/d/e.cer", dst->url.buf, 21); + ck_assert_uint_eq(22, dst->path.size); + ck_assert_mem_eq("tmp/http/a.b.c/d/e.cer", dst->path.buf, 22); +} +END_TEST + +static Suite * +create_suite(void) +{ + Suite *suite; + TCase *pipes; + + pipes = tcase_create("multiple"); + tcase_add_test(pipes, test_multiple); + + suite = suite_create("asn1 stream"); + suite_add_tcase(suite, pipes); + + return suite; +} + +int +main(void) +{ + SRunner *runner; + int tests_failed; + + runner = srunner_create(create_suite()); + srunner_run_all(runner, CK_NORMAL); + tests_failed = srunner_ntests_failed(runner); + srunner_free(runner); + + return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/test/cache_test.c b/test/cache_test.c index cbaec3ac..d6007c37 100644 --- a/test/cache_test.c +++ b/test/cache_test.c @@ -23,6 +23,19 @@ #include "types/str.c" #include "types/url.c" +/* + * XXX + * + * Check rpps can't override each other: + * rpp1: rsync://domain/mod/rpp1, file rsync://domain/mod/rpp1/a + * rpp2: rsync://domain/mod/rpp1/a/b, file rsync://domain/mod/rpp1/a/b/c + * Check nodes are cleaned up when the file doesn't exist + * Check the file gets cleaned when the node doesn't exist + * If node is an error, what happens to the refresh and fallback? + * What happens if the refresh files somehow get deleted before a commit? + * What happens if the fallback files somehow get deleted before/after a commit? + */ + /* Mocks */ static unsigned int rsync_counter; /* Times the rsync function was called */ @@ -36,7 +49,7 @@ touch_file(char const *dir) } int -rsync_download(char const *url, char const *path) +rsync_queue(char const *url, char const *path) { rsync_counter++; diff --git a/test/common_test.c b/test/common_test.c index c5a49959..ec2b95e7 100644 --- a/test/common_test.c +++ b/test/common_test.c @@ -5,6 +5,8 @@ #include "alloc.c" #include "mock.c" +static const long MS2NS = 1000000L; + START_TEST(test_tt) { char str[FORT_TS_LEN + 1]; @@ -19,16 +21,185 @@ START_TEST(test_tt) } END_TEST +#define init_ts(ts, s, ns) \ + ts.tv_sec = s; \ + ts.tv_nsec = ns + +#define ck_ts(ts, s, ns) \ + ck_assert_int_eq(s, ts.tv_sec); \ + ck_assert_int_eq(ns, ts.tv_nsec) + +START_TEST(test_ts_normalize) +{ + struct timespec ts; + + init_ts(ts, 100, 0); + ts_normalize(&ts); + ck_ts(ts, 100, 0); + + init_ts(ts, 100, 999999999); + ts_normalize(&ts); + ck_ts(ts, 100, 999999999); + + init_ts(ts, 100, 1000 * MS2NS); + ts_normalize(&ts); + ck_ts(ts, 101, 0); + + init_ts(ts, 100, 2500 * MS2NS); + ts_normalize(&ts); + ck_ts(ts, 102, 500 * MS2NS); + + init_ts(ts, 100, -1); + ts_normalize(&ts); + ck_ts(ts, 99, 999999999); + + init_ts(ts, 100, -5000 * MS2NS); + ts_normalize(&ts); + ck_ts(ts, 95, 0); + + init_ts(ts, 100, -5000 * MS2NS - 2); + ts_normalize(&ts); + ck_ts(ts, 94, 999999998); +} +END_TEST + +START_TEST(test_ts_cmp) +{ + struct timespec t1, t2; + + /* Same second */ + init_ts(t1, 100, 500); + init_ts(t2, 100, 500); + ck_assert_int_eq(0, ts_cmp(&t1, &t2)); + ck_assert_int_eq(0, ts_cmp(&t2, &t1)); + + t1.tv_nsec = 1000; + t2.tv_nsec = 9000; + ck_assert(ts_cmp(&t1, &t2) < 0); + ck_assert(ts_cmp(&t2, &t1) > 0); + + t1.tv_nsec = 9000; + t2.tv_nsec = 1000; + ck_assert(ts_cmp(&t1, &t2) > 0); + ck_assert(ts_cmp(&t2, &t1) < 0); + + /* t1 < t2 by second */ + init_ts(t1, 100, 500); + init_ts(t2, 101, 500); + ck_assert(ts_cmp(&t1, &t2) < 0); + ck_assert(ts_cmp(&t2, &t1) > 0); + + t1.tv_nsec = 1000; + t2.tv_nsec = 9000; + ck_assert(ts_cmp(&t1, &t2) < 0); + ck_assert(ts_cmp(&t2, &t1) > 0); + + t1.tv_nsec = 9000; + t2.tv_nsec = 1000; + ck_assert(ts_cmp(&t1, &t2) < 0); + ck_assert(ts_cmp(&t2, &t1) > 0); + + /* t1 > t2 by second */ + init_ts(t1, 100, 500); + init_ts(t2, 99, 500); + ck_assert(ts_cmp(&t1, &t2) > 0); + ck_assert(ts_cmp(&t2, &t1) < 0); + + t1.tv_nsec = 1000; + t2.tv_nsec = 9000; + ck_assert(ts_cmp(&t1, &t2) > 0); + ck_assert(ts_cmp(&t2, &t1) < 0); + + t1.tv_nsec = 9000; + t2.tv_nsec = 1000; + ck_assert(ts_cmp(&t1, &t2) > 0); + ck_assert(ts_cmp(&t2, &t1) < 0); +} +END_TEST + +START_TEST(test_ts_delta) +{ + struct timespec t1, t2; + + init_ts(t1, 100, 0); + init_ts(t2, 100, 2 * MS2NS); + ck_assert_int_eq(2, ts_delta(&t1, &t2)); + ck_assert_int_eq(-2, ts_delta(&t2, &t1)); + + init_ts(t1, 100, 0); + init_ts(t2, 100, -2 * MS2NS); + ck_assert_int_eq(-2, ts_delta(&t1, &t2)); + ck_assert_int_eq(2, ts_delta(&t2, &t1)); + + init_ts(t1, 50, 0); + init_ts(t2, 100, 0); + ck_assert_int_eq(50000, ts_delta(&t1, &t2)); + ck_assert_int_eq(-50000, ts_delta(&t2, &t1)); + + init_ts(t1, -10, 0); + init_ts(t2, 10, 0); + ck_assert_int_eq(20000, ts_delta(&t1, &t2)); + ck_assert_int_eq(-20000, ts_delta(&t2, &t1)); + + init_ts(t1, -10, 1 * MS2NS); /* -9999ms */ + init_ts(t2, 10, 2 * MS2NS); /* 10002ms */ + ck_assert_int_eq(20001, ts_delta(&t1, &t2)); + ck_assert_int_eq(-20001, ts_delta(&t2, &t1)); +} +END_TEST + +START_TEST(test_ts_add) +{ + struct timespec src, dst; + + init_ts(src, 100, 0); /* 100 */ + ts_add(&dst, &src, 100); /* +0.1 */ + ck_ts(dst, 100, 100 * MS2NS); /* 100.1 */ + + ts_add(&dst, &src, -100); /* -0.1 */ + ck_ts(dst, 99, 900 * MS2NS); /* 99.9 */ + + init_ts(src, 100, 50 * MS2NS); /* 100.05 */ + ts_add(&dst, &src, 100); /* +0.1 */ + ck_ts(dst, 100, 150 * MS2NS); /* 100.15 */ + + ts_add(&dst, &src, -100); /* -0.1 */ + ck_ts(dst, 99, 950 * MS2NS); /* 99.95 */ + + init_ts(src, 100, 0); /* 100 */ + ts_add(&dst, &src, 10000); /* +10 */ + ck_ts(dst, 110, 0); /* 110 */ + + ts_add(&dst, &src, -10000); /* -10 */ + ck_ts(dst, 90, 0); /* 90 */ + + init_ts(src, 89, 123409876); /* 89.1234 */ + ts_add(&dst, &src, 98765); /* +98.765 */ + ck_ts(dst, 187, 888409876); /* 187.8884 */ + + init_ts(src, 9, 123409876); /* 9.1234 */ + ts_add(&dst, &src, -12345); /* -12.345 */ + ck_ts(dst, -4, 778409876); /* -3.2216 (-4+0.7784) */ +} +END_TEST + static Suite *create_suite(void) { Suite *suite; - TCase *core; + TCase *timet, *ts; + + timet = tcase_create("time_t"); + tcase_add_test(timet, test_tt); - core = tcase_create("utils"); - tcase_add_test(core, test_tt); + ts = tcase_create("timespec"); + tcase_add_test(ts, test_ts_normalize); + tcase_add_test(ts, test_ts_cmp); + tcase_add_test(ts, test_ts_delta); + tcase_add_test(ts, test_ts_add); suite = suite_create("commons"); - suite_add_tcase(suite, core); + suite_add_tcase(suite, timet); + suite_add_tcase(suite, ts); return suite; } diff --git a/test/resources/rsync/drip-feed-timeout.sh b/test/resources/rsync/drip-feed-timeout.sh new file mode 100755 index 00000000..36e429ce --- /dev/null +++ b/test/resources/rsync/drip-feed-timeout.sh @@ -0,0 +1,22 @@ +#!/bin/sh + +STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 " + +echo "$STR" +sleep 1 # 3999 +echo "$STR" +echo "$STR" 1>&2 +sleep 1 # 2998 +echo "$STR" +sleep 1 # 1997 +echo "$STR" 1>&2 +sleep 1 # The spawner should kill us around the end of this. +echo "$STR" +sleep 1 +echo "$STR" +sleep 1 # Otherwise, check will kill the spawner around here. +echo "$STR" +sleep 1 +echo "$STR" +sleep 1 +echo "$STR" diff --git a/test/resources/rsync/drip-feed.sh b/test/resources/rsync/drip-feed.sh new file mode 100755 index 00000000..c70f73ed --- /dev/null +++ b/test/resources/rsync/drip-feed.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 " + +echo "$STR" +sleep 1 +echo "$STR" +echo "$STR" 1>&2 +sleep 1 +echo "$STR" +sleep 1 +echo "$STR" 1>&2 diff --git a/test/resources/rsync/fast.sh b/test/resources/rsync/fast.sh new file mode 100755 index 00000000..6aaddcf7 --- /dev/null +++ b/test/resources/rsync/fast.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 " + +echo "$STR" 1>&2 +echo "$STR" +echo "$STR" 1>&2 +echo "$STR" diff --git a/test/resources/rsync/stalled-timeout.sh b/test/resources/rsync/stalled-timeout.sh new file mode 100755 index 00000000..e959868f --- /dev/null +++ b/test/resources/rsync/stalled-timeout.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 " + +echo "$STR" +# The spawner has to kill us after 4 seconds. +# (This is what we're testing.) +# If it doesn't, check will fail the unit test during the 6th second. +sleep 20 +echo "$STR" diff --git a/test/resources/rsync/stalled.sh b/test/resources/rsync/stalled.sh new file mode 100755 index 00000000..c246c8d9 --- /dev/null +++ b/test/resources/rsync/stalled.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 " + +echo "$STR" +sleep 3 # Barely dodge the spawner's timeout (4 seconds). +echo "$STR" diff --git a/test/rsync_test.c b/test/rsync_test.c index 3930c205..bb3ff360 100644 --- a/test/rsync_test.c +++ b/test/rsync_test.c @@ -6,16 +6,41 @@ #include "rsync.c" #include "stream.c" + +#include "asn1/asn1c/ber_decoder.c" +#include "asn1/asn1c/ber_tlv_length.c" +#include "asn1/asn1c/ber_tlv_tag.c" +#include "asn1/asn1c/constr_SEQUENCE.c" +#include "asn1/asn1c/constr_TYPE.c" +#include "asn1/asn1c/der_encoder.c" +#include "asn1/asn1c/OCTET_STRING.c" +#include "asn1/asn1c/RsyncRequest.c" + static char const STR64[] = "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "0123456789 \n"; static const size_t STR64LEN = sizeof(STR64) - 1; static char content[1024]; +#define BUFSIZE 128 + /* Mocks */ MOCK(config_get_rsync_program, char const *, "rsync", void) MOCK(config_get_rsync_transfer_timeout, long, 4, void) +MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void) + +MOCK_ABORT_PTR(json_obj_new, json_t, void) +MOCK_ABORT_VOID(json_delete, json_t *json) +MOCK_ABORT_PTR(json_strn_new, json_t, const char *value, size_t len) +MOCK_ABORT_PTR(json_null, json_t, void) + +static asn_dec_rval_t trash; +__MOCK_ABORT(OPEN_TYPE_ber_get, asn_dec_rval_t, trash, + const asn_codec_ctx_t *opt_codec_ctx, const asn_TYPE_descriptor_t *td, + void *sptr, const asn_TYPE_member_t *elm, const void *ptr, size_t size) +MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td, + const void *strt, asn_app_constraint_failed_f *cb, void *key) /* Tests */ @@ -39,268 +64,394 @@ init_content(void) } static void -init_tmp(void) +create_dir(char const *path) { - int res = mkdir("tmp/", 0700); - if (res && errno != EEXIST) - pr_crit("Could not create tmp/: %s", strerror(errno)); + if (mkdir(path, 0700) < 0) + ck_assert_int_eq(EEXIST, errno); } -static void * -rsync_fast(void *arg) +static void +create_file(char const *name, unsigned int kbs) { - int fds[2][2]; - memcpy(fds, arg, sizeof(fds)); - - ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN)); - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN)); - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); + FILE *file; - close(STDERR_WRITE(fds)); - close(STDOUT_WRITE(fds)); - return NULL; + file = fopen(name, "wb"); + ck_assert_ptr_ne(NULL, file); + ck_assert_int_eq(kbs, fwrite(content, sizeof(content), kbs, file)); + ck_assert_int_eq(0, fclose(file)); } -static void * -rsync_stalled(void *arg) +static void +ensure_file_deleted(char const *name) { - int fds[2][2]; - memcpy(fds, arg, sizeof(fds)); - - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - - sleep(5); /* The timeout is 4 seconds */ + int ret; + int error; - ck_assert_int_ne(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); + errno = 0; + ret = unlink(name); + error = errno; - close(STDERR_WRITE(fds)); - close(STDOUT_WRITE(fds)); - return NULL; + ck_assert(ret == 0 || error == ENOENT); } -static void * -rsync_drip_feed(void *arg) +static void +create_rsync_sandbox(void) { - int fds[2][2]; - memcpy(fds, arg, sizeof(fds)); - - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - sleep(1); - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN)); - sleep(1); - ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - sleep(1); - ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN)); - sleep(2); - ck_assert_int_ne(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN)); - - close(STDERR_WRITE(fds)); - close(STDOUT_WRITE(fds)); - return NULL; + create_dir("tmp"); + create_dir("tmp/rsync"); + create_dir("tmp/rsync/src"); + create_dir("tmp/rsync/dst"); + + create_file("tmp/rsync/src/a", 1); + create_file("tmp/rsync/src/b", 1); + create_file("tmp/rsync/src/c", 1); + + ensure_file_deleted("tmp/rsync/dst/a"); + ensure_file_deleted("tmp/rsync/dst/b"); + ensure_file_deleted("tmp/rsync/dst/c"); } static void -prepare_exhaust(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *)) +diff(char const *file1, char const *file2) { - ck_assert_int_eq(0, pipe(fds[0])); - ck_assert_int_eq(0, pipe(fds[1])); - ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, fds)); + int fd1, fd2; + struct read_stream rs1, rs2; + int read1, read2; + + fd1 = open(file1, O_RDONLY, 0); + ck_assert_int_ne(-1, fd1); + rstream_init(&rs1, fd1, 1024); + + fd2 = open(file2, O_RDONLY, 0); + ck_assert_int_ne(-1, fd2); + rstream_init(&rs2, fd2, 1024); + + do { + read1 = rstream_full_read(&rs1, 1024); + ck_assert_int_ge(read1, 0); + read2 = rstream_full_read(&rs2, 1024); + ck_assert_int_eq(read1, read2); + ck_assert_int_eq(0, memcmp(rs1.buffer, rs2.buffer, read1)); + } while (read1 == 1024); + + rstream_close(&rs1, true); + rstream_close(&rs2, true); } static void -finish_exhaust(pthread_t thread) +ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk, + char const *url, char const *path) { - pthread_join(thread, NULL); + struct rsync_task *task; + + task = LIST_FIRST(tasks); + ck_assert_ptr_ne(NULL, task); + ck_assert_str_eq(url, task->url); + ck_assert_str_eq(path, task->path); + finish_task(task, sk); } -START_TEST(exhaust_read_fds_test_normal) +/* Test RsyncRequest decode, feeding as few bytes as possible every time. */ +START_TEST(test_decode_extremely_fragmented) { - int fds[2][2]; - pthread_t rsync_writer; - - printf("Normal transfer\n"); - prepare_exhaust(fds, &rsync_writer, rsync_fast); - ck_assert_int_eq(0, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); - finish_exhaust(rsync_writer); + struct RsyncRequest src, *dst; + unsigned char encoded[BUFSIZE]; + asn_enc_rval_t encres; + asn_dec_rval_t decres; + unsigned int start, end, max; + + ck_assert_int_eq(0, RsyncRequest_init(&src, + "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123456789", + "AbCdEfGhIjKlMnOpQrStUvWxYz1234567890")); + encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src, + encoded, sizeof(encoded)); + ck_assert_int_gt(encres.encoded, 0); + + printf("size: %zu\n", encres.encoded); + + dst = NULL; + max = 0; + for (start = end = 0; end < encres.encoded - 1; end++) { + printf("Offset %u: Requesting %u bytes...\n", + start, end - start + 1); + decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, + encoded + start, end - start + 1); + ck_assert_int_eq(RC_WMORE, decres.code); + start += decres.consumed; + + printf("Consumed %zu bytes.\n", decres.consumed); + if (decres.consumed > max) + max = decres.consumed; + } + + printf("Minimum required buffer size: %u bytes\n", max); + + decres = ber_decode(&asn_DEF_RsyncRequest, + (void **)&dst, encoded + start, end - start + 1); + ck_assert_int_eq(RC_OK, decres.code); + ck_assert_uint_eq(end - start + 1, decres.consumed); + + ck_assert_int_eq(0, OCTET_STRING_cmp(&src.url, &dst->url)); + ck_assert_int_eq(0, OCTET_STRING_cmp(&src.path, &dst->path)); + + ASN_STRUCT_RESET(asn_DEF_RsyncRequest, &src); + ASN_STRUCT_FREE(asn_DEF_RsyncRequest, dst); } END_TEST -START_TEST(exhaust_read_fds_test_stalled) +static void +encode_request(char const *url, char const *path, unsigned char *buffer) { - int fds[2][2]; - pthread_t rsync_writer; + struct RsyncRequest rr; + asn_enc_rval_t encres; - printf("Stalled transfer\n"); - prepare_exhaust(fds, &rsync_writer, rsync_stalled); - ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); - finish_exhaust(rsync_writer); + ck_assert_int_eq(0, RsyncRequest_init(&rr, url, path)); + encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &rr, buffer, BUFSIZE); + ck_assert_int_gt(encres.encoded, 0); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &rr); } -END_TEST -START_TEST(exhaust_read_fds_test_drip) +/* + * Tests messy request queuing; in particular, when a single read() yields less + * than one or multiple of them. + */ +START_TEST(test_read_tasks) { - int fds[2][2]; - pthread_t rsync_writer; - - printf("Drip-feed\n"); - prepare_exhaust(fds, &rsync_writer, rsync_drip_feed); - ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); - finish_exhaust(rsync_writer); + unsigned char bytes[BUFSIZE]; + int fds[2]; + struct s2p_socket sk; + struct rsync_tasks tasks; + struct timespec now; + + ck_assert_int_eq(0, nonblock_pipe(fds)); + rstream_init(&sk.rd, fds[0], 256); + sk.wr = -1; + sk.rr = NULL; + LIST_INIT(&tasks); + ts_now(&now); + + printf("Read yields nothing\n"); + ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now)); + + printf("Read yields less than 1 request\n"); + encode_request("111", "2222", bytes); /* 13 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10)); + ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now)); + + ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3)); + ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + + ck_1st_task(&tasks, &sk, "111", "2222"); + ck_assert(LIST_EMPTY(&tasks)); + + printf("Read yields 1 request\n"); + encode_request("3333", "444", bytes); /* 13 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13)); + ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + + ck_1st_task(&tasks, &sk, "3333", "444"); + ck_assert(LIST_EMPTY(&tasks)); + + printf("Read yields 1.5 requests\n"); + encode_request("55", "666", bytes); /* 11 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11)); + encode_request("777", "88", bytes); /* 11 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5)); + ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + + ck_1st_task(&tasks, &sk, "55", "666"); + ck_assert(LIST_EMPTY(&tasks)); + + ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6)); + ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + + ck_1st_task(&tasks, &sk, "777", "88"); + ck_assert(LIST_EMPTY(&tasks)); + + printf("Read yields 2 requests\n"); + encode_request("9999", "00", bytes); /* 12 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); + encode_request("aa", "bbbb", bytes); /* 12 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); + ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now)); + ck_1st_task(&tasks, &sk, "aa", "bbbb"); + ck_1st_task(&tasks, &sk, "9999", "00"); + ck_assert(LIST_EMPTY(&tasks)); + + printf("Read yields 2.5 requests\n"); + encode_request("cc", "dd", bytes); /* 10 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10)); + encode_request("eeeee", "fffff", bytes); /* 16 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 16)); + encode_request("gggg", "hhhhh", bytes); /* 15 bytes */ + ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3)); + + ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now)); + ck_1st_task(&tasks, &sk, "eeeee", "fffff"); + ck_1st_task(&tasks, &sk, "cc", "dd"); + ck_assert(LIST_EMPTY(&tasks)); + + ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12)); + ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + ck_1st_task(&tasks, &sk, "gggg", "hhhhh"); + ck_assert(LIST_EMPTY(&tasks)); } END_TEST static void -create_file(char const *name, unsigned int kbs) +wait_rsyncs(unsigned int count) { - FILE *file; - unsigned int k; + unsigned int done = 0; - file = fopen(name, "wb"); - ck_assert_ptr_ne(NULL, file); - for (k = 0; k < kbs; k++) - ck_assert_int_eq(sizeof(content), fwrite(content, 1, sizeof(content), file)); - ck_assert_int_eq(0, fclose(file)); + do { + sleep(1); + done += rsync_finished(); + printf("rsyncs done: %u\n", done); + } while (done < count); + + ck_assert_uint_eq(count, done); } -static void -ensure_file_deleted(char const *name) +START_TEST(test_fast_single_rsync) { - int ret; - int error; + rsync_setup("resources/rsync/fast.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); - errno = 0; - ret = unlink(name); - error = errno; - - ck_assert(ret == 0 || error == ENOENT); -} + ck_assert_int_eq(0, rsync_queue("A", "B")); + wait_rsyncs(1); -START_TEST(full_rsync_timeout_test_1kb) -{ - printf("1kb\n"); - create_file("tmp/1kb", 1); - ensure_file_deleted("tmp/1kb-copy"); - ck_assert_int_eq(0, rsync("tmp/1kb", "tmp/1kb-copy")); + rsync_teardown(); } END_TEST -START_TEST(full_rsync_timeout_test_3kb) +START_TEST(test_stalled_single_rsync) { - printf("3kb\n"); - create_file("tmp/3kb", 3); - ensure_file_deleted("tmp/3kb-copy"); - ck_assert_int_eq(0, rsync("tmp/3kb", "tmp/3kb-copy")); + rsync_setup("resources/rsync/stalled.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); + + ck_assert_int_eq(0, rsync_queue("A", "B")); + wait_rsyncs(1); + + rsync_teardown(); } END_TEST -START_TEST(full_rsync_timeout_test_5kb) +START_TEST(test_stalled_single_rsync_timeout) { - printf("5kb\n"); - create_file("tmp/5kb", 5); - ensure_file_deleted("tmp/5kb-copy"); - /* Max speed is 1kbps, timeout is 4 seconds */ - ck_assert_int_eq(EIO, rsync("tmp/5kb", "tmp/5kb-copy")); + rsync_setup("resources/rsync/stalled-timeout.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); + + ck_assert_int_eq(0, rsync_queue("A", "B")); + wait_rsyncs(1); + + rsync_teardown(); } END_TEST -static void -wait_rsyncs(unsigned int expected) +START_TEST(test_dripfeed_single_rsync) { - unsigned int actual = 0; + rsync_setup("resources/rsync/drip-feed.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); - do { - actual += rsync_finished(); - if (expected == actual) { - ck_assert_uint_eq(0, rsync_finished()); - return; - } - ck_assert(expected > actual); - sleep(1); - } while (true); + ck_assert_int_eq(0, rsync_queue("A", "B")); + wait_rsyncs(1); + + rsync_teardown(); } +END_TEST -START_TEST(test_rsync_finished) +START_TEST(test_dripfeed_single_rsync_timeout) { - printf("rsync_finished()\n"); - - create_file("tmp/2kb", 1); - ensure_file_deleted("tmp/2kb-copy"); + rsync_setup("resources/rsync/drip-feed-timeout.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); - rsync_setup(); - - ck_assert_int_eq(0, rsync_download("tmp/2kb", "tmp/2kb-copy")); - ck_assert_int_eq(0, rsync_finished()); + ck_assert_int_eq(0, rsync_queue("A", "B")); wait_rsyncs(1); rsync_teardown(); } END_TEST -START_TEST(test_multi_rsync) +START_TEST(test_no_rsyncs) { - printf("simultaneous rsyncs\n"); + rsync_setup("rsync", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); - create_file("tmp/i1", 1); - ensure_file_deleted("tmp/i1-copy"); - create_file("tmp/i2", 1); - ensure_file_deleted("tmp/i2-copy"); - create_file("tmp/i3", 1); - ensure_file_deleted("tmp/i3-copy"); + sleep(2); - rsync_setup(); + rsync_teardown(); +} +END_TEST - ck_assert_int_eq(0, rsync_download("tmp/i1", "tmp/")); - ck_assert_int_eq(0, rsync_finished()); - ck_assert_int_eq(0, rsync_download("tmp/i2", "tmp/i2-copy")); - ck_assert_int_eq(0, rsync_download("tmp/i3", "tmp/i3-copy")); +START_TEST(test_simultaneous_rsyncs) +{ + create_rsync_sandbox(); + /* Note... --bwlimit does not seem to exist in openrsync */ + rsync_setup("rsync", "--bwlimit=1K", "-vvv", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); + + ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/a", "tmp/rsync/dst/a")); + ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/b", "tmp/rsync/dst/b")); + ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/c", "tmp/rsync/dst/c")); wait_rsyncs(3); rsync_teardown(); + + diff("tmp/rsync/src/a", "tmp/rsync/dst/a"); + diff("tmp/rsync/src/b", "tmp/rsync/dst/b"); + diff("tmp/rsync/src/c", "tmp/rsync/dst/c"); } END_TEST -static Suite *create_suite(void) +static Suite * +create_suite(void) { Suite *suite; - TCase *pipes, *spawner; + TCase *p2s, *s2r, *spawner; + + p2s = tcase_create("parent-spawner channel"); + tcase_add_test(p2s, test_decode_extremely_fragmented); + tcase_add_test(p2s, test_read_tasks); - pipes = tcase_create("pipes"); - tcase_add_test(pipes, exhaust_read_fds_test_normal); - tcase_add_test(pipes, exhaust_read_fds_test_stalled); - tcase_add_test(pipes, exhaust_read_fds_test_drip); - tcase_add_test(pipes, full_rsync_timeout_test_1kb); - tcase_add_test(pipes, full_rsync_timeout_test_3kb); - tcase_add_test(pipes, full_rsync_timeout_test_5kb); - tcase_set_timeout(pipes, 6); + s2r = tcase_create("spawner-rsync channel"); + tcase_add_test(p2s, test_fast_single_rsync); + tcase_add_test(p2s, test_stalled_single_rsync); + tcase_add_test(p2s, test_stalled_single_rsync_timeout); + tcase_add_test(p2s, test_dripfeed_single_rsync); + tcase_add_test(p2s, test_dripfeed_single_rsync_timeout); + tcase_set_timeout(p2s, 6); spawner = tcase_create("spawner"); - tcase_add_test(spawner, test_rsync_finished); - tcase_add_test(spawner, test_multi_rsync); + tcase_add_test(spawner, test_no_rsyncs); + tcase_add_test(spawner, test_simultaneous_rsyncs); tcase_set_timeout(spawner, 6); suite = suite_create("rsync"); - suite_add_tcase(suite, pipes); + suite_add_tcase(suite, p2s); + suite_add_tcase(suite, s2r); suite_add_tcase(suite, spawner); return suite; } -int main(void) +int +main(void) { - Suite *suite; SRunner *runner; int tests_failed; printf("This test needs to exhaust some timeouts. Please be patient.\n"); disable_sigpipe(); init_content(); - init_tmp(); - - suite = create_suite(); - runner = srunner_create(suite); + runner = srunner_create(create_suite()); srunner_run_all(runner, CK_NORMAL); tests_failed = srunner_ntests_failed(runner); srunner_free(runner); diff --git a/test/stream_test.c b/test/stream_test.c new file mode 100644 index 00000000..0d97bc4b --- /dev/null +++ b/test/stream_test.c @@ -0,0 +1,249 @@ +#include +#include + +#include "alloc.c" +#include "mock.c" +#include "stream.c" + +#define DO_WRITE(fd, str) ck_assert_int_eq(0, stream_wr_str(fd, str)) +#define CK_READ(stm, str) do { \ + ck_assert_int_eq(0, rstream_read_str(&stm, &rcvd)); \ + ck_assert_str_eq(str, rcvd); \ + free(rcvd); \ + } while (0) +#define CK_READ_END(stm) do { \ + ck_assert_int_eq(0, rstream_read_str(&stm, &rcvd)); \ + ck_assert_ptr_eq(NULL, rcvd); \ + } while (0); + +static char large[1025]; + +START_TEST(test_string_simple) +{ + int fds[2]; + struct read_stream stream; + char *rcvd; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 128); + + DO_WRITE(fds[1], "Lorem ipsum dolor sit amet"); + CK_READ(stream, "Lorem ipsum dolor sit amet"); + close(fds[1]); + CK_READ_END(stream); + + rstream_close(&stream, true); +} +END_TEST + +START_TEST(test_string_simple_alt) +{ + int fds[2]; + struct read_stream stream; + char *rcvd; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 128); + + DO_WRITE(fds[1], "Lorem ipsum dolor sit amet"); + close(fds[1]); + CK_READ(stream, "Lorem ipsum dolor sit amet"); + CK_READ_END(stream); + + rstream_close(&stream, true); +} +END_TEST + +START_TEST(test_string_multiple) +{ + int fds[2]; + struct read_stream stream; + char *rcvd; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 128); + + /* Write 3 separate strings, close immediately */ + DO_WRITE(fds[1], "Lorem ipsum dolor sit amet"); + DO_WRITE(fds[1], "consectetur adipiscing elit"); + DO_WRITE(fds[1], "Curabitur scelerisque tortor est"); + close(fds[1]); + + /* Read each string separately, then check read end */ + CK_READ(stream, "Lorem ipsum dolor sit amet"); + CK_READ(stream, "consectetur adipiscing elit"); + CK_READ(stream, "Curabitur scelerisque tortor est"); + CK_READ_END(stream); + + rstream_close(&stream, true); +} +END_TEST + +START_TEST(test_string_capacity_growth) +{ + int fds[2]; + struct read_stream stream; + char *rcvd; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 16); + + DO_WRITE(fds[1], "Lorem ipsum dolor sit amet, consectetur adipiscing elit."); + DO_WRITE(fds[1], "Curabitur scelerisque tortor est, ut lacinia eros rutrum id. Duis sed metus id nisl suscipit facilisis."); + DO_WRITE(fds[1], "Mauris at varius libero."); + close(fds[1]); + CK_READ(stream, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."); + ck_assert_uint_eq(64, stream.capacity); + CK_READ(stream, "Curabitur scelerisque tortor est, ut lacinia eros rutrum id. Duis sed metus id nisl suscipit facilisis."); + ck_assert_uint_eq(128, stream.capacity); + CK_READ(stream, "Mauris at varius libero."); + ck_assert_uint_eq(128, stream.capacity); + CK_READ_END(stream); + + rstream_close(&stream, true); +} +END_TEST + +START_TEST(test_string_max_size) +{ + int fds[2]; + struct read_stream stream; + char *rcvd; + + /* strlen(src) = 1023: Success */ + + memset(large, 'a', 1023); + large[1023] = 0; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 256); + + DO_WRITE(fds[1], large); + close(fds[1]); + CK_READ(stream, large); + CK_READ_END(stream); + + rstream_close(&stream, true); + + /* strlen(src) = 1024: Fail */ + + large[1023] = 'b'; + large[1024] = 0; + + ck_assert_int_eq(0, pipe(fds)); + rstream_init(&stream, fds[0], 256); + + ck_assert_int_eq(EBADLEN, stream_wr_str(fds[1], large)); + close(fds[1]); + CK_READ_END(stream); + + rstream_close(&stream, true); +} +END_TEST + +static void * +consume_fragments(void *arg) +{ + struct read_stream stream; + char *rcvd; + + rstream_init(&stream, *((int *)arg), 128); + + CK_READ(stream, "1234567890abcdefghijABCDEFGHI"); + CK_READ_END(stream); + rstream_close(&stream, true); + + return NULL; +} + +START_TEST(test_string_fragmented) +{ + pthread_t reader; + int fds[2]; + + ck_assert_int_eq(0, pipe(fds)); + ck_assert_int_eq(0, pthread_create(&reader, NULL, consume_fragments, &fds[0])); + + ck_assert_int_eq(0, write_size_t(fds[1], 30)); + ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"1234567890", 10)); + sleep(1); + ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"abcdefghij", 10)); + sleep(1); + ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"ABCDEFGHI", 10)); + close(fds[1]); + + ck_assert_int_eq(0, pthread_join(reader, NULL)); +} +END_TEST + +static void * +consume_many(void *arg) +{ + struct read_stream stream; + unsigned int i; + char *rcvd; + + rstream_init(&stream, *((int *)arg), 256); + + for (i = 0; i < 16384; i++) + CK_READ(stream, large); + CK_READ_END(stream); + rstream_close(&stream, true); + + return NULL; +} + +START_TEST(test_string_many) +{ + int fds[2]; + pthread_t reader; + unsigned int i; + + memset(large, 'a', 1023); + large[1023] = 0; + + ck_assert_int_eq(0, pipe(fds)); + ck_assert_int_eq(0, pthread_create(&reader, NULL, consume_many, &fds[0])); + + for (i = 0; i < 16384; i++) + DO_WRITE(fds[1], large); + close(fds[1]); + + ck_assert_int_eq(0, pthread_join(reader, NULL)); +} +END_TEST + +static Suite * +create_suite(void) +{ + Suite *suite; + TCase *string; + + string = tcase_create("string"); + tcase_add_test(string, test_string_simple); + tcase_add_test(string, test_string_simple_alt); + tcase_add_test(string, test_string_multiple); + tcase_add_test(string, test_string_capacity_growth); + tcase_add_test(string, test_string_max_size); + tcase_add_test(string, test_string_fragmented); + tcase_add_test(string, test_string_many); + + suite = suite_create("stream"); + suite_add_tcase(suite, string); + + return suite; +} + +int +main(void) +{ + SRunner *runner; + int failures; + + runner = srunner_create(create_suite()); + srunner_run_all(runner, CK_NORMAL); + failures = srunner_ntests_failed(runner); + srunner_free(runner); + + return (failures == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +}