From: Alberto Leiva Popper Date: Thu, 10 Apr 2025 22:14:51 +0000 (-0600) Subject: Add --rsync.max X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=02d3ee7b15cdbf86d5d33a051b4c895b59ebe145;p=thirdparty%2FFORT-validator.git Add --rsync.max Maximum simultaneous forked rsyncs. --- diff --git a/src/config.c b/src/config.c index 31495f48..58ca8c28 100644 --- a/src/config.c +++ b/src/config.c @@ -87,6 +87,8 @@ struct rpki_config { bool enabled; /* Protocol preference; compared to http.priority */ unsigned int priority; + /* Maximum simultaneous rsyncs */ + unsigned int max; /* Deprecated; does nothing. */ char *strategy; unsigned int transfer_timeout; @@ -446,6 +448,14 @@ static const struct option_field options[] = { .type = >_bool, .offset = offsetof(struct rpki_config, rsync.enabled), .doc = "Enables RSYNC execution", + }, { + .id = 3001, + .name = "rsync.max", + .type = >_uint, + .offset = offsetof(struct rpki_config, rsync.max), + .doc = "Maximum simultaneous rsyncs.", + .min = 0, + .max = UINT_MAX, }, { .id = 3001, .name = "rsync.priority", @@ -951,6 +961,7 @@ set_default_values(void) rpki_config.rsync.enabled = true; rpki_config.rsync.priority = 50; + rpki_config.rsync.max = 1; rpki_config.rsync.strategy = pstrdup(""); rpki_config.rsync.transfer_timeout = 900; rpki_config.rsync.program = pstrdup("rsync"); @@ -1381,8 +1392,14 @@ config_get_rsync_priority(void) return rpki_config.rsync.priority; } +unsigned int +config_rsync_max(void) +{ + return rpki_config.rsync.max; +} + long -config_get_rsync_transfer_timeout(void) +config_rsync_timeout(void) { return rpki_config.rsync.transfer_timeout; } diff --git a/src/config.h b/src/config.h index 8a959408..a9caee0a 100644 --- a/src/config.h +++ b/src/config.h @@ -45,7 +45,8 @@ char const *config_get_http_ca_path(void); unsigned int config_get_rrdp_delta_threshold(void); bool config_get_rsync_enabled(void); unsigned int config_get_rsync_priority(void); -long config_get_rsync_transfer_timeout(void); +unsigned int config_rsync_max(void); +long config_rsync_timeout(void); char const *config_get_rsync_program(void); bool config_get_http_enabled(void); unsigned int config_get_http_priority(void); diff --git a/src/rsync.c b/src/rsync.c index d50d01fd..772fc7a9 100644 --- a/src/rsync.c +++ b/src/rsync.c @@ -63,7 +63,14 @@ struct rsync_task { LIST_ENTRY(rsync_task) lh; }; -LIST_HEAD(rsync_tasks, rsync_task); +LIST_HEAD(rsync_task_list, rsync_task); + +struct rsync_tasks { + struct rsync_task_list active; + int a; /* total active */ + + struct rsync_task_list queued; +}; #ifndef LIST_FOREACH_SAFE #define LIST_FOREACH_SAFE(var, ls, lh, tmp) \ @@ -84,7 +91,7 @@ void_task(struct rsync_task *task, struct s2p_socket *s2p) free(task); if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) { - pr_op_err("Cannot message parent process: %s", strerror(errno)); + pr_op_err(RSP "Cannot message parent process: %s", strerror(errno)); close(s2p->wr); s2p->wr = -1; @@ -94,9 +101,11 @@ void_task(struct rsync_task *task, struct s2p_socket *s2p) } static void -finish_task(struct rsync_task *task, struct s2p_socket *s2p) +finish_task(struct rsync_tasks *tasks, struct rsync_task *task, + struct s2p_socket *s2p) { LIST_REMOVE(task, lh); + tasks->a--; void_task(task, s2p); } @@ -109,7 +118,7 @@ init_pfd(struct pollfd *pfd, int fd) } static struct pollfd * -create_pfds(int request_fd, struct rsync_tasks *tasks, size_t tn) +create_pfds(int request_fd, struct rsync_task_list *tasks, size_t tn) { struct pollfd *pfds; struct rsync_task *task; @@ -134,13 +143,13 @@ create_pipes(int fds[2][2]) if (pipe(fds[0]) < 0) { error = errno; - pr_op_err_st("Piping rsync stderr: %s", strerror(error)); + pr_op_err_st(RSP "Piping rsync stderr: %s", strerror(error)); return error; } if (pipe(fds[1]) < 0) { error = errno; - pr_op_err_st("Piping rsync stdout: %s", strerror(error)); + pr_op_err_st(RSP "Piping rsync stdout: %s", strerror(error)); close(fds[0][0]); close(fds[0][1]); return error; @@ -216,7 +225,7 @@ fork_rsync(struct rsync_task *task) task->pid = fork(); if (task->pid < 0) { error = errno; - pr_op_err_st("Couldn't spawn the rsync process: %s", + pr_op_err_st(RSP "Couldn't spawn the rsync process: %s", strerror(error)); close(STDERR_READ(fork_fds)); close(STDOUT_READ(fork_fds)); @@ -237,8 +246,23 @@ fork_rsync(struct rsync_task *task) return 0; } -static unsigned int -start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks, +static void +activate_task(struct rsync_tasks *tasks, struct rsync_task *task, + struct s2p_socket *s2p, struct timespec *now) +{ + ts_add(&task->expiration, now, 1000 * config_rsync_timeout()); + + if (fork_rsync(task) != 0) { + void_task(task, s2p); + return; + } + + LIST_INSERT_HEAD(&tasks->active, task, lh); + tasks->a++; +} + +static void +post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks, struct timespec *now) { struct rsync_task *task; @@ -246,19 +270,17 @@ start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks, task = pzalloc(sizeof(struct rsync_task)); task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size); task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size); - ts_add(&task->expiration, now, 1000 * config_get_rsync_transfer_timeout()); - if (fork_rsync(task) != 0) { - void_task(task, s2p); - return 0; + if (tasks->a >= config_rsync_max()) { + LIST_INSERT_HEAD(&tasks->queued, task, lh); + pr_op_debug(RSP "Queued new task."); + } else { + activate_task(tasks, task, s2p, now); + pr_op_debug(RSP "Got new task: %d", task->pid); } - - LIST_INSERT_HEAD(tasks, task, lh); - pr_val_debug(RSP "Got new task: %d", task->pid); - return 1; } -static unsigned int +static void read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, struct timespec *now) { @@ -266,11 +288,9 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, ssize_t consumed; size_t offset; asn_dec_rval_t decres; - unsigned int t; int error; in = &s2p->rd; - t = 0; do { consumed = read(in->fd, in->buffer + in->len, @@ -279,11 +299,11 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, error = errno; if (error != EAGAIN && error != EWOULDBLOCK) rstream_close(in, true); - return t; + return; } if (consumed == 0) { /* EOS */ rstream_close(in, true); - return t; + return; } in->len += consumed; @@ -295,14 +315,14 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks, offset += decres.consumed; switch (decres.code) { case RC_OK: - t += start_task(s2p, tasks, now); + post_task(s2p, tasks, now); ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr); break; case RC_WMORE: goto break_for; case RC_FAIL: rstream_close(in, true); - return t; + return; } } @@ -313,30 +333,24 @@ break_for: if (offset > in->len) } while (true); } -/* Returns number of tasks created */ -static int +static void handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd, struct rsync_tasks *tasks, struct timespec *now) { if (s2p->rd.fd == -1) - return 0; + return; if (pfd->revents & POLLNVAL) { - pr_val_err(RSP "bad parent fd: %i", pfd->fd); + pr_op_err(RSP "bad parent fd: %i", pfd->fd); rstream_close(&s2p->rd, false); - return 0; - } - if (pfd->revents & POLLERR) { - pr_val_err(RSP "Generic error during parent fd poll."); + } else if (pfd->revents & POLLERR) { + pr_op_err(RSP "Generic error during parent fd poll."); rstream_close(&s2p->rd, true); - return 0; - } - - if (pfd->revents & (POLLIN | POLLHUP)) - return read_tasks(s2p, tasks, now); - return 0; + } else if (pfd->revents & (POLLIN | POLLHUP)) { + read_tasks(s2p, tasks, now); + } } static void @@ -358,9 +372,9 @@ log_buffer(char const *buffer, ssize_t read, bool is_error) continue; } if (is_error) - pr_val_err("[RSYNC exec] %s", cur); + pr_op_err("[RSYNC exec] %s", cur); else - pr_val_debug("[RSYNC exec] %s", cur); + pr_op_debug("[RSYNC exec] %s", cur); cur = tmp + 1; } free(cpy); @@ -381,7 +395,7 @@ log_rsync_output(struct pollfd *pfd, size_t p) error = errno; if (error == EINTR) return 0; /* Dunno; retry */ - pr_val_err("rsync buffer read error: %s", strerror(error)); + pr_op_err(RSP "rsync buffer read error: %s", strerror(error)); goto down; /* Error */ } @@ -398,17 +412,17 @@ static int handle_rsync_fd(struct pollfd *pfd, size_t p) { if (pfd->fd == -1) { - pr_val_debug(RSP "File descriptor already closed."); + pr_op_debug(RSP "File descriptor already closed."); return 1; } if (pfd->revents & POLLNVAL) { - pr_val_err(RSP "rsync bad fd: %i", pfd->fd); + pr_op_err(RSP "rsync bad fd: %i", pfd->fd); return 1; } if (pfd->revents & POLLERR) { - pr_val_err(RSP "Generic error during rsync poll."); + pr_op_err(RSP "Generic error during rsync poll."); close(pfd->fd); return 1; } @@ -435,7 +449,7 @@ again: status = 0; if (WIFEXITED(status)) { /* Happy path (but also sad path sometimes) */ error = WEXITSTATUS(status); - pr_val_debug("%s ended. Result: %d", name, error); + pr_op_debug("%s ended. Result: %d", name, error); return error ? EIO : 0; } @@ -474,10 +488,26 @@ kill_subprocess(struct rsync_task *task) kill(task->pid, SIGTERM); } +static void +activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p, + struct timespec *now) +{ + struct rsync_task *task; + + task = LIST_FIRST(&tasks->queued); + if (task == NULL) + return; + + pr_op_debug(RSP "Activating queued task %s -> %s.", + task->url, task->path); + LIST_REMOVE(task, lh); + activate_task(tasks, task, s2p, now); +} + /* Returns true if the task died. */ static bool -maybe_expire(struct timespec *now, struct rsync_task *task, - struct s2p_socket *s2p) +maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task, + struct s2p_socket *s2p, struct timespec *now) { struct timespec epoch; @@ -488,7 +518,9 @@ maybe_expire(struct timespec *now, struct rsync_task *task, pr_op_debug(RSP "Task %d ran out of time.", task->pid); kill_subprocess(task); wait_subprocess("rsync", task->pid); - finish_task(task, s2p); + finish_task(tasks, task, s2p); + activate_queued(tasks, s2p, now); + return true; } @@ -508,7 +540,6 @@ spawner_run( struct rsync_tasks tasks; struct rsync_task *task, *tmp; - int task_count; int error; @@ -516,8 +547,9 @@ spawner_run( s2p.wr = response_fd; s2p.rr = NULL; - LIST_INIT(&tasks); - task_count = 0; + LIST_INIT(&tasks.active); + LIST_INIT(&tasks.queued); + tasks.a = 0; error = 0; ts_now(&now); @@ -528,17 +560,17 @@ spawner_run( * odd: stdouts * even > 0: stderrs */ - pfds = create_pfds(s2p.rd.fd, &tasks, task_count); - pfds_count = 2 * task_count + 1; + pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a); + pfds_count = 2 * tasks.a + 1; expiration.tv_sec = now.tv_sec + 10; expiration.tv_nsec = now.tv_nsec; - LIST_FOREACH(task, &tasks, lh) + LIST_FOREACH(task, &tasks.active, lh) if ((ts_cmp(&now, &task->expiration) < 0) && (ts_cmp(&task->expiration, &expiration) < 0)) expiration = task->expiration; timeout = ts_delta(&now, &expiration); - pr_val_debug(RSP "Timeout decided: %dms", timeout); + pr_op_debug(RSP "Timeout decided: %dms", timeout); events = poll(pfds, pfds_count, timeout); if (events < 0) { error = errno; @@ -549,50 +581,52 @@ spawner_run( ts_now(&now); if (events == 0) { /* Timeout */ - pr_val_debug(RSP "Woke up because of timeout."); - LIST_FOREACH_SAFE(task, &tasks, lh, tmp) - task_count -= maybe_expire(&now, task, &s2p); + pr_op_debug(RSP "Woke up because of timeout."); + LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) + maybe_expire(&tasks, task, &s2p, &now); goto cont; } - pr_val_debug(RSP "Woke up because of input."); + pr_op_debug(RSP "Woke up because of input."); p = 1; - LIST_FOREACH_SAFE(task, &tasks, lh, tmp) { - if (maybe_expire(&now, task, &s2p)) { - task_count--; + LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) { + if (maybe_expire(&tasks, task, &s2p, &now)) continue; - } if (handle_rsync_fd(&pfds[p], p)) { - pr_val_debug(RSP "Task %d: Stdout closed.", + pr_op_debug(RSP "Task %d: Stdout closed.", task->pid); task->stdoutfd = -1; } p++; if (handle_rsync_fd(&pfds[p], p)) { - pr_val_debug(RSP "Task %d: Stderr closed.", + pr_op_debug(RSP "Task %d: Stderr closed.", task->pid); task->stderrfd = -1; } p++; if (task->stdoutfd == -1 && task->stderrfd == -1) { - pr_val_debug(RSP "Both stdout & stderr are closed; ending task %d.", + pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.", task->pid); wait_subprocess("rsync", task->pid); - finish_task(task, &s2p); - task_count--; + finish_task(&tasks, task, &s2p); + activate_queued(&tasks, &s2p, &now); } } - task_count += handle_parent_fd(&s2p, &pfds[0], &tasks, &now); + handle_parent_fd(&s2p, &pfds[0], &tasks, &now); cont: free(pfds); - } while ((s2p.rd.fd != -1 || task_count > 0)); - pr_val_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up..."); + } while ((s2p.rd.fd != -1 || tasks.a > 0)); + pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up..."); - LIST_FOREACH_SAFE(task, &tasks, lh, tmp) { + LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) { kill_subprocess(task); wait_subprocess("rsync", task->pid); - finish_task(task, &s2p); + finish_task(&tasks, task, &s2p); + } + LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) { + LIST_REMOVE(task, lh); + void_task(task, &s2p); } rstream_close(&s2p.rd, true); diff --git a/test/mock.c b/test/mock.c index 37818a83..d214670e 100644 --- a/test/mock.c +++ b/test/mock.c @@ -2,6 +2,7 @@ #include #include +#include #include "config.h" #include "incidence.h" #include "log.h" @@ -9,17 +10,28 @@ /* Some core functions, as linked from unit tests. */ -/* CFLAGS=-DPRINT_PRS make check */ -#ifdef PRINT_PRS +#if 0 + +static void +print_monotime(void) +{ + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) + pr_crit("clock_gettime() returned '%s'", strerror(errno)); + printf("%ld.%.3ld ", now.tv_sec, now.tv_nsec / 1000000); +} + #define MOCK_PRINT(color) \ do { \ va_list args; \ printf(color); \ + print_monotime(); \ va_start(args, format); \ vfprintf(stdout, format, args); \ va_end(args); \ printf(PR_COLOR_RST "\n"); \ } while (0) + #else #define MOCK_PRINT(color) #endif diff --git a/test/resources/rsync/queued.sh b/test/resources/rsync/queued.sh new file mode 100755 index 00000000..eb866c4a --- /dev/null +++ b/test/resources/rsync/queued.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +echo "rsync begins: $1 -> $2" +sleep 2 +echo "rsync ends: $1 -> $2" diff --git a/test/resources/rsync/simultaneous.sh b/test/resources/rsync/simultaneous.sh new file mode 100755 index 00000000..56322a0b --- /dev/null +++ b/test/resources/rsync/simultaneous.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +echo "rsync begins: $1 -> $2" +sleep 1 +echo "rsync ends: $1 -> $2" diff --git a/test/rsync_test.c b/test/rsync_test.c index bb3ff360..1676bbd9 100644 --- a/test/rsync_test.c +++ b/test/rsync_test.c @@ -6,7 +6,6 @@ #include "rsync.c" #include "stream.c" - #include "asn1/asn1c/ber_decoder.c" #include "asn1/asn1c/ber_tlv_length.c" #include "asn1/asn1c/ber_tlv_tag.c" @@ -27,7 +26,8 @@ static char content[1024]; /* Mocks */ MOCK(config_get_rsync_program, char const *, "rsync", void) -MOCK(config_get_rsync_transfer_timeout, long, 4, void) +MOCK_UINT(config_rsync_max, 3, void) +MOCK(config_rsync_timeout, long, 4, void) MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void) MOCK_ABORT_PTR(json_obj_new, json_t, void) @@ -44,113 +44,6 @@ MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td, /* Tests */ -static void -disable_sigpipe(void) -{ - struct sigaction action = { .sa_handler = SIG_IGN }; - if (sigaction(SIGPIPE, &action, NULL) == -1) - pr_crit("Cannot disable SIGPIPE: %s", strerror(errno)); -} - -static void -init_content(void) -{ - size_t i; - - if (sizeof(content) % STR64LEN != 0) - pr_crit("content's length isn't divisible by str64's length"); - for (i = 0; i < (sizeof(content) / STR64LEN); i++) - memcpy(content + 64 * i, STR64, STR64LEN); -} - -static void -create_dir(char const *path) -{ - if (mkdir(path, 0700) < 0) - ck_assert_int_eq(EEXIST, errno); -} - -static void -create_file(char const *name, unsigned int kbs) -{ - FILE *file; - - file = fopen(name, "wb"); - ck_assert_ptr_ne(NULL, file); - ck_assert_int_eq(kbs, fwrite(content, sizeof(content), kbs, file)); - ck_assert_int_eq(0, fclose(file)); -} - -static void -ensure_file_deleted(char const *name) -{ - int ret; - int error; - - errno = 0; - ret = unlink(name); - error = errno; - - ck_assert(ret == 0 || error == ENOENT); -} - -static void -create_rsync_sandbox(void) -{ - create_dir("tmp"); - create_dir("tmp/rsync"); - create_dir("tmp/rsync/src"); - create_dir("tmp/rsync/dst"); - - create_file("tmp/rsync/src/a", 1); - create_file("tmp/rsync/src/b", 1); - create_file("tmp/rsync/src/c", 1); - - ensure_file_deleted("tmp/rsync/dst/a"); - ensure_file_deleted("tmp/rsync/dst/b"); - ensure_file_deleted("tmp/rsync/dst/c"); -} - -static void -diff(char const *file1, char const *file2) -{ - int fd1, fd2; - struct read_stream rs1, rs2; - int read1, read2; - - fd1 = open(file1, O_RDONLY, 0); - ck_assert_int_ne(-1, fd1); - rstream_init(&rs1, fd1, 1024); - - fd2 = open(file2, O_RDONLY, 0); - ck_assert_int_ne(-1, fd2); - rstream_init(&rs2, fd2, 1024); - - do { - read1 = rstream_full_read(&rs1, 1024); - ck_assert_int_ge(read1, 0); - read2 = rstream_full_read(&rs2, 1024); - ck_assert_int_eq(read1, read2); - ck_assert_int_eq(0, memcmp(rs1.buffer, rs2.buffer, read1)); - } while (read1 == 1024); - - rstream_close(&rs1, true); - rstream_close(&rs2, true); -} - -static void -ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk, - char const *url, char const *path) -{ - struct rsync_task *task; - - task = LIST_FIRST(tasks); - ck_assert_ptr_ne(NULL, task); - ck_assert_str_eq(url, task->url); - ck_assert_str_eq(path, task->path); - finish_task(task, sk); -} - /* Test RsyncRequest decode, feeding as few bytes as possible every time. */ START_TEST(test_decode_extremely_fragmented) { @@ -199,6 +92,28 @@ START_TEST(test_decode_extremely_fragmented) } END_TEST +static void +ck_no_tasks(struct rsync_tasks *tasks) +{ + ck_assert_int_eq(0, tasks->a); + ck_assert(LIST_EMPTY(&tasks->active)); + ck_assert(LIST_EMPTY(&tasks->queued)); +} + +static void +ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk, + char const *url, char const *path) +{ + struct rsync_task *task; + + task = LIST_FIRST(&tasks->active); + ck_assert_ptr_ne(NULL, task); + ck_assert_str_eq(url, task->url); + ck_assert_str_eq(path, task->path); + ck_assert(tasks->a > 0); + finish_task(tasks, task, sk); +} + static void encode_request(char const *url, char const *path, unsigned char *buffer) { @@ -227,56 +142,60 @@ START_TEST(test_read_tasks) rstream_init(&sk.rd, fds[0], 256); sk.wr = -1; sk.rr = NULL; - LIST_INIT(&tasks); + LIST_INIT(&tasks.active); + LIST_INIT(&tasks.queued); + tasks.a = 0; ts_now(&now); printf("Read yields nothing\n"); - ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); + ck_no_tasks(&tasks); printf("Read yields less than 1 request\n"); encode_request("111", "2222", bytes); /* 13 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10)); - ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); + ck_no_tasks(&tasks); ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3)); - ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "111", "2222"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); printf("Read yields 1 request\n"); encode_request("3333", "444", bytes); /* 13 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13)); - ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "3333", "444"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); printf("Read yields 1.5 requests\n"); encode_request("55", "666", bytes); /* 11 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11)); encode_request("777", "88", bytes); /* 11 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5)); - ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "55", "666"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6)); - ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "777", "88"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); printf("Read yields 2 requests\n"); encode_request("9999", "00", bytes); /* 12 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); encode_request("aa", "bbbb", bytes); /* 12 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12)); - ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "aa", "bbbb"); ck_1st_task(&tasks, &sk, "9999", "00"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); printf("Read yields 2.5 requests\n"); encode_request("cc", "dd", bytes); /* 10 bytes */ @@ -286,30 +205,36 @@ START_TEST(test_read_tasks) encode_request("gggg", "hhhhh", bytes); /* 15 bytes */ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3)); - ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "eeeee", "fffff"); ck_1st_task(&tasks, &sk, "cc", "dd"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12)); - ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now)); + read_tasks(&sk, &tasks, &now); ck_1st_task(&tasks, &sk, "gggg", "hhhhh"); - ck_assert(LIST_EMPTY(&tasks)); + ck_no_tasks(&tasks); } END_TEST +/* Makes sure @count rsyncs finish after roughly @millis milliseconds. */ static void -wait_rsyncs(unsigned int count) +wait_rsyncs(unsigned int count, unsigned int millis) { - unsigned int done = 0; - - do { - sleep(1); - done += rsync_finished(); - printf("rsyncs done: %u\n", done); - } while (done < count); + struct timespec req; + + if (millis > 100) { + millis -= 100; + req.tv_sec = millis / 1000; + req.tv_nsec = (millis % 1000) * 1000000; + ck_assert_int_eq(0, nanosleep(&req, NULL)); + ck_assert_uint_eq(0, rsync_finished()); + } - ck_assert_uint_eq(count, done); + req.tv_sec = 0; + req.tv_nsec = 200000000; /* 200ms */ + ck_assert_int_eq(0, nanosleep(&req, NULL)); + ck_assert_uint_eq(count, rsync_finished()); } START_TEST(test_fast_single_rsync) @@ -319,7 +244,7 @@ START_TEST(test_fast_single_rsync) ck_assert_int_ne(-1, writefd); ck_assert_int_eq(0, rsync_queue("A", "B")); - wait_rsyncs(1); + wait_rsyncs(1, 0); rsync_teardown(); } @@ -332,7 +257,7 @@ START_TEST(test_stalled_single_rsync) ck_assert_int_ne(-1, writefd); ck_assert_int_eq(0, rsync_queue("A", "B")); - wait_rsyncs(1); + wait_rsyncs(1, 3000); rsync_teardown(); } @@ -345,7 +270,7 @@ START_TEST(test_stalled_single_rsync_timeout) ck_assert_int_ne(-1, writefd); ck_assert_int_eq(0, rsync_queue("A", "B")); - wait_rsyncs(1); + wait_rsyncs(1, 4000); /* 4000 = timeout */ rsync_teardown(); } @@ -358,7 +283,7 @@ START_TEST(test_dripfeed_single_rsync) ck_assert_int_ne(-1, writefd); ck_assert_int_eq(0, rsync_queue("A", "B")); - wait_rsyncs(1); + wait_rsyncs(1, 3000); rsync_teardown(); } @@ -371,7 +296,7 @@ START_TEST(test_dripfeed_single_rsync_timeout) ck_assert_int_ne(-1, writefd); ck_assert_int_eq(0, rsync_queue("A", "B")); - wait_rsyncs(1); + wait_rsyncs(1, 4000); /* 4000 = timeout */ rsync_teardown(); } @@ -384,6 +309,7 @@ START_TEST(test_no_rsyncs) ck_assert_int_ne(-1, writefd); sleep(2); + ck_assert_uint_eq(0, rsync_finished()); rsync_teardown(); } @@ -391,22 +317,36 @@ END_TEST START_TEST(test_simultaneous_rsyncs) { - create_rsync_sandbox(); - /* Note... --bwlimit does not seem to exist in openrsync */ - rsync_setup("rsync", "--bwlimit=1K", "-vvv", NULL); + rsync_setup("resources/rsync/simultaneous.sh", NULL); ck_assert_int_ne(-1, readfd); ck_assert_int_ne(-1, writefd); - ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/a", "tmp/rsync/dst/a")); - ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/b", "tmp/rsync/dst/b")); - ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/c", "tmp/rsync/dst/c")); - wait_rsyncs(3); + ck_assert_int_eq(0, rsync_queue("A", "B")); + ck_assert_int_eq(0, rsync_queue("C", "D")); + ck_assert_int_eq(0, rsync_queue("E", "F")); + + wait_rsyncs(3, 1000); rsync_teardown(); +} +END_TEST - diff("tmp/rsync/src/a", "tmp/rsync/dst/a"); - diff("tmp/rsync/src/b", "tmp/rsync/dst/b"); - diff("tmp/rsync/src/c", "tmp/rsync/dst/c"); +START_TEST(test_queued_rsyncs) +{ + rsync_setup("resources/rsync/queued.sh", NULL); + ck_assert_int_ne(-1, readfd); + ck_assert_int_ne(-1, writefd); + + + ck_assert_int_eq(0, rsync_queue("A", "B")); + ck_assert_int_eq(0, rsync_queue("C", "D")); + ck_assert_int_eq(0, rsync_queue("E", "F")); + ck_assert_int_eq(0, rsync_queue("G", "H")); + + wait_rsyncs(3, 2000); + wait_rsyncs(1, 2000); + + rsync_teardown(); } END_TEST @@ -421,17 +361,18 @@ create_suite(void) tcase_add_test(p2s, test_read_tasks); s2r = tcase_create("spawner-rsync channel"); - tcase_add_test(p2s, test_fast_single_rsync); - tcase_add_test(p2s, test_stalled_single_rsync); - tcase_add_test(p2s, test_stalled_single_rsync_timeout); - tcase_add_test(p2s, test_dripfeed_single_rsync); - tcase_add_test(p2s, test_dripfeed_single_rsync_timeout); - tcase_set_timeout(p2s, 6); + tcase_add_test(s2r, test_fast_single_rsync); + tcase_add_test(s2r, test_stalled_single_rsync); + tcase_add_test(s2r, test_stalled_single_rsync_timeout); + tcase_add_test(s2r, test_dripfeed_single_rsync); + tcase_add_test(s2r, test_dripfeed_single_rsync_timeout); + tcase_set_timeout(s2r, 6); spawner = tcase_create("spawner"); tcase_add_test(spawner, test_no_rsyncs); tcase_add_test(spawner, test_simultaneous_rsyncs); - tcase_set_timeout(spawner, 6); + tcase_add_test(spawner, test_queued_rsyncs); + tcase_set_timeout(spawner, 5); suite = suite_create("rsync"); suite_add_tcase(suite, p2s); @@ -441,6 +382,25 @@ create_suite(void) return suite; } +static void +disable_sigpipe(void) +{ + struct sigaction action = { .sa_handler = SIG_IGN }; + if (sigaction(SIGPIPE, &action, NULL) == -1) + pr_crit("Cannot disable SIGPIPE: %s", strerror(errno)); +} + +static void +init_content(void) +{ + size_t i; + + if (sizeof(content) % STR64LEN != 0) + pr_crit("content's length isn't divisible by str64's length"); + for (i = 0; i < (sizeof(content) / STR64LEN); i++) + memcpy(content + 64 * i, STR64, STR64LEN); +} + int main(void) {