]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
- fixup race problems from openssl in rand init from library, with
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 25 Jan 2008 15:13:39 +0000 (15:13 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 25 Jan 2008 15:13:39 +0000 (15:13 +0000)
         a mutex around the rand init.
       - fix pass async_id=NULL to _async resolve().
       - rewrote _wait() routine, so that it is threadsafe.
       - cancelation is threadsafe.

git-svn-id: file:///svn/unbound/trunk@902 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
libunbound/unbound.c
libunbound/worker.c
testcode/asynclook.c

index a216f7957881ee65fb295b544ac2bb2e21c09969..dfa6297cfad0354ab7b0179a3a3eb8d9ed3c635c 100644 (file)
@@ -6,6 +6,11 @@
        - please doxygen, put doxygen comment in one place.
        - asynclook -b blocking mode and test.
        - refactor asynclook, nicer code.
+       - fixup race problems from openssl in rand init from library, with
+         a mutex around the rand init.
+       - fix pass async_id=NULL to _async resolve().
+       - rewrote _wait() routine, so that it is threadsafe.
+       - cancelation is threadsafe.
 
 24 January 2008: Wouter
        - tested the cancel() function.
index aab29f521541636b68383c4621635dfea0488cf8..6a650592d3ee7231fc84afb03f6a27b7d959a811 100644 (file)
@@ -322,51 +322,24 @@ int
 ub_val_poll(struct ub_val_ctx* ctx)
 {
        struct timeval t;
-       int r;
        memset(&t, 0, sizeof(t));
-       lock_basic_lock(&ctx->rrpipe_lock);
-       r = pollit(ctx, &t);
-       lock_basic_unlock(&ctx->rrpipe_lock);
-       return r;
-}
-
-int 
-ub_val_wait(struct ub_val_ctx* ctx)
-{
-       int r;
-       lock_basic_lock(&ctx->cfglock);
-       while(ctx->num_async > 0) {
-               lock_basic_unlock(&ctx->cfglock);
-               lock_basic_lock(&ctx->rrpipe_lock);
-               r = pollit(ctx, NULL);
-               lock_basic_unlock(&ctx->rrpipe_lock);
-               if(r)
-                       ub_val_process(ctx);
-               lock_basic_lock(&ctx->cfglock);
-       }
-       lock_basic_unlock(&ctx->cfglock);
-       return UB_NOERROR;
+       /* no need to hold lock while testing for readability. */
+       return pollit(ctx, &t);
 }
 
 int 
 ub_val_fd(struct ub_val_ctx* ctx)
 {
-       int fd;
-       lock_basic_lock(&ctx->rrpipe_lock);
-       fd = ctx->rrpipe[0];
-       lock_basic_unlock(&ctx->rrpipe_lock);
-       return fd;
+       return ctx->rrpipe[0];
 }
 
 /** process answer from bg worker */
 static int
-process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+process_answer_detail(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len,
+       ub_val_callback_t* cb, void** cbarg, int* err,
+       struct ub_val_result** res)
 {
-       int err;
        struct ctx_query* q;
-       ub_val_callback_t cb;
-       void* cbarg;
-       struct ub_val_result* res;
        if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
                log_err("error: bad data from bg worker %d",
                        (int)context_serial_getcmd(msg, len));
@@ -374,7 +347,7 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
        }
 
        lock_basic_lock(&ctx->cfglock);
-       q = context_deserialize_answer(ctx, msg, len, &err);
+       q = context_deserialize_answer(ctx, msg, len, err);
        if(!q) {
                lock_basic_unlock(&ctx->cfglock);
                /* probably simply the lookup that failed, i.e.
@@ -384,22 +357,27 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
        log_assert(q->async);
 
        /* grab cb while locked */
-       cb = q->cb;
-       cbarg = q->cb_arg;
-       if(err) {
-               res = NULL;
+       if(q->cancelled) {
+               *cb = NULL;
+               *cbarg = NULL;
+       } else {
+               *cb = q->cb;
+               *cbarg = q->cb_arg;
+       }
+       if(*err) {
+               *res = NULL;
                ub_val_result_free(q->res);
        } else {
                /* parse the message, extract rcode, fill result */
                ldns_buffer* buf = ldns_buffer_new(q->msg_len);
                struct regional* region = regional_create();
-               res = q->res;
-               res->rcode = LDNS_RCODE_SERVFAIL;
+               *res = q->res;
+               (*res)->rcode = LDNS_RCODE_SERVFAIL;
                if(region && buf) {
                        ldns_buffer_clear(buf);
                        ldns_buffer_write(buf, q->msg, q->msg_len);
                        ldns_buffer_flip(buf);
-                       libworker_enter_result(res, buf, region,
+                       libworker_enter_result(*res, buf, region,
                                q->msg_security);
                }
                ldns_buffer_free(buf);
@@ -412,20 +390,38 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
        context_query_delete(q);
        lock_basic_unlock(&ctx->cfglock);
 
+       if(*cb) return 2;
+       return 1;
+}
+
+/** process answer from bg worker */
+static int
+process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+{
+       int err;
+       ub_val_callback_t cb;
+       void* cbarg;
+       struct ub_val_result* res;
+       int r;
+
+       r = process_answer_detail(ctx, msg, len, &cb, &cbarg, &err, &res);
+
        /* no locks held while calling callback, so that library is
         * re-entrant. */
-       (*cb)(cbarg, err, res);
+       if(r == 2)
+               (*cb)(cbarg, err, res);
 
-       return 1;
+       return r;
 }
 
 int 
 ub_val_process(struct ub_val_ctx* ctx)
 {
        int r;
-       uint8_t* msg = NULL;
-       uint32_t len = 0;
+       uint8_t* msg;
+       uint32_t len;
        while(1) {
+               msg = NULL;
                lock_basic_lock(&ctx->rrpipe_lock);
                r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
                lock_basic_unlock(&ctx->rrpipe_lock);
@@ -442,6 +438,59 @@ ub_val_process(struct ub_val_ctx* ctx)
        return UB_NOERROR;
 }
 
+int 
+ub_val_wait(struct ub_val_ctx* ctx)
+{
+       int err; /* output of process_answer_detail: error code passed on to the callback */
+       ub_val_callback_t cb; /* output of process_answer_detail: callback to invoke, if any */
+       void* cbarg; /* output of process_answer_detail: user argument passed to cb */
+       struct ub_val_result* res; /* output of process_answer_detail: result passed to cb */
+       int r;
+       uint8_t* msg; /* buffer set by libworker_read_msg */
+       uint32_t len; /* length of msg, set by libworker_read_msg */
+       /* Loop until ctx->num_async is 0, then return UB_NOERROR; return UB_PIPE when the read or the answer processing returns 0.
+        * This is basically the same loop as _process(), but it holds the rrpipe lock and waits with pollit. */
+       while(1) {
+               lock_basic_lock(&ctx->rrpipe_lock);
+               lock_basic_lock(&ctx->cfglock); /* num_async is inspected while holding cfglock */
+               if(ctx->num_async == 0) {
+                       lock_basic_unlock(&ctx->cfglock);
+                       lock_basic_unlock(&ctx->rrpipe_lock);
+                       break; /* no outstanding async queries: done waiting */
+               }
+               lock_basic_unlock(&ctx->cfglock);
+
+               /* keep rrpipe locked, while
+                *      o waiting for pipe readable
+                *      o parsing message
+                *      o possibly decrementing num_async
+                * do the callback without any lock held
+                */
+               r = pollit(ctx, NULL); /* NULL timeout; presumably waits until the pipe is readable - confirm in pollit */
+               if(r) {
+                       r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+                       if(r == 0) { /* read reported failure: give up with a pipe error */
+                               lock_basic_unlock(&ctx->rrpipe_lock);
+                               return UB_PIPE;
+                       }
+                       if(r == -1) { /* NOTE(review): presumably "nothing to read right now" in nonblocking mode - confirm */
+                               lock_basic_unlock(&ctx->rrpipe_lock);
+                               continue;
+                       }
+                       r = process_answer_detail(ctx, msg, len, 
+                               &cb, &cbarg, &err, &res);
+                       lock_basic_unlock(&ctx->rrpipe_lock); /* NOTE(review): msg is not freed in this function - confirm who releases it */
+                       if(r == 0)
+                               return UB_PIPE;
+                       if(r == 2) /* 2 is returned when a callback was handed back; call it with no locks held */
+                               (*cb)(cbarg, err, res);
+               } else {
+                       lock_basic_unlock(&ctx->rrpipe_lock);
+               }
+       }
+       return UB_NOERROR;
+}
+
 int 
 ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, 
        int rrclass, struct ub_val_result** result)
@@ -491,7 +540,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
        uint8_t* msg = NULL;
        uint32_t len = 0;
 
-       *async_id = 0;
+       if(async_id)
+               *async_id = 0;
        lock_basic_lock(&ctx->cfglock);
        if(!ctx->finalized) {
                int r = context_finalize(ctx);
@@ -528,7 +578,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
                lock_basic_unlock(&ctx->cfglock);
                return UB_NOMEM;
        }
-       *async_id = q->querynum;
+       if(async_id)
+               *async_id = q->querynum;
        lock_basic_unlock(&ctx->cfglock);
        
        lock_basic_lock(&ctx->qqpipe_lock);
@@ -557,6 +608,7 @@ ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
                lock_basic_unlock(&ctx->cfglock);
                return UB_NOMEM;
        }
+       q->cancelled = 1;
        
        /* delete it */
        if(!ctx->dothread) { /* if forked */
@@ -722,7 +774,6 @@ ub_val_ctx_resolvconf(struct ub_val_ctx* ctx, char* fname)
        fclose(in);
        if(numserv == 0) {
                /* from resolv.conf(5) if none given, use localhost */
-               log_info("resconf: no nameservers, using localhost");
                return ub_val_ctx_set_fwd(ctx, "127.0.0.1");
        }
        return UB_NOERROR;
index 4af41b1982cbf6d187d0567e4bd9781f1a390524..c387b7f748a83f6465930dbcc12308b656ae6169 100644 (file)
@@ -125,11 +125,16 @@ libworker_setup(struct ub_val_ctx* ctx)
        seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
                (((unsigned int)w->thread_num)<<17);
        seed ^= (unsigned int)w->env->alloc->next_id;
+       lock_basic_lock(&ctx->cfglock);
+       /* Openssl RAND_... functions are not as threadsafe as documented,
+        * put a lock around them. */
        if(!ub_initstate(seed, w->env->rnd, RND_STATE_SIZE)) {
+               lock_basic_unlock(&ctx->cfglock);
                seed = 0;
                libworker_delete(w);
                return NULL;
        }
+       lock_basic_unlock(&ctx->cfglock);
        seed = 0;
 
        w->base = comm_base_create();
@@ -308,6 +313,9 @@ int libworker_bg(struct ub_val_ctx* ctx)
                lock_basic_unlock(&ctx->cfglock);
                w = libworker_setup(ctx);
                w->is_bg_thread = 1;
+#ifdef ENABLE_LOCK_CHECKS
+               w->thread_num = 1; /* for nicer DEBUG checklocks */
+#endif
                if(!w) return UB_NOMEM;
                ub_thread_create(&ctx->bg_tid, libworker_dobg, w);
        } else {
@@ -762,11 +770,13 @@ libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
        if(r != (ssize_t)sizeof(len)) {
                if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
                        log_err("msg write failed: %s", strerror(errno));
+                       (void)fd_set_nonblock(fd);
                        return 0;
                }
        }
        if(write(fd, buf, len) == -1) {
                log_err("msg write failed: %s", strerror(errno));
+               (void)fd_set_nonblock(fd);
                return 0;
        }
        if(!fd_set_nonblock(fd))
@@ -798,22 +808,29 @@ libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
        if(r != (ssize_t)sizeof(*len)) {
                if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
                        log_err("msg read failed: %s", strerror(errno));
+                       (void)fd_set_nonblock(fd);
                        return 0;
                }
-               if(r == 0) /* EOF */
+               if(r == 0) /* EOF */ {
+                       (void)fd_set_nonblock(fd);
                        return 0;
+               }
        }
        *buf = (uint8_t*)malloc(*len);
        if(!*buf) {
                log_err("out of memory");
+               (void)fd_set_nonblock(fd);
                return 0;
        }
        if((r=read(fd, *buf, *len)) == -1) {
                log_err("msg read failed: %s", strerror(errno));
+               (void)fd_set_nonblock(fd);
                return 0;
        }
-       if(r == 0) /* EOF */
+       if(r == 0) { /* EOF */
+               (void)fd_set_nonblock(fd);
                return 0;
+       }
        if(!fd_set_nonblock(fd))
                return 0;
        return 1;
index ec8d594980c472098f99516f0c538a59a241093b..ad8de9546d94f49ace1a32f6ad83aabdcaf32335 100644 (file)
@@ -43,6 +43,7 @@
 #include "config.h"
 #include "libunbound/unbound.h"
 #include "util/locks.h"
+#include "util/log.h"
 
 /**
  * result list for the lookups
@@ -73,6 +74,7 @@ void usage(char* argv[])
        printf("        -h : this help message\n");
        printf("        -r fname : read resolv.conf from fname\n");
        printf("        -t : use a resolver thread instead of forking a process\n");
+       printf("        -x : perform extended threaded test\n");
        exit(1);
 }
 
@@ -104,7 +106,8 @@ print_result(struct lookinfo* info)
 }
 
 /** this is a function of type ub_val_callback_t */
-void lookup_is_done(void* mydata, int err, struct ub_val_result* result)
+static void 
+lookup_is_done(void* mydata, int err, struct ub_val_result* result)
 {
        /* cast mydata back to the correct type */
        struct lookinfo* info = (struct lookinfo*)mydata;
@@ -116,7 +119,8 @@ void lookup_is_done(void* mydata, int err, struct ub_val_result* result)
 }
 
 /** check error, if bad, exit with error message */
-static void checkerr(const char* desc, int err)
+static void 
+checkerr(const char* desc, int err)
 {
        if(err != 0) {
                printf("%s error: %s\n", desc, ub_val_strerror(err));
@@ -124,6 +128,129 @@ static void checkerr(const char* desc, int err)
        }
 }
 
+/** number of threads that ext_test starts; ext_thread compares thread_num against fractions of it to pick a query mode */
+#define NUMTHR 10
+
+/** per-thread parameters of the extended test; ext_test fills one in for every thread and passes it to ext_thread */
+struct ext_thr_info {
+       /** thread number, used for debug logging and to select the query mode in ext_thread */
+       int thread_num;
+       /** thread id, set by ub_thread_create and used for the join */
+       ub_thread_t tid;
+       /** context shared by all test threads */
+       struct ub_val_ctx* ctx;
+       /** number of entries in argv */
+       int argc;
+       /** array of names to query, indexed modulo argc */
+       char** argv;
+       /** number of queries this thread performs */
+       int numq;
+};
+
+/** extended bg result callback, this function is ub_val_callback_t; mydata is NULL or points to an int async id, result is freed here */
+static void 
+ext_callback(void* mydata, int err, struct ub_val_result* result)
+{
+       int* my_id = (int*)mydata;
+       int doprint = 0; /* printing is switched off; the print branches below run only if this is changed */
+       if(my_id) {
+               /* I have an id; a value of 0 means the query was cancelled, so this callback must not have happened */
+               if(*my_id == 0) {
+                       printf("error: query returned, but was cancelled\n");
+                       exit(1);
+               }
+               if(doprint) 
+                       printf("cb %d: ", *my_id);
+       }
+       checkerr("ext_callback", err); /* exits with a message when err is nonzero */
+       log_assert(result);
+       if(doprint) {
+               struct lookinfo pi; /* temporary wrapper so that print_result can be reused */
+               pi.name = result?result->qname:"noname";
+               pi.result = result;
+               pi.err = 0;
+               print_result(&pi);
+       }
+       ub_val_result_free(result);
+}
+
+/** extended thread worker; arg is a struct ext_thr_info, performs numq lookups in a mode chosen by thread_num, returns NULL */
+static void*
+ext_thread(void* arg)
+{
+       struct ext_thr_info* inf = (struct ext_thr_info*)arg;
+       int i, r;
+       struct ub_val_result* result;
+       int* async_ids = NULL; /* stays NULL unless this thread tracks ids for cancellation */
+       log_thread_set(&inf->thread_num);
+       if(inf->thread_num > NUMTHR*2/3) { /* highest numbered threads: async queries with ids, and cancels */
+               async_ids = (int*)calloc((size_t)inf->numq, sizeof(int));
+               if(!async_ids) {
+                       printf("out of memory\n");
+                       exit(1);
+               }
+       }
+       for(i=0; i<inf->numq; i++) {
+               if(async_ids) {
+                       r = ub_val_resolve_async(inf->ctx, 
+                               inf->argv[i%inf->argc], LDNS_RR_TYPE_A, 
+                               LDNS_RR_CLASS_IN, &async_ids[i], ext_callback, 
+                               &async_ids[i]); /* the id slot doubles as callback argument, see ext_callback */
+                       checkerr("ub_val_resolve_async", r);
+                       if(i > 100) { /* cancel the query that was started 100 iterations ago */
+                               r = ub_val_cancel(inf->ctx, async_ids[i-100]);
+                               checkerr("ub_val_cancel", r);
+                               async_ids[i-100]=0; /* 0 marks the slot as cancelled for ext_callback */
+                       }
+               } else if(inf->thread_num > NUMTHR/2) {
+                       /* async, without an id and without callback data */
+                       r = ub_val_resolve_async(inf->ctx, 
+                               inf->argv[i%inf->argc], LDNS_RR_TYPE_A, 
+                               LDNS_RR_CLASS_IN, NULL, ext_callback, NULL);
+                       checkerr("ub_val_resolve_async", r);
+               } else  {
+                       /* blocking; NOTE(review): result is not freed in this function - confirm whether that is intended */
+                       r = ub_val_resolve(inf->ctx, inf->argv[i%inf->argc], 
+                               LDNS_RR_TYPE_A, LDNS_RR_CLASS_IN, &result);
+                       checkerr("ub_val_resolve", r);
+               }
+       }
+       if(inf->thread_num > NUMTHR/2) { /* threads that issued async queries wait for them here */
+               r = ub_val_wait(inf->ctx);
+               checkerr("ub_val_ctx_wait", r);
+       }
+       free(async_ids);
+       
+       return NULL;
+}
+
+/** perform extended threaded test: start NUMTHR threads running ext_thread on ctx, join them, delete ctx; returns 0 */
+static int
+ext_test(struct ub_val_ctx* ctx, int argc, char** argv)
+{
+       struct ext_thr_info inf[NUMTHR]; /* one parameter block per thread, lives until all joins are done */
+       int i;
+       printf("extended test start (%d threads)\n", NUMTHR);
+       for(i=0; i<NUMTHR; i++) {
+               /* numbering starts at 2, since 0 = this, 1 = library bg worker */
+               inf[i].thread_num = i+2;
+               inf[i].ctx = ctx;
+               inf[i].argc = argc;
+               inf[i].argv = argv;
+               inf[i].numq = 1000; /* every thread performs 1000 queries */
+               ub_thread_create(&inf[i].tid, ext_thread, &inf[i]);
+       }
+       /* the work happens in the threads; wait here for each of them to finish */
+       for(i=0; i<NUMTHR; i++) {
+               ub_thread_join(inf[i].tid);
+       }
+       printf("extended test end\n");
+       ub_val_ctx_delete(ctx);
+       sleep(1); /* give bg thread time to exit */
+       checklock_stop();
+       return 0;
+}
+
 /** getopt global, in case header files fail to declare it. */
 extern int optind;
 /** getopt global, in case header files fail to declare it. */
@@ -135,7 +262,7 @@ int main(int argc, char** argv)
        int c;
        struct ub_val_ctx* ctx;
        struct lookinfo* lookups;
-       int i, r, cancel=0, blocking=0;
+       int i, r, cancel=0, blocking=0, ext=0;
 
        /* lock debug start (if any) */
        checklock_start();
@@ -151,7 +278,7 @@ int main(int argc, char** argv)
        if(argc == 1) {
                usage(argv);
        }
-       while( (c=getopt(argc, argv, "bcdf:hr:t")) != -1) {
+       while( (c=getopt(argc, argv, "bcdf:hr:tx")) != -1) {
                switch(c) {
                        case 'd':
                                r = ub_val_ctx_debuglevel(ctx, 3);
@@ -181,6 +308,9 @@ int main(int argc, char** argv)
                                r = ub_val_ctx_set_fwd(ctx, optarg);
                                checkerr("ub_val_ctx_set_fwd", r);
                                break;
+                       case 'x':
+                               ext = 1;
+                               break;
                        case 'h':
                        case '?':
                        default:
@@ -190,6 +320,9 @@ int main(int argc, char** argv)
        argc -= optind;
        argv += optind;
 
+       if(ext)
+               return ext_test(ctx, argc, argv);
+
        /* allocate array for results. */
        lookups = (struct lookinfo*)calloc((size_t)argc, 
                sizeof(struct lookinfo));