From: W.C.A. Wijngaards Date: Thu, 25 Jun 2020 14:05:25 +0000 (+0200) Subject: in outside_network.c: also log messages that end up on the waiting list. X-Git-Tag: release-1.13.0rc1~5^2~59 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=34c063701e19f2736da4d1d855dac88860e1a011;p=thirdparty%2Funbound.git in outside_network.c: also log messages that end up on the waiting list. with dnstap. for tcp use_free_buffer reuse existing entry if second wait entry on the same addr as the other waiting. --- diff --git a/services/outside_network.c b/services/outside_network.c index b65bf19f9..f76b0ed3a 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -344,6 +344,127 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen) return 1; } +/** log reuse item addr and ptr with message */ +static void +log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse) +{ + uint16_t port; + char addrbuf[128]; + if(verbosity < v) return; + addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf)); + port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port); + verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port, + (unsigned long long)reuse, reuse->pending->c->fd); +} + +/** pop the first element from the writewait list */ +static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse) +{ + struct waiting_tcp* w = reuse->write_wait_first; + if(!w) + return NULL; + log_assert(w->write_wait_queued); + if(w->write_wait_prev) + w->write_wait_prev->write_wait_next = w->write_wait_next; + else reuse->write_wait_first = w->write_wait_next; + if(w->write_wait_next) + w->write_wait_next->write_wait_prev = w->write_wait_prev; + else reuse->write_wait_last = w->write_wait_prev; + w->write_wait_queued = 0; + return w; +} + +/** push the element after the last on the writewait list */ +static void reuse_write_wait_push_back(struct reuse_tcp* reuse, + struct waiting_tcp* w) +{ + if(!w) return; + log_assert(!w->write_wait_queued); + if(reuse->write_wait_last) { + reuse->write_wait_last->write_wait_next = w; + w->write_wait_prev = reuse->write_wait_last; + } else { + reuse->write_wait_first = w; + } + reuse->write_wait_last = w; + w->write_wait_queued = 1; +} + +/** insert element in tree by id */ +static void +reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w) +{ + log_assert(w->id_node.key == NULL); + w->id_node.key = w; + rbtree_insert(&reuse->tree_by_id, &w->id_node); +} + +/** find reuse tcp stream to destination for query, or NULL if none */ +static struct reuse_tcp* +reuse_tcp_find(struct outside_network* outnet, struct sockaddr_storage* addr, + socklen_t addrlen, int use_ssl) +{ + struct waiting_tcp key_w; + struct pending_tcp key_p; + struct comm_point c; + rbnode_type* result = NULL, *prev; + verbose(5, "reuse_tcp_find"); + memset(&key_w, 0, sizeof(key_w)); + memset(&key_p, 0, sizeof(key_p)); + memset(&c, 0, sizeof(c)); + key_p.query = &key_w; + key_p.c = &c; + key_p.reuse.pending = &key_p; + key_p.reuse.node.key = &key_p.reuse; + if(use_ssl) /* something nonNULL for comparisons in tree */ + key_p.c->ssl = (void*)1; + if(addrlen > sizeof(key_p.reuse.addr)) + return NULL; + memmove(&key_p.reuse.addr, addr, addrlen); + key_p.reuse.addrlen = addrlen; + + verbose(5, "reuse_tcp_find: num reuse streams %u", + (unsigned)outnet->tcp_reuse.count); + if(outnet->tcp_reuse.root == NULL || + outnet->tcp_reuse.root == RBTREE_NULL) + return NULL; + if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node, + &result)) { + /* exact match */ + /* but the key is on stack, and ptr is compared, impossible */ + log_assert(&key_p.reuse != (struct reuse_tcp*)result); + log_assert(&key_p != ((struct reuse_tcp*)result)->pending); + } + /* not found, return null */ + if(!result || result == RBTREE_NULL) + return NULL; + verbose(5, "reuse_tcp_find check inexact match"); + /* inexact match, find one of possibly several connections to the + * same destination address, with the correct port, ssl, and + * also less than max number of open queries, or else, fail to open + * a new one */ + /* rewind to start of sequence of same address,port,ssl */ + prev = rbtree_previous(result); + while(prev && prev != RBTREE_NULL && + reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) { + result = prev; + prev = rbtree_previous(result); + } + + /* loop to find first one that has correct characteristics */ + while(result && result != RBTREE_NULL && + reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) { + if(((struct reuse_tcp*)result)->tree_by_id.count < + MAX_REUSE_TCP_QUERIES) { + /* same address, port, ssl-yes-or-no, and has + * space for another query */ + return (struct reuse_tcp*)result; + } + result = rbtree_next(result); + } + return NULL; +} + /** use the buffer to setup writing the query */ static void outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, @@ -482,6 +603,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w) pend->c->repinfo.addrlen = w->addrlen; memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen); pend->reuse.pending = pend; + reuse_tree_by_id_insert(&pend->reuse, w); outnet_tcp_take_query_setup(s, pend, w); return 1; } @@ -493,17 +615,32 @@ use_free_buffer(struct outside_network* outnet) struct waiting_tcp* w; while(outnet->tcp_free && outnet->tcp_wait_first && !outnet->want_to_quit) { + struct reuse_tcp* reuse = NULL; w = outnet->tcp_wait_first; outnet->tcp_wait_first = w->next_waiting; if(outnet->tcp_wait_last == w) outnet->tcp_wait_last = NULL; w->on_tcp_waiting_list = 0; - if(!outnet_tcp_take_into_use(w)) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - waiting_tcp_delete(w); - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL); + reuse = reuse_tcp_find(outnet, &w->addr, w->addrlen, + w->ssl_upstream); + if(reuse) { + log_reuse_tcp(5, "use free buffer for waiting tcp: " + "found reuse", reuse); + if(reuse->pending->query) { + /* on the write wait list */ + comm_timer_disable(w->timer); + w->next_waiting = (void*)reuse->pending; + reuse_tree_by_id_insert(reuse, w); + reuse_write_wait_push_back(reuse, w); + } + } else { + if(!outnet_tcp_take_into_use(w)) { + comm_point_callback_type* cb = w->cb; + void* cb_arg = w->cb_arg; + waiting_tcp_delete(w); + fptr_ok(fptr_whitelist_pending_tcp(cb)); + (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL); + } } } } @@ -602,19 +739,6 @@ decommission_pending_tcp(struct outside_network* outnet, reuse_del_writewait(pend); } -/** log reuse item addr and ptr with message */ -static void -log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse) -{ - uint16_t port; - char addrbuf[128]; - if(verbosity < v) return; - addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf)); - port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port); - verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port, - (unsigned long long)reuse, reuse->pending->c->fd); -} - /** insert into reuse tcp tree and LRU, false on failure (duplicate) */ static int reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp) @@ -697,22 +821,6 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w) w->id_node.key = NULL; } -/** pop the first element from the writewait list */ -static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend) -{ - struct waiting_tcp* w = pend->reuse.write_wait_first; - if(!w) - return NULL; - if(w->write_wait_prev) - w->write_wait_prev->write_wait_next = w->write_wait_next; - else pend->reuse.write_wait_first = w->write_wait_next; - if(w->write_wait_next) - w->write_wait_next->write_wait_prev = w->write_wait_prev; - else pend->reuse.write_wait_last = w->write_wait_prev; - w->write_wait_queued = 0; - return w; -} - /** set timeout on tcp fd and setup read event to catch incoming dns msgs */ static void reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp) @@ -756,7 +864,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, pend->query = NULL; /* setup to write next packet or setup read timeout */ if(pend->reuse.write_wait_first) { - pend->query = reuse_write_wait_pop(pend); + pend->query = reuse_write_wait_pop(&pend->reuse); outnet_tcp_take_query_setup(pend->c->fd, pend, pend->query); } else { @@ -1691,80 +1799,6 @@ reuse_tcp_close_oldest(struct outside_network* outnet) decommission_pending_tcp(outnet, pend); } -/** find reuse tcp stream to destination for query, or NULL if none */ -static struct reuse_tcp* -reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq) -{ - struct waiting_tcp key_w; - struct pending_tcp key_p; - struct comm_point c; - rbnode_type* result = NULL, *prev; - verbose(5, "reuse_tcp_find"); - memset(&key_w, 0, sizeof(key_w)); - memset(&key_p, 0, sizeof(key_p)); - memset(&c, 0, sizeof(c)); - key_p.query = &key_w; - key_p.c = &c; - key_p.reuse.pending = &key_p; - key_p.reuse.node.key = &key_p.reuse; - if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */ - key_p.c->ssl = (void*)1; - if(sq->addrlen > sizeof(key_p.reuse.addr)) - return NULL; - memmove(&key_p.reuse.addr, &sq->addr, sq->addrlen); - key_p.reuse.addrlen = sq->addrlen; - - verbose(5, "reuse_tcp_find: num reuse streams %u", - (unsigned)outnet->tcp_reuse.count); - if(outnet->tcp_reuse.root == NULL || - outnet->tcp_reuse.root == RBTREE_NULL) - return NULL; - if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node, - &result)) { - /* exact match */ - /* but the key is on stack, and ptr is compared, impossible */ - log_assert(&key_p.reuse != (struct reuse_tcp*)result); - log_assert(&key_p != ((struct reuse_tcp*)result)->pending); - } - /* not found, return null */ - if(!result || result == RBTREE_NULL) - return NULL; - verbose(5, "reuse_tcp_find check inexact match"); - /* inexact match, find one of possibly several connections to the - * same destination address, with the correct port, ssl, and - * also less than max number of open queries, or else, fail to open - * a new one */ - /* rewind to start of sequence of same address,port,ssl */ - prev = rbtree_previous(result); - while(prev && prev != RBTREE_NULL && - reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) { - result = prev; - prev = rbtree_previous(result); - } - - /* loop to find first one that has correct characteristics */ - while(result && result != RBTREE_NULL && - reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) { - if(((struct reuse_tcp*)result)->tree_by_id.count < - MAX_REUSE_TCP_QUERIES) { - /* same address, port, ssl-yes-or-no, and has - * space for another query */ - return (struct reuse_tcp*)result; - } - result = rbtree_next(result); - } - return NULL; -} - -/** insert element in tree by id */ -static void -reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w) -{ - log_assert(w->id_node.key == NULL); - w->id_node.key = w; - rbtree_insert(&reuse->tree_by_id, &w->id_node); -} - /** find element in tree by id */ static struct waiting_tcp* reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id) @@ -1866,7 +1900,8 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, verbose(5, "pending_tcp_query"); /* find out if a reused stream to the target exists */ /* if so, take it into use */ - reuse = reuse_tcp_find(sq->outnet, sq); + reuse = reuse_tcp_find(sq->outnet, &sq->addr, sq->addrlen, + sq->ssl_upstream); if(reuse) { log_reuse_tcp(5, "pending_tcp_query: found reuse", reuse); log_assert(reuse->pending); @@ -1909,25 +1944,28 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, w->ssl_upstream = sq->ssl_upstream; w->tls_auth_name = sq->tls_auth_name; w->timeout = timeout; + w->id_node.key = NULL; + w->write_wait_prev = NULL; + w->write_wait_next = NULL; + w->write_wait_queued = 0; if(pend) { /* we have a buffer available right now */ if(reuse) { verbose(5, "pending_tcp_query: reuse, store"); - /* if cannot write now, store query and put it - * in the waiting list for this stream TODO */ - /* and insert in tree_by_id */ - reuse_tree_by_id_insert(&pend->reuse, w); - /* and also delete it from waitlst if query gone, - * eg. sq is deleted TODO */ - /* and also servfail all waiting queries if - * stream closes TODO */ /* reuse existing fd, write query and continue */ + /* store query in tree by id */ w->next_waiting = (void*)pend; + reuse_tree_by_id_insert(&pend->reuse, w); + /* can we write right now? */ if(pend->query == NULL) { /* write straight away */ pend->query = w; outnet_tcp_take_query_setup(pend->c->fd, pend, w); + } else { + /* put it in the waiting list for + * this stream */ + reuse_write_wait_push_back(&pend->reuse, w); } } else { verbose(5, "pending_tcp_query: new fd, connect"); @@ -1942,13 +1980,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, return NULL; } } -#ifdef USE_DNSTAP - if(sq->outnet->dtenv && - (sq->outnet->dtenv->log_resolver_query_messages || - sq->outnet->dtenv->log_forwarder_query_messages)) - dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr, - comm_tcp, sq->zone, sq->zonelen, packet); -#endif } else { struct timeval tv; /* queue up */ @@ -1965,6 +1996,13 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, #endif comm_timer_set(w->timer, &tv); } +#ifdef USE_DNSTAP + if(sq->outnet->dtenv && + (sq->outnet->dtenv->log_resolver_query_messages || + sq->outnet->dtenv->log_forwarder_query_messages)) + dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr, + comm_tcp, sq->zone, sq->zonelen, packet); +#endif return w; }