]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
add netmgr functions to support outgoing DNS queries
authorEvan Hunt <each@isc.org>
Sat, 5 Sep 2020 18:07:40 +0000 (11:07 -0700)
committerOndřej Surý <ondrej@isc.org>
Fri, 30 Oct 2020 10:11:54 +0000 (11:11 +0100)
- isc_nm_tcpdnsconnect() sets up up an outgoing TCP DNS connection.
- isc_nm_tcpconnect(), _udpconnect() and _tcpdnsconnect() now take a
  timeout argument to ensure connections time out and are correctly
  cleaned up on failure.
- isc_nm_read() now supports UDP; it reads a single datagram and then
  stops until the next time it's called.
- isc_nm_cancelread() now runs asynchronously to prevent assertion
  failure if reading is interrupted by a non-network thread (e.g.
  a timeout).
- isc_nm_cancelread() can now apply to UDP sockets.
- added shim code to support UDP connection in versions of libuv
  prior to 1.27, when uv_udp_connect() was added

all these functions will be used to support outgoing queries in dig,
xfrin, dispatch, etc.

12 files changed:
bin/rndc/rndc.c
configure.ac
lib/isc/include/isc/netmgr.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/isc/netmgr/udp.c
lib/isc/netmgr/uv-compat.c
lib/isc/netmgr/uv-compat.h
lib/isc/netmgr/uverr2result.c
lib/isc/win32/libisc.def.in

index a8f7277a01744b65e00200b1aef9d9272f875fe8..bc3d1243285dad909ab01aa7ffd6c0c2cb4c8982 100644 (file)
@@ -603,7 +603,7 @@ rndc_startconnect(isc_sockaddr_t *addr) {
        DO("create connection",
           isc_nm_tcpconnect(netmgr, (isc_nmiface_t *)local,
                             (isc_nmiface_t *)addr, rndc_connected, &rndc_ccmsg,
-                            0));
+                            10000, 0));
 }
 
 static void
index 6c16a76c19066979ab6ec31026c3f5c82dc6ff81..c43c51eee6e19b59781ae071c7b482b667a951be 100644 (file)
@@ -574,7 +574,7 @@ LIBS="$LIBS $LIBUV_LIBS"
 
 # Those functions are only provided in newer versions of libuv, we'll be emulating them
 # for now
-AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data uv_import])
+AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data uv_import uv_udp_connect uv_translate_sys_error])
 AX_RESTORE_FLAGS([libuv])
 
 #
index 8843cbd54f0ec4ea24310d9bb9e112baf96ec569..063b5f41c3ddc5d52ed2eb78d5f44707cc73959a 100644 (file)
@@ -180,6 +180,25 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
  * can then be freed automatically when the handle is destroyed.
  */
 
+isc_result_t
+isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                 isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                 size_t extrahandlesize);
+/*%<
+ * Open a UDP socket, bind to 'local' and connect to 'peer', and
+ * immediately call 'cb' with a handle so that the caller can begin
+ * sending packets over UDP.
+ *
+ * When handles are allocated for the socket, 'extrasize' additional bytes
+ * can be allocated along with the handle for an associated object, which
+ * can then be freed automatically when the handle is destroyed.
+ *
+ * 'timeout' specifies the timeout interval in milliseconds.
+ *
+ * The connected socket can only be accessed via the handle passed to
+ * 'cb'.
+ */
+
 void
 isc_nm_stoplistening(isc_nmsocket_t *sock);
 /*%<
@@ -277,14 +296,17 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
 
 isc_result_t
 isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
-                 isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize);
+                 isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                 size_t extrahandlesize);
 /*%<
  * Create a socket using netmgr 'mgr', bind it to the address 'local',
  * and connect it to the address 'peer'.
  *
- * When the connection is complete, call 'cb' with argument 'cbarg'.
- * Allocate 'extrahandlesize' additional bytes along with the handle to use
- * for an associated object.
+ * When the connection is complete or has timed out, call 'cb' with
+ * argument 'cbarg'. Allocate 'extrahandlesize' additional bytes along
+ * with the handle to use for an associated object.
+ *
+ * 'timeout' specifies the timeout interval in milliseconds.
  *
  * The connected socket can only be accessed via the handle passed to
  * 'cb'.
@@ -390,3 +412,21 @@ isc_nm_setstats(isc_nm_t *mgr, isc_stats_t *stats);
  *\li  stats is a valid set of statistics counters supporting the
  *     full range of socket-related stats counter numbers.
  */
+
+isc_result_t
+isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                    isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                    size_t extrahandlesize);
+/*%
+ * Establish a DNS client connection over a TCP socket, bound to the
+ * address 'local', and connected to the address 'peer'.
+ *
+ * When the connection is complete or has timed out, call 'cb' with
+ * argument 'cbarg'. Allocate 'extrahandlesize' additional bytes along
+ * with the handle to use for an associated object.
+ *
+ * 'timeout' specifies the timeout interval in milliseconds.
+ *
+ * The connected socket can only be accessed via the handle passed to
+ * 'cb'.
+ */
index 99ce506987bc6cb3b9c8642be369552569f282d5..d5a167ad50ca3cdedf02df8666c1d196056fcc96 100644 (file)
@@ -135,8 +135,12 @@ struct isc_nmiface {
 };
 
 typedef enum isc__netievent_type {
+       netievent_udpconnect,
        netievent_udpsend,
+       netievent_udpread,
        netievent_udpstop,
+       netievent_udpcancel,
+       netievent_udpclose,
 
        netievent_tcpconnect,
        netievent_tcpsend,
@@ -145,9 +149,12 @@ typedef enum isc__netievent_type {
        netievent_tcpchildaccept,
        netievent_tcpaccept,
        netievent_tcpstop,
+       netievent_tcpcancel,
        netievent_tcpclose,
 
        netievent_tcpdnssend,
+       netievent_tcpdnsread,
+       netievent_tcpdnscancel,
        netievent_tcpdnsclose,
        netievent_tcpdnsstop,
 
@@ -212,13 +219,16 @@ typedef struct isc__netievent__socket {
 } isc__netievent__socket_t;
 
 typedef isc__netievent__socket_t isc__netievent_udplisten_t;
+typedef isc__netievent__socket_t isc__netievent_udpread_t;
 typedef isc__netievent__socket_t isc__netievent_udpstop_t;
+typedef isc__netievent__socket_t isc__netievent_udpclose_t;
 typedef isc__netievent__socket_t isc__netievent_tcpstop_t;
 typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
 typedef isc__netievent__socket_t isc__netievent_startread_t;
 typedef isc__netievent__socket_t isc__netievent_pauseread_t;
 typedef isc__netievent__socket_t isc__netievent_closecb_t;
 typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
+typedef isc__netievent__socket_t isc__netievent_tcpdnsread_t;
 typedef isc__netievent__socket_t isc__netievent_tcpdnsstop_t;
 
 typedef struct isc__netievent__socket_req {
@@ -227,6 +237,7 @@ typedef struct isc__netievent__socket_req {
        isc__nm_uvreq_t *req;
 } isc__netievent__socket_req_t;
 
+typedef isc__netievent__socket_req_t isc__netievent_udpconnect_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
@@ -248,6 +259,9 @@ typedef struct isc__netievent__socket_handle {
        isc_nmhandle_t *handle;
 } isc__netievent__socket_handle_t;
 
+typedef isc__netievent__socket_handle_t isc__netievent_udpcancel_t;
+typedef isc__netievent__socket_handle_t isc__netievent_tcpcancel_t;
+typedef isc__netievent__socket_handle_t isc__netievent_tcpdnscancel_t;
 typedef isc__netievent__socket_handle_t isc__netievent_detach_t;
 
 typedef struct isc__netievent__socket_quota {
@@ -399,11 +413,14 @@ struct isc_nmsocket {
        const isc_statscounter_t *statsindex;
 
        /*%
-        * TCP read timeout timer.
+        * TCP read/connect timeout timers.
         */
        uv_timer_t timer;
        bool timer_initialized;
+       bool timer_running;
        uint64_t read_timeout;
+       uint64_t connect_timeout;
+       bool timed_out;
 
        /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
        isc_nmsocket_t *outer;
@@ -452,6 +469,7 @@ struct isc_nmsocket {
        atomic_bool closed;
        atomic_bool listening;
        atomic_bool listen_error;
+       atomic_bool connecting;
        atomic_bool connected;
        atomic_bool connect_error;
        isc_refcount_t references;
@@ -506,9 +524,9 @@ struct isc_nmsocket {
        isc_condition_t cond;
 
        /*%
-        * Used to pass a result back from TCP listening events.
+        * Used to pass a result back from listen or connect events.
         */
-       isc_result_t result;
+       atomic_int_fast32_t result;
 
        /*%
         * List of active handles.
@@ -551,6 +569,9 @@ struct isc_nmsocket {
        isc_nm_recv_cb_t recv_cb;
        void *recv_cbarg;
 
+       isc_nm_cb_t connect_cb;
+       void *connect_cbarg;
+
        isc_nm_accept_cb_t accept_cb;
        void *accept_cbarg;
 #ifdef NETMGR_TRACE
@@ -698,16 +719,41 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
  * Back-end implementation of isc_nm_send() for UDP handles.
  */
 
+isc_result_t
+isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+/*
+ * Back-end implementation of isc_nm_read() for UDP handles.
+ */
+
+void
+isc__nm_udp_close(isc_nmsocket_t *sock);
+/*%<
+ * Close a UDP socket.
+ */
+
+void
+isc__nm_udp_cancelread(isc_nmhandle_t *handle);
+/*%<
+ * Stop reading on a connected UDP handle.
+ */
+
 void
 isc__nm_udp_stoplistening(isc_nmsocket_t *sock);
 
 void
 isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0);
-
+void
+isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0);
 void
 isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0);
 void
 isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0);
+void
+isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0);
+void
+isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
+void
+isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0);
 /*%<
  * Callback handlers for asynchronous UDP events (listen, stoplisten, send).
  */
@@ -780,6 +826,8 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0);
 void
 isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0);
 void
+isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
+void
 isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0);
 /*%<
  * Callback handlers for asynchronous TCP events (connect, listen,
@@ -802,6 +850,8 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock);
 void
 isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
 
+void
+isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0);
 void
 isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
 void
@@ -809,6 +859,18 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
 void
 isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0);
 
+void
+isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0);
+
+isc_result_t
+isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+
+void
+isc__nm_tcpdns_cancelread(isc_nmhandle_t *handle);
+/*%<
+ * Stop reading on a connected TCPDNS handle.
+ */
+
 #define isc__nm_uverr2result(x) \
        isc___nm_uverr2result(x, true, __FILE__, __LINE__)
 isc_result_t
index cb6b18911c476098d186c3425a4eed30df4e0d1a..144380deaf99cb1b5ca4b39d64f042c2bc3a2683 100644 (file)
@@ -612,6 +612,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                        more = false;
                        break;
 
+               case netievent_udpconnect:
+                       isc__nm_async_udpconnect(worker, ievent);
+                       break;
                case netievent_udplisten:
                        isc__nm_async_udplisten(worker, ievent);
                        break;
@@ -621,6 +624,15 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                case netievent_udpsend:
                        isc__nm_async_udpsend(worker, ievent);
                        break;
+               case netievent_udpread:
+                       isc__nm_async_udpread(worker, ievent);
+                       break;
+               case netievent_udpcancel:
+                       isc__nm_async_udpcancel(worker, ievent);
+                       break;
+               case netievent_udpclose:
+                       isc__nm_async_udpclose(worker, ievent);
+                       break;
 
                case netievent_tcpconnect:
                        isc__nm_async_tcpconnect(worker, ievent);
@@ -649,13 +661,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                case netievent_tcpstop:
                        isc__nm_async_tcpstop(worker, ievent);
                        break;
+               case netievent_tcpcancel:
+                       isc__nm_async_tcpcancel(worker, ievent);
+                       break;
                case netievent_tcpclose:
                        isc__nm_async_tcpclose(worker, ievent);
                        break;
 
+               case netievent_tcpdnscancel:
+                       isc__nm_async_tcpdnscancel(worker, ievent);
+                       break;
                case netievent_tcpdnsclose:
                        isc__nm_async_tcpdnsclose(worker, ievent);
                        break;
+               case netievent_tcpdnsread:
+                       isc__nm_async_tcpdnsread(worker, ievent);
+                       break;
                case netievent_tcpdnsstop:
                        isc__nm_async_tcpdnsstop(worker, ievent);
                        break;
@@ -934,6 +955,9 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) {
         */
        if (!atomic_load(&sock->closed)) {
                switch (sock->type) {
+               case isc_nm_udpsocket:
+                       isc__nm_udp_close(sock);
+                       return;
                case isc_nm_tcpsocket:
                        isc__nm_tcp_close(sock);
                        return;
@@ -1471,9 +1495,15 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        REQUIRE(VALID_NMHANDLE(handle));
 
        switch (handle->sock->type) {
+       case isc_nm_udpsocket:
+               isc__nm_udp_read(handle, cb, cbarg);
+               break;
        case isc_nm_tcpsocket:
                isc__nm_tcp_read(handle, cb, cbarg);
                break;
+       case isc_nm_tcpdnssocket:
+               isc__nm_tcpdns_read(handle, cb, cbarg);
+               break;
        default:
                INSIST(0);
                ISC_UNREACHABLE();
@@ -1485,9 +1515,15 @@ isc_nm_cancelread(isc_nmhandle_t *handle) {
        REQUIRE(VALID_NMHANDLE(handle));
 
        switch (handle->sock->type) {
+       case isc_nm_udpsocket:
+               isc__nm_udp_cancelread(handle);
+               break;
        case isc_nm_tcpsocket:
                isc__nm_tcp_cancelread(handle);
                break;
+       case isc_nm_tcpdnssocket:
+               isc__nm_tcpdns_cancelread(handle);
+               break;
        default:
                INSIST(0);
                ISC_UNREACHABLE();
index 43d3f9fc9fbb99befddc3a2caa6fe9018a166f5d..ce95559d75926aa339268d3fc7dce3dd3e3e7d46 100644 (file)
@@ -77,6 +77,22 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);
 static void
 quota_accept_cb(isc_quota_t *quota, void *sock0);
 
+static void
+connecttimeout_cb(uv_timer_t *handle) {
+       isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle);
+       isc_nmsocket_t *sock = req->sock;
+
+       if (req->cb.connect != NULL) {
+               req->cb.connect(NULL, ISC_R_TIMEDOUT, req->cbarg);
+       }
+
+       uv_timer_stop(&sock->timer);
+       sock->timer_running = false;
+       sock->timed_out = true;
+       isc__nm_uvreq_put(&req, sock);
+       isc__nmsocket_detach(&sock);
+}
+
 static int
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker = NULL;
@@ -86,12 +102,24 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
 
        worker = &sock->mgr->workers[isc_nm_tid()];
 
+       atomic_store(&sock->connecting, true);
+
+       if (!sock->timer_initialized) {
+               uv_timer_init(&worker->loop, &sock->timer);
+               uv_handle_set_data((uv_handle_t *)&sock->timer, req);
+               sock->timer_initialized = true;
+       }
+
+       uv_timer_start(&sock->timer, connecttimeout_cb, sock->connect_timeout,
+                      0);
+       sock->timer_running = true;
+
        r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
        if (r != 0) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                /* Socket was never opened; no need for tcp_close_direct() */
                atomic_store(&sock->closed, true);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->connect_error, true);
                return (r);
        }
@@ -101,7 +129,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                if (r != 0) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_BINDFAIL]);
-                       sock->result = isc__nm_uverr2result(r);
+                       atomic_store(&sock->result, isc__nm_uverr2result(r));
                        atomic_store(&sock->connect_error, true);
                        tcp_close_direct(sock);
                        return (r);
@@ -114,11 +142,14 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        if (r != 0) {
                isc__nm_incstats(sock->mgr,
                                 sock->statsindex[STATID_CONNECTFAIL]);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->connect_error, true);
                tcp_close_direct(sock);
+               return (r);
        }
-       return (r);
+       isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
+
+       return (0);
 }
 
 void
@@ -159,15 +190,30 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
 
        sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
 
-       REQUIRE(VALID_UVREQ(req));
+       atomic_store(&sock->connecting, false);
+
+       if (sock->timed_out) {
+               return;
+       }
+
+       uv_timer_stop(&sock->timer);
+       sock->timer_running = false;
 
        if (status != 0) {
                req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
-               isc__nm_uvreq_put(&req, sock);
-               isc__nmsocket_detach(&sock);
+               if (status != UV_ECANCELED) {
+                       /*
+                        * In this case the resources would already
+                        * have been freed in isc__nm_tcp_shutdown().
+                        */
+                       isc__nm_uvreq_put(&req, sock);
+                       isc__nmsocket_detach(&sock);
+               }
                return;
        }
 
+       REQUIRE(VALID_UVREQ(req));
+
        sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
        uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
@@ -180,8 +226,6 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
 
        isc__nm_uvreq_put(&req, sock);
 
-       atomic_init(&sock->client, true);
-
        /*
         * The sock is now attached to the handle.
         */
@@ -196,11 +240,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
 
 isc_result_t
 isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
-                 isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) {
+                 isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                 size_t extrahandlesize) {
+       isc_result_t result = ISC_R_SUCCESS;
        isc_nmsocket_t *nsock = NULL, *tmp = NULL;
        isc__netievent_tcpconnect_t *ievent = NULL;
        isc__nm_uvreq_t *req = NULL;
-       isc_result_t result = ISC_R_SUCCESS;
 
        REQUIRE(VALID_NM(mgr));
        REQUIRE(local != NULL);
@@ -209,7 +254,9 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
        isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
        nsock->extrahandlesize = extrahandlesize;
-       nsock->result = ISC_R_SUCCESS;
+       nsock->connect_timeout = timeout;
+       atomic_init(&nsock->result, ISC_R_SUCCESS);
+       atomic_init(&nsock->client, true);
 
        req = isc__nm_uvreq_get(mgr, nsock);
        req->cb.connect = cb;
@@ -245,9 +292,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                UNLOCK(&nsock->lock);
        }
 
-       if (nsock->result != ISC_R_SUCCESS) {
-               result = nsock->result;
-       }
+       result = atomic_load(&nsock->result);
 
        isc__nmsocket_detach(&tmp);
 
@@ -266,11 +311,12 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
 
        nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
        isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
+
        nsock->accept_cb = accept_cb;
        nsock->accept_cbarg = accept_cbarg;
        nsock->extrahandlesize = extrahandlesize;
        nsock->backlog = backlog;
-       nsock->result = ISC_R_SUCCESS;
+       atomic_init(&nsock->result, ISC_R_SUCCESS);
        if (quota != NULL) {
                /*
                 * We don't attach to quota, just assign - to avoid
@@ -300,11 +346,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
                UNLOCK(&nsock->lock);
        }
 
-       if (nsock->result == ISC_R_SUCCESS) {
+       if (atomic_load(&nsock->result) == ISC_R_SUCCESS) {
                *sockp = nsock;
                return (ISC_R_SUCCESS);
        } else {
-               isc_result_t result = nsock->result;
+               isc_result_t result = atomic_load(&nsock->result);
                isc__nmsocket_detach(&nsock);
                return (result);
        }
@@ -333,7 +379,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                /* The socket was never opened, so no need for uv_close() */
                atomic_store(&sock->closed, true);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->listen_error, true);
                goto done;
        }
@@ -364,7 +410,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
        if (r != 0) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
                uv_close(&sock->uv_handle.handle, tcp_close_cb);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->listen_error, true);
                goto done;
        }
@@ -379,7 +425,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
                               &snamelen);
        if (r != 0) {
                uv_close(&sock->uv_handle.handle, tcp_close_cb);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->listen_error, true);
                goto done;
        }
@@ -396,7 +442,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
                              "uv_listen failed: %s",
                              isc_result_totext(isc__nm_uverr2result(r)));
                uv_close(&sock->uv_handle.handle, tcp_close_cb);
-               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->listen_error, true);
                goto done;
        }
@@ -600,6 +646,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
 
        if (sock->timer_initialized) {
                uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
        }
 
        if (sock->quota) {
@@ -652,6 +699,7 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        }
 
        REQUIRE(sock->tid == isc_nm_tid());
+
        sock->recv_cb = cb;
        sock->recv_cbarg = cbarg;
 
@@ -719,6 +767,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
                }
                uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout,
                               0);
+               sock->timer_running = true;
        }
 }
 
@@ -756,8 +805,9 @@ isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(worker->id == isc_nm_tid());
 
-       if (sock->timer_initialized) {
+       if (sock->timer_running) {
                uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
        }
        uv_read_stop(&sock->uv_handle.stream);
 }
@@ -897,6 +947,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
        isc__netievent_tcpchildaccept_t *event = NULL;
        isc__networker_t *worker = NULL;
        uv_tcp_t *uvstream = NULL;
+       isc_nmsocket_t *csock = NULL;
        isc_mem_t *mctx = NULL;
        int r, w;
 
@@ -973,8 +1024,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
                return (result);
        }
 
-       isc_nmsocket_t *csock = isc_mem_get(ssock->mgr->mctx,
-                                           sizeof(isc_nmsocket_t));
+       csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
        isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
        csock->tid = w;
        csock->extrahandlesize = ssock->extrahandlesize;
@@ -1148,12 +1198,14 @@ tcp_close_direct(isc_nmsocket_t *sock) {
 
        uv_read_stop((uv_stream_t *)&sock->uv_handle.handle);
 
-       if (sock->timer_initialized) {
+       if (sock->timer_running) {
                uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
        }
 
        if (sock->timer_initialized) {
                sock->timer_initialized = false;
+               uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
                uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
        } else {
                uv_close(&sock->uv_handle.handle, tcp_close_cb);
@@ -1202,7 +1254,24 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
                return;
        }
 
-       if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) {
+       if (atomic_load(&sock->connecting)) {
+               isc__nm_uvreq_t *req = NULL;
+
+               atomic_store(&sock->connecting, false);
+               req = uv_handle_get_data((uv_handle_t *)&sock->timer);
+               uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
+
+               isc__nmsocket_clearcb(sock);
+               if (sock->connect_cb != NULL) {
+                       sock->connect_cb(NULL, ISC_R_CANCELED,
+                                        sock->connect_cbarg);
+               }
+
+               isc__nm_uvreq_put(&req, sock);
+               isc__nmsocket_detach(&sock);
+       } else if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL)
+       {
                failed_read_cb(sock, ISC_R_CANCELED);
        }
 }
@@ -1210,16 +1279,36 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
 void
 isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
        isc_nmsocket_t *sock = NULL;
+       isc__netievent_tcpcancel_t *ievent = NULL;
 
        REQUIRE(VALID_NMHANDLE(handle));
 
        sock = handle->sock;
 
-       REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->type == isc_nm_tcpsocket);
+
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpcancel);
+       ievent->sock = sock;
+       isc_nmhandle_attach(handle, &ievent->handle);
+       isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                              (isc__netievent_t *)ievent);
+}
+
+void
+isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc_nmhandle_t *handle = ievent->handle;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(worker->id == sock->tid);
        REQUIRE(sock->tid == isc_nm_tid());
 
+       uv_read_stop(&sock->uv_handle.stream);
+
        if (atomic_load(&sock->client)) {
                failed_read_cb(sock, ISC_R_EOF);
        }
+
+       isc_nmhandle_detach(&handle);
 }
index 63391273cb158312da840555bdd520ba5dc43c40..255c44fe2c3b6e0544a671d5e8a60a838ece13da 100644 (file)
@@ -262,7 +262,11 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
 
        if (region == NULL || eresult != ISC_R_SUCCESS) {
                /* Connection closed */
-               dnssock->result = eresult;
+               atomic_store(&dnssock->result, eresult);
+               if (atomic_load(&dnssock->client) && dnssock->recv_cb != NULL) {
+                       dnssock->recv_cb(dnssock->statichandle, eresult, NULL,
+                                        dnssock->recv_cbarg);
+               }
                if (dnssock->self != NULL) {
                        isc__nmsocket_detach(&dnssock->self);
                }
@@ -270,7 +274,15 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
                if (dnssock->outerhandle != NULL) {
                        isc_nmhandle_detach(&dnssock->outerhandle);
                }
-               isc_nmhandle_detach(&handle);
+
+               /*
+                * Server connections will hold two handle references when
+                * shut down, but client (tcpdnsconnect) connections have
+                * only one.
+                */
+               if (!atomic_load(&dnssock->client)) {
+                       isc_nmhandle_detach(&handle);
+               }
                return;
        }
 
@@ -677,3 +689,204 @@ isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) {
 
        tcpdns_close_direct(ievent->sock);
 }
+
+typedef struct {
+       isc_mem_t *mctx;
+       isc_nm_cb_t cb;
+       void *cbarg;
+       size_t extrahandlesize;
+} tcpconnect_t;
+
+static void
+tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
+       tcpconnect_t *conn = (tcpconnect_t *)arg;
+       isc_nm_cb_t cb = conn->cb;
+       void *cbarg = conn->cbarg;
+       size_t extrahandlesize = conn->extrahandlesize;
+       isc_nmsocket_t *dnssock = NULL;
+
+       REQUIRE(result != ISC_R_SUCCESS || VALID_NMHANDLE(handle));
+
+       isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
+
+       if (result != ISC_R_SUCCESS) {
+               cb(NULL, result, cbarg);
+               return;
+       }
+
+       dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock));
+       isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket,
+                          handle->sock->iface);
+
+       dnssock->extrahandlesize = extrahandlesize;
+       isc_nmhandle_attach(handle, &dnssock->outerhandle);
+
+       dnssock->peer = handle->sock->peer;
+       dnssock->read_timeout = handle->sock->mgr->init;
+       dnssock->tid = isc_nm_tid();
+
+       atomic_init(&dnssock->client, true);
+       dnssock->statichandle = isc__nmhandle_get(dnssock, NULL, NULL);
+
+       uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop,
+                     &dnssock->timer);
+       dnssock->timer.data = dnssock;
+       dnssock->timer_initialized = true;
+       uv_timer_start(&dnssock->timer, dnstcp_readtimeout,
+                      dnssock->read_timeout, 0);
+
+       /*
+        * The connection is now established; we start reading immediately,
+        * before we've been asked to. We'll read and buffer at most one
+        * packet.
+        */
+       isc_nm_read(handle, dnslisten_readcb, dnssock);
+       cb(dnssock->statichandle, ISC_R_SUCCESS, cbarg);
+       isc__nmsocket_detach(&dnssock);
+}
+
+isc_result_t
+isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                    isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                    size_t extrahandlesize) {
+       tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
+
+       *conn = (tcpconnect_t){ .cb = cb,
+                               .cbarg = cbarg,
+                               .extrahandlesize = extrahandlesize };
+       isc_mem_attach(mgr->mctx, &conn->mctx);
+       return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
+                                 timeout, 0));
+}
+
+isc_result_t
+isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
+       isc_nmsocket_t *sock = handle->sock;
+       isc__netievent_tcpdnsread_t *ievent = NULL;
+       isc_nmhandle_t *eventhandle = NULL;
+
+       REQUIRE(handle == sock->statichandle);
+       REQUIRE(sock->recv_cb == NULL);
+       REQUIRE(sock->tid == isc_nm_tid());
+       REQUIRE(atomic_load(&sock->client));
+
+       /*
+        * This MUST be done asynchronously, no matter which thread we're
+        * in. The callback function for isc_nm_read() often calls
+        * isc_nm_read() again; if we tried to do that synchronously
+        * we'd clash in processbuffer() and grow the stack indefinitely.
+        */
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnsread);
+       ievent->sock = sock;
+
+       sock->recv_cb = cb;
+       sock->recv_cbarg = cbarg;
+
+       /*
+        * Add a reference to the handle to keep it from being freed by
+        * the caller; it will be detached in in isc__nm_async_tcpdnsread().
+        */
+       isc_nmhandle_attach(handle, &eventhandle);
+       isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                              (isc__netievent_t *)ievent);
+       return (ISC_R_SUCCESS);
+}
+
+void
+isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc_result_t result;
+       isc__netievent_tcpdnsread_t *ievent =
+               (isc__netievent_tcpdnsclose_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc_nmhandle_t *handle = NULL, *newhandle = NULL;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(worker->id == sock->tid);
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       handle = sock->statichandle;
+
+       if (sock->type != isc_nm_tcpdnssocket || sock->outerhandle == NULL) {
+               if (sock->recv_cb != NULL) {
+                       sock->recv_cb(handle, ISC_R_NOTCONNECTED, NULL,
+                                     sock->recv_cbarg);
+               }
+               isc_nmhandle_detach(&handle);
+               return;
+       }
+
+       /*
+        * Maybe we have a packet already?
+        */
+       result = processbuffer(sock, &newhandle);
+       if (result == ISC_R_SUCCESS) {
+               atomic_store(&sock->outerhandle->sock->processing, true);
+               if (sock->timer_initialized) {
+                       uv_timer_stop(&sock->timer);
+               }
+               isc_nmhandle_detach(&newhandle);
+       } else if (sock->outerhandle != NULL) {
+               /* Restart reading, wait for the callback */
+               atomic_store(&sock->outerhandle->sock->processing, false);
+               if (sock->timer_initialized) {
+                       uv_timer_start(&sock->timer, dnstcp_readtimeout,
+                                      sock->read_timeout, 0);
+               }
+               isc_nm_resumeread(sock->outerhandle);
+       } else {
+               isc_nm_recv_cb_t cb = sock->recv_cb;
+               void *cbarg = sock->recv_cbarg;
+
+               isc__nmsocket_clearcb(sock);
+               cb(handle, ISC_R_NOTCONNECTED, NULL, cbarg);
+       }
+
+       isc_nmhandle_detach(&handle);
+}
+
+void
+isc__nm_tcpdns_cancelread(isc_nmhandle_t *handle) {
+       isc_nmsocket_t *sock = NULL;
+       isc__netievent_tcpdnscancel_t *ievent = NULL;
+
+       REQUIRE(VALID_NMHANDLE(handle));
+
+       sock = handle->sock;
+
+       REQUIRE(sock->type == isc_nm_tcpdnssocket);
+
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnscancel);
+       ievent->sock = sock;
+       isc_nmhandle_attach(handle, &ievent->handle);
+       isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                              (isc__netievent_t *)ievent);
+}
+
+void
+isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_tcpdnscancel_t *ievent =
+               (isc__netievent_tcpdnscancel_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc_nmhandle_t *handle = ievent->handle;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(worker->id == sock->tid);
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       if (atomic_load(&sock->client)) {
+               isc_nm_recv_cb_t cb;
+               void *cbarg = NULL;
+
+               cb = sock->recv_cb;
+               cbarg = sock->recv_cbarg;
+               isc__nmsocket_clearcb(sock);
+
+               if (cb != NULL) {
+                       cb(handle, ISC_R_EOF, NULL, cbarg);
+               }
+
+               isc__nm_tcp_cancelread(sock->outerhandle);
+       }
+
+       isc_nmhandle_detach(&handle);
+}
index d97a0b612a40f6b0e7de5dffadc8c23f0583b758..01974a53d1540bb4bbf74a0bb640e24737bed715 100644 (file)
@@ -15,6 +15,7 @@
 #include <isc/atomic.h>
 #include <isc/buffer.h>
 #include <isc/condition.h>
+#include <isc/errno.h>
 #include <isc/magic.h>
 #include <isc/mem.h>
 #include <isc/netmgr.h>
@@ -40,6 +41,12 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 static void
 udp_send_cb(uv_udp_send_t *req, int status);
 
+static void
+udp_close_cb(uv_handle_t *uvhandle);
+
+static void
+udp_close_direct(isc_nmsocket_t *sock);
+
 isc_result_t
 isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
                 void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
@@ -336,6 +343,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
        bool free_buf = true;
        isc_nm_recv_cb_t cb;
        void *cbarg;
+       bool connected;
 
        /*
         * Even though destruction of the socket can only happen from the
@@ -371,12 +379,19 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 
        result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
-       nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+       connected = atomic_load(&sock->connected);
+
+       if (!connected) {
+               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+       } else {
+               nmhandle = sock->statichandle;
+       }
        region.base = (unsigned char *)buf->base;
        region.length = nrecv;
 
        INSIST(sock->tid == isc_nm_tid());
        INSIST(sock->recv_cb != NULL);
+
        cb = sock->recv_cb;
        cbarg = sock->recv_cbarg;
 
@@ -395,7 +410,9 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
         * If the recv callback wants to hold on to the handle,
         * it needs to attach to it.
         */
-       isc_nmhandle_detach(&nmhandle);
+       if (!connected) {
+               isc_nmhandle_detach(&nmhandle);
+       }
 }
 
 /*
@@ -510,6 +527,12 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
        REQUIRE(sock->type == isc_nm_udpsocket);
        REQUIRE(worker->id == sock->tid);
 
+       if (!isc__nmsocket_active(ievent->sock)) {
+               uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg);
+               isc__nm_uvreq_put(&uvreq, sock);
+               return;
+       }
+
        result = udp_send_direct(sock, uvreq, &ievent->peer);
        if (result != ISC_R_SUCCESS) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
@@ -543,7 +566,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
 static isc_result_t
 udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
                isc_sockaddr_t *peer) {
-       const struct sockaddr *sa = NULL;
+       const struct sockaddr *sa = &peer->type.sa;
        int r;
 
        REQUIRE(VALID_NMSOCK(sock));
@@ -555,7 +578,17 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
                return (ISC_R_CANCELED);
        }
 
-       sa = atomic_load(&sock->connected) ? NULL : &peer->type.sa;
+#ifdef HAVE_UV_UDP_CONNECT
+       /*
+        * If we used uv_udp_connect() (and not the shim version for
+        * older versions of libuv), then the peer address has to be
+        * set to NULL or else uv_udp_send() could fail or assert,
+        * depending on the libuv version.
+        */
+       if (atomic_load(&sock->connected)) {
+               sa = NULL;
+       }
+#endif
        r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
                        &req->uvbuf, 1, sa, udp_send_cb);
        if (r < 0) {
@@ -564,3 +597,410 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
 
        return (ISC_R_SUCCESS);
 }
+
+static int
+udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
+       isc__networker_t *worker = NULL;
+       int uv_bind_flags = UV_UDP_REUSEADDR;
+       int r;
+
+       REQUIRE(isc__nm_in_netthread());
+
+       worker = &sock->mgr->workers[isc_nm_tid()];
+
+       atomic_store(&sock->connecting, true);
+
+       r = uv_udp_init(&worker->loop, &sock->uv_handle.udp);
+       if (r != 0) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
+               /* Socket was never opened; no need for udp_close_direct() */
+               atomic_store(&sock->closed, true);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
+               atomic_store(&sock->connect_error, true);
+               return (r);
+       }
+
+       r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
+       if (r != 0) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
+               atomic_store(&sock->closed, true);
+               atomic_store(&sock->connect_error, true);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
+               return (r);
+       }
+       isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);
+
+       if (sock->iface->addr.type.sa.sa_family == AF_INET6) {
+               uv_bind_flags |= UV_UDP_IPV6ONLY;
+       }
+
+       r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface->addr.type.sa,
+                       uv_bind_flags);
+       if (r != 0) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
+               atomic_store(&sock->connect_error, true);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
+               udp_close_direct(sock);
+               return (r);
+       }
+
+       uv_handle_set_data(&sock->uv_handle.handle, sock);
+
+       r = isc_uv_udp_connect(&sock->uv_handle.udp, &req->peer.type.sa);
+       if (r != 0) {
+               isc__nm_incstats(sock->mgr,
+                                sock->statsindex[STATID_CONNECTFAIL]);
+               atomic_store(&sock->connect_error, true);
+               atomic_store(&sock->result, isc__nm_uverr2result(r));
+               udp_close_direct(sock);
+               return (r);
+       }
+       isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
+
+#ifdef ISC_RECV_BUFFER_SIZE
+       uv_recv_buffer_size(&sock->uv_handle.handle,
+                           &(int){ ISC_RECV_BUFFER_SIZE });
+#endif
+#ifdef ISC_SEND_BUFFER_SIZE
+       uv_send_buffer_size(&sock->uv_handle.handle,
+                           &(int){ ISC_SEND_BUFFER_SIZE });
+#endif
+       return (0);
+}
+
+/*
+ * Asynchronous 'udpconnect' call handler: open a new UDP socket and call
+ * the 'open' callback with a handle.
+ */
+void
+isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_udpconnect_t *ievent =
+               (isc__netievent_udpconnect_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *req = ievent->req;
+       isc_nmhandle_t *handle = NULL;
+       isc_nm_cb_t cb;
+       void *cbarg;
+       int r;
+       isc_result_t result;
+
+       UNUSED(worker);
+
+       REQUIRE(sock->type == isc_nm_udpsocket);
+       REQUIRE(sock->iface != NULL);
+       REQUIRE(sock->parent == NULL);
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       cb = sock->connect_cb;
+       cbarg = sock->connect_cbarg;
+
+       r = udp_connect_direct(sock, req);
+       if (r != 0) {
+               result = isc__nm_uverr2result(r);
+       } else {
+               atomic_store(&sock->connected, true);
+               atomic_store(&sock->result, ISC_R_SUCCESS);
+               result = atomic_load(&sock->result);
+       }
+
+       handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
+       cb(handle, result, cbarg);
+
+       LOCK(&sock->lock);
+       SIGNAL(&sock->cond);
+       UNLOCK(&sock->lock);
+
+       /*
+        * The sock is now attached to the handle.
+        */
+       isc__nmsocket_detach(&sock);
+
+       /*
+        * The connect callback should have attached to the handle.
+        * If it didn't, the socket will be closed now.
+        */
+       isc_nmhandle_detach(&handle);
+}
+
+isc_result_t
+isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                 isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
+                 size_t extrahandlesize) {
+       isc_result_t result = ISC_R_SUCCESS;
+       isc_nmsocket_t *sock = NULL, *tmp = NULL;
+       isc__netievent_udpconnect_t *event = NULL;
+       isc__nm_uvreq_t *req = NULL;
+       sa_family_t sa_family;
+
+       REQUIRE(VALID_NM(mgr));
+       REQUIRE(local != NULL);
+       REQUIRE(peer != NULL);
+
+       sa_family = peer->addr.type.sa.sa_family;
+
+       sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
+       isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local);
+
+       INSIST(sock->connect_cb == NULL && sock->connect_cbarg == NULL);
+       sock->connect_cb = cb;
+       sock->connect_cbarg = cbarg;
+       sock->read_timeout = timeout;
+       sock->extrahandlesize = extrahandlesize;
+       sock->peer = peer->addr;
+       atomic_init(&sock->client, true);
+
+       sock->fd = socket(sa_family, SOCK_DGRAM, 0);
+       RUNTIME_CHECK(sock->fd >= 0);
+
+       result = isc__nm_socket_reuse(sock->fd);
+       RUNTIME_CHECK(result == ISC_R_SUCCESS ||
+                     result == ISC_R_NOTIMPLEMENTED);
+
+       result = isc__nm_socket_reuse_lb(sock->fd);
+       RUNTIME_CHECK(result == ISC_R_SUCCESS ||
+                     result == ISC_R_NOTIMPLEMENTED);
+
+       (void)isc__nm_socket_incoming_cpu(sock->fd);
+
+       (void)isc__nm_socket_dontfrag(sock->fd, sa_family);
+
+       req = isc__nm_uvreq_get(mgr, sock);
+       req->cb.connect = cb;
+       req->cbarg = cbarg;
+       req->peer = peer->addr;
+       req->local = local->addr;
+
+       event = isc__nm_get_ievent(mgr, netievent_udpconnect);
+       event->sock = sock;
+       event->req = req;
+
+       /*
+        * Hold an additional sock reference so async callbacks
+        * can't destroy it until we're ready.
+        */
+       isc__nmsocket_attach(sock, &tmp);
+
+       if (isc__nm_in_netthread()) {
+               sock->tid = isc_nm_tid();
+               isc__nm_async_udpconnect(&mgr->workers[sock->tid],
+                                        (isc__netievent_t *)event);
+               isc__nm_put_ievent(mgr, event);
+               isc__nm_uvreq_put(&req, sock);
+       } else {
+               sock->tid = isc_random_uniform(mgr->nworkers);
+               isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
+                                      (isc__netievent_t *)event);
+
+               LOCK(&sock->lock);
+               while (!atomic_load(&sock->connected) &&
+                      !atomic_load(&sock->connect_error)) {
+                       WAIT(&sock->cond, &sock->lock);
+               }
+               UNLOCK(&sock->lock);
+               isc__nm_uvreq_put(&req, sock);
+       }
+
+       if (atomic_load(&sock->result) != ISC_R_SUCCESS) {
+               result = atomic_load(&sock->result);
+       }
+
+       isc__nmsocket_detach(&tmp);
+
+       return (result);
+}
+
+static void
+udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
+           const struct sockaddr *addr, unsigned flags) {
+       isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
+
+       udp_recv_cb(handle, nrecv, buf, addr, flags);
+       uv_udp_recv_stop(&sock->uv_handle.udp);
+}
+
+static void
+readtimeout_cb(uv_timer_t *handle) {
+       isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
+       isc_nm_recv_cb_t cb;
+       void *cbarg = NULL;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       /*
+        * Timeout; stop reading and process whatever we have.
+        */
+       uv_udp_recv_stop(&sock->uv_handle.udp);
+
+       cb = sock->recv_cb;
+       cbarg = sock->recv_cbarg;
+       isc__nmsocket_clearcb(sock);
+
+       if (cb != NULL) {
+               cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg);
+       }
+}
+
+/*
+ * Asynchronous 'udpread' call handler: start or resume reading on a socket;
+ * pause reading and call the 'recv' callback after each datagram.
+ */
+void
+isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+
+       REQUIRE(worker->id == isc_nm_tid());
+       if (sock->read_timeout != 0) {
+               if (!sock->timer_initialized) {
+                       uv_timer_init(&worker->loop, &sock->timer);
+                       uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
+                       sock->timer_initialized = true;
+               }
+               uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout,
+                              0);
+               sock->timer_running = true;
+       }
+
+       uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb);
+}
+
+isc_result_t
+isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
+       isc_nmsocket_t *sock = NULL;
+       isc__netievent_startread_t *ievent = NULL;
+
+       REQUIRE(VALID_NMHANDLE(handle));
+       REQUIRE(VALID_NMSOCK(handle->sock));
+       REQUIRE(handle->sock->type == isc_nm_udpsocket);
+
+       sock = handle->sock;
+
+       REQUIRE(sock->tid == isc_nm_tid());
+       sock->recv_cb = cb;
+       sock->recv_cbarg = cbarg;
+
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_udpread);
+       ievent->sock = sock;
+
+       if (sock->tid == isc_nm_tid()) {
+               isc__nm_async_udpread(&sock->mgr->workers[sock->tid],
+                                     (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+
+       return (ISC_R_SUCCESS);
+}
+
+static void
+udp_close_cb(uv_handle_t *uvhandle) {
+       isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);
+
+       REQUIRE(VALID_NMSOCK(sock));
+
+       isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
+       atomic_store(&sock->closed, true);
+       isc__nmsocket_prep_destroy(sock);
+}
+
+static void
+timer_close_cb(uv_handle_t *uvhandle) {
+       isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);
+
+       REQUIRE(VALID_NMSOCK(sock));
+
+       uv_close(&sock->uv_handle.handle, udp_close_cb);
+}
+
+static void
+udp_close_direct(isc_nmsocket_t *sock) {
+       uv_udp_recv_stop(&sock->uv_handle.udp);
+
+       if (sock->timer_running) {
+               uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
+       }
+
+       if (sock->timer_initialized) {
+               sock->timer_initialized = false;
+               uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
+               uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
+       } else {
+               uv_close(&sock->uv_handle.handle, udp_close_cb);
+       }
+}
+
+void
+isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+
+       REQUIRE(worker->id == ievent->sock->tid);
+
+       udp_close_direct(sock);
+}
+
+void
+isc__nm_udp_close(isc_nmsocket_t *sock) {
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->type == isc_nm_udpsocket);
+
+       if (sock->tid == isc_nm_tid()) {
+               udp_close_direct(sock);
+       } else {
+               isc__netievent_udpclose_t *ievent =
+                       isc__nm_get_ievent(sock->mgr, netievent_udpclose);
+               ievent->sock = sock;
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_udp_cancelread(isc_nmhandle_t *handle) {
+       isc_nmsocket_t *sock = NULL;
+       isc__netievent_udpcancel_t *ievent = NULL;
+
+       REQUIRE(VALID_NMHANDLE(handle));
+
+       sock = handle->sock;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->type == isc_nm_udpsocket);
+
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_udpcancel);
+       ievent->sock = sock;
+       isc_nmhandle_attach(handle, &ievent->handle);
+
+       isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                              (isc__netievent_t *)ievent);
+}
+
+void
+isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc_nmhandle_t *handle = ievent->handle;
+
+       REQUIRE(worker->id == ievent->sock->tid);
+
+       uv_udp_recv_stop(&sock->uv_handle.udp);
+
+       if (atomic_load(&sock->client)) {
+               isc_nm_recv_cb_t cb;
+               void *cbarg = NULL;
+
+               cb = sock->recv_cb;
+               cbarg = sock->recv_cbarg;
+               isc__nmsocket_clearcb(sock);
+
+               if (cb != NULL) {
+                       cb(handle, ISC_R_EOF, NULL, cbarg);
+               }
+       }
+
+       isc_nmhandle_detach(&handle);
+}
index 8ad3b1a0d18c21309f094395f0a9b3d65feb97dd..51964477e66367342f363d155edf7dc1f29ded6b 100644 (file)
@@ -187,3 +187,35 @@ isc_uv_import(uv_stream_t *stream, isc_uv_stream_info_t *info) {
 #endif /* ifdef WIN32 */
 
 #endif /* ifndef HAVE_UV_IMPORT */
+
+#ifndef HAVE_UV_UDP_CONNECT
+int
+isc_uv_udp_connect(uv_udp_t *handle, const struct sockaddr *addr) {
+       int err = 0;
+
+       do {
+               int addrlen = (addr->sa_family == AF_INET)
+                                     ? sizeof(struct sockaddr_in)
+                                     : sizeof(struct sockaddr_in6);
+#ifdef WIN32
+               err = connect(handle->socket, addr, addrlen);
+#else  /* WIN32 */
+               err = connect(handle->io_watcher.fd, addr, addrlen);
+#endif /* WIN32 */
+       } while (err == -1 && errno == EINTR);
+
+       if (err) {
+#ifdef WIN32
+               return (uv_translate_sys_error(err));
+#else /* WIN32 */
+#ifdef HAVE_UV_TRANSLATE_SYS_ERROR
+               return (uv_translate_sys_error(errno));
+#else
+               return (-errno);
+#endif /* HAVE_UV_TRANSLATE_SYS_ERROR */
+#endif /* WIN32 */
+       }
+
+       return (0);
+}
+#endif /* ifndef HAVE_UV_UDP_CONNECT */
index d02ddd3e8e9c5bfff07294053db4e601cfe9769e..334924588cac996b82bbb555db78f9a0b0c58c2a 100644 (file)
@@ -79,3 +79,19 @@ isc_uv_import(uv_stream_t *stream, isc_uv_stream_info_t *info);
  */
 
 #endif
+
+#ifdef HAVE_UV_UDP_CONNECT
+#define isc_uv_udp_connect uv_udp_connect
+#else
+
+int
+isc_uv_udp_connect(uv_udp_t *handle, const struct sockaddr *addr);
+/*%<
+ * Associate the UDP handle to a remote address and port, so every message sent
+ * by this handle is automatically sent to that destination.
+ *
+ * NOTE: This is just a limited shim for uv_udp_connect() as it requires the
+ * handle to be bound.
+ */
+
+#endif
index f1c0a74bf92486081832c62fdded301e9df68697..eb9dbea9b6a43e8c6234641f0f50c7941037b2bb 100644 (file)
@@ -79,6 +79,8 @@ isc___nm_uverr2result(int uverr, bool dolog, const char *file,
                return (ISC_R_ADDRNOTAVAIL);
        case UV_ECONNREFUSED:
                return (ISC_R_CONNREFUSED);
+       case UV_ECANCELED:
+               return (ISC_R_CANCELED);
        default:
                if (dolog) {
                        UNEXPECTED_ERROR(file, line,
index 6ce39f6fef255c52187eb271ee0554f1177222cf..9ee446ca3911528bd5b3b0713f2712f3490521d5 100644 (file)
@@ -459,11 +459,13 @@ isc_nm_setstats
 isc_nm_start
 isc_nm_stoplistening
 isc_nm_tcpconnect
+isc_nm_tcpdnsconnect
 isc_nm_tcp_gettimeouts
 isc_nm_tcp_settimeouts
 isc_nm_tcpdns_keepalive
 isc_nm_tcpdns_sequential
 isc_nm_tid
+isc_nm_udpconnect
 isc_nmsocket_close
 isc__nm_acquire_interlocked
 isc__nm_drop_interlocked