LIST_ENTRY(rsync_task) lh;
};
-LIST_HEAD(rsync_tasks, rsync_task);
+LIST_HEAD(rsync_task_list, rsync_task);
+
+struct rsync_tasks {
+ struct rsync_task_list active;
+ int a; /* total active */
+
+ struct rsync_task_list queued;
+};
#ifndef LIST_FOREACH_SAFE
#define LIST_FOREACH_SAFE(var, ls, lh, tmp) \
free(task);
if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) {
- pr_op_err("Cannot message parent process: %s", strerror(errno));
+ pr_op_err(RSP "Cannot message parent process: %s", strerror(errno));
close(s2p->wr);
s2p->wr = -1;
}
static void
-finish_task(struct rsync_task *task, struct s2p_socket *s2p)
+finish_task(struct rsync_tasks *tasks, struct rsync_task *task,
+ struct s2p_socket *s2p)
{
LIST_REMOVE(task, lh);
+ tasks->a--;
void_task(task, s2p);
}
}
static struct pollfd *
-create_pfds(int request_fd, struct rsync_tasks *tasks, size_t tn)
+create_pfds(int request_fd, struct rsync_task_list *tasks, size_t tn)
{
struct pollfd *pfds;
struct rsync_task *task;
if (pipe(fds[0]) < 0) {
error = errno;
- pr_op_err_st("Piping rsync stderr: %s", strerror(error));
+ pr_op_err_st(RSP "Piping rsync stderr: %s", strerror(error));
return error;
}
if (pipe(fds[1]) < 0) {
error = errno;
- pr_op_err_st("Piping rsync stdout: %s", strerror(error));
+ pr_op_err_st(RSP "Piping rsync stdout: %s", strerror(error));
close(fds[0][0]);
close(fds[0][1]);
return error;
task->pid = fork();
if (task->pid < 0) {
error = errno;
- pr_op_err_st("Couldn't spawn the rsync process: %s",
+ pr_op_err_st(RSP "Couldn't spawn the rsync process: %s",
strerror(error));
close(STDERR_READ(fork_fds));
close(STDOUT_READ(fork_fds));
return 0;
}
-static unsigned int
-start_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+static void
+activate_task(struct rsync_tasks *tasks, struct rsync_task *task,
+ struct s2p_socket *s2p, struct timespec *now)
+{
+ ts_add(&task->expiration, now, 1000 * config_rsync_timeout());
+
+ if (fork_rsync(task) != 0) {
+ void_task(task, s2p);
+ return;
+ }
+
+ LIST_INSERT_HEAD(&tasks->active, task, lh);
+ tasks->a++;
+}
+
+static void
+post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
struct timespec *now)
{
struct rsync_task *task;
task = pzalloc(sizeof(struct rsync_task));
task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size);
task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size);
- ts_add(&task->expiration, now, 1000 * config_get_rsync_transfer_timeout());
- if (fork_rsync(task) != 0) {
- void_task(task, s2p);
- return 0;
+ if (tasks->a >= config_rsync_max()) {
+ LIST_INSERT_HEAD(&tasks->queued, task, lh);
+ pr_op_debug(RSP "Queued new task.");
+ } else {
+ activate_task(tasks, task, s2p, now);
+ pr_op_debug(RSP "Got new task: %d", task->pid);
}
-
- LIST_INSERT_HEAD(tasks, task, lh);
- pr_val_debug(RSP "Got new task: %d", task->pid);
- return 1;
}
-static unsigned int
+static void
read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
struct timespec *now)
{
ssize_t consumed;
size_t offset;
asn_dec_rval_t decres;
- unsigned int t;
int error;
in = &s2p->rd;
- t = 0;
do {
consumed = read(in->fd, in->buffer + in->len,
error = errno;
if (error != EAGAIN && error != EWOULDBLOCK)
rstream_close(in, true);
- return t;
+ return;
}
if (consumed == 0) { /* EOS */
rstream_close(in, true);
- return t;
+ return;
}
in->len += consumed;
offset += decres.consumed;
switch (decres.code) {
case RC_OK:
- t += start_task(s2p, tasks, now);
+ post_task(s2p, tasks, now);
ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr);
break;
case RC_WMORE:
goto break_for;
case RC_FAIL:
rstream_close(in, true);
- return t;
+ return;
}
}
} while (true);
}
-/* Returns number of tasks created */
-static int
+static void
handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd,
struct rsync_tasks *tasks, struct timespec *now)
{
if (s2p->rd.fd == -1)
- return 0;
+ return;
if (pfd->revents & POLLNVAL) {
- pr_val_err(RSP "bad parent fd: %i", pfd->fd);
+ pr_op_err(RSP "bad parent fd: %i", pfd->fd);
rstream_close(&s2p->rd, false);
- return 0;
- }
- if (pfd->revents & POLLERR) {
- pr_val_err(RSP "Generic error during parent fd poll.");
+ } else if (pfd->revents & POLLERR) {
+ pr_op_err(RSP "Generic error during parent fd poll.");
rstream_close(&s2p->rd, true);
- return 0;
- }
-
- if (pfd->revents & (POLLIN | POLLHUP))
- return read_tasks(s2p, tasks, now);
- return 0;
+ } else if (pfd->revents & (POLLIN | POLLHUP)) {
+ read_tasks(s2p, tasks, now);
+ }
}
static void
continue;
}
if (is_error)
- pr_val_err("[RSYNC exec] %s", cur);
+ pr_op_err("[RSYNC exec] %s", cur);
else
- pr_val_debug("[RSYNC exec] %s", cur);
+ pr_op_debug("[RSYNC exec] %s", cur);
cur = tmp + 1;
}
free(cpy);
error = errno;
if (error == EINTR)
return 0; /* Dunno; retry */
- pr_val_err("rsync buffer read error: %s", strerror(error));
+ pr_op_err(RSP "rsync buffer read error: %s", strerror(error));
goto down; /* Error */
}
handle_rsync_fd(struct pollfd *pfd, size_t p)
{
if (pfd->fd == -1) {
- pr_val_debug(RSP "File descriptor already closed.");
+ pr_op_debug(RSP "File descriptor already closed.");
return 1;
}
if (pfd->revents & POLLNVAL) {
- pr_val_err(RSP "rsync bad fd: %i", pfd->fd);
+ pr_op_err(RSP "rsync bad fd: %i", pfd->fd);
return 1;
}
if (pfd->revents & POLLERR) {
- pr_val_err(RSP "Generic error during rsync poll.");
+ pr_op_err(RSP "Generic error during rsync poll.");
close(pfd->fd);
return 1;
}
if (WIFEXITED(status)) {
/* Happy path (but also sad path sometimes) */
error = WEXITSTATUS(status);
- pr_val_debug("%s ended. Result: %d", name, error);
+ pr_op_debug("%s ended. Result: %d", name, error);
return error ? EIO : 0;
}
kill(task->pid, SIGTERM);
}
+static void
+activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p,
+ struct timespec *now)
+{
+ struct rsync_task *task;
+
+ task = LIST_FIRST(&tasks->queued);
+ if (task == NULL)
+ return;
+
+ pr_op_debug(RSP "Activating queued task %s -> %s.",
+ task->url, task->path);
+ LIST_REMOVE(task, lh);
+ activate_task(tasks, task, s2p, now);
+}
+
/* Returns true if the task died. */
static bool
-maybe_expire(struct timespec *now, struct rsync_task *task,
- struct s2p_socket *s2p)
+maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task,
+ struct s2p_socket *s2p, struct timespec *now)
{
struct timespec epoch;
pr_op_debug(RSP "Task %d ran out of time.", task->pid);
kill_subprocess(task);
wait_subprocess("rsync", task->pid);
- finish_task(task, s2p);
+ finish_task(tasks, task, s2p);
+ activate_queued(tasks, s2p, now);
+
return true;
}
struct rsync_tasks tasks;
struct rsync_task *task, *tmp;
- int task_count;
int error;
s2p.wr = response_fd;
s2p.rr = NULL;
- LIST_INIT(&tasks);
- task_count = 0;
+ LIST_INIT(&tasks.active);
+ LIST_INIT(&tasks.queued);
+ tasks.a = 0;
error = 0;
ts_now(&now);
* odd: stdouts
* even > 0: stderrs
*/
- pfds = create_pfds(s2p.rd.fd, &tasks, task_count);
- pfds_count = 2 * task_count + 1;
+ pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a);
+ pfds_count = 2 * tasks.a + 1;
expiration.tv_sec = now.tv_sec + 10;
expiration.tv_nsec = now.tv_nsec;
- LIST_FOREACH(task, &tasks, lh)
+ LIST_FOREACH(task, &tasks.active, lh)
if ((ts_cmp(&now, &task->expiration) < 0) &&
(ts_cmp(&task->expiration, &expiration) < 0))
expiration = task->expiration;
timeout = ts_delta(&now, &expiration);
- pr_val_debug(RSP "Timeout decided: %dms", timeout);
+ pr_op_debug(RSP "Timeout decided: %dms", timeout);
events = poll(pfds, pfds_count, timeout);
if (events < 0) {
error = errno;
ts_now(&now);
if (events == 0) { /* Timeout */
- pr_val_debug(RSP "Woke up because of timeout.");
- LIST_FOREACH_SAFE(task, &tasks, lh, tmp)
- task_count -= maybe_expire(&now, task, &s2p);
+ pr_op_debug(RSP "Woke up because of timeout.");
+ LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp)
+ maybe_expire(&tasks, task, &s2p, &now);
goto cont;
}
- pr_val_debug(RSP "Woke up because of input.");
+ pr_op_debug(RSP "Woke up because of input.");
p = 1;
- LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
- if (maybe_expire(&now, task, &s2p)) {
- task_count--;
+ LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
+ if (maybe_expire(&tasks, task, &s2p, &now))
continue;
- }
if (handle_rsync_fd(&pfds[p], p)) {
- pr_val_debug(RSP "Task %d: Stdout closed.",
+ pr_op_debug(RSP "Task %d: Stdout closed.",
task->pid);
task->stdoutfd = -1;
}
p++;
if (handle_rsync_fd(&pfds[p], p)) {
- pr_val_debug(RSP "Task %d: Stderr closed.",
+ pr_op_debug(RSP "Task %d: Stderr closed.",
task->pid);
task->stderrfd = -1;
}
p++;
if (task->stdoutfd == -1 && task->stderrfd == -1) {
- pr_val_debug(RSP "Both stdout & stderr are closed; ending task %d.",
+ pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.",
task->pid);
wait_subprocess("rsync", task->pid);
- finish_task(task, &s2p);
- task_count--;
+ finish_task(&tasks, task, &s2p);
+ activate_queued(&tasks, &s2p, &now);
}
}
- task_count += handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
+ handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
cont: free(pfds);
- } while ((s2p.rd.fd != -1 || task_count > 0));
- pr_val_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
+ } while ((s2p.rd.fd != -1 || tasks.a > 0));
+ pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
- LIST_FOREACH_SAFE(task, &tasks, lh, tmp) {
+ LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
kill_subprocess(task);
wait_subprocess("rsync", task->pid);
- finish_task(task, &s2p);
+ finish_task(&tasks, task, &s2p);
+ }
+ LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) {
+ LIST_REMOVE(task, lh);
+ void_task(task, &s2p);
}
rstream_close(&s2p.rd, true);
#include "rsync.c"
#include "stream.c"
-
#include "asn1/asn1c/ber_decoder.c"
#include "asn1/asn1c/ber_tlv_length.c"
#include "asn1/asn1c/ber_tlv_tag.c"
/* Mocks */
MOCK(config_get_rsync_program, char const *, "rsync", void)
-MOCK(config_get_rsync_transfer_timeout, long, 4, void)
+MOCK_UINT(config_rsync_max, 3, void)
+MOCK(config_rsync_timeout, long, 4, void)
MOCK_UINT(config_get_asn1_decode_max_stack, 16 * 1024, void)
MOCK_ABORT_PTR(json_obj_new, json_t, void)
/* Tests */
-static void
-disable_sigpipe(void)
-{
- struct sigaction action = { .sa_handler = SIG_IGN };
- if (sigaction(SIGPIPE, &action, NULL) == -1)
- pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
-}
-
-static void
-init_content(void)
-{
- size_t i;
-
- if (sizeof(content) % STR64LEN != 0)
- pr_crit("content's length isn't divisible by str64's length");
- for (i = 0; i < (sizeof(content) / STR64LEN); i++)
- memcpy(content + 64 * i, STR64, STR64LEN);
-}
-
-static void
-create_dir(char const *path)
-{
- if (mkdir(path, 0700) < 0)
- ck_assert_int_eq(EEXIST, errno);
-}
-
-static void
-create_file(char const *name, unsigned int kbs)
-{
- FILE *file;
-
- file = fopen(name, "wb");
- ck_assert_ptr_ne(NULL, file);
- ck_assert_int_eq(kbs, fwrite(content, sizeof(content), kbs, file));
- ck_assert_int_eq(0, fclose(file));
-}
-
-static void
-ensure_file_deleted(char const *name)
-{
- int ret;
- int error;
-
- errno = 0;
- ret = unlink(name);
- error = errno;
-
- ck_assert(ret == 0 || error == ENOENT);
-}
-
-static void
-create_rsync_sandbox(void)
-{
- create_dir("tmp");
- create_dir("tmp/rsync");
- create_dir("tmp/rsync/src");
- create_dir("tmp/rsync/dst");
-
- create_file("tmp/rsync/src/a", 1);
- create_file("tmp/rsync/src/b", 1);
- create_file("tmp/rsync/src/c", 1);
-
- ensure_file_deleted("tmp/rsync/dst/a");
- ensure_file_deleted("tmp/rsync/dst/b");
- ensure_file_deleted("tmp/rsync/dst/c");
-}
-
-static void
-diff(char const *file1, char const *file2)
-{
- int fd1, fd2;
- struct read_stream rs1, rs2;
- int read1, read2;
-
- fd1 = open(file1, O_RDONLY, 0);
- ck_assert_int_ne(-1, fd1);
- rstream_init(&rs1, fd1, 1024);
-
- fd2 = open(file2, O_RDONLY, 0);
- ck_assert_int_ne(-1, fd2);
- rstream_init(&rs2, fd2, 1024);
-
- do {
- read1 = rstream_full_read(&rs1, 1024);
- ck_assert_int_ge(read1, 0);
- read2 = rstream_full_read(&rs2, 1024);
- ck_assert_int_eq(read1, read2);
- ck_assert_int_eq(0, memcmp(rs1.buffer, rs2.buffer, read1));
- } while (read1 == 1024);
-
- rstream_close(&rs1, true);
- rstream_close(&rs2, true);
-}
-
-static void
-ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
- char const *url, char const *path)
-{
- struct rsync_task *task;
-
- task = LIST_FIRST(tasks);
- ck_assert_ptr_ne(NULL, task);
- ck_assert_str_eq(url, task->url);
- ck_assert_str_eq(path, task->path);
- finish_task(task, sk);
-}
-
/* Test RsyncRequest decode, feeding as few bytes as possible every time. */
START_TEST(test_decode_extremely_fragmented)
{
}
END_TEST
+static void
+ck_no_tasks(struct rsync_tasks *tasks)
+{
+ ck_assert_int_eq(0, tasks->a);
+ ck_assert(LIST_EMPTY(&tasks->active));
+ ck_assert(LIST_EMPTY(&tasks->queued));
+}
+
+static void
+ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
+ char const *url, char const *path)
+{
+ struct rsync_task *task;
+
+ task = LIST_FIRST(&tasks->active);
+ ck_assert_ptr_ne(NULL, task);
+ ck_assert_str_eq(url, task->url);
+ ck_assert_str_eq(path, task->path);
+ ck_assert(tasks->a > 0);
+ finish_task(tasks, task, sk);
+}
+
static void
encode_request(char const *url, char const *path, unsigned char *buffer)
{
rstream_init(&sk.rd, fds[0], 256);
sk.wr = -1;
sk.rr = NULL;
- LIST_INIT(&tasks);
+ LIST_INIT(&tasks.active);
+ LIST_INIT(&tasks.queued);
+ tasks.a = 0;
ts_now(&now);
printf("Read yields nothing\n");
- ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
+ ck_no_tasks(&tasks);
printf("Read yields less than 1 request\n");
encode_request("111", "2222", bytes); /* 13 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
- ck_assert_uint_eq(0, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
+ ck_no_tasks(&tasks);
ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3));
- ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "111", "2222");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
printf("Read yields 1 request\n");
encode_request("3333", "444", bytes); /* 13 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13));
- ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "3333", "444");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
printf("Read yields 1.5 requests\n");
encode_request("55", "666", bytes); /* 11 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11));
encode_request("777", "88", bytes); /* 11 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5));
- ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "55", "666");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6));
- ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "777", "88");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
printf("Read yields 2 requests\n");
encode_request("9999", "00", bytes); /* 12 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
encode_request("aa", "bbbb", bytes); /* 12 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
- ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "aa", "bbbb");
ck_1st_task(&tasks, &sk, "9999", "00");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
printf("Read yields 2.5 requests\n");
encode_request("cc", "dd", bytes); /* 10 bytes */
encode_request("gggg", "hhhhh", bytes); /* 15 bytes */
ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3));
- ck_assert_uint_eq(2, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "eeeee", "fffff");
ck_1st_task(&tasks, &sk, "cc", "dd");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12));
- ck_assert_uint_eq(1, read_tasks(&sk, &tasks, &now));
+ read_tasks(&sk, &tasks, &now);
ck_1st_task(&tasks, &sk, "gggg", "hhhhh");
- ck_assert(LIST_EMPTY(&tasks));
+ ck_no_tasks(&tasks);
}
END_TEST
+/* Makes sure @count rsyncs finish after roughly @millis milliseconds. */
static void
-wait_rsyncs(unsigned int count)
+wait_rsyncs(unsigned int count, unsigned int millis)
{
- unsigned int done = 0;
-
- do {
- sleep(1);
- done += rsync_finished();
- printf("rsyncs done: %u\n", done);
- } while (done < count);
+ struct timespec req;
+
+ if (millis > 100) {
+ millis -= 100;
+ req.tv_sec = millis / 1000;
+ req.tv_nsec = (millis % 1000) * 1000000;
+ ck_assert_int_eq(0, nanosleep(&req, NULL));
+ ck_assert_uint_eq(0, rsync_finished());
+ }
- ck_assert_uint_eq(count, done);
+ req.tv_sec = 0;
+ req.tv_nsec = 200000000; /* 200ms */
+ ck_assert_int_eq(0, nanosleep(&req, NULL));
+ ck_assert_uint_eq(count, rsync_finished());
}
START_TEST(test_fast_single_rsync)
ck_assert_int_ne(-1, writefd);
ck_assert_int_eq(0, rsync_queue("A", "B"));
- wait_rsyncs(1);
+ wait_rsyncs(1, 0);
rsync_teardown();
}
ck_assert_int_ne(-1, writefd);
ck_assert_int_eq(0, rsync_queue("A", "B"));
- wait_rsyncs(1);
+ wait_rsyncs(1, 3000);
rsync_teardown();
}
ck_assert_int_ne(-1, writefd);
ck_assert_int_eq(0, rsync_queue("A", "B"));
- wait_rsyncs(1);
+ wait_rsyncs(1, 4000); /* 4000 = timeout */
rsync_teardown();
}
ck_assert_int_ne(-1, writefd);
ck_assert_int_eq(0, rsync_queue("A", "B"));
- wait_rsyncs(1);
+ wait_rsyncs(1, 3000);
rsync_teardown();
}
ck_assert_int_ne(-1, writefd);
ck_assert_int_eq(0, rsync_queue("A", "B"));
- wait_rsyncs(1);
+ wait_rsyncs(1, 4000); /* 4000 = timeout */
rsync_teardown();
}
ck_assert_int_ne(-1, writefd);
sleep(2);
+ ck_assert_uint_eq(0, rsync_finished());
rsync_teardown();
}
START_TEST(test_simultaneous_rsyncs)
{
- create_rsync_sandbox();
- /* Note... --bwlimit does not seem to exist in openrsync */
- rsync_setup("rsync", "--bwlimit=1K", "-vvv", NULL);
+ rsync_setup("resources/rsync/simultaneous.sh", NULL);
ck_assert_int_ne(-1, readfd);
ck_assert_int_ne(-1, writefd);
- ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/a", "tmp/rsync/dst/a"));
- ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/b", "tmp/rsync/dst/b"));
- ck_assert_int_eq(0, rsync_queue("tmp/rsync/src/c", "tmp/rsync/dst/c"));
- wait_rsyncs(3);
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ ck_assert_int_eq(0, rsync_queue("C", "D"));
+ ck_assert_int_eq(0, rsync_queue("E", "F"));
+
+ wait_rsyncs(3, 1000);
rsync_teardown();
+}
+END_TEST
- diff("tmp/rsync/src/a", "tmp/rsync/dst/a");
- diff("tmp/rsync/src/b", "tmp/rsync/dst/b");
- diff("tmp/rsync/src/c", "tmp/rsync/dst/c");
+START_TEST(test_queued_rsyncs)
+{
+ rsync_setup("resources/rsync/queued.sh", NULL);
+ ck_assert_int_ne(-1, readfd);
+ ck_assert_int_ne(-1, writefd);
+
+
+ ck_assert_int_eq(0, rsync_queue("A", "B"));
+ ck_assert_int_eq(0, rsync_queue("C", "D"));
+ ck_assert_int_eq(0, rsync_queue("E", "F"));
+ ck_assert_int_eq(0, rsync_queue("G", "H"));
+
+ wait_rsyncs(3, 2000);
+ wait_rsyncs(1, 2000);
+
+ rsync_teardown();
}
END_TEST
tcase_add_test(p2s, test_read_tasks);
s2r = tcase_create("spawner-rsync channel");
- tcase_add_test(p2s, test_fast_single_rsync);
- tcase_add_test(p2s, test_stalled_single_rsync);
- tcase_add_test(p2s, test_stalled_single_rsync_timeout);
- tcase_add_test(p2s, test_dripfeed_single_rsync);
- tcase_add_test(p2s, test_dripfeed_single_rsync_timeout);
- tcase_set_timeout(p2s, 6);
+ tcase_add_test(s2r, test_fast_single_rsync);
+ tcase_add_test(s2r, test_stalled_single_rsync);
+ tcase_add_test(s2r, test_stalled_single_rsync_timeout);
+ tcase_add_test(s2r, test_dripfeed_single_rsync);
+ tcase_add_test(s2r, test_dripfeed_single_rsync_timeout);
+ tcase_set_timeout(s2r, 6);
spawner = tcase_create("spawner");
tcase_add_test(spawner, test_no_rsyncs);
tcase_add_test(spawner, test_simultaneous_rsyncs);
- tcase_set_timeout(spawner, 6);
+ tcase_add_test(spawner, test_queued_rsyncs);
+ tcase_set_timeout(spawner, 5);
suite = suite_create("rsync");
suite_add_tcase(suite, p2s);
return suite;
}
+static void
+disable_sigpipe(void)
+{
+ struct sigaction action = { .sa_handler = SIG_IGN };
+ if (sigaction(SIGPIPE, &action, NULL) == -1)
+ pr_crit("Cannot disable SIGPIPE: %s", strerror(errno));
+}
+
+static void
+init_content(void)
+{
+ size_t i;
+
+ if (sizeof(content) % STR64LEN != 0)
+ pr_crit("content's length isn't divisible by str64's length");
+ for (i = 0; i < (sizeof(content) / STR64LEN); i++)
+ memcpy(content + 64 * i, STR64, STR64LEN);
+}
+
int
main(void)
{