#include "log.h"
#include "rrdp.h"
#include "rsync.h"
+#include "task.h"
#include "types/array.h"
#include "types/path.h"
#include "types/url.h"
#include "types/uthash.h"
+enum dl_state {
+ DLS_OUTDATED = 0, /* Not downloaded yet */
+ DLS_ONGOING, /* Download in progress */
+ DLS_FRESH, /* Download complete */
+};
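+
+/*
+ * A node's intended lifecycle: DLS_OUTDATED -> DLS_ONGOING -> DLS_FRESH
+ * (each transition performed while holding the table lock; see
+ * do_refresh()), then back to DLS_OUTDATED when the next cycle begins.
+ */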
+
+/*
+ * This is a delicate structure; pay attention.
+ *
+ * During the multithreaded stage of the validation cycle, the entire cache_node
+ * (except @hh) becomes (effectively) constant when @state becomes DLS_FRESH.
+ *
+ * The cache (i.e. this module) only hands the node to the validation code
+ * (through cache_cage) when @state becomes DLS_FRESH.
+ *
+ * This is intended to allow the validation code to read the remaining fields
+ * (all except @hh) without having to hold the table mutex.
+ *
+ * C cannot enforce that the node remains constant once it's handed out;
+ * this must be ensured through careful coding and review.
+ */
struct cache_node {
struct cache_mapping map;
- /* XXX change to boolean? */
- int fresh; /* Refresh already attempted? */
+ enum dl_state state;
int dlerr; /* Result code of recent download attempt */
time_t mtim; /* Last successful download time, or zero */
typedef int (*dl_cb)(struct cache_node *rpp);
+/*
+ * When concurrency is at play, you need @lock to access @nodes and @seq.
+ * @name, @enabled and @download remain constant throughout the validation.
+ *
+ * @lock also protects the nodes' @state and @hh, which have additional rules.
+ * (See cache_node.)
+ */
struct cache_table {
char *name;
bool enabled;
struct cache_sequence seq;
struct cache_node *nodes; /* Hash Table */
dl_cb download;
+ pthread_mutex_t lock;
};
static struct rpki_cache {
static volatile sig_atomic_t lockfile_owned;
struct cache_cage {
- struct cache_node *refresh;
- struct cache_node *fallback;
+ struct cache_node const *refresh;
+ struct cache_node const *fallback;
};
struct cache_commit {
};
static STAILQ_HEAD(cache_commits, cache_commit) commits = STAILQ_HEAD_INITIALIZER(commits);
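+/* Validation threads append to @commits concurrently; @commits_lock guards it. */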
+static pthread_mutex_t commits_lock = PTHREAD_MUTEX_INITIALIZER;
#define LOCKFILE ".lock"
#define INDEX_FILE "index.json"
tbl->enabled = enabled;
cseq_init(&tbl->seq, name, false);
tbl->download = dl;
+ panic_on_fail(pthread_mutex_init(&tbl->lock, NULL),
+ "pthread_mutex_init");
}
static void
return 0;
}
+/* Caller must hold @tbl->lock. */
static struct cache_node *
find_node(struct cache_table *tbl, char const *url, size_t urlen)
{
return node;
}
-/* @uri is either a caRepository or a rpkiNotify */
-static struct cache_node *
-do_refresh(struct cache_table *tbl, char const *uri)
+/*
+ * @uri is either a caRepository or an rpkiNotify.
+ * By contract, @result is only set on return 0, in which case
+ * @result->state is guaranteed to be DLS_FRESH.
+ */
+static int
+do_refresh(struct cache_table *tbl, char const *uri, struct cache_node **result)
{
struct cache_node *node;
+ bool downloaded = false;
pr_val_debug("Trying %s (online)...", uri);
if (!tbl->enabled) {
pr_val_debug("Protocol disabled.");
- return NULL;
+ return ESRCH;
}
if (tbl == &cache.rsync) {
char *module = get_rsync_module(uri);
if (module == NULL)
- return NULL;
+ return EINVAL;
+ mutex_lock(&tbl->lock);
node = provide_node(tbl, module);
free(module);
} else {
+ mutex_lock(&tbl->lock);
node = provide_node(tbl, uri);
}
- if (!node)
- return NULL;
+ if (!node) {
+ mutex_unlock(&tbl->lock);
+ return EINVAL;
+ }
+
+ switch (node->state) {
+ case DLS_OUTDATED:
+ node->state = DLS_ONGOING;
+ mutex_unlock(&tbl->lock);
- if (!node->fresh) {
- node->fresh = true;
node->dlerr = tbl->download(node);
+ downloaded = true;
+
+ mutex_lock(&tbl->lock);
+ node->state = DLS_FRESH;
+ break;
+ case DLS_ONGOING:
+ mutex_unlock(&tbl->lock);
+ pr_val_debug("Refresh ongoing.");
+ return EBUSY;
+ case DLS_FRESH:
+ break;
+ default:
+ pr_crit("Unknown node state: %d", node->state);
}
- pr_val_debug(node->dlerr ? "Refresh failed." : "Refresh succeeded.");
- return node;
+ mutex_unlock(&tbl->lock);
+ /* node->state is guaranteed to be DLS_FRESH at this point. */
+
+ if (downloaded) /* Kickstart tasks that fell into DLS_ONGOING */
+ task_wakeup_busy();
+
+ if (node->dlerr != 0) {
+ pr_val_debug("Refresh failed.");
+ return node->dlerr;
+ }
+
+ pr_val_debug("Refresh succeeded.");
+ *result = node;
+ return 0;
}
static struct cache_node *
/* Do not free nor modify the result. */
char *
-cache_refresh_url(char const *url)
+cache_refresh_by_url(char const *url)
{
struct cache_node *node = NULL;
- // XXX mutex
// XXX review result signs
// XXX Normalize @url
if (url_is_https(url))
- node = do_refresh(&cache.https, url);
+ do_refresh(&cache.https, url, &node);
else if (url_is_rsync(url))
- node = do_refresh(&cache.rsync, url);
+ do_refresh(&cache.rsync, url, &node);
- // XXX Maybe strdup path so the caller can't corrupt our string
- return (node && !node->dlerr) ? node->map.path : NULL;
+ return node ? node->map.path : NULL;
}
/* Do not free nor modify the result. */
char *
-cache_fallback_url(char const *url)
+cache_get_fallback(char const *url)
{
struct cache_node *node;
+ /*
+ * The fallback table is read-only until the cleanup.
+ * Mutex not needed here.
+ */
+
pr_val_debug("Trying %s (offline)...", url);
node = find_node(&cache.fallback, url, strlen(url));
* XXX Need to normalize the sias.
* XXX Fallback only if parent is fallback
*/
-struct cache_cage *
-cache_refresh_sias(struct sia_uris *sias)
+int
+cache_refresh_by_sias(struct sia_uris *sias, struct cache_cage **result)
{
- struct cache_cage *cage;
struct cache_node *node;
+ struct cache_cage *cage;
// XXX Make sure somewhere validates rpkiManifest matches caRepository.
- // XXX mutex
// XXX review result signs
// XXX normalize rpkiNotify & caRepository?
- cage = pzalloc(sizeof(struct cache_cage));
- cage->fallback = get_fallback(sias->caRepository);
-
+ /* Try RRDP + optional fallback */
if (sias->rpkiNotify) {
- node = do_refresh(&cache.rrdp, sias->rpkiNotify);
- if (node && !node->dlerr) {
- cage->refresh = node;
- return cage; /* RRDP + optional fallback happy path */
+ switch (do_refresh(&cache.rrdp, sias->rpkiNotify, &node)) {
+ case 0:
+ goto refresh_success;
+ case EBUSY:
+ return EBUSY;
}
}
- node = do_refresh(&cache.rsync, sias->caRepository);
- if (node && !node->dlerr) {
- cage->refresh = node;
- return cage; /* rsync + optional fallback happy path */
+ /* Try rsync + optional fallback */
+ switch (do_refresh(&cache.rsync, sias->caRepository, &node)) {
+ case 0:
+ goto refresh_success;
+ case EBUSY:
+ return EBUSY;
}
- if (cage->fallback == NULL) {
- free(cage);
- return NULL;
- }
+ /* Try fallback only */
+ node = get_fallback(sias->caRepository); /* XXX (test) does this catch notifies? */
+ if (!node)
+ return EINVAL; /* Nothing to work with */
- return cage; /* fallback happy path */
+ *result = cage = pmalloc(sizeof(struct cache_cage));
+ cage->refresh = NULL;
+ cage->fallback = node;
+ return 0;
+
+refresh_success:
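+ /* do_refresh() returned 0, so @node is set and DLS_FRESH. */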
+ *result = cage = pmalloc(sizeof(struct cache_cage));
+ cage->refresh = node;
+ cage->fallback = get_fallback(sias->caRepository);
+ return 0;
}
-char const *
-node2file(struct cache_node *node, char const *url)
+static char const *
+node2file(struct cache_node const *node, char const *url)
{
if (node == NULL)
return NULL;
char const *
cage_map_file(struct cache_cage *cage, char const *url)
{
+ /*
+ * Remember: In addition to honoring the consts of cage->refresh and
+ * cage->fallback, anything these structures point to MUST NOT be
+ * modified either.
+ */
+
char const *file;
file = node2file(cage->refresh, url);
bool
cage_disable_refresh(struct cache_cage *cage)
{
+ /*
+ * Remember: In addition to honoring the consts of cage->refresh and
+ * cage->fallback, anything these structures point to MUST NOT be
+ * modified either.
+ */
+
bool enabled = (cage->refresh != NULL);
cage->refresh = NULL;
commit->caRepository = pstrdup(caRepository);
commit->files = rpp->files;
commit->nfiles = rpp->nfiles;
+
+ mutex_lock(&commits_lock);
STAILQ_INSERT_TAIL(&commits, commit, lh);
+ mutex_unlock(&commits_lock);
rpp->files = NULL;
rpp->nfiles = 0;
}
+/* XXX not called */
void
cache_commit_file(struct cache_mapping *map)
{
commit->files[0].url = pstrdup(map->url);
commit->files[0].path = pstrdup(map->path);
commit->nfiles = 1;
+
+ mutex_lock(&commits_lock);
STAILQ_INSERT_TAIL(&commits, commit, lh);
+ mutex_unlock(&commits_lock);
}
static void
return;
printf("\t%s (%s): ", node->map.url, node->map.path);
- if (node->fresh)
- printf("fresh (errcode %d)", node->dlerr);
- else
+ switch (node->state) {
+ case DLS_OUTDATED:
printf("stale");
+ break;
+ case DLS_ONGOING:
+ printf("downloading");
+ break;
+ case DLS_FRESH:
+ printf("fresh (errcode %d)", node->dlerr);
+ break;
+ }
printf("\n");
}
strerror(errno));
}
-freshen: fb->fresh = 1;
+freshen: fb->state = DLS_FRESH;
skip: free(commit->caRepository);
for (i = 0; i < commit->nfiles; i++) {
free(commit->files[i].url);
}
HASH_ITER(hh, cache.fallback.nodes, fb, tmp) {
- if (fb->fresh)
+ if (fb->state == DLS_FRESH)
continue;
/*
void sias_init(struct sia_uris *);
void sias_cleanup(struct sia_uris *);
-char *cache_refresh_url(char const *);
-char *cache_fallback_url(char const *);
+char *cache_refresh_by_url(char const *);
+char *cache_get_fallback(char const *);
struct cache_cage;
-struct cache_cage *cache_refresh_sias(struct sia_uris *);
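+/*
+ * Returns 0 on success (sets the cage), EBUSY if a needed download is
+ * already in progress elsewhere, or another error code on failure.
+ */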
+int cache_refresh_by_sias(struct sia_uris *, struct cache_cage **);
char const *cage_map_file(struct cache_cage *, char const *);
bool cage_disable_refresh(struct cache_cage *);
void cache_commit_rpp(char const *, struct rpp *);
panic_on_fail(int error, char const *function_name)
{
if (error)
- pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+ pr_crit("%s() returned error code %d. "
+ "This is too critical for a graceful recovery; "
+ "I must die now.",
function_name, error);
}
unsigned int queued;
int error;
- error = certificate_validate(ca);
- if (error)
- return error;
+ if (!ca->x509) {
+ error = certificate_validate(ca);
+ if (error)
+ return error;
- if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA)
- return 0;
+ if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA)
+ return 0;
+ } /* else: already validated; the previous attempt returned EBUSY */
- cage = cache_refresh_sias(&ca->sias);
- if (!cage)
+ switch (cache_refresh_by_sias(&ca->sias, &cage)) {
+ case 0:
+ break;
+ case EBUSY:
+ return EBUSY;
+ default:
return pr_val_err("caRepository '%s' could not be refreshed, "
"and there is no fallback in the cache. "
"I'm going to have to skip it.", ca->sias.caRepository);
+ }
retry: mft = cage_map_file(cage, ca->sias.rpkiManifest);
if (!mft) {
/* Online attempts */
ARRAYLIST_FOREACH(&tal.urls, url) {
map.url = *url;
- map.path = cache_refresh_url(*url);
+ map.path = cache_refresh_by_url(*url);
if (!map.path)
continue;
if (validate_ta(&tal, &map) != 0)
/* Offline fallback attempts */
ARRAYLIST_FOREACH(&tal.urls, url) {
map.url = *url;
- map.path = cache_fallback_url(*url);
+ map.path = cache_get_fallback(*url);
if (!map.path)
continue;
if (validate_ta(&tal, &map) != 0)
{
struct validation_task *task = NULL;
- while ((task = task_dequeue(task)) != NULL)
- certificate_traverse(task->ca);
+ while ((task = task_dequeue(task)) != NULL) {
+ if (certificate_traverse(task->ca) == EBUSY) {
+ task_requeue_busy(task);
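+ /* NULL keeps task_dequeue() from freeing the requeued task */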
+ task = NULL;
+ }
+ }
return NULL;
}
perform_standalone_validation(void)
{
pthread_t threads[5]; // XXX variabilize
- unsigned int ids[5];
array_index t, t2;
int error;
goto end;
for (t = 0; t < 5; t++) {
- ids[t] = t;
- error = pthread_create(&threads[t], NULL, pick_up_work, &ids[t]);
+ error = pthread_create(&threads[t], NULL, pick_up_work, NULL);
if (error) {
pr_op_err("Could not spawn validation thread %zu: %s",
t, strerror(error));
}
static struct cache_file *
-state_find_file(struct rrdp_state *state, char const *url, size_t len)
+state_find_file(struct rrdp_state const *state, char const *url, size_t len)
{
struct cache_file *file;
HASH_FIND(hh, state->files, url, len, file);
}
char const *
-rrdp_file(struct rrdp_state *state, char const *url)
+rrdp_file(struct rrdp_state const *state, char const *url)
{
struct cache_file *file;
file = state_find_file(state, url, strlen(url));
int rrdp_update(struct cache_mapping const *, time_t, bool *,
struct cache_sequence *, struct rrdp_state **);
-char const *rrdp_file(struct rrdp_state *, char const *);
+char const *rrdp_file(struct rrdp_state const *, char const *);
char const *rrdp_create_fallback(char *, struct rrdp_state **, char const *);
#include "common.h"
#include "log.h"
+STAILQ_HEAD(validation_tasks, validation_task);
+
/* Queued, not yet claimed tasks */
-static STAILQ_HEAD(validation_tasks, validation_task) tasks;
-/* Active tasks (length of @tasks plus number of running tasks) */
+static struct validation_tasks tasks;
+/* Queued, but not yet available for claiming */
+static struct validation_tasks busy;
+/* Active tasks (length(@tasks) + length(@busy) + number of running tasks) */
static int active;
static bool enabled = true;
task_setup(void)
{
STAILQ_INIT(&tasks);
+ STAILQ_INIT(&busy);
active = 0;
enabled = true;
panic_on_fail(pthread_mutex_init(&lock, NULL), "pthread_mutex_init");
}
static void
-cleanup(void)
+cleanup_tasks(struct validation_tasks *tasks)
{
struct validation_task *task;
- enabled = false;
- active = 0;
- while (!STAILQ_EMPTY(&tasks)) {
- task = STAILQ_FIRST(&tasks);
- STAILQ_REMOVE_HEAD(&tasks, lh);
+ while (!STAILQ_EMPTY(tasks)) {
+ task = STAILQ_FIRST(tasks);
+ STAILQ_REMOVE_HEAD(tasks, lh);
task_free(task);
}
}
+static void
+cleanup(void)
+{
+ enabled = false;
+ active = 0;
+ cleanup_tasks(&tasks);
+ cleanup_tasks(&busy);
+}
+
void
task_start(void)
{
task->ca = ca;
mutex_lock(&lock);
-
if (enabled) {
STAILQ_INSERT_TAIL(&tasks, task, lh);
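+ /* The queue owns @task now; don't free it below */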
+ task = NULL;
active++;
- } else {
- task_free(task);
}
-
mutex_unlock(&lock);
+ if (task) {
+ task_free(task); /* Couldn't queue */
+ return 0;
+ }
+
return 1;
}
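+/*
+ * Hands @task back to the scheduler: it parks in @busy, where it cannot be
+ * claimed until task_wakeup_busy() moves it back to @tasks.
+ */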
+void
+task_requeue_busy(struct validation_task *task)
+{
+ mutex_lock(&lock);
+ if (enabled) {
+ STAILQ_INSERT_TAIL(&busy, task, lh);
+ task = NULL;
+ }
+ mutex_unlock(&lock);
+
+ if (task)
+ task_free(task); /* Couldn't queue */
+}
+
/* Wakes up threads currently waiting for tasks. */
void
task_wakeup(void)
mutex_unlock(&lock);
}
+void
+task_wakeup_busy(void)
+{
+ mutex_lock(&lock);
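+ /* Make the parked tasks claimable again */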
+ STAILQ_CONCAT(&tasks, &busy);
+ panic_on_fail(pthread_cond_broadcast(&awakener),
+ "pthread_cond_broadcast");
+ mutex_unlock(&lock);
+}
+
/*
* Frees the @prev previous task, and returns the next one.
*
- * If no task is available yet, will sleep until someone calls task_wakeup().
+ * If no task is available yet, will sleep until someone calls task_wakeup() or
+ * task_wakeup_busy().
* If all the tasks are done, returns NULL.
*
* Assumes at least one task has been queued before the first dequeue.
void task_teardown(void);
unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *);
+void task_requeue_busy(struct validation_task *);
void task_wakeup(void);
+void task_wakeup_busy(void);
struct validation_task *task_dequeue(struct validation_task *);
#endif /* SRC_TASK_H_ */
}
MOCK_VOID(__delete_node_cb, struct cache_node const *node)
+MOCK_VOID(task_wakeup_busy, void)
/* Helpers */
}
static struct cache_cage *
-run_dl_rsync(char *caRepository, unsigned int expected_calls)
+run_dl_rsync(char *caRepository, int expected_err, unsigned int expected_calls)
{
struct sia_uris sias = { .caRepository = caRepository };
- struct cache_cage *cage;
+ struct cache_cage *cage = NULL;
rsync_counter = 0;
https_counter = 0;
printf("---- Downloading... ----\n");
- cage = cache_refresh_sias(&sias);
+ ck_assert_int_eq(expected_err, cache_refresh_by_sias(&sias, &cage));
printf("---- Downloaded. ----\n");
ck_assert_uint_eq(expected_calls, rsync_counter);
ck_assert_uint_eq(0, https_counter);
rsync_counter = 0;
https_counter = 0;
printf("---- Downloading... ----\n");
- result = cache_refresh_url(url);
+ result = cache_refresh_by_url(url);
printf("---- Downloaded. ----\n");
ck_assert_uint_eq(0, rsync_counter);
ck_assert_uint_eq(expected_calls, https_counter);
ck_assert_str(expected_result, result);
- ck_assert_str(NULL, cache_fallback_url(url));
+ ck_assert_str(NULL, cache_get_fallback(url));
}
ck_cage(struct cache_cage *cage, char const *url,
char const *refresh, char const *fallback)
{
- struct cache_node *bkp;
+ struct cache_node const *bkp;
ck_assert_str(refresh, cage_map_file(cage, url));
{
node->map.url = url;
node->map.path = path;
- node->fresh = fresh;
+ node->state = fresh ? DLS_FRESH : DLS_OUTDATED; /* XXX (test) */
node->dlerr = dlerr;
node->rrdp = NULL;
}
{
node->map.url = url;
node->map.path = path;
- node->fresh = fresh;
+ node->state = fresh ? DLS_FRESH : DLS_OUTDATED;
node->dlerr = dlerr;
node->rrdp = NULL;
}
{
ck_assert_str_eq(expected->map.url, actual->map.url);
ck_assert_str_eq(expected->map.path, actual->map.path);
- ck_assert_int_eq(expected->fresh, actual->fresh);
+ ck_assert_int_eq(expected->state, actual->state);
ck_assert_int_eq(expected->dlerr, actual->dlerr);
if (expected->rrdp == NULL)
ck_assert_ptr_eq(expected->rrdp, actual->rrdp);
static void
unfreshen(struct cache_table *tbl, struct cache_node *node)
{
- node->fresh = 0;
+ node->state = DLS_OUTDATED;
}
static int
setup_test();
printf("==== Startup ====\n");
- cage = run_dl_rsync("rsync://a.b.c/d", 1);
+ cage = run_dl_rsync("rsync://a.b.c/d", 0, 1);
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
free(cage);
printf("==== Redownload same file, nothing should happen ====\n");
- cage = run_dl_rsync("rsync://a.b.c/d", 0);
+ cage = run_dl_rsync("rsync://a.b.c/d", 0, 0);
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
* download d, we needn't bother redownloading d/e.
*/
printf("==== Don't redownload child ====\n");
- cage = run_dl_rsync("rsync://a.b.c/d/e", 0);
+ cage = run_dl_rsync("rsync://a.b.c/d/e", 0, 0);
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
* and there would be consequences for violating it.
*/
printf("==== rsync truncated ====\n");
- cage = run_dl_rsync("rsync://x.y.z/m/n/o", 1);
+ cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1);
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL);
ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL);
free(cage);
printf("==== Sibling ====\n");
- cage = run_dl_rsync("rsync://a.b.c/e/f", 1);
+ cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1);
ck_assert_ptr_ne(NULL, cage);
ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL);
ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL);
printf("==== Startup ====\n");
dl_error = 0;
- free(run_dl_rsync("rsync://a.b.c/d", 1));
+ free(run_dl_rsync("rsync://a.b.c/d", 0, 1));
dl_error = EINVAL;
- ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 1));
+ ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1));
ck_cache_rsync(nodes);
printf("==== Regardless of error, not reattempted because same iteration ====\n");
dl_error = EINVAL;
- ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0));
+ ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0));
ck_cache_rsync(nodes);
dl_error = 0;
- ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0));
+ ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0));
ck_cache_rsync(nodes);
cleanup_test();
#include "alloc.c"
#include "common.c"
#include "mock.c"
+#include "types/array.h"
#include "types/map.c"
#include "types/path.c"
}
END_TEST
+#define TEST_TASKS 3000
+
+struct test_task {
+ char id[8];
+ STAILQ_ENTRY(test_task) lh;
+};
+
+static STAILQ_HEAD(test_task_list, test_task) test_tasks;
+static pthread_mutex_t test_tasks_lock = PTHREAD_MUTEX_INITIALIZER;
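+/* When true, certificate_traverse_mock() returns EBUSY ~25% of the time */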
+static bool return_busy;
+
+static void
+populate_test_tasks(void)
+{
+ struct test_task *task;
+ int printed;
+ unsigned int i;
+
+ STAILQ_INIT(&test_tasks);
+ for (i = 0; i < TEST_TASKS; i++) {
+ task = pmalloc(sizeof(struct test_task));
+ printed = snprintf(task->id, sizeof(task->id), "%u", i);
+ ck_assert_int_gt(printed, 0);
+ ck_assert_int_lt(printed, sizeof(task->id));
+ STAILQ_INSERT_TAIL(&test_tasks, task, lh);
+ }
+
+ printf("+ th-1: Queuing 'starter'\n");
+ queue_1("starter");
+}
+
+static int
+certificate_traverse_mock(struct rpki_certificate *ca, int thid)
+{
+ struct test_task *new[10];
+ unsigned int n;
+
+ /* Queue 10 of the available tasks for each dequeue */
+
+ mutex_lock(&test_tasks_lock);
+ for (n = 0; n < 10; n++) {
+ new[n] = STAILQ_FIRST(&test_tasks);
+ if (new[n])
+ STAILQ_REMOVE_HEAD(&test_tasks, lh);
+ }
+ mutex_unlock(&test_tasks_lock);
+
+ for (n = 0; n < 10; n++) {
+ if (!new[n])
+ break;
+ printf("+ th%d: Queuing '%s'\n", thid, new[n]->id);
+ queue_1(new[n]->id);
+ free(new[n]);
+ }
+
+ if (n != 0)
+ task_wakeup();
+
+ if (return_busy && (rand() & 3) == 0)
+ return EBUSY; /* Return "busy" 25% of the time */
+
+ return 0;
+}
+
+static void *
+user_thread(void *arg)
+{
+ int thid = *((int *)arg);
+ struct validation_task *task = NULL;
+ int total_dequeued = 0;
+
+ printf("th%d: Started.\n", thid);
+
+ while ((task = task_dequeue(task)) != NULL) {
+ printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url);
+ total_dequeued++;
+
+ if (certificate_traverse_mock(task->ca, thid) == EBUSY) {
+ printf("+ th%d: Requeuing '%s'\n",
+ thid, task->ca->map.url);
+ task_requeue_busy(task);
+ task = NULL;
+ }
+ }
+
+ printf("th%d: Dequeued %u times.\n", thid, total_dequeued);
+ return NULL;
+}
+
+static void
+run_threads(void)
+{
+ pthread_t threads[10];
+ int thids[10];
+ unsigned int i;
+
+ for (i = 0; i < ARRAY_LEN(threads); i++) {
+ thids[i] = i;
+ ck_assert_int_eq(0, pthread_create(&threads[i], NULL,
+ user_thread, &thids[i]));
+ }
+
+ for (i = 0; i < ARRAY_LEN(threads); i++)
+ ck_assert_int_eq(0, pthread_join(threads[i], NULL));
+
+ ck_assert_int_eq(1, STAILQ_EMPTY(&test_tasks));
+ ck_assert_ptr_eq(NULL, task_dequeue(NULL));
+}
+
+START_TEST(test_queue_multiuser)
+{
+ return_busy = false;
+
+ task_setup();
+ task_start();
+
+ populate_test_tasks();
+ run_threads();
+
+ task_stop();
+ task_teardown();
+}
+END_TEST
+
+static void *
+release_busies(void *arg)
+{
+ unsigned int i;
+
+ for (i = 0; i < 2; i++) {
+ sleep(1);
+ printf("Waking up busy tasks!\n");
+ task_wakeup_busy();
+ }
+
+ sleep(1);
+ return_busy = false;
+ printf("Waking up busy tasks for the last time!\n");
+ task_wakeup_busy();
+
+ return NULL;
+}
+
+START_TEST(test_queue_multiuser_busy)
+{
+ pthread_t thr;
+
+ return_busy = true;
+
+ task_setup();
+ task_start();
+
+ ck_assert_int_eq(0, pthread_create(&thr, NULL, release_busies, NULL));
+
+ populate_test_tasks();
+ run_threads();
+
+ ck_assert_int_eq(0, pthread_join(thr, NULL));
+
+ task_stop();
+ task_teardown();
+}
+END_TEST
+
static Suite *create_suite(void)
{
Suite *suite;
tcase_add_test(queue, test_queue_3);
tcase_add_test(queue, test_queue_multiple);
tcase_add_test(queue, test_queue_interrupted);
+ tcase_add_test(queue, test_queue_multiuser);
+ tcase_add_test(queue, test_queue_multiuser_busy);
suite = suite_create("task");
suite_add_tcase(suite, queue);