From: Wouter Wijngaards Date: Tue, 22 Jan 2008 11:10:49 +0000 (+0000) Subject: unbound.c pipe code. X-Git-Tag: release-0.9~57 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0e000a8587db7fd5014ad0ae0a57d46cbccf77dd;p=thirdparty%2Funbound.git unbound.c pipe code. git-svn-id: file:///svn/unbound/trunk@883 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/doc/Changelog b/doc/Changelog index adfc97c44..a401ac476 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,6 @@ +22 January 2008: Wouter + - library code for async in libunbound/unbound.c. + 21 January 2008: Wouter - libworker work, netevent raw commpoints, write_msg, serialize. diff --git a/libunbound/context.c b/libunbound/context.c index 5c6914427..0c0d8154e 100644 --- a/libunbound/context.c +++ b/libunbound/context.c @@ -102,13 +102,13 @@ static int find_id(struct ub_val_ctx* ctx, int* id) { size_t tries = 0; + ctx->next_querynum++; while(rbtree_search(&ctx->queries, &ctx->next_querynum)) { ctx->next_querynum++; /* numerical wraparound is fine */ if(tries++ > NUM_ID_TRIES) return 0; } *id = ctx->next_querynum; - ctx->next_querynum++; return 1; } diff --git a/libunbound/context.h b/libunbound/context.h index 15c6a39fc..29a3a9db2 100644 --- a/libunbound/context.h +++ b/libunbound/context.h @@ -170,7 +170,9 @@ enum ub_ctx_err { /** cfg change after finalize() */ UB_AFTERFINAL = -6, /** initialization failed (bad settings) */ - UB_INITFAIL = -7 + UB_INITFAIL = -7, + /** error in pipe communication with async bg worker */ + UB_PIPE = -8 }; /** diff --git a/libunbound/unbound.c b/libunbound/unbound.c index 6785c9e97..1d0392e28 100644 --- a/libunbound/unbound.c +++ b/libunbound/unbound.c @@ -50,6 +50,7 @@ #include "util/config_file.h" #include "util/alloc.h" #include "util/module.h" +#include "util/regional.h" #include "util/log.h" #include "services/modstack.h" #include "services/localzone.h" @@ -104,16 +105,24 @@ ub_val_ctx_create() return ctx; } -/** delete q */ +/** delete context query */ static void -delq(rbnode_t* n, void* ATTR_UNUSED(arg)) +context_query_delete(struct ctx_query* q) { - struct ctx_query* q = (struct ctx_query*)n; if(!q) return; ub_val_result_free(q->res); + free(q->msg); free(q); } +/** delete q */ +static void +delq(rbnode_t* n, void* ATTR_UNUSED(arg)) +{ + struct ctx_query* q = (struct ctx_query*)n; + context_query_delete(q); +} + void ub_val_ctx_delete(struct ub_val_ctx* ctx) { @@ -275,13 +284,15 @@ ub_val_ctx_poll(struct ub_val_ctx* ctx) int ub_val_ctx_wait(struct ub_val_ctx* ctx) { + int r; lock_basic_lock(&ctx->cfglock); while(ctx->num_async > 0) { - lock_basic_lock(&ctx->rrpipe_lock); lock_basic_unlock(&ctx->cfglock); - (void)pollit(ctx, NULL); + lock_basic_lock(&ctx->rrpipe_lock); + r = pollit(ctx, NULL); lock_basic_unlock(&ctx->rrpipe_lock); - ub_val_ctx_process(ctx); + if(r) + ub_val_ctx_process(ctx); lock_basic_lock(&ctx->cfglock); } lock_basic_unlock(&ctx->cfglock); @@ -291,15 +302,93 @@ ub_val_ctx_wait(struct ub_val_ctx* ctx) int ub_val_ctx_fd(struct ub_val_ctx* ctx) { - return ctx->rrpipe[0]; + int fd; + lock_basic_lock(&ctx->rrpipe_lock); + fd = ctx->rrpipe[0]; + lock_basic_unlock(&ctx->rrpipe_lock); + return fd; +} + +/** process answer from bg worker */ +static int +process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len) +{ + int err; + struct ctx_query* q; + ub_val_callback_t cb; + void* cbarg; + struct ub_val_result* res; + if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) { + log_err("error: bad data from bg worker %d", + (int)context_serial_getcmd(msg, len)); + return 0; + } + + lock_basic_lock(&ctx->cfglock); + q = context_deserialize_answer(ctx, msg, len, &err); + if(!q) { + lock_basic_unlock(&ctx->cfglock); + return 0; + } + log_assert(q->async); + + /* grab cb while locked */ + cb = q->cb; + cbarg = q->cb_arg; + if(err) { + res = NULL; + ub_val_result_free(q->res); + } else { + /* parse the message, extract rcode, fill result */ + ldns_buffer* buf = ldns_buffer_new(q->msg_len); + struct regional* region = regional_create(); + res = q->res; + res->rcode = LDNS_RCODE_SERVFAIL; + if(region && buf) { + ldns_buffer_clear(buf); + ldns_buffer_write(buf, q->msg, q->msg_len); + ldns_buffer_flip(buf); + libworker_enter_result(res, buf, region, + q->msg_security); + } + ldns_buffer_free(buf); + regional_destroy(region); + } + q->res = NULL; + /* delete the q from list */ + (void)rbtree_delete(&ctx->queries, q->node.key); + ctx->num_async--; + context_query_delete(q); + lock_basic_unlock(&ctx->cfglock); + + /* no locks held while calling callback, so that library is + * re-entrant. */ + (*cb)(cbarg, err, res); + + return 1; } int ub_val_ctx_process(struct ub_val_ctx* ctx) { - /* TODO */ - /* ctx->num_asynx-- when handled; */ - return UB_NOMEM; + int r; + uint8_t* msg = NULL; + uint32_t len = 0; + while(1) { + lock_basic_lock(&ctx->rrpipe_lock); + r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); + lock_basic_unlock(&ctx->rrpipe_lock); + if(r == 0) + return UB_PIPE; + else if(r == -1) + return UB_NOERROR; + if(!process_answer(ctx, msg, len)) { + free(msg); + return UB_PIPE; + } + free(msg); + } + return UB_NOERROR; } int @@ -327,15 +416,14 @@ ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, r = libworker_fg(ctx, q); if(r) { - ub_val_result_free(q->res); - free(q); + context_query_delete(q); return r; } *result = q->res; + q->res = NULL; (void)rbtree_delete(&ctx->queries, q->node.key); - free(q->msg); - free(q); + context_query_delete(q); return UB_NOERROR; } @@ -344,7 +432,10 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype, int rrclass, void* mydata, ub_val_callback_t callback, int* async_id) { struct ctx_query* q; + uint8_t* msg = NULL; + uint32_t len = 0; + *async_id = 0; lock_basic_lock(&ctx->cfglock); if(!ctx->finalized) { int r = context_finalize(ctx); @@ -367,27 +458,59 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype, q = context_new(ctx, name, rrtype, rrclass, callback, mydata); if(!q) return UB_NOMEM; - /* TODO write over pipe to background worker */ + + /* write over pipe to background worker */ + lock_basic_lock(&ctx->cfglock); + msg = context_serialize_new_query(q, &len); + if(!msg) { + (void)rbtree_delete(&ctx->queries, q->node.key); + ctx->num_async--; + context_query_delete(q); + lock_basic_unlock(&ctx->cfglock); + return UB_NOMEM; + } *async_id = q->querynum; - return UB_NOMEM; + lock_basic_unlock(&ctx->cfglock); + + lock_basic_lock(&ctx->qqpipe_lock); + libworker_write_msg(ctx->qqpipe[1], msg, len, 0); + lock_basic_unlock(&ctx->qqpipe_lock); + free(msg); + return UB_NOERROR; } int ub_val_cancel(struct ub_val_ctx* ctx, int async_id) { struct ctx_query* q; + uint8_t* msg = NULL; + uint32_t len = 0; lock_basic_lock(&ctx->cfglock); q = (struct ctx_query*)rbtree_search(&ctx->queries, &async_id); - lock_basic_unlock(&ctx->cfglock); - if(!q || !q->async) /* it is not there, so nothing to do */ + if(!q || !q->async) { + /* it is not there, so nothing to do */ + lock_basic_unlock(&ctx->cfglock); return UB_NOERROR; - /* TODO ; send cancel to background worker */ - - lock_basic_lock(&ctx->cfglock); - (void)rbtree_delete(&ctx->queries, &async_id); + } + log_assert(q->async); + msg = context_serialize_cancel(q, &len); + if(!msg) { + lock_basic_unlock(&ctx->cfglock); + return UB_NOMEM; + } + + /* delete it */ + (void)rbtree_delete(&ctx->queries, q->node.key); ctx->num_async--; + context_query_delete(q); lock_basic_unlock(&ctx->cfglock); - return UB_NOMEM; + + /* send cancel to background worker */ + lock_basic_lock(&ctx->qqpipe_lock); + libworker_write_msg(ctx->qqpipe[1], msg, len, 0); + lock_basic_unlock(&ctx->qqpipe_lock); + free(msg); + return UB_NOERROR; } void @@ -418,6 +541,7 @@ ub_val_strerror(int err) case UB_FORKFAIL: return "could not fork"; case UB_INITFAIL: return "initialization failure"; case UB_AFTERFINAL: return "setting change after finalize"; + case UB_PIPE: return "error in pipe communication with async"; default: return "unknown error"; } } diff --git a/libunbound/unbound.h b/libunbound/unbound.h index 91d39fd3d..bbfb64690 100644 --- a/libunbound/unbound.h +++ b/libunbound/unbound.h @@ -179,8 +179,7 @@ struct ub_val_result { /** * Callback for results of async queries. * The readable function definition looks like: - * void my_callback(void* my_arg, int err, int secure, int havedata, - * struct ub_val_result* result); + * void my_callback(void* my_arg, int err, struct ub_val_result* result); * It is called with * void* my_arg: your pointer to a (struct of) data of your choice, * or NULL. @@ -333,7 +332,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, /** * Perform resolution and validation of the target name. * Asynchronous, after a while, the callback will be called with your - * data and the result + secure status. + * data and the result. * @param ctx: context. * If no thread or process has been created yet to perform the * work in the background, it is created now. @@ -345,8 +344,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype, * and is passed on to the callback function. * @param callback: this is called on completion of the resolution. * It is called as: - * void callback(void* mydata, int err, int secure, int havedata, - * struct ub_val_result* result) + * void callback(void* mydata, int err, struct ub_val_result* result) * with mydata: the same as passed here, you may pass NULL, * with err: is 0 when a result has been found. * with result: a newly allocated result structure. diff --git a/libunbound/worker.c b/libunbound/worker.c index 455a8a38a..1376a843e 100644 --- a/libunbound/worker.c +++ b/libunbound/worker.c @@ -368,12 +368,37 @@ fill_res(struct ub_val_result* res, struct ub_packed_rrset_key* answer, return 1; } +/** fill result from parsed message, on error fills servfail */ +void +libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf, + struct regional* temp, enum sec_status msg_security) +{ + struct query_info rq; + struct reply_info* rep; + res->rcode = LDNS_RCODE_SERVFAIL; + rep = parse_reply(buf, temp, &rq); + if(!rep) { + return; /* error parsing buf, or out of memory */ + } + if(!fill_res(res, reply_find_answer_rrset(&rq, rep), + reply_find_final_cname_target(&rq, rep), &rq)) + return; /* out of memory */ + /* rcode, havedata, nxdomain, secure, bogus */ + res->rcode = (int)FLAGS_GET_RCODE(rep->flags); + if(res->data && res->data[0]) + res->havedata = 1; + if(res->rcode == LDNS_RCODE_NXDOMAIN) + res->nxdomain = 1; + if(msg_security == sec_status_secure) + res->secure = 1; + if(msg_security == sec_status_bogus) + res->bogus = 1; +} + /** callback with fg results */ static void libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s) { - struct query_info rq; /* replied query */ - struct reply_info* rep; struct libworker_fg_data* d = (struct libworker_fg_data*)arg; /* fg query is done; exit comm base */ comm_base_exit(d->w->base); @@ -389,29 +414,13 @@ libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s) d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf)); d->q->msg_len = ldns_buffer_limit(buf); if(!d->q->msg) { - return; /* error in rcode */ + return; /* the error is in the rcode */ } /* canonname and results */ - rep = parse_reply(buf, d->w->env->scratch, &rq); - if(!rep) { - return; /* error parsing buf, or out of memory */ - } - if(!fill_res(d->q->res, reply_find_answer_rrset(&rq, rep), - reply_find_final_cname_target(&rq, rep), &rq)) - return; /* out of memory */ - /* rcode, havedata, nxdomain, secure, bogus */ - d->q->res->rcode = (int)LDNS_RCODE_WIRE(d->q->msg); - if(d->q->res->data && d->q->res->data[0]) - d->q->res->havedata = 1; - if(d->q->res->rcode == LDNS_RCODE_NXDOMAIN) - d->q->res->nxdomain = 1; - if(s == sec_status_secure) - d->q->res->secure = 1; - if(s == sec_status_bogus) - d->q->res->bogus = 1; - d->q->msg_security = s; + + libworker_enter_result(d->q->res, buf, d->w->env->scratch, s); } /** setup qinfo and edns */ diff --git a/libunbound/worker.h b/libunbound/worker.h index 036d71692..88545948c 100644 --- a/libunbound/worker.h +++ b/libunbound/worker.h @@ -44,6 +44,7 @@ #ifndef LIBUNBOUND_WORKER_H #define LIBUNBOUND_WORKER_H struct ub_val_ctx; +struct ub_val_result; struct module_env; struct comm_base; struct outside_network; @@ -54,6 +55,8 @@ struct module_qstate; struct comm_point; struct comm_reply; struct libworker_res_list; +struct regional; +enum sec_status; /** * The library-worker status structure @@ -217,4 +220,16 @@ int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock); */ int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock); +/** + * fill result from parsed message, on error fills servfail + * @param res: is clear at start, filled in at end. + * @param buf: contains DNS message. + * @param temp: temporary buffer for parse. + * @param msg_security: security status of the DNS message. + * On error, the res may contain a different status + * (out of memory is not secure, not bogus). + */ +void libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf, + struct regional* temp, enum sec_status msg_security); + #endif /* LIBUNBOUND_WORKER_H */