git.ipfire.org Git - thirdparty/unbound.git/commitdiff
tcp callback handle timeout event for read and reuse keepalive.
author W.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 12:26:29 +0000 (14:26 +0200)
committer W.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 25 Jun 2020 12:26:29 +0000 (14:26 +0200)
services/outside_network.c
services/outside_network.h
util/netevent.c

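diff --git a/services/outside_network.c b/services/outside_network.c
index 14532b1f42433c641f63397e2f171c4cfdb82bc3..b65bf19f986bad4c86ebbd5f221e0d299c270ce4 100644
--- a/services/outside_network.c
+++ b/services/outside_network.c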
@@ -346,16 +346,27 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
 
 /** use the buffer to setup writing the query */
 static void
-outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
-       size_t pkt_len)
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
+       struct waiting_tcp* w)
 {
-       pend->id = LDNS_ID_WIRE(pkt);
-       sldns_buffer_clear(pend->c->buffer);
-       sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
-       sldns_buffer_flip(pend->c->buffer);
+       struct timeval tv;
+       pend->id = LDNS_ID_WIRE(w->pkt);
+       pend->c->tcp_write_pkt = w->pkt;
+       pend->c->tcp_write_pkt_len = w->pkt_len;
        pend->c->tcp_write_and_read = 1;
        pend->c->tcp_write_byte_count = 0;
        comm_point_start_listening(pend->c, s, -1);
+       /* set timer on the waiting_tcp entry, this is the write timeout
+        * for the written packet.  The timer on pend->c is the timer
+        * for when there is no written packet and we have readtimeouts */
+#ifndef S_SPLINT_S
+       tv.tv_sec = w->timeout/1000;
+       tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+       /* if the waiting_tcp was previously waiting for a buffer in the
+        * outside_network.tcpwaitlist, then the timer is reset now that
+        * we start writing it */
+       comm_timer_set(w->timer, &tv);
 }
 
 /** use next free buffer to service a tcp query */
@@ -471,7 +482,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w)
        pend->c->repinfo.addrlen = w->addrlen;
        memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
        pend->reuse.pending = pend;
-       outnet_tcp_take_query_setup(s, pend, w->pkt, w->pkt_len);
+       outnet_tcp_take_query_setup(s, pend, w);
        return 1;
 }
 
@@ -570,7 +581,7 @@ static void
 decommission_pending_tcp(struct outside_network* outnet, 
        struct pending_tcp* pend)
 {
-       verbose(5, "decommision_pending_tcp");
+       verbose(5, "decommission_pending_tcp");
        if(pend->c->ssl) {
 #ifdef HAVE_SSL
                SSL_shutdown(pend->c->ssl);
@@ -633,6 +644,75 @@ reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
        return 1;
 }
 
+/** perform failure callbacks for waiting queries in reuse write list */
+static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
+{
+       struct waiting_tcp* w;
+       w = pend->reuse.write_wait_first;
+       while(w) {
+               comm_point_callback_type* cb = w->cb;
+               void* cb_arg = w->cb_arg;
+               fptr_ok(fptr_whitelist_pending_tcp(cb));
+               (void)(*cb)(NULL, cb_arg, err, NULL);
+               w = w->write_wait_next;
+       }
+}
+
+/** perform failure callbacks for waiting queries in reuse read rbtree */
+static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
+{
+       rbnode_type* node;
+       if(pend->reuse.tree_by_id.root == NULL ||
+               pend->reuse.tree_by_id.root == RBTREE_NULL)
+               return;
+       node = rbtree_first(&pend->reuse.tree_by_id);
+       while(node && node != RBTREE_NULL) {
+               struct waiting_tcp* w = (struct waiting_tcp*)node->key;
+               comm_point_callback_type* cb = w->cb;
+               void* cb_arg = w->cb_arg;
+               fptr_ok(fptr_whitelist_pending_tcp(cb));
+               (void)(*cb)(NULL, cb_arg, err, NULL);
+               node = rbtree_next(node);
+       }
+}
+
+/** perform failure callbacks for current written query in reuse struct */
+static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
+{
+       struct waiting_tcp* w = pend->query;
+       if(w) {
+               comm_point_callback_type* cb = w->cb;
+               void* cb_arg = w->cb_arg;
+               fptr_ok(fptr_whitelist_pending_tcp(cb));
+               (void)(*cb)(NULL, cb_arg, err, NULL);
+       }
+}
+
+/** delete element from tree by id */
+static void
+reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+       log_assert(w->id_node.key != NULL);
+       rbtree_delete(&reuse->tree_by_id, w);
+       w->id_node.key = NULL;
+}
+
+/** pop the first element from the writewait list */
+static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
+{
+       struct waiting_tcp* w = pend->reuse.write_wait_first;
+       if(!w)
+               return NULL;
+       if(w->write_wait_prev)
+               w->write_wait_prev->write_wait_next = w->write_wait_next;
+       else    pend->reuse.write_wait_first = w->write_wait_next;
+       if(w->write_wait_next)
+               w->write_wait_next->write_wait_prev = w->write_wait_prev;
+       else    pend->reuse.write_wait_last = w->write_wait_prev;
+       w->write_wait_queued = 0;
+       return w;
+}
+
 /** set timeout on tcp fd and setup read event to catch incoming dns msgs */
 static void
 reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
@@ -651,7 +731,38 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
        struct pending_tcp* pend = (struct pending_tcp*)arg;
        struct outside_network* outnet = pend->reuse.outnet;
        verbose(VERB_ALGO, "outnettcp cb");
-       if(error != NETEVENT_NOERROR) {
+       if(error == NETEVENT_TIMEOUT) {
+               if(pend->c->tcp_write_and_read)
+                       verbose(VERB_QUERY, "outnettcp got tcp timeout "
+                               "for read, ignored because write underway");
+               else verbose(VERB_QUERY, "outnettcp got tcp timeout %s",
+                       (pend->reuse.tree_by_id.count?"for reading pkt":
+                       "for keepalive for reuse"));
+               /* if we are writing, ignore readtimer, wait for write timer
+                * or write is done */
+               if(pend->c->tcp_write_and_read)
+                       return 0;
+               /* must be timeout for reading or keepalive reuse,
+                * close it. */
+               reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+       } else if(error == NETEVENT_PKT_WRITTEN) {
+               /* the packet we want to write has been written. */
+               log_assert(c == pend->c);
+               log_assert(pend->query->pkt == pend->c->tcp_write_pkt);
+               log_assert(pend->query->pkt_len == pend->c->tcp_write_pkt_len);
+               pend->c->tcp_write_pkt = NULL;
+               pend->c->tcp_write_pkt_len = 0;
+               /* the pend.query is already in tree_by_id */
+               pend->query = NULL;
+               /* setup to write next packet or setup read timeout */
+               if(pend->reuse.write_wait_first) {
+                       pend->query = reuse_write_wait_pop(pend);
+                       outnet_tcp_take_query_setup(pend->c->fd, pend,
+                               pend->query);
+               } else {
+                       reuse_tcp_setup_readtimeout(pend);
+               }
+       } else if(error != NETEVENT_NOERROR) {
                verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
                /* pass error below and exit */
        } else {
@@ -674,6 +785,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                }
        }
        if(pend->query) {
+               reuse_tree_by_id_delete(&pend->reuse, pend->query);
                fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
                (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
                waiting_tcp_delete(pend->query);
@@ -687,7 +799,12 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
                return 0;
        }
        verbose(5, "outnet_tcp_cb reuse after cb: decommission it");
-       /* no queries on it, no space to keep it. Close it */
+       /* no queries on it, no space to keep it. or timeout or closed due
+        * to error.  Close it */
+       reuse_cb_readwait_for_failure(pend, (error==NETEVENT_TIMEOUT?
+               NETEVENT_TIMEOUT:NETEVENT_CLOSED));
+       reuse_cb_writewait_for_failure(pend, (error==NETEVENT_TIMEOUT?
+               NETEVENT_TIMEOUT:NETEVENT_CLOSED));
        decommission_pending_tcp(outnet, pend);
        use_free_buffer(outnet);
        return 0;
@@ -1501,50 +1618,6 @@ pending_udp_query(struct serviced_query* sq, struct sldns_buffer* packet,
        return pend;
 }
 
-/** perform failure callbacks for waiting queries in reuse write list */
-static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
-{
-       struct waiting_tcp* w;
-       w = pend->reuse.write_wait_first;
-       while(w) {
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
-               w = w->write_wait_next;
-       }
-}
-
-/** perform failure callbacks for waiting queries in reuse read rbtree */
-static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
-{
-       rbnode_type* node;
-       if(pend->reuse.tree_by_id.root == NULL ||
-               pend->reuse.tree_by_id.root == RBTREE_NULL)
-               return;
-       node = rbtree_first(&pend->reuse.tree_by_id);
-       while(node && node != RBTREE_NULL) {
-               struct waiting_tcp* w = (struct waiting_tcp*)node->key;
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
-               node = rbtree_next(node);
-       }
-}
-
-/** perform failure callbacks for current written query in reuse struct */
-static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
-{
-       struct waiting_tcp* w = pend->query;
-       if(w) {
-               comm_point_callback_type* cb = w->cb;
-               void* cb_arg = w->cb_arg;
-               fptr_ok(fptr_whitelist_pending_tcp(cb));
-               (void)(*cb)(NULL, cb_arg, err, NULL);
-       }
-}
-
 void
 outnet_tcptimer(void* arg)
 {
@@ -1683,6 +1756,15 @@ reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
        return NULL;
 }
 
+/** insert element in tree by id */
+static void
+reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+       log_assert(w->id_node.key == NULL);
+       w->id_node.key = w;
+       rbtree_insert(&reuse->tree_by_id, &w->id_node);
+}
+
 /** find element in tree by id */
 static struct waiting_tcp*
 reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
@@ -1779,7 +1861,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        struct pending_tcp* pend = sq->outnet->tcp_free;
        struct reuse_tcp* reuse = NULL;
        struct waiting_tcp* w;
-       struct timeval tv;
        uint16_t id;
 
        verbose(5, "pending_tcp_query");
@@ -1827,11 +1908,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
        w->cb_arg = callback_arg;
        w->ssl_upstream = sq->ssl_upstream;
        w->tls_auth_name = sq->tls_auth_name;
-#ifndef S_SPLINT_S
-       tv.tv_sec = timeout/1000;
-       tv.tv_usec = (timeout%1000)*1000;
-#endif
-       comm_timer_set(w->timer, &tv);
+       w->timeout = timeout;
        if(pend) {
                /* we have a buffer available right now */
                if(reuse) {
@@ -1839,15 +1916,19 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
                        /* if cannot write now, store query and put it
                         * in the waiting list for this stream TODO */
                        /* and insert in tree_by_id */
+                       reuse_tree_by_id_insert(&pend->reuse, w);
                        /* and also delete it from waitlst if query gone,
                         * eg. sq is deleted TODO */
                        /* and also servfail all waiting queries if
                         * stream closes TODO */
                        /* reuse existing fd, write query and continue */
                        w->next_waiting = (void*)pend;
-                       pend->query = w;
-                       outnet_tcp_take_query_setup(pend->c->fd, pend,
-                               w->pkt, w->pkt_len);
+                       if(pend->query == NULL) {
+                               /* write straight away */
+                               pend->query = w;
+                               outnet_tcp_take_query_setup(pend->c->fd, pend,
+                                       w);
+                       }
                } else {
                        verbose(5, "pending_tcp_query: new fd, connect");
                        /* create new fd and connect to addr, setup to
@@ -1869,6 +1950,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
                comm_tcp, sq->zone, sq->zonelen, packet);
 #endif
        } else {
+               struct timeval tv;
                /* queue up */
                verbose(5, "pending_tcp_query: queue to wait");
                w->next_waiting = NULL;
@@ -1877,6 +1959,11 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
                else    sq->outnet->tcp_wait_first = w;
                sq->outnet->tcp_wait_last = w;
                w->on_tcp_waiting_list = 1;
+#ifndef S_SPLINT_S
+               tv.tv_sec = timeout/1000;
+               tv.tv_usec = (timeout%1000)*1000;
+#endif
+               comm_timer_set(w->timer, &tv);
        }
        return w;
 }
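diff --git a/services/outside_network.h b/services/outside_network.h
index cefee3b088ca25fed57bdd7762fd6379873e40dc..7c061938a9f473db0f4f5674797ae00277dff2a0 100644
--- a/services/outside_network.h
+++ b/services/outside_network.h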
@@ -360,6 +360,8 @@ struct waiting_tcp {
        /** timeout event; timer keeps running whether the query is
         * waiting for a buffer or the tcp reply is pending */
        struct comm_timer* timer;
+       /** timeout in msec */
+       int timeout;
        /** the outside network it is part of */
        struct outside_network* outnet;
        /** remote address. */
diff --git a/util/netevent.c b/util/netevent.c
index 5dd746633f3c96aa798d3bc0666503419dba1e4d..2ca92b92dcce999a9b572cd863ee3fb6cdaa1a8a 100644
--- a/util/netevent.c
+++ b/util/netevent.c
@@ -3221,7 +3221,9 @@ comm_point_start_listening(struct comm_point* c, int newfd, int msec)
        }
        if(c->type == comm_tcp || c->type == comm_http) {
                ub_event_del_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE);
-               if(c->tcp_is_reading)
+               if(c->tcp_write_and_read)
+                       ub_event_add_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE);
+               else if(c->tcp_is_reading)
                        ub_event_add_bits(c->ev->ev, UB_EV_READ);
                else    ub_event_add_bits(c->ev->ev, UB_EV_WRITE);
        }