]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
- Initial commit for out-of-order processing for TCP and TLS.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 11 Jan 2019 14:12:27 +0000 (14:12 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 11 Jan 2019 14:12:27 +0000 (14:12 +0000)
git-svn-id: file:///svn/unbound/trunk@5032 be551aaa-1e26-0410-a405-d3ace91eadb9

Makefile.in
doc/Changelog
services/listen_dnsport.c
services/listen_dnsport.h
services/mesh.c
services/mesh.h
testcode/fake_event.c
testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.post
testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.test
util/netevent.c
util/netevent.h

index af5b10f6507c4bb8f5cb862ec19d47bd0c94469b..0a257bda5f20c365a4376132797d680691bbcf26 100644 (file)
@@ -744,7 +744,10 @@ listen_dnsport.lo listen_dnsport.o: $(srcdir)/services/listen_dnsport.c config.h
  $(srcdir)/services/listen_dnsport.h $(srcdir)/util/netevent.h $(srcdir)/dnscrypt/dnscrypt.h \
   $(srcdir)/services/outside_network.h $(srcdir)/util/rbtree.h \
   $(srcdir)/util/log.h $(srcdir)/util/config_file.h $(srcdir)/util/net_help.h \
- $(srcdir)/sldns/sbuffer.h
+ $(srcdir)/sldns/sbuffer.h $(srcdir)/services/mesh.h $(srcdir)/util/data/msgparse.h \
+ $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h \
+ $(srcdir)/util/module.h $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h \
+ $(srcdir)/services/modstack.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h
 localzone.lo localzone.o: $(srcdir)/services/localzone.c config.h $(srcdir)/services/localzone.h \
  $(srcdir)/util/rbtree.h $(srcdir)/util/locks.h $(srcdir)/util/log.h $(srcdir)/util/storage/dnstree.h \
  $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/data/msgreply.h \
@@ -762,7 +765,8 @@ mesh.lo mesh.o: $(srcdir)/services/mesh.c config.h $(srcdir)/services/mesh.h $(s
  $(srcdir)/util/data/msgencode.h $(srcdir)/util/timehist.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h \
  $(srcdir)/util/alloc.h $(srcdir)/util/config_file.h $(srcdir)/util/edns.h $(srcdir)/sldns/sbuffer.h \
  $(srcdir)/sldns/wire2str.h $(srcdir)/services/localzone.h $(srcdir)/util/storage/dnstree.h \
- $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h
+ $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h \
+ $(srcdir)/services/listen_dnsport.h
 modstack.lo modstack.o: $(srcdir)/services/modstack.c config.h $(srcdir)/services/modstack.h \
  $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/log.h \
  $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
@@ -870,7 +874,7 @@ netevent.lo netevent.o: $(srcdir)/util/netevent.c config.h $(srcdir)/util/neteve
  $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
  $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h $(srcdir)/util/tube.h $(srcdir)/services/mesh.h \
  $(srcdir)/services/modstack.h $(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/str2wire.h $(srcdir)/dnstap/dnstap.h \
-  \
+  $(srcdir)/services/listen_dnsport.h \
  
 net_help.lo net_help.o: $(srcdir)/util/net_help.c config.h $(srcdir)/util/net_help.h $(srcdir)/util/log.h \
  $(srcdir)/util/data/dname.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/module.h \
index f95c10f6b10e8b2824625773ee7e2f22be63e4e3..45e187e4c352ea6cca042bffb236e93e81b23a27 100644 (file)
@@ -1,3 +1,6 @@
+11 January 2018: Wouter
+       - Initial commit for out-of-order processing for TCP and TLS.
+
 9 January 2018: Wouter
        - Log query name for looping module errors.
 
index d4a0d6a8c1ab9e03be706ddc49c555fa2cfa18e9..311cf2dda865ab665be247d6f2ce634ade702bc4 100644 (file)
@@ -53,6 +53,8 @@
 #include "util/config_file.h"
 #include "util/net_help.h"
 #include "sldns/sbuffer.h"
+#include "services/mesh.h"
+#include "util/fptr_wlist.h"
 
 #ifdef HAVE_NETDB_H
 #include <netdb.h>
@@ -1276,11 +1278,13 @@ listen_create(struct comm_base* base, struct listen_port* ports,
                                ports->ftype == listen_type_tcp_dnscrypt)
                        cp = comm_point_create_tcp(base, ports->fd, 
                                tcp_accept_count, tcp_idle_timeout,
-                               tcp_conn_limit, bufsize, cb, cb_arg);
+                               tcp_conn_limit, bufsize, front->udp_buff,
+                               cb, cb_arg);
                else if(ports->ftype == listen_type_ssl) {
                        cp = comm_point_create_tcp(base, ports->fd, 
                                tcp_accept_count, tcp_idle_timeout,
-                               tcp_conn_limit, bufsize, cb, cb_arg);
+                               tcp_conn_limit, bufsize, front->udp_buff,
+                               cb, cb_arg);
                        cp->ssl = sslctx;
                } else if(ports->ftype == listen_type_udpancil ||
                                  ports->ftype == listen_type_udpancil_dnscrypt)
@@ -1508,3 +1512,311 @@ void listen_start_accept(struct listen_dnsport* listen)
        }
 }
 
+struct tcp_req_info*
+tcp_req_info_create(struct sldns_buffer* spoolbuf)
+{
+       struct tcp_req_info* req = (struct tcp_req_info*)malloc(sizeof(*req));
+       if(!req) {
+               log_err("malloc failure for new stream outoforder processing structure");
+               return NULL;
+       }
+       memset(req, 0, sizeof(*req));
+       req->spool_buffer = spoolbuf;
+       return req;
+}
+
+void
+tcp_req_info_delete(struct tcp_req_info* req)
+{
+       if(!req) return;
+       tcp_req_info_clear(req);
+       /* cp is pointer back to commpoint that owns this struct and
+        * called delete on us */
+       /* spool_buffer is shared udp buffer, not deleted here */
+       free(req);
+}
+
+void tcp_req_info_clear(struct tcp_req_info* req)
+{
+       struct tcp_req_open_item* open, *nopen;
+       struct tcp_req_done_item* item, *nitem;
+       if(!req) return;
+
+       /* free outstanding request mesh reply entries */
+       open = req->open_req_list;
+       while(open) {
+               nopen = open->next;
+               mesh_state_remove_reply(open->mesh, open->mesh_state, req->cp);
+               free(open);
+               open = nopen;
+       }
+       req->open_req_list = NULL;
+       req->num_open_req = 0;
+       
+       /* free pending writable result packets */
+       item = req->done_req_list;
+       while(item) {
+               nitem = item->next;
+               free(item->buf);
+               free(item);
+               item = nitem;
+       }
+       req->done_req_list = NULL;
+       req->num_done_req = 0;
+       req->read_is_closed = 0;
+}
+
+void
+tcp_req_info_remove_mesh_state(struct tcp_req_info* req, struct mesh_state* m)
+{
+       struct tcp_req_open_item* open, *prev = NULL;
+       if(!req || !m) return;
+       open = req->open_req_list;
+       while(open) {
+               if(open->mesh_state == m) {
+                       struct tcp_req_open_item* next;
+                       if(prev) prev->next = open->next;
+                       else req->open_req_list = open->next;
+                       /* caller has to manage the mesh state reply entry */
+                       next = open->next;
+                       free(open);
+                       req->num_open_req --;
+
+                       /* prev = prev; */
+                       open = next;
+                       continue;
+               }
+               prev = open;
+               open = open->next;
+       }
+}
+
+/** number of simultaneous requests a client can have */
+#define TCP_MAX_REQ_SIMULTANEOUS 10
+
+/** setup listening for read or write */
+static void
+tcp_req_info_setup_listen(struct tcp_req_info* req)
+{
+       int wr = 0;
+       int rd = 0;
+
+       if(req->cp->tcp_byte_count != 0) {
+               /* cannot change, halfway through */
+               return;
+       }
+
+       if(!req->cp->tcp_is_reading)
+               wr = 1;
+       if(req->num_open_req + req->num_done_req < TCP_MAX_REQ_SIMULTANEOUS &&
+               !req->read_is_closed)
+               rd = 1;
+       
+       if(wr) {
+               req->cp->tcp_is_reading = 0;
+               comm_point_start_listening(req->cp, -1,
+                       req->cp->tcp_timeout_msec);
+       } else if(rd) {
+               req->cp->tcp_is_reading = 1;
+               comm_point_start_listening(req->cp, -1,
+                       req->cp->tcp_timeout_msec);
+       } else {
+               comm_point_start_listening(req->cp, -1,
+                       req->cp->tcp_timeout_msec);
+               comm_point_listen_for_rw(req->cp, 0, 0);
+       }
+}
+
+/** remove first item from list of pending results */
+static struct tcp_req_done_item*
+tcp_req_info_pop_done(struct tcp_req_info* req)
+{
+       struct tcp_req_done_item* item;
+       log_assert(req->num_done_req > 0 && req->done_req_list);
+       item = req->done_req_list;
+       req->done_req_list = req->done_req_list->next;
+       req->num_done_req --;
+       return item;
+}
+
+/** Send given buffer and setup to write */
+static void
+tcp_req_info_start_write_buf(struct tcp_req_info* req, uint8_t* buf,
+       size_t len)
+{
+       sldns_buffer_clear(req->cp->buffer);
+       sldns_buffer_write(req->cp->buffer, buf, len);
+       sldns_buffer_flip(req->cp->buffer);
+
+       req->cp->tcp_is_reading = 0; /* we are now writing */
+}
+
+/** pick up the next result and start writing it to the channel */
+static void
+tcp_req_pickup_next_result(struct tcp_req_info* req)
+{
+       if(req->num_done_req > 0) {
+               /* unlist the done item from the list of pending results */
+               struct tcp_req_done_item* item = tcp_req_info_pop_done(req);
+               tcp_req_info_start_write_buf(req, item->buf, item->len);
+               free(item->buf);
+               free(item);
+       }
+}
+
+/** the read channel has closed */
+int
+tcp_req_info_handle_read_close(struct tcp_req_info* req)
+{
+       verbose(VERB_ALGO, "tcp channel read side closed %d", req->cp->fd);
+       if(req->num_done_req != 0) {
+               tcp_req_pickup_next_result(req);
+               tcp_req_info_setup_listen(req);
+               return 1;
+       }
+       if(req->num_open_req == 0 && req->num_done_req == 0)
+               return 0;
+       req->read_is_closed = 1;
+       tcp_req_info_setup_listen(req);
+       return 1;
+}
+
+void
+tcp_req_info_handle_writedone(struct tcp_req_info* req)
+{
+       /* back to reading state, we finished this write event */
+       sldns_buffer_clear(req->cp->buffer);
+       req->cp->tcp_is_reading = 1;
+       /* see if another result needs writing */
+       tcp_req_pickup_next_result(req);
+
+       /* see if there is more to write, if not stop_listening for writing */
+       /* see if new requests are allowed, if so, start_listening
+        * for reading */
+       tcp_req_info_setup_listen(req);
+}
+
+void
+tcp_req_info_handle_readdone(struct tcp_req_info* req)
+{
+       struct comm_point* c = req->cp;
+
+       /* we want to read up several requests, unless there are
+        * pending answers */
+
+       req->is_drop = 0;
+       req->is_reply = 0;
+       req->in_worker_handle = 1;
+       /* handle the current request,  */
+       fptr_ok(fptr_whitelist_comm_point(c->callback));
+       if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
+               req->in_worker_handle = 0;
+               /* there is an answer, put it up.  It is already in the
+                * c->buffer, just send it. */
+               /* since we were just reading a query, the channel is
+                * clear to write to */
+       send_it:
+               c->tcp_is_reading = 0;
+               comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+               return;
+       }
+       req->in_worker_handle = 0;
+       /* it should be waiting in the mesh for recursion.
+        * If mesh failed(formerr) and called commpoint_drop_reply.  Then the
+        * mesh state has been cleared. */
+       if(req->is_drop) {
+               return;
+       }
+       /* If mesh failed(mallocfail) and called commpoint_send_reply with
+        * something like servfail then we pick up that reply below. */
+       if(req->is_reply) {
+               goto send_it;
+       }
+
+       sldns_buffer_clear(req->cp->buffer);
+       /* if pending answers, pick up an answer and start sending it */
+       tcp_req_pickup_next_result(req);
+
+       /* if answers pending, start sending answers */
+       /* read more requests if we can have more requests */
+       tcp_req_info_setup_listen(req);
+}
+
+int
+tcp_req_info_add_meshstate(struct tcp_req_info* req,
+       struct mesh_area* mesh, struct mesh_state* m)
+{
+       struct tcp_req_open_item* item;
+       log_assert(req && mesh && m);
+       item = (struct tcp_req_open_item*)malloc(sizeof(*item));
+       if(!item) return 0;
+       item->next = req->open_req_list;
+       item->mesh = mesh;
+       item->mesh_state = m;
+       req->open_req_list = item;
+       req->num_open_req++;
+       return 1;
+}
+
+/** Add a result to the result list.  At the end. */
+static int
+tcp_req_info_add_result(struct tcp_req_info* req, uint8_t* buf, size_t len)
+{
+       struct tcp_req_done_item* last = NULL;
+       struct tcp_req_done_item* item;
+       /* find last element */
+       last = req->done_req_list;
+       while(last && last->next)
+               last = last->next;
+       
+       /* create new element */
+       item = (struct tcp_req_done_item*)malloc(sizeof(*item));
+       if(!item) {
+               return 0;
+       }
+       item->next = NULL;
+       item->len = len;
+       item->buf = memdup(buf, len);
+       if(!item->buf) {
+               free(item);
+               return 0;
+       }
+
+       /* link in */
+       if(last) last->next = item;
+       else req->done_req_list = item;
+       req->num_done_req++;
+       return 1;
+}
+
+void
+tcp_req_info_send_reply(struct tcp_req_info* req)
+{
+       if(req->in_worker_handle) {
+               /* It is in the right buffer to answer straight away */
+               req->is_reply = 1;
+               return;
+       }
+       /* now that the query has been handled, that mesh_reply entry
+        * should be removed, from the tcp_req_info list */
+       /* TODO: find it, need mstate ptr */
+       /* see if we can send it straight away (we are not doing
+        * anything else).  If so, copy to buffer and start */
+       if(req->cp->tcp_is_reading && req->cp->tcp_byte_count == 0) {
+               /* buffer is free, and was ready to read new query into,
+                * but we are now going to use it to send this answer */
+               tcp_req_info_start_write_buf(req,
+                       sldns_buffer_begin(req->spool_buffer),
+                       sldns_buffer_limit(req->spool_buffer));
+               /* switch to listen to write events */
+               comm_point_stop_listening(req->cp);
+               comm_point_start_listening(req->cp, -1,
+                       req->cp->tcp_timeout_msec);
+               return;
+       }
+       /* queue up the answer behind the others already pending */
+       if(!tcp_req_info_add_result(req, sldns_buffer_begin(req->spool_buffer),
+               sldns_buffer_limit(req->spool_buffer))) {
+               log_err("malloc failure adding reply to stream result list");
+       }
+}
index 46b432d4b93438464d24aede1e07005eb4e0042f..653413bc6cfbff8698e44db9a4bea5f3664e3f5c 100644 (file)
@@ -237,4 +237,127 @@ int create_tcp_accept_sock(struct addrinfo *addr, int v6only, int* noproto,
  */
 int create_local_accept_sock(const char* path, int* noproto, int use_systemd);
 
+/**
+ * TCP request info.  List of requests outstanding on the channel, that
+ * are asked for but not yet answered back.
+ */
+struct tcp_req_info {
+       /** the TCP comm point for this.  Its buffer is used for read/write */
+       struct comm_point* cp;
+       /** the buffer to use to spool reply from mesh into,
+        * it can then be copied to the result list and written.
+        * it is a pointer to the shared udp buffer. */
+       struct sldns_buffer* spool_buffer;
+       /** are we in worker_handle function call (for recursion callback)*/
+       int in_worker_handle;
+       /** is the comm point dropped (by worker handle).
+        * That means we have to disconnect the channel. */
+       int is_drop;
+       /** is the comm point set to send_reply (by mesh new client in worker
+        * handle), if so answer is available in c.buffer */
+       int is_reply;
+       /** read channel has closed, just write pending results */
+       int read_is_closed;
+       /** number of outstanding requests */
+       int num_open_req;
+       /** list of outstanding requests */
+       struct tcp_req_open_item* open_req_list;
+       /** number of pending writeable results */
+       int num_done_req;
+       /** list of pending writable result packets, malloced one at a time */
+       struct tcp_req_done_item* done_req_list;
+};
+
+/**
+ * List of open items in TCP channel
+ */
+struct tcp_req_open_item {
+       /** next in list */
+       struct tcp_req_open_item* next;
+       /** the mesh area of the mesh_state */
+       struct mesh_area* mesh;
+       /** the mesh state */
+       struct mesh_state* mesh_state;
+};
+
+/**
+ * List of done items in TCP channel
+ */
+struct tcp_req_done_item {
+       /** next in list */
+       struct tcp_req_done_item* next;
+       /** the buffer with packet contents */
+       uint8_t* buf;
+       /** length of the buffer */
+       size_t len;
+};
+
+/**
+ * Create tcp request info structure that keeps track of open
+ * requests on the TCP channel that are resolved at the same time,
+ * and the pending results that have to get written back to that client.
+ * @param spoolbuf: shared buffer
+ * @return new structure or NULL on alloc failure.
+ */
+struct tcp_req_info* tcp_req_info_create(struct sldns_buffer* spoolbuf);
+
+/**
+ * Delete tcp request structure.  Called by owning commpoint.
+ * Removes mesh entry references and stored results from the lists.
+ * @param req: the tcp request info
+ */
+void tcp_req_info_delete(struct tcp_req_info* req);
+
+/**
+ * Clear tcp request structure.  Removes list entries, sets it up ready
+ * for the next connection.
+ * @param req: tcp request info structure.
+ */
+void tcp_req_info_clear(struct tcp_req_info* req);
+
+/**
+ * Remove mesh state entry from list in tcp_req_info.
+ * caller has to manage the mesh state reply entry in the mesh state.
+ * @param req: the tcp req info that has the entry removed from the list.
+ * @param m: the state removed from the list.
+ */
+void tcp_req_info_remove_mesh_state(struct tcp_req_info* req,
+       struct mesh_state* m);
+
+/**
+ * Handle write done of the last result packet
+ */
+void tcp_req_info_handle_writedone(struct tcp_req_info* req);
+
+/**
+ * Handle read done of a new request from the client
+ */
+void tcp_req_info_handle_readdone(struct tcp_req_info* req);
+
+/**
+ * Add mesh state to the tcp req list of open requests.
+ * So the comm_reply can be removed off the mesh reply list when
+ * the tcp channel has to be closed (for other reasons then that that
+ * request was done, eg. channel closed by client or some format error.
+ * @param req: tcp req info structure.  It keeps track of the simultaneous
+ *     requests and results on a tcp (or TLS) channel.
+ * @param mesh: mesh area for the state.
+ * @param m: mesh state to add.
+ * @return 0 on failure (malloc failure).
+ */
+int tcp_req_info_add_meshstate(struct tcp_req_info* req,
+       struct mesh_area* mesh, struct mesh_state* m);
+
+/**
+ * Send reply on tcp simultaneous answer channel.  May queue it up.
+ * @param req: request info structure.
+ */
+void tcp_req_info_send_reply(struct tcp_req_info* req);
+
+/** the read channel has closed
+ * @param req: request. remaining queries are looked up and answered. 
+ * @return zero if nothing to do, just close the tcp.
+ */
+int tcp_req_info_handle_read_close(struct tcp_req_info* req);
+
 #endif /* LISTEN_DNSPORT_H */
index 3d4403f2460b67b84b74a85c858833da94b34d4b..01771af35733bffaaca556bb1a61562c7e5a70d9 100644 (file)
@@ -61,6 +61,7 @@
 #include "services/localzone.h"
 #include "util/data/dname.h"
 #include "respip/respip.h"
+#include "services/listen_dnsport.h"
 
 /** subtract timers and the values do not overflow or become negative */
 static void
@@ -429,6 +430,7 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
        /* add reply to s */
        if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) {
                        log_err("mesh_new_client: out of memory; SERVFAIL");
+               servfail_mem:
                        if(!inplace_cb_reply_servfail_call(mesh->env, qinfo, &s->s,
                                NULL, LDNS_RCODE_SERVFAIL, edns, rep, mesh->env->scratch))
                                        edns->opt_list = NULL;
@@ -439,6 +441,12 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
                                mesh_state_delete(&s->s);
                        return;
        }
+       if(rep->c->tcp_req_info) {
+               if(!tcp_req_info_add_meshstate(rep->c->tcp_req_info, mesh, s)) {
+                       log_err("mesh_new_client: out of memory add tcpreqinfo");
+                       goto servfail_mem;
+               }
+       }
        /* update statistics */
        if(was_detached) {
                log_assert(mesh->num_detached_states > 0);
@@ -1031,11 +1039,14 @@ mesh_do_callback(struct mesh_state* m, int rcode, struct reply_info* rep,
  * @param rcode: if not 0, error code.
  * @param rep: reply to send (or NULL if rcode is set).
  * @param r: reply entry
+ * @param r_buffer: buffer to use for reply entry.
  * @param prev: previous reply, already has its answer encoded in buffer.
+ * @param prev_buffer: buffer for previous reply.
  */
 static void
 mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
-       struct mesh_reply* r, struct mesh_reply* prev)
+       struct mesh_reply* r, struct sldns_buffer* r_buffer,
+       struct mesh_reply* prev, struct sldns_buffer* prev_buffer)
 {
        struct timeval end_time;
        struct timeval duration;
@@ -1063,7 +1074,7 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
         * and still reuse the previous answer if they are the same, but that
         * would be complicated and error prone for the relatively minor case.
         * So we err on the side of safety. */
-       if(prev && prev->qflags == r->qflags && 
+       if(prev && prev_buffer && prev->qflags == r->qflags && 
                !prev->local_alias && !r->local_alias &&
                prev->edns.edns_present == r->edns.edns_present && 
                prev->edns.bits == r->edns.bits && 
@@ -1071,13 +1082,11 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
                edns_opt_list_compare(prev->edns.opt_list, r->edns.opt_list)
                == 0) {
                /* if the previous reply is identical to this one, fix ID */
-               if(prev->query_reply.c->buffer != r->query_reply.c->buffer)
-                       sldns_buffer_copy(r->query_reply.c->buffer, 
-                               prev->query_reply.c->buffer);
-               sldns_buffer_write_at(r->query_reply.c->buffer, 0, 
-                       &r->qid, sizeof(uint16_t));
-               sldns_buffer_write_at(r->query_reply.c->buffer, 12, 
-                       r->qname, m->s.qinfo.qname_len);
+               if(prev_buffer != r_buffer)
+                       sldns_buffer_copy(r_buffer, prev_buffer);
+               sldns_buffer_write_at(r_buffer, 0, &r->qid, sizeof(uint16_t));
+               sldns_buffer_write_at(r_buffer, 12, r->qname,
+                       m->s.qinfo.qname_len);
                comm_point_send_reply(&r->query_reply);
        } else if(rcode) {
                m->s.qinfo.qname = r->qname;
@@ -1091,8 +1100,8 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
                                &r->edns, NULL, m->s.region))
                                        r->edns.opt_list = NULL;
                }
-               error_encode(r->query_reply.c->buffer, rcode, &m->s.qinfo,
-                       r->qid, r->qflags, &r->edns);
+               error_encode(r_buffer, rcode, &m->s.qinfo, r->qid,
+                       r->qflags, &r->edns);
                comm_point_send_reply(&r->query_reply);
        } else {
                size_t udp_size = r->edns.udp_size;
@@ -1108,16 +1117,15 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
                                m->s.env->cfg, r->query_reply.c,
                                m->s.region) ||
                        !reply_info_answer_encode(&m->s.qinfo, rep, r->qid, 
-                       r->qflags, r->query_reply.c->buffer, 0, 1, 
-                       m->s.env->scratch, udp_size, &r->edns, 
-                       (int)(r->edns.bits & EDNS_DO), secure)) 
+                       r->qflags, r_buffer, 0, 1, m->s.env->scratch,
+                       udp_size, &r->edns, (int)(r->edns.bits & EDNS_DO),
+                       secure)) 
                {
                        if(!inplace_cb_reply_servfail_call(m->s.env, &m->s.qinfo, &m->s,
                        rep, LDNS_RCODE_SERVFAIL, &r->edns, NULL, m->s.region))
                                r->edns.opt_list = NULL;
-                       error_encode(r->query_reply.c->buffer, 
-                               LDNS_RCODE_SERVFAIL, &m->s.qinfo, r->qid, 
-                               r->qflags, &r->edns);
+                       error_encode(r_buffer, LDNS_RCODE_SERVFAIL,
+                               &m->s.qinfo, r->qid, r->qflags, &r->edns);
                }
                r->edns = edns_bak;
                comm_point_send_reply(&r->query_reply);
@@ -1132,19 +1140,17 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
        timeval_add(&m->s.env->mesh->replies_sum_wait, &duration);
        timehist_insert(m->s.env->mesh->histogram, &duration);
        if(m->s.env->cfg->stat_extended) {
-               uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(r->
-                       query_reply.c->buffer, 2));
+               uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(
+                       r_buffer, 2));
                if(secure) m->s.env->mesh->ans_secure++;
                m->s.env->mesh->ans_rcode[ rc ] ++;
-               if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r->
-                       query_reply.c->buffer)) == 0)
+               if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r_buffer)) == 0)
                        m->s.env->mesh->ans_nodata++;
        }
        /* Log reply sent */
        if(m->s.env->cfg->log_replies) {
                log_reply_info(0, &m->s.qinfo, &r->query_reply.addr,
-                       r->query_reply.addrlen, duration, 0,
-                       r->query_reply.c->buffer);
+                       r->query_reply.addrlen, duration, 0, r_buffer);
        }
 }
 
@@ -1152,6 +1158,7 @@ void mesh_query_done(struct mesh_state* mstate)
 {
        struct mesh_reply* r;
        struct mesh_reply* prev = NULL;
+       struct sldns_buffer* prev_buffer = NULL;
        struct mesh_cb* c;
        struct reply_info* rep = (mstate->s.return_msg?
                mstate->s.return_msg->rep:NULL);
@@ -1180,9 +1187,15 @@ void mesh_query_done(struct mesh_state* mstate)
                if(mstate->s.is_drop)
                        comm_point_drop_reply(&r->query_reply);
                else {
+                       struct sldns_buffer* r_buffer = r->query_reply.c->buffer;
+                       if(r->query_reply.c->tcp_req_info)
+                               r_buffer = r->query_reply.c->tcp_req_info->spool_buffer;
                        mesh_send_reply(mstate, mstate->s.return_rcode, rep,
-                               r, prev);
+                               r, r_buffer, prev, prev_buffer);
+                       if(r->query_reply.c->tcp_req_info)
+                               tcp_req_info_remove_mesh_state(r->query_reply.c->tcp_req_info, mstate);
                        prev = r;
+                       prev_buffer = r_buffer;
                }
        }
        mstate->replies_sent = 1;
@@ -1613,3 +1626,36 @@ void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
                m->prev->next = m->next;
        else    *fp = m->next;
 }
+
+void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
+       struct comm_point* cp)
+{
+       struct mesh_reply* n, *prev = NULL;
+       n = m->reply_list;
+       if(!n) return; /* nothing to remove, also no accounting needed */
+       while(n) {
+               if(n->query_reply.c == cp) {
+                       /* unlink it */
+                       if(prev) prev->next = n->next;
+                       else m->reply_list = n->next;
+                       /* delete it, but allocated in m region */
+                       mesh->num_reply_addrs--;
+
+                       /* prev = prev; */
+                       n = n->next;
+                       continue;
+               }
+               prev = n;
+               n = n->next;
+       }
+       /* it was not detached (because it had a reply list), could be now */
+       if(!m->reply_list && !m->cb_list
+               && m->super_set.count == 0) {
+               mesh->num_detached_states++;
+       }
+       /* if not replies any more in mstate, it is no longer a reply_state */
+       if(!m->reply_list && !m->cb_list) {
+               log_assert(mesh->num_reply_states > 0);
+               mesh->num_reply_states--;
+       }
+}
index b4ce03e7e2a0b4942b330d6dee480abe665df269..e11c06bf4e013a33010858e8e4c2fd5edfac22c2 100644 (file)
@@ -633,4 +633,14 @@ void mesh_list_insert(struct mesh_state* m, struct mesh_state** fp,
 void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
        struct mesh_state** lp);
 
+/**
+ * Remove mesh reply entry from the reply entry list.  Searches for
+ * the repinfo pointer.
+ * @param mesh: to update the counters.
+ * @param m: the mesh state.
+ * @param cp: the commpoint to remove from the list.
+ */
+void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
+       struct comm_point* cp);
+
 #endif /* SERVICES_MESH_H */
index 777ed7355ed770fe0ce37297b66d6728c1140668..5407999d5088cfa6fee385481db8d3bc69a9a3f9 100644 (file)
@@ -1802,4 +1802,18 @@ int outnet_tcp_connect(int ATTR_UNUSED(s), struct sockaddr_storage* ATTR_UNUSED(
        return 0;
 }
 
+int tcp_req_info_add_meshstate(struct tcp_req_info* ATTR_UNUSED(req),
+        struct mesh_area* ATTR_UNUSED(mesh), struct mesh_state* ATTR_UNUSED(m))
+{
+       log_assert(0);
+       return 0;
+}
+
+void
+tcp_req_info_remove_mesh_state(struct tcp_req_info* ATTR_UNUSED(req),
+       struct mesh_state* ATTR_UNUSED(m))
+{
+       log_assert(0);
+}
+
 /*********** End of Dummy routines ***********/
index e6dda048d77de0329ccc14de3f349321783ed06a..897f8cf706870762f3feb90525000480c7cd506a 100644 (file)
@@ -8,3 +8,4 @@
 . ../common.sh
 kill_pid $FWD_PID
 kill_pid $UNBOUND_PID
+cat unbound.log
index 67354d014fff99faf60dea3d0884a3c838409178..de4250c3e9eba98973ab4846db221702ea950dec 100644 (file)
@@ -6,9 +6,9 @@
 
 # check what sort of netcat we have
 if nc -h 2>&1 | grep "q secs"; then
-       ncopt="-q 3 -w 2"
+       ncopt="-q 3 -i 2"
 else
-       ncopt="-w 2"
+       ncopt="-i 2"
 fi
 
 PRE="../.."
index 37353804142086d5b69b916f1ddc73c983d1e9c0..58c65220ad67b32c59de53a32af3d736f48822b6 100644 (file)
@@ -50,6 +50,7 @@
 #include "sldns/str2wire.h"
 #include "dnstap/dnstap.h"
 #include "dnscrypt/dnscrypt.h"
+#include "services/listen_dnsport.h"
 #ifdef HAVE_OPENSSL_SSL_H
 #include <openssl/ssl.h>
 #endif
@@ -150,7 +151,8 @@ struct internal_signal {
 /** create a tcp handler with a parent */
 static struct comm_point* comm_point_create_tcp_handler(
        struct comm_base *base, struct comm_point* parent, size_t bufsize,
-        comm_point_callback_type* callback, void* callback_arg);
+       struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+       void* callback_arg);
 
 /* -------- End of local definitions -------- */
 
@@ -988,7 +990,11 @@ tcp_callback_writer(struct comm_point* c)
        c->tcp_byte_count = 0;
        /* switch from listening(write) to listening(read) */
        comm_point_stop_listening(c);
-       comm_point_start_listening(c, -1, -1);
+       if(c->tcp_req_info) {
+               tcp_req_info_handle_writedone(c->tcp_req_info);
+       } else {
+               comm_point_start_listening(c, -1, -1);
+       }
 }
 
 /** do the callback when reading is done */
@@ -1002,9 +1008,13 @@ tcp_callback_reader(struct comm_point* c)
        c->tcp_byte_count = 0;
        if(c->type == comm_tcp)
                comm_point_stop_listening(c);
-       fptr_ok(fptr_whitelist_comm_point(c->callback));
-       if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
-               comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+       if(c->tcp_req_info) {
+               tcp_req_info_handle_readdone(c->tcp_req_info);
+       } else {
+               fptr_ok(fptr_whitelist_comm_point(c->callback));
+               if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
+                       comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+               }
        }
 }
 
@@ -1163,6 +1173,8 @@ ssl_handle_read(struct comm_point* c)
                        c->tcp_byte_count))) <= 0) {
                        int want = SSL_get_error(c->ssl, r);
                        if(want == SSL_ERROR_ZERO_RETURN) {
+                               if(c->tcp_req_info)
+                                       return tcp_req_info_handle_read_close(c->tcp_req_info);
                                return 0; /* shutdown, closed */
                        } else if(want == SSL_ERROR_WANT_READ) {
                                ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
@@ -1205,6 +1217,8 @@ ssl_handle_read(struct comm_point* c)
                if(r <= 0) {
                        int want = SSL_get_error(c->ssl, r);
                        if(want == SSL_ERROR_ZERO_RETURN) {
+                               if(c->tcp_req_info)
+                                       return tcp_req_info_handle_read_close(c->tcp_req_info);
                                return 0; /* shutdown, closed */
                        } else if(want == SSL_ERROR_WANT_READ) {
                                ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
@@ -1365,9 +1379,11 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
                /* read length bytes */
                r = recv(fd,(void*)sldns_buffer_at(c->buffer,c->tcp_byte_count),
                        sizeof(uint16_t)-c->tcp_byte_count, 0);
-               if(r == 0)
+               if(r == 0) {
+                       if(c->tcp_req_info)
+                               return tcp_req_info_handle_read_close(c->tcp_req_info);
                        return 0;
-               else if(r == -1) {
+               else if(r == -1) {
 #ifndef USE_WINSOCK
                        if(errno == EINTR || errno == EAGAIN)
                                return 1;
@@ -1416,6 +1432,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
        r = recv(fd, (void*)sldns_buffer_current(c->buffer), 
                sldns_buffer_remaining(c->buffer), 0);
        if(r == 0) {
+               if(c->tcp_req_info)
+                       return tcp_req_info_handle_read_close(c->tcp_req_info);
                return 0;
        } else if(r == -1) {
 #ifndef USE_WINSOCK
@@ -2523,7 +2541,8 @@ comm_point_create_udp_ancil(struct comm_base *base, int fd,
 static struct comm_point* 
 comm_point_create_tcp_handler(struct comm_base *base, 
        struct comm_point* parent, size_t bufsize,
-        comm_point_callback_type* callback, void* callback_arg)
+       struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+       void* callback_arg)
 {
        struct comm_point* c = (struct comm_point*)calloc(1,
                sizeof(struct comm_point));
@@ -2579,6 +2598,20 @@ comm_point_create_tcp_handler(struct comm_base *base,
        c->repinfo.c = c;
        c->callback = callback;
        c->cb_arg = callback_arg;
+       if(spoolbuf) {
+               c->tcp_req_info = tcp_req_info_create(spoolbuf);
+               if(!c->tcp_req_info) {
+                       log_err("could not create tcp commpoint");
+                       sldns_buffer_free(c->buffer);
+                       free(c->timeout);
+                       free(c->ev);
+                       free(c);
+                       return NULL;
+               }
+               c->tcp_req_info->cp = c;
+               c->tcp_do_close = 1;
+               c->tcp_do_toggle_rw = 0;
+       }
        /* add to parent free list */
        c->tcp_free = parent->tcp_free;
        parent->tcp_free = c;
@@ -2590,6 +2623,9 @@ comm_point_create_tcp_handler(struct comm_base *base,
        {
                log_err("could not basetset tcphdl event");
                parent->tcp_free = c->tcp_free;
+               tcp_req_info_delete(c->tcp_req_info);
+               sldns_buffer_free(c->buffer);
+               free(c->timeout);
                free(c->ev);
                free(c);
                return NULL;
@@ -2600,7 +2636,8 @@ comm_point_create_tcp_handler(struct comm_base *base,
 struct comm_point* 
 comm_point_create_tcp(struct comm_base *base, int fd, int num,
        int idle_timeout, struct tcl_list* tcp_conn_limit, size_t bufsize,
-        comm_point_callback_type* callback, void* callback_arg)
+       struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+       void* callback_arg)
 {
        struct comm_point* c = (struct comm_point*)calloc(1,
                sizeof(struct comm_point));
@@ -2667,7 +2704,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num,
        /* now prealloc the tcp handlers */
        for(i=0; i<num; i++) {
                c->tcp_handlers[i] = comm_point_create_tcp_handler(base,
-                       c, bufsize, callback, callback_arg);
+                       c, bufsize, spoolbuf, callback, callback_arg);
                if(!c->tcp_handlers[i]) {
                        comm_point_delete(c);
                        return NULL;
@@ -2949,6 +2986,8 @@ comm_point_close(struct comm_point* c)
                }
        }
        tcl_close_connection(c->tcl_addr);
+       if(c->tcp_req_info)
+               tcp_req_info_clear(c->tcp_req_info);
        /* close fd after removing from event lists, or epoll.. is messed up */
        if(c->fd != -1 && !c->do_not_close) {
                if(c->type == comm_tcp || c->type == comm_http) {
@@ -2992,6 +3031,9 @@ comm_point_delete(struct comm_point* c)
                        sldns_buffer_free(c->dnscrypt_buffer);
                }
 #endif
+               if(c->tcp_req_info) {
+                       tcp_req_info_delete(c->tcp_req_info);
+               }
        }
        ub_event_free(c->ev->ev);
        free(c->ev);
@@ -3032,8 +3074,12 @@ comm_point_send_reply(struct comm_reply *repinfo)
                        dt_msg_send_client_response(repinfo->c->tcp_parent->dtenv,
                        &repinfo->addr, repinfo->c->type, repinfo->c->buffer);
 #endif
-               comm_point_start_listening(repinfo->c, -1,
-                       repinfo->c->tcp_timeout_msec);
+               if(repinfo->c->tcp_req_info) {
+                       tcp_req_info_send_reply(repinfo->c->tcp_req_info);
+               } else {
+                       comm_point_start_listening(repinfo->c, -1,
+                               repinfo->c->tcp_timeout_msec);
+               }
        }
 }
 
@@ -3046,6 +3092,8 @@ comm_point_drop_reply(struct comm_reply* repinfo)
        log_assert(repinfo->c->type != comm_tcp_accept);
        if(repinfo->c->type == comm_udp)
                return;
+       if(repinfo->c->tcp_req_info)
+               repinfo->c->tcp_req_info->is_drop = 1;
        reclaim_tcp_handler(repinfo->c);
 }
 
index f6b6af688b94864e1ea74a6f633b495c7a5628ce..d80c72b33431c5ef052b159a3544ecbca8be35fc 100644 (file)
@@ -268,6 +268,9 @@ struct comm_point {
        /** the entry for the connection. */
        struct tcl_addr* tcl_addr;
 
+       /** the structure to keep track of open requests on this channel */
+       struct tcp_req_info* tcp_req_info;
+
 #ifdef USE_MSG_FASTOPEN
        /** used to track if the sendto() call should be done when using TFO. */
        int tcp_do_fastopen;
@@ -455,6 +458,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base,
  * @param idle_timeout: TCP idle timeout in ms.
  * @param tcp_conn_limit: TCP connection limit info.
  * @param bufsize: size of buffer to create for handlers.
+ * @param spoolbuf: shared spool buffer for tcp_req_info structures.
+ *     or NULL to not create those structures in the tcp handlers.
  * @param callback: callback function pointer for TCP handlers.
  * @param callback_arg: will be passed to your callback function.
  * @return: returns the TCP listener commpoint. You can find the
@@ -464,7 +469,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base,
  */
 struct comm_point* comm_point_create_tcp(struct comm_base* base,
        int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit,
-       size_t bufsize, comm_point_callback_type* callback, void* callback_arg);
+       size_t bufsize, struct sldns_buffer* spoolbuf,
+       comm_point_callback_type* callback, void* callback_arg);
 
 /**
  * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.