From: Wouter Wijngaards Date: Tue, 27 Feb 2007 11:25:44 +0000 (+0000) Subject: Pushback when full makes all threads help out. X-Git-Tag: release-0.1~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9c52b0a7bd47b40f24fc0e1e0d1989b8d61bb9d7;p=thirdparty%2Funbound.git Pushback when full makes all threads help out. git-svn-id: file:///svn/unbound/trunk@154 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/daemon/worker.c b/daemon/worker.c index c2f5eccde..58b17871c 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -81,6 +81,10 @@ replyerror(int r, struct worker* worker) LDNS_QR_SET(ldns_buffer_begin(worker->query_reply.c->buffer)); LDNS_RCODE_SET(ldns_buffer_begin(worker->query_reply.c->buffer), r); comm_point_send_reply(&worker->query_reply); + if(worker->num_requests == 1) { + /* no longer at max, start accepting again. */ + listen_resume(worker->front); + } worker->num_requests --; } @@ -90,6 +94,7 @@ worker_handle_reply(struct comm_point* c, void* arg, int error, struct comm_reply* ATTR_UNUSED(reply_info)) { struct worker* worker = (struct worker*)arg; + log_info("reply to query with stored ID %d", worker->query_id); LDNS_ID_SET(ldns_buffer_begin(worker->query_reply.c->buffer), worker->query_id); if(error != 0) { @@ -105,6 +110,10 @@ worker_handle_reply(struct comm_point* c, void* arg, int error, LDNS_QR_SET(ldns_buffer_begin(worker->query_reply.c->buffer)); ldns_buffer_flip(worker->query_reply.c->buffer); comm_point_send_reply(&worker->query_reply); + if(worker->num_requests == 1) { + /* no longer at max, start accepting again. */ + listen_resume(worker->front); + } worker->num_requests --; return 0; } @@ -116,6 +125,7 @@ worker_process_query(struct worker* worker) /* query the forwarding address */ worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin( worker->query_reply.c->buffer)); + log_info("stored in process_query ID %d", worker->query_id); pending_udp_query(worker->back, worker->query_reply.c->buffer, &worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT, worker_handle_reply, worker, worker->rndstate); @@ -215,6 +225,8 @@ worker_handle_request(struct comm_point* c, void* arg, int error, return 0; } if(worker->num_requests > 0) { + /* we could get this due to a slow tcp incoming query, + that started before we performed listen_pushback */ verbose(VERB_DETAIL, "worker: too many incoming requests " "active. dropping incoming query."); comm_point_drop_reply(repinfo); @@ -222,6 +234,10 @@ worker_handle_request(struct comm_point* c, void* arg, int error, } /* answer it */ worker->num_requests ++; + if(worker->num_requests >= 1) { + /* the max request number has been reached, stop accepting */ + listen_pushback(worker->front); + } memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply)); worker_process_query(worker); return 0; diff --git a/doc/Changelog b/doc/Changelog index 36a01189b..cb853ee29 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -6,6 +6,9 @@ - During reloads the daemon will temporarily handle signals, so that they do not result in problems. - Also randomize the outgoing port range for tests. + - If query list is full, will stop selecting listening ports for read. + This makes all threads service incoming requests, instead of one. + No memory is leaking during reloads, service of queries, etc. 26 February 2007: Wouter - ub_random code used to select ID and port. diff --git a/services/listen_dnsport.c b/services/listen_dnsport.c index f44c49ea3..d3d3c6d85 100644 --- a/services/listen_dnsport.c +++ b/services/listen_dnsport.c @@ -358,6 +358,32 @@ listen_delete(struct listen_dnsport* front) free(front); } +void listen_pushback(struct listen_dnsport* listen) +{ + struct listen_list *p; + log_assert(listen); + for(p = listen->cps; p; p = p->next) + { + if(p->com->type != comm_udp && + p->com->type != comm_tcp_accept) + continue; + comm_point_stop_listening(p->com); + } +} + +void listen_resume(struct listen_dnsport* listen) +{ + struct listen_list *p; + log_assert(listen); + for(p = listen->cps; p; p = p->next) + { + if(p->com->type != comm_udp && + p->com->type != comm_tcp_accept) + continue; + comm_point_start_listening(p->com, -1, -1); + } +} + struct listen_port* listening_ports_open(struct config_file* cfg) { diff --git a/services/listen_dnsport.h b/services/listen_dnsport.h index ce774697a..2dff15b56 100644 --- a/services/listen_dnsport.h +++ b/services/listen_dnsport.h @@ -117,6 +117,25 @@ struct listen_dnsport* listen_create(struct comm_base* base, struct listen_port* ports, size_t bufsize, comm_point_callback_t* cb, void* cb_arg); +/** + * Stop listening to the dnsports. Ports are still open but not checked + * for readability - performs pushback of the load. + * @param listen: the listening structs to stop listening on. Note that + * udp and tcp-accept handlers stop, but ongoing tcp-handlers are kept + * going, since its rude to 'reset connection by peer' them, instead, + * we keep them and the callback will be called when its ready. It can + * be dropped at that time. New tcp and udp queries can be served by + * other threads. + */ +void listen_pushback(struct listen_dnsport* listen); + +/** + * Start listening again to the dnsports. + * Call after the listen_pushback has been called. + * @param listen: the listening structs to stop listening on. + */ +void listen_resume(struct listen_dnsport* listen); + /** * delete the listening structure */ diff --git a/testcode/fake_event.c b/testcode/fake_event.c index 8e697d7b8..4cb0c210c 100644 --- a/testcode/fake_event.c +++ b/testcode/fake_event.c @@ -698,6 +698,14 @@ void listening_ports_free(struct listen_port* list) free(list); } +void listen_pushback(struct listen_dnsport* ATTR_UNUSED(listen)) +{ +} + +void listen_resume(struct listen_dnsport* ATTR_UNUSED(listen)) +{ +} + struct comm_point* comm_point_create_local(struct comm_base* ATTR_UNUSED(base), int ATTR_UNUSED(fd), size_t ATTR_UNUSED(bufsize), comm_point_callback_t* ATTR_UNUSED(callback), diff --git a/util/netevent.c b/util/netevent.c index 28b6eecd0..566b4367f 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -755,7 +755,7 @@ comm_point_delete(struct comm_point* c) free(c->tcp_handlers); } free(c->timeout); - if(c->type == comm_tcp) + if(c->type == comm_tcp || c->type == comm_local) ldns_buffer_free(c->buffer); free(c->ev); free(c);