From d033ce6c23afb2f5c3fb0eaf463d1e230c22f3eb Mon Sep 17 00:00:00 2001 From: "W.C.A. Wijngaards" Date: Thu, 25 Jun 2020 17:23:46 +0200 Subject: [PATCH] tcp callback function refactor, split read and timeout event setup, leave unused queries that are already sent to track their reply on the query pipeline, when serviced query is deleted deal with the write wait list, --- services/outside_network.c | 118 +++++++++++++++++++++++-------------- services/outside_network.h | 5 +- 2 files changed, 77 insertions(+), 46 deletions(-) diff --git a/services/outside_network.c b/services/outside_network.c index 71a7adebd..ee8275c7f 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -364,6 +364,23 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse) if(!w) return NULL; log_assert(w->write_wait_queued); + log_assert(!w->write_wait_prev); + reuse->write_wait_first = w->write_wait_next; + if(w->write_wait_next) + w->write_wait_next->write_wait_prev = NULL; + else reuse->write_wait_last = NULL; + w->write_wait_queued = 0; + return w; +} + +/** remove the element from the writewait list */ +static void reuse_write_wait_remove(struct reuse_tcp* reuse, + struct waiting_tcp* w) +{ + if(!w) + return; + if(!w->write_wait_queued) + return; if(w->write_wait_prev) w->write_wait_prev->write_wait_next = w->write_wait_next; else reuse->write_wait_first = w->write_wait_next; @@ -371,7 +388,6 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse) w->write_wait_next->write_wait_prev = w->write_wait_prev; else reuse->write_wait_last = w->write_wait_prev; w->write_wait_queued = 0; - return w; } /** push the element after the last on the writewait list */ @@ -608,6 +624,17 @@ outnet_tcp_take_into_use(struct waiting_tcp* w) return 1; } +/** call callback on waiting_tcp, if not NULL */ +static void +waiting_tcp_callback(struct waiting_tcp* w, struct comm_point* c, int error, + struct comm_reply* reply_info) +{ + if(w->cb) { + fptr_ok(fptr_whitelist_pending_tcp(w->cb)); + (void)(*w->cb)(c, w->cb_arg, error, reply_info); + } +} + /** see if buffers can be used to service TCP queries */ static void use_free_buffer(struct outside_network* outnet) @@ -641,11 +668,9 @@ use_free_buffer(struct outside_network* outnet) } } else { if(!outnet_tcp_take_into_use(w)) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; + waiting_tcp_callback(w, NULL, NETEVENT_CLOSED, + NULL); waiting_tcp_delete(w); - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL); } } } @@ -780,10 +805,7 @@ static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err) struct waiting_tcp* w; w = pend->reuse.write_wait_first; while(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); w = w->write_wait_next; } } @@ -798,10 +820,7 @@ static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err) node = rbtree_first(&pend->reuse.tree_by_id); while(node && node != RBTREE_NULL) { struct waiting_tcp* w = (struct waiting_tcp*)node->key; - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); node = rbtree_next(node); } } @@ -811,10 +830,7 @@ static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err) { struct waiting_tcp* w = pend->query; if(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); } } @@ -829,7 +845,15 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w) /** set timeout on tcp fd and setup read event to catch incoming dns msgs */ static void -reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp) +reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp) +{ + log_reuse_tcp(5, "reuse_tcp_setup_timeout", &pend_tcp->reuse); + comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT); +} + +/** set timeout on tcp fd and setup read event to catch incoming dns msgs */ +static void +reuse_tcp_setup_read_and_timeout(struct pending_tcp* pend_tcp) { log_reuse_tcp(5, "reuse_tcp_setup_readtimeout", &pend_tcp->reuse); sldns_buffer_clear(pend_tcp->c->buffer); @@ -875,7 +899,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, outnet_tcp_take_query_setup(pend->c->fd, pend, pend->query); } else { - reuse_tcp_setup_readtimeout(pend); + reuse_tcp_setup_timeout(pend); } return 0; } else if(error != NETEVENT_NOERROR) { @@ -897,13 +921,11 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, * query again to the same destination. */ if(outnet->tcp_reuse.count < outnet->tcp_reuse_max) { (void)reuse_tcp_insert(outnet, pend); - reuse_tcp_setup_readtimeout(pend); } } if(pend->query) { reuse_tree_by_id_delete(&pend->reuse, pend->query); - fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb)); - (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info); + waiting_tcp_callback(pend->query, c, error, reply_info); waiting_tcp_delete(pend->query); pend->query = NULL; } @@ -912,6 +934,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, verbose(5, "outnet_tcp_cb reuse after cb: keep it"); /* it is in the reuse_tcp tree, with other queries, or * on the empty list. do not decommission it */ + reuse_tcp_setup_read_and_timeout(pend); return 0; } verbose(5, "outnet_tcp_cb reuse after cb: decommission it"); @@ -1742,12 +1765,9 @@ outnet_tcptimer(void* arg) verbose(5, "outnet_tcptimer"); if(w->on_tcp_waiting_list) { /* it is on the waiting list */ - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; waiting_list_remove(outnet, w); + waiting_tcp_callback(w, NULL, NETEVENT_TIMEOUT, NULL); waiting_tcp_delete(w); - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL); } else { /* it was in use */ struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting; @@ -2154,6 +2174,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, { struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting; verbose(5, "reuse_tcp_remove_serviced_keep"); + /* remove the callback. let query continue to write to not cancel + * the stream itself. also keep it as an entry in the tree_by_id, + * in case the answer returns (that we no longer want), but we cannot + * pick the same ID number meanwhile */ + pend_tcp->query->cb = NULL; /* see if can be entered in reuse tree * for that the FD has to be non-1 */ if(pend_tcp->c->fd == -1) { @@ -2163,10 +2188,8 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, /* if in tree and used by other queries */ if(pend_tcp->reuse.node.key) { verbose(5, "reuse_tcp_remove_serviced_keep: in use by other queries"); - /* note less use of stream */ - /* remove id value used by this svcd. */ /* do not reset the keepalive timer, for that - * we'd need traffic, and this is where the servicedq is + * we'd need traffic, and this is where the serviced is * removed due to state machine internal reasons, * eg. iterator no longer interested in this query */ return 1; @@ -2175,13 +2198,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count < sq->outnet->tcp_reuse_max) { verbose(5, "reuse_tcp_remove_serviced_keep: keep open"); - /* note less use of stream */ - /* remove id value used by this svcd. */ /* set a keepalive timer on it */ if(!reuse_tcp_insert(sq->outnet, pend_tcp)) { return 0; } - reuse_tcp_setup_readtimeout(pend_tcp); + reuse_tcp_setup_timeout(pend_tcp); return 1; } return 0; @@ -2207,31 +2228,38 @@ serviced_delete(struct serviced_query* sq) * mesh */ outnet_send_wait_udp(sq->outnet); } else { - struct waiting_tcp* p = (struct waiting_tcp*) + struct waiting_tcp* w = (struct waiting_tcp*) sq->pending; verbose(5, "serviced_delete: TCP"); - /* TODO: if on stream-write-waiting list then + /* if on stream-write-waiting list then * remove from waiting list and waiting_tcp_delete */ - if(!p->on_tcp_waiting_list) { + if(w->write_wait_queued) { + struct pending_tcp* pend = + (struct pending_tcp*)w->next_waiting; + verbose(5, "serviced_delete: writewait"); + reuse_tree_by_id_delete(&pend->reuse, w); + reuse_write_wait_remove(&pend->reuse, w); + waiting_tcp_delete(w); + } else if(!w->on_tcp_waiting_list) { + struct pending_tcp* pend = + (struct pending_tcp*)w->next_waiting; verbose(5, "serviced_delete: tcpreusekeep"); - if(!reuse_tcp_remove_serviced_keep(p, sq)) { + if(!reuse_tcp_remove_serviced_keep(w, sq)) { reuse_cb_curquery_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); reuse_cb_readwait_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); reuse_cb_writewait_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); decommission_pending_tcp(sq->outnet, - (struct pending_tcp*)p->next_waiting); + pend); use_free_buffer(sq->outnet); } + sq->pending = NULL; } else { verbose(5, "serviced_delete: tcpwait"); - waiting_list_remove(sq->outnet, p); - waiting_tcp_delete(p); + waiting_list_remove(sq->outnet, w); + waiting_tcp_delete(w); } } } diff --git a/services/outside_network.h b/services/outside_network.h index 7c061938a..0dcf1b2e1 100644 --- a/services/outside_network.h +++ b/services/outside_network.h @@ -375,7 +375,10 @@ struct waiting_tcp { uint8_t* pkt; /** length of query packet. */ size_t pkt_len; - /** callback for the timeout, error or reply to the message */ + /** callback for the timeout, error or reply to the message, + * or NULL if no user is waiting. the entry uses an ID number. + * a query that was written is no longer needed, but the ID number + * and a reply will come back and can be ignored if NULL */ comm_point_callback_type* cb; /** callback user argument */ void* cb_arg; -- 2.47.3