]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Update timeout during every rsync poll
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 29 Aug 2024 21:19:02 +0000 (15:19 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 29 Aug 2024 22:25:09 +0000 (16:25 -0600)
Ensures the timeout is absolute even when poll() returns repeatedly.

src/rsync/rsync.c
test/Makefile.am
test/rsync/rsync_test.c [new file with mode: 0644]

index 17c844eecf13e47de446f3769486fb18f6d13153..1b7cb083c41787e7434548a48b44e5147227d803 100644 (file)
@@ -120,6 +120,15 @@ create_pipes(int fds[2][2])
        return 0;
 }
 
+static long
+get_current_millis(void)
+{
+       struct timespec now;
+       if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
+               pr_crit("clock_gettime() returned %d", errno);
+       return 1000L * now.tv_sec + now.tv_nsec / 1000000L;
+}
+
 static void
 log_buffer(char const *buffer, ssize_t read, int type)
 {
@@ -150,6 +159,10 @@ log_buffer(char const *buffer, ssize_t read, int type)
 #undef PRE_RSYNC
 }
 
+/*
+ * Consumes (and throws away) all the bytes in read stream fd_pipe[type][0],
+ * then closes it after end of stream.
+ */
 static int
 read_pipe(int fd_pipe[2][2], int type)
 {
@@ -157,18 +170,20 @@ read_pipe(int fd_pipe[2][2], int type)
        ssize_t count;
        struct pollfd pfd[1];
        int error, nready;
+       long epoch, delta, timeout;
 
        memset(&pfd, 0, sizeof(pfd));
        pfd[0].fd = fd_pipe[type][0];
        pfd[0].events = POLLIN;
 
+       epoch = get_current_millis();
+       delta = 0;
+       timeout = 1000 * config_get_rsync_transfer_timeout();
+
        while (1) {
-               nready = poll(pfd, 1, 1000 * config_get_rsync_transfer_timeout());
-               if (nready == 0) {
-                       pr_val_err("rsync transfer timeout reached");
-                       close(fd_pipe[type][0]);
-                       return 1;
-               }
+               nready = poll(pfd, 1, timeout - delta);
+               if (nready == 0)
+                       goto timed_out;
                if (nready == -1) {
                        if (errno == EINTR)
                                continue;
@@ -192,18 +207,34 @@ read_pipe(int fd_pipe[2][2], int type)
                        }
                        if (count == 0)
                                break;
+
+                       log_buffer(buffer, count, type);
                }
 
-               log_buffer(buffer, count, type);
+               delta = get_current_millis() - epoch;
+               if (delta < 0) {
+                       pr_val_err("This clock does not seem monotonic. I'm going to have to give up this rsync.");
+                       close(fd_pipe[type][0]);
+                       return 1;
+               }
+               if (delta >= timeout)
+                       goto timed_out; /* Read took too long */
        }
 
        close(fd_pipe[type][0]); /* Close read end */
        return 0;
+
+timed_out:
+       pr_val_err("rsync transfer timeout reached");
+       close(fd_pipe[type][0]);
+       return 1;
 }
 
 /*
- * Read the piped output from the child, assures that all pipes are closed on
- * success and on error.
+ * Completely consumes @fd's streams, and closes them.
+ *
+ * Allegedly, this is a portable way to wait for the child process to finish.
+ * (IIRC, waitpid() doesn't do this reliably.)
  */
 static int
 read_pipes(int fds[2][2])
index f1d7af946a625d2385ef985c9919a6f6a4cd3991..ad29d3d17d3fc69d7e0a2641721dcb4358b07778 100644 (file)
@@ -32,6 +32,7 @@ check_PROGRAMS += pb.test
 check_PROGRAMS += pdu_handler.test
 check_PROGRAMS += pdu_stream.test
 check_PROGRAMS += rrdp.test
+check_PROGRAMS += rsync.test
 check_PROGRAMS += serial.test
 check_PROGRAMS += tal.test
 check_PROGRAMS += thread_pool.test
@@ -72,6 +73,9 @@ pdu_stream_test_LDADD = ${MY_LDADD} ${JANSSON_LIBS}
 rrdp_test_SOURCES = rrdp_test.c
 rrdp_test_LDADD = ${MY_LDADD} ${JANSSON_LIBS} ${XML2_LIBS}
 
+rsync_test_SOURCES = rsync/rsync_test.c
+rsync_test_LDADD = ${MY_LDADD}
+
 serial_test_SOURCES = types/serial_test.c
 serial_test_LDADD = ${MY_LDADD}
 
diff --git a/test/rsync/rsync_test.c b/test/rsync/rsync_test.c
new file mode 100644 (file)
index 0000000..084788e
--- /dev/null
@@ -0,0 +1,156 @@
+#include <check.h>
+
+#include "alloc.c"
+#include "mock.c"
+#include "rsync/rsync.c"
+
+static char const * const PKT = "abcdefghijklmnopqrstuvwxyz";
+static const size_t PKTLEN = sizeof(PKT) - 1;
+
+MOCK(config_get_rsync_transfer_timeout, long, 4, void)
+__MOCK_ABORT(config_get_rsync_program, char *, NULL, void)
+__MOCK_ABORT(config_get_rsync_args, struct string_array const *, NULL, void)
+__MOCK_ABORT(config_get_rsync_retry_count, unsigned int, 0, void)
+__MOCK_ABORT(config_get_rsync_retry_interval, unsigned int, 0, void)
+
+static void
+disable_sigpipe(void)
+{
+       struct sigaction action = { .sa_handler = SIG_IGN };
+       if (sigaction(SIGPIPE, &action, NULL) == -1)
+               pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
+}
+
+static void *
+rsync_fast(void *arg)
+{
+       int writefd = *((int *)arg);
+       free(arg);
+
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+
+       close(writefd);
+       return NULL;
+}
+
+static void *
+rsync_stalled(void *arg)
+{
+       int writefd = *((int *)arg);
+       free(arg);
+
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+
+       sleep(5); /* The timeout is 4 seconds */
+
+       ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+
+       close(writefd);
+       return NULL;
+}
+
+static void *
+rsync_drip_feed(void *arg)
+{
+       int writefd = *((int *)arg);
+       free(arg);
+
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       sleep(1);
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       sleep(1);
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       sleep(1);
+       ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+       sleep(2);
+       ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+
+       close(writefd);
+       return NULL;
+}
+
+static void
+prepare_test(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *))
+{
+       int *arg;
+
+       ck_assert_int_eq(0, pipe(fds[0]));
+       memset(fds[1], 0, sizeof(fds[1]));
+
+       arg = pmalloc(sizeof(fds[0][1]));
+       *arg = fds[0][1];
+       ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, arg));
+}
+
+static void
+finish_test(int fds[2][2], pthread_t thread)
+{
+       close(fds[0][0]);
+       pthread_join(thread, NULL);
+}
+
+START_TEST(read_pipe_test) /* Tests the read_pipe() function */
+{
+       /*
+        * [0][0] = standard error, read end
+        * [0][1] = standard error, write end
+        * [1][0] = standard output, read end (unused during this test)
+        * [1][1] = standard output, write end (unused during this test)
+        */
+       int fds[2][2];
+       pthread_t rsync_writer;
+
+       printf("This test needs to exhaust some timeouts. Please be patient.\n");
+
+       printf("Normal transfer\n");
+       prepare_test(fds, &rsync_writer, rsync_fast);
+       ck_assert_int_eq(0, read_pipe(fds, 0));
+       finish_test(fds, rsync_writer);
+
+       printf("Stalled transfer\n");
+       prepare_test(fds, &rsync_writer, rsync_stalled);
+       ck_assert_int_eq(1, read_pipe(fds, 0));
+       finish_test(fds, rsync_writer);
+
+       printf("Drip-feed\n");
+       prepare_test(fds, &rsync_writer, rsync_drip_feed);
+       ck_assert_int_eq(1, read_pipe(fds, 0));
+       finish_test(fds, rsync_writer);
+}
+END_TEST
+
+static Suite *xml_load_suite(void)
+{
+       Suite *suite;
+       TCase *pipes;
+
+       pipes = tcase_create("pipes");
+       tcase_add_test(pipes, read_pipe_test);
+       tcase_set_timeout(pipes, 15);
+
+       suite = suite_create("rsync");
+       suite_add_tcase(suite, pipes);
+
+       return suite;
+}
+
+int main(void)
+{
+       Suite *suite;
+       SRunner *runner;
+       int tests_failed;
+
+       disable_sigpipe();
+
+       suite = xml_load_suite();
+
+       runner = srunner_create(suite);
+       srunner_run_all(runner, CK_NORMAL);
+       tests_failed = srunner_ntests_failed(runner);
+       srunner_free(runner);
+
+       return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}