From: W.C.A. Wijngaards Date: Thu, 25 Jun 2020 12:26:29 +0000 (+0200) Subject: tcp callback handle timeout event for read and reuse keepalive. X-Git-Tag: release-1.13.0rc1~5^2~60 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=39a50f30a33cd8d4a50ecf483bc13bff7eeca38e;p=thirdparty%2Funbound.git tcp callback handle timeout event for read and reuse keepalive. --- diff --git a/services/outside_network.c b/services/outside_network.c index 14532b1f4..b65bf19f9 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -346,16 +346,27 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen) /** use the buffer to setup writing the query */ static void -outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt, - size_t pkt_len) +outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, + struct waiting_tcp* w) { - pend->id = LDNS_ID_WIRE(pkt); - sldns_buffer_clear(pend->c->buffer); - sldns_buffer_write(pend->c->buffer, pkt, pkt_len); - sldns_buffer_flip(pend->c->buffer); + struct timeval tv; + pend->id = LDNS_ID_WIRE(w->pkt); + pend->c->tcp_write_pkt = w->pkt; + pend->c->tcp_write_pkt_len = w->pkt_len; pend->c->tcp_write_and_read = 1; pend->c->tcp_write_byte_count = 0; comm_point_start_listening(pend->c, s, -1); + /* set timer on the waiting_tcp entry, this is the write timeout + * for the written packet. The timer on pend->c is the timer + * for when there is no written packet and we have readtimeouts */ +#ifndef S_SPLINT_S + tv.tv_sec = w->timeout/1000; + tv.tv_usec = (w->timeout%1000)*1000; +#endif + /* if the waiting_tcp was previously waiting for a buffer in the + * outside_network.tcpwaitlist, then the timer is reset now that + * we start writing it */ + comm_timer_set(w->timer, &tv); } /** use next free buffer to service a tcp query */ @@ -471,7 +482,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w) pend->c->repinfo.addrlen = w->addrlen; memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen); pend->reuse.pending = pend; - outnet_tcp_take_query_setup(s, pend, w->pkt, w->pkt_len); + outnet_tcp_take_query_setup(s, pend, w); return 1; } @@ -570,7 +581,7 @@ static void decommission_pending_tcp(struct outside_network* outnet, struct pending_tcp* pend) { - verbose(5, "decommision_pending_tcp"); + verbose(5, "decommission_pending_tcp"); if(pend->c->ssl) { #ifdef HAVE_SSL SSL_shutdown(pend->c->ssl); @@ -633,6 +644,75 @@ reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp) return 1; } +/** perform failure callbacks for waiting queries in reuse write list */ +static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err) +{ + struct waiting_tcp* w; + w = pend->reuse.write_wait_first; + while(w) { + comm_point_callback_type* cb = w->cb; + void* cb_arg = w->cb_arg; + fptr_ok(fptr_whitelist_pending_tcp(cb)); + (void)(*cb)(NULL, cb_arg, err, NULL); + w = w->write_wait_next; + } +} + +/** perform failure callbacks for waiting queries in reuse read rbtree */ +static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err) +{ + rbnode_type* node; + if(pend->reuse.tree_by_id.root == NULL || + pend->reuse.tree_by_id.root == RBTREE_NULL) + return; + node = rbtree_first(&pend->reuse.tree_by_id); + while(node && node != RBTREE_NULL) { + struct waiting_tcp* w = (struct waiting_tcp*)node->key; + comm_point_callback_type* cb = w->cb; + void* cb_arg = w->cb_arg; + fptr_ok(fptr_whitelist_pending_tcp(cb)); + (void)(*cb)(NULL, cb_arg, err, NULL); + node = rbtree_next(node); + } +} + +/** perform failure callbacks for current written query in reuse struct */ +static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err) +{ + struct waiting_tcp* w = pend->query; + if(w) { + comm_point_callback_type* cb = w->cb; + void* cb_arg = w->cb_arg; + fptr_ok(fptr_whitelist_pending_tcp(cb)); + (void)(*cb)(NULL, cb_arg, err, NULL); + } +} + +/** delete element from tree by id */ +static void +reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w) +{ + log_assert(w->id_node.key != NULL); + rbtree_delete(&reuse->tree_by_id, w); + w->id_node.key = NULL; +} + +/** pop the first element from the writewait list */ +static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend) +{ + struct waiting_tcp* w = pend->reuse.write_wait_first; + if(!w) + return NULL; + if(w->write_wait_prev) + w->write_wait_prev->write_wait_next = w->write_wait_next; + else pend->reuse.write_wait_first = w->write_wait_next; + if(w->write_wait_next) + w->write_wait_next->write_wait_prev = w->write_wait_prev; + else pend->reuse.write_wait_last = w->write_wait_prev; + w->write_wait_queued = 0; + return w; +} + /** set timeout on tcp fd and setup read event to catch incoming dns msgs */ static void reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp) @@ -651,7 +731,38 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, struct pending_tcp* pend = (struct pending_tcp*)arg; struct outside_network* outnet = pend->reuse.outnet; verbose(VERB_ALGO, "outnettcp cb"); - if(error != NETEVENT_NOERROR) { + if(error == NETEVENT_TIMEOUT) { + if(pend->c->tcp_write_and_read) + verbose(VERB_QUERY, "outnettcp got tcp timeout " + "for read, ignored because write underway"); + else verbose(VERB_QUERY, "outnettcp got tcp timeout %s", + (pend->reuse.tree_by_id.count?"for reading pkt": + "for keepalive for reuse")); + /* if we are writing, ignore readtimer, wait for write timer + * or write is done */ + if(pend->c->tcp_write_and_read) + return 0; + /* must be timeout for reading or keepalive reuse, + * close it. */ + reuse_tcp_remove_tree_list(outnet, &pend->reuse); + } else if(error == NETEVENT_PKT_WRITTEN) { + /* the packet we want to write has been written. */ + log_assert(c == pend->c); + log_assert(pend->query->pkt == pend->c->tcp_write_pkt); + log_assert(pend->query->pkt_len == pend->c->tcp_write_pkt_len); + pend->c->tcp_write_pkt = NULL; + pend->c->tcp_write_pkt_len = 0; + /* the pend.query is already in tree_by_id */ + pend->query = NULL; + /* setup to write next packet or setup read timeout */ + if(pend->reuse.write_wait_first) { + pend->query = reuse_write_wait_pop(pend); + outnet_tcp_take_query_setup(pend->c->fd, pend, + pend->query); + } else { + reuse_tcp_setup_readtimeout(pend); + } + } else if(error != NETEVENT_NOERROR) { verbose(VERB_QUERY, "outnettcp got tcp error %d", error); /* pass error below and exit */ } else { @@ -674,6 +785,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, } } if(pend->query) { + reuse_tree_by_id_delete(&pend->reuse, pend->query); fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb)); (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info); waiting_tcp_delete(pend->query); @@ -687,7 +799,12 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, return 0; } verbose(5, "outnet_tcp_cb reuse after cb: decommission it"); - /* no queries on it, no space to keep it. Close it */ + /* no queries on it, no space to keep it. or timeout or closed due + * to error. Close it */ + reuse_cb_readwait_for_failure(pend, (error==NETEVENT_TIMEOUT? + NETEVENT_TIMEOUT:NETEVENT_CLOSED)); + reuse_cb_writewait_for_failure(pend, (error==NETEVENT_TIMEOUT? + NETEVENT_TIMEOUT:NETEVENT_CLOSED)); decommission_pending_tcp(outnet, pend); use_free_buffer(outnet); return 0; @@ -1501,50 +1618,6 @@ pending_udp_query(struct serviced_query* sq, struct sldns_buffer* packet, return pend; } -/** perform failure callbacks for waiting queries in reuse write list */ -static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err) -{ - struct waiting_tcp* w; - w = pend->reuse.write_wait_first; - while(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); - w = w->write_wait_next; - } -} - -/** perform failure callbacks for waiting queries in reuse read rbtree */ -static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err) -{ - rbnode_type* node; - if(pend->reuse.tree_by_id.root == NULL || - pend->reuse.tree_by_id.root == RBTREE_NULL) - return; - node = rbtree_first(&pend->reuse.tree_by_id); - while(node && node != RBTREE_NULL) { - struct waiting_tcp* w = (struct waiting_tcp*)node->key; - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); - node = rbtree_next(node); - } -} - -/** perform failure callbacks for current written query in reuse struct */ -static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err) -{ - struct waiting_tcp* w = pend->query; - if(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); - } -} - void outnet_tcptimer(void* arg) { @@ -1683,6 +1756,15 @@ reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq) return NULL; } +/** insert element in tree by id */ +static void +reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w) +{ + log_assert(w->id_node.key == NULL); + w->id_node.key = w; + rbtree_insert(&reuse->tree_by_id, &w->id_node); +} + /** find element in tree by id */ static struct waiting_tcp* reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id) @@ -1779,7 +1861,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, struct pending_tcp* pend = sq->outnet->tcp_free; struct reuse_tcp* reuse = NULL; struct waiting_tcp* w; - struct timeval tv; uint16_t id; verbose(5, "pending_tcp_query"); @@ -1827,11 +1908,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, w->cb_arg = callback_arg; w->ssl_upstream = sq->ssl_upstream; w->tls_auth_name = sq->tls_auth_name; -#ifndef S_SPLINT_S - tv.tv_sec = timeout/1000; - tv.tv_usec = (timeout%1000)*1000; -#endif - comm_timer_set(w->timer, &tv); + w->timeout = timeout; if(pend) { /* we have a buffer available right now */ if(reuse) { @@ -1839,15 +1916,19 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, /* if cannot write now, store query and put it * in the waiting list for this stream TODO */ /* and insert in tree_by_id */ + reuse_tree_by_id_insert(&pend->reuse, w); /* and also delete it from waitlst if query gone, * eg. sq is deleted TODO */ /* and also servfail all waiting queries if * stream closes TODO */ /* reuse existing fd, write query and continue */ w->next_waiting = (void*)pend; - pend->query = w; - outnet_tcp_take_query_setup(pend->c->fd, pend, - w->pkt, w->pkt_len); + if(pend->query == NULL) { + /* write straight away */ + pend->query = w; + outnet_tcp_take_query_setup(pend->c->fd, pend, + w); + } } else { verbose(5, "pending_tcp_query: new fd, connect"); /* create new fd and connect to addr, setup to @@ -1869,6 +1950,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, comm_tcp, sq->zone, sq->zonelen, packet); #endif } else { + struct timeval tv; /* queue up */ verbose(5, "pending_tcp_query: queue to wait"); w->next_waiting = NULL; @@ -1877,6 +1959,11 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet, else sq->outnet->tcp_wait_first = w; sq->outnet->tcp_wait_last = w; w->on_tcp_waiting_list = 1; +#ifndef S_SPLINT_S + tv.tv_sec = timeout/1000; + tv.tv_usec = (timeout%1000)*1000; +#endif + comm_timer_set(w->timer, &tv); } return w; } diff --git a/services/outside_network.h b/services/outside_network.h index cefee3b08..7c061938a 100644 --- a/services/outside_network.h +++ b/services/outside_network.h @@ -360,6 +360,8 @@ struct waiting_tcp { /** timeout event; timer keeps running whether the query is * waiting for a buffer or the tcp reply is pending */ struct comm_timer* timer; + /** timeout in msec */ + int timeout; /** the outside network it is part of */ struct outside_network* outnet; /** remote address. */ diff --git a/util/netevent.c b/util/netevent.c index 5dd746633..2ca92b92d 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -3221,7 +3221,9 @@ comm_point_start_listening(struct comm_point* c, int newfd, int msec) } if(c->type == comm_tcp || c->type == comm_http) { ub_event_del_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE); - if(c->tcp_is_reading) + if(c->tcp_write_and_read) + ub_event_add_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE); + else if(c->tcp_is_reading) ub_event_add_bits(c->ev->ev, UB_EV_READ); else ub_event_add_bits(c->ev->ev, UB_EV_WRITE); }