From: Evan Hunt Date: Tue, 17 Dec 2019 02:24:55 +0000 (-0800) Subject: implement isc_nm_tcpconnect() X-Git-Tag: v9.17.3~49^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=abbb79f9d18d3313973e214b7118a6436ff47063;p=thirdparty%2Fbind9.git implement isc_nm_tcpconnect() the isc_nm_tcpconnect() function establishes a client connection via TCP. once the connection is esablished, a callback function will be called with a newly created network manager handle. --- diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index 52262d58578..10b75bd171c 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -238,6 +238,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, * prepended with a two-byte length field, and handles buffering. */ +isc_result_t +isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize); +/*%< + * Create a socket using netmgr 'mgr', bind it to the address 'local', + * and connect it to the address 'peer'. + * + * When the connection is complete, call 'cb' with argument 'cbarg'. + * Allocate 'extrahandlesize' additional bytes along with the handle to use + * for an associated object. + * + * The connected socket can only be accessed via the handle passed to + * 'cb'. + */ + isc_result_t isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg, diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 2a38b5b5325..5325c8cfbf8 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -96,8 +96,7 @@ struct isc_nmhandle { * the socket. */ isc_nmsocket_t *sock; - size_t ah_pos; /* Position in the socket's - * 'active handles' array */ + size_t ah_pos; /* Position in the socket's 'active handles' array */ /* * The handle is 'inflight' if netmgr is not currently processing @@ -141,6 +140,7 @@ typedef enum isc__netievent_type { netievent_closecb, netievent_shutdown, netievent_stop, + netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority * events, which can be processed @@ -443,6 +443,8 @@ struct isc_nmsocket { atomic_bool closed; atomic_bool listening; atomic_bool listen_error; + atomic_bool connected; + atomic_bool connect_error; isc_refcount_t references; /*% diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index a1e69887108..0f45cc90d53 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1140,6 +1140,9 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { static void nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { + size_t handlenum; + bool reuse = false; + /* * We do all of this under lock to avoid races with socket * destruction. We have to do this now, because at this point the @@ -1152,10 +1155,9 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { INSIST(atomic_load(&sock->ah) > 0); sock->ah_handles[handle->ah_pos] = NULL; - size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; + handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; sock->ah_frees[handlenum] = handle->ah_pos; handle->ah_pos = 0; - bool reuse = false; if (atomic_load(&sock->active)) { reuse = isc_astack_trypush(sock->inactivehandles, handle); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index f2b487d2962..5e1a57ec7b1 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -88,6 +88,10 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); + /* Socket was never opened; no need for tcp_close_direct() */ + atomic_store(&sock->closed, true); + sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->connect_error, true); return (r); } @@ -96,13 +100,23 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); + sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->connect_error, true); tcp_close_direct(sock); return (r); } } + uv_handle_set_data(&sock->uv_handle.handle, sock); r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, &req->peer.type.sa, tcp_connect_cb); + if (r != 0) { + isc__nm_incstats(sock->mgr, + sock->statsindex[STATID_CONNECTFAIL]); + sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->connect_error, true); + tcp_close_direct(sock); + } return (r); } @@ -114,14 +128,21 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { isc__nm_uvreq_t *req = ievent->req; int r; - REQUIRE(sock->type == isc_nm_tcpsocket); - REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id); + UNUSED(worker); r = tcp_connect_direct(sock, req); if (r != 0) { /* We need to issue callbacks ourselves */ tcp_connect_cb(&req->uv_req.connect, r); + goto done; } + + atomic_store(&sock->connected, true); + +done: + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); } static void @@ -138,6 +159,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { struct sockaddr_storage ss; isc_nmhandle_t *handle = NULL; + sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, &(int){ sizeof(ss) }); @@ -165,13 +187,61 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { * TODO: * Handle the connect error properly and free the socket. */ - isc__nm_incstats(sock->mgr, - sock->statsindex[STATID_CONNECTFAIL]); req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg); isc__nm_uvreq_put(&req, sock); } } +isc_result_t +isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) { + isc_nmsocket_t *nsock = NULL; + isc__netievent_tcpconnect_t *ievent = NULL; + isc__nm_uvreq_t *req = NULL; + + REQUIRE(VALID_NM(mgr)); + + nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); + isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); + nsock->extrahandlesize = extrahandlesize; + nsock->result = ISC_R_SUCCESS; + + req = isc__nm_uvreq_get(mgr, nsock); + req->cb.connect = cb; + req->cbarg = cbarg; + req->peer = peer->addr; + + ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect); + ievent->sock = nsock; + ievent->req = req; + + if (isc__nm_in_netthread()) { + nsock->tid = isc_nm_tid(); + isc__nm_async_tcpconnect(&mgr->workers[nsock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(mgr, ievent); + } else { + nsock->tid = isc_random_uniform(mgr->nworkers); + isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + (isc__netievent_t *)ievent); + + LOCK(&nsock->lock); + while (!atomic_load(&nsock->connected) && + !atomic_load(&nsock->connect_error)) { + WAIT(&nsock->cond, &nsock->lock); + } + UNLOCK(&nsock->lock); + } + + if (nsock->result != ISC_R_SUCCESS) { + isc_result_t result = nsock->result; + isc__nmsocket_detach(&nsock); + return (result); + } + + return (ISC_R_SUCCESS); +} + isc_result_t isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize, int backlog, @@ -245,8 +315,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { - /* It was never opened */ isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); + /* The socket was never opened, so no need for uv_close() */ atomic_store(&sock->closed, true); sock->result = isc__nm_uverr2result(r); atomic_store(&sock->listen_error, true); @@ -866,6 +936,7 @@ static void tcp_send_cb(uv_write_t *req, int status) { isc_result_t result = ISC_R_SUCCESS; isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; + isc_nmsocket_t *sock = NULL; REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); @@ -877,8 +948,10 @@ tcp_send_cb(uv_write_t *req, int status) { } uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); + + sock = uvreq->handle->sock; isc_nmhandle_unref(uvreq->handle); - isc__nm_uvreq_put(&uvreq, uvreq->handle->sock); + isc__nm_uvreq_put(&uvreq, sock); } /* @@ -931,6 +1004,7 @@ tcp_close_cb(uv_handle_t *uvhandle) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); atomic_store(&sock->closed, true); + atomic_store(&sock->connected, false); isc__nmsocket_prep_destroy(sock); } diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 4a7ec68056e..723c6a95fde 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -459,6 +459,7 @@ isc_nm_send isc_nm_setstats isc_nm_start isc_nm_stoplistening +isc_nm_tcpconnect isc_nm_tcp_gettimeouts isc_nm_tcp_settimeouts isc_nm_tcpdns_keepalive