- please doxygen, put doxygen comment in one place.
- asynclook -b blocking mode and test.
- refactor asynclook, nicer code.
+ - fixup race problems from openssl in rand init from library, with
+ a mutex around the rand init.
+ - fix to allow passing async_id=NULL to _resolve_async().
+ - rewrote _wait() routine, so that it is threadsafe.
+ - cancellation is threadsafe.
24 January 2008: Wouter
- tested the cancel() function.
ub_val_poll(struct ub_val_ctx* ctx)
{
struct timeval t;
- int r;
memset(&t, 0, sizeof(t));
- lock_basic_lock(&ctx->rrpipe_lock);
- r = pollit(ctx, &t);
- lock_basic_unlock(&ctx->rrpipe_lock);
- return r;
-}
-
-int
-ub_val_wait(struct ub_val_ctx* ctx)
-{
- int r;
- lock_basic_lock(&ctx->cfglock);
- while(ctx->num_async > 0) {
- lock_basic_unlock(&ctx->cfglock);
- lock_basic_lock(&ctx->rrpipe_lock);
- r = pollit(ctx, NULL);
- lock_basic_unlock(&ctx->rrpipe_lock);
- if(r)
- ub_val_process(ctx);
- lock_basic_lock(&ctx->cfglock);
- }
- lock_basic_unlock(&ctx->cfglock);
- return UB_NOERROR;
+ /* no need to hold lock while testing for readability. */
+ return pollit(ctx, &t);
}
int
ub_val_fd(struct ub_val_ctx* ctx)
{
- int fd;
- lock_basic_lock(&ctx->rrpipe_lock);
- fd = ctx->rrpipe[0];
- lock_basic_unlock(&ctx->rrpipe_lock);
- return fd;
+ return ctx->rrpipe[0];
}
/** process answer from bg worker */
static int
-process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+process_answer_detail(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len,
+ ub_val_callback_t* cb, void** cbarg, int* err,
+ struct ub_val_result** res)
{
- int err;
struct ctx_query* q;
- ub_val_callback_t cb;
- void* cbarg;
- struct ub_val_result* res;
if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
log_err("error: bad data from bg worker %d",
(int)context_serial_getcmd(msg, len));
}
lock_basic_lock(&ctx->cfglock);
- q = context_deserialize_answer(ctx, msg, len, &err);
+ q = context_deserialize_answer(ctx, msg, len, err);
if(!q) {
lock_basic_unlock(&ctx->cfglock);
/* probably simply the lookup that failed, i.e.
log_assert(q->async);
/* grab cb while locked */
- cb = q->cb;
- cbarg = q->cb_arg;
- if(err) {
- res = NULL;
+ if(q->cancelled) {
+ *cb = NULL;
+ *cbarg = NULL;
+ } else {
+ *cb = q->cb;
+ *cbarg = q->cb_arg;
+ }
+ if(*err) {
+ *res = NULL;
ub_val_result_free(q->res);
} else {
/* parse the message, extract rcode, fill result */
ldns_buffer* buf = ldns_buffer_new(q->msg_len);
struct regional* region = regional_create();
- res = q->res;
- res->rcode = LDNS_RCODE_SERVFAIL;
+ *res = q->res;
+ (*res)->rcode = LDNS_RCODE_SERVFAIL;
if(region && buf) {
ldns_buffer_clear(buf);
ldns_buffer_write(buf, q->msg, q->msg_len);
ldns_buffer_flip(buf);
- libworker_enter_result(res, buf, region,
+ libworker_enter_result(*res, buf, region,
q->msg_security);
}
ldns_buffer_free(buf);
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
+ if(*cb) return 2;
+ return 1;
+}
+
+/** process answer from bg worker */
+static int
+process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+{
+ int err;
+ ub_val_callback_t cb;
+ void* cbarg;
+ struct ub_val_result* res;
+ int r;
+
+ r = process_answer_detail(ctx, msg, len, &cb, &cbarg, &err, &res);
+
/* no locks held while calling callback, so that library is
* re-entrant. */
- (*cb)(cbarg, err, res);
+ if(r == 2)
+ (*cb)(cbarg, err, res);
- return 1;
+ return r;
}
int
ub_val_process(struct ub_val_ctx* ctx)
{
int r;
- uint8_t* msg = NULL;
- uint32_t len = 0;
+ uint8_t* msg;
+ uint32_t len;
while(1) {
+ msg = NULL;
lock_basic_lock(&ctx->rrpipe_lock);
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
lock_basic_unlock(&ctx->rrpipe_lock);
return UB_NOERROR;
}
+int
+ub_val_wait(struct ub_val_ctx* ctx)
+{
+ int err;
+ ub_val_callback_t cb;
+ void* cbarg;
+ struct ub_val_result* res;
+ int r;
+ uint8_t* msg;
+ uint32_t len;
+ /* this is basically the same loop as _process(), but with changes.
+ * holds the rrpipe lock and waits with pollit */
+ while(1) {
+ lock_basic_lock(&ctx->rrpipe_lock);
+ lock_basic_lock(&ctx->cfglock);
+ if(ctx->num_async == 0) {
+ lock_basic_unlock(&ctx->cfglock);
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ break;
+ }
+ lock_basic_unlock(&ctx->cfglock);
+
+ /* keep rrpipe locked, while
+ * o waiting for pipe readable
+ * o parsing message
+ * o possibly decrementing num_async
+ * do callback without lock
+ */
+ r = pollit(ctx, NULL);
+ if(r) {
+ r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+ if(r == 0) {
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ return UB_PIPE;
+ }
+ if(r == -1) {
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ continue;
+ }
+ r = process_answer_detail(ctx, msg, len,
+ &cb, &cbarg, &err, &res);
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ if(r == 0)
+ return UB_PIPE;
+ if(r == 2)
+ (*cb)(cbarg, err, res);
+ } else {
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ }
+ }
+ return UB_NOERROR;
+}
+
int
ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
int rrclass, struct ub_val_result** result)
uint8_t* msg = NULL;
uint32_t len = 0;
- *async_id = 0;
+ if(async_id)
+ *async_id = 0;
lock_basic_lock(&ctx->cfglock);
if(!ctx->finalized) {
int r = context_finalize(ctx);
lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM;
}
- *async_id = q->querynum;
+ if(async_id)
+ *async_id = q->querynum;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM;
}
+ q->cancelled = 1;
/* delete it */
if(!ctx->dothread) { /* if forked */
fclose(in);
if(numserv == 0) {
/* from resolv.conf(5) if none given, use localhost */
- log_info("resconf: no nameservers, using localhost");
return ub_val_ctx_set_fwd(ctx, "127.0.0.1");
}
return UB_NOERROR;
seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
(((unsigned int)w->thread_num)<<17);
seed ^= (unsigned int)w->env->alloc->next_id;
+ lock_basic_lock(&ctx->cfglock);
+ /* Openssl RAND_... functions are not as threadsafe as documented,
+ * put a lock around them. */
if(!ub_initstate(seed, w->env->rnd, RND_STATE_SIZE)) {
+ lock_basic_unlock(&ctx->cfglock);
seed = 0;
libworker_delete(w);
return NULL;
}
+ lock_basic_unlock(&ctx->cfglock);
seed = 0;
w->base = comm_base_create();
lock_basic_unlock(&ctx->cfglock);
w = libworker_setup(ctx);
w->is_bg_thread = 1;
+#ifdef ENABLE_LOCK_CHECKS
+ w->thread_num = 1; /* for nicer DEBUG checklocks */
+#endif
if(!w) return UB_NOMEM;
ub_thread_create(&ctx->bg_tid, libworker_dobg, w);
} else {
if(r != (ssize_t)sizeof(len)) {
if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
log_err("msg write failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
return 0;
}
}
if(write(fd, buf, len) == -1) {
log_err("msg write failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
return 0;
}
if(!fd_set_nonblock(fd))
if(r != (ssize_t)sizeof(*len)) {
if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
log_err("msg read failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
return 0;
}
- if(r == 0) /* EOF */
+ if(r == 0) /* EOF */ {
+ (void)fd_set_nonblock(fd);
return 0;
+ }
}
*buf = (uint8_t*)malloc(*len);
if(!*buf) {
log_err("out of memory");
+ (void)fd_set_nonblock(fd);
return 0;
}
if((r=read(fd, *buf, *len)) == -1) {
log_err("msg read failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
return 0;
}
- if(r == 0) /* EOF */
+ if(r == 0) { /* EOF */
+ (void)fd_set_nonblock(fd);
return 0;
+ }
if(!fd_set_nonblock(fd))
return 0;
return 1;
#include "config.h"
#include "libunbound/unbound.h"
#include "util/locks.h"
+#include "util/log.h"
/**
* result list for the lookups
printf(" -h : this help message\n");
printf(" -r fname : read resolv.conf from fname\n");
printf(" -t : use a resolver thread instead of forking a process\n");
+ printf(" -x : perform extended threaded test\n");
exit(1);
}
}
/** this is a function of type ub_val_callback_t */
-void lookup_is_done(void* mydata, int err, struct ub_val_result* result)
+static void
+lookup_is_done(void* mydata, int err, struct ub_val_result* result)
{
/* cast mydata back to the correct type */
struct lookinfo* info = (struct lookinfo*)mydata;
}
/** check error, if bad, exit with error message */
-static void checkerr(const char* desc, int err)
+static void
+checkerr(const char* desc, int err)
{
if(err != 0) {
printf("%s error: %s\n", desc, ub_val_strerror(err));
}
}
+/** number of threads to make in extended test */
+#define NUMTHR 10
+
+/** struct for extended thread info */
+struct ext_thr_info {
+ /** thread num for debug */
+ int thread_num;
+ /** thread id */
+ ub_thread_t tid;
+ /** context */
+ struct ub_val_ctx* ctx;
+ /** size of array to query */
+ int argc;
+ /** array of names to query */
+ char** argv;
+ /** number of queries to do */
+ int numq;
+};
+
+/** extended bg result callback, this function is ub_val_callback_t */
+static void
+ext_callback(void* mydata, int err, struct ub_val_result* result)
+{
+ int* my_id = (int*)mydata;
+ int doprint = 0;
+ if(my_id) {
+ /* I have an id, make sure we are not cancelled */
+ if(*my_id == 0) {
+ printf("error: query returned, but was cancelled\n");
+ exit(1);
+ }
+ if(doprint)
+ printf("cb %d: ", *my_id);
+ }
+ checkerr("ext_callback", err);
+ log_assert(result);
+ if(doprint) {
+ struct lookinfo pi;
+ pi.name = result?result->qname:"noname";
+ pi.result = result;
+ pi.err = 0;
+ print_result(&pi);
+ }
+ ub_val_result_free(result);
+}
+
+/** extended thread worker */
+static void*
+ext_thread(void* arg)
+{
+ struct ext_thr_info* inf = (struct ext_thr_info*)arg;
+ int i, r;
+ struct ub_val_result* result;
+ int* async_ids = NULL;
+ log_thread_set(&inf->thread_num);
+ if(inf->thread_num > NUMTHR*2/3) {
+ async_ids = (int*)calloc((size_t)inf->numq, sizeof(int));
+ if(!async_ids) {
+ printf("out of memory\n");
+ exit(1);
+ }
+ }
+ for(i=0; i<inf->numq; i++) {
+ if(async_ids) {
+ r = ub_val_resolve_async(inf->ctx,
+ inf->argv[i%inf->argc], LDNS_RR_TYPE_A,
+ LDNS_RR_CLASS_IN, &async_ids[i], ext_callback,
+ &async_ids[i]);
+ checkerr("ub_val_resolve_async", r);
+ if(i > 100) {
+ r = ub_val_cancel(inf->ctx, async_ids[i-100]);
+ checkerr("ub_val_cancel", r);
+ async_ids[i-100]=0;
+ }
+ } else if(inf->thread_num > NUMTHR/2) {
+ /* async */
+ r = ub_val_resolve_async(inf->ctx,
+ inf->argv[i%inf->argc], LDNS_RR_TYPE_A,
+ LDNS_RR_CLASS_IN, NULL, ext_callback, NULL);
+ checkerr("ub_val_resolve_async", r);
+ } else {
+ /* blocking */
+ r = ub_val_resolve(inf->ctx, inf->argv[i%inf->argc],
+ LDNS_RR_TYPE_A, LDNS_RR_CLASS_IN, &result);
+ checkerr("ub_val_resolve", r);
+ }
+ }
+ if(inf->thread_num > NUMTHR/2) {
+ r = ub_val_wait(inf->ctx);
+ checkerr("ub_val_ctx_wait", r);
+ }
+ free(async_ids);
+
+ return NULL;
+}
+
+/** perform extended threaded test */
+static int
+ext_test(struct ub_val_ctx* ctx, int argc, char** argv)
+{
+ struct ext_thr_info inf[NUMTHR];
+ int i;
+ printf("extended test start (%d threads)\n", NUMTHR);
+ for(i=0; i<NUMTHR; i++) {
+ /* 0 = this, 1 = library bg worker */
+ inf[i].thread_num = i+2;
+ inf[i].ctx = ctx;
+ inf[i].argc = argc;
+ inf[i].argv = argv;
+ inf[i].numq = 1000;
+ ub_thread_create(&inf[i].tid, ext_thread, &inf[i]);
+ }
+ /* the work happens here */
+ for(i=0; i<NUMTHR; i++) {
+ ub_thread_join(inf[i].tid);
+ }
+ printf("extended test end\n");
+ ub_val_ctx_delete(ctx);
+ sleep(1); /* give bg thread time to exit */
+ checklock_stop();
+ return 0;
+}
+
/** getopt global, in case header files fail to declare it. */
extern int optind;
/** getopt global, in case header files fail to declare it. */
int c;
struct ub_val_ctx* ctx;
struct lookinfo* lookups;
- int i, r, cancel=0, blocking=0;
+ int i, r, cancel=0, blocking=0, ext=0;
/* lock debug start (if any) */
checklock_start();
if(argc == 1) {
usage(argv);
}
- while( (c=getopt(argc, argv, "bcdf:hr:t")) != -1) {
+ while( (c=getopt(argc, argv, "bcdf:hr:tx")) != -1) {
switch(c) {
case 'd':
r = ub_val_ctx_debuglevel(ctx, 3);
r = ub_val_ctx_set_fwd(ctx, optarg);
checkerr("ub_val_ctx_set_fwd", r);
break;
+ case 'x':
+ ext = 1;
+ break;
case 'h':
case '?':
default:
argc -= optind;
argv += optind;
+ if(ext)
+ return ext_test(ctx, argc, argv);
+
/* allocate array for results. */
lookups = (struct lookinfo*)calloc((size_t)argc,
sizeof(struct lookinfo));