From: Wouter Wijngaards Date: Wed, 23 Jan 2008 15:15:37 +0000 (+0000) Subject: bg resolution works. X-Git-Tag: release-0.9~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e56a3a60efd3ed45a7c4f1059314176cb92a566f;p=thirdparty%2Funbound.git bg resolution works. git-svn-id: file:///svn/unbound/trunk@890 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/doc/Changelog b/doc/Changelog index c42077542..3a243bc58 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,5 +1,9 @@ 23 January 2008: Wouter - removed debug prints from if-auto, verb-algo enables some. + - libunbound QUIT setup, remove memory leaks, when using threads + will share memory for passing results instead of writing it over + the pipe, only writes ID number over the pipe (towards the handler + thread that does process() ). 22 January 2008: Wouter - library code for async in libunbound/unbound.c. diff --git a/libunbound/context.c b/libunbound/context.c index 965a967a9..c1436121f 100644 --- a/libunbound/context.c +++ b/libunbound/context.c @@ -250,6 +250,24 @@ context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len) return q; } +struct ctx_query* +context_lookup_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len) +{ + struct ctx_query* q; + int querynum; + if(len < 4*sizeof(uint32_t)+1) { + return NULL; + } + log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY); + querynum = (int)ldns_read_uint32(p+sizeof(uint32_t)); + q = (struct ctx_query*)rbtree_search(&ctx->queries, &querynum); + if(!q) { + return NULL; + } + log_assert(q->async); + return q; +} + uint8_t* context_serialize_answer(struct ctx_query* q, int err, ldns_buffer* pkt, uint32_t* len) @@ -298,11 +316,7 @@ context_deserialize_answer(struct ub_val_ctx* ctx, *err = UB_NOMEM; return q; } - } else { - q->msg_len = 0; - free(q->msg); - q->msg = NULL; - } + } return q; } diff --git a/libunbound/context.h b/libunbound/context.h index 6a57c32ca..e1e8819ec 100644 --- a/libunbound/context.h +++ b/libunbound/context.h @@ -46,6 +46,7 @@ #include "services/modstack.h" #include "libunbound/unbound.h" #include "util/data/packed_rrset.h" +struct libworker; /** * The context structure @@ -130,7 +131,7 @@ struct ctx_query { int querynum; /** was this an async query? */ int async; - /** has this query been cancelled? (for bg thread) */ + /** was this query cancelled (for bg worker) */ int cancelled; /** for async query, the callback function */ @@ -144,6 +145,8 @@ struct ctx_query { size_t msg_len; /** validation status on security */ enum sec_status msg_security; + /** store libworker that is handling this query */ + struct libworker* w; /** result structure, also contains original query, type, class. * malloced ptr ready to hand to the client. */ @@ -284,6 +287,16 @@ uint8_t* context_serialize_quit(uint32_t* len); */ enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len); +/** + * Lookup query from new_query buffer. + * @param ctx: context + * @param p: buffer serialized. + * @param len: length of buffer. + * @return looked up ctx_query or NULL for malloc failure. + */ +struct ctx_query* context_lookup_new_query(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len); + /** * Deserialize a new_query buffer. * @param ctx: context diff --git a/libunbound/unbound.c b/libunbound/unbound.c index 9e1de576e..1bec20d17 100644 --- a/libunbound/unbound.c +++ b/libunbound/unbound.c @@ -118,6 +118,30 @@ ub_val_ctx_delete(struct ub_val_ctx* ctx) { struct alloc_cache* a, *na; if(!ctx) return; + /* stop the bg thread */ + lock_basic_lock(&ctx->cfglock); + if(ctx->created_bg) { + uint8_t* msg; + uint32_t len; + uint32_t cmd = UB_LIBCMD_QUIT; + lock_basic_unlock(&ctx->cfglock); + lock_basic_lock(&ctx->qqpipe_lock); + (void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd, + (uint32_t)sizeof(cmd), 0); + lock_basic_unlock(&ctx->qqpipe_lock); + lock_basic_lock(&ctx->rrpipe_lock); + while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) { + /* discard all results except a quit confirm */ + if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) { + free(msg); + break; + } + free(msg); + } + lock_basic_unlock(&ctx->rrpipe_lock); + } + else lock_basic_unlock(&ctx->cfglock); + modstack_desetup(&ctx->mods, ctx->env); a = ctx->alloc_list; while(a) { @@ -131,10 +155,18 @@ ub_val_ctx_delete(struct ub_val_ctx* ctx) lock_basic_destroy(&ctx->qqpipe_lock); lock_basic_destroy(&ctx->rrpipe_lock); lock_basic_destroy(&ctx->cfglock); - close(ctx->qqpipe[0]); - close(ctx->qqpipe[1]); - close(ctx->rrpipe[0]); - close(ctx->rrpipe[1]); + if(ctx->qqpipe[0] != -1) + close(ctx->qqpipe[0]); + if(ctx->qqpipe[1] != -1) + close(ctx->qqpipe[1]); + if(ctx->rrpipe[0] != -1) + close(ctx->rrpipe[0]); + if(ctx->rrpipe[1] != -1) + close(ctx->rrpipe[1]); + ctx->qqpipe[0] = -1; + ctx->qqpipe[1] = -1; + ctx->rrpipe[0] = -1; + ctx->rrpipe[1] = -1; if(ctx->env) { slabhash_delete(ctx->env->msg_cache); rrset_cache_delete(ctx->env->rrset_cache); @@ -233,6 +265,10 @@ ub_val_ctx_debuglevel(struct ub_val_ctx* ctx, int d) int ub_val_ctx_async(struct ub_val_ctx* ctx, int dothread) { +#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS) + if(dothread) /* cannot do threading */ + return UB_NOERROR; +#endif lock_basic_lock(&ctx->cfglock); if(ctx->finalized) { lock_basic_unlock(&ctx->cfglock); @@ -406,14 +442,19 @@ ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, r = libworker_fg(ctx, q); if(r) { + lock_basic_lock(&ctx->cfglock); + (void)rbtree_delete(&ctx->queries, q->node.key); context_query_delete(q); + lock_basic_unlock(&ctx->cfglock); return r; } *result = q->res; q->res = NULL; + lock_basic_lock(&ctx->cfglock); (void)rbtree_delete(&ctx->queries, q->node.key); context_query_delete(q); + lock_basic_unlock(&ctx->cfglock); return UB_NOERROR; } @@ -435,14 +476,17 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype, } } if(!ctx->created_bg) { - int r = libworker_bg(ctx); + int r; + ctx->created_bg = 1; + lock_basic_unlock(&ctx->cfglock); + r = libworker_bg(ctx); if(r) { + lock_basic_lock(&ctx->cfglock); + ctx->created_bg = 0; lock_basic_unlock(&ctx->cfglock); return r; } - ctx->created_bg = 1; - } - lock_basic_unlock(&ctx->cfglock); + } else lock_basic_unlock(&ctx->cfglock); /* create new ctx_query and attempt to add to the list */ q = context_new(ctx, name, rrtype, rrclass, callback, mydata); @@ -490,9 +534,11 @@ ub_val_cancel(struct ub_val_ctx* ctx, int async_id) } /* delete it */ - (void)rbtree_delete(&ctx->queries, q->node.key); - ctx->num_async--; - context_query_delete(q); + if(!ctx->dothread) { /* if forked */ + (void)rbtree_delete(&ctx->queries, q->node.key); + ctx->num_async--; + context_query_delete(q); + } lock_basic_unlock(&ctx->cfglock); /* send cancel to background worker */ diff --git a/libunbound/worker.c b/libunbound/worker.c index 3af59552f..9f35b6d8c 100644 --- a/libunbound/worker.c +++ b/libunbound/worker.c @@ -108,6 +108,7 @@ libworker_setup(struct ub_val_ctx* ctx) libworker_delete(w); return NULL; } + w->thread_num = w->env->alloc->thread_num; alloc_set_id_cleanup(w->env->alloc, &libworker_alloc_cleanup, w); w->env->scratch = regional_create_custom(cfg->msg_buffer_size); w->env->scratch_buffer = ldns_buffer_new(cfg->msg_buffer_size); @@ -169,6 +170,7 @@ handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len) return; } q->cancelled = 1; + free(buf); } /** handle control command coming into server */ @@ -190,7 +192,6 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg, if(r==-1) /* nothing to read now, try later */ return 0; - log_info("bg got cmd %d", (int)context_serial_getcmd(buf, len)); switch(context_serial_getcmd(buf, len)) { default: case UB_LIBCMD_ANSWER: @@ -198,6 +199,7 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg, (int)context_serial_getcmd(buf, len)); /* and fall through to quit */ case UB_LIBCMD_QUIT: + free(buf); comm_base_exit(w->base); break; case UB_LIBCMD_NEWQUERY: @@ -222,8 +224,6 @@ libworker_handle_result_write(struct comm_point* c, void* arg, comm_point_stop_listening(c); return 0; } - log_info("bg write msg %d", (int)context_serial_getcmd( - item->buf, item->len)); r = libworker_write_msg(c->fd, item->buf, item->len, 1); if(r == -1) return 0; /* try again later */ @@ -250,60 +250,85 @@ static void* libworker_dobg(void* arg) { /* setup */ - struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg; - struct libworker* w = libworker_setup(ctx); + uint32_t m; + int fd; + struct libworker* w = (struct libworker*)arg; + struct ub_val_ctx* ctx = w->ctx; log_thread_set(&w->thread_num); - log_info("start bg"); /* @@@ DEBUG */ - /*verbosity=3; @@@ DEBUG */ +#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS) + /* we are forked */ + w->is_bg_thread = 0; + /* close non-used parts of the pipes */ + if(ctx->qqpipe[1] != -1) { + close(ctx->qqpipe[1]); + ctx->qqpipe[1] = -1; + } + if(ctx->rrpipe[0] != -1) { + close(ctx->rrpipe[0]); + ctx->rrpipe[0] = -1; + } +#endif if(!w) { log_err("libunbound bg worker init failed, nomem"); return NULL; } - lock_basic_lock(&ctx->qqpipe_lock); if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0, libworker_handle_control_cmd, w))) { - lock_basic_unlock(&ctx->qqpipe_lock); log_err("libunbound bg worker init failed, no cmdcom"); return NULL; } - lock_basic_unlock(&ctx->qqpipe_lock); - lock_basic_lock(&ctx->rrpipe_lock); if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1, libworker_handle_result_write, w))) { - lock_basic_unlock(&ctx->qqpipe_lock); - log_err("libunbound bg worker init failed, no cmdcom"); + log_err("libunbound bg worker init failed, no rescom"); return NULL; } - lock_basic_unlock(&ctx->rrpipe_lock); /* do the work */ comm_base_dispatch(w->base); /* cleanup */ + fd = ctx->rrpipe[1]; + ctx->rrpipe[1] = -1; + m = UB_LIBCMD_QUIT; + close(ctx->qqpipe[0]); + ctx->qqpipe[0] = -1; libworker_delete(w); + (void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0); + close(fd); return NULL; } int libworker_bg(struct ub_val_ctx* ctx) { + struct libworker* w; /* fork or threadcreate */ + lock_basic_lock(&ctx->cfglock); if(ctx->dothread) { - ub_thread_create(&ctx->bg_tid, libworker_dobg, ctx); + lock_basic_unlock(&ctx->cfglock); + w = libworker_setup(ctx); + w->is_bg_thread = 1; + if(!w) return UB_NOMEM; + ub_thread_create(&ctx->bg_tid, libworker_dobg, w); } else { + lock_basic_unlock(&ctx->cfglock); switch((ctx->bg_pid=fork())) { case 0: - lock_basic_unlock(&ctx->cfglock); - (void)libworker_dobg(ctx); + w = libworker_setup(ctx); + if(!w) fatal_exit("out of memory"); + /* close non-used parts of the pipes */ + close(ctx->qqpipe[1]); + close(ctx->rrpipe[0]); + ctx->qqpipe[1] = -1; + ctx->rrpipe[0] = -1; + (void)libworker_dobg(w); exit(0); break; case -1: - lock_basic_unlock(&ctx->cfglock); return UB_FORKFAIL; default: break; } } - lock_basic_unlock(&ctx->cfglock); return UB_NOERROR; } @@ -383,6 +408,7 @@ libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf, res->rcode = LDNS_RCODE_SERVFAIL; rep = parse_reply(buf, temp, &rq); if(!rep) { + log_err("cannot parse buf"); return; /* error parsing buf, or out of memory */ } if(!fill_res(res, reply_find_answer_rrset(&rq, rep), @@ -404,27 +430,27 @@ libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf, static void libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s) { - struct libworker_cb_data* d = (struct libworker_cb_data*)arg; + struct ctx_query* q = (struct ctx_query*)arg; /* fg query is done; exit comm base */ - comm_base_exit(d->w->base); + comm_base_exit(q->w->base); if(rcode != 0) { - d->q->res->rcode = rcode; - d->q->msg_security = s; + q->res->rcode = rcode; + q->msg_security = s; return; } - d->q->res->rcode = LDNS_RCODE_SERVFAIL; - d->q->msg_security = 0; - d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf)); - d->q->msg_len = ldns_buffer_limit(buf); - if(!d->q->msg) { + q->res->rcode = LDNS_RCODE_SERVFAIL; + q->msg_security = 0; + q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf)); + q->msg_len = ldns_buffer_limit(buf); + if(!q->msg) { return; /* the error is in the rcode */ } /* canonname and results */ - d->q->msg_security = s; - libworker_enter_result(d->q->res, buf, d->w->env->scratch, s); + q->msg_security = s; + libworker_enter_result(q->res, buf, q->w->env->scratch, s); } /** setup qinfo and edns */ @@ -459,7 +485,6 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q) uint16_t qflags, qid; struct query_info qinfo; struct edns_data edns; - struct libworker_cb_data d; if(!w) return UB_INITFAIL; if(!setup_qinfo_edns(w, q, &qinfo, &edns)) { @@ -468,12 +493,11 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q) } qid = 0; qflags = BIT_RD; - d.q = q; - d.w = w; + q->w = w; /* see if there is a fixed answer */ if(local_zones_answer(ctx->local_zones, &qinfo, &edns, w->back->udp_buff, w->env->scratch)) { - libworker_fg_done_cb(&d, LDNS_RCODE_NOERROR, + libworker_fg_done_cb(q, LDNS_RCODE_NOERROR, w->back->udp_buff, sec_status_insecure); libworker_delete(w); free(qinfo.qname); @@ -481,7 +505,7 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q) } /* process new query */ if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns, - w->back->udp_buff, qid, libworker_fg_done_cb, &d)) { + w->back->udp_buff, qid, libworker_fg_done_cb, q)) { free(qinfo.qname); return UB_NOMEM; } @@ -504,9 +528,20 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt, struct libworker_res_list* item; /* serialize and delete unneeded q */ - msg = context_serialize_answer(q, err, pkt, &len); - (void)rbtree_delete(&w->ctx->queries, q->node.key); - context_query_delete(q); + if(w->is_bg_thread) { + lock_basic_lock(&w->ctx->cfglock); + q->msg_len = ldns_buffer_remaining(pkt); + q->msg = memdup(ldns_buffer_begin(pkt), q->msg_len); + if(!q->msg) + msg = context_serialize_answer(q, UB_NOMEM, NULL, &len); + else msg = context_serialize_answer(q, err, NULL, &len); + lock_basic_unlock(&w->ctx->cfglock); + } else { + msg = context_serialize_answer(q, err, pkt, &len); + (void)rbtree_delete(&w->ctx->queries, q->node.key); + w->ctx->num_async--; + context_query_delete(q); + } if(!msg) { log_err("out of memory for async answer"); @@ -536,13 +571,26 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt, static void libworker_bg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s) { - struct libworker_cb_data* d = (struct libworker_cb_data*)arg; - - d->q->msg_security = s; + struct ctx_query* q = (struct ctx_query*)arg; + + if(q->cancelled) { + if(q->w->is_bg_thread) { + /* delete it now */ + struct ub_val_ctx* ctx = q->w->ctx; + lock_basic_lock(&ctx->cfglock); + (void)rbtree_delete(&ctx->queries, q->node.key); + ctx->num_async--; + context_query_delete(q); + lock_basic_unlock(&ctx->cfglock); + } + /* cancelled, do not give answer */ + return; + } + q->msg_security = s; if(rcode != 0) { error_encode(buf, rcode, NULL, 0, BIT_RD, NULL); } - add_bg_result(d->w, d->q, buf, UB_NOERROR); + add_bg_result(q->w, q, buf, UB_NOERROR); } @@ -553,8 +601,13 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len) uint16_t qflags, qid; struct query_info qinfo; struct edns_data edns; - struct libworker_cb_data d; - struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len); + struct ctx_query* q; + if(w->is_bg_thread) { + q = context_lookup_new_query(w->ctx, buf, len); + } else { + q = context_deserialize_new_query(w->ctx, buf, len); + } + free(buf); if(!q) { log_err("failed to deserialize newq"); return; @@ -565,8 +618,6 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len) } qid = 0; qflags = BIT_RD; - d.q = q; - d.w = w; /* see if there is a fixed answer */ if(local_zones_answer(w->ctx->local_zones, &qinfo, &edns, w->back->udp_buff, w->env->scratch)) { @@ -575,9 +626,10 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len) free(qinfo.qname); return; } + q->w = w; /* process new query */ if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns, - w->back->udp_buff, qid, libworker_bg_done_cb, &d)) { + w->back->udp_buff, qid, libworker_bg_done_cb, q)) { add_bg_result(w, q, NULL, UB_NOMEM); } free(qinfo.qname); diff --git a/libunbound/worker.h b/libunbound/worker.h index b7c1ffb64..f1969b99d 100644 --- a/libunbound/worker.h +++ b/libunbound/worker.h @@ -68,8 +68,8 @@ struct libworker { /** context we are operating under */ struct ub_val_ctx* ctx; - /** is this a background worker? */ - int is_bg; + /** is this a bg worker that is threaded (not forked)? */ + int is_bg_thread; /** copy of the module environment with worker local entries. */ struct module_env* env; @@ -91,16 +91,6 @@ struct libworker { struct libworker_res_list* res_last; }; -/** - * Libworker query cb struct - */ -struct libworker_cb_data { - /** the worker involved */ - struct libworker* w; - /** the query involved */ - struct ctx_query* q; -}; - /** * List of results (arbitrary command serializations) to write back */ diff --git a/testcode/asynclook.c b/testcode/asynclook.c index 7e511c4f7..70b8f2dd5 100644 --- a/testcode/asynclook.c +++ b/testcode/asynclook.c @@ -65,6 +65,8 @@ void usage(char* argv[]) { printf("usage: %s name ...\n", argv[0]); printf("names are looked up at the same time, asynchronously.\n"); + printf("-d : enable debug output\n"); + printf("-t : use a resolver thread instead of forking a process\n"); exit(1); } @@ -89,9 +91,28 @@ int main(int argc, char** argv) if(argc == 1) { usage(argv); } + if(argc > 1 && strcmp(argv[1], "-h") == 0) + usage(argv); argc--; argv++; + /* create context */ + ctx = ub_val_ctx_create(); + if(!ctx) { + printf("could not create context, %s", strerror(errno)); + return 1; + } + if(argc > 0 && strcmp(argv[0], "-d") == 0) { + ub_val_ctx_debuglevel(ctx, 3); + argc--; + argv++; + } + if(argc > 0 && strcmp(argv[0], "-t") == 0) { + ub_val_ctx_async(ctx, 1); + argc--; + argv++; + } + /* allocate array for results. */ lookups = (struct lookinfo*)calloc((size_t)argc, sizeof(struct lookinfo)); @@ -99,20 +120,10 @@ int main(int argc, char** argv) printf("out of memory\n"); return 1; } - /* create context */ - ctx = ub_val_ctx_create(); - if(!ctx) { - printf("could not create context, %s", strerror(errno)); - return 1; - } /* perform asyncronous calls */ num_wait = argc; for(i=0; ircode != 0) - printf("%s: DNS error %d\n", lookups[i].qname, - lookups[i].result->rcode); - else if(!lookups[i].result->havedata) - printf("%s: no data %s\n", lookups[i].qname, + else if(lookups[i].result->havedata) + printf("%s: %s\n", lookups[i].qname, + inet_ntop(AF_INET, lookups[i].result->data[0], + buf, (socklen_t)sizeof(buf))); + else { + /* there is no data, why that? */ + if(lookups[i].result->rcode == 0 /*noerror*/ || + lookups[i].result->nxdomain) + printf("%s: no data %s\n", lookups[i].qname, lookups[i].result->nxdomain?"(no such host)": "(no IP4 address)"); - else printf("%s: %s\n", lookups[i].qname, - inet_ntop(AF_INET, lookups[i].result->data[0], - buf, (socklen_t)sizeof(buf))); + else /* some error (from the server) */ + printf("%s: DNS error %d\n", lookups[i].qname, + lookups[i].result->rcode); + } } ub_val_ctx_delete(ctx); + for(i=0; inum = *(int*)(thr->arg); log_assert(thr->num < THRDEBUG_MAX_THREADS); - log_assert(thread_infos[thr->num] == NULL); + /* as an aside, due to this, won't work for libunbound bg thread */ + if(thread_infos[thr->num] != NULL) + log_warn("thread warning, thr->num %d not NULL", thr->num); thread_infos[thr->num] = thr; LOCKRET(pthread_setspecific(thr_debug_key, thr)); if(check_locking_order)