]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Move the rsync spawner to a separate process
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Sat, 19 Oct 2024 00:18:00 +0000 (18:18 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Mon, 21 Oct 2024 23:31:53 +0000 (17:31 -0600)
The fork()s (needed to spawn rsyncs) duplicate Fort's process.

Which is messy in a multithreaded program. Quoting the Linux man page:

> * The child process is created with a single thread—the one that
>   called fork(). The entire virtual address space of the parent is
>   replicated in the child, including the states of mutexes, condition
>   variables, and other pthreads objects. (...)
> * After a fork() in a multithreaded program, the child can safely call
>   only async-signal-safe functions (...) until such time as it calls
>   execve(2).

As far as I can tell, since the forked child was, in fact, careful to
only invoke async-signal-safe functions, this wasn't really a bug.
Still, it wasn't quality architecture either.

Moving the rsync spawner to a dedicated subprocess should stop the forks
from threatening to clash with the multithreading completely.

Relies on the new core loop design, so this won't work properly until
that's implemented.

src/cache.c
src/cache.h
src/log.c
src/log.h
src/main.c
src/rsync.c
src/rsync.h
src/stream.c [new file with mode: 0644]
src/stream.h [new file with mode: 0644]
test/mock.c
test/rsync_test.c

index 8cb0e64108f4393ceba154b8678092eeaab3f710..704cf95abde506de93b77d8539dae9cc8a1e3958 100644 (file)
@@ -316,12 +316,6 @@ cache_setup(void)
        return 0;
 }
 
-void
-cache_teardown(void)
-{
-       /* Empty */
-}
-
 static struct cache_node *
 json2node(json_t *json)
 {
index 75939810834a63e770607c02ec2b8b874735e1f3..24d1ea7d55b3a178040bc31b56b6c1ae71bf9c5f 100644 (file)
@@ -6,7 +6,6 @@
 #include "types/rpp.h"
 
 int cache_setup(void);         /* Init this module */
-void cache_teardown(void);     /* Destroy this module */
 void cache_atexit(void);
 
 int cache_prepare(void);       /* Prepare cache for new validation cycle */
index 09b81b1744ea558eb2c57d5e9e8c2e432551ce50..0c2bd25d5b3b4284eb63957e3136346824604136 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -211,15 +211,6 @@ log_teardown(void)
        pthread_mutex_destroy(&logck);
 }
 
-void
-log_flush(void)
-{
-       if (op_config.fprintf_enabled || val_config.fprintf_enabled) {
-               fflush(stdout);
-               fflush(stderr);
-       }
-}
-
 bool
 log_val_enabled(unsigned int level)
 {
index 5c94dd4326bed8b0ab7975f2e01a32db87708816..8fb10aa0df717b1f1b81e744d0960ab0cb378496 100644 (file)
--- a/src/log.h
+++ b/src/log.h
@@ -59,9 +59,6 @@ int log_setup(void);
 void log_start(void);
 void log_teardown(void);
 
-/* Call to flush the stdout/stderr streams */
-void log_flush(void);
-
 /*
  * Check if corresponding logging is enabled. You can use these to short-circuit
  * out of heavy logging code.
index ee61d7d69412b6b49815d2ddd28bed95558bd187..994bf7d7e0b4b54e1d2e82dcf740fd3359775df8 100644 (file)
@@ -11,6 +11,7 @@
 #include "print_file.h"
 #include "relax_ng.h"
 #include "rtr/rtr.h"
+#include "rsync.h"
 #include "sig.h"
 #include "thread_var.h"
 
@@ -121,20 +122,22 @@ main(int argc, char **argv)
        error = log_setup();
        if (error)
                goto just_quit;
+       error = handle_flags_config(argc, argv);
+       if (error)
+               goto revert_log;
+
+       rsync_setup(); /* Spawn rsync spawner ASAP */
        register_signal_handlers();
+
        error = thvar_init();
        if (error)
-               goto revert_log;
+               goto revert_rsync;
        error = incidence_init();
        if (error)
-               goto revert_log;
-       error = handle_flags_config(argc, argv);
-       if (error)
-               goto revert_log;
-
+               goto revert_rsync;
        error = nid_init();
        if (error)
-               goto revert_config;
+               goto revert_rsync;
        error = extension_init();
        if (error)
                goto revert_nid;
@@ -144,7 +147,6 @@ main(int argc, char **argv)
        error = http_init();
        if (error)
                goto revert_hash;
-
        error = relax_ng_init();
        if (error)
                goto revert_http;
@@ -156,7 +158,7 @@ main(int argc, char **argv)
                goto revert_vrps;
        error = output_setup();
        if (error)
-               goto revert_cache;
+               goto revert_vrps;
 
        /* Meat */
 
@@ -173,8 +175,6 @@ main(int argc, char **argv)
        }
 
        /* End */
-revert_cache:
-       cache_teardown();
 revert_vrps:
        vrps_destroy();
 revert_relax_ng:
@@ -185,7 +185,8 @@ revert_hash:
        hash_teardown();
 revert_nid:
        nid_destroy();
-revert_config:
+revert_rsync:
+       rsync_teardown();
        free_rpki_config();
 revert_log:
        log_teardown();
index e77afb6e78352432e2affd6a7b4d3e5dc2ae550e..25f158462c1d055139affc328a3d5208af5cf70e 100644 (file)
@@ -4,6 +4,7 @@
 #include <fcntl.h>
 #include <poll.h>
 #include <signal.h>
+#include <stream.h>
 #include <sys/wait.h>
 #include <syslog.h>
 
 #define STDERR_READ(fds)  fds[0][0]
 #define STDOUT_READ(fds)  fds[1][0]
 
+static pid_t spawner;          /* The process that spawns rsync runs */
+
+static int readfd;             /* Our end of the spawner-to-parent pipe */
+static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER;
+
+static int writefd;            /* Our end of the parent-to-spawner pipe */
+static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER;
+
+static int rsync(char const *, char const *);
+
+static int
+run_child(int readfd, int writefd)
+{
+       unsigned char zero = 0;
+       struct read_stream stream;
+       char *url, *path;
+       int error;
+
+       read_stream_init(&stream, readfd);
+
+       do {
+               error = read_string(&stream, &url);
+               if (error || url == NULL)
+                       break;
+               error = read_string(&stream, &path);
+               if (error || path == NULL) {
+                       free(url);
+                       break;
+               }
+
+               error = rsync(url, path);
+
+               free(url);
+               free(path);
+
+               error = full_write(writefd, &zero, 1);
+       } while (!error);
+
+       read_stream_close(&stream);
+       close(writefd);
+       free_rpki_config();
+       log_teardown();
+       return error;
+}
+
+void
+rsync_setup(void)
+{
+       static const int RDFD = 0;
+       static const int WRFD = 1;
+       int parent2spawner[2];  /* Pipe: Parent writes, spawner reads */
+       int spawner2parent[2];  /* Pipe: Spawner writes, parent reads */
+       int flags;
+
+       if (pipe(parent2spawner) < 0) {
+               pr_op_err("Cannot create pipe: %s", strerror(errno));
+               goto fail1;
+       }
+       if (pipe(spawner2parent) < 0) {
+               pr_op_err("Cannot create pipe: %s", strerror(errno));
+               goto fail2;
+       }
+
+       flags = fcntl(spawner2parent[RDFD], F_GETFL);
+       if (flags < 0) {
+               pr_op_err("Cannot retrieve pipe flags: %s", strerror(errno));
+               goto fail3;
+       }
+       if (fcntl(spawner2parent[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
+               pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(errno));
+               goto fail3;
+       }
+
+       fflush(stdout);
+       fflush(stderr);
+
+       spawner = fork();
+       if (spawner < 0) {
+               pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
+               goto fail3;
+       }
+
+       if (spawner == 0) { /* Client code */
+               close(parent2spawner[WRFD]);
+               close(spawner2parent[RDFD]);
+               exit(run_child(parent2spawner[RDFD], spawner2parent[WRFD]));
+       }
+
+       /* Parent code */
+       close(parent2spawner[RDFD]);
+       close(spawner2parent[WRFD]);
+       readfd = spawner2parent[RDFD];
+       writefd = parent2spawner[WRFD];
+       return;
+
+fail3: close(spawner2parent[RDFD]);
+       close(spawner2parent[WRFD]);
+fail2: close(parent2spawner[RDFD]);
+       close(parent2spawner[WRFD]);
+fail1: pr_op_warn("rsync will not be available.");
+       readfd = writefd = 0;
+}
+
+int
+rsync_download(char const *url, char const *path)
+{
+       mutex_lock(&writelock);
+
+       if (writefd == 0)
+               goto fail1;
+       if (write_string(writefd, url) != 0)
+               goto fail2;
+       if (write_string(writefd, path) != 0)
+               goto fail2;
+
+       mutex_unlock(&writelock);
+       return 0; // XXX go pick some other task
+
+fail2: close(readfd);
+       close(writefd);
+       readfd = writefd = 0;
+fail1: mutex_unlock(&writelock);
+       return EIO;
+}
+
+/* Returns the number of rsyncs that have ended since the last query */
+unsigned int
+rsync_finished(void)
+{
+       unsigned char buf[8];
+       ssize_t result;
+
+       mutex_lock(&readlock);
+       result = (readfd != 0) ? read(readfd, buf, sizeof(buf)) : 0;
+       mutex_unlock(&readlock);
+
+       return (result >= 0) ? result : 0;
+}
+
+static int
+wait_child(char const *name, pid_t pid)
+{
+       int status;
+       int error;
+
+again: status = 0;
+       if (waitpid(pid, &status, 0) < 0) {
+               error = errno;
+               pr_op_err("Could not wait for %s: %s", name, strerror(error));
+               return error;
+       }
+
+       if (WIFEXITED(status)) {
+               /* Happy path (but also sad path sometimes) */
+               error = WEXITSTATUS(status);
+               pr_val_debug("%s ended. Result: %d", name, error);
+               return error ? EIO : 0;
+       }
+
+       if (WIFSIGNALED(status)) {
+               pr_op_warn("%s interrupted by signal %d.",
+                   name, WTERMSIG(status));
+               return EINTR;
+       }
+
+       if (WIFCONTINUED(status)) {
+               /*
+                * Testing warning:
+                * I can't trigger this branch. It always exits or signals;
+                * SIGSTOP then SIGCONT doesn't seem to wake up waitpid().
+                * It's concerning because every sample code I've found assumes
+                * waitpid() returning always means the subprocess ended, so
+                * they never retry. But that contradicts all documentation,
+                * yet seems to be accurate to reality.
+                */
+               pr_op_debug("%s has resumed.", name);
+               goto again;
+       }
+
+       /* Dead code */
+       pr_op_err("Unknown waitpid() status; giving up %s.", name);
+       return EINVAL;
+}
+
+void
+rsync_teardown(void)
+{
+       if (readfd)
+               close(readfd);
+       if (writefd)
+               close(writefd);
+       readfd = writefd = 0;
+       wait_child("rsync spawner", spawner);
+}
+
 /*
  * Duplicate parent FDs, to pipe rsync output:
  * - fds[0] = stderr
@@ -75,24 +271,18 @@ prepare_rsync_args(char **args, char const *url, char const *path)
        args[i++] = NULL;
 }
 
-__dead static void
-handle_child_thread(char const *url, char const *path, int fds[2][2])
+static int
+execvp_rsync(char const *url, char const *path, int fds[2][2])
 {
-       /* THIS FUNCTION MUST NEVER RETURN!!! */
        char *args[20];
-       int error;
 
        prepare_rsync_args(args, url, path);
        duplicate_fds(fds);
 
-       execvp(args[0], args);
-       error = errno;
-       /* Log directly to stderr, redirected by the pipes */
-       fprintf(stderr, "Could not execute the rsync command: %s\n",
-           strerror(error));
+       if (execvp(args[0], args) < 0)
+               return errno;
 
-       /* https://stackoverflow.com/a/14493459/1735458 */
-       exit(-error);
+       return EINVAL; /* Unreachable, but whatever */
 }
 
 static int
@@ -110,8 +300,8 @@ create_pipes(int fds[2][2])
                error = errno;
 
                /* Close pipe previously created */
-               close(STDERR_READ(fds));
-               close(STDERR_WRITE(fds));
+               close(fds[0][0]);
+               close(fds[0][1]);
 
                pr_op_err_st("Piping rsync stdout: %s", strerror(error));
                return -error;
@@ -132,7 +322,6 @@ get_current_millis(void)
 static void
 log_buffer(char const *buffer, ssize_t read, bool is_error)
 {
-#define PRE_RSYNC "[RSYNC exec]: "
        char *cpy, *cur, *tmp;
 
        cpy = pmalloc(read + 1);
@@ -149,13 +338,12 @@ log_buffer(char const *buffer, ssize_t read, bool is_error)
                        continue;
                }
                if (is_error)
-                       pr_val_err(PRE_RSYNC "%s", cur);
+                       pr_val_err("[RSYNC exec] %s", cur);
                else
-                       pr_val_debug(PRE_RSYNC "%s", cur);
+                       pr_val_debug("[RSYNC exec] %s", cur);
                cur = tmp + 1;
        }
        free(cpy);
-#undef PRE_RSYNC
 }
 
 #define DROP_FD(f, fail)               \
@@ -252,7 +440,7 @@ exhaust_read_fds(int fderr, int fdout)
        }
 
 timed_out:
-       pr_val_err("rsync transfer timeout reached");
+       pr_val_err("rsync transfer timeout exhausted");
        error = 2;
 fail:  for (f = 0; f < 2; f++)
                if (pfd[f].fd != -1)
@@ -263,8 +451,11 @@ fail:      for (f = 0; f < 2; f++)
 /*
  * Completely consumes @fds' streams, and closes them.
  *
- * Allegedly, this is a portable way to wait for the child process to finish.
- * (IIRC, waitpid() doesn't do this reliably.)
+ * Originally, this was meant to redirect rsync's output to syslog:
+ * ac56d70c954caf49382f5f28ff4a017e859e2e0a
+ * (ie. we need to exhaust the streams because we dup2()'d them.)
+ *
+ * Later, @job repurposed this code to fix #74.
  */
 static int
 exhaust_pipes(int fds[2][2])
@@ -275,42 +466,25 @@ exhaust_pipes(int fds[2][2])
 }
 
 /* rsync @url @path */
-int
-rsync_download(char const *url, char const *path)
+static int
+rsync(char const *url, char const *path)
 {
        /* Descriptors to pipe stderr (first element) and stdout (second) */
        int fork_fds[2][2];
        pid_t child_pid;
-       int child;
        int error;
 
-       pr_val_info("rsync: %s -> %s", url, path);
-
        error = create_pipes(fork_fds);
        if (error)
                return error;
 
-       /* Flush output (avoid locks between father and child) */
-       log_flush();
+       fflush(stdout);
+       fflush(stderr);
 
-       /* We need to fork because execvp() magics the thread away. */
        child_pid = fork();
-       if (child_pid == 0) {
-               /*
-                * This code is run by the child, and should try to
-                * call execvp() as soon as possible.
-                *
-                * Refer to
-                * https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html
-                * "{..} to avoid errors, the child process may only
-                * execute async-signal-safe operations until such time
-                * as one of the exec functions is called."
-                */
-               handle_child_thread(url, path, fork_fds);
-       }
        if (child_pid < 0) {
                error = errno;
-               pr_op_err_st("Couldn't fork to execute rsync: %s",
+               pr_op_err_st("Couldn't spawn the rsync process: %s",
                    strerror(error));
                /* Close all ends from the created pipes */
                close(STDERR_READ(fork_fds));
@@ -320,30 +494,14 @@ rsync_download(char const *url, char const *path)
                return error;
        }
 
-       /* This code is run by us. */
+       if (child_pid == 0)
+               exit(execvp_rsync(url, path, fork_fds)); /* Child code */
+
+       /* Parent code */
+
        error = exhaust_pipes(fork_fds);
        if (error)
                kill(child_pid, SIGTERM); /* Stop the child */
 
-       child = 0;
-       if (waitpid(child_pid, &child, 0) < 0) {
-               error = errno;
-               pr_op_err("Could not wait for rsync: %s", strerror(error));
-               return error;
-       }
-
-       if (WIFEXITED(child)) {
-               /* Happy path (but also sad path sometimes) */
-               error = WEXITSTATUS(child);
-               pr_val_debug("rsync ended. Result: %d", error);
-               return error ? EIO : 0;
-       }
-
-       if (WIFSIGNALED(child)) {
-               pr_op_warn("rsync interrupted by signal %d.", WTERMSIG(child));
-               return EINTR; /* Meh? */
-       }
-
-       pr_op_err_st("rsync died in an unknown way.");
-       return -EINVAL;
+       return wait_child("rsync", child_pid);
 }
index ef4fcc712530e767a4ed2e9163ed130c18f508bd..943ece5ea1a81f6880a10ab7da61114f32c59f05 100644 (file)
@@ -1,6 +1,9 @@
 #ifndef SRC_RSYNC_RSYNC_H_
 #define SRC_RSYNC_RSYNC_H_
 
+void rsync_setup(void);
 int rsync_download(char const *, char const *);
+unsigned int rsync_finished(void);
+void rsync_teardown(void);
 
 #endif /* SRC_RSYNC_RSYNC_H_ */
diff --git a/src/stream.c b/src/stream.c
new file mode 100644 (file)
index 0000000..0a4055e
--- /dev/null
@@ -0,0 +1,151 @@
+#include "stream.h"
+
+#include <errno.h>
+#include <stdbool.h>
+#include <stddef.h>
+
+void
+read_stream_init(struct read_stream *stream, int fd)
+{
+       stream->fd = fd;
+       stream->buffer = pmalloc(256);
+       stream->capacity = 256;
+}
+
+void
+read_stream_close(struct read_stream *stream)
+{
+       close(stream->fd);
+       free(stream->buffer);
+}
+
+/*
+ * Full read or error.
+ *
+ * Nonzero: error
+ * 0, stream->buffer != NULL: Success
+ * 0, stream->buffer == NULL: EOF
+ */
+static int
+full_read(struct read_stream *stream, size_t len)
+{
+       ssize_t rd;
+       size_t offset;
+
+       if (stream->buffer == NULL)
+               return 0;
+
+       offset = 0;
+       do {
+               rd = read(stream->fd, stream->buffer + offset, len);
+               if (rd < 0)
+                       return errno;
+               if (rd == 0) {
+                       free(stream->buffer);
+                       stream->buffer = NULL;
+                       stream->capacity = 0;
+                       return 0;
+               }
+               if (rd == len)
+                       return 0;
+               if (rd > len)
+                       pr_crit("rd > len: %zd > %zu", rd, len);
+
+               len -= rd;
+               offset += rd;
+       } while (true);
+}
+
+/* Full write or error. */
+int
+full_write(int fd, unsigned char const *buf, size_t len)
+{
+       ssize_t wr;
+       size_t offset;
+
+       offset = 0;
+       do {
+               wr = write(fd, buf + offset, len);
+               if (wr < 0)
+                       return errno;
+               if (wr > len)
+                       pr_crit("wr > len: %zd > %zu", wr, len);
+               len -= wr;
+               offset += wr;
+       } while (len > 0);
+
+       return 0;
+}
+
+/* @value -1 means "EOF". */
+static int
+read_ssize_t(struct read_stream *stream, ssize_t *value)
+{
+       int error;
+
+       error = full_read(stream, 2);
+       if (error)
+               return error;
+
+       *value = stream->buffer
+           ? ((stream->buffer[0] << 8) | stream->buffer[1])
+           : -1;
+       return 0;
+}
+
+static int
+write_size_t(int fd, size_t value)
+{
+       unsigned char buf[2];
+
+       if (value > 1024)
+               return ENOSPC;
+
+       buf[0] = (value >> 8) & 0xFF;
+       buf[1] = value & 0xFF;
+
+       return full_write(fd, buf, 2);
+}
+
+int
+read_string(struct read_stream *stream, char **result)
+{
+       ssize_t len;
+       int error;
+
+       error = read_ssize_t(stream, &len);
+       if (error)
+               return error;
+       if (len == -1) {
+               *result = NULL;
+               return 0;
+       }
+
+       if (len > stream->capacity) {
+               do {
+                       stream->capacity *= 2;
+               } while (len > stream->capacity);
+               stream->buffer = prealloc(stream->buffer, stream->capacity);
+       }
+
+       error = full_read(stream, len);
+       if (error)
+               return error;
+       *result = stream->buffer ? pstrdup((char *)stream->buffer) : NULL;
+       return 0;
+}
+
+int
+write_string(int fd, char const *str)
+{
+       size_t len;
+       int error;
+
+       len = strlen(str) + 1;
+
+       error = write_size_t(fd, len);
+       if (error)
+               return error;
+
+       return full_write(fd, (unsigned char const *) str, len);
+}
diff --git a/src/stream.h b/src/stream.h
new file mode 100644 (file)
index 0000000..f0f3439
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef SRC_STREAM_H_
+#define SRC_STREAM_H_
+
+struct read_stream {
+       int fd;
+       unsigned char *buffer;
+       size_t capacity;
+};
+
+void read_stream_init(struct read_stream *, int);
+void read_stream_close(struct read_stream *);
+
+int full_write(int, unsigned char const *, size_t);
+
+/* NULL means "EOF". */
+int read_string(struct read_stream *, char **);
+int write_string(int, char const *);
+
+#endif /* SRC_STREAM_H_ */
index 51550e0cb26268f368fd35139617f214585f2009..89a76fbdbb358ba6b760fdf222023b9ad8fee244 100644 (file)
@@ -81,6 +81,8 @@ pr_crit(const char *format, ...)
        ck_abort();
 }
 
+MOCK_VOID(log_teardown, void)
+
 static char addr_buffer1[INET6_ADDRSTRLEN];
 static char addr_buffer2[INET6_ADDRSTRLEN];
 
@@ -123,6 +125,7 @@ MOCK_NULL(config_get_output_bgpsec, char const *, void)
 MOCK(config_get_op_log_file_format, enum filename_format, FNF_NAME, void)
 MOCK(config_get_val_log_file_format, enum filename_format, FNF_NAME, void)
 MOCK(logv_filename, char const *, path, char const *path)
+MOCK_VOID(free_rpki_config, void)
 
 MOCK_VOID(fnstack_init, void)
 MOCK_VOID(fnstack_push, char const *file)
index e6053386ca220814cc36b47220bfe9093c2e0c99..3930c205f7d00fece66c6f3cbf071d3a82ea7355 100644 (file)
@@ -4,6 +4,7 @@
 #include "common.c"
 #include "mock.c"
 #include "rsync.c"
+#include "stream.c"
 
 static char const STR64[] = "abcdefghijklmnopqrstuvwxyz"
                "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@@ -16,13 +17,6 @@ static char content[1024];
 MOCK(config_get_rsync_program, char const *, "rsync", void)
 MOCK(config_get_rsync_transfer_timeout, long, 4, void)
 
-void
-log_flush(void)
-{
-       fflush(stdout);
-       fflush(stderr);
-}
-
 /* Tests */
 
 static void
@@ -188,9 +182,7 @@ START_TEST(full_rsync_timeout_test_1kb)
        printf("1kb\n");
        create_file("tmp/1kb", 1);
        ensure_file_deleted("tmp/1kb-copy");
-       // XXX this is creating directories because of rsync_download's mkdir_p.
-       // Is this a symptom of a problem?
-       ck_assert_int_eq(0, rsync_download("tmp/1kb", "tmp/1kb-copy"));
+       ck_assert_int_eq(0, rsync("tmp/1kb", "tmp/1kb-copy"));
 }
 END_TEST
 
@@ -199,7 +191,7 @@ START_TEST(full_rsync_timeout_test_3kb)
        printf("3kb\n");
        create_file("tmp/3kb", 3);
        ensure_file_deleted("tmp/3kb-copy");
-       ck_assert_int_eq(0, rsync_download("tmp/3kb", "tmp/3kb-copy"));
+       ck_assert_int_eq(0, rsync("tmp/3kb", "tmp/3kb-copy"));
 }
 END_TEST
 
@@ -209,14 +201,70 @@ START_TEST(full_rsync_timeout_test_5kb)
        create_file("tmp/5kb", 5);
        ensure_file_deleted("tmp/5kb-copy");
        /* Max speed is 1kbps, timeout is 4 seconds */
-       ck_assert_int_eq(EIO, rsync_download("tmp/5kb", "tmp/5kb-copy"));
+       ck_assert_int_eq(EIO, rsync("tmp/5kb", "tmp/5kb-copy"));
+}
+END_TEST
+
+static void
+wait_rsyncs(unsigned int expected)
+{
+       unsigned int actual = 0;
+
+       do {
+               actual += rsync_finished();
+               if (expected == actual) {
+                       ck_assert_uint_eq(0, rsync_finished());
+                       return;
+               }
+               ck_assert(expected > actual);
+               sleep(1);
+       } while (true);
+}
+
+START_TEST(test_rsync_finished)
+{
+       printf("rsync_finished()\n");
+
+       create_file("tmp/2kb", 1);
+       ensure_file_deleted("tmp/2kb-copy");
+
+       rsync_setup();
+
+       ck_assert_int_eq(0, rsync_download("tmp/2kb", "tmp/2kb-copy"));
+       ck_assert_int_eq(0, rsync_finished());
+       wait_rsyncs(1);
+
+       rsync_teardown();
+}
+END_TEST
+
+START_TEST(test_multi_rsync)
+{
+       printf("simultaneous rsyncs\n");
+
+       create_file("tmp/i1", 1);
+       ensure_file_deleted("tmp/i1-copy");
+       create_file("tmp/i2", 1);
+       ensure_file_deleted("tmp/i2-copy");
+       create_file("tmp/i3", 1);
+       ensure_file_deleted("tmp/i3-copy");
+
+       rsync_setup();
+
+       ck_assert_int_eq(0, rsync_download("tmp/i1", "tmp/"));
+       ck_assert_int_eq(0, rsync_finished());
+       ck_assert_int_eq(0, rsync_download("tmp/i2", "tmp/i2-copy"));
+       ck_assert_int_eq(0, rsync_download("tmp/i3", "tmp/i3-copy"));
+       wait_rsyncs(3);
+
+       rsync_teardown();
 }
 END_TEST
 
 static Suite *create_suite(void)
 {
        Suite *suite;
-       TCase *pipes;
+       TCase *pipes, *spawner;
 
        pipes = tcase_create("pipes");
        tcase_add_test(pipes, exhaust_read_fds_test_normal);
@@ -227,8 +275,14 @@ static Suite *create_suite(void)
        tcase_add_test(pipes, full_rsync_timeout_test_5kb);
        tcase_set_timeout(pipes, 6);
 
+       spawner = tcase_create("spawner");
+       tcase_add_test(spawner, test_rsync_finished);
+       tcase_add_test(spawner, test_multi_rsync);
+       tcase_set_timeout(spawner, 6);
+
        suite = suite_create("rsync");
        suite_add_tcase(suite, pipes);
+       suite_add_tcase(suite, spawner);
 
        return suite;
 }