]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
Pushback when full makes all threads help out.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 27 Feb 2007 11:25:44 +0000 (11:25 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 27 Feb 2007 11:25:44 +0000 (11:25 +0000)
git-svn-id: file:///svn/unbound/trunk@154 be551aaa-1e26-0410-a405-d3ace91eadb9

daemon/worker.c
doc/Changelog
services/listen_dnsport.c
services/listen_dnsport.h
testcode/fake_event.c
util/netevent.c

index c2f5eccdeedd0d5945f84332c228fde7f83d1d23..58b17871c840ca05359ecbc4bc28767f80e44f1f 100644 (file)
@@ -81,6 +81,10 @@ replyerror(int r, struct worker* worker)
        LDNS_QR_SET(ldns_buffer_begin(worker->query_reply.c->buffer));
        LDNS_RCODE_SET(ldns_buffer_begin(worker->query_reply.c->buffer), r);
        comm_point_send_reply(&worker->query_reply);
+       if(worker->num_requests == 1)  {
+               /* no longer at max, start accepting again. */
+               listen_resume(worker->front);
+       }
        worker->num_requests --;
 }
 
@@ -90,6 +94,7 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
        struct comm_reply* ATTR_UNUSED(reply_info))
 {
        struct worker* worker = (struct worker*)arg;
+       log_info("reply to query with stored ID %d", worker->query_id);
        LDNS_ID_SET(ldns_buffer_begin(worker->query_reply.c->buffer),
                worker->query_id);
        if(error != 0) {
@@ -105,6 +110,10 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
        LDNS_QR_SET(ldns_buffer_begin(worker->query_reply.c->buffer));
        ldns_buffer_flip(worker->query_reply.c->buffer);
        comm_point_send_reply(&worker->query_reply);
+       if(worker->num_requests == 1)  {
+               /* no longer at max, start accepting again. */
+               listen_resume(worker->front);
+       }
        worker->num_requests --;
        return 0;
 }
@@ -116,6 +125,7 @@ worker_process_query(struct worker* worker)
        /* query the forwarding address */
        worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin(
                worker->query_reply.c->buffer));
+       log_info("stored in process_query ID %d", worker->query_id);
        pending_udp_query(worker->back, worker->query_reply.c->buffer, 
                &worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
                worker_handle_reply, worker, worker->rndstate);
@@ -215,6 +225,8 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
                return 0;
        }
        if(worker->num_requests > 0) {
+               /* we could get this due to a slow tcp incoming query, 
+                  that started before we performed listen_pushback */
                verbose(VERB_DETAIL, "worker: too many incoming requests "
                        "active. dropping incoming query.");
                comm_point_drop_reply(repinfo);
@@ -222,6 +234,10 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
        }
        /* answer it */
        worker->num_requests ++;
+       if(worker->num_requests >= 1)  {
+               /* the max request number has been reached, stop accepting */
+               listen_pushback(worker->front);
+       }
        memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply));
        worker_process_query(worker);
        return 0;
index 36a01189b4e53f257d9d2ab85b86fe03dd377389..cb853ee29ed227eee9e9a2c1c3b4cf11a64fc56b 100644 (file)
@@ -6,6 +6,9 @@
        - During reloads the daemon will temporarily handle signals,
          so that they do not result in problems.
        - Also randomize the outgoing port range for tests.
+       - If query list is full, will stop selecting listening ports for read.
+         This makes all threads service incoming requests, instead of one.
+         No memory is leaking during reloads, service of queries, etc.
 
 26 February 2007: Wouter
        - ub_random code used to select ID and port.
index f44c49ea3f1deaf5e21812c7db3a1b223f07b57e..d3d3c6d8518face098b273805d1f459234fdd174 100644 (file)
@@ -358,6 +358,32 @@ listen_delete(struct listen_dnsport* front)
        free(front);
 }
 
+void listen_pushback(struct listen_dnsport* listen)
+{
+       struct listen_list *p;
+       log_assert(listen);
+       for(p = listen->cps; p; p = p->next)
+       {
+               if(p->com->type != comm_udp &&
+                       p->com->type != comm_tcp_accept)
+                       continue;
+               comm_point_stop_listening(p->com);
+       }
+}
+
+void listen_resume(struct listen_dnsport* listen)
+{
+       struct listen_list *p;
+       log_assert(listen);
+       for(p = listen->cps; p; p = p->next)
+       {
+               if(p->com->type != comm_udp &&
+                       p->com->type != comm_tcp_accept)
+                       continue;
+               comm_point_start_listening(p->com, -1, -1);
+       }
+}
+
 struct listen_port* 
 listening_ports_open(struct config_file* cfg)
 {
index ce774697a39daab0e93d94616d85af9b54882895..2dff15b56bf0f0decf529f162725bff4a7ed7011 100644 (file)
@@ -117,6 +117,25 @@ struct listen_dnsport* listen_create(struct comm_base* base,
        struct listen_port* ports, size_t bufsize, 
        comm_point_callback_t* cb, void* cb_arg);
 
+/**
+ * Stop listening to the dnsports. Ports are still open but not checked
+ * for readability - performs pushback of the load.
+ * @param listen: the listening structs to stop listening on. Note that
+ *    udp and tcp-accept handlers stop, but ongoing tcp-handlers are kept
+ *    going, since its rude to 'reset connection by peer' them, instead,
+ *    we keep them and the callback will be called when its ready. It can
+ *    be dropped at that time. New tcp and udp queries can be served by 
+ *    other threads.
+ */
+void listen_pushback(struct listen_dnsport* listen);
+
+/**
+ * Start listening again to the dnsports.
+ * Call after the listen_pushback has been called.
+ * @param listen: the listening structs to stop listening on.
+ */
+void listen_resume(struct listen_dnsport* listen);
+
 /**
  * delete the listening structure
  */
index 8e697d7b85f80e50d0faa63781835550734da33c..4cb0c210ce90aeb948409c3e3e68ae06edee9ede 100644 (file)
@@ -698,6 +698,14 @@ void listening_ports_free(struct listen_port* list)
        free(list);
 }
 
+void listen_pushback(struct listen_dnsport* ATTR_UNUSED(listen))
+{
+}
+
+void listen_resume(struct listen_dnsport* ATTR_UNUSED(listen))
+{
+}
+
 struct comm_point* comm_point_create_local(struct comm_base* ATTR_UNUSED(base),
         int ATTR_UNUSED(fd), size_t ATTR_UNUSED(bufsize),
         comm_point_callback_t* ATTR_UNUSED(callback), 
index 28b6eecd0e38c508817fe5e52a9f35cd31086d50..566b4367ffd032ba389dca401b109577443d3fe4 100644 (file)
@@ -755,7 +755,7 @@ comm_point_delete(struct comm_point* c)
                free(c->tcp_handlers);
        }
        free(c->timeout);
-       if(c->type == comm_tcp)
+       if(c->type == comm_tcp || c->type == comm_local)
                ldns_buffer_free(c->buffer);
        free(c->ev);
        free(c);