]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
tcp callback function refactor, split read and timeout event setup, leave
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 15:23:46 +0000 (17:23 +0200)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 15:23:46 +0000 (17:23 +0200)
unused queries that are already sent to track their reply on the query
pipeline, when serviced query is deleted deal with the write wait list,

services/outside_network.c
services/outside_network.h

index 71a7adebd65265923e1d33c9fc8ee2cd220a7f19..ee8275c7f710a4792b4d044d502e9cd2570ac6eb 100644 (file)
@@ -364,6 +364,23 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
        if(!w)
                return NULL;
        log_assert(w->write_wait_queued);
+       log_assert(!w->write_wait_prev);
+       reuse->write_wait_first = w->write_wait_next;
+       if(w->write_wait_next)
+               w->write_wait_next->write_wait_prev = NULL;
+       else    reuse->write_wait_last = NULL;
+       w->write_wait_queued = 0;
+       return w;
+}
+
+/** remove the element from the writewait list */
+static void reuse_write_wait_remove(struct reuse_tcp* reuse,
+       struct waiting_tcp* w)
+{
+       if(!w)
+               return;
+       if(!w->write_wait_queued)
+               return;
        if(w->write_wait_prev)
                w->write_wait_prev->write_wait_next = w->write_wait_next;
        else    reuse->write_wait_first = w->write_wait_next;
@@ -371,7 +388,6 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
                w->write_wait_next->write_wait_prev = w->write_wait_prev;
        else    reuse->write_wait_last = w->write_wait_prev;
        w->write_wait_queued = 0;
-       return w;
 }
 
 /** push the element after the last on the writewait list */
@@ -608,6 +624,17 @@ outnet_tcp_take_into_use(struct waiting_tcp* w)
        return 1;
 }
 
+/** call callback on waiting_tcp, if not NULL */
+static void
+waiting_tcp_callback(struct waiting_tcp* w, struct comm_point* c, int error,
+       struct comm_reply* reply_info)
+{
+       if(w->cb) {
+               fptr_ok(fptr_whitelist_pending_tcp(w->cb));
+               (void)(*w->cb)(c, w->cb_arg, error, reply_info);
+       }
+}
+
 /** see if buffers can be used to service TCP queries */
 static void
 use_free_buffer(struct outside_network* outnet)
@@ -641,11 +668,9 @@ use_free_buffer(struct outside_network* outnet)
                        }
                } else {
                        if(!outnet_tcp_take_into_use(w)) {
-                               comm_point_callback_type* cb = w->cb;
-                               void* cb_arg = w->cb_arg;
+                               waiting_tcp_callback(w, NULL, NETEVENT_CLOSED,
+                                       NULL);
                                waiting_tcp_delete(w);
-                               fptr_ok(fptr_whitelist_pending_tcp(cb));
-                               (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
                        }
                }
        }
@@ -780,10 +805,7 @@ static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
        struct waiting_tcp* w;
        w = pend->reuse.write_wait_first;
        while(w) {
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
+               waiting_tcp_callback(w, NULL, err, NULL);
                w = w->write_wait_next;
        }
 }
@@ -798,10 +820,7 @@ static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
        node = rbtree_first(&pend->reuse.tree_by_id);
        while(node && node != RBTREE_NULL) {
                struct waiting_tcp* w = (struct waiting_tcp*)node->key;
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
+               waiting_tcp_callback(w, NULL, err, NULL);
                node = rbtree_next(node);
        }
 }
@@ -811,10 +830,7 @@ static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
 {
        struct waiting_tcp* w = pend->query;
        if(w) {
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
+               waiting_tcp_callback(w, NULL, err, NULL);
        }
 }
 
@@ -829,7 +845,15 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
 
 /** set timeout on tcp fd and setup read event to catch incoming dns msgs */
 static void
-reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
+reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
+{
+       log_reuse_tcp(5, "reuse_tcp_setup_timeout", &pend_tcp->reuse);
+       comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT);
+}
+
+/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
+static void
+reuse_tcp_setup_read_and_timeout(struct pending_tcp* pend_tcp)
 {
        log_reuse_tcp(5, "reuse_tcp_setup_readtimeout", &pend_tcp->reuse);
        sldns_buffer_clear(pend_tcp->c->buffer);
@@ -875,7 +899,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                        outnet_tcp_take_query_setup(pend->c->fd, pend,
                                pend->query);
                } else {
-                       reuse_tcp_setup_readtimeout(pend);
+                       reuse_tcp_setup_timeout(pend);
                }
                return 0;
        } else if(error != NETEVENT_NOERROR) {
@@ -897,13 +921,11 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                 * query again to the same destination. */
                if(outnet->tcp_reuse.count < outnet->tcp_reuse_max) {
                        (void)reuse_tcp_insert(outnet, pend);
-                       reuse_tcp_setup_readtimeout(pend);
                }
        }
        if(pend->query) {
                reuse_tree_by_id_delete(&pend->reuse, pend->query);
-               fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
-               (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+               waiting_tcp_callback(pend->query, c, error, reply_info);
                waiting_tcp_delete(pend->query);
                pend->query = NULL;
        }
@@ -912,6 +934,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                verbose(5, "outnet_tcp_cb reuse after cb: keep it");
                /* it is in the reuse_tcp tree, with other queries, or
                 * on the empty list. do not decommission it */
+               reuse_tcp_setup_read_and_timeout(pend);
                return 0;
        }
        verbose(5, "outnet_tcp_cb reuse after cb: decommission it");
@@ -1742,12 +1765,9 @@ outnet_tcptimer(void* arg)
        verbose(5, "outnet_tcptimer");
        if(w->on_tcp_waiting_list) {
                /* it is on the waiting list */
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
                waiting_list_remove(outnet, w);
+               waiting_tcp_callback(w, NULL, NETEVENT_TIMEOUT, NULL);
                waiting_tcp_delete(w);
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
        } else {
                /* it was in use */
                struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
@@ -2154,6 +2174,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
 {
        struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
        verbose(5, "reuse_tcp_remove_serviced_keep");
+       /* remove the callback. let query continue to write to not cancel
+        * the stream itself.  also keep it as an entry in the tree_by_id,
+        * in case the answer returns (that we no longer want), but we cannot
+        * pick the same ID number meanwhile */
+       pend_tcp->query->cb = NULL;
        /* see if can be entered in reuse tree
         * for that the FD has to be non-1 */
        if(pend_tcp->c->fd == -1) {
@@ -2163,10 +2188,8 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
        /* if in tree and used by other queries */
        if(pend_tcp->reuse.node.key) {
                verbose(5, "reuse_tcp_remove_serviced_keep: in use by other queries");
-               /* note less use of stream */
-               /* remove id value used by this svcd. */
                /* do not reset the keepalive timer, for that
-                * we'd need traffic, and this is where the servicedq is
+                * we'd need traffic, and this is where the serviced is
                 * removed due to state machine internal reasons,
                 * eg. iterator no longer interested in this query */
                return 1;
@@ -2175,13 +2198,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
        if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
                sq->outnet->tcp_reuse_max) {
                verbose(5, "reuse_tcp_remove_serviced_keep: keep open");
-               /* note less use of stream */
-               /* remove id value used by this svcd. */
                /* set a keepalive timer on it */
                if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
                        return 0;
                }
-               reuse_tcp_setup_readtimeout(pend_tcp);
+               reuse_tcp_setup_timeout(pend_tcp);
                return 1;
        }
        return 0;
@@ -2207,31 +2228,38 @@ serviced_delete(struct serviced_query* sq)
                         * mesh */
                        outnet_send_wait_udp(sq->outnet);
                } else {
-                       struct waiting_tcp* p = (struct waiting_tcp*)
+                       struct waiting_tcp* w = (struct waiting_tcp*)
                                sq->pending;
                        verbose(5, "serviced_delete: TCP");
-                       /* TODO: if on stream-write-waiting list then
+                       /* if on stream-write-waiting list then
                         * remove from waiting list and waiting_tcp_delete */
-                       if(!p->on_tcp_waiting_list) {
+                       if(w->write_wait_queued) {
+                               struct pending_tcp* pend =
+                                       (struct pending_tcp*)w->next_waiting;
+                               verbose(5, "serviced_delete: writewait");
+                               reuse_tree_by_id_delete(&pend->reuse, w);
+                               reuse_write_wait_remove(&pend->reuse, w);
+                               waiting_tcp_delete(w);
+                       } else if(!w->on_tcp_waiting_list) {
+                               struct pending_tcp* pend =
+                                       (struct pending_tcp*)w->next_waiting;
                                verbose(5, "serviced_delete: tcpreusekeep");
-                               if(!reuse_tcp_remove_serviced_keep(p, sq)) {
+                               if(!reuse_tcp_remove_serviced_keep(w, sq)) {
                                        reuse_cb_curquery_for_failure(
-                                               (struct pending_tcp*)p->
-                                               next_waiting, NETEVENT_CLOSED);
+                                               pend, NETEVENT_CLOSED);
                                        reuse_cb_readwait_for_failure(
-                                               (struct pending_tcp*)p->
-                                               next_waiting, NETEVENT_CLOSED);
+                                               pend, NETEVENT_CLOSED);
                                        reuse_cb_writewait_for_failure(
-                                               (struct pending_tcp*)p->
-                                               next_waiting, NETEVENT_CLOSED);
+                                               pend, NETEVENT_CLOSED);
                                        decommission_pending_tcp(sq->outnet,
-                                         (struct pending_tcp*)p->next_waiting);
+                                               pend);
                                        use_free_buffer(sq->outnet);
                                }
+                               sq->pending = NULL;
                        } else {
                                verbose(5, "serviced_delete: tcpwait");
-                               waiting_list_remove(sq->outnet, p);
-                               waiting_tcp_delete(p);
+                               waiting_list_remove(sq->outnet, w);
+                               waiting_tcp_delete(w);
                        }
                }
        }
index 7c061938a9f473db0f4f5674797ae00277dff2a0..0dcf1b2e1e75dba4a8a1a22c11e1a10a165a21a1 100644 (file)
@@ -375,7 +375,10 @@ struct waiting_tcp {
        uint8_t* pkt;
        /** length of query packet. */
        size_t pkt_len;
-       /** callback for the timeout, error or reply to the message */
+       /** callback for the timeout, error or reply to the message,
+        * or NULL if no user is waiting. the entry uses an ID number.
+        * a query that was written is no longer needed, but the ID number
+        * and a reply will come back and can be ignored if NULL */
        comm_point_callback_type* cb;
        /** callback user argument */
        void* cb_arg;