]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Update glue between main loop and rsync
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Tue, 22 Apr 2025 22:03:30 +0000 (16:03 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Tue, 22 Apr 2025 22:05:11 +0000 (16:05 -0600)
1. Propagate EBUSY so the main loop suspends the task (and takes care of
   other tasks) while the rsync runs.
2. Spawner now responds rsync URL and path to parent, so the cache can
   update the download state.

src/asn1/asn1c/OCTET_STRING.c
src/asn1/asn1c/OCTET_STRING.h
src/cache.c
src/log.h
src/main.c
src/object/tal.c
src/rsync.c
src/rsync.h
test/cache_test.c
test/mock.c
test/rsync_test.c

index 39a2746e379c62c8351eac9f60f14f35df4916f2..8ed51f4115ee5e7ea60f48ada4d8b4bd7ca327a1 100644 (file)
@@ -947,6 +947,12 @@ OCTET_STRING_new_fromBuf(const asn_TYPE_descriptor_t *td, const char *str,
        return st;
 }
 
+char *
+OCTET_STRING_toString(OCTET_STRING_t *os)
+{
+       return pstrndup((char *)os->buf, os->size);
+}
+
 json_t *
 OCTET_STRING_to_json(const asn_TYPE_descriptor_t *td, OCTET_STRING_t const *ber)
 {
index 1506cb92d40e8ede27db1e840abd3c9d4dd43222..bd0e8fc18ffa09102b518e067e9420d4c9419448 100644 (file)
@@ -65,6 +65,7 @@ int OCTET_STRING_fromBuf(OCTET_STRING_t *s, const char *str, int size);
 OCTET_STRING_t *OCTET_STRING_new_fromBuf(const asn_TYPE_descriptor_t *td,
                                          const char *str, int size);
 
+char *OCTET_STRING_toString(OCTET_STRING_t *);
 json_t *OCTET_STRING_to_json(const asn_TYPE_descriptor_t *,
                             OCTET_STRING_t const *);
 
index 6d150a54798bd059e38f36e3596188196161c531..816c985056e358058ea7f0716062bdf67d827f75 100644 (file)
@@ -730,14 +730,8 @@ static int
 dl_rsync(struct cache_node *module)
 {
        int error;
-
-       // XXX go pick some other task on zero
        error = rsync_queue(module->map.url, module->map.path);
-       if (error)
-               return error;
-
-       module->success_ts = module->attempt_ts;
-       return 0;
+       return error ? error : EBUSY;
 }
 
 static int
@@ -1178,6 +1172,32 @@ cache_commit_file(struct cache_mapping *map)
        mutex_unlock(&commits_lock);
 }
 
+void
+rsync_finished(char const *url, char const *path)
+{
+       struct cache_node *node;
+
+       mutex_lock(&cache.rsync.lock);
+
+       node = find_node(&cache.rsync, url, strlen(url));
+       if (node == NULL) {
+               mutex_unlock(&cache.rsync.lock);
+               pr_op_err("rsync '%s -> %s' finished, but cache node does not exist.",
+                   url, path);
+               return;
+       }
+       if (node->state != DLS_ONGOING)
+               pr_op_warn("rsync '%s -> %s' finished, but existing node was not in ONGOING state.",
+                   url, path);
+
+       node->state = DLS_FRESH;
+       node->dlerr = 0;
+       node->success_ts = node->attempt_ts;
+       mutex_unlock(&cache.rsync.lock);
+
+       task_wakeup_dormants();
+}
+
 char const *
 cage_rpkiNotify(struct cache_cage *cage)
 {
index 1544a1878865aff872ebf6c5d92666dfa243a99d..453899d2a29fb05fc94f027e4ad6f22a1b9fdabf 100644 (file)
--- a/src/log.h
+++ b/src/log.h
@@ -117,9 +117,9 @@ int incidence(enum incidence_id, const char *, ...) CHECK_FORMAT(2, 3);
 #define DBG_COLOR_RESET "\x1B[0m"
 #define PR_DEBUG \
     printf(DBG_COLOR "%s:%d (%s())" DBG_COLOR_RESET "\n", \
-    __FILE__, __LINE__, __func__)
+        __FILE__, __LINE__, __func__)
 #define PR_DEBUG_MSG(msg, ...) \
     printf(DBG_COLOR "%s:%d (%s()): " msg DBG_COLOR_RESET "\n", \
-    __FILE__, __LINE__, __func__, ##__VA_ARGS__)
+        __FILE__, __LINE__, __func__, ##__VA_ARGS__)
 
 #endif /* SRC_LOG_H_ */
index 6b13e38a088d384d424ca5cc8c6ea0dfd72d3625..0917288b2c2ccdce7bca07b3929108582aa94ddc 100644 (file)
@@ -120,6 +120,7 @@ main(int argc, char **argv)
        int error;
 
        /* Initializations */
+       /* (Do not start any threads until after rsync_setup() has forked.) */
 
        error = log_setup();
        if (error)
@@ -128,7 +129,7 @@ main(int argc, char **argv)
        if (error)
                goto revert_log;
 
-       rsync_setup(NULL, NULL); /* Spawn rsync spawner ASAP */
+       rsync_setup(NULL, NULL); /* Fork rsync spawner ASAP */
        register_signal_handlers();
 
        error = thvar_init();
index 8534744447a5f34d168182b43fd5322022b5c4f8..8ec75d5dbd16832c644d7917c7f79b719aefd827 100644 (file)
@@ -170,16 +170,19 @@ try_urls(struct tal *tal, bool (*url_is_protocol)(char const *),
 {
        char **url;
        struct cache_mapping map;
+       int error;
 
        ARRAYLIST_FOREACH(&tal->urls, url) {
                map.url = *url;
                if (!url_is_protocol(map.url))
                        continue;
-               // XXX if this is rsync, it seems this will queue and fail
                map.path = get_path(*url);
                if (!map.path)
                        continue;
-               if (validate_ta(tal, &map) != 0)
+               error = validate_ta(tal, &map);
+               if (error == EBUSY)
+                       return EBUSY;
+               if (error)
                        continue;
                cache_commit_file(&map);
                return 0;
@@ -201,14 +204,18 @@ traverse_tal(char const *tal_path)
                goto end1;
 
        /* Online attempts */
-       if (try_urls(&tal, url_is_https, cache_refresh_by_url) == 0)
+       error = try_urls(&tal, url_is_https, cache_refresh_by_url);
+       if (!error || error == EBUSY)
                goto end2;
-       if (try_urls(&tal, url_is_rsync, cache_refresh_by_url) == 0)
+       error = try_urls(&tal, url_is_rsync, cache_refresh_by_url);
+       if (!error || error == EBUSY)
                goto end2;
        /* Offline fallback attempts */
-       if (try_urls(&tal, url_is_https, cache_get_fallback) == 0)
+       error = try_urls(&tal, url_is_https, cache_get_fallback);
+       if (!error || error == EBUSY)
                goto end2;
-       if (try_urls(&tal, url_is_rsync, cache_get_fallback) == 0)
+       error = try_urls(&tal, url_is_rsync, cache_get_fallback);
+       if (!error || error == EBUSY)
                goto end2;
 
        pr_op_err("None of the TAL URIs yielded a successful traversal.");
@@ -233,8 +240,16 @@ pick_up_work(void *arg)
                        }
                        break;
                case VTT_TAL:
-                       if (traverse_tal(task->u.tal) != 0)
+                       switch (traverse_tal(task->u.tal)) {
+                       case 0:
+                               break;
+                       case EBUSY:
+                               task_requeue_dormant(task);
+                               task = NULL;
+                               break;
+                       default:
                                task_stop();
+                       }
                        break;
                }
 
index 772fc7a922a5297ef444b5408c562ed957176334..1bde288a019ed9d4c061d9679c5e3b51d77e3cea 100644 (file)
 #include "config.h"
 #include "log.h"
 #include "types/array.h"
+#include "types/map.h"
 
 #include "asn1/asn1c/ber_decoder.h"
 #include "asn1/asn1c/der_encoder.h"
 #include "asn1/asn1c/RsyncRequest.h"
 
 #define RSP /* rsync spawner prefix */ "[rsync spawner] "
+#define SRTP "[spawner response thread] "
 
 static char const *rsync_args[20]; /* Last must be NULL */
 
@@ -33,24 +35,17 @@ static const int WRFD = 1;
 
 static pid_t spawner;  /* The subprocess that spawns rsync runs */
 
-static int readfd;     /* Parent's end of the spawner-to-parent pipe */
-static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER;
-
-static int writefd;    /* Parent's end of the parent-to-spawner pipe */
-static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER;
-
 /*
- * "Spawner to parent" socket.
- * Socket used by the spawner to communicate with the parent.
+ * "Parent-spawner socket."
+ * Used by both parent and spawner to speak with each other.
  */
-struct s2p_socket {
-       /* Spawner's end of the parent-to-spawner pipe */
-       struct read_stream rd;
-       /* Spawner's end of the spawner-to-parent pipe */
-       int wr;
-       /* Scratchpad buffer for stream read */
-       struct RsyncRequest *rr;
-};
+struct {
+       struct read_stream rd;          /* Read fd and extra tools */
+       int wr;                         /* Write fd */
+
+       struct RsyncRequest *rr;        /* Scratchpad buffer for stream read */
+       pthread_mutex_t wrlock;         /* To sync writes */
+} pssk;
 
 struct rsync_task {
        int pid;
@@ -81,32 +76,98 @@ struct rsync_tasks {
     )
 #endif
 
+/* Spawner response thread; The thread that listens to spawner responses. */
+static pthread_t srt;
+
 static void
-void_task(struct rsync_task *task, struct s2p_socket *s2p)
+__spsk_init(int rdfd, int wrfd)
 {
-       static const unsigned char one = 1;
+       rstream_init(&pssk.rd, rdfd, 512);
+       pssk.wr = wrfd;
+       pssk.rr = NULL;
+       pthread_mutex_init(&pssk.wrlock, NULL);
+}
 
-       free(task->url);
-       free(task->path);
-       free(task);
+static void
+spsk_init(int rdpipes[2], int wrpipes[2])
+{
+       close(wrpipes[RDFD]);
+       close(rdpipes[WRFD]);
+       __spsk_init(rdpipes[RDFD], wrpipes[WRFD]);
+}
 
-       if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) {
-               pr_op_err(RSP "Cannot message parent process: %s", strerror(errno));
+static void
+spsk_cleanup(void)
+{
+       rstream_close(&pssk.rd, true);
+       if (pssk.wr != -1) {
+               close(pssk.wr);
+               pssk.wr = -1;
+       }
+       ASN_STRUCT_FREE(asn_DEF_RsyncRequest, pssk.rr);
+       pthread_mutex_destroy(&pssk.wrlock);
+}
+
+static int
+write_cb(const void *buffer, size_t size, void *arg)
+{
+       return stream_full_write(pssk.wr, buffer, size);
+}
+
+static void
+notify_parent(struct rsync_task *task)
+{
+       struct RsyncRequest req;
+       asn_enc_rval_t result;
 
-               close(s2p->wr);
-               s2p->wr = -1;
-               /* Can't signal finished rsyncs anymore; reject future ones. */
-               rstream_close(&s2p->rd, true);
+       if (pssk.wr == -1) {
+               pr_op_err(RSP "Cannot message parent process: "
+                   "The socket is closed.");
+               return;
+       }
+
+       /*
+        * TODO (asn1) these error messages are too generic.
+        * The asn1 code needs better error reporting.
+        */
+
+       if (RsyncRequest_init(&req, task->url, task->path) < 0) {
+               pr_op_err(RSP "Cannot message parent process: "
+                   "The request object cannot be created");
+               return;
+       }
+
+       result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL);
+       if (result.encoded == -1) {
+               pr_op_err(RSP "Cannot message parent process: Unknown error");
+               /* TODO (asn1) Do this if the error was I/O:
+                * close(spsk.wr);
+                * spsk.wr = -1;
+                * // Can't signal finished rsyncs anymore; reject future ones.
+                * rstream_close(&spsk.rd, true);
+                */
        }
+
+       pr_op_debug(RSP "Parent notified; sent %zd bytes.", result.encoded);
+       ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req);
+}
+
+static void
+void_task(struct rsync_task *task)
+{
+       notify_parent(task);
+
+       free(task->url);
+       free(task->path);
+       free(task);
 }
 
 static void
-finish_task(struct rsync_tasks *tasks, struct rsync_task *task,
-    struct s2p_socket *s2p)
+finish_task(struct rsync_tasks *tasks, struct rsync_task *task)
 {
        LIST_REMOVE(task, lh);
        tasks->a--;
-       void_task(task, s2p);
+       void_task(task);
 }
 
 static void
@@ -248,12 +309,12 @@ fork_rsync(struct rsync_task *task)
 
 static void
 activate_task(struct rsync_tasks *tasks, struct rsync_task *task,
-    struct s2p_socket *s2p, struct timespec *now)
+    struct timespec *now)
 {
        ts_add(&task->expiration, now, 1000 * config_rsync_timeout());
 
        if (fork_rsync(task) != 0) {
-               void_task(task, s2p);
+               void_task(task);
                return;
        }
 
@@ -262,94 +323,92 @@ activate_task(struct rsync_tasks *tasks, struct rsync_task *task,
 }
 
 static void
-post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+post_task(struct cache_mapping *map, struct rsync_tasks *tasks,
     struct timespec *now)
 {
        struct rsync_task *task;
 
        task = pzalloc(sizeof(struct rsync_task));
-       task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size);
-       task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size);
+       task->url = map->url;
+       task->path = map->path;
 
        if (tasks->a >= config_rsync_max()) {
                LIST_INSERT_HEAD(&tasks->queued, task, lh);
                pr_op_debug(RSP "Queued new task.");
        } else {
-               activate_task(tasks, task, s2p, now);
+               activate_task(tasks, task, now);
                pr_op_debug(RSP "Got new task: %d", task->pid);
        }
 }
 
-static void
-read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
-    struct timespec *now)
+static int
+next_task(struct cache_mapping *result)
 {
-       struct read_stream *in;
-       ssize_t consumed;
-       size_t offset;
        asn_dec_rval_t decres;
+       ssize_t consumed;
        int error;
 
-       in = &s2p->rd;
-
-       do {
-               consumed = read(in->fd, in->buffer + in->len,
-                   in->capacity - in->len);
-               if (consumed < 0) {
-                       error = errno;
-                       if (error != EAGAIN && error != EWOULDBLOCK)
-                               rstream_close(in, true);
-                       return;
-               }
-               if (consumed == 0) { /* EOS */
-                       rstream_close(in, true);
-                       return;
+again: if (pssk.rd.len > 0) {
+               decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&pssk.rr,
+                   pssk.rd.buffer, pssk.rd.len);
+
+               memmove(pssk.rd.buffer, pssk.rd.buffer + decres.consumed,
+                   pssk.rd.len - decres.consumed);
+               pssk.rd.len -= decres.consumed;
+
+               switch (decres.code) {
+               case RC_OK:
+                       result->url = OCTET_STRING_toString(&pssk.rr->url);
+                       result->path = OCTET_STRING_toString(&pssk.rr->path);
+                       ASN_STRUCT_RESET(asn_DEF_RsyncRequest, pssk.rr);
+                       return 0;
+               case RC_WMORE:
+                       break;
+               case RC_FAIL:
+                       rstream_close(&pssk.rd, true);
+                       return EINVAL;
                }
+       }
 
-               in->len += consumed;
-
-               for (offset = 0; offset < in->len;) {
-                       decres = ber_decode(&asn_DEF_RsyncRequest,
-                           (void **)&s2p->rr,
-                           in->buffer + offset, in->len - offset);
-                       offset += decres.consumed;
-                       switch (decres.code) {
-                       case RC_OK:
-                               post_task(s2p, tasks, now);
-                               ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr);
-                               break;
-                       case RC_WMORE:
-                               goto break_for;
-                       case RC_FAIL:
-                               rstream_close(in, true);
-                               return;
-                       }
-               }
+       if (pssk.rd.fd == -1)
+               return EBADF;
+       consumed = read(pssk.rd.fd, pssk.rd.buffer + pssk.rd.len,
+           pssk.rd.capacity - pssk.rd.len);
+       if (consumed < 0) {
+               error = errno;
+               if (error != EAGAIN && error != EWOULDBLOCK)
+                       rstream_close(&pssk.rd, true);
+               return error;
+       }
+       if (consumed == 0) { /* EOS */
+               rstream_close(&pssk.rd, true);
+               return ENOENT;
+       }
 
-break_for:     if (offset > in->len)
-                       pr_crit("read_tasks off:%zu len:%zu", offset, in->len);
-               in->len -= offset;
-               memmove(in->buffer, in->buffer + offset, in->len);
-       } while (true);
+       pssk.rd.len += consumed;
+       goto again;
 }
 
 static void
-handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd,
-    struct rsync_tasks *tasks, struct timespec *now)
+handle_parent_fd(struct pollfd *pfd, struct rsync_tasks *tasks,
+    struct timespec *now)
 {
-       if (s2p->rd.fd == -1)
+       struct cache_mapping map;
+
+       if (pssk.rd.fd == -1)
                return;
 
        if (pfd->revents & POLLNVAL) {
                pr_op_err(RSP "bad parent fd: %i", pfd->fd);
-               rstream_close(&s2p->rd, false);
+               rstream_close(&pssk.rd, false);
 
        } else if (pfd->revents & POLLERR) {
                pr_op_err(RSP "Generic error during parent fd poll.");
-               rstream_close(&s2p->rd, true);
+               rstream_close(&pssk.rd, true);
 
        } else if (pfd->revents & (POLLIN | POLLHUP)) {
-               read_tasks(s2p, tasks, now);
+               while (next_task(&map) == 0)
+                       post_task(&map, tasks, now);
        }
 }
 
@@ -489,8 +548,7 @@ kill_subprocess(struct rsync_task *task)
 }
 
 static void
-activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p,
-    struct timespec *now)
+activate_queued(struct rsync_tasks *tasks, struct timespec *now)
 {
        struct rsync_task *task;
 
@@ -501,13 +559,13 @@ activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p,
        pr_op_debug(RSP "Activating queued task %s -> %s.",
            task->url, task->path);
        LIST_REMOVE(task, lh);
-       activate_task(tasks, task, s2p, now);
+       activate_task(tasks, task, now);
 }
 
 /* Returns true if the task died. */
 static bool
 maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task,
-    struct s2p_socket *s2p, struct timespec *now)
+    struct timespec *now)
 {
        struct timespec epoch;
 
@@ -518,19 +576,15 @@ maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task,
        pr_op_debug(RSP "Task %d ran out of time.", task->pid);
        kill_subprocess(task);
        wait_subprocess("rsync", task->pid);
-       finish_task(tasks, task, s2p);
-       activate_queued(tasks, s2p, now);
+       finish_task(tasks, task);
+       activate_queued(tasks, now);
 
        return true;
 }
 
 static int
-spawner_run(
-    int request_fd,    /* Requests from parent to us (spawner) */
-    int response_fd    /* Responses from us (spawner) to parent */
-) {
-       struct s2p_socket s2p;  /* Channel to parent */
-
+spawner_run(void)
+{
        struct pollfd *pfds;    /* Channels to children */
        size_t p, pfds_count;
        struct timespec now, expiration;
@@ -543,10 +597,6 @@ spawner_run(
 
        int error;
 
-       rstream_init(&s2p.rd, request_fd, 1024);
-       s2p.wr = response_fd;
-       s2p.rr = NULL;
-
        LIST_INIT(&tasks.active);
        LIST_INIT(&tasks.queued);
        tasks.a = 0;
@@ -560,7 +610,7 @@ spawner_run(
                 * odd: stdouts
                 * even > 0: stderrs
                 */
-               pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a);
+               pfds = create_pfds(pssk.rd.fd, &tasks.active, tasks.a);
                pfds_count = 2 * tasks.a + 1;
                expiration.tv_sec = now.tv_sec + 10;
                expiration.tv_nsec = now.tv_nsec;
@@ -583,14 +633,14 @@ spawner_run(
                if (events == 0) { /* Timeout */
                        pr_op_debug(RSP "Woke up because of timeout.");
                        LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp)
-                               maybe_expire(&tasks, task, &s2p, &now);
+                               maybe_expire(&tasks, task, &now);
                        goto cont;
                }
 
                pr_op_debug(RSP "Woke up because of input.");
                p = 1;
                LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
-                       if (maybe_expire(&tasks, task, &s2p, &now))
+                       if (maybe_expire(&tasks, task, &now))
                                continue;
 
                        if (handle_rsync_fd(&pfds[p], p)) {
@@ -609,29 +659,27 @@ spawner_run(
                                pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.",
                                    task->pid);
                                wait_subprocess("rsync", task->pid);
-                               finish_task(&tasks, task, &s2p);
-                               activate_queued(&tasks, &s2p, &now);
+                               finish_task(&tasks, task);
+                               activate_queued(&tasks, &now);
                        }
                }
-               handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
+               handle_parent_fd(&pfds[0], &tasks, &now);
 
 cont:          free(pfds);
-       } while ((s2p.rd.fd != -1 || tasks.a > 0));
+       } while ((pssk.rd.fd != -1 || tasks.a > 0));
        pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
 
        LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
                kill_subprocess(task);
                wait_subprocess("rsync", task->pid);
-               finish_task(&tasks, task, &s2p);
+               finish_task(&tasks, task);
        }
        LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) {
                LIST_REMOVE(task, lh);
-               void_task(task, &s2p);
+               void_task(task);
        }
 
-       rstream_close(&s2p.rd, true);
-       if (s2p.wr != -1)
-               close(s2p.wr);
+       spsk_cleanup();
 
        free_rpki_config();
        log_teardown();
@@ -646,19 +694,19 @@ nonblock_pipe(int *fds)
 
        if (pipe(fds) < 0) {
                error = errno;
-               pr_op_err("Cannot create pipe: %s", strerror(error));
+               pr_op_warn("Cannot create pipe: %s", strerror(error));
                return error;
        }
 
        flags = fcntl(fds[RDFD], F_GETFL);
        if (flags < 0) {
                error = errno;
-               pr_op_err("Cannot retrieve pipe flags: %s", strerror(error));
+               pr_op_warn("Cannot retrieve pipe flags: %s", strerror(error));
                goto cancel;
        }
        if (fcntl(fds[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
                error = errno;
-               pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(error));
+               pr_op_warn("Cannot enable O_NONBLOCK: %s", strerror(error));
                goto cancel;
        }
 
@@ -669,6 +717,19 @@ cancel:    close(fds[RDFD]);
        return error;
 }
 
+static void *
+rcv_spawner_responses(void *arg)
+{
+       struct cache_mapping map = { 0 };
+
+       while (next_task(&map) == 0) {
+               rsync_finished(map.url, map.path);
+               map_cleanup(&map);
+       }
+
+       return NULL;
+}
+
 void
 rsync_setup(char const *program, ...)
 {
@@ -678,6 +739,7 @@ rsync_setup(char const *program, ...)
        va_list args;
        array_index i;
        char const *arg;
+       int error;
 
        if (program != NULL) {
                rsync_args[0] = arg = program;
@@ -712,29 +774,38 @@ rsync_setup(char const *program, ...)
 
        if (nonblock_pipe(parent2spawner) != 0)
                goto fail1;
-       if (nonblock_pipe(spawner2parent) != 0)
+       if (pipe(spawner2parent) < 0) {
+               pr_op_warn("Cannot create pipe: %s", strerror(errno));
                goto fail2;
+       }
 
        fflush(stdout);
        fflush(stderr);
 
        spawner = fork();
        if (spawner < 0) {
-               pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
+               pr_op_warn("Cannot fork rsync spawner: %s", strerror(errno));
                goto fail3;
        }
 
        if (spawner == 0) { /* Client code */
-               close(parent2spawner[WRFD]);
-               close(spawner2parent[RDFD]);
-               exit(spawner_run(parent2spawner[RDFD], spawner2parent[WRFD]));
+               spsk_init(parent2spawner, spawner2parent);
+               exit(spawner_run());
        }
 
        /* Parent code */
-       close(parent2spawner[RDFD]);
-       close(spawner2parent[WRFD]);
-       readfd = spawner2parent[RDFD];
-       writefd = parent2spawner[WRFD];
+       /* (Threads can now be spawned.) */
+
+       spsk_init(spawner2parent, parent2spawner);
+
+       error = pthread_create(&srt, NULL, rcv_spawner_responses, NULL);
+       if (error) {
+               pr_op_warn("Cannot start rsync spawner listener thread: %s",
+                   strerror(error));
+               spsk_cleanup();
+               goto fail1;
+       }
+
        return;
 
 fail3: close(spawner2parent[RDFD]);
@@ -742,16 +813,15 @@ fail3:    close(spawner2parent[RDFD]);
 fail2: close(parent2spawner[RDFD]);
        close(parent2spawner[WRFD]);
 fail1: pr_op_warn("rsync will not be available.");
-       readfd = writefd = -1;
-}
-
-static int
-send_to_spawner(const void *buffer, size_t size, void *arg)
-{
-       return stream_full_write(writefd, buffer, size);
+       pssk.rd.fd = pssk.wr = -1;
 }
 
-/* Queues rsync; doesn't wait. Call rsync_finished() later.  */
+/*
+ * Queues rsync; doesn't wait.
+ *
+ * Whenever at least one rsync is finished, the function rsync_finished()
+ * will be automatically called.
+ */
 int
 rsync_queue(char const *url, char const *path)
 {
@@ -762,48 +832,30 @@ rsync_queue(char const *url, char const *path)
        if (RsyncRequest_init(&req, url, path) < 0)
                return EINVAL;
 
-       mutex_lock(&writelock);
+       mutex_lock(&pssk.wrlock);
 
-       if (writefd == -1) {
+       if (pssk.wr == -1) {
                error = EIO;
                goto end;
        }
 
-       result = der_encode(&asn_DEF_RsyncRequest, &req, send_to_spawner, NULL);
+       result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL);
        if (result.encoded == -1) {
-               close(writefd);
-               writefd = -1;
+               close(pssk.wr);
+               pssk.wr = -1;
                error = EIO;
                goto end;
        }
 
        error = 0;
-end:   mutex_unlock(&writelock);
+end:   mutex_unlock(&pssk.wrlock);
        ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req);
        return error;
 }
 
-/* Returns the number of rsyncs that have ended since the last query */
-unsigned int
-rsync_finished(void)
-{
-       unsigned char buf[8];
-       ssize_t result;
-
-       mutex_lock(&readlock);
-       result = (readfd != -1) ? read(readfd, buf, sizeof(buf)) : 0;
-       mutex_unlock(&readlock);
-
-       return (result >= 0) ? result : 0;
-}
-
 void
 rsync_teardown(void)
 {
-       if (readfd != -1)
-               close(readfd);
-       if (writefd != -1)
-               close(writefd);
-       readfd = writefd = -1;
+       spsk_cleanup();
        wait_subprocess("rsync spawner", spawner);
 }
index bdfc7df80b39888075b18d54a07a8b3570436306..2d85f14708846043009abf35a7da17e4c1e97f2d 100644 (file)
@@ -3,7 +3,7 @@
 
 void rsync_setup(char const *, ...);
 int rsync_queue(char const *, char const *);
-unsigned int rsync_finished(void);
+void rsync_finished(char const *, char const *);
 void rsync_teardown(void);
 
 #endif /* SRC_RSYNC_RSYNC_H_ */
index d6007c37aa9d704eaf64000d32ad77bb0a50febb..35fee3b060136fbdfab5bed800b5924d9fac9d6c 100644 (file)
@@ -48,6 +48,9 @@ touch_file(char const *dir)
        ck_assert_int_eq(0, system(cmd));
 }
 
+static char *queued_url;
+static char *queued_path;
+
 int
 rsync_queue(char const *url, char const *path)
 {
@@ -62,6 +65,9 @@ rsync_queue(char const *url, char const *path)
        ck_assert_int_eq(0, mkdir(path, CACHE_FILEMODE));
        touch_file(path);
 
+       queued_url = pstrdup(url);
+       queued_path = pstrdup(path);
+
        return 0;
 }
 
@@ -116,6 +122,22 @@ run_dl_rsync(char *caRepository, int expected_err, unsigned int expected_calls)
        return cage;
 }
 
+static void
+finish_rsync(void)
+{
+       rsync_finished(queued_url, queued_path);
+       free(queued_url);
+       free(queued_path);
+}
+
+static struct cache_cage *
+rsync_dance(char *url)
+{
+       ck_assert_ptr_eq(NULL, run_dl_rsync(url, EBUSY, 1));
+       finish_rsync();
+       return run_dl_rsync(url, 0, 0);
+}
+
 static void
 run_dl_https(char const *url, unsigned int expected_calls,
     char const *expected_result)
@@ -389,7 +411,7 @@ START_TEST(test_cache_download_rsync)
        setup_test();
 
        printf("==== Startup ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/d", 0, 1);
+       cage = rsync_dance("rsync://a.b.c/d");
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
        ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
@@ -424,7 +446,7 @@ START_TEST(test_cache_download_rsync)
         * and there would be consequences for violating it.
         */
        printf("==== rsync truncated ====\n");
-       cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1);
+       cage = rsync_dance("rsync://x.y.z/m/n/o");
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL);
        ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL);
@@ -433,7 +455,7 @@ START_TEST(test_cache_download_rsync)
        free(cage);
 
        printf("==== Sibling ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1);
+       cage = rsync_dance("rsync://a.b.c/e/f");
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL);
        ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL);
@@ -456,7 +478,7 @@ START_TEST(test_cache_download_rsync_error)
 
        printf("==== Startup ====\n");
        dl_error = 0;
-       free(run_dl_rsync("rsync://a.b.c/d", 0, 1));
+       free(rsync_dance("rsync://a.b.c/d"));
        dl_error = EINVAL;
        ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1));
        ck_cache_rsync(nodes);
@@ -761,7 +783,11 @@ START_TEST(test_context)
         *    but does not provide RRDP as an option.
         */
        sias.rpkiNotify = NULL;
+
+       ck_assert_int_eq(EBUSY, cache_refresh_by_sias(&sias, &cage));
+       finish_rsync();
        ck_assert_int_eq(0, cache_refresh_by_sias(&sias, &cage));
+
        ck_assert_ptr_eq(NULL, cage->rpkiNotify);
        ck_assert_str_eq(FILE_RSYNC_PATH, cage_map_file(cage, FILE_URL));
        ck_assert_int_eq(false, cage_disable_refresh(cage));
index d214670e85490642a6182ab9341016f764c7764f..f79e3199b53c5d8e71a1b4cc1b1a0d866b1b9c0e 100644 (file)
@@ -81,7 +81,7 @@ void
 pr_crit(const char *format, ...)
 {
        va_list args;
-       fprintf(stderr, "pr_crit() called!\n");
+       fprintf(stderr, "pr_crit() called! ");
        va_start(args, format);
        vfprintf(stderr, format, args);
        va_end(args);
index 1676bbd909c83377f31c7dffca95183f484de117..1de30423a79cbc355d7de464349a8aec460dc65f 100644 (file)
@@ -5,6 +5,7 @@
 #include "mock.c"
 #include "rsync.c"
 #include "stream.c"
+#include "types/map.c"
 
 #include "asn1/asn1c/ber_decoder.c"
 #include "asn1/asn1c/ber_tlv_length.c"
@@ -42,6 +43,32 @@ __MOCK_ABORT(OPEN_TYPE_ber_get, asn_dec_rval_t, trash,
 MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td,
                 const void *strt, asn_app_constraint_failed_f *cb, void *key)
 
+static struct timespec rsync_request_time;
+static int rsync_expected_duration = -1;
+static int rsyncs_done = 0;
+
+void
+rsync_finished(char const *url, char const *path)
+{
+       struct timespec now;
+       int delta;
+
+       if (rsync_expected_duration == -1)
+               ck_abort_msg("rsync_finished() called, but duration not set.");
+
+       ts_now(&now);
+       delta = ts_delta(&rsync_request_time, &now);
+
+       printf("Callback! rsync finished after %dms.\n", delta);
+       if (rsync_expected_duration < 100)
+               ck_assert_int_le(0, delta);
+       else
+               ck_assert_int_le(rsync_expected_duration - 100, delta);
+       ck_assert_int_lt(delta, rsync_expected_duration + 100);
+
+       rsyncs_done++;
+}
+
 /* Tests */
 
 /* Test RsyncRequest decode, feeding as few bytes as possible every time. */
@@ -93,25 +120,25 @@ START_TEST(test_decode_extremely_fragmented)
 END_TEST
 
 static void
-ck_no_tasks(struct rsync_tasks *tasks)
+ck_no_tasks(void)
 {
-       ck_assert_int_eq(0, tasks->a);
-       ck_assert(LIST_EMPTY(&tasks->active));
-       ck_assert(LIST_EMPTY(&tasks->queued));
+       struct cache_mapping map;
+       int error;
+
+       error = next_task(&map);
+       ck_assert(error == EAGAIN || error == EWOULDBLOCK);
 }
 
 static void
-ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
-    char const *url, char const *path)
+ck_next_task(char const *url, char const *path)
 {
-       struct rsync_task *task;
-
-       task = LIST_FIRST(&tasks->active);
-       ck_assert_ptr_ne(NULL, task);
-       ck_assert_str_eq(url, task->url);
-       ck_assert_str_eq(path, task->path);
-       ck_assert(tasks->a > 0);
-       finish_task(tasks, task, sk);
+       struct cache_mapping map;
+
+       ck_assert_int_eq(0, next_task(&map));
+       ck_assert_str_eq(url, map.url);
+       ck_assert_str_eq(path, map.path);
+
+       map_cleanup(&map);
 }
 
 static void
@@ -130,90 +157,74 @@ encode_request(char const *url, char const *path, unsigned char *buffer)
  * Tests messy request queuing; in particular, when a single read() yields less
  * than one or multiple of them.
  */
-START_TEST(test_read_tasks)
+START_TEST(test_next_task)
 {
        unsigned char bytes[BUFSIZE];
        int fds[2];
-       struct s2p_socket sk;
-       struct rsync_tasks tasks;
-       struct timespec now;
 
        ck_assert_int_eq(0, nonblock_pipe(fds));
-       rstream_init(&sk.rd, fds[0], 256);
-       sk.wr = -1;
-       sk.rr = NULL;
-       LIST_INIT(&tasks.active);
-       LIST_INIT(&tasks.queued);
-       tasks.a = 0;
-       ts_now(&now);
+       __spsk_init(fds[RDFD], -1);
 
        printf("Read yields nothing\n");
-       read_tasks(&sk, &tasks, &now);
-       ck_no_tasks(&tasks);
+       ck_no_tasks();
 
        printf("Read yields less than 1 request\n");
        encode_request("111", "2222", bytes); /* 13 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
-       read_tasks(&sk, &tasks, &now);
-       ck_no_tasks(&tasks);
 
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3));
-       read_tasks(&sk, &tasks, &now);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10));
+       ck_no_tasks();
 
-       ck_1st_task(&tasks, &sk, "111", "2222");
-       ck_no_tasks(&tasks);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 10, 3));
+       ck_next_task("111", "2222");
+       ck_no_tasks();
 
        printf("Read yields 1 request\n");
        encode_request("3333", "444", bytes); /* 13 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13));
-       read_tasks(&sk, &tasks, &now);
 
-       ck_1st_task(&tasks, &sk, "3333", "444");
-       ck_no_tasks(&tasks);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 13));
+       ck_next_task("3333", "444");
+       ck_no_tasks();
 
        printf("Read yields 1.5 requests\n");
        encode_request("55", "666", bytes); /* 11 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11));
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 11));
        encode_request("777", "88", bytes); /* 11 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5));
-       read_tasks(&sk, &tasks, &now);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 5));
 
-       ck_1st_task(&tasks, &sk, "55", "666");
-       ck_no_tasks(&tasks);
+       ck_next_task("55", "666");
+       ck_no_tasks();
 
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6));
-       read_tasks(&sk, &tasks, &now);
-
-       ck_1st_task(&tasks, &sk, "777", "88");
-       ck_no_tasks(&tasks);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 5, 6));
+       ck_next_task("777", "88");
+       ck_no_tasks();
 
        printf("Read yields 2 requests\n");
        encode_request("9999", "00", bytes); /* 12 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12));
        encode_request("aa", "bbbb", bytes); /* 12 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
-       read_tasks(&sk, &tasks, &now);
-       ck_1st_task(&tasks, &sk, "aa", "bbbb");
-       ck_1st_task(&tasks, &sk, "9999", "00");
-       ck_no_tasks(&tasks);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12));
+
+       ck_next_task("9999", "00");
+       ck_next_task("aa", "bbbb");
+       ck_no_tasks();
 
        printf("Read yields 2.5 requests\n");
        encode_request("cc", "dd", bytes); /* 10 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10));
        encode_request("eeeee", "fffff", bytes); /* 16 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 16));
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 16));
        encode_request("gggg", "hhhhh", bytes); /* 15 bytes */
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3));
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 3));
 
-       read_tasks(&sk, &tasks, &now);
-       ck_1st_task(&tasks, &sk, "eeeee", "fffff");
-       ck_1st_task(&tasks, &sk, "cc", "dd");
-       ck_no_tasks(&tasks);
+       ck_next_task("cc", "dd");
+       ck_next_task("eeeee", "fffff");
+       ck_no_tasks();
 
-       ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12));
-       read_tasks(&sk, &tasks, &now);
-       ck_1st_task(&tasks, &sk, "gggg", "hhhhh");
-       ck_no_tasks(&tasks);
+       ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 3, 12));
+       ck_next_task("gggg", "hhhhh");
+       ck_no_tasks();
+
+       spsk_cleanup();
 }
 END_TEST
 
@@ -223,25 +234,28 @@ wait_rsyncs(unsigned int count, unsigned int millis)
 {
        struct timespec req;
 
-       if (millis > 100) {
-               millis -= 100;
-               req.tv_sec = millis / 1000;
-               req.tv_nsec = (millis % 1000) * 1000000;
-               ck_assert_int_eq(0, nanosleep(&req, NULL));
-               ck_assert_uint_eq(0, rsync_finished());
-       }
+       printf("Waiting for %u rsyncs after %ums.\n", count, millis);
 
-       req.tv_sec = 0;
-       req.tv_nsec = 200000000; /* 200ms */
-       ck_assert_int_eq(0, nanosleep(&req, NULL));
-       ck_assert_uint_eq(count, rsync_finished());
+       ts_now(&rsync_request_time);
+       rsync_expected_duration = millis;
+       rsyncs_done = 0;
+
+       millis += 100;
+       req.tv_sec = millis / 1000;
+       req.tv_nsec = (millis % 1000) * 1000000;
+       ck_assert_int_eq(0, nanosleep(&req, NULL)); /* Wait for rsync_finished() */
+
+       ck_assert_int_eq(count, rsyncs_done);
+       rsync_expected_duration = -1;
 }
 
 START_TEST(test_fast_single_rsync)
 {
+       printf("-- test_fast_single_rsync() --\n");
+
        rsync_setup("resources/rsync/fast.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        wait_rsyncs(1, 0);
@@ -252,9 +266,11 @@ END_TEST
 
 START_TEST(test_stalled_single_rsync)
 {
+       printf("-- test_stalled_single_rsync() --\n");
+
        rsync_setup("resources/rsync/stalled.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        wait_rsyncs(1, 3000);
@@ -265,9 +281,11 @@ END_TEST
 
 START_TEST(test_stalled_single_rsync_timeout)
 {
+       printf("-- test_stalled_single_rsync_timeout() --\n");
+
        rsync_setup("resources/rsync/stalled-timeout.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        wait_rsyncs(1, 4000); /* 4000 = timeout */
@@ -278,9 +296,11 @@ END_TEST
 
 START_TEST(test_dripfeed_single_rsync)
 {
+       printf("-- test_dripfeed_single_rsync() --\n");
+
        rsync_setup("resources/rsync/drip-feed.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        wait_rsyncs(1, 3000);
@@ -291,9 +311,11 @@ END_TEST
 
 START_TEST(test_dripfeed_single_rsync_timeout)
 {
+       printf("-- test_dripfeed_single_rsync_timeout() --\n");
+
        rsync_setup("resources/rsync/drip-feed-timeout.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        wait_rsyncs(1, 4000); /* 4000 = timeout */
@@ -304,12 +326,13 @@ END_TEST
 
 START_TEST(test_no_rsyncs)
 {
+       printf("-- test_no_rsyncs() --\n");
+
        rsync_setup("rsync", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
-       sleep(2);
-       ck_assert_uint_eq(0, rsync_finished());
+       wait_rsyncs(0, 2000);
 
        rsync_teardown();
 }
@@ -317,9 +340,11 @@ END_TEST
 
 START_TEST(test_simultaneous_rsyncs)
 {
+       printf("-- test_simultaneous_rsyncs() --\n");
+
        rsync_setup("resources/rsync/simultaneous.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        ck_assert_int_eq(0, rsync_queue("C", "D"));
@@ -333,10 +358,11 @@ END_TEST
 
 START_TEST(test_queued_rsyncs)
 {
-       rsync_setup("resources/rsync/queued.sh", NULL);
-       ck_assert_int_ne(-1, readfd);
-       ck_assert_int_ne(-1, writefd);
+       printf("-- test_queued_rsyncs() --\n");
 
+       rsync_setup("resources/rsync/queued.sh", NULL);
+       ck_assert_int_ne(-1, pssk.rd.fd);
+       ck_assert_int_ne(-1, pssk.wr);
 
        ck_assert_int_eq(0, rsync_queue("A", "B"));
        ck_assert_int_eq(0, rsync_queue("C", "D"));
@@ -344,7 +370,8 @@ START_TEST(test_queued_rsyncs)
        ck_assert_int_eq(0, rsync_queue("G", "H"));
 
        wait_rsyncs(3, 2000);
-       wait_rsyncs(1, 2000);
+       /* 2k minus the 100 extra we slept during the previous wait_rsyncs() */
+       wait_rsyncs(1, 1900);
 
        rsync_teardown();
 }
@@ -358,7 +385,7 @@ create_suite(void)
 
        p2s = tcase_create("parent-spawner channel");
        tcase_add_test(p2s, test_decode_extremely_fragmented);
-       tcase_add_test(p2s, test_read_tasks);
+       tcase_add_test(p2s, test_next_task);
 
        s2r = tcase_create("spawner-rsync channel");
        tcase_add_test(s2r, test_fast_single_rsync);