From: Alberto Leiva Popper Date: Sat, 19 Oct 2024 00:18:00 +0000 (-0600) Subject: Move the rsync spawner to a separate process X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=085987fce0f31d2097ce7750a81ab41e116d75db;p=thirdparty%2FFORT-validator.git Move the rsync spawner to a separate process The fork()s (needed to spawn rsyncs) duplicate Fort's process. Which is messy in a multithreaded program. Quoting the Linux man page: > * The child process is created with a single thread—the one that > called fork(). The entire virtual address space of the parent is > replicated in the child, including the states of mutexes, condition > variables, and other pthreads objects. (...) > * After a fork() in a multithreaded program, the child can safely call > only async-signal-safe functions (...) until such time as it calls > execve(2). As far as I can tell, since the forked child was, in fact, careful to only invoke async-signal-safe functions, this wasn't really a bug. Still, it wasn't quality architecture either. Moving the rsync spawner to a dedicated subprocess should stop the forks from threatening to clash with the multithreading completely. Relies on the new core loop design, so this won't work properly until that's implemented. --- diff --git a/src/cache.c b/src/cache.c index 8cb0e641..704cf95a 100644 --- a/src/cache.c +++ b/src/cache.c @@ -316,12 +316,6 @@ cache_setup(void) return 0; } -void -cache_teardown(void) -{ - /* Empty */ -} - static struct cache_node * json2node(json_t *json) { diff --git a/src/cache.h b/src/cache.h index 75939810..24d1ea7d 100644 --- a/src/cache.h +++ b/src/cache.h @@ -6,7 +6,6 @@ #include "types/rpp.h" int cache_setup(void); /* Init this module */ -void cache_teardown(void); /* Destroy this module */ void cache_atexit(void); int cache_prepare(void); /* Prepare cache for new validation cycle */ diff --git a/src/log.c b/src/log.c index 09b81b17..0c2bd25d 100644 --- a/src/log.c +++ b/src/log.c @@ -211,15 +211,6 @@ log_teardown(void) pthread_mutex_destroy(&logck); } -void -log_flush(void) -{ - if (op_config.fprintf_enabled || val_config.fprintf_enabled) { - fflush(stdout); - fflush(stderr); - } -} - bool log_val_enabled(unsigned int level) { diff --git a/src/log.h b/src/log.h index 5c94dd43..8fb10aa0 100644 --- a/src/log.h +++ b/src/log.h @@ -59,9 +59,6 @@ int log_setup(void); void log_start(void); void log_teardown(void); -/* Call to flush the stdout/stderr streams */ -void log_flush(void); - /* * Check if corresponding logging is enabled. You can use these to short-circuit * out of heavy logging code. diff --git a/src/main.c b/src/main.c index ee61d7d6..994bf7d7 100644 --- a/src/main.c +++ b/src/main.c @@ -11,6 +11,7 @@ #include "print_file.h" #include "relax_ng.h" #include "rtr/rtr.h" +#include "rsync.h" #include "sig.h" #include "thread_var.h" @@ -121,20 +122,22 @@ main(int argc, char **argv) error = log_setup(); if (error) goto just_quit; + error = handle_flags_config(argc, argv); + if (error) + goto revert_log; + + rsync_setup(); /* Spawn rsync spawner ASAP */ register_signal_handlers(); + error = thvar_init(); if (error) - goto revert_log; + goto revert_rsync; error = incidence_init(); if (error) - goto revert_log; - error = handle_flags_config(argc, argv); - if (error) - goto revert_log; - + goto revert_rsync; error = nid_init(); if (error) - goto revert_config; + goto revert_rsync; error = extension_init(); if (error) goto revert_nid; @@ -144,7 +147,6 @@ main(int argc, char **argv) error = http_init(); if (error) goto revert_hash; - error = relax_ng_init(); if (error) goto revert_http; @@ -156,7 +158,7 @@ main(int argc, char **argv) goto revert_vrps; error = output_setup(); if (error) - goto revert_cache; + goto revert_vrps; /* Meat */ @@ -173,8 +175,6 @@ main(int argc, char **argv) } /* End */ -revert_cache: - cache_teardown(); revert_vrps: vrps_destroy(); revert_relax_ng: @@ -185,7 +185,8 @@ revert_hash: hash_teardown(); revert_nid: nid_destroy(); -revert_config: +revert_rsync: + rsync_teardown(); free_rpki_config(); revert_log: log_teardown(); diff --git a/src/rsync.c b/src/rsync.c index e77afb6e..25f15846 100644 --- a/src/rsync.c +++ b/src/rsync.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,201 @@ #define STDERR_READ(fds) fds[0][0] #define STDOUT_READ(fds) fds[1][0] +static pid_t spawner; /* The process that spawns rsync runs */ + +static int readfd; /* Our end of the spawner-to-parent pipe */ +static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER; + +static int writefd; /* Our end of the parent-to-spawner pipe */ +static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER; + +static int rsync(char const *, char const *); + +static int +run_child(int readfd, int writefd) +{ + unsigned char zero = 0; + struct read_stream stream; + char *url, *path; + int error; + + read_stream_init(&stream, readfd); + + do { + error = read_string(&stream, &url); + if (error || url == NULL) + break; + error = read_string(&stream, &path); + if (error || path == NULL) { + free(url); + break; + } + + error = rsync(url, path); + + free(url); + free(path); + + error = full_write(writefd, &zero, 1); + } while (!error); + + read_stream_close(&stream); + close(writefd); + free_rpki_config(); + log_teardown(); + return error; +} + +void +rsync_setup(void) +{ + static const int RDFD = 0; + static const int WRFD = 1; + int parent2spawner[2]; /* Pipe: Parent writes, spawner reads */ + int spawner2parent[2]; /* Pipe: Spawner writes, parent reads */ + int flags; + + if (pipe(parent2spawner) < 0) { + pr_op_err("Cannot create pipe: %s", strerror(errno)); + goto fail1; + } + if (pipe(spawner2parent) < 0) { + pr_op_err("Cannot create pipe: %s", strerror(errno)); + goto fail2; + } + + flags = fcntl(spawner2parent[RDFD], F_GETFL); + if (flags < 0) { + pr_op_err("Cannot retrieve pipe flags: %s", strerror(errno)); + goto fail3; + } + if (fcntl(spawner2parent[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) { + pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(errno)); + goto fail3; + } + + fflush(stdout); + fflush(stderr); + + spawner = fork(); + if (spawner < 0) { + pr_op_err("Cannot fork rsync spawner: %s", strerror(errno)); + goto fail3; + } + + if (spawner == 0) { /* Client code */ + close(parent2spawner[WRFD]); + close(spawner2parent[RDFD]); + exit(run_child(parent2spawner[RDFD], spawner2parent[WRFD])); + } + + /* Parent code */ + close(parent2spawner[RDFD]); + close(spawner2parent[WRFD]); + readfd = spawner2parent[RDFD]; + writefd = parent2spawner[WRFD]; + return; + +fail3: close(spawner2parent[RDFD]); + close(spawner2parent[WRFD]); +fail2: close(parent2spawner[RDFD]); + close(parent2spawner[WRFD]); +fail1: pr_op_warn("rsync will not be available."); + readfd = writefd = 0; +} + +int +rsync_download(char const *url, char const *path) +{ + mutex_lock(&writelock); + + if (writefd == 0) + goto fail1; + if (write_string(writefd, url) != 0) + goto fail2; + if (write_string(writefd, path) != 0) + goto fail2; + + mutex_unlock(&writelock); + return 0; // XXX go pick some other task + +fail2: close(readfd); + close(writefd); + readfd = writefd = 0; +fail1: mutex_unlock(&writelock); + return EIO; +} + +/* Returns the number of rsyncs that have ended since the last query */ +unsigned int +rsync_finished(void) +{ + unsigned char buf[8]; + ssize_t result; + + mutex_lock(&readlock); + result = (readfd != 0) ? read(readfd, buf, sizeof(buf)) : 0; + mutex_unlock(&readlock); + + return (result >= 0) ? result : 0; +} + +static int +wait_child(char const *name, pid_t pid) +{ + int status; + int error; + +again: status = 0; + if (waitpid(pid, &status, 0) < 0) { + error = errno; + pr_op_err("Could not wait for %s: %s", name, strerror(error)); + return error; + } + + if (WIFEXITED(status)) { + /* Happy path (but also sad path sometimes) */ + error = WEXITSTATUS(status); + pr_val_debug("%s ended. Result: %d", name, error); + return error ? EIO : 0; + } + + if (WIFSIGNALED(status)) { + pr_op_warn("%s interrupted by signal %d.", + name, WTERMSIG(status)); + return EINTR; + } + + if (WIFCONTINUED(status)) { + /* + * Testing warning: + * I can't trigger this branch. It always exits or signals; + * SIGSTOP then SIGCONT doesn't seem to wake up waitpid(). + * It's concerning because every sample code I've found assumes + * waitpid() returning always means the subprocess ended, so + * they never retry. But that contradicts all documentation, + * yet seems to be accurate to reality. + */ + pr_op_debug("%s has resumed.", name); + goto again; + } + + /* Dead code */ + pr_op_err("Unknown waitpid() status; giving up %s.", name); + return EINVAL; +} + +void +rsync_teardown(void) +{ + if (readfd) + close(readfd); + if (writefd) + close(writefd); + readfd = writefd = 0; + wait_child("rsync spawner", spawner); +} + /* * Duplicate parent FDs, to pipe rsync output: * - fds[0] = stderr @@ -75,24 +271,18 @@ prepare_rsync_args(char **args, char const *url, char const *path) args[i++] = NULL; } -__dead static void -handle_child_thread(char const *url, char const *path, int fds[2][2]) +static int +execvp_rsync(char const *url, char const *path, int fds[2][2]) { - /* THIS FUNCTION MUST NEVER RETURN!!! */ char *args[20]; - int error; prepare_rsync_args(args, url, path); duplicate_fds(fds); - execvp(args[0], args); - error = errno; - /* Log directly to stderr, redirected by the pipes */ - fprintf(stderr, "Could not execute the rsync command: %s\n", - strerror(error)); + if (execvp(args[0], args) < 0) + return errno; - /* https://stackoverflow.com/a/14493459/1735458 */ - exit(-error); + return EINVAL; /* Unreachable, but whatever */ } static int @@ -110,8 +300,8 @@ create_pipes(int fds[2][2]) error = errno; /* Close pipe previously created */ - close(STDERR_READ(fds)); - close(STDERR_WRITE(fds)); + close(fds[0][0]); + close(fds[0][1]); pr_op_err_st("Piping rsync stdout: %s", strerror(error)); return -error; @@ -132,7 +322,6 @@ get_current_millis(void) static void log_buffer(char const *buffer, ssize_t read, bool is_error) { -#define PRE_RSYNC "[RSYNC exec]: " char *cpy, *cur, *tmp; cpy = pmalloc(read + 1); @@ -149,13 +338,12 @@ log_buffer(char const *buffer, ssize_t read, bool is_error) continue; } if (is_error) - pr_val_err(PRE_RSYNC "%s", cur); + pr_val_err("[RSYNC exec] %s", cur); else - pr_val_debug(PRE_RSYNC "%s", cur); + pr_val_debug("[RSYNC exec] %s", cur); cur = tmp + 1; } free(cpy); -#undef PRE_RSYNC } #define DROP_FD(f, fail) \ @@ -252,7 +440,7 @@ exhaust_read_fds(int fderr, int fdout) } timed_out: - pr_val_err("rsync transfer timeout reached"); + pr_val_err("rsync transfer timeout exhausted"); error = 2; fail: for (f = 0; f < 2; f++) if (pfd[f].fd != -1) @@ -263,8 +451,11 @@ fail: for (f = 0; f < 2; f++) /* * Completely consumes @fds' streams, and closes them. * - * Allegedly, this is a portable way to wait for the child process to finish. - * (IIRC, waitpid() doesn't do this reliably.) + * Originally, this was meant to redirect rsync's output to syslog: + * ac56d70c954caf49382f5f28ff4a017e859e2e0a + * (ie. we need to exhaust the streams because we dup2()'d them.) + * + * Later, @job repurposed this code to fix #74. */ static int exhaust_pipes(int fds[2][2]) @@ -275,42 +466,25 @@ exhaust_pipes(int fds[2][2]) } /* rsync @url @path */ -int -rsync_download(char const *url, char const *path) +static int +rsync(char const *url, char const *path) { /* Descriptors to pipe stderr (first element) and stdout (second) */ int fork_fds[2][2]; pid_t child_pid; - int child; int error; - pr_val_info("rsync: %s -> %s", url, path); - error = create_pipes(fork_fds); if (error) return error; - /* Flush output (avoid locks between father and child) */ - log_flush(); + fflush(stdout); + fflush(stderr); - /* We need to fork because execvp() magics the thread away. */ child_pid = fork(); - if (child_pid == 0) { - /* - * This code is run by the child, and should try to - * call execvp() as soon as possible. - * - * Refer to - * https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html - * "{..} to avoid errors, the child process may only - * execute async-signal-safe operations until such time - * as one of the exec functions is called." - */ - handle_child_thread(url, path, fork_fds); - } if (child_pid < 0) { error = errno; - pr_op_err_st("Couldn't fork to execute rsync: %s", + pr_op_err_st("Couldn't spawn the rsync process: %s", strerror(error)); /* Close all ends from the created pipes */ close(STDERR_READ(fork_fds)); @@ -320,30 +494,14 @@ rsync_download(char const *url, char const *path) return error; } - /* This code is run by us. */ + if (child_pid == 0) + exit(execvp_rsync(url, path, fork_fds)); /* Child code */ + + /* Parent code */ + error = exhaust_pipes(fork_fds); if (error) kill(child_pid, SIGTERM); /* Stop the child */ - child = 0; - if (waitpid(child_pid, &child, 0) < 0) { - error = errno; - pr_op_err("Could not wait for rsync: %s", strerror(error)); - return error; - } - - if (WIFEXITED(child)) { - /* Happy path (but also sad path sometimes) */ - error = WEXITSTATUS(child); - pr_val_debug("rsync ended. Result: %d", error); - return error ? EIO : 0; - } - - if (WIFSIGNALED(child)) { - pr_op_warn("rsync interrupted by signal %d.", WTERMSIG(child)); - return EINTR; /* Meh? */ - } - - pr_op_err_st("rsync died in an unknown way."); - return -EINVAL; + return wait_child("rsync", child_pid); } diff --git a/src/rsync.h b/src/rsync.h index ef4fcc71..943ece5e 100644 --- a/src/rsync.h +++ b/src/rsync.h @@ -1,6 +1,9 @@ #ifndef SRC_RSYNC_RSYNC_H_ #define SRC_RSYNC_RSYNC_H_ +void rsync_setup(void); int rsync_download(char const *, char const *); +unsigned int rsync_finished(void); +void rsync_teardown(void); #endif /* SRC_RSYNC_RSYNC_H_ */ diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 00000000..0a4055e1 --- /dev/null +++ b/src/stream.c @@ -0,0 +1,151 @@ +#include "stream.h" + +#include +#include +#include + +void +read_stream_init(struct read_stream *stream, int fd) +{ + stream->fd = fd; + stream->buffer = pmalloc(256); + stream->capacity = 256; +} + +void +read_stream_close(struct read_stream *stream) +{ + close(stream->fd); + free(stream->buffer); +} + +/* + * Full read or error. + * + * Nonzero: error + * 0, stream->buffer != NULL: Success + * 0, stream->buffer == NULL: EOF + */ +static int +full_read(struct read_stream *stream, size_t len) +{ + ssize_t rd; + size_t offset; + + if (stream->buffer == NULL) + return 0; + + offset = 0; + do { + rd = read(stream->fd, stream->buffer + offset, len); + if (rd < 0) + return errno; + if (rd == 0) { + free(stream->buffer); + stream->buffer = NULL; + stream->capacity = 0; + return 0; + } + if (rd == len) + return 0; + if (rd > len) + pr_crit("rd > len: %zd > %zu", rd, len); + + len -= rd; + offset += rd; + } while (true); +} + +/* Full write or error. */ +int +full_write(int fd, unsigned char const *buf, size_t len) +{ + ssize_t wr; + size_t offset; + + offset = 0; + do { + wr = write(fd, buf + offset, len); + if (wr < 0) + return errno; + if (wr > len) + pr_crit("wr > len: %zd > %zu", wr, len); + len -= wr; + offset += wr; + } while (len > 0); + + return 0; +} + +/* @value -1 means "EOF". */ +static int +read_ssize_t(struct read_stream *stream, ssize_t *value) +{ + int error; + + error = full_read(stream, 2); + if (error) + return error; + + *value = stream->buffer + ? ((stream->buffer[0] << 8) | stream->buffer[1]) + : -1; + return 0; +} + +static int +write_size_t(int fd, size_t value) +{ + unsigned char buf[2]; + + if (value > 1024) + return ENOSPC; + + buf[0] = (value >> 8) & 0xFF; + buf[1] = value & 0xFF; + + return full_write(fd, buf, 2); +} + +int +read_string(struct read_stream *stream, char **result) +{ + ssize_t len; + int error; + + error = read_ssize_t(stream, &len); + if (error) + return error; + if (len == -1) { + *result = NULL; + return 0; + } + + if (len > stream->capacity) { + do { + stream->capacity *= 2; + } while (len > stream->capacity); + stream->buffer = prealloc(stream->buffer, stream->capacity); + } + + error = full_read(stream, len); + if (error) + return error; + *result = stream->buffer ? pstrdup((char *)stream->buffer) : NULL; + return 0; +} + +int +write_string(int fd, char const *str) +{ + size_t len; + int error; + + len = strlen(str) + 1; + + error = write_size_t(fd, len); + if (error) + return error; + + return full_write(fd, (unsigned char const *) str, len); +} diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 00000000..f0f34398 --- /dev/null +++ b/src/stream.h @@ -0,0 +1,19 @@ +#ifndef SRC_STREAM_H_ +#define SRC_STREAM_H_ + +struct read_stream { + int fd; + unsigned char *buffer; + size_t capacity; +}; + +void read_stream_init(struct read_stream *, int); +void read_stream_close(struct read_stream *); + +int full_write(int, unsigned char const *, size_t); + +/* NULL means "EOF". */ +int read_string(struct read_stream *, char **); +int write_string(int, char const *); + +#endif /* SRC_STREAM_H_ */ diff --git a/test/mock.c b/test/mock.c index 51550e0c..89a76fbd 100644 --- a/test/mock.c +++ b/test/mock.c @@ -81,6 +81,8 @@ pr_crit(const char *format, ...) ck_abort(); } +MOCK_VOID(log_teardown, void) + static char addr_buffer1[INET6_ADDRSTRLEN]; static char addr_buffer2[INET6_ADDRSTRLEN]; @@ -123,6 +125,7 @@ MOCK_NULL(config_get_output_bgpsec, char const *, void) MOCK(config_get_op_log_file_format, enum filename_format, FNF_NAME, void) MOCK(config_get_val_log_file_format, enum filename_format, FNF_NAME, void) MOCK(logv_filename, char const *, path, char const *path) +MOCK_VOID(free_rpki_config, void) MOCK_VOID(fnstack_init, void) MOCK_VOID(fnstack_push, char const *file) diff --git a/test/rsync_test.c b/test/rsync_test.c index e6053386..3930c205 100644 --- a/test/rsync_test.c +++ b/test/rsync_test.c @@ -4,6 +4,7 @@ #include "common.c" #include "mock.c" #include "rsync.c" +#include "stream.c" static char const STR64[] = "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" @@ -16,13 +17,6 @@ static char content[1024]; MOCK(config_get_rsync_program, char const *, "rsync", void) MOCK(config_get_rsync_transfer_timeout, long, 4, void) -void -log_flush(void) -{ - fflush(stdout); - fflush(stderr); -} - /* Tests */ static void @@ -188,9 +182,7 @@ START_TEST(full_rsync_timeout_test_1kb) printf("1kb\n"); create_file("tmp/1kb", 1); ensure_file_deleted("tmp/1kb-copy"); - // XXX this is creating directories because of rsync_download's mkdir_p. - // Is this a symptom of a problem? - ck_assert_int_eq(0, rsync_download("tmp/1kb", "tmp/1kb-copy")); + ck_assert_int_eq(0, rsync("tmp/1kb", "tmp/1kb-copy")); } END_TEST @@ -199,7 +191,7 @@ START_TEST(full_rsync_timeout_test_3kb) printf("3kb\n"); create_file("tmp/3kb", 3); ensure_file_deleted("tmp/3kb-copy"); - ck_assert_int_eq(0, rsync_download("tmp/3kb", "tmp/3kb-copy")); + ck_assert_int_eq(0, rsync("tmp/3kb", "tmp/3kb-copy")); } END_TEST @@ -209,14 +201,70 @@ START_TEST(full_rsync_timeout_test_5kb) create_file("tmp/5kb", 5); ensure_file_deleted("tmp/5kb-copy"); /* Max speed is 1kbps, timeout is 4 seconds */ - ck_assert_int_eq(EIO, rsync_download("tmp/5kb", "tmp/5kb-copy")); + ck_assert_int_eq(EIO, rsync("tmp/5kb", "tmp/5kb-copy")); +} +END_TEST + +static void +wait_rsyncs(unsigned int expected) +{ + unsigned int actual = 0; + + do { + actual += rsync_finished(); + if (expected == actual) { + ck_assert_uint_eq(0, rsync_finished()); + return; + } + ck_assert(expected > actual); + sleep(1); + } while (true); +} + +START_TEST(test_rsync_finished) +{ + printf("rsync_finished()\n"); + + create_file("tmp/2kb", 1); + ensure_file_deleted("tmp/2kb-copy"); + + rsync_setup(); + + ck_assert_int_eq(0, rsync_download("tmp/2kb", "tmp/2kb-copy")); + ck_assert_int_eq(0, rsync_finished()); + wait_rsyncs(1); + + rsync_teardown(); +} +END_TEST + +START_TEST(test_multi_rsync) +{ + printf("simultaneous rsyncs\n"); + + create_file("tmp/i1", 1); + ensure_file_deleted("tmp/i1-copy"); + create_file("tmp/i2", 1); + ensure_file_deleted("tmp/i2-copy"); + create_file("tmp/i3", 1); + ensure_file_deleted("tmp/i3-copy"); + + rsync_setup(); + + ck_assert_int_eq(0, rsync_download("tmp/i1", "tmp/")); + ck_assert_int_eq(0, rsync_finished()); + ck_assert_int_eq(0, rsync_download("tmp/i2", "tmp/i2-copy")); + ck_assert_int_eq(0, rsync_download("tmp/i3", "tmp/i3-copy")); + wait_rsyncs(3); + + rsync_teardown(); } END_TEST static Suite *create_suite(void) { Suite *suite; - TCase *pipes; + TCase *pipes, *spawner; pipes = tcase_create("pipes"); tcase_add_test(pipes, exhaust_read_fds_test_normal); @@ -227,8 +275,14 @@ static Suite *create_suite(void) tcase_add_test(pipes, full_rsync_timeout_test_5kb); tcase_set_timeout(pipes, 6); + spawner = tcase_create("spawner"); + tcase_add_test(spawner, test_rsync_finished); + tcase_add_test(spawner, test_multi_rsync); + tcase_set_timeout(spawner, 6); + suite = suite_create("rsync"); suite_add_tcase(suite, pipes); + suite_add_tcase(suite, spawner); return suite; }