]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
Stream reuse branch, for TCP and TLS stream reuse.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 16 Jan 2020 16:12:32 +0000 (17:12 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 16 Jan 2020 16:12:32 +0000 (17:12 +0100)
This is for upstream pipes and using them again for the next query.

Signposted code for reuse_tcp structure in outside_network.h

services/outside_network.c
services/outside_network.h
testcode/fake_event.c
util/fptr_wlist.c

index f865f13c1390c7ca9b58559f7d0761c069695423..a5724a42fc853302120ba10868017c6c1fb8c7da 100644 (file)
@@ -131,6 +131,34 @@ serviced_cmp(const void* key1, const void* key2)
        return sockaddr_cmp(&q1->addr, q1->addrlen, &q2->addr, q2->addrlen);
 }
 
+int
+reuse_cmp(const void* key1, const void* key2)
+{
+       struct reuse_tcp* r1 = (struct reuse_tcp*)key1;
+       struct reuse_tcp* r2 = (struct reuse_tcp*)key2;
+       int r;
+       /* make sure the entries are in use (have a waiting_tcp entry) */
+       if(!r1->pending->query && !r2->pending->query)
+               return 0;
+       if(r1->pending->query && !r2->pending->query)
+               return 1;
+       if(!r1->pending->query && r2->pending->query)
+               return -1;
+
+       /* compare address and port */
+       r = sockaddr_cmp(&r1->pending->query->addr, r1->pending->query->addrlen,
+               &r2->pending->query->addr, r2->pending->query->addrlen);
+       if(r != 0)
+               return r;
+
+       /* compare if SSL-enabled */
+       if(r1->pending->c->ssl && !r2->pending->c->ssl)
+               return 1;
+       if(!r1->pending->c->ssl && r2->pending->c->ssl)
+               return -1;
+       return 0;
+}
+
 /** delete waiting_tcp entry. Does not unlink from waiting list. 
  * @param w: to delete.
  */
@@ -281,6 +309,20 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
        return 1;
 }
 
+/** use the buffer to setup writing the query */
+static void
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
+       size_t pkt_len)
+{
+       pend->id = LDNS_ID_WIRE(pkt);
+       sldns_buffer_clear(pend->c->buffer);
+       sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
+       sldns_buffer_flip(pend->c->buffer);
+       pend->c->tcp_is_reading = 0;
+       pend->c->tcp_byte_count = 0;
+       comm_point_start_listening(pend->c, s, -1);
+}
+
 /** use next free buffer to service a tcp query */
 static int
 outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
@@ -412,19 +454,13 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
        }
        w->pkt = NULL;
        w->next_waiting = (void*)pend;
-       pend->id = LDNS_ID_WIRE(pkt);
        w->outnet->num_tcp_outgoing++;
        w->outnet->tcp_free = pend->next_free;
        pend->next_free = NULL;
        pend->query = w;
        pend->c->repinfo.addrlen = w->addrlen;
        memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
-       sldns_buffer_clear(pend->c->buffer);
-       sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
-       sldns_buffer_flip(pend->c->buffer);
-       pend->c->tcp_is_reading = 0;
-       pend->c->tcp_byte_count = 0;
-       comm_point_start_listening(pend->c, s, -1);
+       outnet_tcp_take_query_setup(s, pend, pkt, pkt_len);
        return 1;
 }
 
@@ -449,6 +485,42 @@ use_free_buffer(struct outside_network* outnet)
        }
 }
 
+/** remove reused element from tree and lru list */
+static void
+reuse_tcp_remove_tree_list(struct outside_network* outnet,
+       struct reuse_tcp* reuse)
+{
+       if(reuse->node.key) {
+               /* delete it from reuse tree */
+               (void)rbtree_delete(&outnet->tcp_reuse, &reuse->node);
+               reuse->node.key = NULL;
+       }
+       /* delete from reuse list */
+       if(reuse->pending) {
+               if(reuse->prev) {
+                       /* assert that members of the lru list are waiting
+                        * and thus have a pending pointer to the struct */
+                       log_assert(reuse->prev->pending);
+                       reuse->prev->next = reuse->next;
+               } else {
+                       log_assert(!reuse->next || reuse->next->pending);
+                       outnet->tcp_reuse_first =
+                               (reuse->next?reuse->next->pending:NULL);
+               }
+               if(reuse->next) {
+                       /* assert that members of the lru list are waiting
+                        * and thus have a pending pointer to the struct */
+                       log_assert(reuse->next->pending);
+                       reuse->next->prev = reuse->prev;
+               } else {
+                       log_assert(!reuse->prev || reuse->prev->pending);
+                       outnet->tcp_reuse_last =
+                               (reuse->prev?reuse->prev->pending:NULL);
+               }
+               reuse->pending = NULL;
+       }
+}
+
 /** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
 static void
 decommission_pending_tcp(struct outside_network* outnet, 
@@ -464,9 +536,37 @@ decommission_pending_tcp(struct outside_network* outnet,
        comm_point_close(pend->c);
        pend->next_free = outnet->tcp_free;
        outnet->tcp_free = pend;
+       if(pend->reuse.pending) {
+               /* needs unlink from the reuse tree to get deleted */
+               reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+       }
        waiting_tcp_delete(pend->query);
        pend->query = NULL;
-       use_free_buffer(outnet);
+}
+
+/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
+static int
+reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
+{
+       pend_tcp->reuse.node.key = &pend_tcp->reuse;
+       pend_tcp->reuse.pending = pend_tcp;
+       if(!rbtree_insert(&outnet->tcp_reuse, &pend_tcp->reuse.node)) {
+               /* this is a duplicate connection, close this one */
+               pend_tcp->reuse.node.key = NULL;
+               pend_tcp->reuse.pending = NULL;
+               return 0;
+       }
+       /* insert into LRU, first is newest */
+       pend_tcp->reuse.prev = NULL;
+       if(outnet->tcp_reuse_first) {
+               pend_tcp->reuse.next = &outnet->tcp_reuse_first->reuse;
+               outnet->tcp_reuse_first->reuse.prev = &pend_tcp->reuse;
+       } else {
+               pend_tcp->reuse.next = NULL;
+               outnet->tcp_reuse_last = pend_tcp;
+       }
+       outnet->tcp_reuse_first = pend_tcp;
+       return 1;
 }
 
 int 
@@ -489,9 +589,21 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                        error = NETEVENT_CLOSED;
                }
        }
+       if(error == NETEVENT_NOERROR) {
+               /* add to reuse tree so it can be reused, if not a failure.
+                * This is possible if the state machine wants to make a tcp
+                * query again to the same destination. */
+               (void)reuse_tcp_insert(outnet, pend);
+       }
        fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
        (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+       /* if reused, it should not be decommissioned, TODO */
+       /* or if another query wants to write, write that and read for more
+        * or more outstanding queries on the stream.  TODO */
+       /* TODO also write multiple queries over the stream, even if no
+        * replies have returned yet */
        decommission_pending_tcp(outnet, pend);
+       use_free_buffer(outnet);
        return 0;
 }
 
@@ -816,6 +928,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
                outside_network_delete(outnet);
                return NULL;
        }
+       rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+       outnet->tcp_reuse_max = num_tcp;
 
        /* allocate commpoints */
        for(k=0; k<num_ports; k++) {
@@ -977,6 +1091,8 @@ outside_network_delete(struct outside_network* outnet)
                        if(outnet->tcp_conns[i]) {
                                comm_point_delete(outnet->tcp_conns[i]->c);
                                waiting_tcp_delete(outnet->tcp_conns[i]->query);
+                               /* TODO: loop over tcpwrite wait list and
+                                * delete waiting_tcp_delete them */
                                free(outnet->tcp_conns[i]);
                        }
                free(outnet->tcp_conns);
@@ -989,6 +1105,10 @@ outside_network_delete(struct outside_network* outnet)
                        p = np;
                }
        }
+       /* was allocated in struct pending that was deleted above */
+       rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+       outnet->tcp_reuse_first = NULL;
+       outnet->tcp_reuse_last = NULL;
        if(outnet->udp_wait_first) {
                struct pending* p = outnet->udp_wait_first, *np;
                while(p) {
@@ -1283,14 +1403,20 @@ outnet_tcptimer(void* arg)
 {
        struct waiting_tcp* w = (struct waiting_tcp*)arg;
        struct outside_network* outnet = w->outnet;
-       comm_point_callback_type* cb;
-       void* cb_arg;
+       int do_callback = 1;
        if(w->pkt) {
                /* it is on the waiting list */
                waiting_list_remove(outnet, w);
        } else {
                /* it was in use */
                struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
+               /* see if it needs unlink from reuse tree */
+               if(pend->reuse.pending) {
+                       reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+                       do_callback = 0;
+               }
+               /* do failure callbacks for all the queries in the
+                * wait for write list and in the id-tree TODO */
                if(pend->c->ssl) {
 #ifdef HAVE_SSL
                        SSL_shutdown(pend->c->ssl);
@@ -1303,23 +1429,100 @@ outnet_tcptimer(void* arg)
                pend->next_free = outnet->tcp_free;
                outnet->tcp_free = pend;
        }
-       cb = w->cb;
-       cb_arg = w->cb_arg;
-       waiting_tcp_delete(w);
-       fptr_ok(fptr_whitelist_pending_tcp(cb));
-       (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+       if(do_callback) {
+               comm_point_callback_type* cb = w->cb;
+               void* cb_arg = w->cb_arg;
+               waiting_tcp_delete(w);
+               fptr_ok(fptr_whitelist_pending_tcp(cb));
+               (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+       } else {
+               waiting_tcp_delete(w);
+       }
        use_free_buffer(outnet);
 }
 
+/** close the oldest reuse_tcp connection to make a fd and struct pend
+ * available for a new stream connection */
+static void
+reuse_tcp_close_oldest(struct outside_network* outnet)
+{
+       struct pending_tcp* pend;
+       if(!outnet->tcp_reuse_last) return;
+       pend = outnet->tcp_reuse_last;
+
+       /* snip off of LRU */
+       log_assert(pend->reuse.next == NULL);
+       if(pend->reuse.prev) {
+               log_assert(pend->reuse.prev->pending);
+               outnet->tcp_reuse_last = pend->reuse.prev->pending;
+               pend->reuse.prev->next = NULL;
+       } else {
+               outnet->tcp_reuse_last = NULL;
+               outnet->tcp_reuse_first = NULL;
+       }
+
+       /* TODO should only close unused in tree, not ones that are in use,
+        * for which we need also a tree to find in-use streams for multiple
+        * queries on them */
+       /* free up */
+       decommission_pending_tcp(outnet, pend);
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
+{
+       struct waiting_tcp key_w;
+       struct pending_tcp key_p;
+       struct comm_point c;
+       memset(&key_w, 0, sizeof(key_w));
+       memset(&key_p, 0, sizeof(key_p));
+       memset(&c, 0, sizeof(c));
+       key_p.query = &key_w;
+       key_p.c = &c;
+       key_p.reuse.pending = &key_p;
+       key_p.reuse.node.key = &key_p.reuse;
+       if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
+               key_p.c->ssl = (void*)1;
+       if(sq->addrlen > sizeof(key_w.addr))
+               return NULL;
+       memmove(&key_w.addr, &sq->addr, sq->addrlen);
+       key_w.addrlen = sq->addrlen;
+
+       return (struct reuse_tcp*)rbtree_search(&outnet->tcp_reuse,
+               &key_p.reuse.node);
+}
+
 struct waiting_tcp*
 pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        int timeout, comm_point_callback_type* callback, void* callback_arg)
 {
        struct pending_tcp* pend = sq->outnet->tcp_free;
+       struct reuse_tcp* reuse = NULL;
        struct waiting_tcp* w;
        struct timeval tv;
        uint16_t id;
+
+       /* find out if a reused stream to the target exists */
+       /* if so, take it into use */
+       reuse = reuse_tcp_find(sq->outnet, sq);
+       if(reuse) {
+               log_assert(reuse->pending);
+               pend = reuse->pending;
+       }
+
+       /* if !pend but we have reuse streams, close a reuse stream
+        * to be able to open a new one to this target, no use waiting
+        * to reuse a file descriptor while another query needs to use
+        * that buffer and file descriptor now. */
+       if(!pend) {
+               reuse_tcp_close_oldest(sq->outnet);
+               pend = sq->outnet->tcp_free;
+       }
+
        /* if no buffer is free allocate space to store query */
+       /* TODO: if reuse cannot write right now, store query even though
+        * pend is nonNULL */
        w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp) 
                + (pend?0:sldns_buffer_limit(packet)));
        if(!w) {
@@ -1347,10 +1550,26 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        comm_timer_set(w->timer, &tv);
        if(pend) {
                /* we have a buffer available right now */
-               if(!outnet_tcp_take_into_use(w, sldns_buffer_begin(packet),
-                       sldns_buffer_limit(packet))) {
-                       waiting_tcp_delete(w);
-                       return NULL;
+               if(reuse) {
+                       /* if cannot write now, store query and put it
+                        * in the waiting list for this stream TODO */
+                       /* and also delete it from waitlst if query gone,
+                        * eg. sq is deleted TODO */
+                       /* and also servfail all waiting queries if
+                        * stream closes TODO */
+                       /* reuse existing fd, write query and continue */
+                       outnet_tcp_take_query_setup(pend->c->fd, pend,
+                               sldns_buffer_begin(packet),
+                               sldns_buffer_limit(packet));
+               } else {
+                       /* create new fd and connect to addr, setup to
+                        * write query */
+                       if(!outnet_tcp_take_into_use(w,
+                               sldns_buffer_begin(packet),
+                               sldns_buffer_limit(packet))) {
+                               waiting_tcp_delete(w);
+                               return NULL;
+                       }
                }
 #ifdef USE_DNSTAP
                if(sq->outnet->dtenv &&
@@ -1502,6 +1721,42 @@ waiting_list_remove(struct outside_network* outnet, struct waiting_tcp* w)
        }
 }
 
+/** reuse tcp stream, remove serviced query from stream,
+ * return true if the stream is kept, false if it is to be closed */
+static int
+reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
+       struct serviced_query* sq)
+{
+       struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
+       /* see if can be entered in reuse tree
+        * for that the FD has to be non-1 */
+       if(pend_tcp->c->fd == -1) {
+               return 0;
+       }
+       /* if in tree and used by other queries */
+       if(pend_tcp->reuse.node.key) {
+               /* note less use of stream */
+               /* remove id value used by this svcd. */
+               /* do not reset the keepalive timer, for that
+                * we'd need traffic, and this is where the servicedq is
+                * removed due to state machine internal reasons,
+                * eg. iterator no longer interested in this query */
+               return 1;
+       }
+       /* if still open and want to keep it open */
+       if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
+               sq->outnet->tcp_reuse_max) {
+               /* note less use of stream */
+               /* remove id value used by this svcd. */
+               /* set a keepalive timer on it */
+               if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
+                       return 0;
+               }
+               return 1;
+       }
+       return 0;
+}
+
 /** cleanup serviced query entry */
 static void
 serviced_delete(struct serviced_query* sq)
@@ -1522,9 +1777,14 @@ serviced_delete(struct serviced_query* sq)
                } else {
                        struct waiting_tcp* p = (struct waiting_tcp*)
                                sq->pending;
+                       /* TODO: if on stream-write-waiting list then
+                        * remove from waiting list and waiting_tcp_delete */
                        if(p->pkt == NULL) {
-                               decommission_pending_tcp(sq->outnet, 
-                                       (struct pending_tcp*)p->next_waiting);
+                               if(!reuse_tcp_remove_serviced_keep(p, sq)) {
+                                       decommission_pending_tcp(sq->outnet,
+                                         (struct pending_tcp*)p->next_waiting);
+                                       use_free_buffer(sq->outnet);
+                               }
                        } else {
                                waiting_list_remove(sq->outnet, p);
                                waiting_tcp_delete(p);
index 3456a3da38b0b67941d70da8d05551680fff7006..a0b277b1e3b312efee13cb0a1ce9de90b0a29827 100644 (file)
@@ -52,6 +52,7 @@ struct ub_randstate;
 struct pending_tcp;
 struct waiting_tcp;
 struct waiting_udp;
+struct reuse_tcp;
 struct infra_cache;
 struct port_comm;
 struct port_if;
@@ -150,6 +151,21 @@ struct outside_network {
        size_t num_tcp;
        /** number of tcp communication points in use. */
        size_t num_tcp_outgoing;
+       /**
+        * tree of still-open and waiting tcp connections for reuse.
+        * can be closed and reopened to get a new tcp connection.
+        * or reused to the same destination again.  with timeout to close.
+        * Entries are of type struct reuse_tcp.
+        * The entries are both active and empty connections.
+        */
+       rbtree_type tcp_reuse;
+       /** max number of tcp_reuse entries we want to keep open */
+       size_t tcp_reuse_max;
+       /** first and last(oldest) in lru list of reuse connections.
+        * the oldest can be closed to get a new free pending_tcp if needed
+        * The list contains empty connections, that wait for timeout or
+        * a new query that can use the existing connection. */
+       struct pending_tcp* tcp_reuse_first, *tcp_reuse_last;
        /** list of tcp comm points that are free for use */
        struct pending_tcp* tcp_free;
        /** list of tcp queries waiting for a buffer */
@@ -205,6 +221,43 @@ struct port_comm {
        struct comm_point* cp;
 };
 
+/**
+ * Reuse TCP connection, still open can be used again.
+ */
+struct reuse_tcp {
+       /** rbtree node with links in tcp_reuse tree. key is NULL when not
+        * in tree. Both active and empty connections are in the tree. */
+       rbnode_type node;
+       /** lru chain, so that the oldest can be removed to get a new
+        * connection when all are in (re)use. oldest is last in list.
+        * The lru only contains empty connections waiting for reuse,
+        * the ones with active queries are not on the list because they
+        * do not need to be closed to make space for others.  They already
+        * service a query so the close for another query does not help
+        * service a larger number of queries.
+        * TODO
+        */
+       struct reuse_tcp* next, *prev;
+       /** the connection to reuse, the fd is non-1 and is open.
+        * the addr and port determine where the connection is going,
+        * and is key to the rbtree.  The SSL ptr determines if it is
+        * a TLS connection or a plain TCP connection there.  And TLS
+        * or not is also part of the key to the rbtree.
+        * There is a timeout and read event on the fd, to close it.
+        */
+       struct pending_tcp* pending;
+       /** rbtree with other queries waiting on the connection, by ID number,
+        * of type struct waiting_tcp. It is for looking up received
+        * answers to the structure for callback.  And also to see if ID
+        * numbers are unused and can be used for a new query. TODO */
+       rbtree_type tree_by_id;
+       /** list of queries waiting to be written on the channel,
+        * if NULL no queries are waiting to be written and the pending->query
+        * is the query currently serviced.  The first is the next in line.
+        * Once written, a query moves to the tree_by_id. TODO */
+       struct waiting_tcp* write_wait_first, *write_wait_last;
+};
+
 /**
  * A query that has an answer pending for it.
  */
@@ -255,6 +308,11 @@ struct pending_tcp {
        struct comm_point* c;
        /** the query being serviced, NULL if the pending_tcp is unused. */
        struct waiting_tcp* query;
+       /** the pre-allocated reuse tcp structure.  if ->pending is nonNULL
+        * it is in use and the connection is waiting for reuse.
+        * It is here for memory pre-allocation, and used to make this
+        * pending_tcp wait for reuse. */
+       struct reuse_tcp reuse;
 };
 
 /**
@@ -266,6 +324,13 @@ struct waiting_tcp {
         * if pkt==0, this points to the pending_tcp structure.
         */
        struct waiting_tcp* next_waiting;
+       /** next and prev in query waiting list for stream connection */
+       struct waiting_tcp* write_wait_prev, *write_wait_next;
+       /** true if the waiting_tcp structure is on the write_wait queue */
+       int write_wait_queued;
+       /** entry in reuse.tree_by_id, if key is NULL, not in tree, otherwise,
+        * this struct is key and sorted by ID from pending_tcp->id. */
+       rbnode_type id_node;
        /** timeout event; timer keeps running whether the query is
         * waiting for a buffer or the tcp reply is pending */
        struct comm_timer* timer;
@@ -635,4 +700,7 @@ int pending_cmp(const void* key1, const void* key2);
 /** compare function of serviced query rbtree */
 int serviced_cmp(const void* key1, const void* key2);
 
+/** compare function of reuse_tcp rbtree */
+int reuse_cmp(const void* key1, const void* key2);
+
 #endif /* OUTSIDE_NETWORK_H */
index d6e904a4d3cb9f6733fa279f5635991caad8501c..b04543f1e4b42dad3409803d5c98e1be78961240 100644 (file)
@@ -1488,6 +1488,12 @@ int serviced_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
        return 0;
 }
 
+int reuse_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
+{
+       log_assert(0);
+       return 0;
+}
+
 /* timers in testbound for autotrust. statistics tested in tdir. */
 struct comm_timer* comm_timer_create(struct comm_base* base, 
        void (*cb)(void*), void* cb_arg)
index f5da501de19be48cc09e211ff7d36619b3885caf..84b18516027dbe657507cf35c02700d5b71221fe 100644 (file)
@@ -210,6 +210,7 @@ fptr_whitelist_rbtree_cmp(int (*fptr) (const void *, const void *))
        else if(fptr == &fwd_cmp) return 1;
        else if(fptr == &pending_cmp) return 1;
        else if(fptr == &serviced_cmp) return 1;
+       else if(fptr == &reuse_cmp) return 1;
        else if(fptr == &name_tree_compare) return 1;
        else if(fptr == &order_lock_cmp) return 1;
        else if(fptr == &codeline_cmp) return 1;