]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
Multiple queries per thread.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 27 Mar 2007 15:21:21 +0000 (15:21 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 27 Mar 2007 15:21:21 +0000 (15:21 +0000)
git-svn-id: file:///svn/unbound/trunk@201 be551aaa-1e26-0410-a405-d3ace91eadb9

daemon/worker.c
daemon/worker.h
doc/Changelog
util/data/msgreply.c
util/data/msgreply.h

index 0806b9bc671e480376ce649c6585c0d748fcdb2b..0cff28066c1af1c317c66a07784b01763123c1a9 100644 (file)
@@ -74,18 +74,34 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
                log_err("write socket: %s", strerror(errno));
 }
 
+/** release workrequest back to the freelist,
+ * note that the w->qinfo still needs to be cleared after this. 
+ */
+static void
+req_release(struct work_query* w)
+{
+       if(w->worker->num_requests == w->worker->request_size)  {
+               /* no longer at max, start accepting again. */
+               listen_resume(w->worker->front);
+       }
+       w->worker->num_requests --;
+       log_assert(w->worker->num_requests >= 0);
+       w->next = w->worker->free_queries;
+       w->worker->free_queries = w;
+}
+
 /** reply to query with given error code */
 static void 
-replyerror(int r, struct worker* worker)
+replyerror(int r, struct work_query* w)
 {
-       ldns_buffer* buf = worker->query_reply.c->buffer;
+       ldns_buffer* buf = w->query_reply.c->buffer;
        uint16_t flags;
        verbose(VERB_DETAIL, "reply with error");
        
        ldns_buffer_clear(buf);
-       ldns_buffer_write_u16(buf, worker->query_id);
+       ldns_buffer_write_u16(buf, w->query_id);
        flags = (uint16_t)(0x8000 | r); /* QR and retcode*/
-       flags |= (worker->query_flags & 0x0100); /* copy RD bit */
+       flags |= (w->query_flags & 0x0100); /* copy RD bit */
        ldns_buffer_write_u16(buf, flags);
        flags = 1;
        ldns_buffer_write_u16(buf, flags);
@@ -93,17 +109,13 @@ replyerror(int r, struct worker* worker)
        ldns_buffer_write(buf, &flags, sizeof(uint16_t));
        ldns_buffer_write(buf, &flags, sizeof(uint16_t));
        ldns_buffer_write(buf, &flags, sizeof(uint16_t));
-       ldns_buffer_write(buf, worker->qinfo.qname, worker->qinfo.qnamesize);
-       ldns_buffer_write_u16(buf, worker->qinfo.qtype);
-       ldns_buffer_write_u16(buf, worker->qinfo.qclass);
+       ldns_buffer_write(buf, w->qinfo.qname, w->qinfo.qnamesize);
+       ldns_buffer_write_u16(buf, w->qinfo.qtype);
+       ldns_buffer_write_u16(buf, w->qinfo.qclass);
        ldns_buffer_flip(buf);
-       comm_point_send_reply(&worker->query_reply);
-       if(worker->num_requests == 1)  {
-               /* no longer at max, start accepting again. */
-               listen_resume(worker->front);
-       }
-       worker->num_requests --;
-       query_info_clear(&worker->qinfo);
+       comm_point_send_reply(&w->query_reply);
+       req_release(w);
+       query_info_clear(&w->qinfo);
 }
 
 /** process incoming replies from the network */
@@ -111,13 +123,12 @@ static int
 worker_handle_reply(struct comm_point* c, void* arg, int error, 
        struct comm_reply* ATTR_UNUSED(reply_info))
 {
-       struct worker* worker = (struct worker*)arg;
+       struct work_query* w = (struct work_query*)arg;
        struct reply_info* rep;
        struct msgreply_entry* e;
-       verbose(VERB_DETAIL, "reply to query with stored ID %d", 
-               worker->query_id);
+       verbose(VERB_DETAIL, "reply to query with stored ID %d", w->query_id);
        if(error != 0) {
-               replyerror(LDNS_RCODE_SERVFAIL, worker);
+               replyerror(LDNS_RCODE_SERVFAIL, w);
                return 0;
        }
        /* sanity check. */
@@ -131,7 +142,7 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
        rep = (struct reply_info*)malloc(sizeof(struct reply_info));
        if(!rep) {
                log_err("out of memory");
-               replyerror(LDNS_RCODE_SERVFAIL, worker);
+               replyerror(LDNS_RCODE_SERVFAIL, w);
                return 0;
        }
        rep->replysize = ldns_buffer_limit(c->buffer) - 2; /* minus ID */
@@ -140,44 +151,33 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
        if(!rep->reply) {
                free(rep);
                log_err("out of memory");
-               replyerror(LDNS_RCODE_SERVFAIL, worker);
+               replyerror(LDNS_RCODE_SERVFAIL, w);
                return 0;
        }
        memmove(rep->reply, ldns_buffer_at(c->buffer, 2), rep->replysize);
-       ldns_buffer_write_u16_at(worker->query_reply.c->buffer, 0, 
-               worker->query_id);
-       reply_info_answer(rep, worker->query_flags, worker->query_reply.c->
-               buffer);
-       comm_point_send_reply(&worker->query_reply);
-       if(worker->num_requests == 1)  {
-               /* no longer at max, start accepting again. */
-               listen_resume(worker->front);
-       }
-       worker->num_requests --;
+       ldns_buffer_write_u16_at(w->query_reply.c->buffer, 0, w->query_id);
+       reply_info_answer(rep, w->query_flags, w->query_reply.c->buffer);
+       comm_point_send_reply(&w->query_reply);
+       req_release(w);
        /* store or update reply in the cache */
-       if(!(e = query_info_entrysetup(&worker->qinfo, rep, 
-               worker->query_hash))) {
+       if(!(e = query_info_entrysetup(&w->qinfo, rep, w->query_hash))) {
                log_err("out of memory");
                return 0;
        }
-       slabhash_insert(worker->daemon->msg_cache, worker->query_hash, 
+       slabhash_insert(w->worker->daemon->msg_cache, w->query_hash, 
                &e->entry, rep);
        return 0;
 }
 
 /** process incoming request */
 static void 
-worker_process_query(struct worker* worker) 
+worker_process_query(struct worker* worker, struct work_query* w
 {
        /* query the forwarding address */
-       worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin(
-               worker->query_reply.c->buffer));
-       worker->query_flags = ldns_buffer_read_u16_at(worker->
-               query_reply.c->buffer, 2);
-       verbose(VERB_DETAIL, "process_query ID %d", worker->query_id);
-       pending_udp_query(worker->back, worker->query_reply.c->buffer, 
+       verbose(VERB_DETAIL, "process_query ID %d", w->query_id);
+       pending_udp_query(worker->back, w->query_reply.c->buffer, 
                &worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
-               worker_handle_reply, worker, worker->rndstate);
+               worker_handle_reply, w, worker->rndstate);
 }
 
 /** check request sanity. Returns error code, 0 OK, or -1 discard. 
@@ -261,6 +261,9 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
        int ret;
        hashvalue_t h;
        struct lruhash_entry* e;
+       struct query_info qinfo;
+       struct work_query* w;
+
        verbose(VERB_DETAIL, "worker handle request");
        if(error != NETEVENT_NOERROR) {
                log_err("called with err=%d", error);
@@ -275,27 +278,17 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
                comm_point_drop_reply(repinfo);
                return 0;
        }
-       if(worker->num_requests > 0) {
-               /* we could get this due to a slow tcp incoming query, 
-                  that started before we performed listen_pushback */
-               verbose(VERB_DETAIL, "worker: too many incoming requests "
-                       "active. dropping incoming query.");
-               comm_point_drop_reply(repinfo);
-               return 0;
-       }
        /* see if query is in the cache */
-       if(!query_info_parse(&worker->qinfo, c->buffer)) {
+       if(!query_info_parse(&qinfo, c->buffer)) {
                LDNS_QR_SET(ldns_buffer_begin(c->buffer));
                LDNS_RCODE_SET(ldns_buffer_begin(c->buffer), 
                        LDNS_RCODE_FORMERR);
                return 1;
        }
-       h = query_info_hash(&worker->qinfo);
-       if((e=slabhash_lookup(worker->daemon->msg_cache, h, &worker->qinfo, 
-               0))) {
+       h = query_info_hash(&qinfo);
+       if((e=slabhash_lookup(worker->daemon->msg_cache, h, &qinfo, 0))) {
                /* answer from cache */
                log_info("answer from the cache");
-               query_info_clear(&worker->qinfo);
                /* id is still in the buffer, no need to touch it */
                reply_info_answer((struct reply_info*)e->data, 
                        ldns_buffer_read_u16_at(c->buffer, 2), c->buffer);
@@ -303,16 +296,40 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
                return 1;
        }
        ldns_buffer_rewind(c->buffer);
+       /* perform memory allocation(s) */
+       if(!query_info_allocqname(&qinfo)) {
+               comm_point_drop_reply(repinfo);
+               return 0;
+       }
 
-       /* answer it */
-       worker->query_hash = h;
+       /* grab a work request structure for this new request */
+       if(!worker->free_queries) {
+               /* we could get this due to a slow tcp incoming query, 
+                  that started before we performed listen_pushback */
+               verbose(VERB_DETAIL, "worker: too many incoming requests "
+                       "active. dropping incoming query.");
+               comm_point_drop_reply(repinfo);
+               return 0;
+       }
+       w = worker->free_queries;
+       worker->free_queries = w->next;
        worker->num_requests ++;
-       if(worker->num_requests >= 1)  {
+       log_assert(worker->num_requests <= worker->request_size);
+       if(worker->num_requests == worker->request_size)  {
                /* the max request number has been reached, stop accepting */
                listen_pushback(worker->front);
        }
-       memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply));
-       worker_process_query(worker);
+
+       /* init request */
+       w->next = NULL;
+       w->query_hash = h;
+       memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
+       memcpy(&w->qinfo, &qinfo, sizeof(struct query_info));
+       w->query_id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer));
+       w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
+
+       /* answer it */
+       worker_process_query(worker, w);
        return 0;
 }
 
@@ -377,6 +394,22 @@ worker_create(struct daemon* daemon, int id)
        return worker;
 }
 
+/** create request handling structures */
+static int
+reqs_init(struct worker* worker)
+{
+       int i;
+       for(i=0; i<worker->request_size; i++) {
+               struct work_query* q = (struct work_query*)calloc(1,
+                       sizeof(struct work_query));
+               if(!q) return 0;
+               q->worker = worker;
+               q->next = worker->free_queries;
+               worker->free_queries = q;
+       }
+       return 1;
+}
+
 int
 worker_init(struct worker* worker, struct config_file *cfg, 
        struct listen_port* ports, size_t buffer_size, int do_sigs)
@@ -451,6 +484,11 @@ worker_init(struct worker* worker, struct config_file *cfg,
                        return 0;
                }
        }
+       worker->request_size = 1;
+       if(!reqs_init(worker)) {
+               worker_delete(worker);
+               return 0;
+       }
 
        /* set forwarder address */
        if(cfg->fwd_address && cfg->fwd_address[0]) {
index eff1141d35eae70e56e980959dc20f0e26e376cb..5c83a1a09110486442754155a93c4b78f6d1acf4 100644 (file)
@@ -64,6 +64,23 @@ enum worker_commands {
        worker_cmd_quit
 };
 
+/** information per query that is in processing */
+struct work_query {
+       /** next query in freelist */
+       struct work_query* next;
+       /** the worker for this query */
+       struct worker* worker;
+       /** the query reply destination, packet buffer and where to send. */
+       struct comm_reply query_reply;
+       /** the query_info structure from the query */
+       struct query_info qinfo;
+       /** hash value of the query qinfo */
+       hashvalue_t query_hash;
+       /** id of query */
+       uint16_t query_id;
+       /** flags uint16 from query */
+       uint16_t query_flags;
+};
 
 /**
  * Structure holding working information for unbound.
@@ -93,16 +110,10 @@ struct worker {
 
        /** number of requests currently active */
        int num_requests;
-       /** our one and only query, packet buffer and where to send. */
-       struct comm_reply query_reply;
-       /** id of query */
-       uint16_t query_id;
-       /** flags uint16 from query */
-       uint16_t query_flags;
-       /** the query_info structure from the query */
-       struct query_info qinfo;
-       /** hash value of the query qinfo */
-       hashvalue_t query_hash;
+       /** number of requests that can be handled by this worker */
+       int request_size;
+       /** the free working queries */
+       struct work_query* free_queries;
 
        /** address to forward to */
        struct sockaddr_storage fwd_addr;
index 11d27cb1596994e1e9d1a601ece58fc453e1dbb3..80c1cd8e27b46142bee0097535c7bce66096d2a6 100644 (file)
@@ -5,6 +5,7 @@
        - created test that checks if items drop out of the cache.
        - added word 'partitioned hash table' to documentation on slab hash.
          A slab hash is a partitioned hash table.
+       - worker can handle multiple queries at a time.
 
 26 March 2007: Wouter
        - config settings for slab hash message cache.
index 874685b329d0bcf3845894b581c60b28153c7cdb..72bcf550b312387653143089646132e0ae6b3f35 100644 (file)
@@ -79,22 +79,25 @@ int query_info_parse(struct query_info* m, ldns_buffer* query)
        log_assert(ldns_buffer_position(query) == 0);
        m->has_cd = (int)LDNS_CD_WIRE(q);
        ldns_buffer_skip(query, LDNS_HEADER_SIZE);
-       q = ldns_buffer_current(query);
+       m->qname = ldns_buffer_current(query);
        if((m->qnamesize = query_dname_len(query)) == 0)
                return 0; /* parse error */
+       if(ldns_buffer_remaining(query) < 4)
+               return 0; /* need qtype, qclass */
+       m->qtype = ldns_buffer_read_u16(query);
+       m->qclass = ldns_buffer_read_u16(query);
+       return 1;
+}
+
+int 
+query_info_allocqname(struct query_info* m)
+{
+       uint8_t* q = m->qname;
        if(!(m->qname = (uint8_t*)malloc(m->qnamesize))) {
-               log_err("query_info_parse: out of memory");
+               log_err("query_info_allocqname: out of memory");
                return 0; /* out of memory */
        }
        memmove(m->qname, q, m->qnamesize);
-
-       if(ldns_buffer_remaining(query) < 4) {
-               free(m->qname);
-               m->qname = NULL;
-               return 0; /* need qtype, qclass */
-       }
-       m->qtype = ldns_buffer_read_u16(query);
-       m->qclass = ldns_buffer_read_u16(query);
        return 1;
 }
 
index c29474a01e99249bdd63564bf4e185b9514707d2..9d458ba7da511ce37010ad30e52b92ee364df481 100644 (file)
@@ -86,7 +86,7 @@ struct msgreply_entry {
  * Parse wire query into a queryinfo structure, return 0 on parse error. 
  * initialises the (prealloced) queryinfo structure as well. sets reply to 0.
  * This query structure contains a pointer back info the buffer!
- * This pointer avoids memory allocation.
+ * This pointer avoids memory allocation. 
  * @param m: the prealloced queryinfo structure to put query into.
  *    must be unused, or _clear()ed.
  * @param query: the wireformat packet query. starts with ID.
@@ -94,6 +94,13 @@ struct msgreply_entry {
  */
 int query_info_parse(struct query_info* m, ldns_buffer* query);
 
+/**
+ * Allocate and copy the qname (obtained from query_info_parse()).
+ * @param m: the queryinfo structure.
+ * @return: 0 on alloc failure.
+ */
+int query_info_allocqname(struct query_info* m);
+
 /**
  * Compare two queryinfo structures, on query, 
  * The qname is _not_ sorted in canonical ordering.