return 0;
}
+static long
+get_current_millis(void)
+{
+ struct timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
+ pr_crit("clock_gettime() returned %d", errno);
+ return 1000L * now.tv_sec + now.tv_nsec / 1000000L;
+}
+
static void
log_buffer(char const *buffer, ssize_t read, int type)
{
#undef PRE_RSYNC
}
+/*
+ * Consumes (and throws away) all the bytes in read stream fd_pipe[type][0],
+ * then closes it after end of stream.
+ */
static int
read_pipe(int fd_pipe[2][2], int type)
{
ssize_t count;
struct pollfd pfd[1];
int error, nready;
+ long epoch, delta, timeout;
memset(&pfd, 0, sizeof(pfd));
pfd[0].fd = fd_pipe[type][0];
pfd[0].events = POLLIN;
+ epoch = get_current_millis();
+ delta = 0;
+ timeout = 1000 * config_get_rsync_transfer_timeout();
+
while (1) {
- nready = poll(pfd, 1, 1000 * config_get_rsync_transfer_timeout());
- if (nready == 0) {
- pr_val_err("rsync transfer timeout reached");
- close(fd_pipe[type][0]);
- return 1;
- }
+ nready = poll(pfd, 1, timeout - delta);
+ if (nready == 0)
+ goto timed_out;
if (nready == -1) {
if (errno == EINTR)
continue;
}
if (count == 0)
break;
+
+ log_buffer(buffer, count, type);
}
- log_buffer(buffer, count, type);
+ delta = get_current_millis() - epoch;
+ if (delta < 0) {
+ pr_val_err("This clock does not seem monotonic. I'm going to have to give up this rsync.");
+ close(fd_pipe[type][0]);
+ return 1;
+ }
+ if (delta >= timeout)
+ goto timed_out; /* Read took too long */
}
close(fd_pipe[type][0]); /* Close read end */
return 0;
+
+timed_out:
+ pr_val_err("rsync transfer timeout reached");
+ close(fd_pipe[type][0]);
+ return 1;
}
/*
- * Read the piped output from the child, assures that all pipes are closed on
- * success and on error.
+ * Completely consumes @fd's streams, and closes them.
+ *
+ * Allegedly, this is a portable way to wait for the child process to finish.
+ * (IIRC, waitpid() doesn't do this reliably.)
*/
static int
read_pipes(int fds[2][2])
--- /dev/null
+#include <check.h>
+
+#include "alloc.c"
+#include "mock.c"
+#include "rsync/rsync.c"
+
+static char const * const PKT = "abcdefghijklmnopqrstuvwxyz";
+static const size_t PKTLEN = sizeof(PKT) - 1;
+
+MOCK(config_get_rsync_transfer_timeout, long, 4, void)
+__MOCK_ABORT(config_get_rsync_program, char *, NULL, void)
+__MOCK_ABORT(config_get_rsync_args, struct string_array const *, NULL, void)
+__MOCK_ABORT(config_get_rsync_retry_count, unsigned int, 0, void)
+__MOCK_ABORT(config_get_rsync_retry_interval, unsigned int, 0, void)
+
+static void
+disable_sigpipe(void)
+{
+ struct sigaction action = { .sa_handler = SIG_IGN };
+ if (sigaction(SIGPIPE, &action, NULL) == -1)
+ pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
+}
+
+static void *
+rsync_fast(void *arg)
+{
+ int writefd = *((int *)arg);
+ free(arg);
+
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+
+ close(writefd);
+ return NULL;
+}
+
+static void *
+rsync_stalled(void *arg)
+{
+ int writefd = *((int *)arg);
+ free(arg);
+
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+
+ sleep(5); /* The timeout is 4 seconds */
+
+ ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+
+ close(writefd);
+ return NULL;
+}
+
+static void *
+rsync_drip_feed(void *arg)
+{
+ int writefd = *((int *)arg);
+ free(arg);
+
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ sleep(1);
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ sleep(1);
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ sleep(1);
+ ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ sleep(2);
+ ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+
+ close(writefd);
+ return NULL;
+}
+
+static void
+prepare_test(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *))
+{
+ int *arg;
+
+ ck_assert_int_eq(0, pipe(fds[0]));
+ memset(fds[1], 0, sizeof(fds[1]));
+
+ arg = pmalloc(sizeof(fds[0][1]));
+ *arg = fds[0][1];
+ ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, arg));
+}
+
+static void
+finish_test(int fds[2][2], pthread_t thread)
+{
+ close(fds[0][0]);
+ pthread_join(thread, NULL);
+}
+
+START_TEST(read_pipe_test) /* Tests the read_pipe() function */
+{
+ /*
+ * [0][0] = standard error, read end
+ * [0][1] = standard error, write end
+ * [1][0] = standard output, read end (unused during this test)
+ * [1][1] = standard output, write end (unused during this test)
+ */
+ int fds[2][2];
+ pthread_t rsync_writer;
+
+ printf("This test needs to exhaust some timeouts. Please be patient.\n");
+
+ printf("Normal transfer\n");
+ prepare_test(fds, &rsync_writer, rsync_fast);
+ ck_assert_int_eq(0, read_pipe(fds, 0));
+ finish_test(fds, rsync_writer);
+
+ printf("Stalled transfer\n");
+ prepare_test(fds, &rsync_writer, rsync_stalled);
+ ck_assert_int_eq(1, read_pipe(fds, 0));
+ finish_test(fds, rsync_writer);
+
+ printf("Drip-feed\n");
+ prepare_test(fds, &rsync_writer, rsync_drip_feed);
+ ck_assert_int_eq(1, read_pipe(fds, 0));
+ finish_test(fds, rsync_writer);
+}
+END_TEST
+
+static Suite *xml_load_suite(void)
+{
+ Suite *suite;
+ TCase *pipes;
+
+ pipes = tcase_create("pipes");
+ tcase_add_test(pipes, read_pipe_test);
+ tcase_set_timeout(pipes, 15);
+
+ suite = suite_create("rsync");
+ suite_add_tcase(suite, pipes);
+
+ return suite;
+}
+
+int main(void)
+{
+ Suite *suite;
+ SRunner *runner;
+ int tests_failed;
+
+ disable_sigpipe();
+
+ suite = xml_load_suite();
+
+ runner = srunner_create(suite);
+ srunner_run_all(runner, CK_NORMAL);
+ tests_failed = srunner_ntests_failed(runner);
+ srunner_free(runner);
+
+ return (tests_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}