]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
unbound.c pipe code.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 22 Jan 2008 11:10:49 +0000 (11:10 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 22 Jan 2008 11:10:49 +0000 (11:10 +0000)
git-svn-id: file:///svn/unbound/trunk@883 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
libunbound/context.c
libunbound/context.h
libunbound/unbound.c
libunbound/unbound.h
libunbound/worker.c
libunbound/worker.h

index adfc97c449820dad3cab595f1eefe6039adfcd94..a401ac4763a66464ddb0628b8a83057859840f69 100644 (file)
@@ -1,3 +1,6 @@
+22 January 2008: Wouter
+       - library code for async in libunbound/unbound.c.
+
 21 January 2008: Wouter
        - libworker work, netevent raw commpoints, write_msg, serialize.
 
index 5c6914427305ad11720ce426dce3878659aa4aaf..0c0d8154ec5e6c929db74160a0f051e2a1a4229d 100644 (file)
@@ -102,13 +102,13 @@ static int
 find_id(struct ub_val_ctx* ctx, int* id)
 {
        size_t tries = 0;
+       ctx->next_querynum++;
        while(rbtree_search(&ctx->queries, &ctx->next_querynum)) {
                ctx->next_querynum++; /* numerical wraparound is fine */
                if(tries++ > NUM_ID_TRIES)
                        return 0;
        }
        *id = ctx->next_querynum;
-       ctx->next_querynum++;
        return 1;
 }
 
index 15c6a39fcb6dbb04f0e93fd9b49bb71b02169250..29a3a9db2b732224c665fb610569be38658ffa8a 100644 (file)
@@ -170,7 +170,9 @@ enum ub_ctx_err {
        /** cfg change after finalize() */
        UB_AFTERFINAL = -6,
        /** initialization failed (bad settings) */
-       UB_INITFAIL = -7
+       UB_INITFAIL = -7,
+       /** error in pipe communication with async bg worker */
+       UB_PIPE = -8
 };
 
 /**
index 6785c9e97449f84623430708d45617c608168ca8..1d0392e28e4f44a964ce32a3331a2404819e9579 100644 (file)
@@ -50,6 +50,7 @@
 #include "util/config_file.h"
 #include "util/alloc.h"
 #include "util/module.h"
+#include "util/regional.h"
 #include "util/log.h"
 #include "services/modstack.h"
 #include "services/localzone.h"
@@ -104,16 +105,24 @@ ub_val_ctx_create()
        return ctx;
 }
 
-/** delete q */
+/** delete context query */
 static void
-delq(rbnode_t* n, void* ATTR_UNUSED(arg))
+context_query_delete(struct ctx_query* q) 
 {
-       struct ctx_query* q = (struct ctx_query*)n;
        if(!q) return;
        ub_val_result_free(q->res);
+       free(q->msg);
        free(q);
 }
 
+/** delete q */
+static void
+delq(rbnode_t* n, void* ATTR_UNUSED(arg))
+{
+       struct ctx_query* q = (struct ctx_query*)n;
+       context_query_delete(q);
+}
+
 void 
 ub_val_ctx_delete(struct ub_val_ctx* ctx)
 {
@@ -275,13 +284,15 @@ ub_val_ctx_poll(struct ub_val_ctx* ctx)
 int 
 ub_val_ctx_wait(struct ub_val_ctx* ctx)
 {
+       int r;
        lock_basic_lock(&ctx->cfglock);
        while(ctx->num_async > 0) {
-               lock_basic_lock(&ctx->rrpipe_lock);
                lock_basic_unlock(&ctx->cfglock);
-               (void)pollit(ctx, NULL);
+               lock_basic_lock(&ctx->rrpipe_lock);
+               r = pollit(ctx, NULL);
                lock_basic_unlock(&ctx->rrpipe_lock);
-               ub_val_ctx_process(ctx);
+               if(r)
+                       ub_val_ctx_process(ctx);
                lock_basic_lock(&ctx->cfglock);
        }
        lock_basic_unlock(&ctx->cfglock);
@@ -291,15 +302,93 @@ ub_val_ctx_wait(struct ub_val_ctx* ctx)
 int 
 ub_val_ctx_fd(struct ub_val_ctx* ctx)
 {
-       return ctx->rrpipe[0];
+       int fd;
+       lock_basic_lock(&ctx->rrpipe_lock);
+       fd = ctx->rrpipe[0];
+       lock_basic_unlock(&ctx->rrpipe_lock);
+       return fd;
+}
+
+/** process answer from bg worker */
+static int
+process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+{
+       int err;
+       struct ctx_query* q;
+       ub_val_callback_t cb;
+       void* cbarg;
+       struct ub_val_result* res;
+       if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
+               log_err("error: bad data from bg worker %d",
+                       (int)context_serial_getcmd(msg, len));
+               return 0;
+       }
+
+       lock_basic_lock(&ctx->cfglock);
+       q = context_deserialize_answer(ctx, msg, len, &err);
+       if(!q) {
+               lock_basic_unlock(&ctx->cfglock);
+               return 0;
+       }
+       log_assert(q->async);
+
+       /* grab cb while locked */
+       cb = q->cb;
+       cbarg = q->cb_arg;
+       if(err) {
+               res = NULL;
+               ub_val_result_free(q->res);
+       } else {
+               /* parse the message, extract rcode, fill result */
+               ldns_buffer* buf = ldns_buffer_new(q->msg_len);
+               struct regional* region = regional_create();
+               res = q->res;
+               res->rcode = LDNS_RCODE_SERVFAIL;
+               if(region && buf) {
+                       ldns_buffer_clear(buf);
+                       ldns_buffer_write(buf, q->msg, q->msg_len);
+                       ldns_buffer_flip(buf);
+                       libworker_enter_result(res, buf, region,
+                               q->msg_security);
+               }
+               ldns_buffer_free(buf);
+               regional_destroy(region);
+       }
+       q->res = NULL;
+       /* delete the q from list */
+       (void)rbtree_delete(&ctx->queries, q->node.key);
+       ctx->num_async--;
+       context_query_delete(q);
+       lock_basic_unlock(&ctx->cfglock);
+
+       /* no locks held while calling callback, so that library is
+        * re-entrant. */
+       (*cb)(cbarg, err, res);
+
+       return 1;
 }
 
 int 
 ub_val_ctx_process(struct ub_val_ctx* ctx)
 {
-       /* TODO */
-       /* ctx->num_asynx-- when handled; */
-       return UB_NOMEM;
+       int r;
+       uint8_t* msg = NULL;
+       uint32_t len = 0;
+       while(1) {
+               lock_basic_lock(&ctx->rrpipe_lock);
+               r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+               lock_basic_unlock(&ctx->rrpipe_lock);
+               if(r == 0)
+                       return UB_PIPE;
+               else if(r == -1)
+                       return UB_NOERROR;
+               if(!process_answer(ctx, msg, len)) {
+                       free(msg);
+                       return UB_PIPE;
+               }
+               free(msg);
+       }
+       return UB_NOERROR;
 }
 
 int 
@@ -327,15 +416,14 @@ ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
 
        r = libworker_fg(ctx, q);
        if(r) {
-               ub_val_result_free(q->res);
-               free(q);
+               context_query_delete(q);
                return r;
        }
        *result = q->res;
+       q->res = NULL;
 
        (void)rbtree_delete(&ctx->queries, q->node.key);
-       free(q->msg);
-       free(q);
+       context_query_delete(q);
        return UB_NOERROR;
 }
 
@@ -344,7 +432,10 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
        int rrclass, void* mydata, ub_val_callback_t callback, int* async_id)
 {
        struct ctx_query* q;
+       uint8_t* msg = NULL;
+       uint32_t len = 0;
 
+       *async_id = 0;
        lock_basic_lock(&ctx->cfglock);
        if(!ctx->finalized) {
                int r = context_finalize(ctx);
@@ -367,27 +458,59 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
        q = context_new(ctx, name, rrtype, rrclass, callback, mydata);
        if(!q)
                return UB_NOMEM;
-       /* TODO write over pipe to background worker */
+
+       /* write over pipe to background worker */
+       lock_basic_lock(&ctx->cfglock);
+       msg = context_serialize_new_query(q, &len);
+       if(!msg) {
+               (void)rbtree_delete(&ctx->queries, q->node.key);
+               ctx->num_async--;
+               context_query_delete(q);
+               lock_basic_unlock(&ctx->cfglock);
+               return UB_NOMEM;
+       }
        *async_id = q->querynum;
-       return UB_NOMEM;
+       lock_basic_unlock(&ctx->cfglock);
+       
+       lock_basic_lock(&ctx->qqpipe_lock);
+       libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
+       lock_basic_unlock(&ctx->qqpipe_lock);
+       free(msg);
+       return UB_NOERROR;
 }
 
 int 
 ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
 {
        struct ctx_query* q;
+       uint8_t* msg = NULL;
+       uint32_t len = 0;
        lock_basic_lock(&ctx->cfglock);
        q = (struct ctx_query*)rbtree_search(&ctx->queries, &async_id);
-       lock_basic_unlock(&ctx->cfglock);
-       if(!q || !q->async) /* it is not there, so nothing to do */
+       if(!q || !q->async) {
+               /* it is not there, so nothing to do */
+               lock_basic_unlock(&ctx->cfglock);
                return UB_NOERROR;
-       /* TODO ; send cancel to background worker */
-
-       lock_basic_lock(&ctx->cfglock);
-       (void)rbtree_delete(&ctx->queries, &async_id);
+       }
+       log_assert(q->async);
+       msg = context_serialize_cancel(q, &len);
+       if(!msg) {
+               lock_basic_unlock(&ctx->cfglock);
+               return UB_NOMEM;
+       }
+       
+       /* delete it */
+       (void)rbtree_delete(&ctx->queries, q->node.key);
        ctx->num_async--;
+       context_query_delete(q);
        lock_basic_unlock(&ctx->cfglock);
-       return UB_NOMEM;
+
+       /* send cancel to background worker */
+       lock_basic_lock(&ctx->qqpipe_lock);
+       libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
+       lock_basic_unlock(&ctx->qqpipe_lock);
+       free(msg);
+       return UB_NOERROR;
 }
 
 void 
@@ -418,6 +541,7 @@ ub_val_strerror(int err)
                case UB_FORKFAIL: return "could not fork";
                case UB_INITFAIL: return "initialization failure";
                case UB_AFTERFINAL: return "setting change after finalize";
+               case UB_PIPE: return "error in pipe communication with async";
                default: return "unknown error";
        }
 }
index 91d39fd3d93a5c4bc785bb7e3cc54f2dfdf74695..bbfb6469049e1e50f17543d3fc0c0dc154fdb028 100644 (file)
@@ -179,8 +179,7 @@ struct ub_val_result {
 /**
  * Callback for results of async queries.
  * The readable function definition looks like:
- * void my_callback(void* my_arg, int err, int secure, int havedata,
- *     struct ub_val_result* result);
+ * void my_callback(void* my_arg, int err, struct ub_val_result* result);
  * It is called with
  *     void* my_arg: your pointer to a (struct of) data of your choice, 
  *             or NULL.
@@ -333,7 +332,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
 /**
  * Perform resolution and validation of the target name.
  * Asynchronous, after a while, the callback will be called with your
- * data and the result + secure status.
+ * data and the result.
  * @param ctx: context.
  *     If no thread or process has been created yet to perform the
  *     work in the background, it is created now.
@@ -345,8 +344,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
  *     and is passed on to the callback function.
  * @param callback: this is called on completion of the resolution.
  *     It is called as:
- *     void callback(void* mydata, int err, int secure, int havedata, 
- *             struct ub_val_result* result)
+ *     void callback(void* mydata, int err, struct ub_val_result* result)
  *     with mydata: the same as passed here, you may pass NULL,
  *     with err: is 0 when a result has been found.
  *     with result: a newly allocated result structure.
index 455a8a38a05a595e93629022d5927b719eaafb80..1376a843e649942b001913c6d62ebf8a88c30679 100644 (file)
@@ -368,12 +368,37 @@ fill_res(struct ub_val_result* res, struct ub_packed_rrset_key* answer,
        return 1;
 }
 
+/** fill result from parsed message, on error fills servfail */
+void
+libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
+       struct regional* temp, enum sec_status msg_security)
+{
+       struct query_info rq;
+       struct reply_info* rep;
+       res->rcode = LDNS_RCODE_SERVFAIL;
+       rep = parse_reply(buf, temp, &rq);
+       if(!rep) {
+               return; /* error parsing buf, or out of memory */
+       }
+       if(!fill_res(res, reply_find_answer_rrset(&rq, rep), 
+               reply_find_final_cname_target(&rq, rep), &rq))
+               return; /* out of memory */
+       /* rcode, havedata, nxdomain, secure, bogus */
+       res->rcode = (int)FLAGS_GET_RCODE(rep->flags);
+       if(res->data && res->data[0])
+               res->havedata = 1;
+       if(res->rcode == LDNS_RCODE_NXDOMAIN)
+               res->nxdomain = 1;
+       if(msg_security == sec_status_secure)
+               res->secure = 1;
+       if(msg_security == sec_status_bogus)
+               res->bogus = 1;
+}
+
 /** callback with fg results */
 static void
 libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
 {
-       struct query_info rq; /* replied query */
-       struct reply_info* rep;
        struct libworker_fg_data* d = (struct libworker_fg_data*)arg;
        /* fg query is done; exit comm base */
        comm_base_exit(d->w->base);
@@ -389,29 +414,13 @@ libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
        d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf));
        d->q->msg_len = ldns_buffer_limit(buf);
        if(!d->q->msg) {
-               return; /* error in rcode */
+               return; /* the error is in the rcode */
        }
 
        /* canonname and results */
-       rep = parse_reply(buf, d->w->env->scratch, &rq);
-       if(!rep) {
-               return; /* error parsing buf, or out of memory */
-       }
-       if(!fill_res(d->q->res, reply_find_answer_rrset(&rq, rep), 
-               reply_find_final_cname_target(&rq, rep), &rq))
-               return; /* out of memory */
-       /* rcode, havedata, nxdomain, secure, bogus */
-       d->q->res->rcode = (int)LDNS_RCODE_WIRE(d->q->msg);
-       if(d->q->res->data && d->q->res->data[0])
-               d->q->res->havedata = 1;
-       if(d->q->res->rcode == LDNS_RCODE_NXDOMAIN)
-               d->q->res->nxdomain = 1;
-       if(s == sec_status_secure)
-               d->q->res->secure = 1;
-       if(s == sec_status_bogus)
-               d->q->res->bogus = 1;
-
        d->q->msg_security = s;
+
+       libworker_enter_result(d->q->res, buf, d->w->env->scratch, s);
 }
 
 /** setup qinfo and edns */
index 036d7169276670a2757b81221ff892d70a13938d..88545948c2be7d531add4ac68e91cf7ed9c1340e 100644 (file)
@@ -44,6 +44,7 @@
 #ifndef LIBUNBOUND_WORKER_H
 #define LIBUNBOUND_WORKER_H
 struct ub_val_ctx;
+struct ub_val_result;
 struct module_env;
 struct comm_base;
 struct outside_network;
@@ -54,6 +55,8 @@ struct module_qstate;
 struct comm_point;
 struct comm_reply;
 struct libworker_res_list;
+struct regional;
+enum sec_status;
 
 /** 
  * The library-worker status structure
@@ -217,4 +220,16 @@ int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
  */
 int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
 
+/** 
+ * fill result from parsed message, on error fills servfail 
+ * @param res: is clear at start, filled in at end.
+ * @param buf: contains DNS message.
+ * @param temp: temporary buffer for parse.
+ * @param msg_security: security status of the DNS message.
+ *   On error, the res may contain a different status 
+ *   (out of memory is not secure, not bogus).
+ */
+void libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
+       struct regional* temp, enum sec_status msg_security);
+
 #endif /* LIBUNBOUND_WORKER_H */