return st;
}
+char *
+OCTET_STRING_toString(OCTET_STRING_t *os)
+{
+ return pstrndup((char *)os->buf, os->size);
+}
+
json_t *
OCTET_STRING_to_json(const asn_TYPE_descriptor_t *td, OCTET_STRING_t const *ber)
{
OCTET_STRING_t *OCTET_STRING_new_fromBuf(const asn_TYPE_descriptor_t *td,
const char *str, int size);
+char *OCTET_STRING_toString(OCTET_STRING_t *);
json_t *OCTET_STRING_to_json(const asn_TYPE_descriptor_t *,
OCTET_STRING_t const *);
dl_rsync(struct cache_node *module)
{
int error;
-
- // XXX go pick some other task on zero
error = rsync_queue(module->map.url, module->map.path);
- if (error)
- return error;
-
- module->success_ts = module->attempt_ts;
- return 0;
+ return error ? error : EBUSY;
}
static int
mutex_unlock(&commits_lock);
}
+void
+rsync_finished(char const *url, char const *path)
+{
+ struct cache_node *node;
+
+ mutex_lock(&cache.rsync.lock);
+
+ node = find_node(&cache.rsync, url, strlen(url));
+ if (node == NULL) {
+ mutex_unlock(&cache.rsync.lock);
+ pr_op_err("rsync '%s -> %s' finished, but cache node does not exist.",
+ url, path);
+ return;
+ }
+ if (node->state != DLS_ONGOING)
+ pr_op_warn("rsync '%s -> %s' finished, but existing node was not in ONGOING state.",
+ url, path);
+
+ node->state = DLS_FRESH;
+ node->dlerr = 0;
+ node->success_ts = node->attempt_ts;
+ mutex_unlock(&cache.rsync.lock);
+
+ task_wakeup_dormants();
+}
+
char const *
cage_rpkiNotify(struct cache_cage *cage)
{
#define DBG_COLOR_RESET "\x1B[0m"
#define PR_DEBUG \
printf(DBG_COLOR "%s:%d (%s())" DBG_COLOR_RESET "\n", \
- __FILE__, __LINE__, __func__)
+ __FILE__, __LINE__, __func__)
#define PR_DEBUG_MSG(msg, ...) \
printf(DBG_COLOR "%s:%d (%s()): " msg DBG_COLOR_RESET "\n", \
- __FILE__, __LINE__, __func__, ##__VA_ARGS__)
+ __FILE__, __LINE__, __func__, ##__VA_ARGS__)
#endif /* SRC_LOG_H_ */
int error;
/* Initializations */
+ /* (Do not start any threads until after rsync_setup() has forked.) */
error = log_setup();
if (error)
if (error)
goto revert_log;
- rsync_setup(NULL, NULL); /* Spawn rsync spawner ASAP */
+ rsync_setup(NULL, NULL); /* Fork rsync spawner ASAP */
register_signal_handlers();
error = thvar_init();
{
char **url;
struct cache_mapping map;
+ int error;
ARRAYLIST_FOREACH(&tal->urls, url) {
map.url = *url;
if (!url_is_protocol(map.url))
continue;
- // XXX if this is rsync, it seems this will queue and fail
map.path = get_path(*url);
if (!map.path)
continue;
- if (validate_ta(tal, &map) != 0)
+ error = validate_ta(tal, &map);
+ if (error == EBUSY)
+ return EBUSY;
+ if (error)
continue;
cache_commit_file(&map);
return 0;
goto end1;
/* Online attempts */
- if (try_urls(&tal, url_is_https, cache_refresh_by_url) == 0)
+ error = try_urls(&tal, url_is_https, cache_refresh_by_url);
+ if (!error || error == EBUSY)
goto end2;
- if (try_urls(&tal, url_is_rsync, cache_refresh_by_url) == 0)
+ error = try_urls(&tal, url_is_rsync, cache_refresh_by_url);
+ if (!error || error == EBUSY)
goto end2;
/* Offline fallback attempts */
- if (try_urls(&tal, url_is_https, cache_get_fallback) == 0)
+ error = try_urls(&tal, url_is_https, cache_get_fallback);
+ if (!error || error == EBUSY)
goto end2;
- if (try_urls(&tal, url_is_rsync, cache_get_fallback) == 0)
+ error = try_urls(&tal, url_is_rsync, cache_get_fallback);
+ if (!error || error == EBUSY)
goto end2;
pr_op_err("None of the TAL URIs yielded a successful traversal.");
}
break;
case VTT_TAL:
- if (traverse_tal(task->u.tal) != 0)
+ switch (traverse_tal(task->u.tal)) {
+ case 0:
+ break;
+ case EBUSY:
+ task_requeue_dormant(task);
+ task = NULL;
+ break;
+ default:
task_stop();
+ }
break;
}
#include "config.h"
#include "log.h"
#include "types/array.h"
+#include "types/map.h"
#include "asn1/asn1c/ber_decoder.h"
#include "asn1/asn1c/der_encoder.h"
#include "asn1/asn1c/RsyncRequest.h"
#define RSP /* rsync spawner prefix */ "[rsync spawner] "
+#define SRTP "[spawner response thread] "
static char const *rsync_args[20]; /* Last must be NULL */
static pid_t spawner; /* The subprocess that spawns rsync runs */
-static int readfd; /* Parent's end of the spawner-to-parent pipe */
-static pthread_mutex_t readlock = PTHREAD_MUTEX_INITIALIZER;
-
-static int writefd; /* Parent's end of the parent-to-spawner pipe */
-static pthread_mutex_t writelock = PTHREAD_MUTEX_INITIALIZER;
-
/*
- * "Spawner to parent" socket.
- * Socket used by the spawner to communicate with the parent.
+ * "Parent-spawner socket."
+ * Used by both parent and spawner to speak with each other.
*/
-struct s2p_socket {
- /* Spawner's end of the parent-to-spawner pipe */
- struct read_stream rd;
- /* Spawner's end of the spawner-to-parent pipe */
- int wr;
- /* Scratchpad buffer for stream read */
- struct RsyncRequest *rr;
-};
+struct {
+ struct read_stream rd; /* Read fd and extra tools */
+ int wr; /* Write fd */
+
+ struct RsyncRequest *rr; /* Scratchpad buffer for stream read */
+ pthread_mutex_t wrlock; /* To sync writes */
+} pssk;
struct rsync_task {
int pid;
)
#endif
+/* Spawner response thread; The thread that listens to spawner responses. */
+static pthread_t srt;
+
static void
-void_task(struct rsync_task *task, struct s2p_socket *s2p)
+__spsk_init(int rdfd, int wrfd)
{
- static const unsigned char one = 1;
+ rstream_init(&pssk.rd, rdfd, 512);
+ pssk.wr = wrfd;
+ pssk.rr = NULL;
+ pthread_mutex_init(&pssk.wrlock, NULL);
+}
- free(task->url);
- free(task->path);
- free(task);
+static void
+spsk_init(int rdpipes[2], int wrpipes[2])
+{
+ close(wrpipes[RDFD]);
+ close(rdpipes[WRFD]);
+ __spsk_init(rdpipes[RDFD], wrpipes[WRFD]);
+}
- if (s2p->wr != -1 && write(s2p->wr, &one, 1) < 0) {
- pr_op_err(RSP "Cannot message parent process: %s", strerror(errno));
+static void
+spsk_cleanup(void)
+{
+ rstream_close(&pssk.rd, true);
+ if (pssk.wr != -1) {
+ close(pssk.wr);
+ pssk.wr = -1;
+ }
+ ASN_STRUCT_FREE(asn_DEF_RsyncRequest, pssk.rr);
+ pthread_mutex_destroy(&pssk.wrlock);
+}
+
+static int
+write_cb(const void *buffer, size_t size, void *arg)
+{
+ return stream_full_write(pssk.wr, buffer, size);
+}
+
+static void
+notify_parent(struct rsync_task *task)
+{
+ struct RsyncRequest req;
+ asn_enc_rval_t result;
- close(s2p->wr);
- s2p->wr = -1;
- /* Can't signal finished rsyncs anymore; reject future ones. */
- rstream_close(&s2p->rd, true);
+ if (pssk.wr == -1) {
+ pr_op_err(RSP "Cannot message parent process: "
+ "The socket is closed.");
+ return;
+ }
+
+ /*
+ * TODO (asn1) these error messages are too generic.
+ * The asn1 code needs better error reporting.
+ */
+
+ if (RsyncRequest_init(&req, task->url, task->path) < 0) {
+ pr_op_err(RSP "Cannot message parent process: "
+ "The request object cannot be created");
+ return;
+ }
+
+ result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL);
+ if (result.encoded == -1) {
+ pr_op_err(RSP "Cannot message parent process: Unknown error");
+ /* TODO (asn1) Do this if the error was I/O:
+ * close(spsk.wr);
+ * spsk.wr = -1;
+ * // Can't signal finished rsyncs anymore; reject future ones.
+ * rstream_close(&spsk.rd, true);
+ */
}
+
+ pr_op_debug(RSP "Parent notified; sent %zd bytes.", result.encoded);
+ ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req);
+}
+
+static void
+void_task(struct rsync_task *task)
+{
+ notify_parent(task);
+
+ free(task->url);
+ free(task->path);
+ free(task);
}
static void
-finish_task(struct rsync_tasks *tasks, struct rsync_task *task,
- struct s2p_socket *s2p)
+finish_task(struct rsync_tasks *tasks, struct rsync_task *task)
{
LIST_REMOVE(task, lh);
tasks->a--;
- void_task(task, s2p);
+ void_task(task);
}
static void
static void
activate_task(struct rsync_tasks *tasks, struct rsync_task *task,
- struct s2p_socket *s2p, struct timespec *now)
+ struct timespec *now)
{
ts_add(&task->expiration, now, 1000 * config_rsync_timeout());
if (fork_rsync(task) != 0) {
- void_task(task, s2p);
+ void_task(task);
return;
}
}
static void
-post_task(struct s2p_socket *s2p, struct rsync_tasks *tasks,
+post_task(struct cache_mapping *map, struct rsync_tasks *tasks,
struct timespec *now)
{
struct rsync_task *task;
task = pzalloc(sizeof(struct rsync_task));
- task->url = pstrndup((char *)s2p->rr->url.buf, s2p->rr->url.size);
- task->path = pstrndup((char *)s2p->rr->path.buf, s2p->rr->path.size);
+ task->url = map->url;
+ task->path = map->path;
if (tasks->a >= config_rsync_max()) {
LIST_INSERT_HEAD(&tasks->queued, task, lh);
pr_op_debug(RSP "Queued new task.");
} else {
- activate_task(tasks, task, s2p, now);
+ activate_task(tasks, task, now);
pr_op_debug(RSP "Got new task: %d", task->pid);
}
}
-static void
-read_tasks(struct s2p_socket *s2p, struct rsync_tasks *tasks,
- struct timespec *now)
+static int
+next_task(struct cache_mapping *result)
{
- struct read_stream *in;
- ssize_t consumed;
- size_t offset;
asn_dec_rval_t decres;
+ ssize_t consumed;
int error;
- in = &s2p->rd;
-
- do {
- consumed = read(in->fd, in->buffer + in->len,
- in->capacity - in->len);
- if (consumed < 0) {
- error = errno;
- if (error != EAGAIN && error != EWOULDBLOCK)
- rstream_close(in, true);
- return;
- }
- if (consumed == 0) { /* EOS */
- rstream_close(in, true);
- return;
+again: if (pssk.rd.len > 0) {
+ decres = ber_decode(&asn_DEF_RsyncRequest, (void **)&pssk.rr,
+ pssk.rd.buffer, pssk.rd.len);
+
+ memmove(pssk.rd.buffer, pssk.rd.buffer + decres.consumed,
+ pssk.rd.len - decres.consumed);
+ pssk.rd.len -= decres.consumed;
+
+ switch (decres.code) {
+ case RC_OK:
+ result->url = OCTET_STRING_toString(&pssk.rr->url);
+ result->path = OCTET_STRING_toString(&pssk.rr->path);
+ ASN_STRUCT_RESET(asn_DEF_RsyncRequest, pssk.rr);
+ return 0;
+ case RC_WMORE:
+ break;
+ case RC_FAIL:
+ rstream_close(&pssk.rd, true);
+ return EINVAL;
}
+ }
- in->len += consumed;
-
- for (offset = 0; offset < in->len;) {
- decres = ber_decode(&asn_DEF_RsyncRequest,
- (void **)&s2p->rr,
- in->buffer + offset, in->len - offset);
- offset += decres.consumed;
- switch (decres.code) {
- case RC_OK:
- post_task(s2p, tasks, now);
- ASN_STRUCT_RESET(asn_DEF_RsyncRequest, s2p->rr);
- break;
- case RC_WMORE:
- goto break_for;
- case RC_FAIL:
- rstream_close(in, true);
- return;
- }
- }
+ if (pssk.rd.fd == -1)
+ return EBADF;
+ consumed = read(pssk.rd.fd, pssk.rd.buffer + pssk.rd.len,
+ pssk.rd.capacity - pssk.rd.len);
+ if (consumed < 0) {
+ error = errno;
+ if (error != EAGAIN && error != EWOULDBLOCK)
+ rstream_close(&pssk.rd, true);
+ return error;
+ }
+ if (consumed == 0) { /* EOS */
+ rstream_close(&pssk.rd, true);
+ return ENOENT;
+ }
-break_for: if (offset > in->len)
- pr_crit("read_tasks off:%zu len:%zu", offset, in->len);
- in->len -= offset;
- memmove(in->buffer, in->buffer + offset, in->len);
- } while (true);
+ pssk.rd.len += consumed;
+ goto again;
}
static void
-handle_parent_fd(struct s2p_socket *s2p, struct pollfd *pfd,
- struct rsync_tasks *tasks, struct timespec *now)
+handle_parent_fd(struct pollfd *pfd, struct rsync_tasks *tasks,
+ struct timespec *now)
{
- if (s2p->rd.fd == -1)
+ struct cache_mapping map;
+
+ if (pssk.rd.fd == -1)
return;
if (pfd->revents & POLLNVAL) {
pr_op_err(RSP "bad parent fd: %i", pfd->fd);
- rstream_close(&s2p->rd, false);
+ rstream_close(&pssk.rd, false);
} else if (pfd->revents & POLLERR) {
pr_op_err(RSP "Generic error during parent fd poll.");
- rstream_close(&s2p->rd, true);
+ rstream_close(&pssk.rd, true);
} else if (pfd->revents & (POLLIN | POLLHUP)) {
- read_tasks(s2p, tasks, now);
+ while (next_task(&map) == 0)
+ post_task(&map, tasks, now);
}
}
}
static void
-activate_queued(struct rsync_tasks *tasks, struct s2p_socket *s2p,
- struct timespec *now)
+activate_queued(struct rsync_tasks *tasks, struct timespec *now)
{
struct rsync_task *task;
pr_op_debug(RSP "Activating queued task %s -> %s.",
task->url, task->path);
LIST_REMOVE(task, lh);
- activate_task(tasks, task, s2p, now);
+ activate_task(tasks, task, now);
}
/* Returns true if the task died. */
static bool
maybe_expire(struct rsync_tasks *tasks, struct rsync_task *task,
- struct s2p_socket *s2p, struct timespec *now)
+ struct timespec *now)
{
struct timespec epoch;
pr_op_debug(RSP "Task %d ran out of time.", task->pid);
kill_subprocess(task);
wait_subprocess("rsync", task->pid);
- finish_task(tasks, task, s2p);
- activate_queued(tasks, s2p, now);
+ finish_task(tasks, task);
+ activate_queued(tasks, now);
return true;
}
static int
-spawner_run(
- int request_fd, /* Requests from parent to us (spawner) */
- int response_fd /* Responses from us (spawner) to parent */
-) {
- struct s2p_socket s2p; /* Channel to parent */
-
+spawner_run(void)
+{
struct pollfd *pfds; /* Channels to children */
size_t p, pfds_count;
struct timespec now, expiration;
int error;
- rstream_init(&s2p.rd, request_fd, 1024);
- s2p.wr = response_fd;
- s2p.rr = NULL;
-
LIST_INIT(&tasks.active);
LIST_INIT(&tasks.queued);
tasks.a = 0;
* odd: stdouts
* even > 0: stderrs
*/
- pfds = create_pfds(s2p.rd.fd, &tasks.active, tasks.a);
+ pfds = create_pfds(pssk.rd.fd, &tasks.active, tasks.a);
pfds_count = 2 * tasks.a + 1;
expiration.tv_sec = now.tv_sec + 10;
expiration.tv_nsec = now.tv_nsec;
if (events == 0) { /* Timeout */
pr_op_debug(RSP "Woke up because of timeout.");
LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp)
- maybe_expire(&tasks, task, &s2p, &now);
+ maybe_expire(&tasks, task, &now);
goto cont;
}
pr_op_debug(RSP "Woke up because of input.");
p = 1;
LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
- if (maybe_expire(&tasks, task, &s2p, &now))
+ if (maybe_expire(&tasks, task, &now))
continue;
if (handle_rsync_fd(&pfds[p], p)) {
pr_op_debug(RSP "Both stdout & stderr are closed; ending task %d.",
task->pid);
wait_subprocess("rsync", task->pid);
- finish_task(&tasks, task, &s2p);
- activate_queued(&tasks, &s2p, &now);
+ finish_task(&tasks, task);
+ activate_queued(&tasks, &now);
}
}
- handle_parent_fd(&s2p, &pfds[0], &tasks, &now);
+ handle_parent_fd(&pfds[0], &tasks, &now);
cont: free(pfds);
- } while ((s2p.rd.fd != -1 || tasks.a > 0));
+ } while ((pssk.rd.fd != -1 || tasks.a > 0));
pr_op_debug(RSP "The parent stream is closed and there are no rsync tasks running. Cleaning up...");
LIST_FOREACH_SAFE(task, &tasks.active, lh, tmp) {
kill_subprocess(task);
wait_subprocess("rsync", task->pid);
- finish_task(&tasks, task, &s2p);
+ finish_task(&tasks, task);
}
LIST_FOREACH_SAFE(task, &tasks.queued, lh, tmp) {
LIST_REMOVE(task, lh);
- void_task(task, &s2p);
+ void_task(task);
}
- rstream_close(&s2p.rd, true);
- if (s2p.wr != -1)
- close(s2p.wr);
+ spsk_cleanup();
free_rpki_config();
log_teardown();
if (pipe(fds) < 0) {
error = errno;
- pr_op_err("Cannot create pipe: %s", strerror(error));
+ pr_op_warn("Cannot create pipe: %s", strerror(error));
return error;
}
flags = fcntl(fds[RDFD], F_GETFL);
if (flags < 0) {
error = errno;
- pr_op_err("Cannot retrieve pipe flags: %s", strerror(error));
+ pr_op_warn("Cannot retrieve pipe flags: %s", strerror(error));
goto cancel;
}
if (fcntl(fds[RDFD], F_SETFL, flags | O_NONBLOCK) < 0) {
error = errno;
- pr_op_err("Cannot enable O_NONBLOCK: %s", strerror(error));
+ pr_op_warn("Cannot enable O_NONBLOCK: %s", strerror(error));
goto cancel;
}
return error;
}
+static void *
+rcv_spawner_responses(void *arg)
+{
+ struct cache_mapping map = { 0 };
+
+ while (next_task(&map) == 0) {
+ rsync_finished(map.url, map.path);
+ map_cleanup(&map);
+ }
+
+ return NULL;
+}
+
void
rsync_setup(char const *program, ...)
{
va_list args;
array_index i;
char const *arg;
+ int error;
if (program != NULL) {
rsync_args[0] = arg = program;
if (nonblock_pipe(parent2spawner) != 0)
goto fail1;
- if (nonblock_pipe(spawner2parent) != 0)
+ if (pipe(spawner2parent) < 0) {
+ pr_op_warn("Cannot create pipe: %s", strerror(errno));
goto fail2;
+ }
fflush(stdout);
fflush(stderr);
spawner = fork();
if (spawner < 0) {
- pr_op_err("Cannot fork rsync spawner: %s", strerror(errno));
+ pr_op_warn("Cannot fork rsync spawner: %s", strerror(errno));
goto fail3;
}
if (spawner == 0) { /* Client code */
- close(parent2spawner[WRFD]);
- close(spawner2parent[RDFD]);
- exit(spawner_run(parent2spawner[RDFD], spawner2parent[WRFD]));
+ spsk_init(parent2spawner, spawner2parent);
+ exit(spawner_run());
}
/* Parent code */
- close(parent2spawner[RDFD]);
- close(spawner2parent[WRFD]);
- readfd = spawner2parent[RDFD];
- writefd = parent2spawner[WRFD];
+ /* (Threads can now be spawned.) */
+
+ spsk_init(spawner2parent, parent2spawner);
+
+ error = pthread_create(&srt, NULL, rcv_spawner_responses, NULL);
+ if (error) {
+ pr_op_warn("Cannot start rsync spawner listener thread: %s",
+ strerror(error));
+ spsk_cleanup();
+ goto fail1;
+ }
+
return;
fail3: close(spawner2parent[RDFD]);
fail2: close(parent2spawner[RDFD]);
close(parent2spawner[WRFD]);
fail1: pr_op_warn("rsync will not be available.");
- readfd = writefd = -1;
-}
-
-static int
-send_to_spawner(const void *buffer, size_t size, void *arg)
-{
- return stream_full_write(writefd, buffer, size);
+ pssk.rd.fd = pssk.wr = -1;
}
-/* Queues rsync; doesn't wait. Call rsync_finished() later. */
+/*
+ * Queues rsync; doesn't wait.
+ *
+ * Whenever at least one rsync is finished, the function rsync_finished()
+ * will be automatically called.
+ */
int
rsync_queue(char const *url, char const *path)
{
if (RsyncRequest_init(&req, url, path) < 0)
return EINVAL;
- mutex_lock(&writelock);
+ mutex_lock(&pssk.wrlock);
- if (writefd == -1) {
+ if (pssk.wr == -1) {
error = EIO;
goto end;
}
- result = der_encode(&asn_DEF_RsyncRequest, &req, send_to_spawner, NULL);
+ result = der_encode(&asn_DEF_RsyncRequest, &req, write_cb, NULL);
if (result.encoded == -1) {
- close(writefd);
- writefd = -1;
+ close(pssk.wr);
+ pssk.wr = -1;
error = EIO;
goto end;
}
error = 0;
-end: mutex_unlock(&writelock);
+end: mutex_unlock(&pssk.wrlock);
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_RsyncRequest, &req);
return error;
}
-/* Returns the number of rsyncs that have ended since the last query */
-unsigned int
-rsync_finished(void)
-{
- unsigned char buf[8];
- ssize_t result;
-
- mutex_lock(&readlock);
- result = (readfd != -1) ? read(readfd, buf, sizeof(buf)) : 0;
- mutex_unlock(&readlock);
-
- return (result >= 0) ? result : 0;
-}
-
void
rsync_teardown(void)
{
- if (readfd != -1)
- close(readfd);
- if (writefd != -1)
- close(writefd);
- readfd = writefd = -1;
+ spsk_cleanup();
wait_subprocess("rsync spawner", spawner);
}
void rsync_setup(char const *, ...);
int rsync_queue(char const *, char const *);
-unsigned int rsync_finished(void);
+void rsync_finished(char const *, char const *);
void rsync_teardown(void);
#endif /* SRC_RSYNC_RSYNC_H_ */
ck_assert_int_eq(0, system(cmd));
}
+static char *queued_url;
+static char *queued_path;
+
int
rsync_queue(char const *url, char const *path)
{
ck_assert_int_eq(0, mkdir(path, CACHE_FILEMODE));
touch_file(path);
+ queued_url = pstrdup(url);
+ queued_path = pstrdup(path);
+
return 0;
}
return cage;
}
+static void
+finish_rsync(void)
+{
+ rsync_finished(queued_url, queued_path);
+ free(queued_url);
+ free(queued_path);
+}
+
+static struct cache_cage *
+rsync_dance(char *url)
+{
+ ck_assert_ptr_eq(NULL, run_dl_rsync(url, EBUSY, 1));
+ finish_rsync();
+ return run_dl_rsync(url, 0, 0);
+}
+
static void
run_dl_https(char const *url, unsigned int expected_calls,
char const *expected_result)
setup_test();
printf("==== Startup ====\n");
- cage = run_dl_rsync("rsync://a.b.c/d", 0, 1);
+ cage = rsync_dance("rsync://a.b.c/d");
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
* and there would be consequences for violating it.
*/
printf("==== rsync truncated ====\n");
- cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1);
+ cage = rsync_dance("rsync://x.y.z/m/n/o");
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL);
ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL);
free(cage);
printf("==== Sibling ====\n");
- cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1);
+ cage = rsync_dance("rsync://a.b.c/e/f");
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL);
ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL);
printf("==== Startup ====\n");
dl_error = 0;
- free(run_dl_rsync("rsync://a.b.c/d", 0, 1));
+ free(rsync_dance("rsync://a.b.c/d"));
dl_error = EINVAL;
ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1));
ck_cache_rsync(nodes);
* but does not provide RRDP as an option.
*/
sias.rpkiNotify = NULL;
+
+ ck_assert_int_eq(EBUSY, cache_refresh_by_sias(&sias, &cage));
+ finish_rsync();
ck_assert_int_eq(0, cache_refresh_by_sias(&sias, &cage));
+
ck_assert_ptr_eq(NULL, cage->rpkiNotify);
ck_assert_str_eq(FILE_RSYNC_PATH, cage_map_file(cage, FILE_URL));
ck_assert_int_eq(false, cage_disable_refresh(cage));
pr_crit(const char *format, ...)
{
va_list args;
- fprintf(stderr, "pr_crit() called!\n");
+ fprintf(stderr, "pr_crit() called! ");
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
#include "mock.c"
#include "rsync.c"
#include "stream.c"
+#include "types/map.c"
#include "asn1/asn1c/ber_decoder.c"
#include "asn1/asn1c/ber_tlv_length.c"
MOCK_ABORT_INT(asn_generic_no_constraint, const asn_TYPE_descriptor_t *td,
const void *strt, asn_app_constraint_failed_f *cb, void *key)
+static struct timespec rsync_request_time;
+static int rsync_expected_duration = -1;
+static int rsyncs_done = 0;
+
+void
+rsync_finished(char const *url, char const *path)
+{
+ struct timespec now;
+ int delta;
+
+ if (rsync_expected_duration == -1)
+ ck_abort_msg("rsync_finished() called, but duration not set.");
+
+ ts_now(&now);
+ delta = ts_delta(&rsync_request_time, &now);
+
+ printf("Callback! rsync finished after %dms.\n", delta);
+ if (rsync_expected_duration < 100)
+ ck_assert_int_le(0, delta);
+ else
+ ck_assert_int_le(rsync_expected_duration - 100, delta);
+ ck_assert_int_lt(delta, rsync_expected_duration + 100);
+
+ rsyncs_done++;
+}
+
/* Tests */
/* Test RsyncRequest decode, feeding as few bytes as possible every time. */
END_TEST
static void
-ck_no_tasks(struct rsync_tasks *tasks)
+ck_no_tasks(void)
{
- ck_assert_int_eq(0, tasks->a);
- ck_assert(LIST_EMPTY(&tasks->active));
- ck_assert(LIST_EMPTY(&tasks->queued));
+ struct cache_mapping map;
+ int error;
+
+ error = next_task(&map);
+ ck_assert(error == EAGAIN || error == EWOULDBLOCK);
}
static void
-ck_1st_task(struct rsync_tasks *tasks, struct s2p_socket *sk,
- char const *url, char const *path)
+ck_next_task(char const *url, char const *path)
{
- struct rsync_task *task;
-
- task = LIST_FIRST(&tasks->active);
- ck_assert_ptr_ne(NULL, task);
- ck_assert_str_eq(url, task->url);
- ck_assert_str_eq(path, task->path);
- ck_assert(tasks->a > 0);
- finish_task(tasks, task, sk);
+ struct cache_mapping map;
+
+ ck_assert_int_eq(0, next_task(&map));
+ ck_assert_str_eq(url, map.url);
+ ck_assert_str_eq(path, map.path);
+
+ map_cleanup(&map);
}
static void
* Tests messy request queuing; in particular, when a single read() yields less
* than one or multiple of them.
*/
-START_TEST(test_read_tasks)
+START_TEST(test_next_task)
{
unsigned char bytes[BUFSIZE];
int fds[2];
- struct s2p_socket sk;
- struct rsync_tasks tasks;
- struct timespec now;
ck_assert_int_eq(0, nonblock_pipe(fds));
- rstream_init(&sk.rd, fds[0], 256);
- sk.wr = -1;
- sk.rr = NULL;
- LIST_INIT(&tasks.active);
- LIST_INIT(&tasks.queued);
- tasks.a = 0;
- ts_now(&now);
+ __spsk_init(fds[RDFD], -1);
printf("Read yields nothing\n");
- read_tasks(&sk, &tasks, &now);
- ck_no_tasks(&tasks);
+ ck_no_tasks();
printf("Read yields less than 1 request\n");
encode_request("111", "2222", bytes); /* 13 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
- read_tasks(&sk, &tasks, &now);
- ck_no_tasks(&tasks);
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 10, 3));
- read_tasks(&sk, &tasks, &now);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10));
+ ck_no_tasks();
- ck_1st_task(&tasks, &sk, "111", "2222");
- ck_no_tasks(&tasks);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 10, 3));
+ ck_next_task("111", "2222");
+ ck_no_tasks();
printf("Read yields 1 request\n");
encode_request("3333", "444", bytes); /* 13 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 13));
- read_tasks(&sk, &tasks, &now);
- ck_1st_task(&tasks, &sk, "3333", "444");
- ck_no_tasks(&tasks);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 13));
+ ck_next_task("3333", "444");
+ ck_no_tasks();
printf("Read yields 1.5 requests\n");
encode_request("55", "666", bytes); /* 11 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 11));
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 11));
encode_request("777", "88", bytes); /* 11 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 5));
- read_tasks(&sk, &tasks, &now);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 5));
- ck_1st_task(&tasks, &sk, "55", "666");
- ck_no_tasks(&tasks);
+ ck_next_task("55", "666");
+ ck_no_tasks();
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 5, 6));
- read_tasks(&sk, &tasks, &now);
-
- ck_1st_task(&tasks, &sk, "777", "88");
- ck_no_tasks(&tasks);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 5, 6));
+ ck_next_task("777", "88");
+ ck_no_tasks();
printf("Read yields 2 requests\n");
encode_request("9999", "00", bytes); /* 12 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12));
encode_request("aa", "bbbb", bytes); /* 12 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 12));
- read_tasks(&sk, &tasks, &now);
- ck_1st_task(&tasks, &sk, "aa", "bbbb");
- ck_1st_task(&tasks, &sk, "9999", "00");
- ck_no_tasks(&tasks);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 12));
+
+ ck_next_task("9999", "00");
+ ck_next_task("aa", "bbbb");
+ ck_no_tasks();
printf("Read yields 2.5 requests\n");
encode_request("cc", "dd", bytes); /* 10 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 10));
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 10));
encode_request("eeeee", "fffff", bytes); /* 16 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 16));
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 16));
encode_request("gggg", "hhhhh", bytes); /* 15 bytes */
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes, 3));
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes, 3));
- read_tasks(&sk, &tasks, &now);
- ck_1st_task(&tasks, &sk, "eeeee", "fffff");
- ck_1st_task(&tasks, &sk, "cc", "dd");
- ck_no_tasks(&tasks);
+ ck_next_task("cc", "dd");
+ ck_next_task("eeeee", "fffff");
+ ck_no_tasks();
- ck_assert_int_eq(0, stream_full_write(fds[1], bytes + 3, 12));
- read_tasks(&sk, &tasks, &now);
- ck_1st_task(&tasks, &sk, "gggg", "hhhhh");
- ck_no_tasks(&tasks);
+ ck_assert_int_eq(0, stream_full_write(fds[WRFD], bytes + 3, 12));
+ ck_next_task("gggg", "hhhhh");
+ ck_no_tasks();
+
+ spsk_cleanup();
}
END_TEST
{
struct timespec req;
- if (millis > 100) {
- millis -= 100;
- req.tv_sec = millis / 1000;
- req.tv_nsec = (millis % 1000) * 1000000;
- ck_assert_int_eq(0, nanosleep(&req, NULL));
- ck_assert_uint_eq(0, rsync_finished());
- }
+ printf("Waiting for %u rsyncs after %ums.\n", count, millis);
- req.tv_sec = 0;
- req.tv_nsec = 200000000; /* 200ms */
- ck_assert_int_eq(0, nanosleep(&req, NULL));
- ck_assert_uint_eq(count, rsync_finished());
+ ts_now(&rsync_request_time);
+ rsync_expected_duration = millis;
+ rsyncs_done = 0;
+
+ millis += 100;
+ req.tv_sec = millis / 1000;
+ req.tv_nsec = (millis % 1000) * 1000000;
+ ck_assert_int_eq(0, nanosleep(&req, NULL)); /* Wait for rsync_finished() */
+
+ ck_assert_int_eq(count, rsyncs_done);
+ rsync_expected_duration = -1;
}
START_TEST(test_fast_single_rsync)
{
+ printf("-- test_fast_single_rsync() --\n");
+
rsync_setup("resources/rsync/fast.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1, 0);
START_TEST(test_stalled_single_rsync)
{
+ printf("-- test_stalled_single_rsync() --\n");
+
rsync_setup("resources/rsync/stalled.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1, 3000);
START_TEST(test_stalled_single_rsync_timeout)
{
+ printf("-- test_stalled_single_rsync_timeout() --\n");
+
rsync_setup("resources/rsync/stalled-timeout.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1, 4000); /* 4000 = timeout */
START_TEST(test_dripfeed_single_rsync)
{
+ printf("-- test_dripfeed_single_rsync() --\n");
+
rsync_setup("resources/rsync/drip-feed.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1, 3000);
START_TEST(test_dripfeed_single_rsync_timeout)
{
+ printf("-- test_dripfeed_single_rsync_timeout() --\n");
+
rsync_setup("resources/rsync/drip-feed-timeout.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
wait_rsyncs(1, 4000); /* 4000 = timeout */
START_TEST(test_no_rsyncs)
{
+ printf("-- test_no_rsyncs() --\n");
+
rsync_setup("rsync", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
- sleep(2);
- ck_assert_uint_eq(0, rsync_finished());
+ wait_rsyncs(0, 2000);
rsync_teardown();
}
START_TEST(test_simultaneous_rsyncs)
{
+ printf("-- test_simultaneous_rsyncs() --\n");
+
rsync_setup("resources/rsync/simultaneous.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
ck_assert_int_eq(0, rsync_queue("C", "D"));
START_TEST(test_queued_rsyncs)
{
- rsync_setup("resources/rsync/queued.sh", NULL);
- ck_assert_int_ne(-1, readfd);
- ck_assert_int_ne(-1, writefd);
+ printf("-- test_queued_rsyncs() --\n");
+ rsync_setup("resources/rsync/queued.sh", NULL);
+ ck_assert_int_ne(-1, pssk.rd.fd);
+ ck_assert_int_ne(-1, pssk.wr);
ck_assert_int_eq(0, rsync_queue("A", "B"));
ck_assert_int_eq(0, rsync_queue("C", "D"));
ck_assert_int_eq(0, rsync_queue("G", "H"));
wait_rsyncs(3, 2000);
- wait_rsyncs(1, 2000);
+ /* 2k minus the 100 extra we slept during the previous wait_rsyncs() */
+ wait_rsyncs(1, 1900);
rsync_teardown();
}
p2s = tcase_create("parent-spawner channel");
tcase_add_test(p2s, test_decode_extremely_fragmented);
- tcase_add_test(p2s, test_read_tasks);
+ tcase_add_test(p2s, test_next_task);
s2r = tcase_create("spawner-rsync channel");
tcase_add_test(s2r, test_fast_single_rsync);