From ed57c4de4ca1dec60a2dc63f2e96fe1952f6f1ed Mon Sep 17 00:00:00 2001 From: Wouter Wijngaards Date: Fri, 25 Jan 2008 15:13:39 +0000 Subject: [PATCH] - fixup race problems from openssl in rand init from library, with a mutex around the rand init. - fix passing async_id=NULL to _async resolve(). - rewrote _wait() routine, so that it is threadsafe. - cancellation is threadsafe. git-svn-id: file:///svn/unbound/trunk@902 be551aaa-1e26-0410-a405-d3ace91eadb9 --- doc/Changelog | 5 ++ libunbound/unbound.c | 147 +++++++++++++++++++++++++++++-------------- libunbound/worker.c | 21 ++++++- testcode/asynclook.c | 141 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 260 insertions(+), 54 deletions(-) diff --git a/doc/Changelog b/doc/Changelog index a216f7957..dfa6297cf 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -6,6 +6,11 @@ - please doxygen, put doxygen comment in one place. - asynclook -b blocking mode and test. - refactor asynclook, nicer code. + - fixup race problems from openssl in rand init from library, with + a mutex around the rand init. + - fix passing async_id=NULL to _async resolve(). + - rewrote _wait() routine, so that it is threadsafe. + - cancellation is threadsafe. 24 January 2008: Wouter - tested the cancel() function. 
diff --git a/libunbound/unbound.c b/libunbound/unbound.c index aab29f521..6a650592d 100644 --- a/libunbound/unbound.c +++ b/libunbound/unbound.c @@ -322,51 +322,24 @@ int ub_val_poll(struct ub_val_ctx* ctx) { struct timeval t; - int r; memset(&t, 0, sizeof(t)); - lock_basic_lock(&ctx->rrpipe_lock); - r = pollit(ctx, &t); - lock_basic_unlock(&ctx->rrpipe_lock); - return r; -} - -int -ub_val_wait(struct ub_val_ctx* ctx) -{ - int r; - lock_basic_lock(&ctx->cfglock); - while(ctx->num_async > 0) { - lock_basic_unlock(&ctx->cfglock); - lock_basic_lock(&ctx->rrpipe_lock); - r = pollit(ctx, NULL); - lock_basic_unlock(&ctx->rrpipe_lock); - if(r) - ub_val_process(ctx); - lock_basic_lock(&ctx->cfglock); - } - lock_basic_unlock(&ctx->cfglock); - return UB_NOERROR; + /* no need to hold lock while testing for readability. */ + return pollit(ctx, &t); } int ub_val_fd(struct ub_val_ctx* ctx) { - int fd; - lock_basic_lock(&ctx->rrpipe_lock); - fd = ctx->rrpipe[0]; - lock_basic_unlock(&ctx->rrpipe_lock); - return fd; + return ctx->rrpipe[0]; } /** process answer from bg worker */ static int -process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) +process_answer_detail(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len, + ub_val_callback_t* cb, void** cbarg, int* err, + struct ub_val_result** res) { - int err; struct ctx_query* q; - ub_val_callback_t cb; - void* cbarg; - struct ub_val_result* res; if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) { log_err("error: bad data from bg worker %d", (int)context_serial_getcmd(msg, len)); @@ -374,7 +347,7 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) } lock_basic_lock(&ctx->cfglock); - q = context_deserialize_answer(ctx, msg, len, &err); + q = context_deserialize_answer(ctx, msg, len, err); if(!q) { lock_basic_unlock(&ctx->cfglock); /* probably simply the lookup that failed, i.e. 
@@ -384,22 +357,27 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) log_assert(q->async); /* grab cb while locked */ - cb = q->cb; - cbarg = q->cb_arg; - if(err) { - res = NULL; + if(q->cancelled) { + *cb = NULL; + *cbarg = NULL; + } else { + *cb = q->cb; + *cbarg = q->cb_arg; + } + if(*err) { + *res = NULL; ub_val_result_free(q->res); } else { /* parse the message, extract rcode, fill result */ ldns_buffer* buf = ldns_buffer_new(q->msg_len); struct regional* region = regional_create(); - res = q->res; - res->rcode = LDNS_RCODE_SERVFAIL; + *res = q->res; + (*res)->rcode = LDNS_RCODE_SERVFAIL; if(region && buf) { ldns_buffer_clear(buf); ldns_buffer_write(buf, q->msg, q->msg_len); ldns_buffer_flip(buf); - libworker_enter_result(res, buf, region, + libworker_enter_result(*res, buf, region, q->msg_security); } ldns_buffer_free(buf); @@ -412,20 +390,38 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) context_query_delete(q); lock_basic_unlock(&ctx->cfglock); + if(*cb) return 2; + return 1; +} + +/** process answer from bg worker */ +static int +process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) +{ + int err; + ub_val_callback_t cb; + void* cbarg; + struct ub_val_result* res; + int r; + + r = process_answer_detail(ctx, msg, len, &cb, &cbarg, &err, &res); + /* no locks held while calling callback, so that library is * re-entrant. 
*/ - (*cb)(cbarg, err, res); + if(r == 2) + (*cb)(cbarg, err, res); - return 1; + return r; } int ub_val_process(struct ub_val_ctx* ctx) { int r; - uint8_t* msg = NULL; - uint32_t len = 0; + uint8_t* msg; + uint32_t len; while(1) { + msg = NULL; lock_basic_lock(&ctx->rrpipe_lock); r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); lock_basic_unlock(&ctx->rrpipe_lock); @@ -442,6 +438,59 @@ ub_val_process(struct ub_val_ctx* ctx) return UB_NOERROR; } +int +ub_val_wait(struct ub_val_ctx* ctx) +{ + int err; + ub_val_callback_t cb; + void* cbarg; + struct ub_val_result* res; + int r; + uint8_t* msg; + uint32_t len; + /* this is basically the same loop as _process(), but with changes. + * holds the rrpipe lock and waits with pollit */ + while(1) { + lock_basic_lock(&ctx->rrpipe_lock); + lock_basic_lock(&ctx->cfglock); + if(ctx->num_async == 0) { + lock_basic_unlock(&ctx->cfglock); + lock_basic_unlock(&ctx->rrpipe_lock); + break; + } + lock_basic_unlock(&ctx->cfglock); + + /* keep rrpipe locked, while + * o waiting for pipe readable + * o parsing message + * o possibly decrementing num_async + * do callback without lock + */ + r = pollit(ctx, NULL); + if(r) { + r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); + if(r == 0) { + lock_basic_unlock(&ctx->rrpipe_lock); + return UB_PIPE; + } + if(r == -1) { + lock_basic_unlock(&ctx->rrpipe_lock); + continue; + } + r = process_answer_detail(ctx, msg, len, + &cb, &cbarg, &err, &res); + lock_basic_unlock(&ctx->rrpipe_lock); + if(r == 0) + return UB_PIPE; + if(r == 2) + (*cb)(cbarg, err, res); + } else { + lock_basic_unlock(&ctx->rrpipe_lock); + } + } + return UB_NOERROR; +} + int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, int rrclass, struct ub_val_result** result) @@ -491,7 +540,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype, uint8_t* msg = NULL; uint32_t len = 0; - *async_id = 0; + if(async_id) + *async_id = 0; lock_basic_lock(&ctx->cfglock); if(!ctx->finalized) { int r = 
context_finalize(ctx); @@ -528,7 +578,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype, lock_basic_unlock(&ctx->cfglock); return UB_NOMEM; } - *async_id = q->querynum; + if(async_id) + *async_id = q->querynum; lock_basic_unlock(&ctx->cfglock); lock_basic_lock(&ctx->qqpipe_lock); @@ -557,6 +608,7 @@ ub_val_cancel(struct ub_val_ctx* ctx, int async_id) lock_basic_unlock(&ctx->cfglock); return UB_NOMEM; } + q->cancelled = 1; /* delete it */ if(!ctx->dothread) { /* if forked */ @@ -722,7 +774,6 @@ ub_val_ctx_resolvconf(struct ub_val_ctx* ctx, char* fname) fclose(in); if(numserv == 0) { /* from resolv.conf(5) if none given, use localhost */ - log_info("resconf: no nameservers, using localhost"); return ub_val_ctx_set_fwd(ctx, "127.0.0.1"); } return UB_NOERROR; diff --git a/libunbound/worker.c b/libunbound/worker.c index 4af41b198..c387b7f74 100644 --- a/libunbound/worker.c +++ b/libunbound/worker.c @@ -125,11 +125,16 @@ libworker_setup(struct ub_val_ctx* ctx) seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^ (((unsigned int)w->thread_num)<<17); seed ^= (unsigned int)w->env->alloc->next_id; + lock_basic_lock(&ctx->cfglock); + /* Openssl RAND_... functions are not as threadsafe as documented, + * put a lock around them. 
*/ if(!ub_initstate(seed, w->env->rnd, RND_STATE_SIZE)) { + lock_basic_unlock(&ctx->cfglock); seed = 0; libworker_delete(w); return NULL; } + lock_basic_unlock(&ctx->cfglock); seed = 0; w->base = comm_base_create(); @@ -308,6 +313,9 @@ int libworker_bg(struct ub_val_ctx* ctx) lock_basic_unlock(&ctx->cfglock); w = libworker_setup(ctx); w->is_bg_thread = 1; +#ifdef ENABLE_LOCK_CHECKS + w->thread_num = 1; /* for nicer DEBUG checklocks */ +#endif if(!w) return UB_NOMEM; ub_thread_create(&ctx->bg_tid, libworker_dobg, w); } else { @@ -762,11 +770,13 @@ libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock) if(r != (ssize_t)sizeof(len)) { if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) { log_err("msg write failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); return 0; } } if(write(fd, buf, len) == -1) { log_err("msg write failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); return 0; } if(!fd_set_nonblock(fd)) @@ -798,22 +808,29 @@ libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock) if(r != (ssize_t)sizeof(*len)) { if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) { log_err("msg read failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); return 0; } - if(r == 0) /* EOF */ + if(r == 0) /* EOF */ { + (void)fd_set_nonblock(fd); return 0; + } } *buf = (uint8_t*)malloc(*len); if(!*buf) { log_err("out of memory"); + (void)fd_set_nonblock(fd); return 0; } if((r=read(fd, *buf, *len)) == -1) { log_err("msg read failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); return 0; } - if(r == 0) /* EOF */ + if(r == 0) { /* EOF */ + (void)fd_set_nonblock(fd); return 0; + } if(!fd_set_nonblock(fd)) return 0; return 1; diff --git a/testcode/asynclook.c b/testcode/asynclook.c index ec8d59498..ad8de9546 100644 --- a/testcode/asynclook.c +++ b/testcode/asynclook.c @@ -43,6 +43,7 @@ #include "config.h" #include "libunbound/unbound.h" #include "util/locks.h" +#include "util/log.h" /** * result list for the lookups @@ -73,6 +74,7 
@@ void usage(char* argv[]) printf(" -h : this help message\n"); printf(" -r fname : read resolv.conf from fname\n"); printf(" -t : use a resolver thread instead of forking a process\n"); + printf(" -x : perform extended threaded test\n"); exit(1); } @@ -104,7 +106,8 @@ print_result(struct lookinfo* info) } /** this is a function of type ub_val_callback_t */ -void lookup_is_done(void* mydata, int err, struct ub_val_result* result) +static void +lookup_is_done(void* mydata, int err, struct ub_val_result* result) { /* cast mydata back to the correct type */ struct lookinfo* info = (struct lookinfo*)mydata; @@ -116,7 +119,8 @@ void lookup_is_done(void* mydata, int err, struct ub_val_result* result) } /** check error, if bad, exit with error message */ -static void checkerr(const char* desc, int err) +static void +checkerr(const char* desc, int err) { if(err != 0) { printf("%s error: %s\n", desc, ub_val_strerror(err)); @@ -124,6 +128,129 @@ static void checkerr(const char* desc, int err) } } +/** number of threads to make in extended test */ +#define NUMTHR 10 + +/** struct for extended thread info */ +struct ext_thr_info { + /** thread num for debug */ + int thread_num; + /** thread id */ + ub_thread_t tid; + /** context */ + struct ub_val_ctx* ctx; + /** size of array to query */ + int argc; + /** array of names to query */ + char** argv; + /** number of queries to do */ + int numq; +}; + +/** extended bg result callback, this function is ub_val_callback_t */ +static void +ext_callback(void* mydata, int err, struct ub_val_result* result) +{ + int* my_id = (int*)mydata; + int doprint = 0; + if(my_id) { + /* I have an id, make sure we are not cancelled */ + if(*my_id == 0) { + printf("error: query returned, but was cancelled\n"); + exit(1); + } + if(doprint) + printf("cb %d: ", *my_id); + } + checkerr("ext_callback", err); + log_assert(result); + if(doprint) { + struct lookinfo pi; + pi.name = result?result->qname:"noname"; + pi.result = result; + pi.err = 0; + 
print_result(&pi); + } + ub_val_result_free(result); +} + +/** extended thread worker */ +static void* +ext_thread(void* arg) +{ + struct ext_thr_info* inf = (struct ext_thr_info*)arg; + int i, r; + struct ub_val_result* result; + int* async_ids = NULL; + log_thread_set(&inf->thread_num); + if(inf->thread_num > NUMTHR*2/3) { + async_ids = (int*)calloc((size_t)inf->numq, sizeof(int)); + if(!async_ids) { + printf("out of memory\n"); + exit(1); + } + } + for(i=0; inumq; i++) { + if(async_ids) { + r = ub_val_resolve_async(inf->ctx, + inf->argv[i%inf->argc], LDNS_RR_TYPE_A, + LDNS_RR_CLASS_IN, &async_ids[i], ext_callback, + &async_ids[i]); + checkerr("ub_val_resolve_async", r); + if(i > 100) { + r = ub_val_cancel(inf->ctx, async_ids[i-100]); + checkerr("ub_val_cancel", r); + async_ids[i-100]=0; + } + } else if(inf->thread_num > NUMTHR/2) { + /* async */ + r = ub_val_resolve_async(inf->ctx, + inf->argv[i%inf->argc], LDNS_RR_TYPE_A, + LDNS_RR_CLASS_IN, NULL, ext_callback, NULL); + checkerr("ub_val_resolve_async", r); + } else { + /* blocking */ + r = ub_val_resolve(inf->ctx, inf->argv[i%inf->argc], + LDNS_RR_TYPE_A, LDNS_RR_CLASS_IN, &result); + checkerr("ub_val_resolve", r); + } + } + if(inf->thread_num > NUMTHR/2) { + r = ub_val_wait(inf->ctx); + checkerr("ub_val_ctx_wait", r); + } + free(async_ids); + + return NULL; +} + +/** perform extended threaded test */ +static int +ext_test(struct ub_val_ctx* ctx, int argc, char** argv) +{ + struct ext_thr_info inf[NUMTHR]; + int i; + printf("extended test start (%d threads)\n", NUMTHR); + for(i=0; i