]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Turn all the callback to be always asynchronous
authorOndřej Surý <ondrej@sury.org>
Wed, 11 Nov 2020 09:46:33 +0000 (10:46 +0100)
committerOndřej Surý <ondrej@sury.org>
Wed, 11 Nov 2020 21:15:40 +0000 (22:15 +0100)
When calling the high level netmgr functions, the callback would be
sometimes called synchronously if we catch the failure directly, or
asynchronously if it happens later.  The synchronous call to the
callback could create deadlocks as the caller would not expect the
failed callback to be executed directly.

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/isc/netmgr/tls.c
lib/isc/netmgr/udp.c

index 5b63dfc98d024eecb9639f083602ccbbbf9366d4..4b56b1b772d8281f2ad8b08ff9a0a808f7f71cbb 100644 (file)
@@ -172,6 +172,11 @@ typedef enum isc__netievent_type {
        netievent_stop,
        netievent_pause,
 
+       netievent_connectcb,
+       netievent_acceptcb,
+       netievent_readcb,
+       netievent_sendcb,
+
        netievent_prio = 0xff, /* event type values higher than this
                                * will be treated as high-priority
                                * events, which can be processed
@@ -187,6 +192,7 @@ typedef union {
        isc_nm_recv_cb_t recv;
        isc_nm_cb_t send;
        isc_nm_cb_t connect;
+       isc_nm_accept_cb_t accept;
 } isc__nm_cb_t;
 
 /*
@@ -260,6 +266,18 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
 
+typedef struct isc__netievent__socket_req_result {
+       isc__netievent_type type;
+       isc_nmsocket_t *sock;
+       isc__nm_uvreq_t *req;
+       isc_result_t result;
+} isc__netievent__socket_req_result_t;
+
+typedef isc__netievent__socket_req_result_t isc__netievent_connectcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_acceptcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_readcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_sendcb_t;
+
 typedef struct isc__netievent__socket_streaminfo_quota {
        isc__netievent_type type;
        isc_nmsocket_t *sock;
@@ -746,6 +764,48 @@ isc__nmsocket_clearcb(isc_nmsocket_t *sock);
  * Clear the recv and accept callbacks in 'sock'.
  */
 
+void
+isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                 isc_result_t eresult);
+void
+isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a connect callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
+void
+isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                isc_result_t eresult);
+void
+isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a accept callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
+void
+isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult);
+void
+isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0);
+
+/*%<
+ * Issue a read callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ *
+ */
+
+void
+isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult);
+void
+isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a write callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
 void
 isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0);
 /*%<
index b7a8a29d8cd02c8055236934155e9d96bb40da7a..632b636733cd4f9ee08fb841a4c65b771c1eb404 100644 (file)
@@ -700,9 +700,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                        isc__nm_async_tls_do_bio(worker, ievent);
                        break;
 
+               case netievent_connectcb:
+                       isc__nm_async_connectcb(worker, ievent);
+                       break;
+               case netievent_acceptcb:
+                       isc__nm_async_acceptcb(worker, ievent);
+                       break;
+               case netievent_readcb:
+                       isc__nm_async_readcb(worker, ievent);
+                       break;
+               case netievent_sendcb:
+                       isc__nm_async_sendcb(worker, ievent);
+                       break;
                case netievent_closecb:
                        isc__nm_async_closecb(worker, ievent);
                        break;
+
                case netievent_detach:
                        isc__nm_async_detach(worker, ievent);
                        break;
@@ -1644,18 +1657,195 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) {
        }
 }
 
+void
+isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                 isc_result_t eresult) {
+       isc__netievent_connectcb_t *ievent =
+               isc__nm_get_ievent(sock->mgr, netievent_connectcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_connectcb(&sock->mgr->workers[sock->tid],
+                                       (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(ievent->sock->tid == isc_nm_tid());
+       REQUIRE(uvreq->cb.connect != NULL);
+
+       uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                isc_result_t eresult) {
+       isc__netievent_acceptcb_t *ievent =
+               isc__nm_get_ievent(sock->mgr, netievent_acceptcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_acceptcb(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_acceptcb_t *ievent = (isc__netievent_acceptcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+       REQUIRE(uvreq->cb.accept != NULL);
+
+       uvreq->cb.accept(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult) {
+       isc__netievent_readcb_t *ievent = isc__nm_get_ievent(sock->mgr,
+                                                            netievent_readcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_readcb(&sock->mgr->workers[sock->tid],
+                                    (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+       isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base,
+                               .length = uvreq->uvbuf.len };
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult) {
+       isc__netievent_sendcb_t *ievent = isc__nm_get_ievent(sock->mgr,
+                                                            netievent_sendcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_sendcb(&sock->mgr->workers[sock->tid],
+                                    (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
 void
 isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
 
        REQUIRE(VALID_NMSOCK(ievent->sock));
-       REQUIRE(ievent->sock->tid == isc_nm_tid());
-       REQUIRE(ievent->sock->closehandle_cb != NULL);
+       REQUIRE(sock->tid == isc_nm_tid());
+       REQUIRE(sock->closehandle_cb != NULL);
 
        UNUSED(worker);
 
-       ievent->sock->closehandle_cb(ievent->sock);
-       isc__nmsocket_detach(&ievent->sock);
+       ievent->sock->closehandle_cb(sock);
+       isc__nmsocket_detach(&sock);
 }
 
 void
index 83236e4aeb16d79c03a544ea99901584cde5d1ed..4997d6f8de5180f4cfdbd95265ea2486978f4061 100644 (file)
@@ -49,7 +49,7 @@ can_log_tcp_quota(void) {
        return (false);
 }
 
-static int
+static isc_result_t
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
 
 static void
@@ -80,6 +80,17 @@ quota_accept_cb(isc_quota_t *quota, void *sock0);
 static void
 failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
 
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult);
+
+static bool
+inactive(isc_nmsocket_t *sock) {
+       return (!isc__nmsocket_active(sock) ||
+               atomic_load(&sock->mgr->closing) ||
+               (sock->server != NULL && !isc__nmsocket_active(sock->server)));
+}
+
 static void
 failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
        /*
@@ -126,20 +137,17 @@ failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        }
 
        if (!atomic_load(&sock->connecting)) {
+               isc__nm_uvreq_put(&req, sock);
                return;
        }
-
        atomic_store(&sock->connecting, false);
 
        isc__nmsocket_clearcb(sock);
        if (req->cb.connect != NULL) {
-               req->cb.connect(NULL, eresult, req->cbarg);
+               isc__nm_connectcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
-       req->cb.connect = NULL;
-       req->cbarg = NULL;
-
-       isc__nm_uvreq_put(&req, sock);
-       isc__nmsocket_detach(&sock);
 }
 
 static void
@@ -147,16 +155,22 @@ connecttimeout_cb(uv_timer_t *handle) {
        isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle);
        isc_nmsocket_t *sock = req->sock;
 
+       REQUIRE(VALID_UVREQ(req));
+       REQUIRE(VALID_NMHANDLE(req->handle));
        REQUIRE(sock->tid == isc_nm_tid());
 
        failed_connect_cb(sock, req, ISC_R_TIMEDOUT);
+       isc__nmsocket_detach(&sock);
 }
 
-static int
+static isc_result_t
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker = NULL;
        int r;
 
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
+
        REQUIRE(isc__nm_in_netthread());
        REQUIRE(sock->tid == isc_nm_tid());
 
@@ -169,11 +183,8 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                atomic_store(&sock->closing, true);
                atomic_store(&sock->closed, true);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
-               atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        if (req->local.length != 0) {
@@ -181,12 +192,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                if (r != 0) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_BINDFAIL]);
-                       atomic_store(&sock->result, isc__nm_uverr2result(r));
-                       atomic_store(&sock->connect_error, true);
-                       failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                        atomic_store(&sock->active, false);
                        isc__nm_tcp_close(sock);
-                       return (r);
+                       return (isc__nm_uverr2result(r));
                }
        }
 
@@ -203,12 +211,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        if (r != 0) {
                isc__nm_incstats(sock->mgr,
                                 sock->statsindex[STATID_CONNECTFAIL]);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
-               atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_tcp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
 
@@ -216,7 +221,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                       0);
        sock->timer_running = true;
 
-       return (0);
+       return (ISC_R_SUCCESS);
 }
 
 void
@@ -225,23 +230,29 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                (isc__netievent_tcpconnect_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
        isc__nm_uvreq_t *req = ievent->req;
-       int r;
+       isc_result_t result = ISC_R_SUCCESS;
+
+       UNUSED(worker);
 
        REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->type == isc_nm_tcpsocket);
+       REQUIRE(sock->iface != NULL);
+       REQUIRE(sock->parent == NULL);
        REQUIRE(sock->tid == isc_nm_tid());
 
-       UNUSED(worker);
-
-       r = tcp_connect_direct(sock, req);
-       if (r != 0) {
-               LOCK(&sock->lock);
-               SIGNAL(&sock->cond);
-               UNLOCK(&sock->lock);
-               return;
+       req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
+       result = tcp_connect_direct(sock, req);
+       atomic_store(&sock->result, result);
+       if (result == ISC_R_SUCCESS) {
+               atomic_store(&sock->connected, true);
+               /* uvreq will be freed in tcp_connect_cb */
+               /* socket will be detached in tcp_connect_cb */
+       } else {
+               atomic_store(&sock->connect_error, true);
+               isc__nm_uvreq_put(&req, sock);
+               isc__nmsocket_detach(&ievent->sock);
        }
 
-       atomic_store(&sock->connected, true);
-
        LOCK(&sock->lock);
        SIGNAL(&sock->cond);
        UNLOCK(&sock->lock);
@@ -250,28 +261,32 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
 static void
 tcp_connect_cb(uv_connect_t *uvreq, int status) {
        isc_result_t result;
-       isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq);
+       isc__nm_uvreq_t *req = NULL;
        isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
        struct sockaddr_storage ss;
-       isc_nmhandle_t *handle = NULL;
        int r;
 
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->tid == isc_nm_tid());
 
-       if (sock->timer_running) {
-               uv_timer_stop(&sock->timer);
-               sock->timer_running = false;
-       }
-
+       /* We timed out */
        if (!atomic_load(&sock->connecting)) {
                return;
        }
 
+       req = uv_handle_get_data((uv_handle_t *)uvreq);
+
        REQUIRE(VALID_UVREQ(req));
+       REQUIRE(VALID_NMHANDLE(req->handle));
+
+       if (sock->timer_running) {
+               uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
+       }
 
        if (status != 0) {
                failed_connect_cb(sock, req, isc__nm_uverr2result(status));
+               isc__nmsocket_detach(&sock);
                return;
        }
 
@@ -280,6 +295,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
                               &(int){ sizeof(ss) });
        if (r != 0) {
                failed_connect_cb(sock, req, isc__nm_uverr2result(r));
+               isc__nmsocket_detach(&sock);
                return;
        }
 
@@ -288,21 +304,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
        result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-       handle = isc__nmhandle_get(sock, NULL, NULL);
-       req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
-
-       isc__nm_uvreq_put(&req, sock);
+       isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
 
        /*
         * The sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
-
-       /*
-        * The connect callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
 }
 
 isc_result_t
@@ -310,7 +317,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                  size_t extrahandlesize) {
        isc_result_t result = ISC_R_SUCCESS;
-       isc_nmsocket_t *nsock = NULL, *tmp = NULL;
+       isc_nmsocket_t *sock = NULL, *tmp = NULL;
        isc__netievent_tcpconnect_t *ievent = NULL;
        isc__nm_uvreq_t *req = NULL;
 
@@ -318,50 +325,50 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        REQUIRE(local != NULL);
        REQUIRE(peer != NULL);
 
-       nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
-       isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
+       sock = isc_mem_get(mgr->mctx, sizeof(*sock));
+       isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local);
 
-       nsock->extrahandlesize = extrahandlesize;
-       nsock->connect_timeout = timeout;
+       sock->extrahandlesize = extrahandlesize;
+       sock->connect_timeout = timeout;
 
-       atomic_init(&nsock->result, ISC_R_SUCCESS);
-       atomic_init(&nsock->client, true);
+       atomic_init(&sock->result, ISC_R_SUCCESS);
+       atomic_init(&sock->client, true);
 
-       req = isc__nm_uvreq_get(mgr, nsock);
+       req = isc__nm_uvreq_get(mgr, sock);
        req->cb.connect = cb;
        req->cbarg = cbarg;
        req->peer = peer->addr;
        req->local = local->addr;
 
        ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
-       ievent->sock = nsock;
+       ievent->sock = sock;
        ievent->req = req;
 
        /*
         * Async callbacks can dereference the socket in the meantime,
         * we need to hold an additional reference to it.
         */
-       isc__nmsocket_attach(nsock, &tmp);
+       isc__nmsocket_attach(sock, &tmp);
 
        if (isc__nm_in_netthread()) {
-               nsock->tid = isc_nm_tid();
-               isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
+               sock->tid = isc_nm_tid();
+               isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
                                         (isc__netievent_t *)ievent);
                isc__nm_put_ievent(mgr, ievent);
        } else {
-               nsock->tid = isc_random_uniform(mgr->nworkers);
-               isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
+               sock->tid = isc_random_uniform(mgr->nworkers);
+               isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)ievent);
 
-               LOCK(&nsock->lock);
-               while (!atomic_load(&nsock->connected) &&
-                      !atomic_load(&nsock->connect_error)) {
-                       WAIT(&nsock->cond, &nsock->lock);
+               LOCK(&sock->lock);
+               while (!atomic_load(&sock->connected) &&
+                      !atomic_load(&sock->connect_error)) {
+                       WAIT(&sock->cond, &sock->lock);
                }
-               UNLOCK(&nsock->lock);
+               UNLOCK(&sock->lock);
        }
 
-       result = atomic_load(&nsock->result);
+       result = atomic_load(&sock->result);
 
        isc__nmsocket_detach(&tmp);
 
@@ -581,13 +588,11 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_tcpchildaccept_t *ievent =
                (isc__netievent_tcpchildaccept_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
-       isc_nmhandle_t *handle;
        isc_result_t result;
+       isc__nm_uvreq_t *req = NULL;
        struct sockaddr_storage ss;
        isc_sockaddr_t local;
        int r;
-       isc_nm_accept_cb_t accept_cb;
-       void *accept_cbarg;
 
        REQUIRE(isc__nm_in_netthread());
        REQUIRE(sock->tid == isc_nm_tid());
@@ -650,25 +655,22 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
        }
        sock->accepting = false;
 
-       handle = isc__nmhandle_get(sock, NULL, &local);
-
        INSIST(sock->accept_cb != NULL);
-       accept_cb = sock->accept_cb;
-       accept_cbarg = sock->accept_cbarg;
 
        sock->read_timeout = sock->mgr->init;
-       accept_cb(handle, ISC_R_SUCCESS, accept_cbarg);
+
+       req = isc__nm_uvreq_get(sock->mgr, sock);
+       req->handle = isc__nmhandle_get(sock, NULL, &local);
+       req->cb.accept = sock->accept_cb;
+       req->cbarg = sock->accept_cbarg;
+
+       isc__nm_acceptcb(sock, req, ISC_R_SUCCESS);
 
        /*
         * sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
 
-       /*
-        * The accept callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
        return;
 
 error:
@@ -733,9 +735,6 @@ tcp_listenclose_cb(uv_handle_t *handle) {
 
 static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
-       isc_nm_recv_cb_t cb;
-       void *cbarg = NULL;
-
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->statichandle != NULL);
 
@@ -750,12 +749,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
 
        uv_read_stop(&sock->uv_handle.stream);
 
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-       isc__nmsocket_clearcb(sock);
+       if (sock->recv_cb != NULL) {
+               isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
+               req->cb.recv = sock->recv_cb;
+               req->cbarg = sock->recv_cbarg;
+
+               isc__nmsocket_clearcb(sock);
+
+               isc__nm_readcb(sock, req, result);
+       }
+}
+
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult) {
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
 
-       if (cb != NULL) {
-               cb(sock->statichandle, result, NULL, cbarg);
+       if (req->cb.send != NULL) {
+               isc__nm_sendcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
 }
 
@@ -790,29 +805,17 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        REQUIRE(VALID_NMHANDLE(handle));
        REQUIRE(VALID_NMSOCK(handle->sock));
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
+       sock->recv_cb = cb;
+       sock->recv_cbarg = cbarg;
 
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
+       if (inactive(sock)) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
+               failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
 
        REQUIRE(sock->tid == isc_nm_tid());
 
-       sock->recv_cb = cb;
-       sock->recv_cbarg = cbarg;
-
        sock->read_timeout = (atomic_load(&sock->keepalive)
                                      ? sock->mgr->keepalive
                                      : sock->mgr->idle);
@@ -864,17 +867,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
 
        REQUIRE(worker->id == isc_nm_tid());
 
-       if (!isc__nmsocket_active(sock)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
@@ -982,13 +975,22 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
        REQUIRE(buf != NULL);
 
        if (nread >= 0) {
-               isc_region_t region = { .base = (unsigned char *)buf->base,
-                                       .length = nread };
-               isc_nm_recv_cb_t cb = sock->recv_cb;
-               void *cbarg = sock->recv_cbarg;
-
-               if (cb != NULL) {
-                       cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
+               if (sock->recv_cb != NULL) {
+                       isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr,
+                                                                sock);
+                       req->cb.recv = sock->recv_cb;
+                       req->cbarg = sock->recv_cbarg;
+                       isc_nmhandle_attach(sock->statichandle, &req->handle);
+
+                       /*
+                        * The callback will be called synchronously because the
+                        * result is ISC_R_SUCCESS, so we don't need to retain
+                        * the buffer
+                        */
+                       req->uvbuf.base = buf->base;
+                       req->uvbuf.len = nread;
+
+                       isc__nm_readcb(sock, req, ISC_R_SUCCESS);
                }
 
                if (sock->timer_initialized && sock->read_timeout != 0) {
@@ -1183,24 +1185,6 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
 
        REQUIRE(sock->type == isc_nm_tcpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
        uvreq = isc__nm_uvreq_get(sock->mgr, sock);
        uvreq->uvbuf.base = (char *)region->base;
        uvreq->uvbuf.len = region->length;
@@ -1210,6 +1194,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
        uvreq->cb.send = cb;
        uvreq->cbarg = cbarg;
 
+       if (inactive(sock)) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
+               return;
+       }
+
        if (sock->tid == isc_nm_tid()) {
                /*
                 * If we're in the same thread as the socket we can send the
@@ -1219,8 +1209,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                if (result != ISC_R_SUCCESS) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_SENDFAIL]);
-                       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-                       isc__nm_uvreq_put(&uvreq, sock);
+                       failed_send_cb(sock, uvreq, result);
                }
        } else {
                /*
@@ -1284,13 +1273,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        REQUIRE(sock->tid == isc_nm_tid());
        REQUIRE(sock->type == isc_nm_tcpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               return (ISC_R_CANCELED);
-       }
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               return (ISC_R_CANCELED);
-       }
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                return (ISC_R_CANCELED);
        }
 
@@ -1408,11 +1391,6 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
        }
 
        if (atomic_load(&sock->connecting)) {
-               if (sock->timer_initialized) {
-                       isc__nm_uvreq_t *req =
-                               uv_handle_get_data((uv_handle_t *)&sock->timer);
-                       failed_connect_cb(sock, req, ISC_R_CANCELED);
-               }
                return;
        }
 
index 2a1596d0e8c3d0a7998eb5bc405c08d76afd5b7c..f742c64a654b2e575c90072cca9a5b8ecc91f467 100644 (file)
@@ -787,11 +787,6 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 
        isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
 
-       if (result != ISC_R_SUCCESS) {
-               cb(NULL, result, cbarg);
-               return;
-       }
-
        dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock));
        isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket,
                           handle->sock->iface);
@@ -807,6 +802,13 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 
        readhandle = isc__nmhandle_get(dnssock, NULL, NULL);
 
+       if (result != ISC_R_SUCCESS) {
+               cb(readhandle, result, cbarg);
+               isc__nmsocket_detach(&dnssock);
+               isc_nmhandle_detach(&readhandle);
+               return;
+       }
+
        INSIST(dnssock->statichandle != NULL);
        INSIST(dnssock->statichandle == readhandle);
        INSIST(readhandle->sock == dnssock);
@@ -838,20 +840,26 @@ isc_result_t
 isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                     isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                     size_t extrahandlesize) {
-       tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
+       isc_result_t result = ISC_R_SUCCESS;
+       tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(*conn));
 
        *conn = (tcpconnect_t){ .cb = cb,
                                .cbarg = cbarg,
                                .extrahandlesize = extrahandlesize };
        isc_mem_attach(mgr->mctx, &conn->mctx);
-       return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
-                                 timeout, 0));
+       result = isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
+                                  timeout, 0);
+       if (result != ISC_R_SUCCESS) {
+               isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
+       }
+       return (result);
 }
 
 isc_result_t
 isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                     isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                     size_t extrahandlesize) {
+       isc_result_t result = ISC_R_SUCCESS;
        tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
        SSL_CTX *ctx = NULL;
 
@@ -861,9 +869,12 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        isc_mem_attach(mgr->mctx, &conn->mctx);
 
        ctx = SSL_CTX_new(SSLv23_client_method());
-       isc_result_t result = isc_nm_tlsconnect(
-               mgr, local, peer, tcpdnsconnect_cb, conn, ctx, timeout, 0);
+       result = isc_nm_tlsconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
+                                  ctx, timeout, 0);
        SSL_CTX_free(ctx);
+       if (result != ISC_R_SUCCESS) {
+               isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
+       }
        return (result);
 }
 
index acaaeb1ad5b4f01f671f9e9266238726922e6214..fdface5aa27cb12aaf7feb116798a02c960dd409 100644 (file)
@@ -79,6 +79,7 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
        UNUSED(handle);
        /*  XXXWPK TODO */
        UNUSED(eresult);
+
        isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base,
                    sock->tls.senddata.length);
        sock->tls.senddata = (isc_region_t){ NULL, 0 };
@@ -701,7 +702,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        REQUIRE(VALID_NMSOCK(tlssock));
 
        if (result != ISC_R_SUCCESS) {
-               tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
+               tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
                tls_close_direct(tlssock);
@@ -714,7 +715,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        isc_nmhandle_attach(handle, &tlssock->outerhandle);
        result = initialize_tls(tlssock, false);
        if (result != ISC_R_SUCCESS) {
-               tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
+               tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
                tls_close_direct(tlssock);
@@ -742,6 +743,7 @@ isc__nm_async_tlsconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                                   tls_connect_cb, tlssock,
                                   tlssock->connect_timeout, 0);
        if (result != ISC_R_SUCCESS) {
+               /* FIXME: We need to pass valid handle */
                tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
index 9f8a37340e8c76fecef924e24fa22b1c78c38632..ecaf66046a0d7aa76fabd714033e0bdb918a1471 100644 (file)
@@ -51,8 +51,15 @@ static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
 
 static void
-failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
-                 isc_result_t eresult);
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult);
+
+static bool
+inactive(isc_nmsocket_t *sock) {
+       return (!isc__nmsocket_active(sock) ||
+               atomic_load(&sock->mgr->closing) ||
+               (sock->server != NULL && !isc__nmsocket_active(sock->server)));
+}
 
 isc_result_t
 isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
@@ -339,14 +346,11 @@ static void
 udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
            const struct sockaddr *addr, unsigned flags) {
        isc_result_t result;
-       isc_nmhandle_t *nmhandle = NULL;
        isc_sockaddr_t sockaddr;
        isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
-       isc_region_t region;
+       isc__nm_uvreq_t *req = NULL;
        uint32_t maxudp;
        bool free_buf;
-       isc_nm_recv_cb_t cb;
-       void *cbarg;
 
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->tid == isc_nm_tid());
@@ -385,38 +389,35 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
                goto done;
        }
 
-       region.base = (unsigned char *)buf->base;
-       region.length = nrecv;
-
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-
        if (sock->timer_running) {
                uv_timer_stop(&sock->timer);
                sock->timer_running = false;
        }
 
+       req = isc__nm_uvreq_get(sock->mgr, sock);
+       req->cb.recv = sock->recv_cb;
+       req->cbarg = sock->recv_cbarg;
+       /*
+        * The callback will be called synchronously, because result is
+        * ISC_R_SUCCESS.
+        */
+       req->uvbuf.base = buf->base;
+       req->uvbuf.len = nrecv;
+
        if (atomic_load(&sock->client)) {
                if (nrecv < 0) {
                        failed_read_cb(sock, isc__nm_uverr2result(nrecv));
                        return;
                }
 
-               cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
        } else {
                result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
                RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
-
-               cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
-
-               /*
-                * If the recv callback wants to hold on to the handle,
-                * it needs to attach to it.
-                */
-               isc_nmhandle_detach(&nmhandle);
+               req->handle = isc__nmhandle_get(sock, &sockaddr, NULL);
        }
+       isc__nm_readcb(sock, req, ISC_R_SUCCESS);
 
 done:
        if (free_buf) {
@@ -440,21 +441,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
        uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
        int ntid;
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
+       uvreq = isc__nm_uvreq_get(sock->mgr, sock);
+       uvreq->uvbuf.base = (char *)region->base;
+       uvreq->uvbuf.len = region->length;
 
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
+       isc_nmhandle_attach(handle, &uvreq->handle);
 
-       if (atomic_load(&sock->mgr->closing)) {
+       uvreq->cb.send = cb;
+       uvreq->cbarg = cbarg;
+
+       if (inactive(sock)) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
                return;
        }
 
@@ -467,6 +465,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
         * we need to do so here.
         */
        if (maxudp != 0 && region->length > maxudp) {
+               isc__nm_uvreq_put(&uvreq, sock);
                isc_nmhandle_detach(&handle);
                return;
        }
@@ -499,15 +498,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                rsock = &psock->children[ntid];
        }
 
-       uvreq = isc__nm_uvreq_get(sock->mgr, sock);
-       uvreq->uvbuf.base = (char *)region->base;
-       uvreq->uvbuf.len = region->length;
-
-       isc_nmhandle_attach(handle, &uvreq->handle);
-
-       uvreq->cb.send = cb;
-       uvreq->cbarg = cbarg;
-
        if (isc_nm_tid() == rsock->tid) {
                /*
                 * If we're in the same thread as the socket we can send
@@ -518,8 +508,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                if (result != ISC_R_SUCCESS) {
                        isc__nm_incstats(rsock->mgr,
                                         rsock->statsindex[STATID_SENDFAIL]);
-                       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-                       isc__nm_uvreq_put(&uvreq, sock);
+                       failed_send_cb(rsock, uvreq, result);
                }
        } else {
                /*
@@ -549,23 +538,21 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
        REQUIRE(worker->id == sock->tid);
 
        if (!isc__nmsocket_active(ievent->sock)) {
-               uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq, sock);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
                return;
        }
 
        result = udp_send_direct(sock, uvreq, &ievent->peer);
        if (result != ISC_R_SUCCESS) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq, sock);
+               failed_send_cb(sock, uvreq, result);
        }
 }
 
 static void
 udp_send_cb(uv_udp_send_t *req, int status) {
        isc_result_t result = ISC_R_SUCCESS;
-       isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
+       isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
        isc_nmsocket_t *sock = uvreq->sock;
 
        REQUIRE(VALID_UVREQ(uvreq));
@@ -576,8 +563,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
        }
 
-       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-       isc__nm_uvreq_put(&uvreq, uvreq->sock);
+       isc__nm_sendcb(sock, uvreq, result);
 }
 
 /*
@@ -595,13 +581,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        REQUIRE(sock->tid == isc_nm_tid());
        REQUIRE(sock->type == isc_nm_udpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               return (ISC_R_CANCELED);
-       }
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               return (ISC_R_CANCELED);
-       }
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                return (ISC_R_CANCELED);
        }
 
@@ -625,7 +605,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        return (ISC_R_SUCCESS);
 }
 
-static int
+static isc_result_t
 udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker = NULL;
        int uv_bind_flags = UV_UDP_REUSEADDR;
@@ -644,11 +624,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                /* Socket was never opened; no need for isc__nm_udp_close() */
                atomic_store(&sock->closing, true);
                atomic_store(&sock->closed, true);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
@@ -656,10 +634,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);
 
@@ -673,10 +650,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        uv_handle_set_data(&sock->uv_handle.handle, sock);
@@ -687,10 +663,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                                 sock->statsindex[STATID_CONNECTFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
        atomic_store(&sock->connecting, false);
@@ -703,7 +678,7 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        uv_send_buffer_size(&sock->uv_handle.handle,
                            &(int){ ISC_SEND_BUFFER_SIZE });
 #endif
-       return (0);
+       return (ISC_R_SUCCESS);
 }
 
 /*
@@ -716,37 +691,27 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                (isc__netievent_udpconnect_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
        isc__nm_uvreq_t *req = ievent->req;
-       isc_nmhandle_t *handle = NULL;
-       isc_nm_cb_t cb;
-       void *cbarg;
-       int r;
        isc_result_t result;
 
        UNUSED(worker);
 
+       REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->type == isc_nm_udpsocket);
        REQUIRE(sock->iface != NULL);
        REQUIRE(sock->parent == NULL);
        REQUIRE(sock->tid == isc_nm_tid());
 
-       cb = sock->connect_cb;
-       cbarg = sock->connect_cbarg;
-
-       r = udp_connect_direct(sock, req);
-       if (r != 0) {
-               LOCK(&sock->lock);
-               SIGNAL(&sock->cond);
-               UNLOCK(&sock->lock);
-               return;
+       req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
+       result = udp_connect_direct(sock, req);
+       atomic_store(&sock->result, result);
+       if (result == ISC_R_SUCCESS) {
+               atomic_store(&sock->connected, true);
+               isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
+       } else {
+               atomic_store(&sock->connect_error, true);
+               isc__nm_uvreq_put(&req, sock);
        }
 
-       atomic_store(&sock->connected, true);
-       atomic_store(&sock->result, ISC_R_SUCCESS);
-       result = atomic_load(&sock->result);
-
-       handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
-       cb(handle, result, cbarg);
-
        LOCK(&sock->lock);
        SIGNAL(&sock->cond);
        UNLOCK(&sock->lock);
@@ -755,12 +720,6 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
         * The sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
-
-       /*
-        * The connect callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
 }
 
 isc_result_t
@@ -834,7 +793,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                isc__nm_async_udpconnect(&mgr->workers[sock->tid],
                                         (isc__netievent_t *)event);
                isc__nm_put_ievent(mgr, event);
-               isc__nm_uvreq_put(&req, sock);
        } else {
                sock->tid = isc_random_uniform(mgr->nworkers);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
@@ -846,7 +804,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                        WAIT(&sock->cond, &sock->lock);
                }
                UNLOCK(&sock->lock);
-               isc__nm_uvreq_put(&req, sock);
        }
 
        result = atomic_load(&sock->result);
@@ -867,9 +824,6 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 
 static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
-       isc_nm_recv_cb_t cb;
-       void *cbarg = NULL;
-
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->statichandle != NULL);
 
@@ -880,12 +834,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
 
        uv_udp_recv_stop(&sock->uv_handle.udp);
 
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-       isc__nmsocket_clearcb(sock);
+       if (sock->recv_cb != NULL) {
+               isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
+               req->cb.recv = sock->recv_cb;
+               req->cbarg = sock->recv_cbarg;
+
+               isc__nmsocket_clearcb(sock);
+
+               isc__nm_readcb(sock, req, result);
+       }
+}
+
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult) {
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
 
-       if (cb != NULL) {
-               cb(sock->statichandle, result, NULL, cbarg);
+       if (req->cb.send != NULL) {
+               isc__nm_sendcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
 }
 
@@ -911,17 +881,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
 
-       if (!isc__nmsocket_active(sock)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
@@ -950,24 +910,12 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        REQUIRE(VALID_NMSOCK(handle->sock));
        REQUIRE(handle->sock->type == isc_nm_udpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
+       if (inactive(sock)) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
                cb(handle, ISC_R_CANCELED, NULL, cbarg);
                return;
        }
 
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
        REQUIRE(sock->tid == isc_nm_tid());
        sock->recv_cb = cb;
        sock->recv_cbarg = cbarg;
@@ -1059,35 +1007,6 @@ isc__nm_udp_close(isc_nmsocket_t *sock) {
        }
 }
 
-static void
-failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
-                 isc_result_t eresult) {
-       REQUIRE(sock->tid == isc_nm_tid());
-
-       if (sock->timer_running) {
-               uv_timer_stop(&sock->timer);
-               sock->timer_running = false;
-       }
-
-       if (!atomic_load(&sock->connecting)) {
-               return;
-       }
-
-       atomic_store(&sock->connecting, false);
-
-       INSIST(req != NULL);
-
-       isc__nmsocket_clearcb(sock);
-
-       if (req->cb.connect != NULL) {
-               req->cb.connect(NULL, eresult, req->cbarg);
-       }
-       req->cb.connect = NULL;
-       req->cbarg = NULL;
-
-       isc__nmsocket_detach(&sock);
-}
-
 void
 isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
@@ -1098,11 +1017,6 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
        }
 
        if (atomic_load(&sock->connecting)) {
-               if (sock->timer_initialized) {
-                       isc__nm_uvreq_t *req =
-                               uv_handle_get_data((uv_handle_t *)&sock->timer);
-                       failed_connect_cb(sock, req, ISC_R_CANCELED);
-               }
                return;
        }