From: Alberto Leiva Popper Date: Fri, 30 Aug 2024 02:48:54 +0000 (-0600) Subject: Exhaust rsync's stderr and stdout at the same time X-Git-Tag: 1.6.4~3^2~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=6f1ecd1f3349e8da11d8fb4a88f63b79bfe7a14b;p=thirdparty%2FFORT-validator.git Exhaust rsync's stderr and stdout at the same time I'm assuming this consumes less RAM, as stdout no longer has to buffer completely until stderr is done. --- diff --git a/src/rsync/rsync.c b/src/rsync/rsync.c index bdd4e887..28f2ed0f 100644 --- a/src/rsync/rsync.c +++ b/src/rsync/rsync.c @@ -12,6 +12,11 @@ #include "config.h" #include "log.h" +#define STDERR_WRITE(fds) fds[0][1] +#define STDOUT_WRITE(fds) fds[1][1] +#define STDERR_READ(fds) fds[0][0] +#define STDOUT_READ(fds) fds[1][0] + /* * Duplicate parent FDs, to pipe rsync output: * - fds[0] = stderr @@ -21,15 +26,15 @@ static void duplicate_fds(int fds[2][2]) { /* Use the loop to catch interruptions */ - while ((dup2(fds[0][1], STDERR_FILENO) == -1) + while ((dup2(STDERR_WRITE(fds), STDERR_FILENO) == -1) && (errno == EINTR)) {} - close(fds[0][1]); - close(fds[0][0]); + close(STDERR_WRITE(fds)); + close(STDERR_READ(fds)); - while ((dup2(fds[1][1], STDOUT_FILENO) == -1) + while ((dup2(STDOUT_WRITE(fds), STDOUT_FILENO) == -1) && (errno == EINTR)) {} - close(fds[1][1]); - close(fds[1][0]); + close(STDOUT_WRITE(fds)); + close(STDOUT_READ(fds)); } static void @@ -110,8 +115,8 @@ create_pipes(int fds[2][2]) error = errno; /* Close pipe previously created */ - close(fds[0][0]); - close(fds[0][1]); + close(STDERR_READ(fds)); + close(STDERR_WRITE(fds)); pr_op_err_st("Piping rsync stdout: %s", strerror(error)); return -error; @@ -130,7 +135,7 @@ get_current_millis(void) } static void -log_buffer(char const *buffer, ssize_t read, int type) +log_buffer(char const *buffer, ssize_t read, bool is_error) { #define PRE_RSYNC "[RSYNC exec]: " char *cpy, *cur, *tmp; @@ -148,42 +153,54 @@ log_buffer(char const *buffer, ssize_t read, int type) cur = tmp + 1; continue; } - if (type == 0) { + if (is_error) pr_val_err(PRE_RSYNC "%s", cur); - } else { + else pr_val_debug(PRE_RSYNC "%s", cur); - } cur = tmp + 1; } free(cpy); #undef PRE_RSYNC } +#define DROP_FD(f, fail) \ + do { \ + pfd[f].fd = -1; \ + error |= fail; \ + } while (0) +#define CLOSE_FD(f, fail) \ + do { \ + close(pfd[f].fd); \ + DROP_FD(f, fail); \ + } while (0) + /* - * Consumes (and throws away) all the bytes in read stream @fd, - * then closes it after end of stream. + * Consumes (and throws away) all the bytes in read streams @fderr and @fdout, + * then closes them once they reach end of stream. * * Returns: ok -> 0, error -> 1, timeout -> 2. */ static int -exhaust_read_fd(int fd, int type) +exhaust_read_fds(int fderr, int fdout) { - char buffer[4096]; - ssize_t count; - struct pollfd pfd[1]; - int error, nready; + struct pollfd pfd[2]; + int error, nready, f; long epoch, delta, timeout; memset(&pfd, 0, sizeof(pfd)); - pfd[0].fd = fd; + pfd[0].fd = fderr; pfd[0].events = POLLIN; + pfd[1].fd = fdout; + pfd[1].events = POLLIN; + + error = 0; epoch = get_current_millis(); delta = 0; timeout = 1000 * config_get_rsync_transfer_timeout(); while (1) { - nready = poll(pfd, 1, timeout - delta); + nready = poll(pfd, 2, timeout - delta); if (nready == 0) goto timed_out; if (nready == -1) { @@ -191,54 +208,65 @@ exhaust_read_fd(int fd, int type) if (error == EINTR) continue; pr_val_err("rsync bad poll: %s", strerror(error)); + error = 1; goto fail; } - if (pfd[0].revents & POLLNVAL) { - pr_val_err("rsync bad fd: %i", pfd[0].fd); - return 1; /* Already closed */ - } else if (pfd[0].revents & POLLERR) { - pr_val_err("Generic error during rsync poll."); - goto fail; - } else if (pfd[0].revents & (POLLIN|POLLHUP)) { - count = read(fd, buffer, sizeof(buffer)); - if (count == -1) { - error = errno; - if (error == EINTR) + + for (f = 0; f < 2; f++) { + if (pfd[f].revents & POLLNVAL) { + pr_val_err("rsync bad fd: %i", pfd[f].fd); + DROP_FD(f, 1); + + } else if (pfd[f].revents & POLLERR) { + pr_val_err("Generic error during rsync poll."); + CLOSE_FD(f, 1); + + } else if (pfd[f].revents & (POLLIN|POLLHUP)) { + char buffer[4096]; + ssize_t count; + + count = read(pfd[f].fd, buffer, sizeof(buffer)); + if (count == -1) { + error = errno; + if (error == EINTR) + continue; + pr_val_err("rsync buffer read error: %s", + strerror(error)); + CLOSE_FD(f, 1); continue; - pr_val_err("rsync buffer read error: %s", - strerror(error)); - goto fail; - } - if (count == 0) - break; + } - log_buffer(buffer, count, type); + if (count == 0) + CLOSE_FD(f, 0); + log_buffer(buffer, count, pfd[f].fd == fderr); + } } + if (pfd[0].fd == -1 && pfd[1].fd == -1) + return error; /* Happy path! */ + delta = get_current_millis() - epoch; if (delta < 0) { - pr_val_err("This clock does not seem monotonic. I'm going to have to give up this rsync."); + pr_val_err("This clock does not seem monotonic. " + "I'm going to have to give up this rsync."); + error = 1; goto fail; } if (delta >= timeout) goto timed_out; /* Read took too long */ } - close(fd); /* Close read end */ - return 0; - timed_out: pr_val_err("rsync transfer timeout reached"); - close(fd); - return 2; - -fail: - close(fd); - return 1; + error = 2; +fail: for (f = 0; f < 2; f++) + if (pfd[f].fd != -1) + close(pfd[f].fd); + return error; } /* - * Completely consumes @fd's streams, and closes them. + * Completely consumes @fds' streams, and closes them. * * Allegedly, this is a portable way to wait for the child process to finish. * (IIRC, waitpid() doesn't do this reliably.) @@ -246,20 +274,9 @@ fail: static int exhaust_pipes(int fds[2][2]) { - int error; - - close(fds[0][1]); /* Standard error, write end */ - close(fds[1][1]); /* Standard output, write end */ - - /* Standard error, read end */ - error = exhaust_read_fd(fds[0][0], 0); - if (error) { - close(fds[1][0]); - return error; - } - - /* Standard output, read end */ - return exhaust_read_fd(fds[1][0], 1); + close(STDERR_WRITE(fds)); + close(STDOUT_WRITE(fds)); + return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)); } /* @@ -326,10 +343,10 @@ rsync_download(char const *src, char const *dst, bool is_directory) pr_op_err_st("Couldn't fork to execute rsync: %s", strerror(error)); /* Close all ends from the created pipes */ - close(fork_fds[0][0]); - close(fork_fds[1][0]); - close(fork_fds[0][1]); - close(fork_fds[1][1]); + close(STDERR_READ(fork_fds)); + close(STDOUT_READ(fork_fds)); + close(STDERR_WRITE(fork_fds)); + close(STDOUT_WRITE(fork_fds)); goto release_args; } diff --git a/test/rsync/rsync_test.c b/test/rsync/rsync_test.c index bc124a58..4e542531 100644 --- a/test/rsync/rsync_test.c +++ b/test/rsync/rsync_test.c @@ -24,64 +24,64 @@ disable_sigpipe(void) static void * rsync_fast(void *arg) { - int writefd = *((int *)arg); - free(arg); + int fds[2][2]; + memcpy(fds, arg, sizeof(fds)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); - close(writefd); + close(STDERR_WRITE(fds)); + close(STDOUT_WRITE(fds)); return NULL; } static void * rsync_stalled(void *arg) { - int writefd = *((int *)arg); - free(arg); + int fds[2][2]; + memcpy(fds, arg, sizeof(fds)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); sleep(5); /* The timeout is 4 seconds */ - ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); - close(writefd); + close(STDERR_WRITE(fds)); + close(STDOUT_WRITE(fds)); return NULL; } static void * rsync_drip_feed(void *arg) { - int writefd = *((int *)arg); - free(arg); + int fds[2][2]; + memcpy(fds, arg, sizeof(fds)); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); sleep(1); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN)); sleep(1); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); sleep(1); - ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN)); sleep(2); - ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN)); + ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN)); - close(writefd); + close(STDERR_WRITE(fds)); + close(STDOUT_WRITE(fds)); return NULL; } static void -prepare_test(int fds[2], pthread_t *thread, void *(*rsync_simulator)(void *)) +prepare_test(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *)) { - int *arg; - - ck_assert_int_eq(0, pipe(fds)); - - arg = pmalloc(sizeof(fds[1])); - *arg = fds[1]; - ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, arg)); + ck_assert_int_eq(0, pipe(fds[0])); + ck_assert_int_eq(0, pipe(fds[1])); + ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, fds)); } static void @@ -92,24 +92,24 @@ finish_test(pthread_t thread) START_TEST(read_pipe_test) /* Tests the read_pipe() function */ { - int fds[2]; + int fds[2][2]; pthread_t rsync_writer; printf("This test needs to exhaust some timeouts. Please be patient.\n"); printf("Normal transfer\n"); prepare_test(fds, &rsync_writer, rsync_fast); - ck_assert_int_eq(0, exhaust_read_fd(fds[0], 0)); + ck_assert_int_eq(0, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); finish_test(rsync_writer); printf("Stalled transfer\n"); prepare_test(fds, &rsync_writer, rsync_stalled); - ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0)); + ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); finish_test(rsync_writer); printf("Drip-feed\n"); prepare_test(fds, &rsync_writer, rsync_drip_feed); - ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0)); + ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds))); finish_test(rsync_writer); } END_TEST