]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
assorted small netmgr-related changes
authorWitold Kręcicki <wpk@isc.org>
Wed, 10 Jun 2020 09:32:39 +0000 (11:32 +0200)
committerOndřej Surý <ondrej@isc.org>
Thu, 1 Oct 2020 14:44:43 +0000 (16:44 +0200)
- rename isc_nmsocket_t->tcphandle to statichandle
- cancelread functions now take handles instead of sockets
- add a 'client' flag in socket objects, currently unused, to
  indicate whether it is to be used as a client or server socket

(cherry picked from commit 7eb4564895037d72c46150acb6a8fc04edf4f8d9)

bin/tests/system/pipelined/tests.sh
lib/isc/include/isc/netmgr.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/isc/netmgr/udp.c

index c0a99a261a149b001357d64f064fbc2b20f365cb..ebacd09b6647d4606c7ab429afc7930ebba941c6 100644 (file)
@@ -27,12 +27,10 @@ $DIFF ref output > /dev/null && { ret=1 ; echo_i "diff out of order failed"; }
 if [ $ret != 0 ]; then echo_i "failed"; fi
 status=`expr $status + $ret`
 
-# flush resolver so queries will be from others again
-$RNDCCMD 10.53.0.4 flush
-sleep 1
-
 echo_i "check pipelined TCP queries using mdig"
 ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
 $MDIG $MDIGOPTS +noall +answer +vc -f input -b 10.53.0.4 @10.53.0.4 > raw.mdig
 awk '{ print $1 " " $5 }' < raw.mdig > output.mdig
 sort < output.mdig > output-sorted.mdig
@@ -43,6 +41,8 @@ status=`expr $status + $ret`
 
 echo_i "check keep-response-order"
 ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
 $PIPEQUERIES -p ${PORT} ++ < inputb > rawb || ret=1
 awk '{ print $1 " " $5 }' < rawb > outputb
 $DIFF refb outputb || ret=1
@@ -51,6 +51,8 @@ status=`expr $status + $ret`
 
 echo_i "check keep-response-order using mdig"
 ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
 $MDIG $MDIGOPTS +noall +answer +vc -f inputb -b 10.53.0.7 @10.53.0.4 > rawb.mdig
 awk '{ print $1 " " $5 }' < rawb.mdig > outputb.mdig
 $DIFF refb outputb.mdig || ret=1
@@ -59,6 +61,8 @@ status=`expr $status + $ret`
 
 echo_i "check mdig -4 -6"
 ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
 $MDIG $MDIGOPTS -4 -6 -f input @10.53.0.4 > output46.mdig 2>&1 && ret=1
 grep "only one of -4 and -6 allowed" output46.mdig > /dev/null || ret=1
 if [ $ret != 0 ]; then echo_i "failed"; fi
index 21a303e23ba921d79912a44d632349f930313df3..61ef9abee7ea4cc012be4d817c6cc376efd015ac 100644 (file)
@@ -176,8 +176,8 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
  * as its argument.
  *
  * When handles are allocated for the socket, 'extrasize' additional bytes
- * will be allocated along with the handle for an associated object
- * (typically ns_client).
+ * can be allocated along with the handle for an associated object, which
+ * can then be freed automatically when the handle is destroyed.
  */
 
 void
@@ -196,12 +196,17 @@ isc_nm_pause(isc_nm_t *mgr);
 void
 isc_nm_resume(isc_nm_t *mgr);
 /*%<
- * Resume paused processing. It will return immediately
- * after signalling workers to resume.
+ * Resume paused processing. It will return immediately after signalling
+ * workers to resume.
  */
 
 isc_result_t
 isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+/*
+ * Begin (or continue) reading on the socket associated with 'handle', and
+ * update its recv callback to 'cb', which will be called as soon as there
+ * is data to process.
+ */
 
 isc_result_t
 isc_nm_pauseread(isc_nmhandle_t *handle);
index ba37d643bf7fbe88cffdc0074a1d60074dd66c81..9cba034a0139fe4e8a0281ec6e0741f83862847b 100644 (file)
@@ -117,12 +117,10 @@ struct isc_nmiface {
 
 typedef enum isc__netievent_type {
        netievent_udpsend,
-       netievent_udprecv,
        netievent_udpstop,
 
        netievent_tcpconnect,
        netievent_tcpsend,
-       netievent_tcprecv,
        netievent_tcpstartread,
        netievent_tcppauseread,
        netievent_tcpchildaccept,
@@ -130,8 +128,8 @@ typedef enum isc__netievent_type {
        netievent_tcpstop,
        netievent_tcpclose,
 
-       netievent_tcpdnsclose,
        netievent_tcpdnssend,
+       netievent_tcpdnsclose,
 
        netievent_closecb,
        netievent_shutdown,
@@ -146,20 +144,12 @@ typedef enum isc__netievent_type {
        netievent_tcplisten,
 } isc__netievent_type;
 
-/*
- * We have to split it because we can read and write on a socket
- * simultaneously.
- */
 typedef union {
        isc_nm_recv_cb_t recv;
+       isc_nm_cb_t connect;
        isc_nm_accept_cb_t accept;
 } isc__nm_readcb_t;
 
-typedef union {
-       isc_nm_cb_t send;
-       isc_nm_cb_t connect;
-} isc__nm_writecb_t;
-
 typedef union {
        isc_nm_recv_cb_t recv;
        isc_nm_accept_cb_t accept;
@@ -209,10 +199,10 @@ typedef isc__netievent__socket_t isc__netievent_udplisten_t;
 typedef isc__netievent__socket_t isc__netievent_udpstop_t;
 typedef isc__netievent__socket_t isc__netievent_tcpstop_t;
 typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
-typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
 typedef isc__netievent__socket_t isc__netievent_startread_t;
 typedef isc__netievent__socket_t isc__netievent_pauseread_t;
 typedef isc__netievent__socket_t isc__netievent_closecb_t;
+typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
 
 typedef struct isc__netievent__socket_req {
        isc__netievent_type type;
@@ -333,7 +323,7 @@ typedef enum isc_nmsocket_type {
        isc_nm_tcpsocket,
        isc_nm_tcplistener,
        isc_nm_tcpdnslistener,
-       isc_nm_tcpdnssocket
+       isc_nm_tcpdnssocket,
 } isc_nmsocket_type;
 
 /*%
@@ -403,7 +393,7 @@ struct isc_nmsocket {
        isc_nmsocket_t *children;
        int nchildren;
        isc_nmiface_t *iface;
-       isc_nmhandle_t *tcphandle;
+       isc_nmhandle_t *statichandle;
        isc_nmhandle_t *outerhandle;
 
        /*% Extra data allocated at the end of each isc_nmhandle_t */
@@ -445,7 +435,12 @@ struct isc_nmsocket {
        isc_refcount_t references;
 
        /*%
-        * TCPDNS socket has been set not to pipeliine.
+        * Established an outgoing connection, as client not server.
+        */
+       atomic_bool client;
+
+       /*%
+        * TCPDNS socket has been set not to pipeline.
         */
        atomic_bool sequential;
 
@@ -686,6 +681,9 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
 
 isc_result_t
 isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+/*
+ * Back-end implementation of isc_nm_read() for TCP handles.
+ */
 
 void
 isc__nm_tcp_close(isc_nmsocket_t *sock);
@@ -713,9 +711,9 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock);
  */
 
 void
-isc__nm_tcp_cancelread(isc_nmsocket_t *sock);
+isc__nm_tcp_cancelread(isc_nmhandle_t *handle);
 /*%<
- * Stop reading on a connected socket.
+ * Stop reading on a connected TCP handle.
  */
 
 void
index 29adc61a2acf7768bc28e6197a1089f8d8056c29..a88f8f11e263b4eb1bd6a118ff6248c58285f01f 100644 (file)
@@ -591,6 +591,7 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                        uv_stop(&worker->loop);
                        isc_mempool_put(worker->mgr->evpool, ievent);
                        return;
+
                case netievent_udplisten:
                        isc__nm_async_udplisten(worker, ievent);
                        break;
@@ -600,6 +601,7 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                case netievent_udpsend:
                        isc__nm_async_udpsend(worker, ievent);
                        break;
+
                case netievent_tcpconnect:
                        isc__nm_async_tcpconnect(worker, ievent);
                        break;
@@ -630,9 +632,11 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                case netievent_tcpclose:
                        isc__nm_async_tcpclose(worker, ievent);
                        break;
+
                case netievent_tcpdnsclose:
                        isc__nm_async_tcpdnsclose(worker, ievent);
                        break;
+
                case netievent_closecb:
                        isc__nm_async_closecb(worker, ievent);
                        break;
@@ -739,7 +743,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
                isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
        }
 
-       sock->tcphandle = NULL;
+       sock->statichandle = NULL;
 
        if (sock->outerhandle != NULL) {
                isc_nmhandle_unref(sock->outerhandle);
@@ -833,7 +837,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
                }
        }
 
-       if (active_handles == 0 || sock->tcphandle != NULL) {
+       if (active_handles == 0 || sock->statichandle != NULL) {
                destroy = true;
        }
 
@@ -1051,7 +1055,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
        if (handle == NULL) {
                handle = alloc_handle(sock);
        } else {
-               isc_refcount_increment0(&handle->references);
+               isc_refcount_init(&handle->references, 1);
                INSIST(VALID_NMHANDLE(handle));
        }
 
@@ -1099,9 +1103,11 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
        handle->ah_pos = pos;
        UNLOCK(&sock->lock);
 
-       if (sock->type == isc_nm_tcpsocket) {
-               INSIST(sock->tcphandle == NULL);
-               sock->tcphandle = handle;
+       if (sock->type == isc_nm_tcpsocket ||
+           (sock->type == isc_nm_udpsocket && atomic_load(&sock->client)))
+       {
+               INSIST(sock->statichandle == NULL);
+               sock->statichandle = handle;
        }
 
        return (handle);
@@ -1208,6 +1214,10 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
                }
        }
 
+       if (handle == sock->statichandle) {
+               sock->statichandle = NULL;
+       }
+
        isc__nmsocket_detach(&sock);
 }
 
@@ -1353,7 +1363,7 @@ isc_nm_cancelread(isc_nmhandle_t *handle) {
 
        switch (handle->sock->type) {
        case isc_nm_tcpsocket:
-               isc__nm_tcp_cancelread(handle->sock);
+               isc__nm_tcp_cancelread(handle);
                break;
        default:
                INSIST(0);
index 75cfd82128cd2820bdb5438250157583173490d0..7fd2a87ef3f3358c9296f3eb8351b6048a51ece3 100644 (file)
@@ -147,49 +147,46 @@ done:
 
 static void
 tcp_connect_cb(uv_connect_t *uvreq, int status) {
+       isc_result_t result;
        isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
        isc_nmsocket_t *sock = NULL;
+       struct sockaddr_storage ss;
+       isc_nmhandle_t *handle = NULL;
 
        sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
 
        REQUIRE(VALID_UVREQ(req));
 
-       if (status == 0) {
-               isc_result_t result;
-               struct sockaddr_storage ss;
-               isc_nmhandle_t *handle = NULL;
+       if (status != 0) {
+               req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
+               isc__nm_uvreq_put(&req, sock);
+               return;
+       }
 
-               sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
-               uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
-                                  &(int){ sizeof(ss) });
-               result = isc_sockaddr_fromsockaddr(&sock->peer,
-                                                  (struct sockaddr *)&ss);
-               RUNTIME_CHECK(result == ISC_R_SUCCESS);
+       sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
+       isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
+       uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
+                          &(int){ sizeof(ss) });
+       result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
+       RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-               handle = isc__nmhandle_get(sock, NULL, NULL);
-               req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
+       handle = isc__nmhandle_get(sock, NULL, NULL);
+       req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
 
-               isc__nm_uvreq_put(&req, sock);
+       isc__nm_uvreq_put(&req, sock);
 
-               /*
-                * The sock is now attached to the handle.
-                */
-               isc__nmsocket_detach(&sock);
+       atomic_init(&sock->client, true);
 
-               /*
-                * If the connect callback wants to hold on to the handle,
-                * it needs to attach to it.
-                */
-               isc_nmhandle_unref(handle);
-       } else {
-               /*
-                * TODO:
-                * Handle the connect error properly and free the socket.
-                */
-               req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
-               isc__nm_uvreq_put(&req, sock);
-       }
+       /*
+        * The sock is now attached to the handle.
+        */
+       isc__nmsocket_detach(&sock);
+
+       /*
+        * If the connect callback wants to hold on to the handle,
+        * it needs to attach to it.
+        */
+       isc_nmhandle_unref(handle);
 }
 
 isc_result_t
@@ -201,6 +198,8 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        isc_result_t result = ISC_R_SUCCESS;
 
        REQUIRE(VALID_NM(mgr));
+       REQUIRE(local != NULL);
+       REQUIRE(peer != NULL);
 
        nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
        isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
@@ -211,6 +210,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        req->cb.connect = cb;
        req->cbarg = cbarg;
        req->peer = peer->addr;
+       req->local = local->addr;
 
        ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
        ievent->sock = nsock;
@@ -604,7 +604,7 @@ readtimeout_cb(uv_timer_t *handle) {
                isc_quota_detach(&sock->quota);
        }
        if (sock->rcb.recv != NULL) {
-               sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL,
+               sock->rcb.recv(sock->statichandle, ISC_R_TIMEDOUT, NULL,
                               sock->rcbarg);
                isc__nmsocket_clearcb(sock);
        }
@@ -766,8 +766,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
                                        .length = nread };
 
                if (sock->rcb.recv != NULL) {
-                       sock->rcb.recv(sock->tcphandle, ISC_R_SUCCESS, &region,
-                                      sock->rcbarg);
+                       sock->rcb.recv(sock->statichandle, ISC_R_SUCCESS,
+                                      &region, sock->rcbarg);
                }
 
                sock->read_timeout = (atomic_load(&sock->keepalive)
@@ -792,7 +792,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
         */
        if (sock->rcb.recv != NULL) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
-               sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg);
+               sock->rcb.recv(sock->statichandle, ISC_R_EOF, NULL,
+                              sock->rcbarg);
                isc__nmsocket_clearcb(sock);
        }
 
@@ -1125,24 +1126,27 @@ void
 isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
 
-       if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
+       if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL &&
            sock->rcb.recv != NULL)
        {
-               sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
+               sock->rcb.recv(sock->statichandle, ISC_R_CANCELED, NULL,
                               sock->rcbarg);
                isc__nmsocket_clearcb(sock);
        }
 }
 
 void
-isc__nm_tcp_cancelread(isc_nmsocket_t *sock) {
-       REQUIRE(VALID_NMSOCK(sock));
+isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
+       isc_nmsocket_t *sock = NULL;
 
-       if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
-           sock->rcb.recv != NULL)
-       {
-               sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
-                              sock->rcbarg);
+       REQUIRE(VALID_NMHANDLE(handle));
+
+       sock = handle->sock;
+
+       REQUIRE(sock->type == isc_nm_tcpsocket);
+
+       if (atomic_load(&sock->client) && sock->rcb.recv != NULL) {
+               sock->rcb.recv(handle, ISC_R_EOF, NULL, sock->rcbarg);
                isc__nmsocket_clearcb(sock);
        }
 }
index 81d07013aeae85fbd536ba093aaa28eb26e7fdfa..5fa1d3bbfb514bb64c34d5b4a8237a9daf7affc6 100644 (file)
@@ -82,7 +82,9 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
 static void
 timer_close_cb(uv_handle_t *handle) {
        isc_nmsocket_t *sock = (isc_nmsocket_t *)uv_handle_get_data(handle);
-       INSIST(VALID_NMSOCK(sock));
+
+       REQUIRE(VALID_NMSOCK(sock));
+
        atomic_store(&sock->closed, true);
        tcpdns_close_direct(sock);
 }
@@ -94,7 +96,8 @@ dnstcp_readtimeout(uv_timer_t *timer) {
 
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->tid == isc_nm_tid());
-       /* Close the TCP connection, it's closing should fire 'our' closing */
+
+       /* Close the TCP connection; its closure should fire ours. */
        isc_nmhandle_unref(sock->outerhandle);
        sock->outerhandle = NULL;
 }
@@ -187,8 +190,15 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
         */
        len = dnslen(dnssock->buf);
        if (len <= dnssock->buf_len - 2) {
-               isc_nmhandle_t *dnshandle = isc__nmhandle_get(dnssock, NULL,
-                                                             NULL);
+               isc_nmhandle_t *dnshandle;
+               if (atomic_load(&dnssock->client) &&
+                   dnssock->statichandle != NULL) {
+                       dnshandle = dnssock->statichandle;
+                       isc_nmhandle_ref(dnshandle);
+               } else {
+                       dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
+               }
+
                isc_nmsocket_t *listener = dnssock->listener;
 
                if (listener != NULL && listener->rcb.recv != NULL) {
@@ -197,6 +207,20 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
                                &(isc_region_t){ .base = dnssock->buf + 2,
                                                 .length = len },
                                listener->rcbarg);
+               } else if (dnssock->rcb.recv != NULL) {
+                       isc_nm_recv_cb_t cb = dnssock->rcb.recv;
+                       void *cbarg = dnssock->rcbarg;
+
+                       /*
+                        * We need to clear the read callback *before*
+                        * calling it, because it might make another
+                        * call to isc_nm_read() and set up a new callback.
+                        */
+                       isc__nmsocket_clearcb(dnssock);
+                       cb(dnshandle, ISC_R_SUCCESS,
+                          &(isc_region_t){ .base = dnssock->buf + 2,
+                                           .length = len },
+                          cbarg);
                }
 
                len += 2;
@@ -227,11 +251,12 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
 
        REQUIRE(VALID_NMSOCK(dnssock));
        REQUIRE(VALID_NMHANDLE(handle));
-       REQUIRE(dnssock->tid == isc_nm_tid());
 
        if (region == NULL || eresult != ISC_R_SUCCESS) {
                /* Connection closed */
-               isc_nmhandle_unref(handle);
+               if (eresult != ISC_R_CANCELED) {
+                       isc_nmhandle_unref(handle);
+               }
                dnssock->result = eresult;
                if (dnssock->self != NULL) {
                        isc__nmsocket_detach(&dnssock->self);
@@ -277,11 +302,14 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
                        uv_timer_stop(&dnssock->timer);
                }
 
-               if (atomic_load(&dnssock->sequential)) {
+               if (atomic_load(&dnssock->sequential) ||
+                   dnssock->rcb.recv == NULL) {
                        /*
-                        * We're in sequential mode and we processed
-                        * one packet, so we're done until the next read
-                        * completes.
+                        * Two reasons we might want to pause here:
+                        * - If we're in sequential mode and we've received
+                        *   a whole packet, so we're done until it's been
+                        *   processed;
+                        * - If we no longer have a read callback.
                         */
                        isc_nm_pauseread(dnssock->outerhandle);
                        done = true;
@@ -314,7 +342,6 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
                    void *cbarg, isc_nm_accept_cb_t accept_cb,
                    void *accept_cbarg, size_t extrahandlesize, int backlog,
                    isc_quota_t *quota, isc_nmsocket_t **sockp) {
-       /* A 'wrapper' socket object with outer set to true TCP socket */
        isc_nmsocket_t *dnslistensock = isc_mem_get(mgr->mctx,
                                                    sizeof(*dnslistensock));
        isc_result_t result;
@@ -328,7 +355,11 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
        dnslistensock->accept_cbarg = accept_cbarg;
        dnslistensock->extrahandlesize = extrahandlesize;
 
-       /* We set dnslistensock->outer to a true listening socket */
+       /*
+        * dnslistensock will be a DNS 'wrapper' around a connected
+        * stream. We set dnslistensock->outer to a socket listening
+        * for a TCP connection.
+        */
        result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock,
                                  extrahandlesize, backlog, quota,
                                  &dnslistensock->outer);
@@ -495,8 +526,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
 
                r.base = (unsigned char *)req->uvbuf.base;
                r.length = req->uvbuf.len;
-               result = isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
-                                         req);
+               result = isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb, req);
        }
 
        if (result != ISC_R_SUCCESS) {
@@ -538,8 +568,8 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
                r.base = (unsigned char *)uvreq->uvbuf.base;
                r.length = uvreq->uvbuf.len;
 
-               return (isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
-                                        uvreq));
+               return (isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb,
+                                   uvreq));
        } else {
                isc__netievent_tcpdnssend_t *ievent = NULL;
 
index 2e83ff1d21c32dac1da175ae1f263c92d8fb107a..4ac562cc6cdc1a678425fa46ed09ecb8eee0a48c 100644 (file)
@@ -80,7 +80,7 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
                csock->rcb.recv = cb;
                csock->rcbarg = cbarg;
                csock->fd = socket(family, SOCK_DGRAM, 0);
-               INSIST(csock->fd >= 0);
+               RUNTIME_CHECK(csock->fd >= 0);
 
                /*
                 * This is SO_REUSE**** hell:
@@ -223,7 +223,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
 }
 
 static void
-udp_close_cb(uv_handle_t *handle) {
+udp_stop_cb(uv_handle_t *handle) {
        isc_nmsocket_t *sock = uv_handle_get_data(handle);
        atomic_store(&sock->closed, true);
 
@@ -236,7 +236,7 @@ stop_udp_child(isc_nmsocket_t *sock) {
        REQUIRE(sock->tid == isc_nm_tid());
 
        uv_udp_recv_stop(&sock->uv_handle.udp);
-       uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_close_cb);
+       uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb);
 
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
 
@@ -395,7 +395,11 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 
        result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
-       nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+       if (!atomic_load(&sock->connected)) {
+               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+       } else {
+               nmhandle = sock->statichandle;
+       }
        region.base = (unsigned char *)buf->base;
        region.length = nrecv;
 
@@ -425,13 +429,13 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 isc_result_t
 isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                 void *cbarg) {
-       isc_nmsocket_t *psock = NULL, *rsock = NULL;
        isc_nmsocket_t *sock = handle->sock;
+       isc_nmsocket_t *psock = NULL, *rsock = sock;
        isc_sockaddr_t *peer = &handle->peer;
        isc__netievent_udpsend_t *ievent = NULL;
        isc__nm_uvreq_t *uvreq = NULL;
-       int ntid;
        uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
+       int ntid;
 
        /*
         * We're simulating a firewall blocking UDP packets bigger than
@@ -446,12 +450,12 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                return (ISC_R_SUCCESS);
        }
 
-       if (sock->type == isc_nm_udpsocket) {
+       if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) {
                INSIST(sock->parent != NULL);
                psock = sock->parent;
        } else if (sock->type == isc_nm_udplistener) {
                psock = sock;
-       } else {
+       } else if (!atomic_load(&sock->client)) {
                INSIST(0);
                ISC_UNREACHABLE();
        }
@@ -467,13 +471,16 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
         */
        if (isc__nm_in_netthread()) {
                ntid = isc_nm_tid();
-       } else if (sock->type == isc_nm_udpsocket) {
+       } else if (sock->type == isc_nm_udpsocket &&
+                  !atomic_load(&sock->client)) {
                ntid = sock->tid;
        } else {
                ntid = (int)isc_random_uniform(sock->nchildren);
        }
 
-       rsock = &psock->children[ntid];
+       if (psock != NULL) {
+               rsock = &psock->children[ntid];
+       }
 
        uvreq = isc__nm_uvreq_get(sock->mgr, sock);
        uvreq->uvbuf.base = (char *)region->base;
@@ -553,6 +560,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
 static isc_result_t
 udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
                isc_sockaddr_t *peer) {
+       const struct sockaddr *sa = NULL;
        int rv;
 
        REQUIRE(sock->tid == isc_nm_tid());
@@ -561,9 +569,11 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        if (!isc__nmsocket_active(sock)) {
                return (ISC_R_CANCELED);
        }
+
        isc_nmhandle_ref(req->handle);
+       sa = atomic_load(&sock->connected) ? NULL : &peer->type.sa;
        rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
-                        &req->uvbuf, 1, &peer->type.sa, udp_send_cb);
+                        &req->uvbuf, 1, sa, udp_send_cb);
        if (rv < 0) {
                isc__nm_incstats(req->sock->mgr,
                                 req->sock->statsindex[STATID_SENDFAIL]);