]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Exhaust rsync's stderr and stdout at the same time
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Fri, 30 Aug 2024 02:48:54 +0000 (20:48 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Fri, 30 Aug 2024 02:48:54 +0000 (20:48 -0600)
I'm assuming this consumes less RAM, as stdout no longer has to buffer
completely until stderr is done.

src/rsync/rsync.c
test/rsync/rsync_test.c

index bdd4e887efd2dac78ddb8f7c235e8c0e49b0275f..28f2ed0f54806c2b3a5e47924f9e0cc6d861213b 100644 (file)
 #include "config.h"
 #include "log.h"
 
+#define STDERR_WRITE(fds) fds[0][1]
+#define STDOUT_WRITE(fds) fds[1][1]
+#define STDERR_READ(fds)  fds[0][0]
+#define STDOUT_READ(fds)  fds[1][0]
+
 /*
  * Duplicate parent FDs, to pipe rsync output:
  * - fds[0] = stderr
@@ -21,15 +26,15 @@ static void
 duplicate_fds(int fds[2][2])
 {
        /* Use the loop to catch interruptions */
-       while ((dup2(fds[0][1], STDERR_FILENO) == -1)
+       while ((dup2(STDERR_WRITE(fds), STDERR_FILENO) == -1)
                && (errno == EINTR)) {}
-       close(fds[0][1]);
-       close(fds[0][0]);
+       close(STDERR_WRITE(fds));
+       close(STDERR_READ(fds));
 
-       while ((dup2(fds[1][1], STDOUT_FILENO) == -1)
+       while ((dup2(STDOUT_WRITE(fds), STDOUT_FILENO) == -1)
            && (errno == EINTR)) {}
-       close(fds[1][1]);
-       close(fds[1][0]);
+       close(STDOUT_WRITE(fds));
+       close(STDOUT_READ(fds));
 }
 
 static void
@@ -110,8 +115,8 @@ create_pipes(int fds[2][2])
                error = errno;
 
                /* Close pipe previously created */
-               close(fds[0][0]);
-               close(fds[0][1]);
+               close(STDERR_READ(fds));
+               close(STDERR_WRITE(fds));
 
                pr_op_err_st("Piping rsync stdout: %s", strerror(error));
                return -error;
@@ -130,7 +135,7 @@ get_current_millis(void)
 }
 
 static void
-log_buffer(char const *buffer, ssize_t read, int type)
+log_buffer(char const *buffer, ssize_t read, bool is_error)
 {
 #define PRE_RSYNC "[RSYNC exec]: "
        char *cpy, *cur, *tmp;
@@ -148,42 +153,54 @@ log_buffer(char const *buffer, ssize_t read, int type)
                        cur = tmp + 1;
                        continue;
                }
-               if (type == 0) {
+               if (is_error)
                        pr_val_err(PRE_RSYNC "%s", cur);
-               } else {
+               else
                        pr_val_debug(PRE_RSYNC "%s", cur);
-               }
                cur = tmp + 1;
        }
        free(cpy);
 #undef PRE_RSYNC
 }
 
+#define DROP_FD(f, fail)               \
+       do {                            \
+               pfd[f].fd = -1;         \
+               error |= fail;          \
+       } while (0)
+#define CLOSE_FD(f, fail)              \
+       do {                            \
+               close(pfd[f].fd);       \
+               DROP_FD(f, fail);       \
+       } while (0)
+
 /*
- * Consumes (and throws away) all the bytes in read stream @fd,
- * then closes it after end of stream.
+ * Consumes (and throws away) all the bytes in read streams @fderr and @fdout,
+ * then closes them once they reach end of stream.
  *
  * Returns: ok -> 0, error -> 1, timeout -> 2.
  */
 static int
-exhaust_read_fd(int fd, int type)
+exhaust_read_fds(int fderr, int fdout)
 {
-       char buffer[4096];
-       ssize_t count;
-       struct pollfd pfd[1];
-       int error, nready;
+       struct pollfd pfd[2];
+       int error, nready, f;
        long epoch, delta, timeout;
 
        memset(&pfd, 0, sizeof(pfd));
-       pfd[0].fd = fd;
+       pfd[0].fd = fderr;
        pfd[0].events = POLLIN;
+       pfd[1].fd = fdout;
+       pfd[1].events = POLLIN;
+
+       error = 0;
 
        epoch = get_current_millis();
        delta = 0;
        timeout = 1000 * config_get_rsync_transfer_timeout();
 
        while (1) {
-               nready = poll(pfd, 1, timeout - delta);
+               nready = poll(pfd, 2, timeout - delta);
                if (nready == 0)
                        goto timed_out;
                if (nready == -1) {
@@ -191,54 +208,65 @@ exhaust_read_fd(int fd, int type)
                        if (error == EINTR)
                                continue;
                        pr_val_err("rsync bad poll: %s", strerror(error));
+                       error = 1;
                        goto fail;
                }
-               if (pfd[0].revents & POLLNVAL) {
-                       pr_val_err("rsync bad fd: %i", pfd[0].fd);
-                       return 1; /* Already closed */
-               } else if (pfd[0].revents & POLLERR) {
-                       pr_val_err("Generic error during rsync poll.");
-                       goto fail;
-               } else if (pfd[0].revents & (POLLIN|POLLHUP)) {
-                       count = read(fd, buffer, sizeof(buffer));
-                       if (count == -1) {
-                               error = errno;
-                               if (error == EINTR)
+
+               for (f = 0; f < 2; f++) {
+                       if (pfd[f].revents & POLLNVAL) {
+                               pr_val_err("rsync bad fd: %i", pfd[f].fd);
+                               DROP_FD(f, 1);
+
+                       } else if (pfd[f].revents & POLLERR) {
+                               pr_val_err("Generic error during rsync poll.");
+                               CLOSE_FD(f, 1);
+
+                       } else if (pfd[f].revents & (POLLIN|POLLHUP)) {
+                               char buffer[4096];
+                               ssize_t count;
+
+                               count = read(pfd[f].fd, buffer, sizeof(buffer));
+                               if (count == -1) {
+                                       error = errno;
+                                       if (error == EINTR)
+                                               continue;
+                                       pr_val_err("rsync buffer read error: %s",
+                                           strerror(error));
+                                       CLOSE_FD(f, 1);
                                        continue;
-                               pr_val_err("rsync buffer read error: %s",
-                                   strerror(error));
-                               goto fail;
-                       }
-                       if (count == 0)
-                               break;
+                               }
 
-                       log_buffer(buffer, count, type);
+                               if (count == 0)
+                                       CLOSE_FD(f, 0);
+                               log_buffer(buffer, count, pfd[f].fd == fderr);
+                       }
                }
 
+               if (pfd[0].fd == -1 && pfd[1].fd == -1)
+                       return error; /* Happy path! */
+
                delta = get_current_millis() - epoch;
                if (delta < 0) {
-                       pr_val_err("This clock does not seem monotonic. I'm going to have to give up this rsync.");
+                       pr_val_err("This clock does not seem monotonic. "
+                           "I'm going to have to give up this rsync.");
+                       error = 1;
                        goto fail;
                }
                if (delta >= timeout)
                        goto timed_out; /* Read took too long */
        }
 
-       close(fd); /* Close read end */
-       return 0;
-
 timed_out:
        pr_val_err("rsync transfer timeout reached");
-       close(fd);
-       return 2;
-
-fail:
-       close(fd);
-       return 1;
+       error = 2;
+fail:  for (f = 0; f < 2; f++)
+               if (pfd[f].fd != -1)
+                       close(pfd[f].fd);
+       return error;
 }
 
 /*
- * Completely consumes @fd's streams, and closes them.
+ * Completely consumes @fds' streams, and closes them.
  *
  * Allegedly, this is a portable way to wait for the child process to finish.
  * (IIRC, waitpid() doesn't do this reliably.)
@@ -246,20 +274,9 @@ fail:
 static int
 exhaust_pipes(int fds[2][2])
 {
-       int error;
-
-       close(fds[0][1]); /* Standard error, write end */
-       close(fds[1][1]); /* Standard output, write end */
-
-       /* Standard error, read end */
-       error = exhaust_read_fd(fds[0][0], 0);
-       if (error) {
-               close(fds[1][0]);
-               return error;
-       }
-
-       /* Standard output, read end */
-       return exhaust_read_fd(fds[1][0], 1);
+       close(STDERR_WRITE(fds));
+       close(STDOUT_WRITE(fds));
+       return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds));
 }
 
 /*
@@ -326,10 +343,10 @@ rsync_download(char const *src, char const *dst, bool is_directory)
                        pr_op_err_st("Couldn't fork to execute rsync: %s",
                           strerror(error));
                        /* Close all ends from the created pipes */
-                       close(fork_fds[0][0]);
-                       close(fork_fds[1][0]);
-                       close(fork_fds[0][1]);
-                       close(fork_fds[1][1]);
+                       close(STDERR_READ(fork_fds));
+                       close(STDOUT_READ(fork_fds));
+                       close(STDERR_WRITE(fork_fds));
+                       close(STDOUT_WRITE(fork_fds));
                        goto release_args;
                }
 
index bc124a58445635e9c388ee5fd00c28c822eeb01f..4e5425313632a7d6d11edc9c7d1b3f88477fe69b 100644 (file)
@@ -24,64 +24,64 @@ disable_sigpipe(void)
 static void *
 rsync_fast(void *arg)
 {
-       int writefd = *((int *)arg);
-       free(arg);
+       int fds[2][2];
+       memcpy(fds, arg, sizeof(fds));
 
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
 
-       close(writefd);
+       close(STDERR_WRITE(fds));
+       close(STDOUT_WRITE(fds));
        return NULL;
 }
 
 static void *
 rsync_stalled(void *arg)
 {
-       int writefd = *((int *)arg);
-       free(arg);
+       int fds[2][2];
+       memcpy(fds, arg, sizeof(fds));
 
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
 
        sleep(5); /* The timeout is 4 seconds */
 
-       ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
 
-       close(writefd);
+       close(STDERR_WRITE(fds));
+       close(STDOUT_WRITE(fds));
        return NULL;
 }
 
 static void *
 rsync_drip_feed(void *arg)
 {
-       int writefd = *((int *)arg);
-       free(arg);
+       int fds[2][2];
+       memcpy(fds, arg, sizeof(fds));
 
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
        sleep(1);
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
        sleep(1);
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
        sleep(1);
-       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
        sleep(2);
-       ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
 
-       close(writefd);
+       close(STDERR_WRITE(fds));
+       close(STDOUT_WRITE(fds));
        return NULL;
 }
 
 static void
-prepare_test(int fds[2], pthread_t *thread, void *(*rsync_simulator)(void *))
+prepare_test(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *))
 {
-       int *arg;
-
-       ck_assert_int_eq(0, pipe(fds));
-
-       arg = pmalloc(sizeof(fds[1]));
-       *arg = fds[1];
-       ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, arg));
+       ck_assert_int_eq(0, pipe(fds[0]));
+       ck_assert_int_eq(0, pipe(fds[1]));
+       ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, fds));
 }
 
 static void
@@ -92,24 +92,24 @@ finish_test(pthread_t thread)
 
 START_TEST(read_pipe_test) /* Tests the read_pipe() function */
 {
-       int fds[2];
+       int fds[2][2];
        pthread_t rsync_writer;
 
        printf("This test needs to exhaust some timeouts. Please be patient.\n");
 
        printf("Normal transfer\n");
        prepare_test(fds, &rsync_writer, rsync_fast);
-       ck_assert_int_eq(0, exhaust_read_fd(fds[0], 0));
+       ck_assert_int_eq(0, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
        finish_test(rsync_writer);
 
        printf("Stalled transfer\n");
        prepare_test(fds, &rsync_writer, rsync_stalled);
-       ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0));
+       ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
        finish_test(rsync_writer);
 
        printf("Drip-feed\n");
        prepare_test(fds, &rsync_writer, rsync_drip_feed);
-       ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0));
+       ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
        finish_test(rsync_writer);
 }
 END_TEST