#include <fcntl.h>
#include <poll.h>
#include <signal.h>
+#include <stream.h>
#include <sys/wait.h>
#include <syslog.h>
#define STDERR_READ(fds) fds[0][0]
#define STDOUT_READ(fds) fds[1][0]
+static pid_t spawner; /* The process that spawns rsync runs */
+
+static int readfd; /* Our end of the spawner-to-parent pipe */
+static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER;
+
+static int writefd; /* Our end of the parent-to-spawner pipe */
+static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER;
+
+static int rsync(char const *, char const *);
+
+static int
+run_child(int readfd, int writefd)
+{
+ unsigned char zero = 0;
+ struct read_stream stream;
+ char *url, *path;
+ int error;
+
+ read_stream_init(&stream, readfd);
+
+ do {
+ error = read_string(&stream, &url);
+ if (error || url == NULL)
+ break;
+ error = read_string(&stream, &path);
+ if (error || path == NULL) {
+ free(url);
+ break;
+ }
+
+ error = rsync(url, path);
+
+ free(url);
+ free(path);
+
+ error = full_write(writefd, &zero, 1);
+ } while (!error);
+
+ read_stream_close(&stream);
+ close(writefd);
+ free_rpki_config();
+ log_teardown();
+ return error;
+}
+
+void
+rsync_setup(void)
+{
+ static const int RDFD = 0;
+ static const int WRFD = 1;
+ int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */
+ int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */
+ int flags;
+
+ if (pipe(parent2spawner) < 0) {
+ pr_op_err("Cannot create pipe: %s", strerror(errno));
+ goto fail1;
+ }
+ if (pipe(spawner2parent) < 0) {
+ pr_op_err("Cannot create pipe: %s", strerror(errno));
+ goto fail2;
+ }
+
+ flags = fcntl(spawner2parent[RDFD], F_GETFL);
+ if (flags < 0) {
+ pr_op_err("Cannot retrieve pipe flags: %s", strerror(errno));
+ goto fail3;
+ }
+ if (fcntl(spawner2parent[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
+ pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(errno));
+ goto fail3;
+ }
+
+ fflush(stdout);
+ fflush(stderr);
+
+ spawner = fork();
+ if (spawner < 0) {
+ pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
+ goto fail3;
+ }
+
+ if (spawner == 0) { /* Client code */
+ close(parent2spawner[WRFD]);
+ close(spawner2parent[RDFD]);
+ exit(run_child(parent2spawner[RDFD], spawner2parent[WRFD]));
+ }
+
+ /* Parent code */
+ close(parent2spawner[RDFD]);
+ close(spawner2parent[WRFD]);
+ readfd = spawner2parent[RDFD];
+ writefd = parent2spawner[WRFD];
+ return;
+
+fail3: close(spawner2parent[RDFD]);
+ close(spawner2parent[WRFD]);
+fail2: close(parent2spawner[RDFD]);
+ close(parent2spawner[WRFD]);
+fail1: pr_op_warn("rsync will not be available.");
+ readfd = writefd = 0;
+}
+
+int
+rsync_download(char const *url, char const *path)
+{
+ mutex_lock(&writelock);
+
+ if (writefd == 0)
+ goto fail1;
+ if (write_string(writefd, url) != 0)
+ goto fail2;
+ if (write_string(writefd, path) != 0)
+ goto fail2;
+
+ mutex_unlock(&writelock);
+ return 0; // XXX go pick some other task
+
+fail2: close(readfd);
+ close(writefd);
+ readfd = writefd = 0;
+fail1: mutex_unlock(&writelock);
+ return EIO;
+}
+
+/* Returns the number of rsyncs that have ended since the last query */
+unsigned int
+rsync_finished(void)
+{
+ unsigned char buf[8];
+ ssize_t result;
+
+ mutex_lock(&readlock);
+ result = (readfd != 0) ? read(readfd, buf, sizeof(buf)) : 0;
+ mutex_unlock(&readlock);
+
+ return (result >= 0) ? result : 0;
+}
+
+static int
+wait_child(char const *name, pid_t pid)
+{
+ int status;
+ int error;
+
+again: status = 0;
+ if (waitpid(pid, &status, 0) < 0) {
+ error = errno;
+ pr_op_err("Could not wait for %s: %s", name, strerror(error));
+ return error;
+ }
+
+ if (WIFEXITED(status)) {
+ /* Happy path (but also sad path sometimes) */
+ error = WEXITSTATUS(status);
+ pr_val_debug("%s ended. Result: %d", name, error);
+ return error ? EIO : 0;
+ }
+
+ if (WIFSIGNALED(status)) {
+ pr_op_warn("%s interrupted by signal %d.",
+ name, WTERMSIG(status));
+ return EINTR;
+ }
+
+ if (WIFCONTINUED(status)) {
+ /*
+ * Testing warning:
+ * I can't trigger this branch. It always exits or signals;
+ * SIGSTOP then SIGCONT doesn't seem to wake up waitpid().
+ * It's concerning because every sample code I've found assumes
+ * waitpid() returning always means the subprocess ended, so
+ * they never retry. But that contradicts all documentation,
+ * yet seems to be accurate to reality.
+ */
+ pr_op_debug("%s has resumed.", name);
+ goto again;
+ }
+
+ /* Dead code */
+ pr_op_err("Unknown waitpid() status; giving up %s.", name);
+ return EINVAL;
+}
+
+void
+rsync_teardown(void)
+{
+ if (readfd)
+ close(readfd);
+ if (writefd)
+ close(writefd);
+ readfd = writefd = 0;
+ wait_child("rsync spawner", spawner);
+}
+
/*
* Duplicate parent FDs, to pipe rsync output:
* - fds[0] = stderr
args[i++] = NULL;
}
-__dead static void
-handle_child_thread(char const *url, char const *path, int fds[2][2])
+static int
+execvp_rsync(char const *url, char const *path, int fds[2][2])
{
- /* THIS FUNCTION MUST NEVER RETURN!!! */
char *args[20];
- int error;
prepare_rsync_args(args, url, path);
duplicate_fds(fds);
- execvp(args[0], args);
- error = errno;
- /* Log directly to stderr, redirected by the pipes */
- fprintf(stderr, "Could not execute the rsync command: %s\n",
- strerror(error));
+ if (execvp(args[0], args) < 0)
+ return errno;
- /* https://stackoverflow.com/a/14493459/1735458 */
- exit(-error);
+ return EINVAL; /* Unreachable, but whatever */
}
static int
error = errno;
/* Close pipe previously created */
- close(STDERR_READ(fds));
- close(STDERR_WRITE(fds));
+ close(fds[0][0]);
+ close(fds[0][1]);
pr_op_err_st("Piping rsync stdout: %s", strerror(error));
return -error;
static void
log_buffer(char const *buffer, ssize_t read, bool is_error)
{
-#define PRE_RSYNC "[RSYNC exec]: "
char *cpy, *cur, *tmp;
cpy = pmalloc(read + 1);
continue;
}
if (is_error)
- pr_val_err(PRE_RSYNC "%s", cur);
+ pr_val_err("[RSYNC exec] %s", cur);
else
- pr_val_debug(PRE_RSYNC "%s", cur);
+ pr_val_debug("[RSYNC exec] %s", cur);
cur = tmp + 1;
}
free(cpy);
-#undef PRE_RSYNC
}
#define DROP_FD(f, fail) \
}
timed_out:
- pr_val_err("rsync transfer timeout reached");
+ pr_val_err("rsync transfer timeout exhausted");
error = 2;
fail: for (f = 0; f < 2; f++)
if (pfd[f].fd != -1)
/*
* Completely consumes @fds' streams, and closes them.
*
- * Allegedly, this is a portable way to wait for the child process to finish.
- * (IIRC, waitpid() doesn't do this reliably.)
+ * Originally, this was meant to redirect rsync's output to syslog:
+ * ac56d70c954caf49382f5f28ff4a017e859e2e0a
+ * (ie. we need to exhaust the streams because we dup2()'d them.)
+ *
+ * Later, @job repurposed this code to fix #74.
*/
static int
exhaust_pipes(int fds[2][2])
}
/* rsync @url @path */
-int
-rsync_download(char const *url, char const *path)
+static int
+rsync(char const *url, char const *path)
{
/* Descriptors to pipe stderr (first element) and stdout (second) */
int fork_fds[2][2];
pid_t child_pid;
- int child;
int error;
- pr_val_info("rsync: %s -> %s", url, path);
-
error = create_pipes(fork_fds);
if (error)
return error;
- /* Flush output (avoid locks between father and child) */
- log_flush();
+ fflush(stdout);
+ fflush(stderr);
- /* We need to fork because execvp() magics the thread away. */
child_pid = fork();
- if (child_pid == 0) {
- /*
- * This code is run by the child, and should try to
- * call execvp() as soon as possible.
- *
- * Refer to
- * https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html
- * "{..} to avoid errors, the child process may only
- * execute async-signal-safe operations until such time
- * as one of the exec functions is called."
- */
- handle_child_thread(url, path, fork_fds);
- }
if (child_pid < 0) {
error = errno;
- pr_op_err_st("Couldn't fork to execute rsync: %s",
+ pr_op_err_st("Couldn't spawn the rsync process: %s",
strerror(error));
/* Close all ends from the created pipes */
close(STDERR_READ(fork_fds));
return error;
}
- /* This code is run by us. */
+ if (child_pid == 0)
+ exit(execvp_rsync(url, path, fork_fds)); /* Child code */
+
+ /* Parent code */
+
error = exhaust_pipes(fork_fds);
if (error)
kill(child_pid, SIGTERM); /* Stop the child */
- child = 0;
- if (waitpid(child_pid, &child, 0) < 0) {
- error = errno;
- pr_op_err("Could not wait for rsync: %s", strerror(error));
- return error;
- }
-
- if (WIFEXITED(child)) {
- /* Happy path (but also sad path sometimes) */
- error = WEXITSTATUS(child);
- pr_val_debug("rsync ended. Result: %d", error);
- return error ? EIO : 0;
- }
-
- if (WIFSIGNALED(child)) {
- pr_op_warn("rsync interrupted by signal %d.", WTERMSIG(child));
- return EINTR; /* Meh? */
- }
-
- pr_op_err_st("rsync died in an unknown way.");
- return -EINVAL;
+ return wait_child("rsync", child_pid);
}
--- /dev/null
+#include "stream.h"
+
+#include <errno.h>
+#include <stdbool.h>
+#include <stddef.h>
+
+void
+read_stream_init(struct read_stream *stream, int fd)
+{
+ stream->fd = fd;
+ stream->buffer = pmalloc(256);
+ stream->capacity = 256;
+}
+
+void
+read_stream_close(struct read_stream *stream)
+{
+ close(stream->fd);
+ free(stream->buffer);
+}
+
+/*
+ * Full read or error.
+ *
+ * Nonzero: error
+ * 0, stream->buffer != NULL: Success
+ * 0, stream->buffer == NULL: EOF
+ */
+static int
+full_read(struct read_stream *stream, size_t len)
+{
+ ssize_t rd;
+ size_t offset;
+
+ if (stream->buffer == NULL)
+ return 0;
+
+ offset = 0;
+ do {
+ rd = read(stream->fd, stream->buffer + offset, len);
+ if (rd < 0)
+ return errno;
+ if (rd == 0) {
+ free(stream->buffer);
+ stream->buffer = NULL;
+ stream->capacity = 0;
+ return 0;
+ }
+ if (rd == len)
+ return 0;
+ if (rd > len)
+ pr_crit("rd > len: %zd > %zu", rd, len);
+
+ len -= rd;
+ offset += rd;
+ } while (true);
+}
+
+/* Full write or error. */
+int
+full_write(int fd, unsigned char const *buf, size_t len)
+{
+ ssize_t wr;
+ size_t offset;
+
+ offset = 0;
+ do {
+ wr = write(fd, buf + offset, len);
+ if (wr < 0)
+ return errno;
+ if (wr > len)
+ pr_crit("wr > len: %zd > %zu", wr, len);
+ len -= wr;
+ offset += wr;
+ } while (len > 0);
+
+ return 0;
+}
+
+/* @value -1 means "EOF". */
+static int
+read_ssize_t(struct read_stream *stream, ssize_t *value)
+{
+ int error;
+
+ error = full_read(stream, 2);
+ if (error)
+ return error;
+
+ *value = stream->buffer
+ ? ((stream->buffer[0] << 8) | stream->buffer[1])
+ : -1;
+ return 0;
+}
+
+static int
+write_size_t(int fd, size_t value)
+{
+ unsigned char buf[2];
+
+ if (value > 1024)
+ return ENOSPC;
+
+ buf[0] = (value >> 8) & 0xFF;
+ buf[1] = value & 0xFF;
+
+ return full_write(fd, buf, 2);
+}
+
+int
+read_string(struct read_stream *stream, char **result)
+{
+ ssize_t len;
+ int error;
+
+ error = read_ssize_t(stream, &len);
+ if (error)
+ return error;
+ if (len == -1) {
+ *result = NULL;
+ return 0;
+ }
+
+ if (len > stream->capacity) {
+ do {
+ stream->capacity *= 2;
+ } while (len > stream->capacity);
+ stream->buffer = prealloc(stream->buffer, stream->capacity);
+ }
+
+ error = full_read(stream, len);
+ if (error)
+ return error;
+ *result = stream->buffer ? pstrdup((char *)stream->buffer) : NULL;
+ return 0;
+}
+
+int
+write_string(int fd, char const *str)
+{
+ size_t len;
+ int error;
+
+ len = strlen(str) + 1;
+
+ error = write_size_t(fd, len);
+ if (error)
+ return error;
+
+ return full_write(fd, (unsigned char const *) str, len);
+}
#include "common.c"
#include "mock.c"
#include "rsync.c"
+#include "stream.c"
static char const STR64[] = "abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
MOCK(config_get_rsync_program, char const *, "rsync", void)
MOCK(config_get_rsync_transfer_timeout, long, 4, void)
-void
-log_flush(void)
-{
- fflush(stdout);
- fflush(stderr);
-}
-
/* Tests */
static void
printf("1kb\n");
create_file("tmp/1kb", 1);
ensure_file_deleted("tmp/1kb-copy");
- // XXX this is creating directories because of rsync_download's mkdir_p.
- // Is this a symptom of a problem?
- ck_assert_int_eq(0, rsync_download("tmp/1kb", "tmp/1kb-copy"));
+ ck_assert_int_eq(0, rsync("tmp/1kb", "tmp/1kb-copy"));
}
END_TEST
printf("3kb\n");
create_file("tmp/3kb", 3);
ensure_file_deleted("tmp/3kb-copy");
- ck_assert_int_eq(0, rsync_download("tmp/3kb", "tmp/3kb-copy"));
+ ck_assert_int_eq(0, rsync("tmp/3kb", "tmp/3kb-copy"));
}
END_TEST
create_file("tmp/5kb", 5);
ensure_file_deleted("tmp/5kb-copy");
/* Max speed is 1kbps, timeout is 4 seconds */
- ck_assert_int_eq(EIO, rsync_download("tmp/5kb", "tmp/5kb-copy"));
+ ck_assert_int_eq(EIO, rsync("tmp/5kb", "tmp/5kb-copy"));
+}
+END_TEST
+
+static void
+wait_rsyncs(unsigned int expected)
+{
+ unsigned int actual = 0;
+
+ do {
+ actual += rsync_finished();
+ if (expected == actual) {
+ ck_assert_uint_eq(0, rsync_finished());
+ return;
+ }
+ ck_assert(expected > actual);
+ sleep(1);
+ } while (true);
+}
+
+START_TEST(test_rsync_finished)
+{
+ printf("rsync_finished()\n");
+
+ create_file("tmp/2kb", 1);
+ ensure_file_deleted("tmp/2kb-copy");
+
+ rsync_setup();
+
+ ck_assert_int_eq(0, rsync_download("tmp/2kb", "tmp/2kb-copy"));
+ ck_assert_int_eq(0, rsync_finished());
+ wait_rsyncs(1);
+
+ rsync_teardown();
+}
+END_TEST
+
+START_TEST(test_multi_rsync)
+{
+ printf("simultaneous rsyncs\n");
+
+ create_file("tmp/i1", 1);
+ ensure_file_deleted("tmp/i1-copy");
+ create_file("tmp/i2", 1);
+ ensure_file_deleted("tmp/i2-copy");
+ create_file("tmp/i3", 1);
+ ensure_file_deleted("tmp/i3-copy");
+
+ rsync_setup();
+
+ ck_assert_int_eq(0, rsync_download("tmp/i1", "tmp/"));
+ ck_assert_int_eq(0, rsync_finished());
+ ck_assert_int_eq(0, rsync_download("tmp/i2", "tmp/i2-copy"));
+ ck_assert_int_eq(0, rsync_download("tmp/i3", "tmp/i3-copy"));
+ wait_rsyncs(3);
+
+ rsync_teardown();
}
END_TEST
static Suite *create_suite(void)
{
Suite *suite;
- TCase *pipes;
+ TCase *pipes, *spawner;
pipes = tcase_create("pipes");
tcase_add_test(pipes, exhaust_read_fds_test_normal);
tcase_add_test(pipes, full_rsync_timeout_test_5kb);
tcase_set_timeout(pipes, 6);
+ spawner = tcase_create("spawner");
+ tcase_add_test(spawner, test_rsync_finished);
+ tcase_add_test(spawner, test_multi_rsync);
+ tcase_set_timeout(spawner, 6);
+
suite = suite_create("rsync");
suite_add_tcase(suite, pipes);
+ suite_add_tcase(suite, spawner);
return suite;
}