From: W.C.A. Wijngaards
Date: Thu, 16 Jan 2020 16:12:32 +0000 (+0100)
Subject: Stream reuse branch, for TCP and TLS stream reuse.
X-Git-Tag: release-1.13.0rc1~5^2~91
X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=57aefd102e0bb80af6d4042795731e6ab6ad6946;p=thirdparty%2Funbound.git

Stream reuse branch, for TCP and TLS stream reuse.

This is for upstream pipes and using them again for the next query.
Signposted code for reuse_tcp structure in outside_network.h
---
diff --git a/services/outside_network.c b/services/outside_network.c
index f865f13c1..a5724a42f 100644
--- a/services/outside_network.c
+++ b/services/outside_network.c
@@ -131,6 +131,34 @@ serviced_cmp(const void* key1, const void* key2)
 	return sockaddr_cmp(&q1->addr, q1->addrlen, &q2->addr, q2->addrlen);
 }
 
+int
+reuse_cmp(const void* key1, const void* key2)
+{
+	struct reuse_tcp* r1 = (struct reuse_tcp*)key1;
+	struct reuse_tcp* r2 = (struct reuse_tcp*)key2;
+	int r;
+	/* make sure the entries are in use (have a waiting_tcp entry) */
+	if(!r1->pending->query && !r2->pending->query)
+		return 0;
+	if(r1->pending->query && !r2->pending->query)
+		return 1;
+	if(!r1->pending->query && r2->pending->query)
+		return -1;
+
+	/* compare address and port */
+	r = sockaddr_cmp(&r1->pending->query->addr, r1->pending->query->addrlen,
+		&r2->pending->query->addr, r2->pending->query->addrlen);
+	if(r != 0)
+		return r;
+
+	/* compare if SSL-enabled */
+	if(r1->pending->c->ssl && !r2->pending->c->ssl)
+		return 1;
+	if(!r1->pending->c->ssl && r2->pending->c->ssl)
+		return -1;
+	return 0;
+}
+
 /** delete waiting_tcp entry. Does not unlink from waiting list.
  * @param w: to delete.
  */
@@ -281,6 +309,20 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
 	return 1;
 }
 
+/** use the buffer to setup writing the query */
+static void
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
+	size_t pkt_len)
+{
+	pend->id = LDNS_ID_WIRE(pkt);
+	sldns_buffer_clear(pend->c->buffer);
+	sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
+	sldns_buffer_flip(pend->c->buffer);
+	pend->c->tcp_is_reading = 0;
+	pend->c->tcp_byte_count = 0;
+	comm_point_start_listening(pend->c, s, -1);
+}
+
 /** use next free buffer to service a tcp query */
 static int
 outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
@@ -412,19 +454,13 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
 	}
 	w->pkt = NULL;
 	w->next_waiting = (void*)pend;
-	pend->id = LDNS_ID_WIRE(pkt);
 	w->outnet->num_tcp_outgoing++;
 	w->outnet->tcp_free = pend->next_free;
 	pend->next_free = NULL;
 	pend->query = w;
 	pend->c->repinfo.addrlen = w->addrlen;
 	memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
-	sldns_buffer_clear(pend->c->buffer);
-	sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
-	sldns_buffer_flip(pend->c->buffer);
-	pend->c->tcp_is_reading = 0;
-	pend->c->tcp_byte_count = 0;
-	comm_point_start_listening(pend->c, s, -1);
+	outnet_tcp_take_query_setup(s, pend, pkt, pkt_len);
 	return 1;
 }
 
@@ -449,6 +485,42 @@ use_free_buffer(struct outside_network* outnet)
 	}
 }
 
+/** remove reused element from tree and lru list */
+static void
+reuse_tcp_remove_tree_list(struct outside_network* outnet,
+	struct reuse_tcp* reuse)
+{
+	if(reuse->node.key) {
+		/* delete it from reuse tree */
+		(void)rbtree_delete(&outnet->tcp_reuse, &reuse->node);
+		reuse->node.key = NULL;
+	}
+	/* delete from reuse list */
+	if(reuse->pending) {
+		if(reuse->prev) {
+			/* assert that members of the lru list are waiting
+			 * and thus have a pending pointer to the struct */
+			log_assert(reuse->prev->pending);
+			reuse->prev->next = reuse->next;
+		} else {
+			log_assert(!reuse->next || reuse->next->pending);
+			outnet->tcp_reuse_first =
+				(reuse->next?reuse->next->pending:NULL);
+		}
+		if(reuse->next) {
+			/* assert that members of the lru list are waiting
+			 * and thus have a pending pointer to the struct */
+			log_assert(reuse->next->pending);
+			reuse->next->prev = reuse->prev;
+		} else {
+			log_assert(!reuse->prev || reuse->prev->pending);
+			outnet->tcp_reuse_last =
+				(reuse->prev?reuse->prev->pending:NULL);
+		}
+		reuse->pending = NULL;
+	}
+}
+
 /** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
 static void
 decommission_pending_tcp(struct outside_network* outnet,
@@ -464,9 +536,37 @@ decommission_pending_tcp(struct outside_network* outnet,
 	comm_point_close(pend->c);
 	pend->next_free = outnet->tcp_free;
 	outnet->tcp_free = pend;
+	if(pend->reuse.pending) {
+		/* needs unlink from the reuse tree to get deleted */
+		reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+	}
 	waiting_tcp_delete(pend->query);
 	pend->query = NULL;
-	use_free_buffer(outnet);
+}
+
+/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
+static int
+reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
+{
+	pend_tcp->reuse.node.key = &pend_tcp->reuse;
+	pend_tcp->reuse.pending = pend_tcp;
+	if(!rbtree_insert(&outnet->tcp_reuse, &pend_tcp->reuse.node)) {
+		/* this is a duplicate connection, close this one */
+		pend_tcp->reuse.node.key = NULL;
+		pend_tcp->reuse.pending = NULL;
+		return 0;
+	}
+	/* insert into LRU, first is newest */
+	pend_tcp->reuse.prev = NULL;
+	if(outnet->tcp_reuse_first) {
+		pend_tcp->reuse.next = &outnet->tcp_reuse_first->reuse;
+		outnet->tcp_reuse_first->reuse.prev = &pend_tcp->reuse;
+	} else {
+		pend_tcp->reuse.next = NULL;
+		outnet->tcp_reuse_last = pend_tcp;
+	}
+	outnet->tcp_reuse_first = pend_tcp;
+	return 1;
 }
 
 int
@@ -489,9 +589,21 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
 			error = NETEVENT_CLOSED;
 		}
 	}
+	if(error == NETEVENT_NOERROR) {
+		/* add to reuse tree so it can be reused, if not a failure.
+		 * This is possible if the state machine wants to make a tcp
+		 * query again to the same destination. */
+		(void)reuse_tcp_insert(outnet, pend);
+	}
 	fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
 	(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+	/* if reused, it should not be decommissioned, TODO */
+	/* or if another query wants to write, write that and read for more
+	 * or more outstanding queries on the stream. TODO */
+	/* TODO also write multiple queries over the stream, even if no
+	 * replies have returned yet */
 	decommission_pending_tcp(outnet, pend);
+	use_free_buffer(outnet);
 	return 0;
 }
 
@@ -816,6 +928,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
 		outside_network_delete(outnet);
 		return NULL;
 	}
+	rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+	outnet->tcp_reuse_max = num_tcp;
 	/* allocate commpoints */
 	for(k=0; k
 			if(outnet->tcp_conns[i]) {
 				comm_point_delete(outnet->tcp_conns[i]->c);
 				waiting_tcp_delete(outnet->tcp_conns[i]->query);
+				/* TODO: loop over tcpwrite wait list and
+				 * delete waiting_tcp_delete them */
 				free(outnet->tcp_conns[i]);
 			}
 		free(outnet->tcp_conns);
@@ -989,6 +1105,10 @@ outside_network_delete(struct outside_network* outnet)
 			p = np;
 		}
 	}
+	/* was allocated in struct pending that was deleted above */
+	rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+	outnet->tcp_reuse_first = NULL;
+	outnet->tcp_reuse_last = NULL;
 	if(outnet->udp_wait_first) {
 		struct pending* p = outnet->udp_wait_first, *np;
 		while(p) {
@@ -1283,14 +1403,20 @@ outnet_tcptimer(void* arg)
 {
 	struct waiting_tcp* w = (struct waiting_tcp*)arg;
 	struct outside_network* outnet = w->outnet;
-	comm_point_callback_type* cb;
-	void* cb_arg;
+	int do_callback = 1;
 	if(w->pkt) {
 		/* it is on the waiting list */
 		waiting_list_remove(outnet, w);
 	} else {
 		/* it was in use */
 		struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
+		/* see if it needs unlink from reuse tree */
+		if(pend->reuse.pending) {
+			reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+			do_callback = 0;
+		}
+		/* do failure callbacks for all the queries in the
+		 * wait for write list and in the id-tree TODO */
 		if(pend->c->ssl) {
 #ifdef HAVE_SSL
 			SSL_shutdown(pend->c->ssl);
@@ -1303,23 +1429,100 @@ outnet_tcptimer(void* arg)
 		pend->next_free = outnet->tcp_free;
 		outnet->tcp_free = pend;
 	}
-	cb = w->cb;
-	cb_arg = w->cb_arg;
-	waiting_tcp_delete(w);
-	fptr_ok(fptr_whitelist_pending_tcp(cb));
-	(void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+	if(do_callback) {
+		comm_point_callback_type* cb = w->cb;
+		void* cb_arg = w->cb_arg;
+		waiting_tcp_delete(w);
+		fptr_ok(fptr_whitelist_pending_tcp(cb));
+		(void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+	} else {
+		waiting_tcp_delete(w);
+	}
 	use_free_buffer(outnet);
 }
 
+/** close the oldest reuse_tcp connection to make a fd and struct pend
+ * available for a new stream connection */
+static void
+reuse_tcp_close_oldest(struct outside_network* outnet)
+{
+	struct pending_tcp* pend;
+	if(!outnet->tcp_reuse_last) return;
+	pend = outnet->tcp_reuse_last;
+
+	/* snip off of LRU */
+	log_assert(pend->reuse.next == NULL);
+	if(pend->reuse.prev) {
+		log_assert(pend->reuse.prev->pending);
+		outnet->tcp_reuse_last = pend->reuse.prev->pending;
+		pend->reuse.prev->next = NULL;
+	} else {
+		outnet->tcp_reuse_last = NULL;
+		outnet->tcp_reuse_first = NULL;
+	}
+
+	/* TODO should only close unused in tree, not ones that are in use,
+	 * for which we need also a tree to find in-use streams for multiple
+	 * queries on them */
+	/* free up */
+	decommission_pending_tcp(outnet, pend);
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
+{
+	struct waiting_tcp key_w;
+	struct pending_tcp key_p;
+	struct comm_point c;
+	memset(&key_w, 0, sizeof(key_w));
+	memset(&key_p, 0, sizeof(key_p));
+	memset(&c, 0, sizeof(c));
+	key_p.query = &key_w;
+	key_p.c = &c;
+	key_p.reuse.pending = &key_p;
+	key_p.reuse.node.key = &key_p.reuse;
+	if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
+		key_p.c->ssl = (void*)1;
+	if(sq->addrlen > sizeof(key_w.addr))
+		return NULL;
+	memmove(&key_w.addr, &sq->addr, sq->addrlen);
+	key_w.addrlen = sq->addrlen;
+
+	return (struct reuse_tcp*)rbtree_search(&outnet->tcp_reuse,
+		&key_p.reuse.node);
+}
+
 struct waiting_tcp*
 pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
 	int timeout, comm_point_callback_type* callback, void* callback_arg)
 {
 	struct pending_tcp* pend = sq->outnet->tcp_free;
+	struct reuse_tcp* reuse = NULL;
 	struct waiting_tcp* w;
 	struct timeval tv;
 	uint16_t id;
+
+	/* find out if a reused stream to the target exists */
+	/* if so, take it into use */
+	reuse = reuse_tcp_find(sq->outnet, sq);
+	if(reuse) {
+		log_assert(reuse->pending);
+		pend = reuse->pending;
+	}
+
+	/* if !pend but we have reuse streams, close a reuse stream
+	 * to be able to open a new one to this target, no use waiting
+	 * to reuse a file descriptor while another query needs to use
+	 * that buffer and file descriptor now. */
+	if(!pend) {
+		reuse_tcp_close_oldest(sq->outnet);
+		pend = sq->outnet->tcp_free;
+	}
+
 	/* if no buffer is free allocate space to store query */
+	/* TODO: if reuse cannot write right now, store query even though
+	 * pend is nonNULL */
 	w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
 		+ (pend?0:sldns_buffer_limit(packet)));
 	if(!w) {
@@ -1347,10 +1550,26 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
 	comm_timer_set(w->timer, &tv);
 	if(pend) {
 		/* we have a buffer available right now */
-		if(!outnet_tcp_take_into_use(w, sldns_buffer_begin(packet),
-			sldns_buffer_limit(packet))) {
-			waiting_tcp_delete(w);
-			return NULL;
+		if(reuse) {
+			/* if cannot write now, store query and put it
+			 * in the waiting list for this stream TODO */
+			/* and also delete it from waitlst if query gone,
+			 * eg. sq is deleted TODO */
+			/* and also servfail all waiting queries if
+			 * stream closes TODO */
+			/* reuse existing fd, write query and continue */
+			outnet_tcp_take_query_setup(pend->c->fd, pend,
+				sldns_buffer_begin(packet),
+				sldns_buffer_limit(packet));
+		} else {
+			/* create new fd and connect to addr, setup to
+			 * write query */
+			if(!outnet_tcp_take_into_use(w,
+				sldns_buffer_begin(packet),
+				sldns_buffer_limit(packet))) {
+				waiting_tcp_delete(w);
+				return NULL;
+			}
 		}
 #ifdef USE_DNSTAP
 		if(sq->outnet->dtenv &&
@@ -1502,6 +1721,42 @@ waiting_list_remove(struct outside_network* outnet, struct waiting_tcp* w)
 	}
 }
 
+/** reuse tcp stream, remove serviced query from stream,
+ * return true if the stream is kept, false if it is to be closed */
+static int
+reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
+	struct serviced_query* sq)
+{
+	struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
+	/* see if can be entered in reuse tree
+	 * for that the FD has to be non-1 */
+	if(pend_tcp->c->fd == -1) {
+		return 0;
+	}
+	/* if in tree and used by other queries */
+	if(pend_tcp->reuse.node.key) {
+		/* note less use of stream */
+		/* remove id value used by this svcd. */
+		/* do not reset the keepalive timer, for that
+		 * we'd need traffic, and this is where the servicedq is
+		 * removed due to state machine internal reasons,
+		 * eg. iterator no longer interested in this query */
+		return 1;
+	}
+	/* if still open and want to keep it open */
+	if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
+		sq->outnet->tcp_reuse_max) {
+		/* note less use of stream */
+		/* remove id value used by this svcd. */
+		/* set a keepalive timer on it */
+		if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
+			return 0;
+		}
+		return 1;
+	}
+	return 0;
+}
+
 /** cleanup serviced query entry */
 static void
 serviced_delete(struct serviced_query* sq)
@@ -1522,9 +1777,14 @@ serviced_delete(struct serviced_query* sq)
 		} else {
 			struct waiting_tcp* p = (struct waiting_tcp*)
 				sq->pending;
+			/* TODO: if on stream-write-waiting list then
+			 * remove from waiting list and waiting_tcp_delete */
 			if(p->pkt == NULL) {
-				decommission_pending_tcp(sq->outnet,
-					(struct pending_tcp*)p->next_waiting);
+				if(!reuse_tcp_remove_serviced_keep(p, sq)) {
+					decommission_pending_tcp(sq->outnet,
+						(struct pending_tcp*)p->next_waiting);
+					use_free_buffer(sq->outnet);
+				}
 			} else {
 				waiting_list_remove(sq->outnet, p);
 				waiting_tcp_delete(p);
diff --git a/services/outside_network.h b/services/outside_network.h
index 3456a3da3..a0b277b1e 100644
--- a/services/outside_network.h
+++ b/services/outside_network.h
@@ -52,6 +52,7 @@ struct ub_randstate;
 struct pending_tcp;
 struct waiting_tcp;
 struct waiting_udp;
+struct reuse_tcp;
 struct infra_cache;
 struct port_comm;
 struct port_if;
@@ -150,6 +151,21 @@ struct outside_network {
 	size_t num_tcp;
 	/** number of tcp communication points in use. */
 	size_t num_tcp_outgoing;
+	/**
+	 * tree of still-open and waiting tcp connections for reuse.
+	 * can be closed and reopened to get a new tcp connection.
+	 * or reused to the same destination again. with timeout to close.
+	 * Entries are of type struct reuse_tcp.
+	 * The entries are both active and empty connections.
+	 */
+	rbtree_type tcp_reuse;
+	/** max number of tcp_reuse entries we want to keep open */
+	size_t tcp_reuse_max;
+	/** first and last(oldest) in lru list of reuse connections.
+	 * the oldest can be closed to get a new free pending_tcp if needed
+	 * The list contains empty connections, that wait for timeout or
+	 * a new query that can use the existing connection. */
+	struct pending_tcp* tcp_reuse_first, *tcp_reuse_last;
 	/** list of tcp comm points that are free for use */
 	struct pending_tcp* tcp_free;
 	/** list of tcp queries waiting for a buffer */
@@ -205,6 +221,43 @@ struct port_comm {
 	struct comm_point* cp;
 };
 
+/**
+ * Reuse TCP connection, still open can be used again.
+ */
+struct reuse_tcp {
+	/** rbtree node with links in tcp_reuse tree. key is NULL when not
+	 * in tree. Both active and empty connections are in the tree. */
+	rbnode_type node;
+	/** lru chain, so that the oldest can be removed to get a new
+	 * connection when all are in (re)use. oldest is last in list.
+	 * The lru only contains empty connections waiting for reuse,
+	 * the ones with active queries are not on the list because they
+	 * do not need to be closed to make space for others. They already
+	 * service a query so the close for another query does not help
+	 * service a larger number of queries.
+	 * TODO
+	 */
+	struct reuse_tcp* next, *prev;
+	/** the connection to reuse, the fd is non-1 and is open.
+	 * the addr and port determine where the connection is going,
+	 * and is key to the rbtree. The SSL ptr determines if it is
+	 * a TLS connection or a plain TCP connection there. And TLS
+	 * or not is also part of the key to the rbtree.
+	 * There is a timeout and read event on the fd, to close it.
+	 */
+	struct pending_tcp* pending;
+	/** rbtree with other queries waiting on the connection, by ID number,
+	 * of type struct waiting_tcp. It is for looking up received
+	 * answers to the structure for callback. And also to see if ID
+	 * numbers are unused and can be used for a new query. TODO */
+	rbtree_type tree_by_id;
+	/** list of queries waiting to be written on the channel,
+	 * if NULL no queries are waiting to be written and the pending->query
+	 * is the query currently serviced. The first is the next in line.
+	 * Once written, a query moves to the tree_by_id. TODO */
+	struct waiting_tcp* write_wait_first, *write_wait_last;
+};
+
 /**
  * A query that has an answer pending for it.
  */
@@ -255,6 +308,11 @@ struct pending_tcp {
 	struct comm_point* c;
 	/** the query being serviced, NULL if the pending_tcp is unused. */
 	struct waiting_tcp* query;
+	/** the pre-allocated reuse tcp structure. if ->pending is nonNULL
+	 * it is in use and the connection is waiting for reuse.
+	 * It is here for memory pre-allocation, and used to make this
+	 * pending_tcp wait for reuse. */
+	struct reuse_tcp reuse;
 };
 
 /**
@@ -266,6 +324,13 @@ struct waiting_tcp {
 	 * if pkt==0, this points to the pending_tcp structure. */
 	struct waiting_tcp* next_waiting;
+	/** next and prev in query waiting list for stream connection */
+	struct waiting_tcp* write_wait_prev, *write_wait_next;
+	/** true if the waiting_tcp structure is on the write_wait queue */
+	int write_wait_queued;
+	/** entry in reuse.tree_by_id, if key is NULL, not in tree, otherwise,
+	 * this struct is key and sorted by ID from pending_tcp->id. */
+	rbnode_type id_node;
 	/** timeout event; timer keeps running whether the query is
 	 * waiting for a buffer or the tcp reply is pending */
 	struct comm_timer* timer;
@@ -635,4 +700,7 @@ int pending_cmp(const void* key1, const void* key2);
 /** compare function of serviced query rbtree */
 int serviced_cmp(const void* key1, const void* key2);
 
+/** compare function of reuse_tcp rbtree */
+int reuse_cmp(const void* key1, const void* key2);
+
 #endif /* OUTSIDE_NETWORK_H */
diff --git a/testcode/fake_event.c b/testcode/fake_event.c
index d6e904a4d..b04543f1e 100644
--- a/testcode/fake_event.c
+++ b/testcode/fake_event.c
@@ -1488,6 +1488,12 @@ int serviced_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
 	return 0;
 }
 
+int reuse_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
+{
+	log_assert(0);
+	return 0;
+}
+
 /* timers in testbound for autotrust. statistics tested in tdir. */
 struct comm_timer* comm_timer_create(struct comm_base* base,
 	void (*cb)(void*), void* cb_arg)
diff --git a/util/fptr_wlist.c b/util/fptr_wlist.c
index f5da501de..84b185160 100644
--- a/util/fptr_wlist.c
+++ b/util/fptr_wlist.c
@@ -210,6 +210,7 @@ fptr_whitelist_rbtree_cmp(int (*fptr) (const void *, const void *))
 	else if(fptr == &fwd_cmp) return 1;
 	else if(fptr == &pending_cmp) return 1;
 	else if(fptr == &serviced_cmp) return 1;
+	else if(fptr == &reuse_cmp) return 1;
 	else if(fptr == &name_tree_compare) return 1;
 	else if(fptr == &order_lock_cmp) return 1;
 	else if(fptr == &codeline_cmp) return 1;
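
Below is a small stand-alone C sketch of the LRU bookkeeping that reuse_tcp_insert, reuse_tcp_remove_tree_list and reuse_tcp_close_oldest perform in the patch above; it is illustrative only and not part of the commit. The sketch_* names and the dest string are invented stand-ins: in the patch the list links live inside struct reuse_tcp (embedded in struct pending_tcp), the rbtree key is the destination address and port plus whether the connection is TLS (see reuse_cmp), and the list heads are outnet->tcp_reuse_first and outnet->tcp_reuse_last.

/* sketch.c - illustrative only, simplified from services/outside_network.c */
#include <stdio.h>
#include <string.h>

struct sketch_reuse {
	char dest[64];                    /* stand-in for the addr/port/TLS rbtree key */
	int in_list;                      /* nonzero while linked into the LRU */
	struct sketch_reuse *prev, *next; /* LRU links: first is newest, last is oldest */
};

struct sketch_outnet {
	struct sketch_reuse *lru_first, *lru_last;
};

/* put a connection at the front of the LRU, as reuse_tcp_insert does
 * after its rbtree_insert succeeds */
static void sketch_insert(struct sketch_outnet* on, struct sketch_reuse* r)
{
	r->prev = NULL;
	r->next = on->lru_first;
	if(on->lru_first)
		on->lru_first->prev = r;
	else	on->lru_last = r;
	on->lru_first = r;
	r->in_list = 1;
}

/* unlink a connection from the LRU, as reuse_tcp_remove_tree_list does */
static void sketch_remove(struct sketch_outnet* on, struct sketch_reuse* r)
{
	if(!r->in_list)
		return;
	if(r->prev)
		r->prev->next = r->next;
	else	on->lru_first = r->next;
	if(r->next)
		r->next->prev = r->prev;
	else	on->lru_last = r->prev;
	r->prev = r->next = NULL;
	r->in_list = 0;
}

/* drop the oldest connection to free a slot, as reuse_tcp_close_oldest does
 * before it calls decommission_pending_tcp */
static struct sketch_reuse* sketch_close_oldest(struct sketch_outnet* on)
{
	struct sketch_reuse* oldest = on->lru_last;
	if(oldest)
		sketch_remove(on, oldest);
	return oldest;
}

int main(void)
{
	struct sketch_outnet on = { NULL, NULL };
	struct sketch_reuse a, b;
	memset(&a, 0, sizeof(a));
	memset(&b, 0, sizeof(b));
	strcpy(a.dest, "192.0.2.1 port 853 tls");
	strcpy(b.dest, "192.0.2.2 port 53 tcp");
	sketch_insert(&on, &a);
	sketch_insert(&on, &b); /* b is now newest, a is oldest */
	printf("closing oldest: %s\n", sketch_close_oldest(&on)->dest);
	return 0;
}

Keeping the newest connection at the head of the list means that freeing a slot only has to unlink the tail, which is what pending_tcp_query relies on when tcp_free is empty and it calls reuse_tcp_close_oldest.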