asn1/asn1c/RouteOriginAttestation.c \
asn1/asn1c/ROAIPAddressFamily.c \
asn1/asn1c/ROAIPAddress.c \
+ asn1/asn1c/RsyncRequest.c \
asn1/asn1c/Manifest.c \
asn1/asn1c/FileAndHash.c \
asn1/asn1c/BinarySigningTime.c \
asn1/asn1c/RouteOriginAttestation.h \
asn1/asn1c/ROAIPAddressFamily.h \
asn1/asn1c/ROAIPAddress.h \
+ asn1/asn1c/RsyncRequest.h \
asn1/asn1c/Manifest.h \
asn1/asn1c/FileAndHash.h \
asn1/asn1c/BinarySigningTime.h \
xer_type_encoder_f OCTET_STRING_encode_xer_utf8;
#define OCTET_STRING_constraint asn_generic_no_constraint
+#define OCTET_STRING_cmp(a, b) OCTET_STRING_compare(&asn_DEF_OCTET_STRING, a, b)
/******************************
* Handy conversion routines. *
--- /dev/null
+/*
+ * Generated by asn1c-0.9.29 (http://lionet.info/asn1c)
+ * From ASN.1 module "RsyncSpawnerChannel"
+ * found in "rsync.asn1"
+ * `asn1c -Werror -fcompound-names -fwide-types -D asn1/asn1c -no-gen-PER -no-gen-example`
+ */
+
+#include "asn1/asn1c/RsyncRequest.h"
+
+static asn_TYPE_member_t asn_MBR_RsyncRequest_1[] = {
+ { ATF_NOFLAGS, 0, offsetof(struct RsyncRequest, url),
+ (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)),
+ 0,
+ &asn_DEF_OCTET_STRING,
+ 0,
+ { 0, 0, 0 },
+ 0, 0, /* No default value */
+ "url"
+ },
+ { ATF_NOFLAGS, 0, offsetof(struct RsyncRequest, path),
+ (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)),
+ 0,
+ &asn_DEF_OCTET_STRING,
+ 0,
+ { 0, 0, 0 },
+ 0, 0, /* No default value */
+ "path"
+ },
+};
+static const ber_tlv_tag_t asn_DEF_RsyncRequest_tags_1[] = {
+ (ASN_TAG_CLASS_UNIVERSAL | (16 << 2))
+};
+static const asn_TYPE_tag2member_t asn_MAP_RsyncRequest_tag2el_1[] = {
+ { (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), 0, 0, 1 }, /* url */
+ { (ASN_TAG_CLASS_UNIVERSAL | (4 << 2)), 1, -1, 0 } /* path */
+};
+static asn_SEQUENCE_specifics_t asn_SPC_RsyncRequest_specs_1 = {
+ sizeof(struct RsyncRequest),
+ offsetof(struct RsyncRequest, _asn_ctx),
+ asn_MAP_RsyncRequest_tag2el_1,
+ 2, /* Count of tags in the map */
+ -1, /* First extension addition */
+};
+asn_TYPE_descriptor_t asn_DEF_RsyncRequest = {
+ "RsyncRequest",
+ "RsyncRequest",
+ &asn_OP_SEQUENCE,
+ asn_DEF_RsyncRequest_tags_1,
+ sizeof(asn_DEF_RsyncRequest_tags_1)
+ /sizeof(asn_DEF_RsyncRequest_tags_1[0]), /* 1 */
+ asn_DEF_RsyncRequest_tags_1, /* Same as above */
+ sizeof(asn_DEF_RsyncRequest_tags_1)
+ /sizeof(asn_DEF_RsyncRequest_tags_1[0]), /* 1 */
+ { 0, 0, SEQUENCE_constraint },
+ asn_MBR_RsyncRequest_1,
+ 2, /* Elements count */
+ &asn_SPC_RsyncRequest_specs_1 /* Additional specs */
+};
+
+int
+RsyncRequest_init(struct RsyncRequest *req, char const *url, char const *path)
+{
+ memset(req, 0, sizeof(*req));
+ if (OCTET_STRING_fromString(&req->url, url) < 0)
+ return -1;
+ if (OCTET_STRING_fromString(&req->path, path) < 0)
+ return -1;
+ return 0;
+}
--- /dev/null
+/*
+ * Generated by asn1c-0.9.29 (http://lionet.info/asn1c)
+ * From ASN.1 module "RsyncSpawnerChannel"
+ * found in "rsync.asn1"
+ * `asn1c -Werror -fcompound-names -fwide-types -D asn1/asn1c -no-gen-PER -no-gen-example`
+ */
+
+#ifndef _RsyncRequest_H_
+#define _RsyncRequest_H_
+
+/* Including external dependencies */
+#include "asn1/asn1c/OCTET_STRING.h"
+#include "asn1/asn1c/constr_SEQUENCE.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* RsyncRequest */
+typedef struct RsyncRequest {
+ OCTET_STRING_t url;
+ OCTET_STRING_t path;
+
+ /* Context for parsing across buffer boundaries */
+ asn_struct_ctx_t _asn_ctx;
+} RsyncRequest_t;
+
+/* Implementation */
+extern asn_TYPE_descriptor_t asn_DEF_RsyncRequest;
+
+int RsyncRequest_init(struct RsyncRequest *, char const *, char const *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RsyncRequest_H_ */
+#include "asn1/asn1c/asn_internal.h"
{
int error;
- error = rsync_download(module->map.url, module->map.path);
+ // XXX go pick some other task on zero
+ error = rsync_queue(module->map.url, module->map.path);
if (error)
return error;
node->attempt_ts = time_fatal();
rm_metadata(node);
node->dlerr = tbl->download(node);
+ if (node->dlerr == EBUSY)
+ goto ongoing;
write_metadata(node);
downloaded = true;
node->state = DLS_FRESH;
break;
case DLS_ONGOING:
- mutex_unlock(&tbl->lock);
+ongoing: mutex_unlock(&tbl->lock);
pr_val_debug("Refresh ongoing.");
return EBUSY;
case DLS_FRESH:
return 0;
}
-char *hex2str(uint8_t const *hex, size_t hexlen)
+static void
+ts_normalize(struct timespec *ts)
+{
+ if (ts->tv_nsec >= 1000000000L) {
+ ts->tv_sec += ts->tv_nsec / 1000000000L;
+ ts->tv_nsec %= 1000000000L;
+ }
+ while (ts->tv_nsec < 0) {
+ ts->tv_sec--;
+ ts->tv_nsec += 1000000000L;
+ }
+}
+
+void
+ts_now(struct timespec *now)
+{
+ if (clock_gettime(CLOCK_MONOTONIC, now) < 0)
+ pr_crit("clock_gettime() returned '%s'", strerror(errno));
+ ts_normalize(now); /* Probably not needed, but I can't find contracts */
+}
+
+int
+ts_cmp(struct timespec *ts1, struct timespec *ts2)
+{
+ if (ts1->tv_sec < ts2->tv_sec)
+ return -1;
+ if (ts1->tv_sec > ts2->tv_sec)
+ return 1;
+ if (ts1->tv_nsec < ts2->tv_nsec)
+ return -1;
+ if (ts1->tv_nsec > ts2->tv_nsec)
+ return 1;
+ return 0;
+}
+
+/* Result in milliseconds */
+int
+ts_delta(struct timespec *before, struct timespec *after)
+{
+ return (1000 * (after->tv_sec - before->tv_sec))
+ + ((after->tv_nsec - before->tv_nsec) / 1000000L);
+}
+
+/* dst = src + millis */
+void
+ts_add(struct timespec *dst, struct timespec *src, long millis)
+{
+ dst->tv_sec = src->tv_sec;
+ dst->tv_nsec = src->tv_nsec + (1000000L * millis);
+ ts_normalize(dst);
+
+}
+char *
+hex2str(uint8_t const *hex, size_t hexlen)
{
static const char * const H2C = "0123456789ABCDEF";
char *str;
}
/* @hex needs to be already allocated. */
-int str2hex(char const *str, uint8_t *hex)
+int
+str2hex(char const *str, uint8_t *hex)
{
size_t h;
int digit;
int time2str(time_t, char *);
int str2time(char const *, time_t *);
+void ts_now(struct timespec *);
+int ts_cmp(struct timespec *, struct timespec *);
+int ts_delta(struct timespec *, struct timespec *);
+void ts_add(struct timespec *, struct timespec *, long);
+
char *hex2str(uint8_t const *, size_t);
int str2hex(char const *, uint8_t *);
pr_op_debug("rm -rf %s", path);
/* TODO (performance) optimize that 32 */
+ // XXX In MacOS, this breaks if path is a file.
if (nftw(path, rm, 32, FTW_DEPTH | FTW_PHYS) < 0) {
error = errno;
// XXX This msg is sometimes annoying; maybe defer it
if (error)
goto revert_log;
- rsync_setup(); /* Spawn rsync spawner ASAP */
+ rsync_setup(NULL, NULL); /* Spawn rsync spawner ASAP */
register_signal_handlers();
error = thvar_init();
if (error)
goto end;
+ /*
+ * From now on, the trees should be considered valid, even if subsequent
+ * certificates fail.
+ * (The roots validated successfully; subtrees are isolated problems.)
+ */
+
for (t = 0; t < 5; t++) {
error = pthread_create(&threads[t], NULL, pick_up_work, NULL);
if (error)
// XXX use the cache
- error = rsync_download(src, dst);
+ error = rsync_queue(src, dst);
if (error) {
- pr_op_err("rysnc download failed: %s", strerror(abs(error)));
+ pr_op_err("rsync download failed: %s", strerror(abs(error)));
return NULL;
}
#include <poll.h>
#include <signal.h>
#include <stream.h>
+#include <sys/queue.h>
#include <sys/wait.h>
#include <syslog.h>
#include "common.h"
#include "config.h"
#include "log.h"
+#include "types/array.h"
+
+#include "asn1/asn1c/ber_decoder.h"
+#include "asn1/asn1c/der_encoder.h"
+#include "asn1/asn1c/RsyncRequest.h"
+
+#define RSP /* rsync spawner prefix */ "[rsync spawner] "
+
+static char const *rsync_args[20]; /* Last must be NULL */
+
+static const int RDFD = 0;
+static const int WRFD = 1;
#define STDERR_WRITE(fds) fds[0][1]
#define STDOUT_WRITE(fds) fds[1][1]
#define STDERR_READ(fds) fds[0][0]
#define STDOUT_READ(fds) fds[1][0]
-static pid_t spawner; /* The process that spawns rsync runs */
+static pid_t spawner; /* The subprocess that spawns rsync runs */
-static int readfd; /* Our end of the spawner-to-parent pipe */
+static int readfd; /* Parent's end of the spawner-to-parent pipe */
static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER;
-static int writefd; /* Our end of the parent-to-spawner pipe */
+static int writefd; /* Parent's end of the parent-to-spawner pipe */
static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER;
-static int rsync(char const *, char const *);
+/*
+ * "Spawner to parent" socket.
+ * Socket used by the spawner to communicate with the parent.
+ */
+struct s2p_socket {
+ /* Spawner's end of the parent-to-spawner pipe */
+ struct read_stream rd;
+ /* Spawner's end of the spawner-to-parent pipe */
+ int wr;
+ /* Scratchpad buffer for stream read */
+ struct RsyncRequest *rr;
+};
+
+struct rsync_task {
+ int pid;
+ char *url;
+ char *path;
+ int stdoutfd; /* Child rsync's standard output */
+ int stderrfd; /* Child rsync's standard error */
+ struct timespec expiration;
+
+ LIST_ENTRY(rsync_task) lh;
+};
+
+LIST_HEAD(rsync_tasks, rsync_task);
+
+#ifndef LIST_FOREACH_SAFE
+#define LIST_FOREACH_SAFE(var, ls, lh, tmp) \
+ for ( \
+ var = LIST_FIRST(ls), tmp = (var ? LIST_NEXT(var, lh) : NULL); \
+ var != NULL; \
+ var = tmp, tmp = (var ? LIST_NEXT(var, lh) : NULL) \
+ )
+#endif
-static int
-run_child(int readfd, int writefd)
+static void
+void_task(struct rsync_task *task, struct s2p_socket *s2p)
{
- unsigned char zero = 0;
- struct read_stream stream;
- char *url, *path;
- int error;
+ static const unsigned char one = 1;
- read_stream_init(&stream, readfd);
+ free(task->url);
+ free(task->path);
+ free(task);
- do {
- error = read_string(&stream, &url);
- if (error || url == NULL)
- break;
- error = read_string(&stream, &path);
- if (error || path == NULL) {
- free(url);
- break;
- }
-
- error = rsync(url, path);
-
- free(url);
- free(path);
+ if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) {
+ pr_op_err("Cannot message parent process: %s", strerror(errno));
- error = full_write(writefd, &zero, 1);
- } while (!error);
-
- read_stream_close(&stream);
- close(writefd);
- free_rpki_config();
- log_teardown();
- return error;
+ close(s2p->wr);
+ s2p->wr = -1;
+ /* Can't signal finished rsyncs anymore; reject future ones. */
+ rstream_close(&s2p->rd, true);
+ }
}
-void
-rsync_setup(void)
+static void
+finish_task(struct rsync_task *task, struct s2p_socket *s2p)
{
- static const int RDFD = 0;
- static const int WRFD = 1;
- int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */
- int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */
- int flags;
-
- if (pipe(parent2spawner) < 0) {
- pr_op_err("Cannot create pipe: %s", strerror(errno));
- goto fail1;
- }
- if (pipe(spawner2parent) < 0) {
- pr_op_err("Cannot create pipe: %s", strerror(errno));
- goto fail2;
- }
-
- flags = fcntl(spawner2parent[RDFD], F_GETFL);
- if (flags < 0) {
- pr_op_err("Cannot retrieve pipe flags: %s", strerror(errno));
- goto fail3;
- }
- if (fcntl(spawner2parent[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
- pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(errno));
- goto fail3;
- }
-
- fflush(stdout);
- fflush(stderr);
-
- spawner = fork();
- if (spawner < 0) {
- pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
- goto fail3;
- }
-
- if (spawner == 0) { /* Client code */
- close(parent2spawner[WRFD]);
- close(spawner2parent[RDFD]);
- exit(run_child(parent2spawner[RDFD], spawner2parent[WRFD]));
- }
-
- /* Parent code */
- close(parent2spawner[RDFD]);
- close(spawner2parent[WRFD]);
- readfd = spawner2parent[RDFD];
- writefd = parent2spawner[WRFD];
- return;
-
-fail3: close(spawner2parent[RDFD]);
- close(spawner2parent[WRFD]);
-fail2: close(parent2spawner[RDFD]);
- close(parent2spawner[WRFD]);
-fail1: pr_op_warn("rsync will not be available.");
- readfd = writefd = 0;
+ LIST_REMOVE(task, lh);
+ void_task(task, s2p);
}
-int
-rsync_download(char const *url, char const *path)
+static void
+init_pfd(struct pollfd *pfd, int fd)
{
- mutex_lock(&writelock);
-
- if (writefd == 0)
- goto fail1;
- if (write_string(writefd, url) != 0)
- goto fail2;
- if (write_string(writefd, path) != 0)
- goto fail2;
-
- mutex_unlock(&writelock);
- return 0; // XXX go pick some other task
-
-fail2: close(readfd);
- close(writefd);
- readfd = writefd = 0;
-fail1: mutex_unlock(&writelock);
- return EIO;
+ pfd->fd = fd;
+ pfd->events = POLLIN;
+ pfd->revents = 0;
}
-/* Returns the number of rsyncs that have ended since the last query */
-unsigned int
-rsync_finished(void)
+static struct pollfd *
+create_pfds(int request_fd, struct rsync_tasks *tasks, size_t tn)
{
- unsigned char buf[8];
- ssize_t result;
+ struct pollfd *pfds;
+ struct rsync_task *task;
+ size_t p;
- mutex_lock(&readlock);
- result = (readfd != 0) ? read(readfd, buf, sizeof(buf)) : 0;
- mutex_unlock(&readlock);
+ pfds = pmalloc((2 * tn + 1) * sizeof(struct pollfd));
+ p = 0;
- return (result >= 0) ? result : 0;
+ init_pfd(&pfds[p++], request_fd);
+ LIST_FOREACH(task, tasks, lh) {
+ init_pfd(&pfds[p++], task->stdoutfd);
+ init_pfd(&pfds[p++], task->stderrfd);
+ }
+
+ return pfds;
}
static int
-wait_child(char const *name, pid_t pid)
+create_pipes(int fds[2][2])
{
- int status;
int error;
-again: status = 0;
- if (waitpid(pid, &status, 0) < 0) {
+ if (pipe(fds[0]) < 0) {
error = errno;
- pr_op_err("Could not wait for %s: %s", name, strerror(error));
+ pr_op_err_st("Piping rsync stderr: %s", strerror(error));
return error;
}
- if (WIFEXITED(status)) {
- /* Happy path (but also sad path sometimes) */
- error = WEXITSTATUS(status);
- pr_val_debug("%s ended. Result: %d", name, error);
- return error ? EIO : 0;
- }
-
- if (WIFSIGNALED(status)) {
- pr_op_warn("%s interrupted by signal %d.",
- name, WTERMSIG(status));
- return EINTR;
- }
-
- if (WIFCONTINUED(status)) {
- /*
- * Testing warning:
- * I can't trigger this branch. It always exits or signals;
- * SIGSTOP then SIGCONT doesn't seem to wake up waitpid().
- * It's concerning because every sample code I've found assumes
- * waitpid() returning always means the subprocess ended, so
- * they never retry. But that contradicts all documentation,
- * yet seems to be accurate to reality.
- */
- pr_op_debug("%s has resumed.", name);
- goto again;
+ if (pipe(fds[1]) < 0) {
+ error = errno;
+ pr_op_err_st("Piping rsync stdout: %s", strerror(error));
+ close(fds[0][0]);
+ close(fds[0][1]);
+ return error;
}
- /* Dead code */
- pr_op_err("Unknown waitpid() status; giving up %s.", name);
- return EINVAL;
+ return 0;
}
-void
-rsync_teardown(void)
+static void
+prepare_rsync_args(char **args, char const *url, char const *path)
{
- if (readfd)
- close(readfd);
- if (writefd)
- close(writefd);
- readfd = writefd = 0;
- wait_child("rsync spawner", spawner);
+ size_t i;
+
+ /*
+ * execvp() is not going to tweak these strings;
+ * stop angsting over the const-to-raw conversion.
+ */
+
+ for (i = 0; rsync_args[i] != NULL; i++)
+ args[i] = (char *)rsync_args[i];
+ args[i++] = (char *)url;
+ args[i++] = (char *)path;
+ args[i++] = NULL;
}
/*
close(STDOUT_READ(fds));
}
-static void
-prepare_rsync_args(char **args, char const *url, char const *path)
-{
- size_t i = 0;
-
- /*
- * execvp() is not going to tweak these strings;
- * stop angsting over the const-to-raw conversion.
- */
-
- /* XXX review */
- args[i++] = (char *)config_get_rsync_program();
-#ifdef UNIT_TESTING
- /* Note... --bwlimit does not seem to exist in openrsync */
- args[i++] = "--bwlimit=1K";
- args[i++] = "-vvv";
-#else
- args[i++] = "-rtz";
- args[i++] = "--omit-dir-times";
- args[i++] = "--contimeout";
- args[i++] = "20";
- args[i++] = "--max-size";
- args[i++] = "20MB";
- args[i++] = "--timeout";
- args[i++] = "15";
- args[i++] = "--include=*/";
- args[i++] = "--include=*.cer";
- args[i++] = "--include=*.crl";
- args[i++] = "--include=*.gbr";
- args[i++] = "--include=*.mft";
- args[i++] = "--include=*.roa";
- args[i++] = "--exclude=*";
-#endif
- args[i++] = (char *)url;
- args[i++] = (char *)path;
- args[i++] = NULL;
-}
-
static int
execvp_rsync(char const *url, char const *path, int fds[2][2])
{
}
static int
-create_pipes(int fds[2][2])
+fork_rsync(struct rsync_task *task)
{
+ int fork_fds[2][2];
int error;
- if (pipe(fds[0]) == -1) {
+ error = create_pipes(fork_fds);
+ if (error)
+ return error;
+
+ fflush(stdout);
+ fflush(stderr);
+
+ task->pid = fork();
+ if (task->pid < 0) {
error = errno;
- pr_op_err_st("Piping rsync stderr: %s", strerror(error));
- return -error;
+ pr_op_err_st("Couldn't spawn the rsync process: %s",
+ strerror(error));
+ close(STDERR_READ(fork_fds));
+ close(STDOUT_READ(fork_fds));
+ close(STDERR_WRITE(fork_fds));
+ close(STDOUT_WRITE(fork_fds));
+ return error;
}
- if (pipe(fds[1]) == -1) {
- error = errno;
+ if (task->pid == 0) /* Child code */
+ exit(execvp_rsync(task->url, task->path, fork_fds));
- /* Close pipe previously created */
- close(fds[0][0]);
- close(fds[0][1]);
+ /* Parent code */
- pr_op_err_st("Piping rsync stdout: %s", strerror(error));
- return -error;
+ close(STDERR_WRITE(fork_fds));
+ close(STDOUT_WRITE(fork_fds));
+ task->stderrfd = STDERR_READ(fork_fds);
+ task->stdoutfd = STDOUT_READ(fork_fds);
+ return 0;
+}
+
+static unsigned int
+start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+ struct timespec *now)
+{
+ struct rsync_task *task;
+
+ task = pzalloc(sizeof(struct rsync_task));
+ task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size);
+ task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size);
+ ts_add(&task->expiration, now, 1000 * config_get_rsync_transfer_timeout());
+
+ if (fork_rsync(task) != 0) {
+ void_task(task, s2p);
+ return 0;
}
- return 0;
+ LIST_INSERT_HEAD(tasks, task, lh);
+ pr_val_debug(RSP "Got new task: %d", task->pid);
+ return 1;
+}
+
+static unsigned int
+read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+ struct timespec *now)
+{
+ struct read_stream *in;
+ ssize_t consumed;
+ size_t offset;
+ asn_dec_rval_t decres;
+ unsigned int t;
+ int error;
+
+ in = &s2p->rd;
+ t = 0;
+
+ do {
+ consumed = read(in->fd, in->buffer + in->len,
+ in->capacity - in->len);
+ if (consumed < 0) {
+ error = errno;
+ if (error != EAGAIN && error != EWOULDBLOCK)
+ rstream_close(in, true);
+ return t;
+ }
+ if (consumed == 0) { /* EOS */
+ rstream_close(in, true);
+ return t;
+ }
+
+ in->len += consumed;
+
+ for (offset = 0; offset < in->len;) {
+ decres = ber_decode(&asn_DEF_RsyncRequest,
+ (void **)&s2p->rr,
+ in->buffer + offset, in->len - offset);
+ offset += decres.consumed;
+ switch (decres.code) {
+ case RC_OK:
+ t += start_task(s2p, tasks, now);
+ ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr);
+ break;
+ case RC_WMORE:
+ goto break_for;
+ case RC_FAIL:
+ rstream_close(in, true);
+ return t;
+ }
+ }
+
+break_for: if (offset > in->len)
+ pr_crit("read_tasks off:%zu len:%zu", offset, in->len);
+ in->len -= offset;
+ memmove(in->buffer, in->buffer + offset, in->len);
+ } while (true);
}
-static long
-get_current_millis(void)
+/* Returns number of tasks created */
+static int
+handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd,
+ struct rsync_tasks *tasks, struct timespec *now)
{
- struct timespec now;
- if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
- pr_crit("clock_gettime() returned %d", errno);
- return 1000L * now.tv_sec + now.tv_nsec / 1000000L;
+ if (s2p->rd.fd == -1)
+ return 0;
+
+ if (pfd->revents & POLLNVAL) {
+ pr_val_err(RSP "bad parent fd: %i", pfd->fd);
+ rstream_close(&s2p->rd, false);
+ return 0;
+ }
+
+ if (pfd->revents & POLLERR) {
+ pr_val_err(RSP "Generic error during parent fd poll.");
+ rstream_close(&s2p->rd, true);
+ return 0;
+ }
+
+ if (pfd->revents & (POLLIN | POLLHUP))
+ return read_tasks(s2p, tasks, now);
+
+ return 0;
}
static void
cur = cpy;
while ((tmp = strchr(cur, '\n')) != NULL) {
*tmp = '\0';
- if(strlen(cur) == 0) {
+ if (strlen(cur) == 0) {
cur = tmp + 1;
continue;
}
free(cpy);
}
-#define DROP_FD(f, fail) \
- do { \
- pfd[f].fd = -1; \
- error |= fail; \
- } while (0)
-#define CLOSE_FD(f, fail) \
- do { \
- close(pfd[f].fd); \
- DROP_FD(f, fail); \
- } while (0)
+/* 0 = still more to read; 1 = stream down */
+static int
+log_rsync_output(struct pollfd *pfd, size_t p)
+{
+ char buffer[1024];
+ ssize_t count;
+ int error;
+
+ count = read(pfd->fd, buffer, sizeof(buffer));
+ if (count == 0)
+ goto down; /* EOF */
+ if (count == -1) {
+ error = errno;
+ if (error == EINTR)
+ return 0; /* Dunno; retry */
+ pr_val_err("rsync buffer read error: %s", strerror(error));
+ goto down; /* Error */
+ }
+
+ log_buffer(buffer, count, (p & 1) == 0);
+ return 0; /* Keep going */
+
+down: close(pfd->fd);
+ pfd->fd = -1;
+ return 1;
+}
+
+/* Returns 1 if the stream ended */
+static int
+handle_rsync_fd(struct pollfd *pfd, size_t p)
+{
+ if (pfd->fd == -1) {
+ pr_val_debug(RSP "File descriptor already closed.");
+ return 1;
+ }
+
+ if (pfd->revents & POLLNVAL) {
+ pr_val_err(RSP "rsync bad fd: %i", pfd->fd);
+ return 1;
+ }
+
+ if (pfd->revents & POLLERR) {
+ pr_val_err(RSP "Generic error during rsync poll.");
+ close(pfd->fd);
+ return 1;
+ }
+
+ if (pfd->revents & (POLLIN | POLLHUP))
+ return log_rsync_output(pfd, p);
+
+ return 0;
+}
-/*
- * Consumes (and throws away) all the bytes in read streams @fderr and @fdout,
- * then closes them once they reach end of stream.
- *
- * Returns: ok -> 0, error -> 1, timeout -> 2.
- */
static int
-exhaust_read_fds(int fderr, int fdout)
+wait_subprocess(char const *name, pid_t pid)
+{
+ int status;
+ int error;
+
+again: status = 0;
+ if (waitpid(pid, &status, 0) < 0) {
+ error = errno;
+ pr_op_err("Could not wait for %s: %s", name, strerror(error));
+ return error;
+ }
+
+ if (WIFEXITED(status)) {
+ /* Happy path (but also sad path sometimes) */
+ error = WEXITSTATUS(status);
+ pr_val_debug("%s ended. Result: %d", name, error);
+ return error ? EIO : 0;
+ }
+
+ if (WIFSIGNALED(status)) {
+ pr_op_warn("%s interrupted by signal %d (%s).",
+ name, WTERMSIG(status), strsignal(WTERMSIG(status)));
+ return EINTR;
+ }
+
+ if (WIFCONTINUED(status)) {
+ /*
+ * Testing warning:
+ * I can't trigger this branch. It always exits or signals;
+ * SIGSTOP then SIGCONT doesn't seem to wake up waitpid().
+ * It's concerning because every sample code I've found assumes
+ * waitpid() returning always means the subprocess ended, so
+ * they never retry. But that contradicts all documentation,
+ * yet seems to be accurate to reality.
+ */
+ pr_op_debug("%s has resumed.", name);
+ goto again;
+ }
+
+ /* Dead code */
+ pr_op_err("Unknown waitpid() status; giving up %s.", name);
+ return EINVAL;
+}
+
+static void
+kill_subprocess(struct rsync_task *task)
+{
+ if (task->stdoutfd != -1)
+ close(task->stdoutfd);
+ if (task->stderrfd != -1)
+ close(task->stderrfd);
+ kill(task->pid, SIGTERM);
+}
+
+/* Returns true if the task died. */
+static bool
+maybe_expire(struct timespec *now, struct rsync_task *task,
+ struct s2p_socket *s2p)
{
- struct pollfd pfd[2];
- int error, nready, f;
- long epoch, delta, timeout;
+ struct timespec epoch;
+
+ ts_add(&epoch, now, 100);
+ if (ts_cmp(&epoch, &task->expiration) < 0)
+ return false;
+
+ pr_op_debug(RSP "Task %d ran out of time.", task->pid);
+ kill_subprocess(task);
+ wait_subprocess("rsync", task->pid);
+ finish_task(task, s2p);
+ return true;
+}
+
+static int
+spawner_run(
+ int request_fd, /* Requests from parent to us (spawner) */
+ int response_fd /* Responses from us (spawner) to parent */
+) {
+ struct s2p_socket s2p; /* Channel to parent */
+
+ struct pollfd *pfds; /* Channels to children */
+ size_t p, pfds_count;
+ struct timespec now, expiration;
+ int timeout;
- memset(&pfd, 0, sizeof(pfd));
- pfd[0].fd = fderr;
- pfd[0].events = POLLIN;
- pfd[1].fd = fdout;
- pfd[1].events = POLLIN;
+ int events;
+ struct rsync_tasks tasks;
+ struct rsync_task *task, *tmp;
+ int task_count;
+
+ int error;
+
+ rstream_init(&s2p.rd, request_fd, 1024);
+ s2p.wr = response_fd;
+ s2p.rr = NULL;
+
+ LIST_INIT(&tasks);
+ task_count = 0;
error = 0;
- epoch = get_current_millis();
- delta = 0;
- timeout = 1000 * config_get_rsync_transfer_timeout();
+ ts_now(&now);
- while (1) {
- nready = poll(pfd, 2, timeout - delta);
- if (nready == 0)
- goto timed_out;
- if (nready == -1) {
+ do {
+ /*
+ * 0: request pipe
+ * odd: stdouts
+ * even > 0: stderrs
+ */
+ pfds = create_pfds(s2p.rd.fd, &tasks, task_count);
+ pfds_count = 2 * task_count + 1;
+ expiration.tv_sec = now.tv_sec + 10;
+ expiration.tv_nsec = now.tv_nsec;
+ LIST_FOREACH(task, &tasks, lh)
+ if ((ts_cmp(&now, &task->expiration) < 0) &&
+ (ts_cmp(&task->expiration, &expiration) < 0))
+ expiration = task->expiration;
+
+ timeout = ts_delta(&now, &expiration);
+ pr_val_debug(RSP "Timeout decided: %dms", timeout);
+ events = poll(pfds, pfds_count, timeout);
+ if (events < 0) {
error = errno;
- if (error == EINTR)
- continue;
- pr_val_err("rsync bad poll: %s", strerror(error));
- error = 1;
- goto fail;
+ free(pfds);
+ break;
}
- for (f = 0; f < 2; f++) {
- if (pfd[f].revents & POLLNVAL) {
- pr_val_err("rsync bad fd: %i", pfd[f].fd);
- DROP_FD(f, 1);
-
- } else if (pfd[f].revents & POLLERR) {
- pr_val_err("Generic error during rsync poll.");
- CLOSE_FD(f, 1);
-
- } else if (pfd[f].revents & (POLLIN|POLLHUP)) {
- char buffer[4096];
- ssize_t count;
-
- count = read(pfd[f].fd, buffer, sizeof(buffer));
- if (count == -1) {
- error = errno;
- if (error == EINTR)
- continue;
- pr_val_err("rsync buffer read error: %s",
- strerror(error));
- CLOSE_FD(f, 1);
- continue;
- }
-
- if (count == 0)
- CLOSE_FD(f, 0);
- log_buffer(buffer, count, pfd[f].fd == fderr);
- }
+ ts_now(&now);
+
+ if (events == 0) { /* Timeout */
+ pr_val_debug(RSP "Woke up because of timeout.");
+ LIST_FOREACH_SAFE(task, &tasks, lh, tmp)
+ task_count -= maybe_expire(&now, task, &s2p);
+ goto cont;
}
- if (pfd[0].fd == -1 && pfd[1].fd == -1)
- return error; /* Happy path! */
+ pr_val_debug(RSP "Woke up because of input.");
+ p = 1;
+ LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
+ if (maybe_expire(&now, task, &s2p)) {
+ task_count--;
+ continue;
+ }
- delta = get_current_millis() - epoch;
- if (delta < 0) {
- pr_val_err("This clock does not seem monotonic. "
- "I'm going to have to give up this rsync.");
- error = 1;
- goto fail;
+ if (handle_rsync_fd(&pfds[p], p)) {
+ pr_val_debug(RSP "Task %d: Stdout closed.",
+ task->pid);
+ task->stdoutfd = -1;
+ }
+ p++;
+ if (handle_rsync_fd(&pfds[p], p)) {
+ pr_val_debug(RSP "Task %d: Stderr closed.",
+ task->pid);
+ task->stderrfd = -1;
+ }
+ p++;
+ if (task->stdoutfd == -1 && task->stderrfd == -1) {
+ pr_val_debug(RSP "Both stdout & stderr are closed; ending task %d.",
+ task->pid);
+ wait_subprocess("rsync", task->pid);
+ finish_task(task, &s2p);
+ task_count--;
+ }
}
- if (delta >= timeout)
- goto timed_out; /* Read took too long */
+ task_count += handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
+
+cont: free(pfds);
+ } while ((s2p.rd.fd != -1 || task_count > 0));
+ pr_val_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
+
+ LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
+ kill_subprocess(task);
+ wait_subprocess("rsync", task->pid);
+ finish_task(task, &s2p);
}
-timed_out:
- pr_val_err("rsync transfer timeout exhausted");
- error = 2;
-fail: for (f = 0; f < 2; f++)
- if (pfd[f].fd != -1)
- close(pfd[f].fd);
+ rstream_close(&s2p.rd, true);
+ if (s2p.wr != -1)
+ close(s2p.wr);
+
+ free_rpki_config();
+ log_teardown();
return error;
}
-/*
- * Completely consumes @fds' streams, and closes them.
- *
- * Originally, this was meant to redirect rsync's output to syslog:
- * ac56d70c954caf49382f5f28ff4a017e859e2e0a
- * (ie. we need to exhaust the streams because we dup2()'d them.)
- *
- * Later, @job repurposed this code to fix #74.
- */
static int
-exhaust_pipes(int fds[2][2])
+nonblock_pipe(int *fds)
{
- close(STDERR_WRITE(fds));
- close(STDOUT_WRITE(fds));
- return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds));
+ int error;
+ int flags;
+
+ if (pipe(fds) < 0) {
+ error = errno;
+ pr_op_err("Cannot create pipe: %s", strerror(error));
+ return error;
+ }
+
+ flags = fcntl(fds[RDFD], F_GETFL);
+ if (flags < 0) {
+ error = errno;
+ pr_op_err("Cannot retrieve pipe flags: %s", strerror(error));
+ goto cancel;
+ }
+ if (fcntl(fds[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
+ error = errno;
+ pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(error));
+ goto cancel;
+ }
+
+ return 0;
+
+cancel: close(fds[RDFD]);
+ close(fds[WRFD]);
+ return error;
}
-/* rsync @url @path */
-static int
-rsync(char const *url, char const *path)
+void
+rsync_setup(char const *program, ...)
{
- /* Descriptors to pipe stderr (first element) and stdout (second) */
- int fork_fds[2][2];
- pid_t child_pid;
- int error;
+ int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */
+ int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */
- error = create_pipes(fork_fds);
- if (error)
- return error;
+ va_list args;
+ array_index i;
+ char const *arg;
+
+ if (program != NULL) {
+ rsync_args[0] = arg = program;
+ va_start(args, program);
+ for (i = 1; arg != NULL; i++) {
+ arg = va_arg(args, char const *);
+ rsync_args[i] = arg;
+ }
+ va_end(args);
+ } else {
+ /* XXX review */
+ /* XXX Where is --delete? */
+ i = 0;
+ rsync_args[i++] = config_get_rsync_program();
+ rsync_args[i++] = "-rtz";
+ rsync_args[i++] = "--omit-dir-times";
+ rsync_args[i++] = "--contimeout";
+ rsync_args[i++] = "20";
+ rsync_args[i++] = "--max-size";
+ rsync_args[i++] = "20MB";
+ rsync_args[i++] = "--timeout";
+ rsync_args[i++] = "15";
+ rsync_args[i++] = "--include=*/";
+ rsync_args[i++] = "--include=*.cer";
+ rsync_args[i++] = "--include=*.crl";
+ rsync_args[i++] = "--include=*.gbr";
+ rsync_args[i++] = "--include=*.mft";
+ rsync_args[i++] = "--include=*.roa";
+ rsync_args[i++] = "--exclude=*";
+ rsync_args[i++] = NULL;
+ }
+
+ if (nonblock_pipe(parent2spawner) != 0)
+ goto fail1;
+ if (nonblock_pipe(spawner2parent) != 0)
+ goto fail2;
fflush(stdout);
fflush(stderr);
- child_pid = fork();
- if (child_pid < 0) {
- error = errno;
- pr_op_err_st("Couldn't spawn the rsync process: %s",
- strerror(error));
- /* Close all ends from the created pipes */
- close(STDERR_READ(fork_fds));
- close(STDOUT_READ(fork_fds));
- close(STDERR_WRITE(fork_fds));
- close(STDOUT_WRITE(fork_fds));
- return error;
+ spawner = fork();
+ if (spawner < 0) {
+ pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
+ goto fail3;
}
- if (child_pid == 0)
- exit(execvp_rsync(url, path, fork_fds)); /* Child code */
+ if (spawner == 0) { /* Client code */
+ close(parent2spawner[WRFD]);
+ close(spawner2parent[RDFD]);
+ exit(spawner_run(parent2spawner[RDFD], spawner2parent[WRFD]));
+ }
/* Parent code */
+ close(parent2spawner[RDFD]);
+ close(spawner2parent[WRFD]);
+ readfd = spawner2parent[RDFD];
+ writefd = parent2spawner[WRFD];
+ return;
- error = exhaust_pipes(fork_fds);
- if (error)
- kill(child_pid, SIGTERM); /* Stop the child */
+fail3: close(spawner2parent[RDFD]);
+ close(spawner2parent[WRFD]);
+fail2: close(parent2spawner[RDFD]);
+ close(parent2spawner[WRFD]);
+fail1: pr_op_warn("rsync will not be available.");
+ readfd = writefd = -1;
+}
- return wait_child("rsync", child_pid);
+static int
+send_to_spawner(const void *buffer, size_t size, void *arg)
+{
+ return stream_full_write(writefd, buffer, size);
+}
+
+/* Queues rsync; doesn't wait. Call rsync_finished() later. */
+int
+rsync_queue(char const *url, char const *path)
+{
+ struct RsyncRequest req;
+ asn_enc_rval_t result;
+ int error;
+
+ if (RsyncRequest_init(&req, url, path) < 0)
+ return EINVAL;
+
+ mutex_lock(&writelock);
+
+ if (writefd == -1) {
+ error = EIO;
+ goto end;
+ }
+
+ result = der_encode(&asn_DEF_RsyncRequest, &req, send_to_spawner, NULL);
+ if (result.encoded == -1) {
+ close(writefd);
+ writefd = -1;
+ error = EIO;
+ goto end;
+ }
+
+ error = 0;
+end: mutex_unlock(&writelock);
+ ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req);
+ return error;
+}
+
+/* Returns the number of rsyncs that have ended since the last query */
+unsigned int
+rsync_finished(void)
+{
+ unsigned char buf[8];
+ ssize_t result;
+
+ mutex_lock(&readlock);
+ result = (readfd != -1) ? read(readfd, buf, sizeof(buf)) : 0;
+ mutex_unlock(&readlock);
+
+ return (result >= 0) ? result : 0;
+}
+
+void
+rsync_teardown(void)
+{
+ if (readfd != -1)
+ close(readfd);
+ if (writefd != -1)
+ close(writefd);
+ readfd = writefd = -1;
+ wait_subprocess("rsync spawner", spawner);
}
#ifndef SRC_RSYNC_RSYNC_H_
#define SRC_RSYNC_RSYNC_H_
-void rsync_setup(void);
-int rsync_download(char const *, char const *);
+void rsync_setup(char const *, ...);
+int rsync_queue(char const *, char const *);
unsigned int rsync_finished(void);
void rsync_teardown(void);
#include "log.h"
void
-read_stream_init(struct read_stream *stream, int fd)
+rstream_init(struct read_stream *stream, int fd, size_t initial_capacity)
{
stream->fd = fd;
- stream->buffer = pmalloc(256);
- stream->capacity = 256;
+ stream->buffer = pmalloc(initial_capacity);
+ stream->len = 0;
+ stream->capacity = initial_capacity;
}
void
-read_stream_close(struct read_stream *stream)
+rstream_close(struct read_stream *stream, bool do_close)
{
- close(stream->fd);
+ if (stream->fd == -1)
+ return;
+
+ if (do_close)
+ close(stream->fd);
free(stream->buffer);
+ stream->fd = -1;
}
/*
- * Full read or error.
+ * Reads until exactly @len bytes (sleeping if necessary), EOS or error.
+ * @stream's capacity must be >= len.
*
- * Nonzero: error
- * 0, stream->buffer != NULL: Success
- * 0, stream->buffer == NULL: EOF
+ * Returns:
+ * - >= 0: Number of bytes read (< @len only if EOS reached).
+ * - < 0: error; remove sign for proper code.
*/
-static int
-full_read(struct read_stream *stream, size_t len)
+int
+rstream_full_read(struct read_stream *stream, size_t len)
{
- ssize_t rd;
size_t offset;
+ ssize_t rd;
- if (stream->buffer == NULL)
- return 0;
+ if (stream->buffer == NULL || stream->capacity < len)
+ return -ENOSPC;
- offset = 0;
- do {
- rd = read(stream->fd, stream->buffer + offset, len);
+ for (offset = 0; offset < len; offset += rd) {
+ rd = read(stream->fd, stream->buffer + offset, len - offset);
if (rd < 0)
- return errno;
- if (rd == 0) {
- free(stream->buffer);
- stream->buffer = NULL;
- stream->capacity = 0;
- return 0;
- }
- if (rd == len)
- return 0;
- if (rd > len)
- pr_crit("rd > len: %zd > %zu", rd, len);
+ return -errno;
+ if (rd == 0)
+ break;
+ }
- len -= rd;
- offset += rd;
- } while (true);
+ return offset;
}
-/* Full write or error. */
+/* Full write (sleeps if necessary) or error. */
int
-full_write(int fd, unsigned char const *buf, size_t len)
+stream_full_write(int fd, unsigned char const *buf, size_t len)
{
ssize_t wr;
size_t offset;
return 0;
}
-
-/* @value -1 means "EOF". */
-static int
-read_ssize_t(struct read_stream *stream, ssize_t *value)
-{
- int error;
-
- error = full_read(stream, 2);
- if (error)
- return error;
-
- *value = stream->buffer
- ? ((stream->buffer[0] << 8) | stream->buffer[1])
- : -1;
- return 0;
-}
-
-static int
-write_size_t(int fd, size_t value)
-{
- unsigned char buf[2];
-
- if (value > 1024)
- return ENOSPC;
-
- buf[0] = (value >> 8) & 0xFF;
- buf[1] = value & 0xFF;
-
- return full_write(fd, buf, 2);
-}
-
-int
-read_string(struct read_stream *stream, char **result)
-{
- ssize_t len;
- int error;
-
- error = read_ssize_t(stream, &len);
- if (error)
- return error;
- if (len == -1) {
- *result = NULL;
- return 0;
- }
-
- if (len > stream->capacity) {
- do {
- stream->capacity *= 2;
- } while (len > stream->capacity);
- stream->buffer = prealloc(stream->buffer, stream->capacity);
- }
-
- error = full_read(stream, len);
- if (error)
- return error;
- *result = stream->buffer ? pstrdup((char *)stream->buffer) : NULL;
- return 0;
-}
-
-int
-write_string(int fd, char const *str)
-{
- size_t len;
- int error;
-
- len = strlen(str) + 1;
-
- error = write_size_t(fd, len);
- if (error)
- return error;
-
- return full_write(fd, (unsigned char const *) str, len);
-}
#ifndef SRC_STREAM_H_
#define SRC_STREAM_H_
+#include <stdbool.h>
#include <stddef.h>
struct read_stream {
int fd;
unsigned char *buffer;
+ size_t len;
size_t capacity;
};
-void read_stream_init(struct read_stream *, int);
-void read_stream_close(struct read_stream *);
+void rstream_init(struct read_stream *, int, size_t);
+int rstream_full_read(struct read_stream *, size_t);
+void rstream_close(struct read_stream *, bool);
-int full_write(int, unsigned char const *, size_t);
-
-/* NULL means "EOF". */
-int read_string(struct read_stream *, char **);
-int write_string(int, char const *);
+int stream_full_write(int, unsigned char const *, size_t);
#endif /* SRC_STREAM_H_ */
asn1_int_test_SOURCES = asn1/asn1c/INTEGER_t_test.c
asn1_int_test_LDADD = ${CHECK_LIBS}
+check_PROGRAMS += asn1_stream.test
+asn1_stream_test_SOURCES = asn1/asn1c/asn1stream_test.c
+asn1_stream_test_LDADD = ${CHECK_LIBS}
+
check_PROGRAMS += base64.test
base64_test_SOURCES = base64_test.c
base64_test_LDADD = ${CHECK_LIBS}
serial_test_SOURCES = types/serial_test.c
serial_test_LDADD = ${CHECK_LIBS}
+#check_PROGRAMS += stream.test
+#stream_test_SOURCES = stream_test.c
+#stream_test_LDADD = ${CHECK_LIBS}
+
check_PROGRAMS += tal.test
tal_test_SOURCES = object/tal_test.c
tal_test_LDADD = ${CHECK_LIBS}
--- /dev/null
+#include <check.h>
+
+#include "alloc.c"
+#include "mock.c"
+#include "asn1/asn1c/ber_decoder.c"
+#include "asn1/asn1c/ber_tlv_length.c"
+#include "asn1/asn1c/ber_tlv_tag.c"
+#include "asn1/asn1c/constr_CHOICE.c"
+#include "asn1/asn1c/constr_SEQUENCE.c"
+#include "asn1/asn1c/constr_TYPE.c"
+#include "asn1/asn1c/constraints.c"
+#include "asn1/asn1c/der_encoder.c"
+#include "asn1/asn1c/OCTET_STRING.c"
+#include "asn1/asn1c/OPEN_TYPE.c"
+#include "asn1/asn1c/RsyncRequest.c"
+
+MOCK_ABORT_PTR(json_strn_new, json_t, const char *value, size_t len)
+MOCK_ABORT_PTR(json_obj_new, json_t, void)
+MOCK_ABORT_PTR(json_null, json_t, void)
+MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void)
+
+START_TEST(test_multiple)
+{
+ struct RsyncRequest src = { 0 };
+ struct RsyncRequest *dst = NULL;
+ unsigned char buf[64] = { 0 };
+ asn_enc_rval_t encres;
+ asn_dec_rval_t decres;
+
+ ck_assert_int_eq(0, OCTET_STRING_fromString(&src.url, "url"));
+ ck_assert_int_eq(0, OCTET_STRING_fromString(&src.path, "path"));
+ encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src, buf, sizeof(buf));
+ ck_assert_int_eq(13, encres.encoded);
+
+ ck_assert_int_eq(0, OCTET_STRING_fromString(&src.url, "https://a.b.c/d/e.cer"));
+ ck_assert_int_eq(0, OCTET_STRING_fromString(&src.path, "tmp/http/a.b.c/d/e.cer"));
+ encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src, buf + 13, sizeof(buf) - 13);
+ ck_assert_int_eq(49, encres.encoded);
+
+ decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf, sizeof(buf));
+ ck_assert_int_eq(RC_OK, decres.code);
+ ck_assert_int_eq(13, decres.consumed);
+ ck_assert_uint_eq(3, dst->url.size);
+ ck_assert_mem_eq("url", dst->url.buf, 3);
+ ck_assert_uint_eq(4, dst->path.size);
+ ck_assert_mem_eq("path", dst->path.buf, 4);
+
+ dst = NULL;
+
+ /* Fragment */
+ decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf + 13, 13);
+ ck_assert_int_eq(RC_WMORE, decres.code);
+ ck_assert_int_eq(13, decres.consumed);
+ ck_assert_ptr_ne(NULL, dst);
+
+ decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst, buf + 26, sizeof(buf) - 26);
+ ck_assert_int_eq(RC_OK, decres.code);
+ ck_assert_int_eq(36, decres.consumed);
+ ck_assert_uint_eq(21, dst->url.size);
+ ck_assert_mem_eq("https://a.b.c/d/e.cer", dst->url.buf, 21);
+ ck_assert_uint_eq(22, dst->path.size);
+ ck_assert_mem_eq("tmp/http/a.b.c/d/e.cer", dst->path.buf, 22);
+}
+END_TEST
+
+static Suite *
+create_suite(void)
+{
+ Suite *suite;
+ TCase *pipes;
+
+ pipes = tcase_create("multiple");
+ tcase_add_test(pipes, test_multiple);
+
+ suite = suite_create("asn1 stream");
+ suite_add_tcase(suite, pipes);
+
+ return suite;
+}
+
+int
+main(void)
+{
+ SRunner *runner;
+ int tests_failed;
+
+ runner = srunner_create(create_suite());
+ srunner_run_all(runner, CK_NORMAL);
+ tests_failed = srunner_ntests_failed(runner);
+ srunner_free(runner);
+
+ return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}
#include "types/str.c"
#include "types/url.c"
+/*
+ * XXX
+ *
+ * Check rpps can't override each other:
+ * rpp1: rsync://domain/mod/rpp1, file rsync://domain/mod/rpp1/a
+ * rpp2: rsync://domain/mod/rpp1/a/b, file rsync://domain/mod/rpp1/a/b/c
+ * Check nodes are cleaned up when the file doesn't exist
+ * Check the file gets cleaned when the node doesn't exist
+ * If node is an error, what happens to the refresh and fallback?
+ * What happens if the refresh files somehow get deleted before a commit?
+ * What happens if the fallback files somehow get deleted before/after a commit?
+ */
+
/* Mocks */
static unsigned int rsync_counter; /* Times the rsync function was called */
}
int
-rsync_download(char const *url, char const *path)
+rsync_queue(char const *url, char const *path)
{
rsync_counter++;
#include "alloc.c"
#include "mock.c"
+static const long MS2NS = 1000000L;
+
START_TEST(test_tt)
{
char str[FORT_TS_LEN + 1];
}
END_TEST
+#define init_ts(ts, s, ns) \
+ ts.tv_sec = s; \
+ ts.tv_nsec = ns
+
+#define ck_ts(ts, s, ns) \
+ ck_assert_int_eq(s, ts.tv_sec); \
+ ck_assert_int_eq(ns, ts.tv_nsec)
+
+START_TEST(test_ts_normalize)
+{
+ struct timespec ts;
+
+ init_ts(ts, 100, 0);
+ ts_normalize(&ts);
+ ck_ts(ts, 100, 0);
+
+ init_ts(ts, 100, 999999999);
+ ts_normalize(&ts);
+ ck_ts(ts, 100, 999999999);
+
+ init_ts(ts, 100, 1000 * MS2NS);
+ ts_normalize(&ts);
+ ck_ts(ts, 101, 0);
+
+ init_ts(ts, 100, 2500 * MS2NS);
+ ts_normalize(&ts);
+ ck_ts(ts, 102, 500 * MS2NS);
+
+ init_ts(ts, 100, -1);
+ ts_normalize(&ts);
+ ck_ts(ts, 99, 999999999);
+
+ init_ts(ts, 100, -5000 * MS2NS);
+ ts_normalize(&ts);
+ ck_ts(ts, 95, 0);
+
+ init_ts(ts, 100, -5000 * MS2NS - 2);
+ ts_normalize(&ts);
+ ck_ts(ts, 94, 999999998);
+}
+END_TEST
+
+START_TEST(test_ts_cmp)
+{
+ struct timespec t1, t2;
+
+ /* Same second */
+ init_ts(t1, 100, 500);
+ init_ts(t2, 100, 500);
+ ck_assert_int_eq(0, ts_cmp(&t1, &t2));
+ ck_assert_int_eq(0, ts_cmp(&t2, &t1));
+
+ t1.tv_nsec = 1000;
+ t2.tv_nsec = 9000;
+ ck_assert(ts_cmp(&t1, &t2) < 0);
+ ck_assert(ts_cmp(&t2, &t1) > 0);
+
+ t1.tv_nsec = 9000;
+ t2.tv_nsec = 1000;
+ ck_assert(ts_cmp(&t1, &t2) > 0);
+ ck_assert(ts_cmp(&t2, &t1) < 0);
+
+ /* t1 < t2 by second */
+ init_ts(t1, 100, 500);
+ init_ts(t2, 101, 500);
+ ck_assert(ts_cmp(&t1, &t2) < 0);
+ ck_assert(ts_cmp(&t2, &t1) > 0);
+
+ t1.tv_nsec = 1000;
+ t2.tv_nsec = 9000;
+ ck_assert(ts_cmp(&t1, &t2) < 0);
+ ck_assert(ts_cmp(&t2, &t1) > 0);
+
+ t1.tv_nsec = 9000;
+ t2.tv_nsec = 1000;
+ ck_assert(ts_cmp(&t1, &t2) < 0);
+ ck_assert(ts_cmp(&t2, &t1) > 0);
+
+ /* t1 > t2 by second */
+ init_ts(t1, 100, 500);
+ init_ts(t2, 99, 500);
+ ck_assert(ts_cmp(&t1, &t2) > 0);
+ ck_assert(ts_cmp(&t2, &t1) < 0);
+
+ t1.tv_nsec = 1000;
+ t2.tv_nsec = 9000;
+ ck_assert(ts_cmp(&t1, &t2) > 0);
+ ck_assert(ts_cmp(&t2, &t1) < 0);
+
+ t1.tv_nsec = 9000;
+ t2.tv_nsec = 1000;
+ ck_assert(ts_cmp(&t1, &t2) > 0);
+ ck_assert(ts_cmp(&t2, &t1) < 0);
+}
+END_TEST
+
+START_TEST(test_ts_delta)
+{
+ struct timespec t1, t2;
+
+ init_ts(t1, 100, 0);
+ init_ts(t2, 100, 2 * MS2NS);
+ ck_assert_int_eq(2, ts_delta(&t1, &t2));
+ ck_assert_int_eq(-2, ts_delta(&t2, &t1));
+
+ init_ts(t1, 100, 0);
+ init_ts(t2, 100, -2 * MS2NS);
+ ck_assert_int_eq(-2, ts_delta(&t1, &t2));
+ ck_assert_int_eq(2, ts_delta(&t2, &t1));
+
+ init_ts(t1, 50, 0);
+ init_ts(t2, 100, 0);
+ ck_assert_int_eq(50000, ts_delta(&t1, &t2));
+ ck_assert_int_eq(-50000, ts_delta(&t2, &t1));
+
+ init_ts(t1, -10, 0);
+ init_ts(t2, 10, 0);
+ ck_assert_int_eq(20000, ts_delta(&t1, &t2));
+ ck_assert_int_eq(-20000, ts_delta(&t2, &t1));
+
+ init_ts(t1, -10, 1 * MS2NS); /* -9999ms */
+ init_ts(t2, 10, 2 * MS2NS); /* 10002ms */
+ ck_assert_int_eq(20001, ts_delta(&t1, &t2));
+ ck_assert_int_eq(-20001, ts_delta(&t2, &t1));
+}
+END_TEST
+
+START_TEST(test_ts_add)
+{
+ struct timespec src, dst;
+
+ init_ts(src, 100, 0); /* 100 */
+ ts_add(&dst, &src, 100); /* +0.1 */
+ ck_ts(dst, 100, 100 * MS2NS); /* 100.1 */
+
+ ts_add(&dst, &src, -100); /* -0.1 */
+ ck_ts(dst, 99, 900 * MS2NS); /* 99.9 */
+
+ init_ts(src, 100, 50 * MS2NS); /* 100.05 */
+ ts_add(&dst, &src, 100); /* +0.1 */
+ ck_ts(dst, 100, 150 * MS2NS); /* 100.15 */
+
+ ts_add(&dst, &src, -100); /* -0.1 */
+ ck_ts(dst, 99, 950 * MS2NS); /* 99.95 */
+
+ init_ts(src, 100, 0); /* 100 */
+ ts_add(&dst, &src, 10000); /* +10 */
+ ck_ts(dst, 110, 0); /* 110 */
+
+ ts_add(&dst, &src, -10000); /* -10 */
+ ck_ts(dst, 90, 0); /* 90 */
+
+ init_ts(src, 89, 123409876); /* 89.1234 */
+ ts_add(&dst, &src, 98765); /* +98.765 */
+ ck_ts(dst, 187, 888409876); /* 187.8884 */
+
+ init_ts(src, 9, 123409876); /* 9.1234 */
+ ts_add(&dst, &src, -12345); /* -12.345 */
+ ck_ts(dst, -4, 778409876); /* -3.2216 (-4+0.7784) */
+}
+END_TEST
+
static Suite *create_suite(void)
{
Suite *suite;
- TCase *core;
+ TCase *timet, *ts;
+
+ timet = tcase_create("time_t");
+ tcase_add_test(timet, test_tt);
- core = tcase_create("utils");
- tcase_add_test(core, test_tt);
+ ts = tcase_create("timespec");
+ tcase_add_test(ts, test_ts_normalize);
+ tcase_add_test(ts, test_ts_cmp);
+ tcase_add_test(ts, test_ts_delta);
+ tcase_add_test(ts, test_ts_add);
suite = suite_create("commons");
- suite_add_tcase(suite, core);
+ suite_add_tcase(suite, timet);
+ suite_add_tcase(suite, ts);
return suite;
}
--- /dev/null
+#!/bin/sh
+
+STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
+
+echo "$STR"
+sleep 1 # 3999
+echo "$STR"
+echo "$STR" 1>&2
+sleep 1 # 2998
+echo "$STR"
+sleep 1 # 1997
+echo "$STR" 1>&2
+sleep 1 # The spawner should kill us around the end of this.
+echo "$STR"
+sleep 1
+echo "$STR"
+sleep 1 # Otherwise, check will kill the spawner around here.
+echo "$STR"
+sleep 1
+echo "$STR"
+sleep 1
+echo "$STR"
--- /dev/null
+#!/bin/sh
+
+STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
+
+echo "$STR"
+sleep 1
+echo "$STR"
+echo "$STR" 1>&2
+sleep 1
+echo "$STR"
+sleep 1
+echo "$STR" 1>&2
--- /dev/null
+#!/bin/sh
+
+STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
+
+echo "$STR" 1>&2
+echo "$STR"
+echo "$STR" 1>&2
+echo "$STR"
--- /dev/null
+#!/bin/sh
+
+STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
+
+echo "$STR"
+# The spawner has to kill us after 4 seconds.
+# (This is what we're testing.)
+# If it doesn't, check will fail the unit test during the 6th second.
+sleep 20
+echo "$STR"
--- /dev/null
+#!/bin/sh
+
+STR="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
+
+echo "$STR"
+sleep 3 # Barely dodge the spawner's timeout (4 seconds).
+echo "$STR"
#include "rsync.c"
#include "stream.c"
+
+#include "asn1/asn1c/ber_decoder.c"
+#include "asn1/asn1c/ber_tlv_length.c"
+#include "asn1/asn1c/ber_tlv_tag.c"
+#include "asn1/asn1c/constr_SEQUENCE.c"
+#include "asn1/asn1c/constr_TYPE.c"
+#include "asn1/asn1c/der_encoder.c"
+#include "asn1/asn1c/OCTET_STRING.c"
+#include "asn1/asn1c/RsyncRequest.c"
+
static char const STR64[] = "abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"0123456789 \n";
static const size_t STR64LEN = sizeof(STR64) - 1;
static char content[1024];
+#define BUFSIZE 128
+
/* Mocks */
MOCK(config_get_rsync_program, char const *, "rsync", void)
MOCK(config_get_rsync_transfer_timeout, long, 4, void)
+MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void)
+
+MOCK_ABORT_PTR(json_obj_new, json_t, void)
+MOCK_ABORT_VOID(json_delete, json_t *json)
+MOCK_ABORT_PTR(json_strn_new, json_t, const char *value, size_t len)
+MOCK_ABORT_PTR(json_null, json_t, void)
+
+static asn_dec_rval_t trash;
+__MOCK_ABORT(OPEN_TYPE_ber_get, asn_dec_rval_t, trash,
+ const asn_codec_ctx_t *opt_codec_ctx, const asn_TYPE_descriptor_t *td,
+ void *sptr, const asn_TYPE_member_t *elm, const void *ptr, size_t size)
+MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td,
+ const void *strt, asn_app_constraint_failed_f *cb, void *key)
/* Tests */
}
static void
-init_tmp(void)
+create_dir(char const *path)
{
- int res = mkdir("tmp/", 0700);
- if (res && errno != EEXIST)
- pr_crit("Could not create tmp/: %s", strerror(errno));
+ if (mkdir(path, 0700) < 0)
+ ck_assert_int_eq(EEXIST, errno);
}
-static void *
-rsync_fast(void *arg)
+static void
+create_file(char const *name, unsigned int kbs)
{
- int fds[2][2];
- memcpy(fds, arg, sizeof(fds));
-
- ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN));
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
- ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN));
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
+ FILE *file;
- close(STDERR_WRITE(fds));
- close(STDOUT_WRITE(fds));
- return NULL;
+ file = fopen(name, "wb");
+ ck_assert_ptr_ne(NULL, file);
+ ck_assert_int_eq(kbs, fwrite(content, sizeof(content), kbs, file));
+ ck_assert_int_eq(0, fclose(file));
}
-static void *
-rsync_stalled(void *arg)
+static void
+ensure_file_deleted(char const *name)
{
- int fds[2][2];
- memcpy(fds, arg, sizeof(fds));
-
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
-
- sleep(5); /* The timeout is 4 seconds */
+ int ret;
+ int error;
- ck_assert_int_ne(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
+ errno = 0;
+ ret = unlink(name);
+ error = errno;
- close(STDERR_WRITE(fds));
- close(STDOUT_WRITE(fds));
- return NULL;
+ ck_assert(ret == 0 || error == ENOENT);
}
-static void *
-rsync_drip_feed(void *arg)
+static void
+create_rsync_sandbox(void)
{
- int fds[2][2];
- memcpy(fds, arg, sizeof(fds));
-
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
- sleep(1);
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
- ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN));
- sleep(1);
- ck_assert_int_eq(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
- sleep(1);
- ck_assert_int_eq(STR64LEN, write(STDERR_WRITE(fds), STR64, STR64LEN));
- sleep(2);
- ck_assert_int_ne(STR64LEN, write(STDOUT_WRITE(fds), STR64, STR64LEN));
-
- close(STDERR_WRITE(fds));
- close(STDOUT_WRITE(fds));
- return NULL;
+ create_dir("tmp");
+ create_dir("tmp/rsync");
+ create_dir("tmp/rsync/src");
+ create_dir("tmp/rsync/dst");
+
+ create_file("tmp/rsync/src/a", 1);
+ create_file("tmp/rsync/src/b", 1);
+ create_file("tmp/rsync/src/c", 1);
+
+ ensure_file_deleted("tmp/rsync/dst/a");
+ ensure_file_deleted("tmp/rsync/dst/b");
+ ensure_file_deleted("tmp/rsync/dst/c");
}
static void
-prepare_exhaust(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *))
+diff(char const *file1, char const *file2)
{
- ck_assert_int_eq(0, pipe(fds[0]));
- ck_assert_int_eq(0, pipe(fds[1]));
- ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, fds));
+ int fd1, fd2;
+ struct read_stream rs1, rs2;
+ int read1, read2;
+
+ fd1 = open(file1, O_RDONLY, 0);
+ ck_assert_int_ne(-1, fd1);
+ rstream_init(&rs1, fd1, 1024);
+
+ fd2 = open(file2, O_RDONLY, 0);
+ ck_assert_int_ne(-1, fd2);
+ rstream_init(&rs2, fd2, 1024);
+
+ do {
+ read1 = rstream_full_read(&rs1, 1024);
+ ck_assert_int_ge(read1, 0);
+ read2 = rstream_full_read(&rs2, 1024);
+ ck_assert_int_eq(read1, read2);
+ ck_assert_int_eq(0, memcmp(rs1.buffer, rs2.buffer, read1));
+ } while (read1 == 1024);
+
+ rstream_close(&rs1, true);
+ rstream_close(&rs2, true);
}
static void
-finish_exhaust(pthread_t thread)
+ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
+ char const *url, char const *path)
{
- pthread_join(thread, NULL);
+ struct rsync_task *task;
+
+ task = LIST_FIRST(tasks);
+ ck_assert_ptr_ne(NULL, task);
+ ck_assert_str_eq(url, task->url);
+ ck_assert_str_eq(path, task->path);
+ finish_task(task, sk);
}
-START_TEST(exhaust_read_fds_test_normal)
+/* Test RsyncRequest decode, feeding as few bytes as possible every time. */
+START_TEST(test_decode_extremely_fragmented)
{
- int fds[2][2];
- pthread_t rsync_writer;
-
- printf("Normal transfer\n");
- prepare_exhaust(fds, &rsync_writer, rsync_fast);
- ck_assert_int_eq(0, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
- finish_exhaust(rsync_writer);
+ struct RsyncRequest src, *dst;
+ unsigned char encoded[BUFSIZE];
+ asn_enc_rval_t encres;
+ asn_dec_rval_t decres;
+ unsigned int start, end, max;
+
+ ck_assert_int_eq(0, RsyncRequest_init(&src,
+ "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123456789",
+ "AbCdEfGhIjKlMnOpQrStUvWxYz1234567890"));
+ encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &src,
+ encoded, sizeof(encoded));
+ ck_assert_int_gt(encres.encoded, 0);
+
+ printf("size: %zu\n", encres.encoded);
+
+ dst = NULL;
+ max = 0;
+ for (start = end = 0; end < encres.encoded - 1; end++) {
+ printf("Offset %u: Requesting %u bytes...\n",
+ start, end - start + 1);
+ decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&dst,
+ encoded + start, end - start + 1);
+ ck_assert_int_eq(RC_WMORE, decres.code);
+ start += decres.consumed;
+
+ printf("Consumed %zu bytes.\n", decres.consumed);
+ if (decres.consumed > max)
+ max = decres.consumed;
+ }
+
+ printf("Minimum required buffer size: %u bytes\n", max);
+
+ decres = ber_decode(&asn_DEF_RsyncRequest,
+ (void **)&dst, encoded + start, end - start + 1);
+ ck_assert_int_eq(RC_OK, decres.code);
+ ck_assert_uint_eq(end - start + 1, decres.consumed);
+
+ ck_assert_int_eq(0, OCTET_STRING_cmp(&src.url, &dst->url));
+ ck_assert_int_eq(0, OCTET_STRING_cmp(&src.path, &dst->path));
+
+ ASN_STRUCT_RESET(asn_DEF_RsyncRequest, &src);
+ ASN_STRUCT_FREE(asn_DEF_RsyncRequest, dst);
}
END_TEST
-START_TEST(exhaust_read_fds_test_stalled)
+static void
+encode_request(char const *url, char const *path, unsigned char *buffer)
{
- int fds[2][2];
- pthread_t rsync_writer;
+ struct RsyncRequest rr;
+ asn_enc_rval_t encres;
- printf("Stalled transfer\n");
- prepare_exhaust(fds, &rsync_writer, rsync_stalled);
- ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
- finish_exhaust(rsync_writer);
+ ck_assert_int_eq(0, RsyncRequest_init(&rr, url, path));
+ encres = der_encode_to_buffer(&asn_DEF_RsyncRequest, &rr, buffer, BUFSIZE);
+ ck_assert_int_gt(encres.encoded, 0);
+ ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &rr);
}
-END_TEST
-START_TEST(exhaust_read_fds_test_drip)
+/*
+ * Tests messy request queuing; in particular, when a single read() yields less
+ * than one or multiple of them.
+ */
+START_TEST(test_read_tasks)
{
- int fds[2][2];
- pthread_t rsync_writer;
-
- printf("Drip-feed\n");
- prepare_exhaust(fds, &rsync_writer, rsync_drip_feed);
- ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
- finish_exhaust(rsync_writer);
+ unsigned char bytes[BUFSIZE];
+ int fds[2];
+ struct s2p_socket sk;
+ struct rsync_tasks tasks;
+ struct timespec now;
+
+ ck_assert_int_eq(0, nonblock_pipe(fds));
+ rstream_init(&sk.rd, fds[0], 256);
+ sk.wr = -1;
+ sk.rr = NULL;
+ LIST_INIT(&tasks);
+ ts_now(&now);
+
+ printf("Read yields nothing\n");
+ ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+
+ printf("Read yields less than 1 request\n");
+ encode_request("111", "2222", bytes); /* 13 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
+ ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3));
+ ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+
+ ck_1st_task(&tasks, &sk, "111", "2222");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ printf("Read yields 1 request\n");
+ encode_request("3333", "444", bytes); /* 13 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13));
+ ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+
+ ck_1st_task(&tasks, &sk, "3333", "444");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ printf("Read yields 1.5 requests\n");
+ encode_request("55", "666", bytes); /* 11 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11));
+ encode_request("777", "88", bytes); /* 11 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5));
+ ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+
+ ck_1st_task(&tasks, &sk, "55", "666");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6));
+ ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+
+ ck_1st_task(&tasks, &sk, "777", "88");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ printf("Read yields 2 requests\n");
+ encode_request("9999", "00", bytes); /* 12 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
+ encode_request("aa", "bbbb", bytes); /* 12 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
+ ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+ ck_1st_task(&tasks, &sk, "aa", "bbbb");
+ ck_1st_task(&tasks, &sk, "9999", "00");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ printf("Read yields 2.5 requests\n");
+ encode_request("cc", "dd", bytes); /* 10 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
+ encode_request("eeeee", "fffff", bytes); /* 16 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 16));
+ encode_request("gggg", "hhhhh", bytes); /* 15 bytes */
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3));
+
+ ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+ ck_1st_task(&tasks, &sk, "eeeee", "fffff");
+ ck_1st_task(&tasks, &sk, "cc", "dd");
+ ck_assert(LIST_EMPTY(&tasks));
+
+ ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12));
+ ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ ck_1st_task(&tasks, &sk, "gggg", "hhhhh");
+ ck_assert(LIST_EMPTY(&tasks));
}
END_TEST
static void
-create_file(char const *name, unsigned int kbs)
+wait_rsyncs(unsigned int count)
{
- FILE *file;
- unsigned int k;
+ unsigned int done = 0;
- file = fopen(name, "wb");
- ck_assert_ptr_ne(NULL, file);
- for (k = 0; k < kbs; k++)
- ck_assert_int_eq(sizeof(content), fwrite(content, 1, sizeof(content), file));
- ck_assert_int_eq(0, fclose(file));
+ do {
+ sleep(1);
+ done += rsync_finished();
+ printf("rsyncs done: %u\n", done);
+ } while (done < count);
+
+ ck_assert_uint_eq(count, done);
}
-static void
-ensure_file_deleted(char const *name)
+START_TEST(test_fast_single_rsync)
{
- int ret;
- int error;
+ rsync_setup("resources/rsync/fast.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
- errno = 0;
- ret = unlink(name);
- error = errno;
-
- ck_assert(ret == 0 || error == ENOENT);
-}
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ wait_rsyncs(1);
-START_TEST(full_rsync_timeout_test_1kb)
-{
- printf("1kb\n");
- create_file("tmp/1kb", 1);
- ensure_file_deleted("tmp/1kb-copy");
- ck_assert_int_eq(0, rsync("tmp/1kb", "tmp/1kb-copy"));
+ rsync_teardown();
}
END_TEST
-START_TEST(full_rsync_timeout_test_3kb)
+START_TEST(test_stalled_single_rsync)
{
- printf("3kb\n");
- create_file("tmp/3kb", 3);
- ensure_file_deleted("tmp/3kb-copy");
- ck_assert_int_eq(0, rsync("tmp/3kb", "tmp/3kb-copy"));
+ rsync_setup("resources/rsync/stalled.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
+
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ wait_rsyncs(1);
+
+ rsync_teardown();
}
END_TEST
-START_TEST(full_rsync_timeout_test_5kb)
+START_TEST(test_stalled_single_rsync_timeout)
{
- printf("5kb\n");
- create_file("tmp/5kb", 5);
- ensure_file_deleted("tmp/5kb-copy");
- /* Max speed is 1kbps, timeout is 4 seconds */
- ck_assert_int_eq(EIO, rsync("tmp/5kb", "tmp/5kb-copy"));
+ rsync_setup("resources/rsync/stalled-timeout.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
+
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ wait_rsyncs(1);
+
+ rsync_teardown();
}
END_TEST
-static void
-wait_rsyncs(unsigned int expected)
+START_TEST(test_dripfeed_single_rsync)
{
- unsigned int actual = 0;
+ rsync_setup("resources/rsync/drip-feed.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
- do {
- actual += rsync_finished();
- if (expected == actual) {
- ck_assert_uint_eq(0, rsync_finished());
- return;
- }
- ck_assert(expected > actual);
- sleep(1);
- } while (true);
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ wait_rsyncs(1);
+
+ rsync_teardown();
}
+END_TEST
-START_TEST(test_rsync_finished)
+START_TEST(test_dripfeed_single_rsync_timeout)
{
- printf("rsync_finished()\n");
-
- create_file("tmp/2kb", 1);
- ensure_file_deleted("tmp/2kb-copy");
+ rsync_setup("resources/rsync/drip-feed-timeout.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
- rsync_setup();
-
- ck_assert_int_eq(0, rsync_download("tmp/2kb", "tmp/2kb-copy"));
- ck_assert_int_eq(0, rsync_finished());
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1);
rsync_teardown();
}
END_TEST
-START_TEST(test_multi_rsync)
+START_TEST(test_no_rsyncs)
{
- printf("simultaneous rsyncs\n");
+ rsync_setup("rsync", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
- create_file("tmp/i1", 1);
- ensure_file_deleted("tmp/i1-copy");
- create_file("tmp/i2", 1);
- ensure_file_deleted("tmp/i2-copy");
- create_file("tmp/i3", 1);
- ensure_file_deleted("tmp/i3-copy");
+ sleep(2);
- rsync_setup();
+ rsync_teardown();
+}
+END_TEST
- ck_assert_int_eq(0, rsync_download("tmp/i1", "tmp/"));
- ck_assert_int_eq(0, rsync_finished());
- ck_assert_int_eq(0, rsync_download("tmp/i2", "tmp/i2-copy"));
- ck_assert_int_eq(0, rsync_download("tmp/i3", "tmp/i3-copy"));
+START_TEST(test_simultaneous_rsyncs)
+{
+ create_rsync_sandbox();
+ /* Note... --bwlimit does not seem to exist in openrsync */
+ rsync_setup("rsync", "--bwlimit=1K", "-vvv", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
+
+ ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/a", "tmp/rsync/dst/a"));
+ ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/b", "tmp/rsync/dst/b"));
+ ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/c", "tmp/rsync/dst/c"));
wait_rsyncs(3);
rsync_teardown();
+
+ diff("tmp/rsync/src/a", "tmp/rsync/dst/a");
+ diff("tmp/rsync/src/b", "tmp/rsync/dst/b");
+ diff("tmp/rsync/src/c", "tmp/rsync/dst/c");
}
END_TEST
-static Suite *create_suite(void)
+static Suite *
+create_suite(void)
{
Suite *suite;
- TCase *pipes, *spawner;
+ TCase *p2s, *s2r, *spawner;
+
+ p2s = tcase_create("parent-spawner channel");
+ tcase_add_test(p2s, test_decode_extremely_fragmented);
+ tcase_add_test(p2s, test_read_tasks);
- pipes = tcase_create("pipes");
- tcase_add_test(pipes, exhaust_read_fds_test_normal);
- tcase_add_test(pipes, exhaust_read_fds_test_stalled);
- tcase_add_test(pipes, exhaust_read_fds_test_drip);
- tcase_add_test(pipes, full_rsync_timeout_test_1kb);
- tcase_add_test(pipes, full_rsync_timeout_test_3kb);
- tcase_add_test(pipes, full_rsync_timeout_test_5kb);
- tcase_set_timeout(pipes, 6);
+ s2r = tcase_create("spawner-rsync channel");
+ tcase_add_test(p2s, test_fast_single_rsync);
+ tcase_add_test(p2s, test_stalled_single_rsync);
+ tcase_add_test(p2s, test_stalled_single_rsync_timeout);
+ tcase_add_test(p2s, test_dripfeed_single_rsync);
+ tcase_add_test(p2s, test_dripfeed_single_rsync_timeout);
+ tcase_set_timeout(p2s, 6);
spawner = tcase_create("spawner");
- tcase_add_test(spawner, test_rsync_finished);
- tcase_add_test(spawner, test_multi_rsync);
+ tcase_add_test(spawner, test_no_rsyncs);
+ tcase_add_test(spawner, test_simultaneous_rsyncs);
tcase_set_timeout(spawner, 6);
suite = suite_create("rsync");
- suite_add_tcase(suite, pipes);
+ suite_add_tcase(suite, p2s);
+ suite_add_tcase(suite, s2r);
suite_add_tcase(suite, spawner);
return suite;
}
-int main(void)
+int
+main(void)
{
- Suite *suite;
SRunner *runner;
int tests_failed;
printf("This test needs to exhaust some timeouts. Please be patient.\n");
disable_sigpipe();
init_content();
- init_tmp();
-
- suite = create_suite();
- runner = srunner_create(suite);
+ runner = srunner_create(create_suite());
srunner_run_all(runner, CK_NORMAL);
tests_failed = srunner_ntests_failed(runner);
srunner_free(runner);
--- /dev/null
+#include <check.h>
+#include <pthread.h>
+
+#include "alloc.c"
+#include "mock.c"
+#include "stream.c"
+
+#define DO_WRITE(fd, str) ck_assert_int_eq(0, stream_wr_str(fd, str))
+#define CK_READ(stm, str) do { \
+ ck_assert_int_eq(0, rstream_read_str(&stm, &rcvd)); \
+ ck_assert_str_eq(str, rcvd); \
+ free(rcvd); \
+ } while (0)
+#define CK_READ_END(stm) do { \
+ ck_assert_int_eq(0, rstream_read_str(&stm, &rcvd)); \
+ ck_assert_ptr_eq(NULL, rcvd); \
+ } while (0);
+
+static char large[1025];
+
+START_TEST(test_string_simple)
+{
+ int fds[2];
+ struct read_stream stream;
+ char *rcvd;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 128);
+
+ DO_WRITE(fds[1], "Lorem ipsum dolor sit amet");
+ CK_READ(stream, "Lorem ipsum dolor sit amet");
+ close(fds[1]);
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+}
+END_TEST
+
+START_TEST(test_string_simple_alt)
+{
+ int fds[2];
+ struct read_stream stream;
+ char *rcvd;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 128);
+
+ DO_WRITE(fds[1], "Lorem ipsum dolor sit amet");
+ close(fds[1]);
+ CK_READ(stream, "Lorem ipsum dolor sit amet");
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+}
+END_TEST
+
+START_TEST(test_string_multiple)
+{
+ int fds[2];
+ struct read_stream stream;
+ char *rcvd;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 128);
+
+ /* Write 3 separate strings, close immediately */
+ DO_WRITE(fds[1], "Lorem ipsum dolor sit amet");
+ DO_WRITE(fds[1], "consectetur adipiscing elit");
+ DO_WRITE(fds[1], "Curabitur scelerisque tortor est");
+ close(fds[1]);
+
+ /* Read each string separately, then check read end */
+ CK_READ(stream, "Lorem ipsum dolor sit amet");
+ CK_READ(stream, "consectetur adipiscing elit");
+ CK_READ(stream, "Curabitur scelerisque tortor est");
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+}
+END_TEST
+
+START_TEST(test_string_capacity_growth)
+{
+ int fds[2];
+ struct read_stream stream;
+ char *rcvd;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 16);
+
+ DO_WRITE(fds[1], "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
+ DO_WRITE(fds[1], "Curabitur scelerisque tortor est, ut lacinia eros rutrum id. Duis sed metus id nisl suscipit facilisis.");
+ DO_WRITE(fds[1], "Mauris at varius libero.");
+ close(fds[1]);
+ CK_READ(stream, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
+ ck_assert_uint_eq(64, stream.capacity);
+ CK_READ(stream, "Curabitur scelerisque tortor est, ut lacinia eros rutrum id. Duis sed metus id nisl suscipit facilisis.");
+ ck_assert_uint_eq(128, stream.capacity);
+ CK_READ(stream, "Mauris at varius libero.");
+ ck_assert_uint_eq(128, stream.capacity);
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+}
+END_TEST
+
+START_TEST(test_string_max_size)
+{
+ int fds[2];
+ struct read_stream stream;
+ char *rcvd;
+
+ /* strlen(src) = 1023: Success */
+
+ memset(large, 'a', 1023);
+ large[1023] = 0;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 256);
+
+ DO_WRITE(fds[1], large);
+ close(fds[1]);
+ CK_READ(stream, large);
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+
+ /* strlen(src) = 1024: Fail */
+
+ large[1023] = 'b';
+ large[1024] = 0;
+
+ ck_assert_int_eq(0, pipe(fds));
+ rstream_init(&stream, fds[0], 256);
+
+ ck_assert_int_eq(EBADLEN, stream_wr_str(fds[1], large));
+ close(fds[1]);
+ CK_READ_END(stream);
+
+ rstream_close(&stream, true);
+}
+END_TEST
+
+static void *
+consume_fragments(void *arg)
+{
+ struct read_stream stream;
+ char *rcvd;
+
+ rstream_init(&stream, *((int *)arg), 128);
+
+ CK_READ(stream, "1234567890abcdefghijABCDEFGHI");
+ CK_READ_END(stream);
+ rstream_close(&stream, true);
+
+ return NULL;
+}
+
+START_TEST(test_string_fragmented)
+{
+ pthread_t reader;
+ int fds[2];
+
+ ck_assert_int_eq(0, pipe(fds));
+ ck_assert_int_eq(0, pthread_create(&reader, NULL, consume_fragments, &fds[0]));
+
+ ck_assert_int_eq(0, write_size_t(fds[1], 30));
+ ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"1234567890", 10));
+ sleep(1);
+ ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"abcdefghij", 10));
+ sleep(1);
+ ck_assert_int_eq(0, full_write(fds[1], (unsigned char *)"ABCDEFGHI", 10));
+ close(fds[1]);
+
+ ck_assert_int_eq(0, pthread_join(reader, NULL));
+}
+END_TEST
+
+static void *
+consume_many(void *arg)
+{
+ struct read_stream stream;
+ unsigned int i;
+ char *rcvd;
+
+ rstream_init(&stream, *((int *)arg), 256);
+
+ for (i = 0; i < 16384; i++)
+ CK_READ(stream, large);
+ CK_READ_END(stream);
+ rstream_close(&stream, true);
+
+ return NULL;
+}
+
+START_TEST(test_string_many)
+{
+ int fds[2];
+ pthread_t reader;
+ unsigned int i;
+
+ memset(large, 'a', 1023);
+ large[1023] = 0;
+
+ ck_assert_int_eq(0, pipe(fds));
+ ck_assert_int_eq(0, pthread_create(&reader, NULL, consume_many, &fds[0]));
+
+ for (i = 0; i < 16384; i++)
+ DO_WRITE(fds[1], large);
+ close(fds[1]);
+
+ ck_assert_int_eq(0, pthread_join(reader, NULL));
+}
+END_TEST
+
+static Suite *
+create_suite(void)
+{
+ Suite *suite;
+ TCase *string;
+
+ string = tcase_create("string");
+ tcase_add_test(string, test_string_simple);
+ tcase_add_test(string, test_string_simple_alt);
+ tcase_add_test(string, test_string_multiple);
+ tcase_add_test(string, test_string_capacity_growth);
+ tcase_add_test(string, test_string_max_size);
+ tcase_add_test(string, test_string_fragmented);
+ tcase_add_test(string, test_string_many);
+
+ suite = suite_create("stream");
+ suite_add_tcase(suite, string);
+
+ return suite;
+}
+
+int
+main(void)
+{
+ SRunner *runner;
+ int failures;
+
+ runner = srunner_create(create_suite());
+ srunner_run_all(runner, CK_NORMAL);
+ failures = srunner_ntests_failed(runner);
+ srunner_free(runner);
+
+ return (failures == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}