]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
in outside_network.c: also log messages that end up on the waiting list.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 14:05:25 +0000 (16:05 +0200)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 14:05:25 +0000 (16:05 +0200)
with dnstap.
for tcp use_free_buffer reuse existing entry if second wait entry on the
same addr as the other waiting.

services/outside_network.c

index b65bf19f986bad4c86ebbd5f221e0d299c270ce4..f76b0ed3a6c525b27aa62f7e9c9832fd2bed6ff9 100644 (file)
@@ -344,6 +344,127 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
        return 1;
 }
 
+/** log reuse item addr and ptr with message */
+static void
+log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
+{
+       uint16_t port;
+       char addrbuf[128];
+       if(verbosity < v) return;
+       addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
+       port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
+       verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
+               (unsigned long long)reuse, reuse->pending->c->fd);
+}
+
+/** pop the first element from the writewait list */
+static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
+{
+       struct waiting_tcp* w = reuse->write_wait_first;
+       if(!w)
+               return NULL;
+       log_assert(w->write_wait_queued);
+       if(w->write_wait_prev)
+               w->write_wait_prev->write_wait_next = w->write_wait_next;
+       else    reuse->write_wait_first = w->write_wait_next;
+       if(w->write_wait_next)
+               w->write_wait_next->write_wait_prev = w->write_wait_prev;
+       else    reuse->write_wait_last = w->write_wait_prev;
+       w->write_wait_queued = 0;
+       return w;
+}
+
+/** push the element after the last on the writewait list */
+static void reuse_write_wait_push_back(struct reuse_tcp* reuse,
+       struct waiting_tcp* w)
+{
+       if(!w) return;
+       log_assert(!w->write_wait_queued);
+       if(reuse->write_wait_last) {
+               reuse->write_wait_last->write_wait_next = w;
+               w->write_wait_prev = reuse->write_wait_last;
+       } else {
+               reuse->write_wait_first = w;
+       }
+       reuse->write_wait_last = w;
+       w->write_wait_queued = 1;
+}
+
+/** insert element in tree by id */
+static void
+reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+       log_assert(w->id_node.key == NULL);
+       w->id_node.key = w;
+       rbtree_insert(&reuse->tree_by_id, &w->id_node);
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct sockaddr_storage* addr,
+       socklen_t addrlen, int use_ssl)
+{
+       struct waiting_tcp key_w;
+       struct pending_tcp key_p;
+       struct comm_point c;
+       rbnode_type* result = NULL, *prev;
+       verbose(5, "reuse_tcp_find");
+       memset(&key_w, 0, sizeof(key_w));
+       memset(&key_p, 0, sizeof(key_p));
+       memset(&c, 0, sizeof(c));
+       key_p.query = &key_w;
+       key_p.c = &c;
+       key_p.reuse.pending = &key_p;
+       key_p.reuse.node.key = &key_p.reuse;
+       if(use_ssl) /* something nonNULL for comparisons in tree */
+               key_p.c->ssl = (void*)1;
+       if(addrlen > sizeof(key_p.reuse.addr))
+               return NULL;
+       memmove(&key_p.reuse.addr, addr, addrlen);
+       key_p.reuse.addrlen = addrlen;
+
+       verbose(5, "reuse_tcp_find: num reuse streams %u",
+               (unsigned)outnet->tcp_reuse.count);
+       if(outnet->tcp_reuse.root == NULL ||
+               outnet->tcp_reuse.root == RBTREE_NULL)
+               return NULL;
+       if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
+               &result)) {
+               /* exact match */
+               /* but the key is on stack, and ptr is compared, impossible */
+               log_assert(&key_p.reuse != (struct reuse_tcp*)result);
+               log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
+       }
+       /* not found, return null */
+       if(!result || result == RBTREE_NULL)
+               return NULL;
+       verbose(5, "reuse_tcp_find check inexact match");
+       /* inexact match, find one of possibly several connections to the
+        * same destination address, with the correct port, ssl, and
+        * also less than max number of open queries, or else, fail to open
+        * a new one */
+       /* rewind to start of sequence of same address,port,ssl */
+       prev = rbtree_previous(result);
+       while(prev && prev != RBTREE_NULL &&
+               reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
+               result = prev;
+               prev = rbtree_previous(result);
+       }
+
+       /* loop to find first one that has correct characteristics */
+       while(result && result != RBTREE_NULL &&
+               reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
+               if(((struct reuse_tcp*)result)->tree_by_id.count <
+                       MAX_REUSE_TCP_QUERIES) {
+                       /* same address, port, ssl-yes-or-no, and has
+                        * space for another query */
+                       return (struct reuse_tcp*)result;
+               }
+               result = rbtree_next(result);
+       }
+       return NULL;
+}
+
 /** use the buffer to setup writing the query */
 static void
 outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
@@ -482,6 +603,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w)
        pend->c->repinfo.addrlen = w->addrlen;
        memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
        pend->reuse.pending = pend;
+       reuse_tree_by_id_insert(&pend->reuse, w);
        outnet_tcp_take_query_setup(s, pend, w);
        return 1;
 }
@@ -493,17 +615,32 @@ use_free_buffer(struct outside_network* outnet)
        struct waiting_tcp* w;
        while(outnet->tcp_free && outnet->tcp_wait_first 
                && !outnet->want_to_quit) {
+               struct reuse_tcp* reuse = NULL;
                w = outnet->tcp_wait_first;
                outnet->tcp_wait_first = w->next_waiting;
                if(outnet->tcp_wait_last == w)
                        outnet->tcp_wait_last = NULL;
                w->on_tcp_waiting_list = 0;
-               if(!outnet_tcp_take_into_use(w)) {
-                       comm_point_callback_type* cb = w->cb;
-                       void* cb_arg = w->cb_arg;
-                       waiting_tcp_delete(w);
-                       fptr_ok(fptr_whitelist_pending_tcp(cb));
-                       (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
+               reuse = reuse_tcp_find(outnet, &w->addr, w->addrlen,
+                       w->ssl_upstream);
+               if(reuse) {
+                       log_reuse_tcp(5, "use free buffer for waiting tcp: "
+                               "found reuse", reuse);
+                       if(reuse->pending->query) {
+                               /* on the write wait list */
+                               comm_timer_disable(w->timer);
+                               w->next_waiting = (void*)reuse->pending;
+                               reuse_tree_by_id_insert(reuse, w);
+                               reuse_write_wait_push_back(reuse, w);
+                       }
+               } else {
+                       if(!outnet_tcp_take_into_use(w)) {
+                               comm_point_callback_type* cb = w->cb;
+                               void* cb_arg = w->cb_arg;
+                               waiting_tcp_delete(w);
+                               fptr_ok(fptr_whitelist_pending_tcp(cb));
+                               (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
+                       }
                }
        }
 }
@@ -602,19 +739,6 @@ decommission_pending_tcp(struct outside_network* outnet,
        reuse_del_writewait(pend);
 }
 
-/** log reuse item addr and ptr with message */
-static void
-log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
-{
-       uint16_t port;
-       char addrbuf[128];
-       if(verbosity < v) return;
-       addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
-       port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
-       verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
-               (unsigned long long)reuse, reuse->pending->c->fd);
-}
-
 /** insert into reuse tcp tree and LRU, false on failure (duplicate) */
 static int
 reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
@@ -697,22 +821,6 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
        w->id_node.key = NULL;
 }
 
-/** pop the first element from the writewait list */
-static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
-{
-       struct waiting_tcp* w = pend->reuse.write_wait_first;
-       if(!w)
-               return NULL;
-       if(w->write_wait_prev)
-               w->write_wait_prev->write_wait_next = w->write_wait_next;
-       else    pend->reuse.write_wait_first = w->write_wait_next;
-       if(w->write_wait_next)
-               w->write_wait_next->write_wait_prev = w->write_wait_prev;
-       else    pend->reuse.write_wait_last = w->write_wait_prev;
-       w->write_wait_queued = 0;
-       return w;
-}
-
 /** set timeout on tcp fd and setup read event to catch incoming dns msgs */
 static void
 reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
@@ -756,7 +864,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                pend->query = NULL;
                /* setup to write next packet or setup read timeout */
                if(pend->reuse.write_wait_first) {
-                       pend->query = reuse_write_wait_pop(pend);
+                       pend->query = reuse_write_wait_pop(&pend->reuse);
                        outnet_tcp_take_query_setup(pend->c->fd, pend,
                                pend->query);
                } else {
@@ -1691,80 +1799,6 @@ reuse_tcp_close_oldest(struct outside_network* outnet)
        decommission_pending_tcp(outnet, pend);
 }
 
-/** find reuse tcp stream to destination for query, or NULL if none */
-static struct reuse_tcp*
-reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
-{
-       struct waiting_tcp key_w;
-       struct pending_tcp key_p;
-       struct comm_point c;
-       rbnode_type* result = NULL, *prev;
-       verbose(5, "reuse_tcp_find");
-       memset(&key_w, 0, sizeof(key_w));
-       memset(&key_p, 0, sizeof(key_p));
-       memset(&c, 0, sizeof(c));
-       key_p.query = &key_w;
-       key_p.c = &c;
-       key_p.reuse.pending = &key_p;
-       key_p.reuse.node.key = &key_p.reuse;
-       if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
-               key_p.c->ssl = (void*)1;
-       if(sq->addrlen > sizeof(key_p.reuse.addr))
-               return NULL;
-       memmove(&key_p.reuse.addr, &sq->addr, sq->addrlen);
-       key_p.reuse.addrlen = sq->addrlen;
-
-       verbose(5, "reuse_tcp_find: num reuse streams %u",
-               (unsigned)outnet->tcp_reuse.count);
-       if(outnet->tcp_reuse.root == NULL ||
-               outnet->tcp_reuse.root == RBTREE_NULL)
-               return NULL;
-       if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
-               &result)) {
-               /* exact match */
-               /* but the key is on stack, and ptr is compared, impossible */
-               log_assert(&key_p.reuse != (struct reuse_tcp*)result);
-               log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
-       }
-       /* not found, return null */
-       if(!result || result == RBTREE_NULL)
-               return NULL;
-       verbose(5, "reuse_tcp_find check inexact match");
-       /* inexact match, find one of possibly several connections to the
-        * same destination address, with the correct port, ssl, and
-        * also less than max number of open queries, or else, fail to open
-        * a new one */
-       /* rewind to start of sequence of same address,port,ssl */
-       prev = rbtree_previous(result);
-       while(prev && prev != RBTREE_NULL &&
-               reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
-               result = prev;
-               prev = rbtree_previous(result);
-       }
-
-       /* loop to find first one that has correct characteristics */
-       while(result && result != RBTREE_NULL &&
-               reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
-               if(((struct reuse_tcp*)result)->tree_by_id.count <
-                       MAX_REUSE_TCP_QUERIES) {
-                       /* same address, port, ssl-yes-or-no, and has
-                        * space for another query */
-                       return (struct reuse_tcp*)result;
-               }
-               result = rbtree_next(result);
-       }
-       return NULL;
-}
-
-/** insert element in tree by id */
-static void
-reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
-{
-       log_assert(w->id_node.key == NULL);
-       w->id_node.key = w;
-       rbtree_insert(&reuse->tree_by_id, &w->id_node);
-}
-
 /** find element in tree by id */
 static struct waiting_tcp*
 reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
@@ -1866,7 +1900,8 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        verbose(5, "pending_tcp_query");
        /* find out if a reused stream to the target exists */
        /* if so, take it into use */
-       reuse = reuse_tcp_find(sq->outnet, sq);
+       reuse = reuse_tcp_find(sq->outnet, &sq->addr, sq->addrlen,
+               sq->ssl_upstream);
        if(reuse) {
                log_reuse_tcp(5, "pending_tcp_query: found reuse", reuse);
                log_assert(reuse->pending);
@@ -1909,25 +1944,28 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        w->ssl_upstream = sq->ssl_upstream;
        w->tls_auth_name = sq->tls_auth_name;
        w->timeout = timeout;
+       w->id_node.key = NULL;
+       w->write_wait_prev = NULL;
+       w->write_wait_next = NULL;
+       w->write_wait_queued = 0;
        if(pend) {
                /* we have a buffer available right now */
                if(reuse) {
                        verbose(5, "pending_tcp_query: reuse, store");
-                       /* if cannot write now, store query and put it
-                        * in the waiting list for this stream TODO */
-                       /* and insert in tree_by_id */
-                       reuse_tree_by_id_insert(&pend->reuse, w);
-                       /* and also delete it from waitlst if query gone,
-                        * eg. sq is deleted TODO */
-                       /* and also servfail all waiting queries if
-                        * stream closes TODO */
                        /* reuse existing fd, write query and continue */
+                       /* store query in tree by id */
                        w->next_waiting = (void*)pend;
+                       reuse_tree_by_id_insert(&pend->reuse, w);
+                       /* can we write right now? */
                        if(pend->query == NULL) {
                                /* write straight away */
                                pend->query = w;
                                outnet_tcp_take_query_setup(pend->c->fd, pend,
                                        w);
+                       } else {
+                               /* put it in the waiting list for
+                                * this stream */
+                               reuse_write_wait_push_back(&pend->reuse, w);
                        }
                } else {
                        verbose(5, "pending_tcp_query: new fd, connect");
@@ -1942,13 +1980,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
                                return NULL;
                        }
                }
-#ifdef USE_DNSTAP
-               if(sq->outnet->dtenv &&
-                  (sq->outnet->dtenv->log_resolver_query_messages ||
-                   sq->outnet->dtenv->log_forwarder_query_messages))
-               dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
-               comm_tcp, sq->zone, sq->zonelen, packet);
-#endif
        } else {
                struct timeval tv;
                /* queue up */
@@ -1965,6 +1996,13 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
 #endif
                comm_timer_set(w->timer, &tv);
        }
+#ifdef USE_DNSTAP
+       if(sq->outnet->dtenv &&
+          (sq->outnet->dtenv->log_resolver_query_messages ||
+           sq->outnet->dtenv->log_forwarder_query_messages))
+               dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
+                       comm_tcp, sq->zone, sq->zonelen, packet);
+#endif
        return w;
 }