From 1eb9ff1da3d2a425edab2cf67334b6032c267826 Mon Sep 17 00:00:00 2001 From: Wouter Wijngaards Date: Tue, 27 Mar 2007 15:21:21 +0000 Subject: [PATCH] Multiple queries per thread. git-svn-id: file:///svn/unbound/trunk@201 be551aaa-1e26-0410-a405-d3ace91eadb9 --- daemon/worker.c | 156 +++++++++++++++++++++++++++---------------- daemon/worker.h | 31 ++++++--- doc/Changelog | 1 + util/data/msgreply.c | 23 ++++--- util/data/msgreply.h | 9 ++- 5 files changed, 140 insertions(+), 80 deletions(-) diff --git a/daemon/worker.c b/daemon/worker.c index 0806b9bc6..0cff28066 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -74,18 +74,34 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer, log_err("write socket: %s", strerror(errno)); } +/** release workrequest back to the freelist, + * note that the w->qinfo still needs to be cleared after this. + */ +static void +req_release(struct work_query* w) +{ + if(w->worker->num_requests == w->worker->request_size) { + /* no longer at max, start accepting again. */ + listen_resume(w->worker->front); + } + w->worker->num_requests --; + log_assert(w->worker->num_requests >= 0); + w->next = w->worker->free_queries; + w->worker->free_queries = w; +} + /** reply to query with given error code */ static void -replyerror(int r, struct worker* worker) +replyerror(int r, struct work_query* w) { - ldns_buffer* buf = worker->query_reply.c->buffer; + ldns_buffer* buf = w->query_reply.c->buffer; uint16_t flags; verbose(VERB_DETAIL, "reply with error"); ldns_buffer_clear(buf); - ldns_buffer_write_u16(buf, worker->query_id); + ldns_buffer_write_u16(buf, w->query_id); flags = (uint16_t)(0x8000 | r); /* QR and retcode*/ - flags |= (worker->query_flags & 0x0100); /* copy RD bit */ + flags |= (w->query_flags & 0x0100); /* copy RD bit */ ldns_buffer_write_u16(buf, flags); flags = 1; ldns_buffer_write_u16(buf, flags); @@ -93,17 +109,13 @@ replyerror(int r, struct worker* worker) ldns_buffer_write(buf, &flags, sizeof(uint16_t)); ldns_buffer_write(buf, &flags, sizeof(uint16_t)); ldns_buffer_write(buf, &flags, sizeof(uint16_t)); - ldns_buffer_write(buf, worker->qinfo.qname, worker->qinfo.qnamesize); - ldns_buffer_write_u16(buf, worker->qinfo.qtype); - ldns_buffer_write_u16(buf, worker->qinfo.qclass); + ldns_buffer_write(buf, w->qinfo.qname, w->qinfo.qnamesize); + ldns_buffer_write_u16(buf, w->qinfo.qtype); + ldns_buffer_write_u16(buf, w->qinfo.qclass); ldns_buffer_flip(buf); - comm_point_send_reply(&worker->query_reply); - if(worker->num_requests == 1) { - /* no longer at max, start accepting again. */ - listen_resume(worker->front); - } - worker->num_requests --; - query_info_clear(&worker->qinfo); + comm_point_send_reply(&w->query_reply); + req_release(w); + query_info_clear(&w->qinfo); } /** process incoming replies from the network */ @@ -111,13 +123,12 @@ static int worker_handle_reply(struct comm_point* c, void* arg, int error, struct comm_reply* ATTR_UNUSED(reply_info)) { - struct worker* worker = (struct worker*)arg; + struct work_query* w = (struct work_query*)arg; struct reply_info* rep; struct msgreply_entry* e; - verbose(VERB_DETAIL, "reply to query with stored ID %d", - worker->query_id); + verbose(VERB_DETAIL, "reply to query with stored ID %d", w->query_id); if(error != 0) { - replyerror(LDNS_RCODE_SERVFAIL, worker); + replyerror(LDNS_RCODE_SERVFAIL, w); return 0; } /* sanity check. */ @@ -131,7 +142,7 @@ worker_handle_reply(struct comm_point* c, void* arg, int error, rep = (struct reply_info*)malloc(sizeof(struct reply_info)); if(!rep) { log_err("out of memory"); - replyerror(LDNS_RCODE_SERVFAIL, worker); + replyerror(LDNS_RCODE_SERVFAIL, w); return 0; } rep->replysize = ldns_buffer_limit(c->buffer) - 2; /* minus ID */ @@ -140,44 +151,33 @@ worker_handle_reply(struct comm_point* c, void* arg, int error, if(!rep->reply) { free(rep); log_err("out of memory"); - replyerror(LDNS_RCODE_SERVFAIL, worker); + replyerror(LDNS_RCODE_SERVFAIL, w); return 0; } memmove(rep->reply, ldns_buffer_at(c->buffer, 2), rep->replysize); - ldns_buffer_write_u16_at(worker->query_reply.c->buffer, 0, - worker->query_id); - reply_info_answer(rep, worker->query_flags, worker->query_reply.c-> - buffer); - comm_point_send_reply(&worker->query_reply); - if(worker->num_requests == 1) { - /* no longer at max, start accepting again. */ - listen_resume(worker->front); - } - worker->num_requests --; + ldns_buffer_write_u16_at(w->query_reply.c->buffer, 0, w->query_id); + reply_info_answer(rep, w->query_flags, w->query_reply.c->buffer); + comm_point_send_reply(&w->query_reply); + req_release(w); /* store or update reply in the cache */ - if(!(e = query_info_entrysetup(&worker->qinfo, rep, - worker->query_hash))) { + if(!(e = query_info_entrysetup(&w->qinfo, rep, w->query_hash))) { log_err("out of memory"); return 0; } - slabhash_insert(worker->daemon->msg_cache, worker->query_hash, + slabhash_insert(w->worker->daemon->msg_cache, w->query_hash, &e->entry, rep); return 0; } /** process incoming request */ static void -worker_process_query(struct worker* worker) +worker_process_query(struct worker* worker, struct work_query* w) { /* query the forwarding address */ - worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin( - worker->query_reply.c->buffer)); - worker->query_flags = ldns_buffer_read_u16_at(worker-> - query_reply.c->buffer, 2); - verbose(VERB_DETAIL, "process_query ID %d", worker->query_id); - pending_udp_query(worker->back, worker->query_reply.c->buffer, + verbose(VERB_DETAIL, "process_query ID %d", w->query_id); + pending_udp_query(worker->back, w->query_reply.c->buffer, &worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT, - worker_handle_reply, worker, worker->rndstate); + worker_handle_reply, w, worker->rndstate); } /** check request sanity. Returns error code, 0 OK, or -1 discard. @@ -261,6 +261,9 @@ worker_handle_request(struct comm_point* c, void* arg, int error, int ret; hashvalue_t h; struct lruhash_entry* e; + struct query_info qinfo; + struct work_query* w; + verbose(VERB_DETAIL, "worker handle request"); if(error != NETEVENT_NOERROR) { log_err("called with err=%d", error); @@ -275,27 +278,17 @@ worker_handle_request(struct comm_point* c, void* arg, int error, comm_point_drop_reply(repinfo); return 0; } - if(worker->num_requests > 0) { - /* we could get this due to a slow tcp incoming query, - that started before we performed listen_pushback */ - verbose(VERB_DETAIL, "worker: too many incoming requests " - "active. dropping incoming query."); - comm_point_drop_reply(repinfo); - return 0; - } /* see if query is in the cache */ - if(!query_info_parse(&worker->qinfo, c->buffer)) { + if(!query_info_parse(&qinfo, c->buffer)) { LDNS_QR_SET(ldns_buffer_begin(c->buffer)); LDNS_RCODE_SET(ldns_buffer_begin(c->buffer), LDNS_RCODE_FORMERR); return 1; } - h = query_info_hash(&worker->qinfo); - if((e=slabhash_lookup(worker->daemon->msg_cache, h, &worker->qinfo, - 0))) { + h = query_info_hash(&qinfo); + if((e=slabhash_lookup(worker->daemon->msg_cache, h, &qinfo, 0))) { /* answer from cache */ log_info("answer from the cache"); - query_info_clear(&worker->qinfo); /* id is still in the buffer, no need to touch it */ reply_info_answer((struct reply_info*)e->data, ldns_buffer_read_u16_at(c->buffer, 2), c->buffer); @@ -303,16 +296,40 @@ worker_handle_request(struct comm_point* c, void* arg, int error, return 1; } ldns_buffer_rewind(c->buffer); + /* perform memory allocation(s) */ + if(!query_info_allocqname(&qinfo)) { + comm_point_drop_reply(repinfo); + return 0; + } - /* answer it */ - worker->query_hash = h; + /* grab a work request structure for this new request */ + if(!worker->free_queries) { + /* we could get this due to a slow tcp incoming query, + that started before we performed listen_pushback */ + verbose(VERB_DETAIL, "worker: too many incoming requests " + "active. dropping incoming query."); + comm_point_drop_reply(repinfo); + return 0; + } + w = worker->free_queries; + worker->free_queries = w->next; worker->num_requests ++; - if(worker->num_requests >= 1) { + log_assert(worker->num_requests <= worker->request_size); + if(worker->num_requests == worker->request_size) { /* the max request number has been reached, stop accepting */ listen_pushback(worker->front); } - memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply)); - worker_process_query(worker); + + /* init request */ + w->next = NULL; + w->query_hash = h; + memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply)); + memcpy(&w->qinfo, &qinfo, sizeof(struct query_info)); + w->query_id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer)); + w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2); + + /* answer it */ + worker_process_query(worker, w); return 0; } @@ -377,6 +394,22 @@ worker_create(struct daemon* daemon, int id) return worker; } +/** create request handling structures */ +static int +reqs_init(struct worker* worker) +{ + int i; + for(i=0; irequest_size; i++) { + struct work_query* q = (struct work_query*)calloc(1, + sizeof(struct work_query)); + if(!q) return 0; + q->worker = worker; + q->next = worker->free_queries; + worker->free_queries = q; + } + return 1; +} + int worker_init(struct worker* worker, struct config_file *cfg, struct listen_port* ports, size_t buffer_size, int do_sigs) @@ -451,6 +484,11 @@ worker_init(struct worker* worker, struct config_file *cfg, return 0; } } + worker->request_size = 1; + if(!reqs_init(worker)) { + worker_delete(worker); + return 0; + } /* set forwarder address */ if(cfg->fwd_address && cfg->fwd_address[0]) { diff --git a/daemon/worker.h b/daemon/worker.h index eff1141d3..5c83a1a09 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -64,6 +64,23 @@ enum worker_commands { worker_cmd_quit }; +/** information per query that is in processing */ +struct work_query { + /** next query in freelist */ + struct work_query* next; + /** the worker for this query */ + struct worker* worker; + /** the query reply destination, packet buffer and where to send. */ + struct comm_reply query_reply; + /** the query_info structure from the query */ + struct query_info qinfo; + /** hash value of the query qinfo */ + hashvalue_t query_hash; + /** id of query */ + uint16_t query_id; + /** flags uint16 from query */ + uint16_t query_flags; +}; /** * Structure holding working information for unbound. @@ -93,16 +110,10 @@ struct worker { /** number of requests currently active */ int num_requests; - /** our one and only query, packet buffer and where to send. */ - struct comm_reply query_reply; - /** id of query */ - uint16_t query_id; - /** flags uint16 from query */ - uint16_t query_flags; - /** the query_info structure from the query */ - struct query_info qinfo; - /** hash value of the query qinfo */ - hashvalue_t query_hash; + /** number of requests that can be handled by this worker */ + int request_size; + /** the free working queries */ + struct work_query* free_queries; /** address to forward to */ struct sockaddr_storage fwd_addr; diff --git a/doc/Changelog b/doc/Changelog index 11d27cb15..80c1cd8e2 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -5,6 +5,7 @@ - created test that checks if items drop out of the cache. - added word 'partitioned hash table' to documentation on slab hash. A slab hash is a partitioned hash table. + - worker can handle multiple queries at a time. 26 March 2007: Wouter - config settings for slab hash message cache. diff --git a/util/data/msgreply.c b/util/data/msgreply.c index 874685b32..72bcf550b 100644 --- a/util/data/msgreply.c +++ b/util/data/msgreply.c @@ -79,22 +79,25 @@ int query_info_parse(struct query_info* m, ldns_buffer* query) log_assert(ldns_buffer_position(query) == 0); m->has_cd = (int)LDNS_CD_WIRE(q); ldns_buffer_skip(query, LDNS_HEADER_SIZE); - q = ldns_buffer_current(query); + m->qname = ldns_buffer_current(query); if((m->qnamesize = query_dname_len(query)) == 0) return 0; /* parse error */ + if(ldns_buffer_remaining(query) < 4) + return 0; /* need qtype, qclass */ + m->qtype = ldns_buffer_read_u16(query); + m->qclass = ldns_buffer_read_u16(query); + return 1; +} + +int +query_info_allocqname(struct query_info* m) +{ + uint8_t* q = m->qname; if(!(m->qname = (uint8_t*)malloc(m->qnamesize))) { - log_err("query_info_parse: out of memory"); + log_err("query_info_allocqname: out of memory"); return 0; /* out of memory */ } memmove(m->qname, q, m->qnamesize); - - if(ldns_buffer_remaining(query) < 4) { - free(m->qname); - m->qname = NULL; - return 0; /* need qtype, qclass */ - } - m->qtype = ldns_buffer_read_u16(query); - m->qclass = ldns_buffer_read_u16(query); return 1; } diff --git a/util/data/msgreply.h b/util/data/msgreply.h index c29474a01..9d458ba7d 100644 --- a/util/data/msgreply.h +++ b/util/data/msgreply.h @@ -86,7 +86,7 @@ struct msgreply_entry { * Parse wire query into a queryinfo structure, return 0 on parse error. * initialises the (prealloced) queryinfo structure as well. sets reply to 0. * This query structure contains a pointer back info the buffer! - * This pointer avoids memory allocation. + * This pointer avoids memory allocation. * @param m: the prealloced queryinfo structure to put query into. * must be unused, or _clear()ed. * @param query: the wireformat packet query. starts with ID. @@ -94,6 +94,13 @@ struct msgreply_entry { */ int query_info_parse(struct query_info* m, ldns_buffer* query); +/** + * Allocate and copy the qname (obtained from query_info_parse()). + * @param m: the queryinfo structure. + * @return: 0 on alloc failure. + */ +int query_info_allocqname(struct query_info* m); + /** * Compare two queryinfo structures, on query, * The qname is _not_ sorted in canonical ordering. -- 2.47.2