]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Turn all the callback to be always asynchronous
authorOndřej Surý <ondrej@sury.org>
Wed, 11 Nov 2020 09:46:33 +0000 (10:46 +0100)
committerOndřej Surý <ondrej@sury.org>
Wed, 9 Dec 2020 09:46:16 +0000 (10:46 +0100)
When calling the high level netmgr functions, the callback would be
sometimes called synchronously if we catch the failure directly, or
asynchronously if it happens later.  The synchronous call to the
callback could create deadlocks as the caller would not expect the
failed callback to be executed directly.

(cherry picked from commit a49d88568fef290f1ad3ce5656d0e30d41d61606)

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/isc/netmgr/tls.c
lib/isc/netmgr/udp.c

index 624bfb224491ab96fe3d497fd3991f49308e6712..e23efb20ab202e5d6131ccaace80948b10b1b6e1 100644 (file)
@@ -172,6 +172,11 @@ typedef enum isc__netievent_type {
        netievent_stop,
        netievent_pause,
 
+       netievent_connectcb,
+       netievent_acceptcb,
+       netievent_readcb,
+       netievent_sendcb,
+
        netievent_prio = 0xff, /* event type values higher than this
                                * will be treated as high-priority
                                * events, which can be processed
@@ -187,6 +192,7 @@ typedef union {
        isc_nm_recv_cb_t recv;
        isc_nm_cb_t send;
        isc_nm_cb_t connect;
+       isc_nm_accept_cb_t accept;
 } isc__nm_cb_t;
 
 /*
@@ -260,6 +266,18 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
 
+typedef struct isc__netievent__socket_req_result {
+       isc__netievent_type type;
+       isc_nmsocket_t *sock;
+       isc__nm_uvreq_t *req;
+       isc_result_t result;
+} isc__netievent__socket_req_result_t;
+
+typedef isc__netievent__socket_req_result_t isc__netievent_connectcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_acceptcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_readcb_t;
+typedef isc__netievent__socket_req_result_t isc__netievent_sendcb_t;
+
 typedef struct isc__netievent__socket_streaminfo_quota {
        isc__netievent_type type;
        isc_nmsocket_t *sock;
@@ -750,6 +768,48 @@ isc__nmsocket_clearcb(isc_nmsocket_t *sock);
  * Clear the recv and accept callbacks in 'sock'.
  */
 
+void
+isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                 isc_result_t eresult);
+void
+isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a connect callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
+void
+isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                isc_result_t eresult);
+void
+isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a accept callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
+void
+isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult);
+void
+isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0);
+
+/*%<
+ * Issue a read callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ *
+ */
+
+void
+isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult);
+void
+isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0);
+/*%<
+ * Issue a write callback on the socket, used to call the callback
+ * on failed conditions when the event can't be scheduled on the uv loop.
+ */
+
 void
 isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0);
 /*%<
index 404b7d1636d6d60065bdda331c3b8dab64c871f3..d834500db0d4f39ad27e1fef1d431687bb0082f4 100644 (file)
@@ -700,9 +700,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
                        isc__nm_async_tls_do_bio(worker, ievent);
                        break;
 
+               case netievent_connectcb:
+                       isc__nm_async_connectcb(worker, ievent);
+                       break;
+               case netievent_acceptcb:
+                       isc__nm_async_acceptcb(worker, ievent);
+                       break;
+               case netievent_readcb:
+                       isc__nm_async_readcb(worker, ievent);
+                       break;
+               case netievent_sendcb:
+                       isc__nm_async_sendcb(worker, ievent);
+                       break;
                case netievent_closecb:
                        isc__nm_async_closecb(worker, ievent);
                        break;
+
                case netievent_detach:
                        isc__nm_async_detach(worker, ievent);
                        break;
@@ -1644,18 +1657,195 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) {
        }
 }
 
+void
+isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                 isc_result_t eresult) {
+       isc__netievent_connectcb_t *ievent =
+               isc__nm_get_ievent(sock->mgr, netievent_connectcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_connectcb(&sock->mgr->workers[sock->tid],
+                                       (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(ievent->sock->tid == isc_nm_tid());
+       REQUIRE(uvreq->cb.connect != NULL);
+
+       uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+                isc_result_t eresult) {
+       isc__netievent_acceptcb_t *ievent =
+               isc__nm_get_ievent(sock->mgr, netievent_acceptcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_acceptcb(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_acceptcb_t *ievent = (isc__netievent_acceptcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+       REQUIRE(uvreq->cb.accept != NULL);
+
+       uvreq->cb.accept(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult) {
+       isc__netievent_readcb_t *ievent = isc__nm_get_ievent(sock->mgr,
+                                                            netievent_readcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_readcb(&sock->mgr->workers[sock->tid],
+                                    (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+       isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base,
+                               .length = uvreq->uvbuf.len };
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
+void
+isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
+              isc_result_t eresult) {
+       isc__netievent_sendcb_t *ievent = isc__nm_get_ievent(sock->mgr,
+                                                            netievent_sendcb);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+
+       ievent->sock = sock;
+       ievent->req = uvreq;
+       ievent->result = eresult;
+
+       if (eresult == ISC_R_SUCCESS) {
+               isc__nm_async_sendcb(&sock->mgr->workers[sock->tid],
+                                    (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
+       } else {
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *)ievent);
+       }
+}
+
+void
+isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
+       isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
+       isc__nm_uvreq_t *uvreq = ievent->req;
+       isc_result_t eresult = ievent->result;
+
+       UNUSED(worker);
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(uvreq));
+       REQUIRE(VALID_NMHANDLE(uvreq->handle));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);
+
+       isc__nm_uvreq_put(&uvreq, sock);
+}
+
 void
 isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0;
+       isc_nmsocket_t *sock = ievent->sock;
 
        REQUIRE(VALID_NMSOCK(ievent->sock));
-       REQUIRE(ievent->sock->tid == isc_nm_tid());
-       REQUIRE(ievent->sock->closehandle_cb != NULL);
+       REQUIRE(sock->tid == isc_nm_tid());
+       REQUIRE(sock->closehandle_cb != NULL);
 
        UNUSED(worker);
 
-       ievent->sock->closehandle_cb(ievent->sock);
-       isc__nmsocket_detach(&ievent->sock);
+       ievent->sock->closehandle_cb(sock);
+       isc__nmsocket_detach(&sock);
 }
 
 void
index 448ffc5f7c0725b8dc94e74daa8a5ca1ebb1a331..af5ebe2e94b879880f4ba69eb2d1bb0083f08351 100644 (file)
@@ -49,7 +49,7 @@ can_log_tcp_quota() {
        return (false);
 }
 
-static int
+static isc_result_t
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
 
 static void
@@ -80,6 +80,17 @@ quota_accept_cb(isc_quota_t *quota, void *sock0);
 static void
 failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
 
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult);
+
+static bool
+inactive(isc_nmsocket_t *sock) {
+       return (!isc__nmsocket_active(sock) ||
+               atomic_load(&sock->mgr->closing) ||
+               (sock->server != NULL && !isc__nmsocket_active(sock->server)));
+}
+
 static void
 failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
        /*
@@ -126,20 +137,17 @@ failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        }
 
        if (!atomic_load(&sock->connecting)) {
+               isc__nm_uvreq_put(&req, sock);
                return;
        }
-
        atomic_store(&sock->connecting, false);
 
        isc__nmsocket_clearcb(sock);
        if (req->cb.connect != NULL) {
-               req->cb.connect(NULL, eresult, req->cbarg);
+               isc__nm_connectcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
-       req->cb.connect = NULL;
-       req->cbarg = NULL;
-
-       isc__nm_uvreq_put(&req, sock);
-       isc__nmsocket_detach(&sock);
 }
 
 static void
@@ -147,16 +155,22 @@ connecttimeout_cb(uv_timer_t *handle) {
        isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle);
        isc_nmsocket_t *sock = req->sock;
 
+       REQUIRE(VALID_UVREQ(req));
+       REQUIRE(VALID_NMHANDLE(req->handle));
        REQUIRE(sock->tid == isc_nm_tid());
 
        failed_connect_cb(sock, req, ISC_R_TIMEDOUT);
+       isc__nmsocket_detach(&sock);
 }
 
-static int
+static isc_result_t
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker = NULL;
        int r;
 
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
+
        REQUIRE(isc__nm_in_netthread());
        REQUIRE(sock->tid == isc_nm_tid());
 
@@ -169,11 +183,8 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                atomic_store(&sock->closing, true);
                atomic_store(&sock->closed, true);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
-               atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        if (req->local.length != 0) {
@@ -181,12 +192,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                if (r != 0) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_BINDFAIL]);
-                       atomic_store(&sock->result, isc__nm_uverr2result(r));
-                       atomic_store(&sock->connect_error, true);
-                       failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                        atomic_store(&sock->active, false);
                        isc__nm_tcp_close(sock);
-                       return (r);
+                       return (isc__nm_uverr2result(r));
                }
        }
 
@@ -203,12 +211,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        if (r != 0) {
                isc__nm_incstats(sock->mgr,
                                 sock->statsindex[STATID_CONNECTFAIL]);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
-               atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_tcp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
 
@@ -216,7 +221,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                       0);
        sock->timer_running = true;
 
-       return (0);
+       return (ISC_R_SUCCESS);
 }
 
 void
@@ -225,23 +230,29 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                (isc__netievent_tcpconnect_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
        isc__nm_uvreq_t *req = ievent->req;
-       int r;
+       isc_result_t result = ISC_R_SUCCESS;
+
+       UNUSED(worker);
 
        REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->type == isc_nm_tcpsocket);
+       REQUIRE(sock->iface != NULL);
+       REQUIRE(sock->parent == NULL);
        REQUIRE(sock->tid == isc_nm_tid());
 
-       UNUSED(worker);
-
-       r = tcp_connect_direct(sock, req);
-       if (r != 0) {
-               LOCK(&sock->lock);
-               SIGNAL(&sock->cond);
-               UNLOCK(&sock->lock);
-               return;
+       req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
+       result = tcp_connect_direct(sock, req);
+       atomic_store(&sock->result, result);
+       if (result == ISC_R_SUCCESS) {
+               atomic_store(&sock->connected, true);
+               /* uvreq will be freed in tcp_connect_cb */
+               /* socket will be detached in tcp_connect_cb */
+       } else {
+               atomic_store(&sock->connect_error, true);
+               isc__nm_uvreq_put(&req, sock);
+               isc__nmsocket_detach(&ievent->sock);
        }
 
-       atomic_store(&sock->connected, true);
-
        LOCK(&sock->lock);
        SIGNAL(&sock->cond);
        UNLOCK(&sock->lock);
@@ -250,28 +261,32 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
 static void
 tcp_connect_cb(uv_connect_t *uvreq, int status) {
        isc_result_t result;
-       isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq);
+       isc__nm_uvreq_t *req = NULL;
        isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
        struct sockaddr_storage ss;
-       isc_nmhandle_t *handle = NULL;
        int r;
 
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->tid == isc_nm_tid());
 
-       if (sock->timer_running) {
-               uv_timer_stop(&sock->timer);
-               sock->timer_running = false;
-       }
-
+       /* We timed out */
        if (!atomic_load(&sock->connecting)) {
                return;
        }
 
+       req = uv_handle_get_data((uv_handle_t *)uvreq);
+
        REQUIRE(VALID_UVREQ(req));
+       REQUIRE(VALID_NMHANDLE(req->handle));
+
+       if (sock->timer_running) {
+               uv_timer_stop(&sock->timer);
+               sock->timer_running = false;
+       }
 
        if (status != 0) {
                failed_connect_cb(sock, req, isc__nm_uverr2result(status));
+               isc__nmsocket_detach(&sock);
                return;
        }
 
@@ -280,6 +295,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
                               &(int){ sizeof(ss) });
        if (r != 0) {
                failed_connect_cb(sock, req, isc__nm_uverr2result(r));
+               isc__nmsocket_detach(&sock);
                return;
        }
 
@@ -288,21 +304,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
        result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-       handle = isc__nmhandle_get(sock, NULL, NULL);
-       req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
-
-       isc__nm_uvreq_put(&req, sock);
+       isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
 
        /*
         * The sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
-
-       /*
-        * The connect callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
 }
 
 isc_result_t
@@ -310,7 +317,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                  size_t extrahandlesize) {
        isc_result_t result = ISC_R_SUCCESS;
-       isc_nmsocket_t *nsock = NULL, *tmp = NULL;
+       isc_nmsocket_t *sock = NULL, *tmp = NULL;
        isc__netievent_tcpconnect_t *ievent = NULL;
        isc__nm_uvreq_t *req = NULL;
 
@@ -318,50 +325,50 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        REQUIRE(local != NULL);
        REQUIRE(peer != NULL);
 
-       nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
-       isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
+       sock = isc_mem_get(mgr->mctx, sizeof(*sock));
+       isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local);
 
-       nsock->extrahandlesize = extrahandlesize;
-       nsock->connect_timeout = timeout;
+       sock->extrahandlesize = extrahandlesize;
+       sock->connect_timeout = timeout;
 
-       atomic_init(&nsock->result, ISC_R_SUCCESS);
-       atomic_init(&nsock->client, true);
+       atomic_init(&sock->result, ISC_R_SUCCESS);
+       atomic_init(&sock->client, true);
 
-       req = isc__nm_uvreq_get(mgr, nsock);
+       req = isc__nm_uvreq_get(mgr, sock);
        req->cb.connect = cb;
        req->cbarg = cbarg;
        req->peer = peer->addr;
        req->local = local->addr;
 
        ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
-       ievent->sock = nsock;
+       ievent->sock = sock;
        ievent->req = req;
 
        /*
         * Async callbacks can dereference the socket in the meantime,
         * we need to hold an additional reference to it.
         */
-       isc__nmsocket_attach(nsock, &tmp);
+       isc__nmsocket_attach(sock, &tmp);
 
        if (isc__nm_in_netthread()) {
-               nsock->tid = isc_nm_tid();
-               isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
+               sock->tid = isc_nm_tid();
+               isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
                                         (isc__netievent_t *)ievent);
                isc__nm_put_ievent(mgr, ievent);
        } else {
-               nsock->tid = isc_random_uniform(mgr->nworkers);
-               isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
+               sock->tid = isc_random_uniform(mgr->nworkers);
+               isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
                                       (isc__netievent_t *)ievent);
 
-               LOCK(&nsock->lock);
-               while (!atomic_load(&nsock->connected) &&
-                      !atomic_load(&nsock->connect_error)) {
-                       WAIT(&nsock->cond, &nsock->lock);
+               LOCK(&sock->lock);
+               while (!atomic_load(&sock->connected) &&
+                      !atomic_load(&sock->connect_error)) {
+                       WAIT(&sock->cond, &sock->lock);
                }
-               UNLOCK(&nsock->lock);
+               UNLOCK(&sock->lock);
        }
 
-       result = atomic_load(&nsock->result);
+       result = atomic_load(&sock->result);
 
        isc__nmsocket_detach(&tmp);
 
@@ -581,13 +588,11 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_tcpchildaccept_t *ievent =
                (isc__netievent_tcpchildaccept_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
-       isc_nmhandle_t *handle;
        isc_result_t result;
+       isc__nm_uvreq_t *req = NULL;
        struct sockaddr_storage ss;
        isc_sockaddr_t local;
        int r;
-       isc_nm_accept_cb_t accept_cb;
-       void *accept_cbarg;
 
        REQUIRE(isc__nm_in_netthread());
        REQUIRE(sock->tid == isc_nm_tid());
@@ -650,25 +655,22 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
        }
        sock->accepting = false;
 
-       handle = isc__nmhandle_get(sock, NULL, &local);
-
        INSIST(sock->accept_cb != NULL);
-       accept_cb = sock->accept_cb;
-       accept_cbarg = sock->accept_cbarg;
 
        sock->read_timeout = sock->mgr->init;
-       accept_cb(handle, ISC_R_SUCCESS, accept_cbarg);
+
+       req = isc__nm_uvreq_get(sock->mgr, sock);
+       req->handle = isc__nmhandle_get(sock, NULL, &local);
+       req->cb.accept = sock->accept_cb;
+       req->cbarg = sock->accept_cbarg;
+
+       isc__nm_acceptcb(sock, req, ISC_R_SUCCESS);
 
        /*
         * sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
 
-       /*
-        * The accept callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
        return;
 
 error:
@@ -733,9 +735,6 @@ tcp_listenclose_cb(uv_handle_t *handle) {
 
 static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
-       isc_nm_recv_cb_t cb;
-       void *cbarg = NULL;
-
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->statichandle != NULL);
 
@@ -750,12 +749,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
 
        uv_read_stop(&sock->uv_handle.stream);
 
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-       isc__nmsocket_clearcb(sock);
+       if (sock->recv_cb != NULL) {
+               isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
+               req->cb.recv = sock->recv_cb;
+               req->cbarg = sock->recv_cbarg;
+
+               isc__nmsocket_clearcb(sock);
+
+               isc__nm_readcb(sock, req, result);
+       }
+}
+
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult) {
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
 
-       if (cb != NULL) {
-               cb(sock->statichandle, result, NULL, cbarg);
+       if (req->cb.send != NULL) {
+               isc__nm_sendcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
 }
 
@@ -790,29 +805,17 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        REQUIRE(VALID_NMHANDLE(handle));
        REQUIRE(VALID_NMSOCK(handle->sock));
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
+       sock->recv_cb = cb;
+       sock->recv_cbarg = cbarg;
 
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
+       if (inactive(sock)) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
+               failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
 
        REQUIRE(sock->tid == isc_nm_tid());
 
-       sock->recv_cb = cb;
-       sock->recv_cbarg = cbarg;
-
        sock->read_timeout = (atomic_load(&sock->keepalive)
                                      ? sock->mgr->keepalive
                                      : sock->mgr->idle);
@@ -864,17 +867,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
 
        REQUIRE(worker->id == isc_nm_tid());
 
-       if (!isc__nmsocket_active(sock)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
@@ -982,13 +975,22 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
        REQUIRE(buf != NULL);
 
        if (nread >= 0) {
-               isc_region_t region = { .base = (unsigned char *)buf->base,
-                                       .length = nread };
-               isc_nm_recv_cb_t cb = sock->recv_cb;
-               void *cbarg = sock->recv_cbarg;
-
-               if (cb != NULL) {
-                       cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
+               if (sock->recv_cb != NULL) {
+                       isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr,
+                                                                sock);
+                       req->cb.recv = sock->recv_cb;
+                       req->cbarg = sock->recv_cbarg;
+                       isc_nmhandle_attach(sock->statichandle, &req->handle);
+
+                       /*
+                        * The callback will be called synchronously because the
+                        * result is ISC_R_SUCCESS, so we don't need to retain
+                        * the buffer
+                        */
+                       req->uvbuf.base = buf->base;
+                       req->uvbuf.len = nread;
+
+                       isc__nm_readcb(sock, req, ISC_R_SUCCESS);
                }
 
                if (sock->timer_initialized && sock->read_timeout != 0) {
@@ -1183,24 +1185,6 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
 
        REQUIRE(sock->type == isc_nm_tcpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
-
        uvreq = isc__nm_uvreq_get(sock->mgr, sock);
        uvreq->uvbuf.base = (char *)region->base;
        uvreq->uvbuf.len = region->length;
@@ -1210,6 +1194,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
        uvreq->cb.send = cb;
        uvreq->cbarg = cbarg;
 
+       if (inactive(sock)) {
+               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
+               return;
+       }
+
        if (sock->tid == isc_nm_tid()) {
                /*
                 * If we're in the same thread as the socket we can send the
@@ -1219,8 +1209,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                if (result != ISC_R_SUCCESS) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_SENDFAIL]);
-                       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-                       isc__nm_uvreq_put(&uvreq, sock);
+                       failed_send_cb(sock, uvreq, result);
                }
        } else {
                /*
@@ -1284,13 +1273,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        REQUIRE(sock->tid == isc_nm_tid());
        REQUIRE(sock->type == isc_nm_tcpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               return (ISC_R_CANCELED);
-       }
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               return (ISC_R_CANCELED);
-       }
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                return (ISC_R_CANCELED);
        }
 
@@ -1408,11 +1391,6 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
        }
 
        if (atomic_load(&sock->connecting)) {
-               if (sock->timer_initialized) {
-                       isc__nm_uvreq_t *req =
-                               uv_handle_get_data((uv_handle_t *)&sock->timer);
-                       failed_connect_cb(sock, req, ISC_R_CANCELED);
-               }
                return;
        }
 
index 2a1596d0e8c3d0a7998eb5bc405c08d76afd5b7c..f742c64a654b2e575c90072cca9a5b8ecc91f467 100644 (file)
@@ -787,11 +787,6 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 
        isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
 
-       if (result != ISC_R_SUCCESS) {
-               cb(NULL, result, cbarg);
-               return;
-       }
-
        dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock));
        isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket,
                           handle->sock->iface);
@@ -807,6 +802,13 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
 
        readhandle = isc__nmhandle_get(dnssock, NULL, NULL);
 
+       if (result != ISC_R_SUCCESS) {
+               cb(readhandle, result, cbarg);
+               isc__nmsocket_detach(&dnssock);
+               isc_nmhandle_detach(&readhandle);
+               return;
+       }
+
        INSIST(dnssock->statichandle != NULL);
        INSIST(dnssock->statichandle == readhandle);
        INSIST(readhandle->sock == dnssock);
@@ -838,20 +840,26 @@ isc_result_t
 isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                     isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                     size_t extrahandlesize) {
-       tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
+       isc_result_t result = ISC_R_SUCCESS;
+       tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(*conn));
 
        *conn = (tcpconnect_t){ .cb = cb,
                                .cbarg = cbarg,
                                .extrahandlesize = extrahandlesize };
        isc_mem_attach(mgr->mctx, &conn->mctx);
-       return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
-                                 timeout, 0));
+       result = isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
+                                  timeout, 0);
+       if (result != ISC_R_SUCCESS) {
+               isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
+       }
+       return (result);
 }
 
 isc_result_t
 isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                     isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
                     size_t extrahandlesize) {
+       isc_result_t result = ISC_R_SUCCESS;
        tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
        SSL_CTX *ctx = NULL;
 
@@ -861,9 +869,12 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
        isc_mem_attach(mgr->mctx, &conn->mctx);
 
        ctx = SSL_CTX_new(SSLv23_client_method());
-       isc_result_t result = isc_nm_tlsconnect(
-               mgr, local, peer, tcpdnsconnect_cb, conn, ctx, timeout, 0);
+       result = isc_nm_tlsconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
+                                  ctx, timeout, 0);
        SSL_CTX_free(ctx);
+       if (result != ISC_R_SUCCESS) {
+               isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
+       }
        return (result);
 }
 
index acaaeb1ad5b4f01f671f9e9266238726922e6214..fdface5aa27cb12aaf7feb116798a02c960dd409 100644 (file)
@@ -79,6 +79,7 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
        UNUSED(handle);
        /*  XXXWPK TODO */
        UNUSED(eresult);
+
        isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base,
                    sock->tls.senddata.length);
        sock->tls.senddata = (isc_region_t){ NULL, 0 };
@@ -701,7 +702,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        REQUIRE(VALID_NMSOCK(tlssock));
 
        if (result != ISC_R_SUCCESS) {
-               tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
+               tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
                tls_close_direct(tlssock);
@@ -714,7 +715,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        isc_nmhandle_attach(handle, &tlssock->outerhandle);
        result = initialize_tls(tlssock, false);
        if (result != ISC_R_SUCCESS) {
-               tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
+               tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
                tls_close_direct(tlssock);
@@ -742,6 +743,7 @@ isc__nm_async_tlsconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                                   tls_connect_cb, tlssock,
                                   tlssock->connect_timeout, 0);
        if (result != ISC_R_SUCCESS) {
+               /* FIXME: We need to pass valid handle */
                tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
                atomic_store(&tlssock->result, result);
                atomic_store(&tlssock->connect_error, true);
index 0d085124f79d69839bc160a6287d89ad893a3cec..a12026a1e19e5846049a32e66a1cdd6494cf2321 100644 (file)
@@ -51,8 +51,15 @@ static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
 
 static void
-failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
-                 isc_result_t eresult);
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult);
+
+static bool
+inactive(isc_nmsocket_t *sock) {
+       return (!isc__nmsocket_active(sock) ||
+               atomic_load(&sock->mgr->closing) ||
+               (sock->server != NULL && !isc__nmsocket_active(sock->server)));
+}
 
 isc_result_t
 isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
@@ -337,14 +344,11 @@ static void
 udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
            const struct sockaddr *addr, unsigned flags) {
        isc_result_t result;
-       isc_nmhandle_t *nmhandle = NULL;
        isc_sockaddr_t sockaddr;
        isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
-       isc_region_t region;
+       isc__nm_uvreq_t *req = NULL;
        uint32_t maxudp;
        bool free_buf;
-       isc_nm_recv_cb_t cb;
-       void *cbarg;
 
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->tid == isc_nm_tid());
@@ -383,38 +387,35 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
                goto done;
        }
 
-       region.base = (unsigned char *)buf->base;
-       region.length = nrecv;
-
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-
        if (sock->timer_running) {
                uv_timer_stop(&sock->timer);
                sock->timer_running = false;
        }
 
+       req = isc__nm_uvreq_get(sock->mgr, sock);
+       req->cb.recv = sock->recv_cb;
+       req->cbarg = sock->recv_cbarg;
+       /*
+        * The callback will be called synchronously, because result is
+        * ISC_R_SUCCESS.
+        */
+       req->uvbuf.base = buf->base;
+       req->uvbuf.len = nrecv;
+
        if (atomic_load(&sock->client)) {
                if (nrecv < 0) {
                        failed_read_cb(sock, isc__nm_uverr2result(nrecv));
                        return;
                }
 
-               cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
        } else {
                result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
                RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
-
-               cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
-
-               /*
-                * If the recv callback wants to hold on to the handle,
-                * it needs to attach to it.
-                */
-               isc_nmhandle_detach(&nmhandle);
+               req->handle = isc__nmhandle_get(sock, &sockaddr, NULL);
        }
+       isc__nm_readcb(sock, req, ISC_R_SUCCESS);
 
 done:
        if (free_buf) {
@@ -438,21 +439,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
        uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
        int ntid;
 
-       if (!isc__nmsocket_active(sock)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
+       uvreq = isc__nm_uvreq_get(sock->mgr, sock);
+       uvreq->uvbuf.base = (char *)region->base;
+       uvreq->uvbuf.len = region->length;
 
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
-               return;
-       }
+       isc_nmhandle_attach(handle, &uvreq->handle);
 
-       if (atomic_load(&sock->mgr->closing)) {
+       uvreq->cb.send = cb;
+       uvreq->cbarg = cbarg;
+
+       if (inactive(sock)) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, cbarg);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
                return;
        }
 
@@ -465,6 +463,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
         * we need to do so here.
         */
        if (maxudp != 0 && region->length > maxudp) {
+               isc__nm_uvreq_put(&uvreq, sock);
                isc_nmhandle_detach(&handle);
                return;
        }
@@ -497,15 +496,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                rsock = &psock->children[ntid];
        }
 
-       uvreq = isc__nm_uvreq_get(sock->mgr, sock);
-       uvreq->uvbuf.base = (char *)region->base;
-       uvreq->uvbuf.len = region->length;
-
-       isc_nmhandle_attach(handle, &uvreq->handle);
-
-       uvreq->cb.send = cb;
-       uvreq->cbarg = cbarg;
-
        if (isc_nm_tid() == rsock->tid) {
                /*
                 * If we're in the same thread as the socket we can send
@@ -516,8 +506,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
                if (result != ISC_R_SUCCESS) {
                        isc__nm_incstats(rsock->mgr,
                                         rsock->statsindex[STATID_SENDFAIL]);
-                       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-                       isc__nm_uvreq_put(&uvreq, sock);
+                       failed_send_cb(rsock, uvreq, result);
                }
        } else {
                /*
@@ -547,23 +536,21 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
        REQUIRE(worker->id == sock->tid);
 
        if (!isc__nmsocket_active(ievent->sock)) {
-               uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq, sock);
+               failed_send_cb(sock, uvreq, ISC_R_CANCELED);
                return;
        }
 
        result = udp_send_direct(sock, uvreq, &ievent->peer);
        if (result != ISC_R_SUCCESS) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-               isc__nm_uvreq_put(&uvreq, sock);
+               failed_send_cb(sock, uvreq, result);
        }
 }
 
 static void
 udp_send_cb(uv_udp_send_t *req, int status) {
        isc_result_t result = ISC_R_SUCCESS;
-       isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
+       isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
        isc_nmsocket_t *sock = uvreq->sock;
 
        REQUIRE(VALID_UVREQ(uvreq));
@@ -574,8 +561,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
        }
 
-       uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
-       isc__nm_uvreq_put(&uvreq, uvreq->sock);
+       isc__nm_sendcb(sock, uvreq, result);
 }
 
 /*
@@ -593,13 +579,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        REQUIRE(sock->tid == isc_nm_tid());
        REQUIRE(sock->type == isc_nm_udpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
-               return (ISC_R_CANCELED);
-       }
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               return (ISC_R_CANCELED);
-       }
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                return (ISC_R_CANCELED);
        }
 
@@ -623,7 +603,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
        return (ISC_R_SUCCESS);
 }
 
-static int
+static isc_result_t
 udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker = NULL;
        int uv_bind_flags = UV_UDP_REUSEADDR;
@@ -642,11 +622,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                /* Socket was never opened; no need for isc__nm_udp_close() */
                atomic_store(&sock->closing, true);
                atomic_store(&sock->closed, true);
-               atomic_store(&sock->result, isc__nm_uverr2result(r));
                atomic_store(&sock->connect_error, true);
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
@@ -654,10 +632,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);
 
@@ -671,10 +648,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
 
        uv_handle_set_data(&sock->uv_handle.handle, sock);
@@ -685,10 +661,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                                 sock->statsindex[STATID_CONNECTFAIL]);
                atomic_store(&sock->connect_error, true);
                atomic_store(&sock->result, isc__nm_uverr2result(r));
-               failed_connect_cb(sock, req, isc__nm_uverr2result(r));
                atomic_store(&sock->active, false);
                isc__nm_udp_close(sock);
-               return (r);
+               return (isc__nm_uverr2result(r));
        }
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
        atomic_store(&sock->connecting, false);
@@ -701,7 +676,7 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        uv_send_buffer_size(&sock->uv_handle.handle,
                            &(int){ ISC_SEND_BUFFER_SIZE });
 #endif
-       return (0);
+       return (ISC_R_SUCCESS);
 }
 
 /*
@@ -714,37 +689,27 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
                (isc__netievent_udpconnect_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
        isc__nm_uvreq_t *req = ievent->req;
-       isc_nmhandle_t *handle = NULL;
-       isc_nm_cb_t cb;
-       void *cbarg;
-       int r;
        isc_result_t result;
 
        UNUSED(worker);
 
+       REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->type == isc_nm_udpsocket);
        REQUIRE(sock->iface != NULL);
        REQUIRE(sock->parent == NULL);
        REQUIRE(sock->tid == isc_nm_tid());
 
-       cb = sock->connect_cb;
-       cbarg = sock->connect_cbarg;
-
-       r = udp_connect_direct(sock, req);
-       if (r != 0) {
-               LOCK(&sock->lock);
-               SIGNAL(&sock->cond);
-               UNLOCK(&sock->lock);
-               return;
+       req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
+       result = udp_connect_direct(sock, req);
+       atomic_store(&sock->result, result);
+       if (result == ISC_R_SUCCESS) {
+               atomic_store(&sock->connected, true);
+               isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
+       } else {
+               atomic_store(&sock->connect_error, true);
+               isc__nm_uvreq_put(&req, sock);
        }
 
-       atomic_store(&sock->connected, true);
-       atomic_store(&sock->result, ISC_R_SUCCESS);
-       result = atomic_load(&sock->result);
-
-       handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
-       cb(handle, result, cbarg);
-
        LOCK(&sock->lock);
        SIGNAL(&sock->cond);
        UNLOCK(&sock->lock);
@@ -753,12 +718,6 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
         * The sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);
-
-       /*
-        * The connect callback should have attached to the handle.
-        * If it didn't, the socket will be closed now.
-        */
-       isc_nmhandle_detach(&handle);
 }
 
 isc_result_t
@@ -832,7 +791,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                isc__nm_async_udpconnect(&mgr->workers[sock->tid],
                                         (isc__netievent_t *)event);
                isc__nm_put_ievent(mgr, event);
-               isc__nm_uvreq_put(&req, sock);
        } else {
                sock->tid = isc_random_uniform(mgr->nworkers);
                isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
@@ -844,7 +802,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
                        WAIT(&sock->cond, &sock->lock);
                }
                UNLOCK(&sock->lock);
-               isc__nm_uvreq_put(&req, sock);
        }
 
        result = atomic_load(&sock->result);
@@ -865,9 +822,6 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
 
 static void
 failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
-       isc_nm_recv_cb_t cb;
-       void *cbarg = NULL;
-
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->statichandle != NULL);
 
@@ -878,12 +832,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
 
        uv_udp_recv_stop(&sock->uv_handle.udp);
 
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-       isc__nmsocket_clearcb(sock);
+       if (sock->recv_cb != NULL) {
+               isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
+               isc_nmhandle_attach(sock->statichandle, &req->handle);
+               req->cb.recv = sock->recv_cb;
+               req->cbarg = sock->recv_cbarg;
+
+               isc__nmsocket_clearcb(sock);
+
+               isc__nm_readcb(sock, req, result);
+       }
+}
+
+static void
+failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+              isc_result_t eresult) {
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(VALID_UVREQ(req));
 
-       if (cb != NULL) {
-               cb(sock->statichandle, result, NULL, cbarg);
+       if (req->cb.send != NULL) {
+               isc__nm_sendcb(sock, req, eresult);
+       } else {
+               isc__nm_uvreq_put(&req, sock);
        }
 }
 
@@ -909,17 +879,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
        isc_nmsocket_t *sock = ievent->sock;
 
-       if (!isc__nmsocket_active(sock)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               failed_read_cb(sock, ISC_R_CANCELED);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
+       if (inactive(sock)) {
                failed_read_cb(sock, ISC_R_CANCELED);
                return;
        }
@@ -948,24 +908,12 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        REQUIRE(VALID_NMSOCK(handle->sock));
        REQUIRE(handle->sock->type == isc_nm_udpsocket);
 
-       if (!isc__nmsocket_active(sock)) {
+       if (inactive(sock)) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
                cb(handle, ISC_R_CANCELED, NULL, cbarg);
                return;
        }
 
-       if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
-       if (atomic_load(&sock->mgr->closing)) {
-               isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
-               cb(handle, ISC_R_CANCELED, NULL, cbarg);
-               return;
-       }
-
        REQUIRE(sock->tid == isc_nm_tid());
        sock->recv_cb = cb;
        sock->recv_cbarg = cbarg;
@@ -1057,35 +1005,6 @@ isc__nm_udp_close(isc_nmsocket_t *sock) {
        }
 }
 
-static void
-failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
-                 isc_result_t eresult) {
-       REQUIRE(sock->tid == isc_nm_tid());
-
-       if (sock->timer_running) {
-               uv_timer_stop(&sock->timer);
-               sock->timer_running = false;
-       }
-
-       if (!atomic_load(&sock->connecting)) {
-               return;
-       }
-
-       atomic_store(&sock->connecting, false);
-
-       INSIST(req != NULL);
-
-       isc__nmsocket_clearcb(sock);
-
-       if (req->cb.connect != NULL) {
-               req->cb.connect(NULL, eresult, req->cbarg);
-       }
-       req->cb.connect = NULL;
-       req->cbarg = NULL;
-
-       isc__nmsocket_detach(&sock);
-}
-
 void
 isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
@@ -1096,11 +1015,6 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
        }
 
        if (atomic_load(&sock->connecting)) {
-               if (sock->timer_initialized) {
-                       isc__nm_uvreq_t *req =
-                               uv_handle_get_data((uv_handle_t *)&sock->timer);
-                       failed_connect_cb(sock, req, ISC_R_CANCELED);
-               }
                return;
        }