]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Add --rsync.max
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 10 Apr 2025 22:14:51 +0000 (16:14 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 10 Apr 2025 23:31:06 +0000 (17:31 -0600)
Maximum simultaneous forked rsyncs.

src/config.c
src/config.h
src/rsync.c
test/mock.c
test/resources/rsync/queued.sh [new file with mode: 0755]
test/resources/rsync/simultaneous.sh [new file with mode: 0755]
test/rsync_test.c

index 31495f48b8ffbbc4edf4f543e0de0668da727d1b..58ca8c28df792b8725b435e2255af4c562fdede7 100644 (file)
@@ -87,6 +87,8 @@ struct rpki_config {
                bool enabled;
                /* Protocol preference; compared to http.priority */
                unsigned int priority;
+               /* Maximum simultaneous rsyncs */
+               unsigned int max;
                /* Deprecated; does nothing. */
                char *strategy;
                unsigned int transfer_timeout;
@@ -446,6 +448,14 @@ static const struct option_field options[] = {
                .type = &gt_bool,
                .offset = offsetof(struct rpki_config, rsync.enabled),
                .doc = "Enables RSYNC execution",
+       }, {
+               .id = 3001,
+               .name = "rsync.max",
+               .type = &gt_uint,
+               .offset = offsetof(struct rpki_config, rsync.max),
+               .doc = "Maximum simultaneous rsyncs.",
+               .min = 0,
+               .max = UINT_MAX,
        }, {
                .id = 3001,
                .name = "rsync.priority",
@@ -951,6 +961,7 @@ set_default_values(void)
 
        rpki_config.rsync.enabled = true;
        rpki_config.rsync.priority = 50;
+       rpki_config.rsync.max = 1;
        rpki_config.rsync.strategy = pstrdup("<deprecated>");
        rpki_config.rsync.transfer_timeout = 900;
        rpki_config.rsync.program = pstrdup("rsync");
@@ -1381,8 +1392,14 @@ config_get_rsync_priority(void)
        return rpki_config.rsync.priority;
 }
 
+unsigned int
+config_rsync_max(void)
+{
+       return rpki_config.rsync.max;
+}
+
 long
-config_get_rsync_transfer_timeout(void)
+config_rsync_timeout(void)
 {
        return rpki_config.rsync.transfer_timeout;
 }
index 8a959408a98ae04389385f54915305983f3d5884..a9caee0a949fa00243af6d4a075affb05b9708dc 100644 (file)
@@ -45,7 +45,8 @@ char const *config_get_http_ca_path(void);
 unsigned int config_get_rrdp_delta_threshold(void);
 bool config_get_rsync_enabled(void);
 unsigned int config_get_rsync_priority(void);
-long config_get_rsync_transfer_timeout(void);
+unsigned int config_rsync_max(void);
+long config_rsync_timeout(void);
 char const *config_get_rsync_program(void);
 bool config_get_http_enabled(void);
 unsigned int config_get_http_priority(void);
index d50d01fd3d54cca3aaa7d155068e6d3374df7607..772fc7a922a5297ef444b5408c562ed957176334 100644 (file)
@@ -63,7 +63,14 @@ struct rsync_task {
        LIST_ENTRY(rsync_task) lh;
 };
 
-LIST_HEAD(rsync_tasks, rsync_task);
+LIST_HEAD(rsync_task_list, rsync_task);
+
+struct rsync_tasks {
+       struct rsync_task_list active;
+       int a; /* total active */
+
+       struct rsync_task_list queued;
+};
 
 #ifndef LIST_FOREACH_SAFE
 #define LIST_FOREACH_SAFE(var, ls, lh, tmp)                            \
@@ -84,7 +91,7 @@ void_task(struct rsync_task *task, struct s2p_socket *s2p)
        free(task);
 
        if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) {
-               pr_op_err("Cannot message parent process: %s", strerror(errno));
+               pr_op_err(RSP "Cannot message parent process: %s", strerror(errno));
 
                close(s2p->wr);
                s2p->wr = -1;
@@ -94,9 +101,11 @@ void_task(struct rsync_task *task, struct s2p_socket *s2p)
 }
 
 static void
-finish_task(struct rsync_task *task, struct s2p_socket *s2p)
+finish_task(struct rsync_tasks *tasks, struct rsync_task *task,
+    struct s2p_socket *s2p)
 {
        LIST_REMOVE(task, lh);
+       tasks->a--;
        void_task(task, s2p);
 }
 
@@ -109,7 +118,7 @@ init_pfd(struct pollfd *pfd, int fd)
 }
 
 static struct pollfd *
-create_pfds(int request_fd, struct rsync_tasks *tasks, size_t tn)
+create_pfds(int request_fd, struct rsync_task_list *tasks, size_t tn)
 {
        struct pollfd *pfds;
        struct rsync_task *task;
@@ -134,13 +143,13 @@ create_pipes(int fds[2][2])
 
        if (pipe(fds[0]) < 0) {
                error = errno;
-               pr_op_err_st("Piping rsync stderr: %s", strerror(error));
+               pr_op_err_st(RSP "Piping rsync stderr: %s", strerror(error));
                return error;
        }
 
        if (pipe(fds[1]) < 0) {
                error = errno;
-               pr_op_err_st("Piping rsync stdout: %s", strerror(error));
+               pr_op_err_st(RSP "Piping rsync stdout: %s", strerror(error));
                close(fds[0][0]);
                close(fds[0][1]);
                return error;
@@ -216,7 +225,7 @@ fork_rsync(struct rsync_task *task)
        task->pid = fork();
        if (task->pid < 0) {
                error = errno;
-               pr_op_err_st("Couldn't spawn the rsync process: %s",
+               pr_op_err_st(RSP "Couldn't spawn the rsync process: %s",
                    strerror(error));
                close(STDERR_READ(fork_fds));
                close(STDOUT_READ(fork_fds));
@@ -237,8 +246,23 @@ fork_rsync(struct rsync_task *task)
        return 0;
 }
 
-static unsigned int
-start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+static void
+activate_task(struct rsync_tasks *tasks, struct rsync_task *task,
+    struct s2p_socket *s2p, struct timespec *now)
+{
+       ts_add(&task->expiration, now, 1000 * config_rsync_timeout());
+
+       if (fork_rsync(task) != 0) {
+               void_task(task, s2p);
+               return;
+       }
+
+       LIST_INSERT_HEAD(&tasks->active, task, lh);
+       tasks->a++;
+}
+
+static void
+post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
     struct timespec *now)
 {
        struct rsync_task *task;
@@ -246,19 +270,17 @@ start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
        task = pzalloc(sizeof(struct rsync_task));
        task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size);
        task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size);
-       ts_add(&task->expiration, now, 1000 * config_get_rsync_transfer_timeout());
 
-       if (fork_rsync(task) != 0) {
-               void_task(task, s2p);
-               return 0;
+       if (tasks->a >= config_rsync_max()) {
+               LIST_INSERT_HEAD(&tasks->queued, task, lh);
+               pr_op_debug(RSP "Queued new task.");
+       } else {
+               activate_task(tasks, task, s2p, now);
+               pr_op_debug(RSP "Got new task: %d", task->pid);
        }
-
-       LIST_INSERT_HEAD(tasks, task, lh);
-       pr_val_debug(RSP "Got new task: %d", task->pid);
-       return 1;
 }
 
-static unsigned int
+static void
 read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
     struct timespec *now)
 {
@@ -266,11 +288,9 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
        ssize_t consumed;
        size_t offset;
        asn_dec_rval_t decres;
-       unsigned int t;
        int error;
 
        in = &s2p->rd;
-       t = 0;
 
        do {
                consumed = read(in->fd, in->buffer + in->len,
@@ -279,11 +299,11 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
                        error = errno;
                        if (error != EAGAIN && error != EWOULDBLOCK)
                                rstream_close(in, true);
-                       return t;
+                       return;
                }
                if (consumed == 0) { /* EOS */
                        rstream_close(in, true);
-                       return t;
+                       return;
                }
 
                in->len += consumed;
@@ -295,14 +315,14 @@ read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
                        offset += decres.consumed;
                        switch (decres.code) {
                        case RC_OK:
-                               t += start_task(s2p, tasks, now);
+                               post_task(s2p, tasks, now);
                                ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr);
                                break;
                        case RC_WMORE:
                                goto break_for;
                        case RC_FAIL:
                                rstream_close(in, true);
-                               return t;
+                               return;
                        }
                }
 
@@ -313,30 +333,24 @@ break_for:        if (offset > in->len)
        } while (true);
 }
 
-/* Returns number of tasks created */
-static int
+static void
 handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd,
     struct rsync_tasks *tasks, struct timespec *now)
 {
        if (s2p->rd.fd == -1)
-               return 0;
+               return;
 
        if (pfd->revents & POLLNVAL) {
-               pr_val_err(RSP "bad parent fd: %i", pfd->fd);
+               pr_op_err(RSP "bad parent fd: %i", pfd->fd);
                rstream_close(&s2p->rd, false);
-               return 0;
-       }
 
-       if (pfd->revents & POLLERR) {
-               pr_val_err(RSP "Generic error during parent fd poll.");
+       } else if (pfd->revents & POLLERR) {
+               pr_op_err(RSP "Generic error during parent fd poll.");
                rstream_close(&s2p->rd, true);
-               return 0;
-       }
-
-       if (pfd->revents & (POLLIN | POLLHUP))
-               return read_tasks(s2p, tasks, now);
 
-       return 0;
+       } else if (pfd->revents & (POLLIN | POLLHUP)) {
+               read_tasks(s2p, tasks, now);
+       }
 }
 
 static void
@@ -358,9 +372,9 @@ log_buffer(char const *buffer, ssize_t read, bool is_error)
                        continue;
                }
                if (is_error)
-                       pr_val_err("[RSYNC exec] %s", cur);
+                       pr_op_err("[RSYNC exec] %s", cur);
                else
-                       pr_val_debug("[RSYNC exec] %s", cur);
+                       pr_op_debug("[RSYNC exec] %s", cur);
                cur = tmp + 1;
        }
        free(cpy);
@@ -381,7 +395,7 @@ log_rsync_output(struct pollfd *pfd, size_t p)
                error = errno;
                if (error == EINTR)
                        return 0; /* Dunno; retry */
-               pr_val_err("rsync buffer read error: %s", strerror(error));
+               pr_op_err(RSP "rsync buffer read error: %s", strerror(error));
                goto down; /* Error */
        }
 
@@ -398,17 +412,17 @@ static int
 handle_rsync_fd(struct pollfd *pfd, size_t p)
 {
        if (pfd->fd == -1) {
-               pr_val_debug(RSP "File descriptor already closed.");
+               pr_op_debug(RSP "File descriptor already closed.");
                return 1;
        }
 
        if (pfd->revents & POLLNVAL) {
-               pr_val_err(RSP "rsync bad fd: %i", pfd->fd);
+               pr_op_err(RSP "rsync bad fd: %i", pfd->fd);
                return 1;
        }
 
        if (pfd->revents & POLLERR) {
-               pr_val_err(RSP "Generic error during rsync poll.");
+               pr_op_err(RSP "Generic error during rsync poll.");
                close(pfd->fd);
                return 1;
        }
@@ -435,7 +449,7 @@ again:      status = 0;
        if (WIFEXITED(status)) {
                /* Happy path (but also sad path sometimes) */
                error = WEXITSTATUS(status);
-               pr_val_debug("%s ended. Result: %d", name, error);
+               pr_op_debug("%s ended. Result: %d", name, error);
                return error ? EIO : 0;
        }
 
@@ -474,10 +488,26 @@ kill_subprocess(struct rsync_task *task)
        kill(task->pid, SIGTERM);
 }
 
+static void
+activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p,
+    struct timespec *now)
+{
+       struct rsync_task *task;
+
+       task = LIST_FIRST(&tasks->queued);
+       if (task == NULL)
+               return;
+
+       pr_op_debug(RSP "Activating queued task %s -> %s.",
+           task->url, task->path);
+       LIST_REMOVE(task, lh);
+       activate_task(tasks, task, s2p, now);
+}
+
 /* Returns true if the task died. */
 static bool
-maybe_expire(struct timespec *now, struct rsync_task *task,
-    struct s2p_socket *s2p)
+maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task,
+    struct s2p_socket *s2p, struct timespec *now)
 {
        struct timespec epoch;
 
@@ -488,7 +518,9 @@ maybe_expire(struct timespec *now, struct rsync_task *task,
        pr_op_debug(RSP "Task %d ran out of time.", task->pid);
        kill_subprocess(task);
        wait_subprocess("rsync", task->pid);
-       finish_task(task, s2p);
+       finish_task(tasks, task, s2p);
+       activate_queued(tasks, s2p, now);
+
        return true;
 }
 
@@ -508,7 +540,6 @@ spawner_run(
 
        struct rsync_tasks tasks;
        struct rsync_task *task, *tmp;
-       int task_count;
 
        int error;
 
@@ -516,8 +547,9 @@ spawner_run(
        s2p.wr = response_fd;
        s2p.rr = NULL;
 
-       LIST_INIT(&tasks);
-       task_count = 0;
+       LIST_INIT(&tasks.active);
+       LIST_INIT(&tasks.queued);
+       tasks.a = 0;
        error = 0;
 
        ts_now(&now);
@@ -528,17 +560,17 @@ spawner_run(
                 * odd: stdouts
                 * even > 0: stderrs
                 */
-               pfds = create_pfds(s2p.rd.fd, &tasks, task_count);
-               pfds_count = 2 * task_count + 1;
+               pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a);
+               pfds_count = 2 * tasks.a + 1;
                expiration.tv_sec = now.tv_sec + 10;
                expiration.tv_nsec = now.tv_nsec;
-               LIST_FOREACH(task, &tasks, lh)
+               LIST_FOREACH(task, &tasks.active, lh)
                        if ((ts_cmp(&now, &task->expiration) < 0) &&
                            (ts_cmp(&task->expiration, &expiration) < 0))
                                expiration = task->expiration;
 
                timeout = ts_delta(&now, &expiration);
-               pr_val_debug(RSP "Timeout decided: %dms", timeout);
+               pr_op_debug(RSP "Timeout decided: %dms", timeout);
                events = poll(pfds, pfds_count, timeout);
                if (events < 0) {
                        error = errno;
@@ -549,50 +581,52 @@ spawner_run(
                ts_now(&now);
 
                if (events == 0) { /* Timeout */
-                       pr_val_debug(RSP "Woke up because of timeout.");
-                       LIST_FOREACH_SAFE(task, &tasks, lh, tmp)
-                               task_count -= maybe_expire(&now, task, &s2p);
+                       pr_op_debug(RSP "Woke up because of timeout.");
+                       LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp)
+                               maybe_expire(&tasks, task, &s2p, &now);
                        goto cont;
                }
 
-               pr_val_debug(RSP "Woke up because of input.");
+               pr_op_debug(RSP "Woke up because of input.");
                p = 1;
-               LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
-                       if (maybe_expire(&now, task, &s2p)) {
-                               task_count--;
+               LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
+                       if (maybe_expire(&tasks, task, &s2p, &now))
                                continue;
-                       }
 
                        if (handle_rsync_fd(&pfds[p], p)) {
-                               pr_val_debug(RSP "Task %d: Stdout closed.",
+                               pr_op_debug(RSP "Task %d: Stdout closed.",
                                    task->pid);
                                task->stdoutfd = -1;
                        }
                        p++;
                        if (handle_rsync_fd(&pfds[p], p)) {
-                               pr_val_debug(RSP "Task %d: Stderr closed.",
+                               pr_op_debug(RSP "Task %d: Stderr closed.",
                                    task->pid);
                                task->stderrfd = -1;
                        }
                        p++;
                        if (task->stdoutfd == -1 && task->stderrfd == -1) {
-                               pr_val_debug(RSP "Both stdout & stderr are closed; ending task %d.",
+                               pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.",
                                    task->pid);
                                wait_subprocess("rsync", task->pid);
-                               finish_task(task, &s2p);
-                               task_count--;
+                               finish_task(&tasks, task, &s2p);
+                               activate_queued(&tasks, &s2p, &now);
                        }
                }
-               task_count += handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
+               handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
 
 cont:          free(pfds);
-       } while ((s2p.rd.fd != -1 || task_count > 0));
-       pr_val_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
+       } while ((s2p.rd.fd != -1 || tasks.a > 0));
+       pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
 
-       LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
+       LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
                kill_subprocess(task);
                wait_subprocess("rsync", task->pid);
-               finish_task(task, &s2p);
+               finish_task(&tasks, task, &s2p);
+       }
+       LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) {
+               LIST_REMOVE(task, lh);
+               void_task(task, &s2p);
        }
 
        rstream_close(&s2p.rd, true);
index 37818a8396ce140641247a1eb09d7226695e8636..d214670e85490642a6182ab9341016f764c7764f 100644 (file)
@@ -2,6 +2,7 @@
 
 #include <errno.h>
 #include <arpa/inet.h>
+#include <time.h>
 #include "config.h"
 #include "incidence.h"
 #include "log.h"
 
 /* Some core functions, as linked from unit tests. */
 
-/* CFLAGS=-DPRINT_PRS make check */
-#ifdef PRINT_PRS
+#if 0
+
+static void
+print_monotime(void)
+{
+       struct timespec now;
+       if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
+               pr_crit("clock_gettime() returned '%s'", strerror(errno));
+       printf("%ld.%.3ld ", now.tv_sec, now.tv_nsec / 1000000);
+}
+
 #define MOCK_PRINT(color)                                              \
        do {                                                            \
                va_list args;                                           \
                printf(color);                                          \
+               print_monotime();                                       \
                va_start(args, format);                                 \
                vfprintf(stdout, format, args);                         \
                va_end(args);                                           \
                printf(PR_COLOR_RST "\n");                              \
        } while (0)
+
 #else
 #define MOCK_PRINT(color)
 #endif
diff --git a/test/resources/rsync/queued.sh b/test/resources/rsync/queued.sh
new file mode 100755 (executable)
index 0000000..eb866c4
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+echo "rsync begins: $1 -> $2"
+sleep 2
+echo "rsync ends: $1 -> $2"
diff --git a/test/resources/rsync/simultaneous.sh b/test/resources/rsync/simultaneous.sh
new file mode 100755 (executable)
index 0000000..56322a0
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+echo "rsync begins: $1 -> $2"
+sleep 1
+echo "rsync ends: $1 -> $2"
index bb3ff3601717915303723e23eac7a4fdc475c9db..1676bbd909c83377f31c7dffca95183f484de117 100644 (file)
@@ -6,7 +6,6 @@
 #include "rsync.c"
 #include "stream.c"
 
-
 #include "asn1/asn1c/ber_decoder.c"
 #include "asn1/asn1c/ber_tlv_length.c"
 #include "asn1/asn1c/ber_tlv_tag.c"
@@ -27,7 +26,8 @@ static char content[1024];
 /* Mocks */
 
 MOCK(config_get_rsync_program, char const *, "rsync", void)
-MOCK(config_get_rsync_transfer_timeout, long, 4, void)
+MOCK_UINT(config_rsync_max, 3, void)
+MOCK(config_rsync_timeout, long, 4, void)
 MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void)
 
 MOCK_ABORT_PTR(json_obj_new, json_t, void)
@@ -44,113 +44,6 @@ MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td,
 
 /* Tests */
 
-static void
-disable_sigpipe(void)
-{
-       struct sigaction action = { .sa_handler = SIG_IGN };
-       if (sigaction(SIGPIPE, &action, NULL) == -1)
-               pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
-}
-
-static void
-init_content(void)
-{
-       size_t i;
-
-       if (sizeof(content) % STR64LEN != 0)
-               pr_crit("content's length isn't divisible by str64's length");
-       for (i = 0; i < (sizeof(content) / STR64LEN); i++)
-               memcpy(content + 64 * i, STR64, STR64LEN);
-}
-
-static void
-create_dir(char const *path)
-{
-       if (mkdir(path, 0700) < 0)
-               ck_assert_int_eq(EEXIST, errno);
-}
-
-static void
-create_file(char const *name, unsigned int kbs)
-{
-       FILE *file;
-
-       file = fopen(name, "wb");
-       ck_assert_ptr_ne(NULL, file);
-       ck_assert_int_eq(kbs, fwrite(content, sizeof(content), kbs, file));
-       ck_assert_int_eq(0, fclose(file));
-}
-
-static void
-ensure_file_deleted(char const *name)
-{
-       int ret;
-       int error;
-
-       errno = 0;
-       ret = unlink(name);
-       error = errno;
-
-       ck_assert(ret == 0 || error == ENOENT);
-}
-
-static void
-create_rsync_sandbox(void)
-{
-       create_dir("tmp");
-       create_dir("tmp/rsync");
-       create_dir("tmp/rsync/src");
-       create_dir("tmp/rsync/dst");
-
-       create_file("tmp/rsync/src/a", 1);
-       create_file("tmp/rsync/src/b", 1);
-       create_file("tmp/rsync/src/c", 1);
-
-       ensure_file_deleted("tmp/rsync/dst/a");
-       ensure_file_deleted("tmp/rsync/dst/b");
-       ensure_file_deleted("tmp/rsync/dst/c");
-}
-
-static void
-diff(char const *file1, char const *file2)
-{
-       int fd1, fd2;
-       struct read_stream rs1, rs2;
-       int read1, read2;
-
-       fd1 = open(file1, O_RDONLY, 0);
-       ck_assert_int_ne(-1, fd1);
-       rstream_init(&rs1, fd1, 1024);
-
-       fd2 = open(file2, O_RDONLY, 0);
-       ck_assert_int_ne(-1, fd2);
-       rstream_init(&rs2, fd2, 1024);
-
-       do {
-               read1 = rstream_full_read(&rs1, 1024);
-               ck_assert_int_ge(read1, 0);
-               read2 = rstream_full_read(&rs2, 1024);
-               ck_assert_int_eq(read1, read2);
-               ck_assert_int_eq(0, memcmp(rs1.buffer, rs2.buffer, read1));
-       } while (read1 == 1024);
-
-       rstream_close(&rs1, true);
-       rstream_close(&rs2, true);
-}
-
-static void
-ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
-    char const *url, char const *path)
-{
-       struct rsync_task *task;
-
-       task = LIST_FIRST(tasks);
-       ck_assert_ptr_ne(NULL, task);
-       ck_assert_str_eq(url, task->url);
-       ck_assert_str_eq(path, task->path);
-       finish_task(task, sk);
-}
-
 /* Test RsyncRequest decode, feeding as few bytes as possible every time. */
 START_TEST(test_decode_extremely_fragmented)
 {
@@ -199,6 +92,28 @@ START_TEST(test_decode_extremely_fragmented)
 }
 END_TEST
 
+static void
+ck_no_tasks(struct rsync_tasks *tasks)
+{
+       ck_assert_int_eq(0, tasks->a);
+       ck_assert(LIST_EMPTY(&tasks->active));
+       ck_assert(LIST_EMPTY(&tasks->queued));
+}
+
+static void
+ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
+    char const *url, char const *path)
+{
+       struct rsync_task *task;
+
+       task = LIST_FIRST(&tasks->active);
+       ck_assert_ptr_ne(NULL, task);
+       ck_assert_str_eq(url, task->url);
+       ck_assert_str_eq(path, task->path);
+       ck_assert(tasks->a > 0);
+       finish_task(tasks, task, sk);
+}
+
 static void
 encode_request(char const *url, char const *path, unsigned char *buffer)
 {
@@ -227,56 +142,60 @@ START_TEST(test_read_tasks)
        rstream_init(&sk.rd, fds[0], 256);
        sk.wr = -1;
        sk.rr = NULL;
-       LIST_INIT(&tasks);
+       LIST_INIT(&tasks.active);
+       LIST_INIT(&tasks.queued);
+       tasks.a = 0;
        ts_now(&now);
 
        printf("Read yields nothing\n");
-       ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
+       ck_no_tasks(&tasks);
 
        printf("Read yields less than 1 request\n");
        encode_request("111", "2222", bytes); /* 13 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
-       ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
+       ck_no_tasks(&tasks);
 
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3));
-       ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
 
        ck_1st_task(&tasks, &sk, "111", "2222");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        printf("Read yields 1 request\n");
        encode_request("3333", "444", bytes); /* 13 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13));
-       ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
 
        ck_1st_task(&tasks, &sk, "3333", "444");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        printf("Read yields 1.5 requests\n");
        encode_request("55", "666", bytes); /* 11 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11));
        encode_request("777", "88", bytes); /* 11 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5));
-       ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
 
        ck_1st_task(&tasks, &sk, "55", "666");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6));
-       ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
 
        ck_1st_task(&tasks, &sk, "777", "88");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        printf("Read yields 2 requests\n");
        encode_request("9999", "00", bytes); /* 12 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
        encode_request("aa", "bbbb", bytes); /* 12 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
-       ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
        ck_1st_task(&tasks, &sk, "aa", "bbbb");
        ck_1st_task(&tasks, &sk, "9999", "00");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        printf("Read yields 2.5 requests\n");
        encode_request("cc", "dd", bytes); /* 10 bytes */
@@ -286,30 +205,36 @@ START_TEST(test_read_tasks)
        encode_request("gggg", "hhhhh", bytes); /* 15 bytes */
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3));
 
-       ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
        ck_1st_task(&tasks, &sk, "eeeee", "fffff");
        ck_1st_task(&tasks, &sk, "cc", "dd");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 
        ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12));
-       ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+       read_tasks(&sk, &tasks, &now);
        ck_1st_task(&tasks, &sk, "gggg", "hhhhh");
-       ck_assert(LIST_EMPTY(&tasks));
+       ck_no_tasks(&tasks);
 }
 END_TEST
 
+/* Makes sure @count rsyncs finish after roughly @millis milliseconds. */
 static void
-wait_rsyncs(unsigned int count)
+wait_rsyncs(unsigned int count, unsigned int millis)
 {
-       unsigned int done = 0;
-
-       do {
-               sleep(1);
-               done += rsync_finished();
-               printf("rsyncs done: %u\n", done);
-       } while (done < count);
+       struct timespec req;
+
+       if (millis > 100) {
+               millis -= 100;
+               req.tv_sec = millis / 1000;
+               req.tv_nsec = (millis % 1000) * 1000000;
+               ck_assert_int_eq(0, nanosleep(&req, NULL));
+               ck_assert_uint_eq(0, rsync_finished());
+       }
 
-       ck_assert_uint_eq(count, done);
+       req.tv_sec = 0;
+       req.tv_nsec = 200000000; /* 200ms */
+       ck_assert_int_eq(0, nanosleep(&req, NULL));
+       ck_assert_uint_eq(count, rsync_finished());
 }
 
 START_TEST(test_fast_single_rsync)
@@ -319,7 +244,7 @@ START_TEST(test_fast_single_rsync)
        ck_assert_int_ne(-1, writefd);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
-       wait_rsyncs(1);
+       wait_rsyncs(1, 0);
 
        rsync_teardown();
 }
@@ -332,7 +257,7 @@ START_TEST(test_stalled_single_rsync)
        ck_assert_int_ne(-1, writefd);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
-       wait_rsyncs(1);
+       wait_rsyncs(1, 3000);
 
        rsync_teardown();
 }
@@ -345,7 +270,7 @@ START_TEST(test_stalled_single_rsync_timeout)
        ck_assert_int_ne(-1, writefd);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
-       wait_rsyncs(1);
+       wait_rsyncs(1, 4000); /* 4000 = timeout */
 
        rsync_teardown();
 }
@@ -358,7 +283,7 @@ START_TEST(test_dripfeed_single_rsync)
        ck_assert_int_ne(-1, writefd);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
-       wait_rsyncs(1);
+       wait_rsyncs(1, 3000);
 
        rsync_teardown();
 }
@@ -371,7 +296,7 @@ START_TEST(test_dripfeed_single_rsync_timeout)
        ck_assert_int_ne(-1, writefd);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
-       wait_rsyncs(1);
+       wait_rsyncs(1, 4000); /* 4000 = timeout */
 
        rsync_teardown();
 }
@@ -384,6 +309,7 @@ START_TEST(test_no_rsyncs)
        ck_assert_int_ne(-1, writefd);
 
        sleep(2);
+       ck_assert_uint_eq(0, rsync_finished());
 
        rsync_teardown();
 }
@@ -391,22 +317,36 @@ END_TEST
 
 START_TEST(test_simultaneous_rsyncs)
 {
-       create_rsync_sandbox();
-       /* Note... --bwlimit does not seem to exist in openrsync */
-       rsync_setup("rsync", "--bwlimit=1K", "-vvv", NULL);
+       rsync_setup("resources/rsync/simultaneous.sh", NULL);
        ck_assert_int_ne(-1, readfd);
        ck_assert_int_ne(-1, writefd);
 
-       ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/a", "tmp/rsync/dst/a"));
-       ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/b", "tmp/rsync/dst/b"));
-       ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/c", "tmp/rsync/dst/c"));
-       wait_rsyncs(3);
+       ck_assert_int_eq(0, rsync_queue("A", "B"));
+       ck_assert_int_eq(0, rsync_queue("C", "D"));
+       ck_assert_int_eq(0, rsync_queue("E", "F"));
+
+       wait_rsyncs(3, 1000);
 
        rsync_teardown();
+}
+END_TEST
 
-       diff("tmp/rsync/src/a", "tmp/rsync/dst/a");
-       diff("tmp/rsync/src/b", "tmp/rsync/dst/b");
-       diff("tmp/rsync/src/c", "tmp/rsync/dst/c");
+START_TEST(test_queued_rsyncs)
+{
+       rsync_setup("resources/rsync/queued.sh", NULL);
+       ck_assert_int_ne(-1, readfd);
+       ck_assert_int_ne(-1, writefd);
+
+
+       ck_assert_int_eq(0, rsync_queue("A", "B"));
+       ck_assert_int_eq(0, rsync_queue("C", "D"));
+       ck_assert_int_eq(0, rsync_queue("E", "F"));
+       ck_assert_int_eq(0, rsync_queue("G", "H"));
+
+       wait_rsyncs(3, 2000);
+       wait_rsyncs(1, 2000);
+
+       rsync_teardown();
 }
 END_TEST
 
@@ -421,17 +361,18 @@ create_suite(void)
        tcase_add_test(p2s, test_read_tasks);
 
        s2r = tcase_create("spawner-rsync channel");
-       tcase_add_test(p2s, test_fast_single_rsync);
-       tcase_add_test(p2s, test_stalled_single_rsync);
-       tcase_add_test(p2s, test_stalled_single_rsync_timeout);
-       tcase_add_test(p2s, test_dripfeed_single_rsync);
-       tcase_add_test(p2s, test_dripfeed_single_rsync_timeout);
-       tcase_set_timeout(p2s, 6);
+       tcase_add_test(s2r, test_fast_single_rsync);
+       tcase_add_test(s2r, test_stalled_single_rsync);
+       tcase_add_test(s2r, test_stalled_single_rsync_timeout);
+       tcase_add_test(s2r, test_dripfeed_single_rsync);
+       tcase_add_test(s2r, test_dripfeed_single_rsync_timeout);
+       tcase_set_timeout(s2r, 6);
 
        spawner = tcase_create("spawner");
        tcase_add_test(spawner, test_no_rsyncs);
        tcase_add_test(spawner, test_simultaneous_rsyncs);
-       tcase_set_timeout(spawner, 6);
+       tcase_add_test(spawner, test_queued_rsyncs);
+       tcase_set_timeout(spawner, 5);
 
        suite = suite_create("rsync");
        suite_add_tcase(suite, p2s);
@@ -441,6 +382,25 @@ create_suite(void)
        return suite;
 }
 
+static void
+disable_sigpipe(void)
+{
+       struct sigaction action = { .sa_handler = SIG_IGN };
+       if (sigaction(SIGPIPE, &action, NULL) == -1)
+               pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
+}
+
+static void
+init_content(void)
+{
+       size_t i;
+
+       if (sizeof(content) % STR64LEN != 0)
+               pr_crit("content's length isn't divisible by str64's length");
+       for (i = 0; i < (sizeof(content) / STR64LEN); i++)
+               memcpy(content + 64 * i, STR64, STR64LEN);
+}
+
 int
 main(void)
 {