From: Wouter Wijngaards Date: Tue, 26 Jun 2007 13:06:44 +0000 (+0000) Subject: Mesh used now. X-Git-Tag: release-0.4~43 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=06cfef3252b11c2533803452d42e7804b8bb93c2;p=thirdparty%2Funbound.git Mesh used now. git-svn-id: file:///svn/unbound/trunk@423 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/daemon/daemon.c b/daemon/daemon.c index 07a74b298..f9873314e 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -186,7 +186,6 @@ static void daemon_setup_modules(struct daemon* daemon) daemon->env->worker = NULL; daemon->env->send_packet = &worker_send_packet; daemon->env->send_query = &worker_send_query; - daemon->env->remove_subqueries = &worker_slumber_subqueries; for(i=0; inum_modules; i++) { log_info("init module %d: %s", i, daemon->modfunc[i]->name); if(!(*daemon->modfunc[i]->init)(daemon->env, i)) { @@ -194,7 +193,6 @@ static void daemon_setup_modules(struct daemon* daemon) daemon->modfunc[i]->name); } } - } /** diff --git a/daemon/stats.c b/daemon/stats.c index 0ba195141..401d67170 100644 --- a/daemon/stats.c +++ b/daemon/stats.c @@ -42,6 +42,7 @@ #include "config.h" #include "daemon/stats.h" #include "daemon/worker.h" +#include "services/mesh.h" void server_stats_init(struct server_stats* stats) { @@ -51,9 +52,9 @@ void server_stats_init(struct server_stats* stats) void server_stats_querymiss(struct server_stats* stats, struct worker* worker) { stats->num_queries_missed_cache++; - stats->sum_query_list_size += worker->num_requests; - if(worker->num_requests > stats->max_query_list_size) - stats->max_query_list_size = worker->num_requests; + stats->sum_query_list_size += worker->env.mesh->all.count; + if(worker->env.mesh->all.count > stats->max_query_list_size) + stats->max_query_list_size = worker->env.mesh->all.count; } void server_stats_log(struct server_stats* stats, int threadnum) diff --git a/daemon/worker.c b/daemon/worker.c index 4c102350d..56a41b3b0 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -81,248 +81,19 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer, log_err("write socket: %s", strerror(errno)); } -/** delete subrequest */ -static void -qstate_cleanup(struct worker* worker, struct module_qstate* qstate) -{ - int i; - if(!qstate) - return; - /* call de-init while all is OK */ - for(i=0; idaemon->num_modules; i++) - (*worker->daemon->modfunc[i]->clear)(qstate, i); - /* cleanup this query */ - region_free_all(qstate->region); - query_info_clear(&qstate->qinfo); - if(qstate->parent) { - /* subquery of parent */ - module_subreq_remove(&qstate->parent->subquery_first, qstate); - region_destroy(qstate->region); - free(qstate); - } else if (!qstate->work_info) { - /* slumbering query */ - module_subreq_remove(&worker->slumber_list, qstate); - region_destroy(qstate->region); - free(qstate); - verbose(VERB_ALGO, "cleanup: slumber list has %d entries", - module_subreq_num(worker->slumber_list)); - } -} - -/** delete subrequest recursively */ -static void -qstate_free_recurs_list(struct worker* worker, struct module_qstate* list) -{ - struct module_qstate* n; - /* remove subqueries */ - while(list) { - n = list->subquery_next; - qstate_free_recurs_list(worker, list->subquery_first); - qstate_cleanup(worker, list); - list = n; - } -} - -/** delete subrequest */ -static void -qstate_free(struct worker* worker, struct module_qstate* qstate) -{ - if(!qstate) - return; - worker_slumber_subqueries(qstate); - qstate_cleanup(worker, qstate); -} - -/** release workrequest back to the freelist, - * note that the w->qinfo still needs to be cleared after this. - */ -static void -req_release(struct work_query* w) -{ - struct worker* worker = w->state.env->worker; - if(worker->num_requests == worker->request_size) { - /* no longer at max, start accepting again. */ - listen_resume(worker->front); - } - log_assert(worker->num_requests >= 1); - worker->num_requests --; - w->next = worker->free_queries; - worker->free_queries = w; - verbose(VERB_ALGO, "released query to pool, %d in use", - (int)worker->num_requests); -} - -/** reply to query with given error code */ -static void -replyerror(int r, struct work_query* w) -{ - error_encode(w->query_reply.c->buffer, r, &w->state.qinfo, - w->query_id, w->state.query_flags, &w->state.edns); - comm_point_send_reply(&w->query_reply); - req_release(w); - query_info_clear(&w->state.qinfo); -} - -/** init qstate module states */ -static void -set_extstates_initial(struct worker* worker, struct module_qstate* qstate) -{ - int i; - for(i=0; idaemon->num_modules; i++) - qstate->ext_state[i] = module_state_initial; -} - -/** recursive debug logging of (sub)query structure */ -static void -run_debug(struct module_qstate* p, int d) -{ - char buf[80+1+1]; /* max nn=80; marker is 1, zero at end is 1 */ - int i, nn = d*2; - if(nn > 80) - nn = 80; - for(i=0; iqinfo); - for(p = p->subquery_first; p; p = p->subquery_next) { - run_debug(p, d+1); - } -} - -/** find runnable recursive */ -static struct module_qstate* -find_run_in(struct module_qstate* pfirst) -{ - struct module_qstate* q, *p; - for(p = pfirst; p; p = p->subquery_next) { - if(p->ext_state[p->curmod] == module_state_initial) - return p; - if((q=find_run_in(p->subquery_first))) - return q; - } - return NULL; -} - -/** find other runnable subqueries */ -static struct module_qstate* -find_runnable(struct module_qstate* subq) -{ - struct module_qstate* p = subq; - verbose(VERB_ALGO, "find runnable"); - if(p->subquery_next && p->subquery_next->ext_state[ - p->subquery_next->curmod] == module_state_initial) - return p->subquery_next; - while(p->parent) - p = p->parent; - if(verbosity >= VERB_ALGO) - run_debug(p, 0); - p = find_run_in(p->subquery_first); - if(p) return p; - p = find_run_in(subq->env->worker->slumber_list); - return p; -} - -/** process incoming request */ -static void -worker_process_query(struct worker* worker, struct module_qstate* qstate, - enum module_ev event, struct outbound_entry* entry) -{ - enum module_ext_state s; - verbose(VERB_DETAIL, "worker process handle event"); - if(event == module_event_new) { - qstate->curmod = 0; - set_extstates_initial(worker, qstate); - } - /* allow current module to run */ - /* loops for subqueries or parent queries. */ - while(1) { - (*worker->daemon->modfunc[qstate->curmod]->operate)(qstate, - event, qstate->curmod, entry); - region_free_all(worker->scratchpad); - qstate->reply = NULL; - s = qstate->ext_state[qstate->curmod]; - verbose(VERB_ALGO, "worker_process_query: module " - "exit state is %s", strextstate(s)); - if(s == module_state_initial) { - log_err("module exit in initial state, " - "it loops; parent query is aborted"); - while(qstate->parent) - qstate = qstate->parent; - s = module_error; - } - /* examine results, start further modules, etc. */ - if(s != module_error && s != module_finished) { - /* see if we can continue with other subrequests */ - struct module_qstate* nxt = find_runnable(qstate); - if(nxt) { - /* start submodule */ - qstate = nxt; - set_extstates_initial(worker, qstate); - entry = NULL; - event = module_event_pass; - continue; - } - } - - /* subrequest done */ - if(s == module_error && qstate->parent) { - struct module_qstate* up = qstate->parent; - qstate_free(worker, qstate); - qstate = up; - entry = NULL; - event = module_event_subq_error; - continue; - } - if(s == module_finished && qstate->parent) { - struct module_qstate* up = qstate->parent; - qstate_free(worker, qstate); - qstate = up; - entry = NULL; - event = module_event_subq_done; - continue; - } - break; - } - /* request done */ - if(s == module_error) { - if(qstate->work_info) { - replyerror(LDNS_RCODE_SERVFAIL, qstate->work_info); - } - qstate_free(worker, qstate); - verbose(VERB_DETAIL, "worker process suspend"); - return; - } - if(s == module_finished) { - if(qstate->work_info) { - memcpy(ldns_buffer_begin(qstate->work_info-> - query_reply.c->buffer), &qstate-> - work_info->query_id, sizeof(qstate-> - work_info->query_id)); - comm_point_send_reply(&qstate->work_info->query_reply); - req_release(qstate->work_info); - } - qstate_free(worker, qstate); - verbose(VERB_DETAIL, "worker process suspend"); - return; - } - /* suspend, waits for wakeup callback */ - verbose(VERB_DETAIL, "worker process suspend"); -} - /** process incoming replies from the network */ static int worker_handle_reply(struct comm_point* c, void* arg, int error, struct comm_reply* reply_info) { - struct work_query* w = (struct work_query*)arg; - struct worker* worker = w->state.env->worker; + struct module_qstate* q = (struct module_qstate*)arg; + struct worker* worker = q->env->worker; + struct outbound_entry e; + e.qstate = q; + e.qsent = NULL; - w->state.reply = reply_info; if(error != 0) { - worker_process_query(worker, &w->state, - module_event_timeout, NULL); + mesh_report_reply(worker->env.mesh, &e, 0, reply_info); return 0; } /* sanity check. */ @@ -332,11 +103,10 @@ worker_handle_reply(struct comm_point* c, void* arg, int error, || LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1) { /* error becomes timeout for the module as if this reply * never arrived. */ - worker_process_query(worker, &w->state, - module_event_timeout, NULL); + mesh_report_reply(worker->env.mesh, &e, 0, reply_info); return 0; } - worker_process_query(worker, &w->state, module_event_reply, NULL); + mesh_report_reply(worker->env.mesh, &e, 1, reply_info); return 0; } @@ -349,10 +119,8 @@ worker_handle_service_reply(struct comm_point* c, void* arg, int error, struct worker* worker = e->qstate->env->worker; verbose(VERB_ALGO, "worker scvd callback for qstate %p", e->qstate); - e->qstate->reply = reply_info; if(error != 0) { - worker_process_query(worker, e->qstate, - module_event_timeout, e); + mesh_report_reply(worker->env.mesh, e, 0, reply_info); return 0; } /* sanity check. */ @@ -363,11 +131,10 @@ worker_handle_service_reply(struct comm_point* c, void* arg, int error, /* error becomes timeout for the module as if this reply * never arrived. */ verbose(VERB_ALGO, "worker: bad reply handled as timeout"); - worker_process_query(worker, e->qstate, - module_event_timeout, e); + mesh_report_reply(worker->env.mesh, e, 0, reply_info); return 0; } - worker_process_query(worker, e->qstate, module_event_reply, e); + mesh_report_reply(worker->env.mesh, e, 1, reply_info); return 0; } @@ -503,7 +270,6 @@ worker_handle_request(struct comm_point* c, void* arg, int error, hashvalue_t h; struct lruhash_entry* e; struct query_info qinfo; - struct work_query* w; struct edns_data edns; if(error != NETEVENT_NOERROR) { @@ -588,45 +354,29 @@ worker_handle_request(struct comm_point* c, void* arg, int error, } ldns_buffer_rewind(c->buffer); server_stats_querymiss(&worker->stats, worker); - /* perform memory allocation(s) */ - if(!query_info_allocqname(&qinfo)) { - comm_point_drop_reply(repinfo); - return 0; - } /* grab a work request structure for this new request */ - if(!(w = worker->free_queries)) { + if(worker->env.mesh->all.count > worker->request_size) { /* we could get this due to a slow tcp incoming query, that started before we performed listen_pushback */ verbose(VERB_DETAIL, "worker: too many incoming requests " "active. dropping incoming query."); verbose(VERB_ALGO, "currently servicing %d of %d queries", - (int)worker->num_requests, (int)worker->request_size); + (int)worker->env.mesh->all.count, + (int)worker->request_size); worker->stats.num_query_list_exceeded++; comm_point_drop_reply(repinfo); query_info_clear(&qinfo); return 0; } - w->state.edns = edns; - worker->free_queries = w->next; - worker->num_requests ++; - log_assert(worker->num_requests <= worker->request_size); - if(worker->num_requests == worker->request_size) { + mesh_new_client(worker->env.mesh, &qinfo, + ldns_buffer_read_u16_at(c->buffer, 2), + &edns, repinfo, *(uint16_t*)ldns_buffer_begin(c->buffer)); + + if(worker->env.mesh->all.count == worker->request_size) { /* the max request number has been reached, stop accepting */ listen_pushback(worker->front); } - - /* init request */ - w->next = NULL; - w->state.query_hash = h; - memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply)); - memcpy(&w->state.qinfo, &qinfo, sizeof(struct query_info)); - memcpy(&w->query_id, ldns_buffer_begin(c->buffer), sizeof(uint16_t)); - w->state.query_flags = ldns_buffer_read_u16_at(c->buffer, 2); - - /* answer it */ - w->state.buf = c->buffer; - worker_process_query(worker, &w->state, module_event_new, NULL); return 0; } @@ -694,51 +444,6 @@ worker_create(struct daemon* daemon, int id) return worker; } -/** create request handling structures */ -static int -reqs_init(struct worker* worker) -{ - size_t i; - for(i=0; irequest_size; i++) { - struct work_query* q = (struct work_query*)calloc(1, - sizeof(struct work_query)); - if(!q) return 0; - q->state.buf = worker->front->udp_buff; - q->state.region = region_create_custom(malloc, free, 1024, - 64, 16, 0); - if(!q->state.region) { - free(q); - return 0; - } - q->state.env = &worker->env; - q->state.parent = NULL; - q->state.work_info = q; - q->next = worker->free_queries; - worker->free_queries = q; - q->all_next = worker->all_queries; - worker->all_queries = q; - } - return 1; -} - -/** delete request list */ -static void -reqs_delete(struct worker* worker) -{ - struct work_query* q = worker->all_queries; - struct work_query* n; - while(q) { - n = q->all_next; - log_assert(q->state.env->worker == worker); - /* comm_reply closed in outside_network_delete */ - qstate_free_recurs_list(worker, q->state.subquery_first); - qstate_cleanup(worker, &q->state); - region_destroy(q->state.region); - free(q); - q = n; - } -} - int worker_init(struct worker* worker, struct config_file *cfg, struct listen_port* ports, size_t buffer_size, int do_sigs) @@ -825,11 +530,6 @@ worker_init(struct worker* worker, struct config_file *cfg, return 0; } worker->request_size = cfg->num_queries_per_thread; - if(!reqs_init(worker)) { - worker_delete(worker); - return 0; - } - worker->slumber_list = NULL; server_stats_init(&worker->stats); alloc_init(&worker->alloc, &worker->daemon->superalloc, @@ -843,6 +543,7 @@ worker_init(struct worker* worker, struct config_file *cfg, worker->daemon->modfunc, &worker->env); worker->env.detach_subs = &mesh_detach_subs; worker->env.attach_sub = &mesh_attach_sub; + worker->env.kill_sub = &mesh_state_delete; worker->env.query_done = &mesh_query_done; worker->env.walk_supers = &mesh_walk_supers; if(!worker->env.mesh) { @@ -865,8 +566,6 @@ worker_delete(struct worker* worker) return; server_stats_log(&worker->stats, worker->thread_num); mesh_delete(worker->env.mesh); - reqs_delete(worker); - qstate_free_recurs_list(worker, worker->slumber_list); listen_delete(worker->front); outside_network_delete(worker->back); comm_signal_delete(worker->comsig); @@ -894,11 +593,11 @@ worker_send_packet(ldns_buffer* pkt, struct sockaddr_storage* addr, struct worker* worker = q->env->worker; if(use_tcp) { return pending_tcp_query(worker->back, pkt, addr, addrlen, - timeout, worker_handle_reply, q->work_info, + timeout, worker_handle_reply, q, worker->rndstate) != 0; } return pending_udp_query(worker->back, pkt, addr, addrlen, - timeout*1000, worker_handle_reply, q->work_info, + timeout*1000, worker_handle_reply, q, worker->rndstate) != 0; } @@ -934,21 +633,3 @@ worker_send_query(uint8_t* qname, size_t qnamelen, uint16_t qtype, } return e; } - -void -worker_slumber_subqueries(struct module_qstate* qstate) -{ - struct worker* worker = qstate->env->worker; - if(qstate->subquery_first) { - while(qstate->subquery_first) { - /* put subqueries on slumber list */ - struct module_qstate* s = qstate->subquery_first; - module_subreq_remove(&qstate->subquery_first, s); - s->parent = NULL; - s->work_info = NULL; - module_subreq_insert(&worker->slumber_list, s); - } - verbose(VERB_ALGO, "worker: slumber list has %d entries", - module_subreq_num(worker->slumber_list)); - } -} diff --git a/daemon/worker.h b/daemon/worker.h index 13b868058..19820343b 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -68,20 +68,6 @@ enum worker_commands { worker_cmd_quit }; -/** information per query that is in processing */ -struct work_query { - /** next query in freelist */ - struct work_query* next; - /** query state */ - struct module_qstate state; - /** the query reply destination, packet buffer and where to send. */ - struct comm_reply query_reply; - /** id of query, in network byteorder. */ - uint16_t query_id; - /** next query in all-list */ - struct work_query* all_next; -}; - /** * Structure holding working information for unbound. * Holds globally visible information. @@ -108,16 +94,8 @@ struct worker { /** commpoint to listen to commands. */ struct comm_point* cmd_com; - /** number of requests currently active */ - size_t num_requests; /** number of requests that can be handled by this worker */ size_t request_size; - /** the free working queries */ - struct work_query* free_queries; - /** list of all working queries */ - struct work_query* all_queries; - /** list of slumbering states, with promiscuous queries */ - struct module_qstate* slumber_list; /** random() table for this worker. */ struct ub_randstate* rndstate; @@ -215,10 +193,4 @@ struct outbound_entry* worker_send_query(uint8_t* qname, size_t qnamelen, struct sockaddr_storage* addr, socklen_t addrlen, struct module_qstate* q); -/** - * Remove subqueries, by moving them to the slumber list. - * @param qstate: this state has subqueries removed. - */ -void worker_slumber_subqueries(struct module_qstate* qstate); - #endif /* DAEMON_WORKER_H */ diff --git a/doc/Changelog b/doc/Changelog index 34ff75a6b..2dca432d6 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,8 @@ +26 June 2007: Wouter + - mesh is called by worker, and iterator uses it. + This removes the hierarchical code. + QueryTargets state and Finished state are merged for iterator. + 25 June 2007: Wouter - more mesh work. - error encode routine for ease. diff --git a/iterator/iterator.c b/iterator/iterator.c index 48a05d5b4..9744fbac3 100644 --- a/iterator/iterator.c +++ b/iterator/iterator.c @@ -145,31 +145,24 @@ fwd_new(struct module_qstate* qstate, int id) /** iterator handle reply from authoritative server */ static int -iter_handlereply(struct module_qstate* qstate, int id, - struct outbound_entry* ATTR_UNUSED(outbound)) +iter_handlereply(struct module_qstate* qstate, int id) { struct module_env* env = qstate->env; - uint16_t us = qstate->edns.udp_size; struct query_info reply_qinfo; struct reply_info* reply_msg; struct edns_data reply_edns; + hashvalue_t h; int r; if((r=reply_info_parse(qstate->reply->c->buffer, env->alloc, &reply_qinfo, &reply_msg, qstate->env->scratch, &reply_edns))!=0) return 0; - qstate->edns.edns_version = EDNS_ADVERTISED_VERSION; - qstate->edns.udp_size = EDNS_ADVERTISED_SIZE; - qstate->edns.ext_rcode = 0; - qstate->edns.bits &= EDNS_DO; - if(!reply_info_answer_encode(&reply_qinfo, reply_msg, 0, - qstate->query_flags, qstate->buf, 0, 0, - qstate->env->scratch, us, &qstate->edns, - (int)(qstate->edns.bits&EDNS_DO))) - return 0; - dns_cache_store_msg(qstate->env, &reply_qinfo, qstate->query_hash, - reply_msg); + h = query_info_hash(&qstate->qinfo); + (*qstate->env->query_done)(qstate, LDNS_RCODE_NOERROR, reply_msg); + /* there should be no dependencies in this forwarding mode */ + (*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_SERVFAIL, NULL); + dns_cache_store_msg(qstate->env, &reply_qinfo, h, reply_msg); qstate->ext_state[id] = module_finished; return 1; } @@ -181,26 +174,45 @@ perform_forward(struct module_qstate* qstate, enum module_ev event, int id, { verbose(VERB_ALGO, "iterator: forwarding"); if(event == module_event_new) { - if(!fwd_new(qstate, id)) + if(!fwd_new(qstate, id)) { + (*qstate->env->query_done)(qstate, + LDNS_RCODE_SERVFAIL, NULL); + (*qstate->env->walk_supers)(qstate, id, + LDNS_RCODE_SERVFAIL, NULL); qstate->ext_state[id] = module_error; + return; + } return; } /* it must be a query reply */ if(!outbound) { verbose(VERB_ALGO, "query reply was not serviced"); + (*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL); + (*qstate->env->walk_supers)(qstate, id, + LDNS_RCODE_SERVFAIL, NULL); qstate->ext_state[id] = module_error; return; } if(event == module_event_timeout || event == module_event_error) { + (*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL); + (*qstate->env->walk_supers)(qstate, id, + LDNS_RCODE_SERVFAIL, NULL); qstate->ext_state[id] = module_error; return; } if(event == module_event_reply) { - if(!iter_handlereply(qstate, id, outbound)) + if(!iter_handlereply(qstate, id)) { + (*qstate->env->query_done)(qstate, + LDNS_RCODE_SERVFAIL, NULL); + (*qstate->env->walk_supers)(qstate, id, + LDNS_RCODE_SERVFAIL, NULL); qstate->ext_state[id] = module_error; + } return; } log_err("bad event for iterator[forwarding]"); + (*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL); + (*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_SERVFAIL, NULL); qstate->ext_state[id] = module_error; } @@ -247,8 +259,54 @@ final_state(struct iter_qstate* iq) return next_state(iq, iq->final_state); } +/** + * Callback routine to handle errors in parent query states + * @param qstate: query state that failed. + * @param id: module id. + * @param super: super state. + * @param rcode: the error code. + */ +static void +error_supers(struct module_qstate* qstate, int id, + struct module_qstate* super, int rcode) +{ + struct iter_qstate* super_iq = (struct iter_qstate*)super->minfo[id]; + log_assert(rcode != LDNS_RCODE_NOERROR); + + if(qstate->qinfo.qtype == LDNS_RR_TYPE_A || + qstate->qinfo.qtype == LDNS_RR_TYPE_AAAA) { + /* mark address as failed. */ + struct delegpt_ns* dpns = NULL; + if(super_iq->dp) + dpns = delegpt_find_ns(super_iq->dp, + qstate->qinfo.qname, qstate->qinfo.qname_len); + if(!dpns) { + /* not interested */ + verbose(VERB_ALGO, "subq error, but not interested"); + log_query_info(VERB_ALGO, "superq", &super->qinfo); + delegpt_log(super_iq->dp); + log_assert(0); + return; + } + dpns->resolved = 1; /* mark as failed */ + super_iq->num_target_queries--; + } + if(qstate->qinfo.qtype == LDNS_RR_TYPE_NS) { + /* prime failed to get delegation */ + super_iq->dp = NULL; + } + /* evaluate targets again */ + super_iq->state = QUERYTARGETS_STATE; + /* super becomes runnable, and will process this change */ +} + /** * Return an error to the client + * @param qstate: our query state + * @param id: module id + * @param rcode: error code (DNS errcode). + * @return: 0 for use by caller, to make notation easy, like: + * return error_response(..). */ static int error_response(struct module_qstate* qstate, int id, int rcode) @@ -256,21 +314,10 @@ error_response(struct module_qstate* qstate, int id, int rcode) verbose(VERB_DETAIL, "return error response %s", ldns_lookup_by_id(ldns_rcodes, rcode)? ldns_lookup_by_id(ldns_rcodes, rcode)->name:"??"); - qinfo_query_encode(qstate->buf, &qstate->qinfo); - LDNS_RCODE_SET(ldns_buffer_begin(qstate->buf), rcode); - LDNS_RA_SET(ldns_buffer_begin(qstate->buf)); - LDNS_QR_SET(ldns_buffer_begin(qstate->buf)); - if((qstate->query_flags & BIT_RD)) - LDNS_RD_SET(ldns_buffer_begin(qstate->buf)); - if((qstate->query_flags & BIT_CD)) - LDNS_CD_SET(ldns_buffer_begin(qstate->buf)); - - if(qstate->parent) { - /* return subquery error module event to parent */ - qstate->ext_state[id] = module_error; - return 0; - } - /* return to client */ + /* tell clients that we failed */ + (*qstate->env->query_done)(qstate, rcode, NULL); + /* tell our parents that we failed */ + (*qstate->env->walk_supers)(qstate, id, rcode, &error_supers); qstate->ext_state[id] = module_finished; return 0; } @@ -304,42 +351,6 @@ iter_prepend(struct iter_qstate* iq, struct dns_msg* msg, return 1; } -/** - * Encode response message for iterator responses. Into response buffer. - * On error an error message is encoded. - * @param qstate: query state. With qinfo information. - * @param iq: iterator query state. With prepend list. - * @param msg: answer message. - * @param id: module id (used in error condition). - */ -static void -iter_encode_respmsg(struct module_qstate* qstate, struct iter_qstate* iq, - struct dns_msg* msg, int id) -{ - struct edns_data edns; - if(iq->prepend_list) { - if(!iter_prepend(iq, msg, qstate->region)) { - log_err("prepend rrsets: out of memory"); - error_response(qstate, id, LDNS_RCODE_SERVFAIL); - return; - } - } - - edns.edns_present = qstate->edns.edns_present; - edns.edns_version = EDNS_ADVERTISED_VERSION; - edns.udp_size = EDNS_ADVERTISED_SIZE; - edns.ext_rcode = 0; - edns.bits = qstate->edns.bits & EDNS_DO; - if(!reply_info_answer_encode(&qstate->qinfo, msg->rep, 0, - qstate->query_flags, qstate->buf, 0, 1, - qstate->env->scratch, qstate->edns.udp_size, - &edns, (int)(qstate->edns.bits & EDNS_DO))) { - /* encode servfail */ - error_response(qstate, id, LDNS_RCODE_SERVFAIL); - return; - } -} - /** * Add rrset to prepend list * @param qstate: query state. @@ -434,68 +445,30 @@ handle_cname_response(struct module_qstate* qstate, struct iter_qstate* iq, * need iterative processing * @param final_state The final state for the response to this * request. - * @return generated subquerystate, or NULL on error (malloc). + * @param subq_ret: if newly allocated, the subquerystate, or NULL if it does + * not need initialisation. + * @return false on error (malloc). */ -static struct module_qstate* +static int generate_sub_request(uint8_t* qname, size_t qnamelen, uint16_t qtype, uint16_t qclass, struct module_qstate* qstate, int id, struct iter_qstate* iq, enum iter_state initial_state, - enum iter_state final_state) + enum iter_state final_state, struct module_qstate** subq_ret) { - struct module_qstate* subq = (struct module_qstate*)malloc( - sizeof(struct module_qstate)); - struct iter_qstate* subiq; - if(!subq) - return NULL; - memset(subq, 0, sizeof(*subq)); - subq->qinfo.qname = memdup(qname, qnamelen); - if(!subq->qinfo.qname) { - free(subq); - return NULL; - } - subq->qinfo.qname_len = qnamelen; - subq->qinfo.qtype = qtype; - subq->qinfo.qclass = qclass; - subq->query_hash = query_info_hash(&subq->qinfo); - subq->query_flags = 0; /* OPCODE QUERY, no flags */ - subq->edns.udp_size = 65535; - subq->buf = qstate->buf; - subq->env = qstate->env; - subq->env->scratch = qstate->env->scratch; - subq->region = region_create(malloc, free); - if(!subq->region) { - free(subq->qinfo.qname); - free(subq); - return NULL; - } - subq->curmod = id; - subq->ext_state[id] = module_state_initial; - subq->minfo[id] = region_alloc(subq->region, - sizeof(struct iter_qstate)); - if(!subq->minfo[id]) { - region_destroy(subq->region); - free(subq->qinfo.qname); - free(subq); - return NULL; - } - subq->work_info = NULL; - subq->parent = qstate; - module_subreq_insert(&qstate->subquery_first, subq); - - subiq = (struct iter_qstate*)subq->minfo[id]; - memset(subiq, 0, sizeof(*subiq)); - subiq->num_target_queries = 0; - subiq->num_current_queries = 0; - subiq->depth = iq->depth+1; - outbound_list_init(&subiq->outlist); - subiq->state = initial_state; - subiq->final_state = final_state; - subiq->qchase = subq->qinfo; + struct module_qstate* subq = NULL; + struct iter_qstate* subiq = NULL; + uint16_t qflags = 0; /* OPCODE QUERY, no flags */ + struct query_info qinf; + int prime = (final_state == PRIME_RESP_STATE)?1:0; + qinf.qname = qname; + qinf.qname_len = qnamelen; + qinf.qtype = qtype; + qinf.qclass = qclass; /* RD should be set only when sending the query back through the INIT * state. */ if(initial_state == INIT_REQUEST_STATE) - subq->query_flags |= BIT_RD; + qflags |= BIT_RD; /* We set the CD flag so we can send this through the "head" of * the resolution chain, which might have a validator. We are * uninterested in validating things not on the direct resolution @@ -503,9 +476,35 @@ generate_sub_request(uint8_t* qname, size_t qnamelen, uint16_t qtype, /* Turned off! CD does not make a difference in query results. qstate->query_flags |= BIT_CD; */ - subiq->chase_flags = subq->query_flags; - - return subq; + + /* attach subquery, lookup existing or make a new one */ + if(!(*qstate->env->attach_sub)(qstate, &qinf, qflags, prime, &subq)) { + return 0; + } + *subq_ret = subq; + if(subq) { + /* initialise the new subquery */ + subq->curmod = id; + subq->ext_state[id] = module_state_initial; + subq->minfo[id] = region_alloc(subq->region, + sizeof(struct iter_qstate)); + if(!subq->minfo[id]) { + log_err("init subq: out of memory"); + (*qstate->env->kill_sub)(subq); + return 0; + } + subiq = (struct iter_qstate*)subq->minfo[id]; + memset(subiq, 0, sizeof(*subiq)); + subiq->num_target_queries = 0; + subiq->num_current_queries = 0; + subiq->depth = iq->depth+1; + outbound_list_init(&subiq->outlist); + subiq->state = initial_state; + subiq->final_state = final_state; + subiq->qchase = subq->qinfo; + subiq->chase_flags = subq->query_flags; + } + return 1; } /** @@ -522,7 +521,6 @@ prime_root(struct module_qstate* qstate, struct iter_qstate* iq, { struct delegpt* dp; struct module_qstate* subq; - struct iter_qstate* subiq; verbose(VERB_ALGO, "priming . NS %s", ldns_lookup_by_id(ldns_rr_classes, (int)qclass)? ldns_lookup_by_id(ldns_rr_classes, (int)qclass)->name:"??"); @@ -533,19 +531,21 @@ prime_root(struct module_qstate* qstate, struct iter_qstate* iq, } /* Priming requests start at the QUERYTARGETS state, skipping * the normal INIT state logic (which would cause an infloop). */ - subq = generate_sub_request((uint8_t*)"\000", 1, LDNS_RR_TYPE_NS, - qclass, qstate, id, iq, QUERYTARGETS_STATE, PRIME_RESP_STATE); - if(!subq) { + if(!generate_sub_request((uint8_t*)"\000", 1, LDNS_RR_TYPE_NS, + qclass, qstate, id, iq, QUERYTARGETS_STATE, PRIME_RESP_STATE, + &subq)) { log_err("out of memory priming root"); return 0; } - subiq = (struct iter_qstate*)subq->minfo[id]; - - /* Set the initial delegation point to the hint. */ - subiq->dp = dp; - /* suppress any target queries. */ - subiq->num_target_queries = 0; - subiq->priming = 1; + if(subq) { + struct iter_qstate* subiq = + (struct iter_qstate*)subq->minfo[id]; + /* Set the initial delegation point to the hint. */ + subiq->dp = dp; + /* there should not be any target queries. */ + subiq->num_target_queries = 0; + subiq->priming = 1; + } /* this module stops, our submodule starts, and does the query. */ qstate->ext_state[id] = module_wait_subquery; @@ -574,7 +574,6 @@ prime_stub(struct module_qstate* qstate, struct iter_qstate* iq, struct delegpt* stub_dp = hints_lookup_stub(ie->hints, qname, qclass, iq->dp); struct module_qstate* subq; - struct iter_qstate* subiq; /* The stub (if there is one) does not need priming. */ if(!stub_dp) return 0; @@ -585,23 +584,26 @@ prime_stub(struct module_qstate* qstate, struct iter_qstate* iq, /* Stub priming events start at the QUERYTARGETS state to avoid the * redundant INIT state processing. */ - subq = generate_sub_request(stub_dp->name, stub_dp->namelen, + if(!generate_sub_request(stub_dp->name, stub_dp->namelen, LDNS_RR_TYPE_NS, qclass, qstate, id, iq, - QUERYTARGETS_STATE, PRIME_RESP_STATE); - if(!subq) { + QUERYTARGETS_STATE, PRIME_RESP_STATE, &subq)) { log_err("out of memory priming stub"); (void)error_response(qstate, id, LDNS_RCODE_SERVFAIL); return 1; /* return 1 to make module stop, with error */ } - subiq = (struct iter_qstate*)subq->minfo[id]; - - /* Set the initial delegation point to the hint. */ - subiq->dp = stub_dp; - /* suppress any target queries -- although there wouldn't be anyway, - * since stub hints never have missing targets.*/ - subiq->num_target_queries = 0; - subiq->priming = 1; - subiq->priming_stub = 1; + if(subq) { + struct iter_qstate* subiq = + (struct iter_qstate*)subq->minfo[id]; + + /* Set the initial delegation point to the hint. */ + subiq->dp = stub_dp; + /* there should not be any target queries -- although there + * wouldn't be anyway, since stub hints never have + * missing targets. */ + subiq->num_target_queries = 0; + subiq->priming = 1; + subiq->priming_stub = 1; + } /* this module stops, our submodule starts, and does the query. */ qstate->ext_state[id] = module_wait_subquery; @@ -853,22 +855,22 @@ static int generate_target_query(struct module_qstate* qstate, struct iter_qstate* iq, int id, uint8_t* name, size_t namelen, uint16_t qtype, uint16_t qclass) { - struct module_qstate* subq = generate_sub_request(name, namelen, qtype, - qclass, qstate, id, iq, INIT_REQUEST_STATE, TARGET_RESP_STATE); - struct iter_qstate* subiq; - if(!subq) - return 0; - subiq = (struct iter_qstate*)subq->minfo[id]; - subiq->dp = delegpt_copy(iq->dp, subq->region); - if(!subiq->dp) { - module_subreq_remove(&qstate->subquery_first, subq); - region_destroy(subq->region); - free(subq->qinfo.qname); - free(subq); + struct module_qstate* subq; + if(!generate_sub_request(name, namelen, qtype, qclass, qstate, + id, iq, INIT_REQUEST_STATE, FINISHED_STATE, &subq)) return 0; + if(subq) { + struct iter_qstate* subiq = + (struct iter_qstate*)subq->minfo[id]; + subiq->dp = delegpt_copy(iq->dp, subq->region); + if(!subiq->dp) { + log_err("init targetq: out of memory"); + (*qstate->env->kill_sub)(subq); + return 0; + } + delegpt_log(subiq->dp); } log_nametypeclass(VERB_DETAIL, "new target", name, qtype, qclass); - delegpt_log(subiq->dp); return 1; } @@ -1077,7 +1079,7 @@ processQueryTargets(struct module_qstate* qstate, struct iter_qstate* iq, } /* move other targets to slumber list */ if(iq->num_target_queries>0) { - (*qstate->env->remove_subqueries)(qstate); + (*qstate->env->detach_subs)(qstate); iq->num_target_queries = 0; } @@ -1140,7 +1142,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq, /* close down outstanding requests to be discarded */ outbound_list_clear(&iq->outlist); iq->num_current_queries = 0; - (*qstate->env->remove_subqueries)(qstate); + (*qstate->env->detach_subs)(qstate); iq->num_target_queries = 0; return final_state(iq); } else if(type == RESPONSE_TYPE_REFERRAL) { @@ -1168,7 +1170,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq, */ outbound_list_clear(&iq->outlist); iq->num_current_queries = 0; - (*qstate->env->remove_subqueries)(qstate); + (*qstate->env->detach_subs)(qstate); iq->num_target_queries = 0; verbose(VERB_ALGO, "cleared outbound list for next round"); return next_state(iq, QUERYTARGETS_STATE); @@ -1204,7 +1206,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq, */ outbound_list_clear(&iq->outlist); iq->num_current_queries = 0; - (*qstate->env->remove_subqueries)(qstate); + (*qstate->env->detach_subs)(qstate); iq->num_target_queries = 0; verbose(VERB_ALGO, "cleared outbound list for query restart"); /* go to INIT_REQUEST_STATE for new qname. */ @@ -1237,36 +1239,29 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq, return next_state(iq, QUERYTARGETS_STATE); } -/** - * This handles the response to a priming query. This is used to handle both - * root and stub priming responses. This is basically the equivalent of the - * QUERY_RESP_STATE, but will not handle CNAME responses and will treat - * REFERRALs as ANSWERS. It will also update and reactivate the originating - * event. +/** + * Return priming query results to interestes super querystates. + * + * Sets the delegation point and delegation message (not nonRD queries). + * This is a callback from walk_supers. * - * @param qstate: query state. - * @param iq: iterator query state. + * @param qstate: priming query state that finished. * @param id: module id. - * @return true if the event needs more immediate processing, false if not. - * This state always returns false. + * @param forq: the qstate for which priming has been done. + * @param rcode: error code. */ -static int -processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq, - int id) +static void +prime_supers(struct module_qstate* qstate, int id, + struct module_qstate* forq, int rcode) { - struct module_qstate* forq = qstate->parent; - struct iter_qstate* foriq; + struct iter_qstate* iq = (struct iter_qstate*)qstate->minfo[id]; + struct iter_qstate* foriq = (struct iter_qstate*)forq->minfo[id]; struct delegpt* dp = NULL; enum response_type type = response_type_from_server(iq->response, &iq->qchase, iq->dp); - /* This event is finished. */ - qstate->ext_state[id] = module_finished; - - if(!qstate->parent) { - /* no more parent - it is not interested anymore */ - return 0; - } + log_assert(rcode == LDNS_RCODE_NOERROR); + log_assert(iq->priming || iq->priming_stub); if(type == RESPONSE_TYPE_ANSWER) { /* Convert our response to a delegation point */ dp = delegpt_from_message(iq->response, forq->region); @@ -1276,17 +1271,20 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq, * the ANSWER type was (presumably) a negative answer. */ verbose(VERB_ALGO, "prime response was not a positive " "ANSWER; failing"); - return error_response(qstate, id, LDNS_RCODE_SERVFAIL); + foriq->dp = NULL; + foriq->state = QUERYTARGETS_STATE; + return; } log_query_info(VERB_DETAIL, "priming successful for", &iq->qchase); delegpt_log(dp); - foriq = (struct iter_qstate*)forq->minfo[id]; foriq->dp = dp; foriq->deleg_msg = dns_copy_msg(iq->response, forq->region); if(!foriq->deleg_msg) { log_err("copy prime response: out of memory"); - return error_response(qstate, id, LDNS_RCODE_SERVFAIL); + foriq->dp = NULL; + foriq->state = QUERYTARGETS_STATE; + return; } /* root priming responses go to init stage 2, priming stub @@ -1295,6 +1293,32 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq, foriq->state = INIT_REQUEST_3_STATE; else foriq->state = INIT_REQUEST_2_STATE; /* because we are finished, the parent will be reactivated */ +} + +/** + * This handles the response to a priming query. This is used to handle both + * root and stub priming responses. This is basically the equivalent of the + * QUERY_RESP_STATE, but will not handle CNAME responses and will treat + * REFERRALs as ANSWERS. It will also update and reactivate the originating + * event. + * + * @param qstate: query state. + * @param id: module id. + * @return true if the event needs more immediate processing, false if not. + * This state always returns false. + */ +static int +processPrimeResponse(struct module_qstate* qstate, int id) +{ + /* This event is finished. */ + qstate->ext_state[id] = module_finished; + + /* there should be no outside clients subscribed tell them to + * bugger off (and retry) */ + (*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL); + /* tell interested supers that priming is done */ + (*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_NOERROR, + &prime_supers); return 0; } @@ -1304,30 +1328,26 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq, * responsible for reactiving the original event, and housekeeping related * to received target responses (caching, updating the current delegation * point, etc). + * Callback from walk_supers for every super state that is interested in + * the results from thiis query. * * @param qstate: query state. - * @param iq: iterator query state. * @param id: module id. - * @return true if the event requires more (response) processing - * immediately, false if not. This particular state always returns - * false. + * @param forq: super query state. + * @param rcode: if not NOERROR, an error occurred. */ -static int -processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq, - int id) +static void +processTargetResponse(struct module_qstate* qstate, int id, + struct module_qstate* forq, int rcode) { + struct iter_qstate* iq = (struct iter_qstate*)qstate->minfo[id]; + struct iter_qstate* foriq = (struct iter_qstate*)forq->minfo[id]; struct ub_packed_rrset_key* rrset; struct delegpt_ns* dpns; - struct module_qstate* forq = qstate->parent; - struct iter_qstate* foriq; - - qstate->ext_state[id] = module_finished; - if(!qstate->parent) { - /* no parent, it is not interested anymore */ - return 0; - } - foriq = (struct iter_qstate*)forq->minfo[id]; + foriq->state = QUERYTARGETS_STATE; + /* use error_response for errs*/ + log_assert(rcode == LDNS_RCODE_NOERROR); /* check to see if parent event is still interested (in orig name). */ dpns = delegpt_find_ns(foriq->dp, qstate->qinfo.qname, @@ -1337,7 +1357,10 @@ processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq, * anyways? */ /* If not, just stop processing this event */ verbose(VERB_ALGO, "subq: parent not interested anymore"); - return 0; + /* this is an error, and will cause parent to be reactivated + * even though nothing has happened */ + log_assert(0); + return; } /* Tell the originating event that this target query has finished @@ -1363,11 +1386,6 @@ processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq, } else dpns->resolved = 1; /* fail the target */ log_assert(dpns->resolved); /* one way or another it is now done */ - - /* Reactivate the forEvent, now that it has either a new target or a - * failed target. */ - foriq->state = QUERYTARGETS_STATE; - return 0; } /** @@ -1409,9 +1427,22 @@ processFinished(struct module_qstate* qstate, struct iter_qstate* iq, /* TODO: we are using a private TTL, trim the response. */ /* if (mPrivateTTL > 0){IterUtils.setPrivateTTL(resp, mPrivateTTL); } */ - /* Makes sure the final response contains the original question. */ - /* and prepends items we have to prepend. Stores reponse in buffer */ - iter_encode_respmsg(qstate, iq, iq->response, id); + /* prepend any items we have accumulated */ + if(iq->prepend_list) { + if(!iter_prepend(iq, iq->response, qstate->region)) { + log_err("prepend rrsets: out of memory"); + return error_response(qstate, id, LDNS_RCODE_SERVFAIL); + } + } + if(query_dname_compare(qstate->qinfo.qname, + iq->response->qinfo.qname) == 0) { + /* use server supplied upper/lower case */ + qstate->qinfo.qname = iq->response->qinfo.qname; + } + (*qstate->env->query_done)(qstate, LDNS_RCODE_NOERROR, + iq->response->rep); + (*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_NOERROR, + &processTargetResponse); return 0; } @@ -1453,10 +1484,7 @@ iter_handle(struct module_qstate* qstate, struct iter_qstate* iq, cont = processQueryResponse(qstate, iq, id); break; case PRIME_RESP_STATE: - cont = processPrimeResponse(qstate, iq, id); - break; - case TARGET_RESP_STATE: - cont = processTargetResponse(qstate, iq, id); + cont = processPrimeResponse(qstate, id); break; case FINISHED_STATE: cont = processFinished(qstate, iq, id); @@ -1552,50 +1580,6 @@ handle_it: outbound_list_remove(&iq->outlist, outbound); iter_handle(qstate, iq, ie, id); } -/** - * Handles subquery errors. Checks if query is still relevant, and adjusts - * the state. - * @param qstate: query state. - * @param ie: iterator shared global environment. - * @param iq: iterator query state. - * @param id: module id. - */ -static void -process_subq_error(struct module_qstate* qstate, struct iter_qstate* iq, - struct iter_env* ie, int id) -{ - struct query_info errinf; - struct delegpt_ns* dpns = NULL; - if(!query_info_parse(&errinf, qstate->buf)) { - log_err("Could not parse error from sub module"); - return; - } - if(errinf.qtype == LDNS_RR_TYPE_NS) { - /* a priming query has failed. */ - (void)error_response(qstate, id, LDNS_RCODE_SERVFAIL); - return; - } - if(errinf.qtype != LDNS_RR_TYPE_A && - errinf.qtype != LDNS_RR_TYPE_AAAA) { - log_err("Bad error from sub module"); - return; - } - /* see if we are still interested in this subquery result */ - if(iq->dp) - dpns = delegpt_find_ns(iq->dp, errinf.qname, - errinf.qname_len); - if(!dpns) { - /* not interested */ - verbose(VERB_ALGO, "got subq error, but not interested"); - log_query_info(VERB_ALGO, "errname", &errinf); - delegpt_log(iq->dp); - return; - } - dpns->resolved = 1; /* mark as failed */ - iq->num_target_queries--; /* and the query is finished */ - iq->state = QUERYTARGETS_STATE; /* evaluate targets again */ - iter_handle(qstate, iq, ie, id); -} /** iterator operate on a query */ static void @@ -1634,16 +1618,6 @@ iter_operate(struct module_qstate* qstate, enum module_ev event, int id, process_response(qstate, iq, ie, id, outbound, event); return; } - if(event == module_event_subq_done) { - /* subquery has set our state correctly */ - iter_handle(qstate, iq, ie, id); - return; - } - if(event == module_event_subq_error) { - /* need to delist subquery and continue processing */ - process_subq_error(qstate, iq, ie, id); - return; - } if(event == module_event_error) { verbose(VERB_ALGO, "got called with event error, giving up"); (void)error_response(qstate, id, LDNS_RCODE_SERVFAIL); @@ -1700,8 +1674,6 @@ iter_state_to_string(enum iter_state state) return "PRIME RESPONSE STATE"; case QUERY_RESP_STATE : return "QUERY RESPONSE STATE"; - case TARGET_RESP_STATE : - return "TARGET RESPONSE STATE"; case FINISHED_STATE : return "FINISHED RESPONSE STATE"; default : diff --git a/iterator/iterator.h b/iterator/iterator.h index 4807115d4..0c6729d41 100644 --- a/iterator/iterator.h +++ b/iterator/iterator.h @@ -135,10 +135,8 @@ enum iter_state { /** Responses to priming queries finish at this state. */ PRIME_RESP_STATE, - /** Responses to target queries start at this state. */ - TARGET_RESP_STATE, - - /** Responses that are to be returned upstream end at this state. */ + /** Responses that are to be returned upstream end at this state. + * As well as responses to target queries. */ FINISHED_STATE }; diff --git a/services/mesh.c b/services/mesh.c index a266898a4..c3b68a607 100644 --- a/services/mesh.c +++ b/services/mesh.c @@ -149,7 +149,7 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo, qinfo, qid, qflags, edns); comm_point_send_reply(rep); if(added) - mesh_state_delete(s); + mesh_state_delete(&s->s); return; } /* update statistics */ @@ -240,12 +240,14 @@ mesh_state_cleanup(struct mesh_state* mstate) } void -mesh_state_delete(struct mesh_state* mstate) +mesh_state_delete(struct module_qstate* qstate) { struct mesh_area* mesh; struct mesh_state_ref* super, ref; - if(!mstate) + struct mesh_state* mstate; + if(!qstate) return; + mstate = qstate->mesh_info; mesh = mstate->s.env->mesh; mesh_detach_subs(&mstate->s); if(!mstate->reply_list && mstate->super_set.count == 0) { @@ -350,7 +352,7 @@ timeval_subtract(struct timeval* d, struct timeval* end, struct timeval* start) #ifndef S_SPLINT_S d->tv_sec = end->tv_sec - start->tv_sec; while(end->tv_usec < start->tv_usec) { - d->tv_usec += 1000000; + end->tv_usec += 1000000; d->tv_sec--; } d->tv_usec = end->tv_usec - start->tv_usec; @@ -371,6 +373,25 @@ timeval_add(struct timeval* d, struct timeval* add) #endif } +/** divide sum of timers to get average */ +static void +timeval_divide(struct timeval* avg, struct timeval* sum, size_t d) +{ +#ifndef S_SPLINT_S + size_t leftover; + if(d == 0) { + avg->tv_sec = 0; + avg->tv_usec = 0; + return; + } + avg->tv_sec = sum->tv_sec / d; + avg->tv_usec = sum->tv_usec / d; + /* handle fraction from seconds divide */ + leftover = sum->tv_sec - avg->tv_sec*d; + avg->tv_usec += (leftover*1000000)/d; +#endif +} + /** * Send reply to mesh reply entry * @param m: mesh state to send it for. @@ -412,7 +433,7 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, } else { struct timeval duration; timeval_subtract(&duration, &end_time, &r->start_time); - verbose(VERB_ALGO, "query took %d s %d usec", + verbose(VERB_ALGO, "query took %d.%6.6d sec", (int)duration.tv_sec, (int)duration.tv_usec); m->s.env->mesh->replies_sent++; timeval_add(&m->s.env->mesh->replies_sum_wait, &duration); @@ -503,7 +524,7 @@ void mesh_run(struct mesh_area* mesh, struct mesh_state* mstate, if(s == module_error || s == module_finished) { /* must have called _done and _supers */ log_assert(mstate->debug_flags == 3); - mesh_state_delete(mstate); + mesh_state_delete(&mstate->s); } /* run more modules */ @@ -515,5 +536,17 @@ void mesh_run(struct mesh_area* mesh, struct mesh_state* mstate, (void)rbtree_delete(&mesh->run, mstate); } else mstate = NULL; } - verbose(VERB_ALGO, "mesh_run: end"); + verbose(VERB_ALGO, "mesh_run: end, %u states (%u with reply, " + "%u detached), %u total replies", (unsigned)mesh->all.count, + (unsigned)mesh->num_reply_states, + (unsigned)mesh->num_detached_states, + (unsigned)mesh->num_reply_addrs); + if(1) { + struct timeval avg; + timeval_divide(&avg, &mesh->replies_sum_wait, + mesh->replies_sent); + verbose(VERB_ALGO, "send %u replies, with average wait " + "of %d.%6.6d", (unsigned)mesh->replies_sent, + (int)avg.tv_sec, (int)avg.tv_usec); + } } diff --git a/services/mesh.h b/services/mesh.h index 58f21d2fc..0c66acd44 100644 --- a/services/mesh.h +++ b/services/mesh.h @@ -266,6 +266,13 @@ void mesh_query_done(struct module_qstate* qstate, int rcode, void mesh_walk_supers(struct module_qstate* qstate, int id, int rcode, void (*cb)(struct module_qstate*, int, struct module_qstate*, int)); +/** + * Delete mesh state, cleanup and also rbtrees and so on. + * Will detach from all super/subnodes. + * @param qstate: to remove. + */ +void mesh_state_delete(struct module_qstate* qstate); + /* ------------------- Functions for mesh -------------------- */ /** @@ -288,13 +295,6 @@ struct mesh_state* mesh_state_create(struct module_env* env, */ void mesh_state_cleanup(struct mesh_state* mstate); -/** - * Delete mesh state, cleanup and also rbtrees and so on. - * Will detach from all super/subnodes. - * @param mstate: to remove. - */ -void mesh_state_delete(struct mesh_state* mstate); - /** * Find a mesh state in the mesh area. Pass relevant flags. * diff --git a/util/module.c b/util/module.c index d383654a2..fe1dd2f0b 100644 --- a/util/module.c +++ b/util/module.c @@ -62,47 +62,7 @@ strmodulevent(enum module_ev e) case module_event_pass: return "module_event_pass"; case module_event_reply: return "module_event_reply"; case module_event_timeout: return "module_event_timeout"; - case module_event_mod_done: return "module_event_mod_done"; - case module_event_subq_done: return "module_event_subq_done"; - case module_event_subq_error: return "module_event_subq_error"; case module_event_error: return "module_event_error"; } return "bad_event_value"; } - -void -module_subreq_remove(struct module_qstate** head, struct module_qstate* sub) -{ - if(!sub || !head) - return; - /* snip off double linked list */ - if(sub->subquery_prev) - sub->subquery_prev->subquery_next = sub->subquery_next; - else *head = sub->subquery_next; - if(sub->subquery_next) - sub->subquery_next->subquery_prev = sub->subquery_prev; - /* cleanup values for robustness */ - sub->subquery_next = NULL; - sub->subquery_prev = NULL; -} - -void -module_subreq_insert(struct module_qstate** head, struct module_qstate* sub) -{ - if(*head) - (*head)->subquery_prev = sub; - sub->subquery_next = *head; - sub->subquery_prev = NULL; - *head = sub; -} - -int -module_subreq_num(struct module_qstate* q) -{ - int n = 0; - while(q) { - n++; - q = q->subquery_next; - } - return n; -} diff --git a/util/module.h b/util/module.h index 52ceaa7cb..ece262050 100644 --- a/util/module.h +++ b/util/module.h @@ -153,6 +153,15 @@ struct module_env { struct query_info* qinfo, uint16_t qflags, int prime, struct module_qstate** newq); + /** + * Kill newly attached sub. If attach_sub returns newq for + * initialisation, but that fails, then this routine will cleanup and + * delete the fresly created sub. + * @param newq: the new subquery that is no longer needed. + * It is removed. + */ + void (*kill_sub)(struct module_qstate* newq); + /** * Query state is done, send messages to reply entries. * Encode messages using reply entry values and the querystate @@ -202,17 +211,6 @@ struct module_env { struct ub_randstate* rnd; /** module specific data. indexed by module id. */ void* modinfo[MAX_MODULE]; - - /** @@@ TO BE DELETED */ - /** - * Cleanup subqueries from this query state. Either delete or - * move them somewhere else. This query state no longer needs the - * results from those subqueries. - * @param qstate: query state. - * subqueries are (re)moved so that no subq_done events from - * them will reach this qstate. - */ - void (*remove_subqueries)(struct module_qstate* qstate); }; /** @@ -247,12 +245,6 @@ enum module_ev { module_event_reply, /** timeout */ module_event_timeout, - /** other module finished */ - module_event_mod_done, - /** subquery finished */ - module_event_subq_done, - /** subquery finished with error */ - module_event_subq_error, /** error */ module_event_error }; @@ -281,24 +273,6 @@ struct module_qstate { struct module_env* env; /** mesh related information for this query */ struct mesh_state* mesh_info; - - /** ----- TO DELETE */ - struct work_query* work_info; - /** hash value of the query qinfo */ - hashvalue_t query_hash; - /** edns data from the query */ - struct edns_data edns; - /** buffer, store resulting reply here. - * May be cleared when module blocks. */ - ldns_buffer* buf; - /** parent query, only nonNULL for subqueries */ - struct module_qstate* parent; - /** pointer to first subquery below this one; makes list with next */ - struct module_qstate* subquery_first; - /** pointer to next sibling subquery (not above or below this one) */ - struct module_qstate* subquery_next; - /** pointer to prev sibling subquery (not above or below this one) */ - struct module_qstate* subquery_prev; }; /** @@ -361,29 +335,4 @@ const char* strextstate(enum module_ext_state s); */ const char* strmodulevent(enum module_ev e); -/** - * Remove subqrequest from list. - * @param head: List head. pointer to start of subquery_next/prev sibling list. - * mostly reference to the parent subquery_first. - * @param sub: subrequest. It is snipped off. - */ -void module_subreq_remove(struct module_qstate** head, - struct module_qstate* sub); - -/** - * Insert subqrequest in list. You must set the parent ptr of sub correctly. - * @param head: List head. pointer to start of subquery_next/prev sibling list. - * mostly reference to the parent subquery_first. - * @param sub: subrequest. It is added to the list. - */ -void module_subreq_insert(struct module_qstate** head, - struct module_qstate* sub); - -/** - * Calculate number of queries in the query list. - * @param q: the start of the list, pass subquery_first. - * @return: number, 0 if q was NULL. - */ -int module_subreq_num(struct module_qstate* q); - #endif /* UTIL_MODULE_H */