From 2932d530c10b7519af03c43e8f00beb23fa38c8c Mon Sep 17 00:00:00 2001
From: "W.C.A. Wijngaards"
Date: Mon, 27 Jul 2020 16:59:46 +0200
Subject: [PATCH] stream reuse, send queries one by one when upstream refuses multiple queries, by closing the connection.

---
 services/outside_network.c | 93 +++++++++++++++++++++++++++++---------
 services/outside_network.h |  2 +
 2 files changed, 74 insertions(+), 21 deletions(-)

diff --git a/services/outside_network.c b/services/outside_network.c
index c7c0743b8..0ef1dcb4c 100644
--- a/services/outside_network.c
+++ b/services/outside_network.c
@@ -759,6 +759,75 @@ use_free_buffer(struct outside_network* outnet)
 	}
 }
 
+/** append waiting_tcp to the outnet tcp waiting list, set its timer */
+static void
+outnet_add_tcp_waiting(struct outside_network* outnet, struct waiting_tcp* w)
+{
+	struct timeval tv;
+	w->next_waiting = NULL;
+	if(outnet->tcp_wait_last)
+		outnet->tcp_wait_last->next_waiting = w;
+	else	outnet->tcp_wait_first = w;
+	outnet->tcp_wait_last = w;
+	w->on_tcp_waiting_list = 1;
+#ifndef S_SPLINT_S
+	tv.tv_sec = w->timeout/1000;
+	tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+	comm_timer_set(w->timer, &tv);
+}
+
+/** delete element from tree by id */
+static void
+reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+	log_assert(w->id_node.key != NULL);
+	rbtree_delete(&reuse->tree_by_id, w);
+	w->id_node.key = NULL;
+}
+
+/** move writewait list to go for another connection. */
+static void
+reuse_move_writewait_away(struct outside_network* outnet,
+	struct pending_tcp* pend)
+{
+	/* the writewait list has not been written yet, so if the
+	 * stream was closed, those queries have not actually failed, only
+	 * the queries already written have. The unwritten queries can be
+	 * written to another stream. For upstreams that do not support
+	 * multiple queries and answers, the stream can get closed, and then
+	 * the queries can get written on a new socket */
+	struct waiting_tcp* w;
+	if(pend->query && pend->query->error_count == 0 &&
+		pend->c->tcp_write_pkt == pend->query->pkt &&
+		pend->c->tcp_write_pkt_len == pend->query->pkt_len) {
+		/* since the current query is not written, it can also
+		 * move to a free buffer */
+		verbose(5, "reuse_move_writewait_away current %d done",
+			(int)pend->c->tcp_write_byte_count);
+		pend->c->tcp_write_pkt = NULL;
+		pend->c->tcp_write_pkt_len = 0;
+		pend->c->tcp_write_and_read = 0;
+		pend->c->tcp_more_read_again = 0;
+		pend->c->tcp_more_write_again = 0;
+		pend->c->tcp_is_reading = 1;
+		w = pend->query;
+		pend->query = NULL;
+		/* increase error count, so that if the next socket fails too
+		 * the server selection is run again with this query failed
+		 * and it can select a different server (if possible), or
+		 * fail the query */
+		w->error_count ++;
+		reuse_tree_by_id_delete(&pend->reuse, w);
+		outnet_add_tcp_waiting(outnet, w);
+	}
+	while((w = reuse_write_wait_pop(&pend->reuse)) != NULL) {
+		verbose(5, "reuse_move_writewait_away item");
+		reuse_tree_by_id_delete(&pend->reuse, w);
+		outnet_add_tcp_waiting(outnet, w);
+	}
+}
+
 /** remove reused element from tree and lru list */
 static void
 reuse_tcp_remove_tree_list(struct outside_network* outnet,
@@ -874,15 +943,6 @@ static void reuse_cb_and_decommission(struct outside_network* outnet,
 	reuse_del_readwait(&store);
 }
 
-/** delete element from tree by id */
-static void
-reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
-{
-	log_assert(w->id_node.key != NULL);
-	rbtree_delete(&reuse->tree_by_id, w);
-	w->id_node.key = NULL;
-}
-
 /** set timeout on tcp fd and setup read event to catch incoming dns msgs */
 static void
 reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
@@ -958,6 +1018,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
 		return 0;
 	} else if(error != NETEVENT_NOERROR) {
 		verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
+		reuse_move_writewait_away(outnet, pend);
 		/* pass error below and exit */
 	} else {
 		/* check ID */
@@ -2009,6 +2070,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
 	w->write_wait_prev = NULL;
 	w->write_wait_next = NULL;
 	w->write_wait_queued = 0;
+	w->error_count = 0;
 	if(pend) {
 		/* we have a buffer available right now */
 		if(reuse) {
@@ -2047,19 +2109,8 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
 		/* queue up */
 		/* waiting for a buffer on the outside network buffer wait
 		 * list */
-		struct timeval tv;
 		verbose(5, "pending_tcp_query: queue to wait");
-		w->next_waiting = NULL;
-		if(sq->outnet->tcp_wait_last)
-			sq->outnet->tcp_wait_last->next_waiting = w;
-		else	sq->outnet->tcp_wait_first = w;
-		sq->outnet->tcp_wait_last = w;
-		w->on_tcp_waiting_list = 1;
-#ifndef S_SPLINT_S
-		tv.tv_sec = timeout/1000;
-		tv.tv_usec = (timeout%1000)*1000;
-#endif
-		comm_timer_set(w->timer, &tv);
+		outnet_add_tcp_waiting(sq->outnet, w);
 	}
 #ifdef USE_DNSTAP
 	if(sq->outnet->dtenv &&
diff --git a/services/outside_network.h b/services/outside_network.h
index 9ebbabe9c..26705c56d 100644
--- a/services/outside_network.h
+++ b/services/outside_network.h
@@ -386,6 +386,8 @@ struct waiting_tcp {
 	int ssl_upstream;
 	/** ref to the tls_auth_name from the serviced_query */
 	char* tls_auth_name;
+	/** count of errors this packet was involved in, to stop error loops */
+	int error_count;
 };
 
 /**
-- 
2.47.3