]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
implement isc_nm_tcpconnect()
authorEvan Hunt <each@isc.org>
Tue, 17 Dec 2019 02:24:55 +0000 (18:24 -0800)
committerOndřej Surý <ondrej@isc.org>
Thu, 1 Oct 2020 14:44:43 +0000 (16:44 +0200)
the isc_nm_tcpconnect() function establishes a client connection via
TCP.  once the connection is esablished, a callback function will be
called with a newly created network manager handle.

(cherry picked from commit abbb79f9d18d3313973e214b7118a6436ff47063)

lib/isc/include/isc/netmgr.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/win32/libisc.def.in

index e030cf06924beaf24aa6ab1692c7df48e9caffec..1c9fb8f4c10d658b9ec7393b7ac3ce5ff87c22dd 100644 (file)
@@ -238,6 +238,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
  * prepended with a two-byte length field, and handles buffering.
  */
 
+isc_result_t
+isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                 isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize);
+/*%<
+ * Create a socket using netmgr 'mgr', bind it to the address 'local',
+ * and connect it to the address 'peer'.
+ *
+ * When the connection is complete, call 'cb' with argument 'cbarg'.
+ * Allocate 'extrahandlesize' additional bytes along with the handle to use
+ * for an associated object.
+ *
+ * The connected socket can only be accessed via the handle passed to
+ * 'cb'.
+ */
+
 isc_result_t
 isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
                    void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg,
index f6d8417f6d8113f12fe7a5394caedd93d7f55675..ecb6cadda32741ea01de91922dbfff9448dcee5b 100644 (file)
@@ -96,8 +96,7 @@ struct isc_nmhandle {
         * the socket.
         */
        isc_nmsocket_t *sock;
-       size_t ah_pos; /* Position in the socket's
-                       * 'active handles' array */
+       size_t ah_pos; /* Position in the socket's 'active handles' array */
 
        /*
         * The handle is 'inflight' if netmgr is not currently processing
@@ -141,6 +140,7 @@ typedef enum isc__netievent_type {
        netievent_closecb,
        netievent_shutdown,
        netievent_stop,
+
        netievent_prio = 0xff, /* event type values higher than this
                                * will be treated as high-priority
                                * events, which can be processed
@@ -443,6 +443,8 @@ struct isc_nmsocket {
        atomic_bool closed;
        atomic_bool listening;
        atomic_bool listen_error;
+       atomic_bool connected;
+       atomic_bool connect_error;
        isc_refcount_t references;
 
        /*%
index 3f23eff056441ee8364d7c904b3001b9c469388a..84bd31edc6c28bc043db0e2df9dd3b0eeb700082 100644 (file)
@@ -1140,6 +1140,9 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
 
 static void
 nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
+       size_t handlenum;
+       bool reuse = false;
+
        /*
         * We do all of this under lock to avoid races with socket
         * destruction.  We have to do this now, because at this point the
@@ -1152,10 +1155,9 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
        INSIST(atomic_load(&sock->ah) > 0);
 
        sock->ah_handles[handle->ah_pos] = NULL;
-       size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
+       handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
        sock->ah_frees[handlenum] = handle->ah_pos;
        handle->ah_pos = 0;
-       bool reuse = false;
        if (atomic_load(&sock->active)) {
                reuse = isc_astack_trypush(sock->inactivehandles, handle);
        }
index b23860f5b5d5233c42a462ff719f7c5966572a96..38afd00ddbb60ef08d0e1cc406ad81d8d4a49dfe 100644 (file)
@@ -88,6 +88,10 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
        if (r != 0) {
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
+               /* Socket was never opened; no need for tcp_close_direct() */
+               atomic_store(&sock->closed, true);
+               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->connect_error, true);
                return (r);
        }
 
@@ -96,13 +100,23 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                if (r != 0) {
                        isc__nm_incstats(sock->mgr,
                                         sock->statsindex[STATID_BINDFAIL]);
+                       sock->result = isc__nm_uverr2result(r);
+                       atomic_store(&sock->connect_error, true);
                        tcp_close_direct(sock);
                        return (r);
                }
        }
+
        uv_handle_set_data(&sock->uv_handle.handle, sock);
        r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
                           &req->peer.type.sa, tcp_connect_cb);
+       if (r != 0) {
+               isc__nm_incstats(sock->mgr,
+                                sock->statsindex[STATID_CONNECTFAIL]);
+               sock->result = isc__nm_uverr2result(r);
+               atomic_store(&sock->connect_error, true);
+               tcp_close_direct(sock);
+       }
        return (r);
 }
 
@@ -114,14 +128,21 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
        isc__nm_uvreq_t *req = ievent->req;
        int r;
 
-       REQUIRE(sock->type == isc_nm_tcpsocket);
-       REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);
+       UNUSED(worker);
 
        r = tcp_connect_direct(sock, req);
        if (r != 0) {
                /* We need to issue callbacks ourselves */
                tcp_connect_cb(&req->uv_req.connect, r);
+               goto done;
        }
+
+       atomic_store(&sock->connected, true);
+
+done:
+       LOCK(&sock->lock);
+       SIGNAL(&sock->cond);
+       UNLOCK(&sock->lock);
 }
 
 static void
@@ -138,6 +159,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
                struct sockaddr_storage ss;
                isc_nmhandle_t *handle = NULL;
 
+               sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
                uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
                                   &(int){ sizeof(ss) });
@@ -165,13 +187,61 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
                 * TODO:
                 * Handle the connect error properly and free the socket.
                 */
-               isc__nm_incstats(sock->mgr,
-                                sock->statsindex[STATID_CONNECTFAIL]);
                req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
                isc__nm_uvreq_put(&req, sock);
        }
 }
 
+isc_result_t
+isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
+                 isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) {
+       isc_nmsocket_t *nsock = NULL;
+       isc__netievent_tcpconnect_t *ievent = NULL;
+       isc__nm_uvreq_t *req = NULL;
+
+       REQUIRE(VALID_NM(mgr));
+
+       nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
+       isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
+       nsock->extrahandlesize = extrahandlesize;
+       nsock->result = ISC_R_SUCCESS;
+
+       req = isc__nm_uvreq_get(mgr, nsock);
+       req->cb.connect = cb;
+       req->cbarg = cbarg;
+       req->peer = peer->addr;
+
+       ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
+       ievent->sock = nsock;
+       ievent->req = req;
+
+       if (isc__nm_in_netthread()) {
+               nsock->tid = isc_nm_tid();
+               isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
+                                        (isc__netievent_t *)ievent);
+               isc__nm_put_ievent(mgr, ievent);
+       } else {
+               nsock->tid = isc_random_uniform(mgr->nworkers);
+               isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
+                                      (isc__netievent_t *)ievent);
+
+               LOCK(&nsock->lock);
+               while (!atomic_load(&nsock->connected) &&
+                      !atomic_load(&nsock->connect_error)) {
+                       WAIT(&nsock->cond, &nsock->lock);
+               }
+               UNLOCK(&nsock->lock);
+       }
+
+       if (nsock->result != ISC_R_SUCCESS) {
+               isc_result_t result = nsock->result;
+               isc__nmsocket_detach(&nsock);
+               return (result);
+       }
+
+       return (ISC_R_SUCCESS);
+}
+
 isc_result_t
 isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
                 void *cbarg, size_t extrahandlesize, int backlog,
@@ -245,8 +315,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
 
        r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
        if (r != 0) {
-               /* It was never opened */
                isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
+               /* The socket was never opened, so no need for uv_close() */
                atomic_store(&sock->closed, true);
                sock->result = isc__nm_uverr2result(r);
                atomic_store(&sock->listen_error, true);
@@ -866,6 +936,7 @@ static void
 tcp_send_cb(uv_write_t *req, int status) {
        isc_result_t result = ISC_R_SUCCESS;
        isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
+       isc_nmsocket_t *sock = NULL;
 
        REQUIRE(VALID_UVREQ(uvreq));
        REQUIRE(VALID_NMHANDLE(uvreq->handle));
@@ -877,8 +948,10 @@ tcp_send_cb(uv_write_t *req, int status) {
        }
 
        uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
+
+       sock = uvreq->handle->sock;
        isc_nmhandle_unref(uvreq->handle);
-       isc__nm_uvreq_put(&uvreq, uvreq->handle->sock);
+       isc__nm_uvreq_put(&uvreq, sock);
 }
 
 /*
@@ -931,6 +1004,7 @@ tcp_close_cb(uv_handle_t *uvhandle) {
 
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
        atomic_store(&sock->closed, true);
+       atomic_store(&sock->connected, false);
        isc__nmsocket_prep_destroy(sock);
 }
 
index c63eb53539be9375b93f0a71e71c4692cbe5f0ef..ad27e4b35ff123d670edc295cec5ad9b982f43fc 100644 (file)
@@ -463,6 +463,7 @@ isc_nm_send
 isc_nm_setstats
 isc_nm_start
 isc_nm_stoplistening
+isc_nm_tcpconnect
 isc_nm_tcp_gettimeouts
 isc_nm_tcp_settimeouts
 isc_nm_tcpdns_keepalive