From: Alberto Leiva Popper Date: Sat, 9 Nov 2024 01:00:05 +0000 (-0600) Subject: Add thread safety to the new cache X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f5a047258dddb8b99d2db102a686e343cace17b2;p=thirdparty%2FFORT-validator.git Add thread safety to the new cache The thread that handles the first task that needs repository A refreshes A. While the refresh happens, threads handling other tasks that also need A postpone said tasks. Such postponed tasks wake up when the refresh completes. --- diff --git a/src/cache.c b/src/cache.c index a3e6359d..b56c3340 100644 --- a/src/cache.c +++ b/src/cache.c @@ -20,16 +20,37 @@ #include "log.h" #include "rrdp.h" #include "rsync.h" +#include "task.h" #include "types/array.h" #include "types/path.h" #include "types/url.h" #include "types/uthash.h" +enum dl_state { + DLS_OUTDATED = 0, /* Not downloaded yet */ + DLS_ONGOING, /* Download in progress */ + DLS_FRESH, /* Download complete */ +}; + +/* + * This is a delicate structure; pay attention. + * + * During the multithreaded stage of the validation cycle, the entire cache_node + * (except @hh) becomes (effectively) constant when @state becomes DLS_FRESH. + * + * The cache (ie. this module) only hands the node to the validation code + * (through cache_cage) when @state becomes DLS_FRESH. + * + * This is intended to allow the validation code to read the remaining fields + * (all except @hh) without having to hold the table mutex. + * + * C cannot entirely ensure the node remains constant after it's handed outside; + * this must be done through careful coding and review. + */ struct cache_node { struct cache_mapping map; - /* XXX change to boolean? */ - int fresh; /* Refresh already attempted? */ + enum dl_state state; int dlerr; /* Result code of recent download attempt */ time_t mtim; /* Last successful download time, or zero */ @@ -40,12 +61,20 @@ struct cache_node { typedef int (*dl_cb)(struct cache_node *rpp); +/* + * When concurrency is at play, you need @lock to access @nodes and @seq. + * @name, @enabled and @download stay constant through the validation. + * + * @lock also protects the nodes' @state and @hh, which have additional rules. + * (See cache_node.) 
+ */ struct cache_table { char *name; bool enabled; struct cache_sequence seq; struct cache_node *nodes; /* Hash Table */ dl_cb download; + pthread_mutex_t lock; }; static struct rpki_cache { @@ -63,8 +92,8 @@ static struct rpki_cache { static volatile sig_atomic_t lockfile_owned; struct cache_cage { - struct cache_node *refresh; - struct cache_node *fallback; + struct cache_node const *refresh; + struct cache_node const *fallback; }; struct cache_commit { @@ -75,6 +104,7 @@ struct cache_commit { }; static STAILQ_HEAD(cache_commits, cache_commit) commits = STAILQ_HEAD_INITIALIZER(commits); +static pthread_mutex_t commits_lock = PTHREAD_MUTEX_INITIALIZER; #define LOCKFILE ".lock" #define INDEX_FILE "index.json" @@ -163,6 +193,8 @@ init_table(struct cache_table *tbl, char *name, bool enabled, dl_cb dl) tbl->enabled = enabled; cseq_init(&tbl->seq, name, false); tbl->download = dl; + panic_on_fail(pthread_mutex_init(&tbl->lock, NULL), + "pthread_mutex_init"); } static void @@ -608,6 +640,7 @@ dl_http(struct cache_node *file) return 0; } +/* Caller must lock @tbl->lock */ static struct cache_node * find_node(struct cache_table *tbl, char const *url, size_t urlen) { @@ -640,38 +673,75 @@ provide_node(struct cache_table *tbl, char const *url) return node; } -/* @uri is either a caRepository or a rpkiNotify */ -static struct cache_node * -do_refresh(struct cache_table *tbl, char const *uri) +/* + * @uri is either a caRepository or a rpkiNotify + * By contract, only sets @result on return 0. + * By contract, @result->state will be DLS_FRESH on return 0. + */ +static int +do_refresh(struct cache_table *tbl, char const *uri, struct cache_node **result) { struct cache_node *node; + bool downloaded = false; pr_val_debug("Trying %s (online)...", uri); if (!tbl->enabled) { pr_val_debug("Protocol disabled."); - return NULL; + return ESRCH; } if (tbl == &cache.rsync) { char *module = get_rsync_module(uri); if (module == NULL) - return NULL; + return EINVAL; + mutex_lock(&tbl->lock); node = provide_node(tbl, module); free(module); } else { + mutex_lock(&tbl->lock); node = provide_node(tbl, uri); } - if (!node) - return NULL; + if (!node) { + mutex_unlock(&tbl->lock); + return EINVAL; + } + + switch (node->state) { + case DLS_OUTDATED: + node->state = DLS_ONGOING; + mutex_unlock(&tbl->lock); - if (!node->fresh) { - node->fresh = true; node->dlerr = tbl->download(node); + downloaded = true; + + mutex_lock(&tbl->lock); + node->state = DLS_FRESH; + break; + case DLS_ONGOING: + mutex_unlock(&tbl->lock); + pr_val_debug("Refresh ongoing."); + return EBUSY; + case DLS_FRESH: + break; + default: + pr_crit("Unknown node state: %d", node->state); } - pr_val_debug(node->dlerr ? "Refresh failed." : "Refresh succeeded."); - return node; + mutex_unlock(&tbl->lock); + /* node->state is guaranteed to be DLS_FRESH at this point. */ + + if (downloaded) /* Kickstart tasks that fell into DLS_ONGOING */ + task_wakeup_busy(); + + if (node->dlerr != 0) { + pr_val_debug("Refresh failed."); + return node->dlerr; + } + + pr_val_debug("Refresh succeeded."); + *result = node; + return 0; } static struct cache_node * @@ -688,29 +758,32 @@ get_fallback(char const *caRepository) /* Do not free nor modify the result. 
*/ char * -cache_refresh_url(char const *url) +cache_refresh_by_url(char const *url) { struct cache_node *node = NULL; - // XXX mutex // XXX review result signs // XXX Normalize @url if (url_is_https(url)) - node = do_refresh(&cache.https, url); + do_refresh(&cache.https, url, &node); else if (url_is_rsync(url)) - node = do_refresh(&cache.rsync, url); + do_refresh(&cache.rsync, url, &node); - // XXX Maybe strdup path so the caller can't corrupt our string - return (node && !node->dlerr) ? node->map.path : NULL; + return node ? node->map.path : NULL; } /* Do not free nor modify the result. */ char * -cache_fallback_url(char const *url) +cache_get_fallback(char const *url) { struct cache_node *node; + /* + * The fallback table is read-only until the cleanup. + * Mutex not needed here. + */ + pr_val_debug("Trying %s (offline)...", url); node = find_node(&cache.fallback, url, strlen(url)); @@ -729,44 +802,53 @@ cache_fallback_url(char const *url) * XXX Need to normalize the sias. * XXX Fallback only if parent is fallback */ -struct cache_cage * -cache_refresh_sias(struct sia_uris *sias) +int +cache_refresh_by_sias(struct sia_uris *sias, struct cache_cage **result) { - struct cache_cage *cage; struct cache_node *node; + struct cache_cage *cage; // XXX Make sure somewhere validates rpkiManifest matches caRepository. - // XXX mutex // XXX review result signs // XXX normalize rpkiNotify & caRepository? - cage = pzalloc(sizeof(struct cache_cage)); - cage->fallback = get_fallback(sias->caRepository); - + /* Try RRDP + optional fallback */ if (sias->rpkiNotify) { - node = do_refresh(&cache.rrdp, sias->rpkiNotify); - if (node && !node->dlerr) { - cage->refresh = node; - return cage; /* RRDP + optional fallback happy path */ + switch (do_refresh(&cache.rrdp, sias->rpkiNotify, &node)) { + case 0: + goto refresh_success; + case EBUSY: + return EBUSY; } } - node = do_refresh(&cache.rsync, sias->caRepository); - if (node && !node->dlerr) { - cage->refresh = node; - return cage; /* rsync + optional fallback happy path */ + /* Try rsync + optional fallback */ + switch (do_refresh(&cache.rsync, sias->caRepository, &node)) { + case 0: + goto refresh_success; + case EBUSY: + return EBUSY; } - if (cage->fallback == NULL) { - free(cage); - return NULL; - } + /* Try fallback only */ + node = get_fallback(sias->caRepository); /* XXX (test) does this catch notifies? */ + if (!node) + return EINVAL; /* Nothing to work with */ - return cage; /* fallback happy path */ + *result = cage = pmalloc(sizeof(struct cache_cage)); + cage->refresh = NULL; + cage->fallback = node; + return 0; + +refresh_success: + *result = cage = pmalloc(sizeof(struct cache_cage)); + cage->refresh = node; + cage->fallback = get_fallback(sias->caRepository); + return 0; } -char const * -node2file(struct cache_node *node, char const *url) +static char const * +node2file(struct cache_node const *node, char const *url) { if (node == NULL) return NULL; @@ -779,6 +861,12 @@ node2file(struct cache_node *node, char const *url) char const * cage_map_file(struct cache_cage *cage, char const *url) { + /* + * Remember: In addition to honoring the consts of cache->refresh and + * cache->fallback, anything these structures point to MUST NOT be + * modified either. 
+ */ + char const *file; file = node2file(cage->refresh, url); @@ -792,6 +880,12 @@ cage_map_file(struct cache_cage *cage, char const *url) bool cage_disable_refresh(struct cache_cage *cage) { + /* + * Remember: In addition to honoring the consts of cache->refresh and + * cache->fallback, anything these structures point to MUST NOT be + * modified either. + */ + bool enabled = (cage->refresh != NULL); cage->refresh = NULL; @@ -822,12 +916,16 @@ cache_commit_rpp(char const *caRepository, struct rpp *rpp) commit->caRepository = pstrdup(caRepository); commit->files = rpp->files; commit->nfiles = rpp->nfiles; + + mutex_lock(&commits_lock); STAILQ_INSERT_TAIL(&commits, commit, lh); + mutex_unlock(&commits_lock); rpp->files = NULL; rpp->nfiles = 0; } +/* XXX not called */ void cache_commit_file(struct cache_mapping *map) { @@ -840,7 +938,10 @@ cache_commit_file(struct cache_mapping *map) commit->files[0].url = pstrdup(map->url); commit->files[0].path = pstrdup(map->path); commit->nfiles = 1; + + mutex_lock(&commits_lock); STAILQ_INSERT_TAIL(&commits, commit, lh); + mutex_unlock(&commits_lock); } static void @@ -850,10 +951,17 @@ cachent_print(struct cache_node *node) return; printf("\t%s (%s): ", node->map.url, node->map.path); - if (node->fresh) - printf("fresh (errcode %d)", node->dlerr); - else + switch (node->state) { + case DLS_OUTDATED: printf("stale"); + break; + case DLS_ONGOING: + printf("downloading"); + break; + case DLS_FRESH: + printf("fresh (errcode %d)", node->dlerr); + break; + } printf("\n"); } @@ -1093,7 +1201,7 @@ commit_fallbacks(void) strerror(errno)); } -freshen: fb->fresh = 1; +freshen: fb->state = DLS_FRESH; skip: free(commit->caRepository); for (i = 0; i < commit->nfiles; i++) { free(commit->files[i].url); @@ -1104,7 +1212,7 @@ skip: free(commit->caRepository); } HASH_ITER(hh, cache.fallback.nodes, fb, tmp) { - if (fb->fresh) + if (fb->state == DLS_FRESH) continue; /* diff --git a/src/cache.h b/src/cache.h index 24d1ea7d..c791346a 100644 --- a/src/cache.h +++ b/src/cache.h @@ -37,11 +37,11 @@ struct sia_uris { void sias_init(struct sia_uris *); void sias_cleanup(struct sia_uris *); -char *cache_refresh_url(char const *); -char *cache_fallback_url(char const *); +char *cache_refresh_by_url(char const *); +char *cache_get_fallback(char const *); struct cache_cage; -struct cache_cage *cache_refresh_sias(struct sia_uris *); +int cache_refresh_by_sias(struct sia_uris *, struct cache_cage **); char const *cage_map_file(struct cache_cage *, char const *); bool cage_disable_refresh(struct cache_cage *); void cache_commit_rpp(char const *, struct rpp *); diff --git a/src/common.c b/src/common.c index 1f7d7e68..03a5c762 100644 --- a/src/common.c +++ b/src/common.c @@ -32,7 +32,9 @@ void panic_on_fail(int error, char const *function_name) { if (error) - pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.", + pr_crit("%s() returned error code %d. 
" + "This is too critical for a graceful recovery; " + "I must die now.", function_name, error); } diff --git a/src/object/certificate.c b/src/object/certificate.c index 9de21803..99fd1285 100644 --- a/src/object/certificate.c +++ b/src/object/certificate.c @@ -1900,18 +1900,25 @@ certificate_traverse(struct rpki_certificate *ca) unsigned int queued; int error; - error = certificate_validate(ca); - if (error) - return error; + if (!ca->x509) { + error = certificate_validate(ca); + if (error) + return error; - if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA) - return 0; + if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA) + return 0; + } /* else "we already did this, and returned EBUSY" */ - cage = cache_refresh_sias(&ca->sias); - if (!cage) + switch (cache_refresh_by_sias(&ca->sias, &cage)) { + case 0: + break; + case EBUSY: + return EBUSY; + default: return pr_val_err("caRepository '%s' could not be refreshed, " "and there is no fallback in the cache. " "I'm going to have to skip it.", ca->sias.caRepository); + } retry: mft = cage_map_file(cage, ca->sias.rpkiManifest); if (!mft) { diff --git a/src/object/tal.c b/src/object/tal.c index 084412ea..5c33c99d 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -169,7 +169,7 @@ traverse_tal(char const *tal_path, void *arg) /* Online attempts */ ARRAYLIST_FOREACH(&tal.urls, url) { map.url = *url; - map.path = cache_refresh_url(*url); + map.path = cache_refresh_by_url(*url); if (!map.path) continue; if (validate_ta(&tal, &map) != 0) @@ -180,7 +180,7 @@ traverse_tal(char const *tal_path, void *arg) /* Offline fallback attempts */ ARRAYLIST_FOREACH(&tal.urls, url) { map.url = *url; - map.path = cache_fallback_url(*url); + map.path = cache_get_fallback(*url); if (!map.path) continue; if (validate_ta(&tal, &map) != 0) @@ -201,8 +201,12 @@ pick_up_work(void *arg) { struct validation_task *task = NULL; - while ((task = task_dequeue(task)) != NULL) - certificate_traverse(task->ca); + while ((task = task_dequeue(task)) != NULL) { + if (certificate_traverse(task->ca) == EBUSY) { + task_requeue_busy(task); + task = NULL; + } + } return NULL; } @@ -211,7 +215,6 @@ int perform_standalone_validation(void) { pthread_t threads[5]; // XXX variabilize - unsigned int ids[5]; array_index t, t2; int error; @@ -225,8 +228,7 @@ perform_standalone_validation(void) goto end; for (t = 0; t < 5; t++) { - ids[t] = t; - error = pthread_create(&threads[t], NULL, pick_up_work, &ids[t]); + error = pthread_create(&threads[t], NULL, pick_up_work, NULL); if (error) { pr_op_err("Could not spawn validation thread %zu: %s", t, strerror(error)); diff --git a/src/rrdp.c b/src/rrdp.c index a42e4c6f..fee1de1b 100644 --- a/src/rrdp.c +++ b/src/rrdp.c @@ -143,7 +143,7 @@ session_cleanup(struct rrdp_session *meta) } static struct cache_file * -state_find_file(struct rrdp_state *state, char const *url, size_t len) +state_find_file(struct rrdp_state const *state, char const *url, size_t len) { struct cache_file *file; HASH_FIND(hh, state->files, url, len, file); @@ -1276,7 +1276,7 @@ end: fnstack_pop(); } char const * -rrdp_file(struct rrdp_state *state, char const *url) +rrdp_file(struct rrdp_state const *state, char const *url) { struct cache_file *file; file = state_find_file(state, url, strlen(url)); diff --git a/src/rrdp.h b/src/rrdp.h index 804bbe1a..4d7126fc 100644 --- a/src/rrdp.h +++ b/src/rrdp.h @@ -12,7 +12,7 @@ struct rrdp_state; int rrdp_update(struct cache_mapping const *, time_t, bool *, struct cache_sequence *, struct rrdp_state **); -char const *rrdp_file(struct 
rrdp_state *, char const *); +char const *rrdp_file(struct rrdp_state const *, char const *); char const *rrdp_create_fallback(char *, struct rrdp_state **, char const *); diff --git a/src/task.c b/src/task.c index 555f7516..6fab1945 100644 --- a/src/task.c +++ b/src/task.c @@ -6,9 +6,13 @@ #include "common.h" #include "log.h" +STAILQ_HEAD(validation_tasks, validation_task); + /* Queued, not yet claimed tasks */ -static STAILQ_HEAD(validation_tasks, validation_task) tasks; -/* Active tasks (length of @tasks plus number of running tasks) */ +static struct validation_tasks tasks; +/* Queued, but not yet available for claiming */ +static struct validation_tasks busy; +/* Active tasks (length(@tasks) + length(@busy) + number of running tasks) */ static int active; static bool enabled = true; @@ -27,6 +31,7 @@ void task_setup(void) { STAILQ_INIT(&tasks); + STAILQ_INIT(&busy); active = 0; enabled = true; panic_on_fail(pthread_mutex_init(&lock, NULL), "pthread_mutex_init"); @@ -34,19 +39,26 @@ task_setup(void) } static void -cleanup(void) +cleanup_tasks(struct validation_tasks *tasks) { struct validation_task *task; - enabled = false; - active = 0; - while (!STAILQ_EMPTY(&tasks)) { - task = STAILQ_FIRST(&tasks); - STAILQ_REMOVE_HEAD(&tasks, lh); + while (!STAILQ_EMPTY(tasks)) { + task = STAILQ_FIRST(tasks); + STAILQ_REMOVE_HEAD(tasks, lh); task_free(task); } } +static void +cleanup(void) +{ + enabled = false; + active = 0; + cleanup_tasks(&tasks); + cleanup_tasks(&busy); +} + void task_start(void) { @@ -91,19 +103,35 @@ task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent) task->ca = ca; mutex_lock(&lock); - if (enabled) { STAILQ_INSERT_TAIL(&tasks, task, lh); + task = NULL; active++; - } else { - task_free(task); } - mutex_unlock(&lock); + if (task) { + task_free(task); /* Couldn't queue */ + return 0; + } + return 1; } +void +task_requeue_busy(struct validation_task *task) +{ + mutex_lock(&lock); + if (enabled) { + STAILQ_INSERT_TAIL(&busy, task, lh); + task = NULL; + } + mutex_unlock(&lock); + + if (task) + task_free(task); /* Couldn't queue */ +} + /* Wakes up threads currently waiting for tasks. */ void task_wakeup(void) @@ -114,10 +142,21 @@ task_wakeup(void) mutex_unlock(&lock); } +void +task_wakeup_busy(void) +{ + mutex_lock(&lock); + STAILQ_CONCAT(&tasks, &busy); + panic_on_fail(pthread_cond_broadcast(&awakener), + "pthread_cond_broadcast"); + mutex_unlock(&lock); +} + /* * Frees the @prev previous task, and returns the next one. * - * If no task is available yet, will sleep until someone calls task_wakeup(). + * If no task is available yet, will sleep until someone calls task_wakeup() or + * task_wakeup_busy(). * If all the tasks are done, returns NULL. * * Assumes at least one task has been queued before the first dequeue. 
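For reference, a minimal sketch of the consumer side of the new busy queue — essentially pick_up_work() from src/object/tal.c with annotations tying it to the locking above. The include paths are assumptions, and the snippet is illustrative only, not part of the patch:

#include <errno.h>
#include "task.h"               /* task_dequeue(), task_requeue_busy() -- from this patch */
#include "object/certificate.h" /* certificate_traverse(); assumed header name */

static void *
worker(void *arg)
{
	struct validation_task *task = NULL;

	/*
	 * task_dequeue() frees the previous task, then blocks until another
	 * task is queued (task_wakeup() / task_wakeup_busy()), or returns
	 * NULL once all tasks are done.
	 */
	while ((task = task_dequeue(task)) != NULL) {
		if (certificate_traverse(task->ca) == EBUSY) {
			/*
			 * Another thread is refreshing the repository this CA
			 * needs (DLS_ONGOING). Park the task in the @busy queue;
			 * do_refresh() calls task_wakeup_busy() when the
			 * download completes, which moves it back to @tasks.
			 */
			task_requeue_busy(task);
			task = NULL; /* ownership handed off; don't free via dequeue */
		}
	}

	return NULL;
}
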
diff --git a/src/task.h b/src/task.h index 155e8e41..356cf53f 100644 --- a/src/task.h +++ b/src/task.h @@ -17,7 +17,9 @@ void task_stop(void); void task_teardown(void); unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *); +void task_requeue_busy(struct validation_task *); void task_wakeup(void); +void task_wakeup_busy(void); struct validation_task *task_dequeue(struct validation_task *); #endif /* SRC_TASK_H_ */ diff --git a/test/cache_test.c b/test/cache_test.c index 5ba727bc..6eef59e4 100644 --- a/test/cache_test.c +++ b/test/cache_test.c @@ -48,6 +48,7 @@ rsync_download(char const *url, char const *path) } MOCK_VOID(__delete_node_cb, struct cache_node const *node) +MOCK_VOID(task_wakeup_busy, void) /* Helpers */ @@ -61,15 +62,15 @@ setup_test(void) } static struct cache_cage * -run_dl_rsync(char *caRepository, unsigned int expected_calls) +run_dl_rsync(char *caRepository, int expected_err, unsigned int expected_calls) { struct sia_uris sias = { .caRepository = caRepository }; - struct cache_cage *cage; + struct cache_cage *cage = NULL; rsync_counter = 0; https_counter = 0; printf("---- Downloading... ----\n"); - cage = cache_refresh_sias(&sias); + ck_assert_int_eq(expected_err, cache_refresh_by_sias(&sias, &cage)); printf("---- Downloaded. ----\n"); ck_assert_uint_eq(expected_calls, rsync_counter); ck_assert_uint_eq(0, https_counter); @@ -86,13 +87,13 @@ run_dl_https(char const *url, unsigned int expected_calls, rsync_counter = 0; https_counter = 0; printf("---- Downloading... ----\n"); - result = cache_refresh_url(url); + result = cache_refresh_by_url(url); printf("---- Downloaded. ----\n"); ck_assert_uint_eq(0, rsync_counter); ck_assert_uint_eq(expected_calls, https_counter); ck_assert_str(expected_result, result); - ck_assert_str(NULL, cache_fallback_url(url)); + ck_assert_str(NULL, cache_get_fallback(url)); } @@ -100,7 +101,7 @@ static void ck_cage(struct cache_cage *cage, char const *url, char const *refresh, char const *fallback) { - struct cache_node *bkp; + struct cache_node const *bkp; ck_assert_str(refresh, cage_map_file(cage, url)); @@ -215,7 +216,7 @@ init_node_rsync(struct cache_node *node, char *url, char *path, { node->map.url = url; node->map.path = path; - node->fresh = fresh; + node->state = fresh ? DLS_FRESH : DLS_OUTDATED; /* XXX (test) */ node->dlerr = dlerr; node->rrdp = NULL; } @@ -226,7 +227,7 @@ init_node_https(struct cache_node *node, char *url, char *path, { node->map.url = url; node->map.path = path; - node->fresh = fresh; + node->state = fresh ? 
DLS_FRESH : DLS_OUTDATED; node->dlerr = dlerr; node->rrdp = NULL; } @@ -236,7 +237,7 @@ ck_cache_node_eq(struct cache_node *expected, struct cache_node *actual) { ck_assert_str_eq(expected->map.url, actual->map.url); ck_assert_str_eq(expected->map.path, actual->map.path); - ck_assert_int_eq(expected->fresh, actual->fresh); + ck_assert_int_eq(expected->state, actual->state); ck_assert_int_eq(expected->dlerr, actual->dlerr); if (expected->rrdp == NULL) ck_assert_ptr_eq(expected->rrdp, actual->rrdp); @@ -298,7 +299,7 @@ static time_t epoch; static void unfreshen(struct cache_table *tbl, struct cache_node *node) { - node->fresh = 0; + node->state = DLS_OUTDATED; } static int @@ -347,7 +348,7 @@ START_TEST(test_cache_download_rsync) setup_test(); printf("==== Startup ====\n"); - cage = run_dl_rsync("rsync://a.b.c/d", 1); + cage = run_dl_rsync("rsync://a.b.c/d", 0, 1); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL); ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL); @@ -356,7 +357,7 @@ START_TEST(test_cache_download_rsync) free(cage); printf("==== Redownload same file, nothing should happen ====\n"); - cage = run_dl_rsync("rsync://a.b.c/d", 0); + cage = run_dl_rsync("rsync://a.b.c/d", 0, 0); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL); ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL); @@ -368,7 +369,7 @@ START_TEST(test_cache_download_rsync) * download d, we needn't bother redownloading d/e. */ printf("==== Don't redownload child ====\n"); - cage = run_dl_rsync("rsync://a.b.c/d/e", 0); + cage = run_dl_rsync("rsync://a.b.c/d/e", 0, 0); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL); ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL); @@ -382,7 +383,7 @@ START_TEST(test_cache_download_rsync) * and there would be consequences for violating it. 
*/ printf("==== rsync truncated ====\n"); - cage = run_dl_rsync("rsync://x.y.z/m/n/o", 1); + cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL); ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL); @@ -391,7 +392,7 @@ START_TEST(test_cache_download_rsync) free(cage); printf("==== Sibling ====\n"); - cage = run_dl_rsync("rsync://a.b.c/e/f", 1); + cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1); ck_assert_ptr_ne(NULL, cage); ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL); ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL); @@ -414,17 +415,17 @@ START_TEST(test_cache_download_rsync_error) printf("==== Startup ====\n"); dl_error = 0; - free(run_dl_rsync("rsync://a.b.c/d", 1)); + free(run_dl_rsync("rsync://a.b.c/d", 0, 1)); dl_error = EINVAL; - ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 1)); + ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1)); ck_cache_rsync(nodes); printf("==== Regardless of error, not reattempted because same iteration ====\n"); dl_error = EINVAL; - ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0)); + ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0)); ck_cache_rsync(nodes); dl_error = 0; - ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0)); + ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0)); ck_cache_rsync(nodes); cleanup_test(); diff --git a/test/task_test.c b/test/task_test.c index fe322e93..c2e34392 100644 --- a/test/task_test.c +++ b/test/task_test.c @@ -5,6 +5,7 @@ #include "alloc.c" #include "common.c" #include "mock.c" +#include "types/array.h" #include "types/map.c" #include "types/path.c" @@ -130,6 +131,170 @@ START_TEST(test_queue_interrupted) } END_TEST +#define TEST_TASKS 3000 + +struct test_task { + char id[8]; + STAILQ_ENTRY(test_task) lh; +}; + +static STAILQ_HEAD(test_task_list, test_task) test_tasks; +static pthread_mutex_t test_tasks_lock = PTHREAD_MUTEX_INITIALIZER; +static bool return_busy; + +static void +populate_test_tasks(void) +{ + struct test_task *task; + int printed; + unsigned int i; + + STAILQ_INIT(&test_tasks); + for (i = 0; i < TEST_TASKS; i++) { + task = pmalloc(sizeof(struct test_task)); + printed = snprintf(task->id, sizeof(task->id), "%u", i); + ck_assert_int_gt(printed, 0); + ck_assert_int_lt(printed, sizeof(task->id)); + STAILQ_INSERT_TAIL(&test_tasks, task, lh); + } + + printf("+ th-1: Queuing 'starter'\n"); + queue_1("starter"); +} + +static int +certificate_traverse_mock(struct rpki_certificate *ca, int thid) +{ + struct test_task *new[10]; + unsigned int n; + + /* Queue 10 of the available tasks for each dequeue */ + + mutex_lock(&test_tasks_lock); + for (n = 0; n < 10; n++) { + new[n] = STAILQ_FIRST(&test_tasks); + if (new[n]) + STAILQ_REMOVE_HEAD(&test_tasks, lh); + } + mutex_unlock(&test_tasks_lock); + + for (n = 0; n < 10; n++) { + if (!new[n]) + break; + printf("+ th%d: Queuing '%s'\n", thid, new[n]->id); + queue_1(new[n]->id); + free(new[n]); + } + + if (n != 0) + task_wakeup(); + + if (return_busy && (rand() & 3) == 0) + return EBUSY; /* Return "busy" 25% of the time */ + + return 0; +} + +static void * +user_thread(void *arg) +{ + int thid = *((int *)arg); + struct validation_task *task = NULL; + int total_dequeued = 0; + + printf("th%d: Started.\n", thid); + + while ((task = task_dequeue(task)) != NULL) { + printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url); + total_dequeued++; + + if (certificate_traverse_mock(task->ca, thid) == EBUSY) { 
+ printf("+ th%d: Requeuing '%s'\n", + thid, task->ca->map.url); + task_requeue_busy(task); + task = NULL; + } + } + + printf("th%d: Dequeued %u times.\n", thid, total_dequeued); + return NULL; +} + +static void +run_threads(void) +{ + pthread_t threads[10]; + int thids[10]; + unsigned int i; + + for (i = 0; i < ARRAY_LEN(threads); i++) { + thids[i] = i; + ck_assert_int_eq(0, pthread_create(&threads[i], NULL, + user_thread, &thids[i])); + } + + for (i = 0; i < ARRAY_LEN(threads); i++) + ck_assert_int_eq(0, pthread_join(threads[i], NULL)); + + ck_assert_int_eq(1, STAILQ_EMPTY(&test_tasks)); + ck_assert_ptr_eq(NULL, task_dequeue(NULL)); +} + +START_TEST(test_queue_multiuser) +{ + return_busy = false; + + task_setup(); + task_start(); + + populate_test_tasks(); + run_threads(); + + task_stop(); + task_teardown(); +} +END_TEST + +static void * +release_busies(void *arg) +{ + unsigned int i; + + for (i = 0; i < 2; i++) { + sleep(1); + printf("Waking up busy tasks!\n"); + task_wakeup_busy(); + } + + sleep(1); + return_busy = false; + printf("Waking up busy tasks for the last time!\n"); + task_wakeup_busy(); + + return NULL; +} + +START_TEST(test_queue_multiuser_busy) +{ + pthread_t thr; + + return_busy = true; + + task_setup(); + task_start(); + + ck_assert_int_eq(0, pthread_create(&thr, NULL, release_busies, NULL)); + + populate_test_tasks(); + run_threads(); + + ck_assert_int_eq(0, pthread_join(thr, NULL)); + + task_stop(); + task_teardown(); +} +END_TEST + static Suite *create_suite(void) { Suite *suite; @@ -141,6 +306,8 @@ static Suite *create_suite(void) tcase_add_test(queue, test_queue_3); tcase_add_test(queue, test_queue_multiple); tcase_add_test(queue, test_queue_interrupted); + tcase_add_test(queue, test_queue_multiuser); + tcase_add_test(queue, test_queue_multiuser_busy); suite = suite_create("task"); suite_add_tcase(suite, queue);