#include "config.h"
#include "log.h"
+#define STDERR_WRITE(fds) fds[0][1]
+#define STDOUT_WRITE(fds) fds[1][1]
+#define STDERR_READ(fds) fds[0][0]
+#define STDOUT_READ(fds) fds[1][0]
+
/*
* Duplicate parent FDs, to pipe rsync output:
* - fds[0] = stderr
duplicate_fds(int fds[2][2])
{
/* Use the loop to catch interruptions */
- while ((dup2(fds[0][1], STDERR_FILENO) == -1)
+ while ((dup2(STDERR_WRITE(fds), STDERR_FILENO) == -1)
&& (errno == EINTR)) {}
- close(fds[0][1]);
- close(fds[0][0]);
+ close(STDERR_WRITE(fds));
+ close(STDERR_READ(fds));
- while ((dup2(fds[1][1], STDOUT_FILENO) == -1)
+ while ((dup2(STDOUT_WRITE(fds), STDOUT_FILENO) == -1)
&& (errno == EINTR)) {}
- close(fds[1][1]);
- close(fds[1][0]);
+ close(STDOUT_WRITE(fds));
+ close(STDOUT_READ(fds));
}
static void
error = errno;
/* Close pipe previously created */
- close(fds[0][0]);
- close(fds[0][1]);
+ close(STDERR_READ(fds));
+ close(STDERR_WRITE(fds));
pr_op_err_st("Piping rsync stdout: %s", strerror(error));
return -error;
}
static void
-log_buffer(char const *buffer, ssize_t read, int type)
+log_buffer(char const *buffer, ssize_t read, bool is_error)
{
#define PRE_RSYNC "[RSYNC exec]: "
char *cpy, *cur, *tmp;
cur = tmp + 1;
continue;
}
- if (type == 0) {
+ if (is_error)
pr_val_err(PRE_RSYNC "%s", cur);
- } else {
+ else
pr_val_debug(PRE_RSYNC "%s", cur);
- }
cur = tmp + 1;
}
free(cpy);
#undef PRE_RSYNC
}
+#define DROP_FD(f, fail) \
+ do { \
+ pfd[f].fd = -1; \
+ error |= fail; \
+ } while (0)
+#define CLOSE_FD(f, fail) \
+ do { \
+ close(pfd[f].fd); \
+ DROP_FD(f, fail); \
+ } while (0)
+
/*
- * Consumes (and throws away) all the bytes in read stream @fd,
- * then closes it after end of stream.
+ * Consumes (and throws away) all the bytes in read streams @fderr and @fdout,
+ * then closes them once they reach end of stream.
*
* Returns: ok -> 0, error -> 1, timeout -> 2.
*/
static int
-exhaust_read_fd(int fd, int type)
+exhaust_read_fds(int fderr, int fdout)
{
- char buffer[4096];
- ssize_t count;
- struct pollfd pfd[1];
- int error, nready;
+ struct pollfd pfd[2];
+ int error, nready, f;
long epoch, delta, timeout;
memset(&pfd, 0, sizeof(pfd));
- pfd[0].fd = fd;
+ pfd[0].fd = fderr;
pfd[0].events = POLLIN;
+ pfd[1].fd = fdout;
+ pfd[1].events = POLLIN;
+
+ error = 0;
epoch = get_current_millis();
delta = 0;
timeout = 1000 * config_get_rsync_transfer_timeout();
while (1) {
- nready = poll(pfd, 1, timeout - delta);
+ nready = poll(pfd, 2, timeout - delta);
if (nready == 0)
goto timed_out;
if (nready == -1) {
if (error == EINTR)
continue;
pr_val_err("rsync bad poll: %s", strerror(error));
+ error = 1;
goto fail;
}
- if (pfd[0].revents & POLLNVAL) {
- pr_val_err("rsync bad fd: %i", pfd[0].fd);
- return 1; /* Already closed */
- } else if (pfd[0].revents & POLLERR) {
- pr_val_err("Generic error during rsync poll.");
- goto fail;
- } else if (pfd[0].revents & (POLLIN|POLLHUP)) {
- count = read(fd, buffer, sizeof(buffer));
- if (count == -1) {
- error = errno;
- if (error == EINTR)
+
+ for (f = 0; f < 2; f++) {
+ if (pfd[f].revents & POLLNVAL) {
+ pr_val_err("rsync bad fd: %i", pfd[f].fd);
+ DROP_FD(f, 1);
+
+ } else if (pfd[f].revents & POLLERR) {
+ pr_val_err("Generic error during rsync poll.");
+ CLOSE_FD(f, 1);
+
+ } else if (pfd[f].revents & (POLLIN|POLLHUP)) {
+ char buffer[4096];
+ ssize_t count;
+
+ count = read(pfd[f].fd, buffer, sizeof(buffer));
+ if (count == -1) {
+ error = errno;
+ if (error == EINTR)
+ continue;
+ pr_val_err("rsync buffer read error: %s",
+ strerror(error));
+ CLOSE_FD(f, 1);
continue;
- pr_val_err("rsync buffer read error: %s",
- strerror(error));
- goto fail;
- }
- if (count == 0)
- break;
+ }
- log_buffer(buffer, count, type);
+ if (count == 0)
+ CLOSE_FD(f, 0);
+ log_buffer(buffer, count, pfd[f].fd == fderr);
+ }
}
+ if (pfd[0].fd == -1 && pfd[1].fd == -1)
+ return error; /* Happy path! */
+
delta = get_current_millis() - epoch;
if (delta < 0) {
- pr_val_err("This clock does not seem monotonic. I'm going to have to give up this rsync.");
+ pr_val_err("This clock does not seem monotonic. "
+ "I'm going to have to give up this rsync.");
+ error = 1;
goto fail;
}
if (delta >= timeout)
goto timed_out; /* Read took too long */
}
- close(fd); /* Close read end */
- return 0;
-
timed_out:
pr_val_err("rsync transfer timeout reached");
- close(fd);
- return 2;
-
-fail:
- close(fd);
- return 1;
+ error = 2;
+fail: for (f = 0; f < 2; f++)
+ if (pfd[f].fd != -1)
+ close(pfd[f].fd);
+ return error;
}
/*
- * Completely consumes @fd's streams, and closes them.
+ * Completely consumes @fds' streams, and closes them.
*
* Allegedly, this is a portable way to wait for the child process to finish.
* (IIRC, waitpid() doesn't do this reliably.)
static int
exhaust_pipes(int fds[2][2])
{
- int error;
-
- close(fds[0][1]); /* Standard error, write end */
- close(fds[1][1]); /* Standard output, write end */
-
- /* Standard error, read end */
- error = exhaust_read_fd(fds[0][0], 0);
- if (error) {
- close(fds[1][0]);
- return error;
- }
-
- /* Standard output, read end */
- return exhaust_read_fd(fds[1][0], 1);
+ close(STDERR_WRITE(fds));
+ close(STDOUT_WRITE(fds));
+ return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds));
}
/*
pr_op_err_st("Couldn't fork to execute rsync: %s",
strerror(error));
/* Close all ends from the created pipes */
- close(fork_fds[0][0]);
- close(fork_fds[1][0]);
- close(fork_fds[0][1]);
- close(fork_fds[1][1]);
+ close(STDERR_READ(fork_fds));
+ close(STDOUT_READ(fork_fds));
+ close(STDERR_WRITE(fork_fds));
+ close(STDOUT_WRITE(fork_fds));
goto release_args;
}
static void *
rsync_fast(void *arg)
{
- int writefd = *((int *)arg);
- free(arg);
+ int fds[2][2];
+ memcpy(fds, arg, sizeof(fds));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
- close(writefd);
+ close(STDERR_WRITE(fds));
+ close(STDOUT_WRITE(fds));
return NULL;
}
static void *
rsync_stalled(void *arg)
{
- int writefd = *((int *)arg);
- free(arg);
+ int fds[2][2];
+ memcpy(fds, arg, sizeof(fds));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
sleep(5); /* The timeout is 4 seconds */
- ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
- close(writefd);
+ close(STDERR_WRITE(fds));
+ close(STDOUT_WRITE(fds));
return NULL;
}
static void *
rsync_drip_feed(void *arg)
{
- int writefd = *((int *)arg);
- free(arg);
+ int fds[2][2];
+ memcpy(fds, arg, sizeof(fds));
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
sleep(1);
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
sleep(1);
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
sleep(1);
- ck_assert_int_eq(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_eq(PKTLEN, write(STDERR_WRITE(fds), PKT, PKTLEN));
sleep(2);
- ck_assert_int_ne(PKTLEN, write(writefd, PKT, PKTLEN));
+ ck_assert_int_ne(PKTLEN, write(STDOUT_WRITE(fds), PKT, PKTLEN));
- close(writefd);
+ close(STDERR_WRITE(fds));
+ close(STDOUT_WRITE(fds));
return NULL;
}
static void
-prepare_test(int fds[2], pthread_t *thread, void *(*rsync_simulator)(void *))
+prepare_test(int fds[2][2], pthread_t *thread, void *(*rsync_simulator)(void *))
{
- int *arg;
-
- ck_assert_int_eq(0, pipe(fds));
-
- arg = pmalloc(sizeof(fds[1]));
- *arg = fds[1];
- ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, arg));
+ ck_assert_int_eq(0, pipe(fds[0]));
+ ck_assert_int_eq(0, pipe(fds[1]));
+ ck_assert_int_eq(0, pthread_create(thread, NULL, rsync_simulator, fds));
}
static void
START_TEST(read_pipe_test) /* Tests the read_pipe() function */
{
- int fds[2];
+ int fds[2][2];
pthread_t rsync_writer;
printf("This test needs to exhaust some timeouts. Please be patient.\n");
printf("Normal transfer\n");
prepare_test(fds, &rsync_writer, rsync_fast);
- ck_assert_int_eq(0, exhaust_read_fd(fds[0], 0));
+ ck_assert_int_eq(0, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
finish_test(rsync_writer);
printf("Stalled transfer\n");
prepare_test(fds, &rsync_writer, rsync_stalled);
- ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0));
+ ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
finish_test(rsync_writer);
printf("Drip-feed\n");
prepare_test(fds, &rsync_writer, rsync_drip_feed);
- ck_assert_int_eq(2, exhaust_read_fd(fds[0], 0));
+ ck_assert_int_eq(2, exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds)));
finish_test(rsync_writer);
}
END_TEST