From: Evan Hunt Date: Sat, 5 Sep 2020 18:07:40 +0000 (-0700) Subject: add netmgr functions to support outgoing DNS queries X-Git-Tag: v9.17.7~31^2~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5dcdc00b93d82d4feb5edeb4cd2b73a77325c29a;p=thirdparty%2Fbind9.git add netmgr functions to support outgoing DNS queries - isc_nm_tcpdnsconnect() sets up up an outgoing TCP DNS connection. - isc_nm_tcpconnect(), _udpconnect() and _tcpdnsconnect() now take a timeout argument to ensure connections time out and are correctly cleaned up on failure. - isc_nm_read() now supports UDP; it reads a single datagram and then stops until the next time it's called. - isc_nm_cancelread() now runs asynchronously to prevent assertion failure if reading is interrupted by a non-network thread (e.g. a timeout). - isc_nm_cancelread() can now apply to UDP sockets. - added shim code to support UDP connection in versions of libuv prior to 1.27, when uv_udp_connect() was added all these functions will be used to support outgoing queries in dig, xfrin, dispatch, etc. --- diff --git a/bin/rndc/rndc.c b/bin/rndc/rndc.c index a8f7277a017..bc3d1243285 100644 --- a/bin/rndc/rndc.c +++ b/bin/rndc/rndc.c @@ -603,7 +603,7 @@ rndc_startconnect(isc_sockaddr_t *addr) { DO("create connection", isc_nm_tcpconnect(netmgr, (isc_nmiface_t *)local, (isc_nmiface_t *)addr, rndc_connected, &rndc_ccmsg, - 0)); + 10000, 0)); } static void diff --git a/configure.ac b/configure.ac index 6c16a76c190..c43c51eee6e 100644 --- a/configure.ac +++ b/configure.ac @@ -574,7 +574,7 @@ LIBS="$LIBS $LIBUV_LIBS" # Those functions are only provided in newer versions of libuv, we'll be emulating them # for now -AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data uv_import]) +AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data uv_import uv_udp_connect uv_translate_sys_error]) AX_RESTORE_FLAGS([libuv]) # diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index 8843cbd54f0..063b5f41c3d 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -180,6 +180,25 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, * can then be freed automatically when the handle is destroyed. */ +isc_result_t +isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize); +/*%< + * Open a UDP socket, bind to 'local' and connect to 'peer', and + * immediately call 'cb' with a handle so that the caller can begin + * sending packets over UDP. + * + * When handles are allocated for the socket, 'extrasize' additional bytes + * can be allocated along with the handle for an associated object, which + * can then be freed automatically when the handle is destroyed. + * + * 'timeout' specifies the timeout interval in milliseconds. + * + * The connected socket can only be accessed via the handle passed to + * 'cb'. + */ + void isc_nm_stoplistening(isc_nmsocket_t *sock); /*%< @@ -277,14 +296,17 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_result_t isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, - isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize); + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize); /*%< * Create a socket using netmgr 'mgr', bind it to the address 'local', * and connect it to the address 'peer'. * - * When the connection is complete, call 'cb' with argument 'cbarg'. - * Allocate 'extrahandlesize' additional bytes along with the handle to use - * for an associated object. + * When the connection is complete or has timed out, call 'cb' with + * argument 'cbarg'. Allocate 'extrahandlesize' additional bytes along + * with the handle to use for an associated object. + * + * 'timeout' specifies the timeout interval in milliseconds. * * The connected socket can only be accessed via the handle passed to * 'cb'. @@ -390,3 +412,21 @@ isc_nm_setstats(isc_nm_t *mgr, isc_stats_t *stats); *\li stats is a valid set of statistics counters supporting the * full range of socket-related stats counter numbers. */ + +isc_result_t +isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize); +/*% + * Establish a DNS client connection over a TCP socket, bound to the + * address 'local', and connected to the address 'peer'. + * + * When the connection is complete or has timed out, call 'cb' with + * argument 'cbarg'. Allocate 'extrahandlesize' additional bytes along + * with the handle to use for an associated object. + * + * 'timeout' specifies the timeout interval in milliseconds. + * + * The connected socket can only be accessed via the handle passed to + * 'cb'. + */ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 99ce506987b..d5a167ad50c 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -135,8 +135,12 @@ struct isc_nmiface { }; typedef enum isc__netievent_type { + netievent_udpconnect, netievent_udpsend, + netievent_udpread, netievent_udpstop, + netievent_udpcancel, + netievent_udpclose, netievent_tcpconnect, netievent_tcpsend, @@ -145,9 +149,12 @@ typedef enum isc__netievent_type { netievent_tcpchildaccept, netievent_tcpaccept, netievent_tcpstop, + netievent_tcpcancel, netievent_tcpclose, netievent_tcpdnssend, + netievent_tcpdnsread, + netievent_tcpdnscancel, netievent_tcpdnsclose, netievent_tcpdnsstop, @@ -212,13 +219,16 @@ typedef struct isc__netievent__socket { } isc__netievent__socket_t; typedef isc__netievent__socket_t isc__netievent_udplisten_t; +typedef isc__netievent__socket_t isc__netievent_udpread_t; typedef isc__netievent__socket_t isc__netievent_udpstop_t; +typedef isc__netievent__socket_t isc__netievent_udpclose_t; typedef isc__netievent__socket_t isc__netievent_tcpstop_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsread_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsstop_t; typedef struct isc__netievent__socket_req { @@ -227,6 +237,7 @@ typedef struct isc__netievent__socket_req { isc__nm_uvreq_t *req; } isc__netievent__socket_req_t; +typedef isc__netievent__socket_req_t isc__netievent_udpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; @@ -248,6 +259,9 @@ typedef struct isc__netievent__socket_handle { isc_nmhandle_t *handle; } isc__netievent__socket_handle_t; +typedef isc__netievent__socket_handle_t isc__netievent_udpcancel_t; +typedef isc__netievent__socket_handle_t isc__netievent_tcpcancel_t; +typedef isc__netievent__socket_handle_t isc__netievent_tcpdnscancel_t; typedef isc__netievent__socket_handle_t isc__netievent_detach_t; typedef struct isc__netievent__socket_quota { @@ -399,11 +413,14 @@ struct isc_nmsocket { const isc_statscounter_t *statsindex; /*% - * TCP read timeout timer. + * TCP read/connect timeout timers. */ uv_timer_t timer; bool timer_initialized; + bool timer_running; uint64_t read_timeout; + uint64_t connect_timeout; + bool timed_out; /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */ isc_nmsocket_t *outer; @@ -452,6 +469,7 @@ struct isc_nmsocket { atomic_bool closed; atomic_bool listening; atomic_bool listen_error; + atomic_bool connecting; atomic_bool connected; atomic_bool connect_error; isc_refcount_t references; @@ -506,9 +524,9 @@ struct isc_nmsocket { isc_condition_t cond; /*% - * Used to pass a result back from TCP listening events. + * Used to pass a result back from listen or connect events. */ - isc_result_t result; + atomic_int_fast32_t result; /*% * List of active handles. @@ -551,6 +569,9 @@ struct isc_nmsocket { isc_nm_recv_cb_t recv_cb; void *recv_cbarg; + isc_nm_cb_t connect_cb; + void *connect_cbarg; + isc_nm_accept_cb_t accept_cb; void *accept_cbarg; #ifdef NETMGR_TRACE @@ -698,16 +719,41 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, * Back-end implementation of isc_nm_send() for UDP handles. */ +isc_result_t +isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); +/* + * Back-end implementation of isc_nm_read() for UDP handles. + */ + +void +isc__nm_udp_close(isc_nmsocket_t *sock); +/*%< + * Close a UDP socket. + */ + +void +isc__nm_udp_cancelread(isc_nmhandle_t *handle); +/*%< + * Stop reading on a connected UDP handle. + */ + void isc__nm_udp_stoplistening(isc_nmsocket_t *sock); void isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0); - +void +isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous UDP events (listen, stoplisten, send). */ @@ -780,6 +826,8 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0); void +isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0); +void isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous TCP events (connect, listen, @@ -802,6 +850,8 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock); void isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock); +void +isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); void @@ -809,6 +859,18 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0); + +isc_result_t +isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); + +void +isc__nm_tcpdns_cancelread(isc_nmhandle_t *handle); +/*%< + * Stop reading on a connected TCPDNS handle. + */ + #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) isc_result_t diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index cb6b18911c4..144380deaf9 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -612,6 +612,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { more = false; break; + case netievent_udpconnect: + isc__nm_async_udpconnect(worker, ievent); + break; case netievent_udplisten: isc__nm_async_udplisten(worker, ievent); break; @@ -621,6 +624,15 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_udpsend: isc__nm_async_udpsend(worker, ievent); break; + case netievent_udpread: + isc__nm_async_udpread(worker, ievent); + break; + case netievent_udpcancel: + isc__nm_async_udpcancel(worker, ievent); + break; + case netievent_udpclose: + isc__nm_async_udpclose(worker, ievent); + break; case netievent_tcpconnect: isc__nm_async_tcpconnect(worker, ievent); @@ -649,13 +661,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpstop: isc__nm_async_tcpstop(worker, ievent); break; + case netievent_tcpcancel: + isc__nm_async_tcpcancel(worker, ievent); + break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; + case netievent_tcpdnscancel: + isc__nm_async_tcpdnscancel(worker, ievent); + break; case netievent_tcpdnsclose: isc__nm_async_tcpdnsclose(worker, ievent); break; + case netievent_tcpdnsread: + isc__nm_async_tcpdnsread(worker, ievent); + break; case netievent_tcpdnsstop: isc__nm_async_tcpdnsstop(worker, ievent); break; @@ -934,6 +955,9 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) { */ if (!atomic_load(&sock->closed)) { switch (sock->type) { + case isc_nm_udpsocket: + isc__nm_udp_close(sock); + return; case isc_nm_tcpsocket: isc__nm_tcp_close(sock); return; @@ -1471,9 +1495,15 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); switch (handle->sock->type) { + case isc_nm_udpsocket: + isc__nm_udp_read(handle, cb, cbarg); + break; case isc_nm_tcpsocket: isc__nm_tcp_read(handle, cb, cbarg); break; + case isc_nm_tcpdnssocket: + isc__nm_tcpdns_read(handle, cb, cbarg); + break; default: INSIST(0); ISC_UNREACHABLE(); @@ -1485,9 +1515,15 @@ isc_nm_cancelread(isc_nmhandle_t *handle) { REQUIRE(VALID_NMHANDLE(handle)); switch (handle->sock->type) { + case isc_nm_udpsocket: + isc__nm_udp_cancelread(handle); + break; case isc_nm_tcpsocket: isc__nm_tcp_cancelread(handle); break; + case isc_nm_tcpdnssocket: + isc__nm_tcpdns_cancelread(handle); + break; default: INSIST(0); ISC_UNREACHABLE(); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 43d3f9fc9fb..ce95559d759 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -77,6 +77,22 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota); static void quota_accept_cb(isc_quota_t *quota, void *sock0); +static void +connecttimeout_cb(uv_timer_t *handle) { + isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle); + isc_nmsocket_t *sock = req->sock; + + if (req->cb.connect != NULL) { + req->cb.connect(NULL, ISC_R_TIMEDOUT, req->cbarg); + } + + uv_timer_stop(&sock->timer); + sock->timer_running = false; + sock->timed_out = true; + isc__nm_uvreq_put(&req, sock); + isc__nmsocket_detach(&sock); +} + static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; @@ -86,12 +102,24 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { worker = &sock->mgr->workers[isc_nm_tid()]; + atomic_store(&sock->connecting, true); + + if (!sock->timer_initialized) { + uv_timer_init(&worker->loop, &sock->timer); + uv_handle_set_data((uv_handle_t *)&sock->timer, req); + sock->timer_initialized = true; + } + + uv_timer_start(&sock->timer, connecttimeout_cb, sock->connect_timeout, + 0); + sock->timer_running = true; + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); /* Socket was never opened; no need for tcp_close_direct() */ atomic_store(&sock->closed, true); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->connect_error, true); return (r); } @@ -101,7 +129,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->connect_error, true); tcp_close_direct(sock); return (r); @@ -114,11 +142,14 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECTFAIL]); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->connect_error, true); tcp_close_direct(sock); + return (r); } - return (r); + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); + + return (0); } void @@ -159,15 +190,30 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); - REQUIRE(VALID_UVREQ(req)); + atomic_store(&sock->connecting, false); + + if (sock->timed_out) { + return; + } + + uv_timer_stop(&sock->timer); + sock->timer_running = false; if (status != 0) { req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg); - isc__nm_uvreq_put(&req, sock); - isc__nmsocket_detach(&sock); + if (status != UV_ECANCELED) { + /* + * In this case the resources would already + * have been freed in isc__nm_tcp_shutdown(). + */ + isc__nm_uvreq_put(&req, sock); + isc__nmsocket_detach(&sock); + } return; } + REQUIRE(VALID_UVREQ(req)); + sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, @@ -180,8 +226,6 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_put(&req, sock); - atomic_init(&sock->client, true); - /* * The sock is now attached to the handle. */ @@ -196,11 +240,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { isc_result_t isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, - isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) { + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize) { + isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *nsock = NULL, *tmp = NULL; isc__netievent_tcpconnect_t *ievent = NULL; isc__nm_uvreq_t *req = NULL; - isc_result_t result = ISC_R_SUCCESS; REQUIRE(VALID_NM(mgr)); REQUIRE(local != NULL); @@ -209,7 +254,9 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); nsock->extrahandlesize = extrahandlesize; - nsock->result = ISC_R_SUCCESS; + nsock->connect_timeout = timeout; + atomic_init(&nsock->result, ISC_R_SUCCESS); + atomic_init(&nsock->client, true); req = isc__nm_uvreq_get(mgr, nsock); req->cb.connect = cb; @@ -245,9 +292,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, UNLOCK(&nsock->lock); } - if (nsock->result != ISC_R_SUCCESS) { - result = nsock->result; - } + result = atomic_load(&nsock->result); isc__nmsocket_detach(&tmp); @@ -266,11 +311,12 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface); + nsock->accept_cb = accept_cb; nsock->accept_cbarg = accept_cbarg; nsock->extrahandlesize = extrahandlesize; nsock->backlog = backlog; - nsock->result = ISC_R_SUCCESS; + atomic_init(&nsock->result, ISC_R_SUCCESS); if (quota != NULL) { /* * We don't attach to quota, just assign - to avoid @@ -300,11 +346,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, UNLOCK(&nsock->lock); } - if (nsock->result == ISC_R_SUCCESS) { + if (atomic_load(&nsock->result) == ISC_R_SUCCESS) { *sockp = nsock; return (ISC_R_SUCCESS); } else { - isc_result_t result = nsock->result; + isc_result_t result = atomic_load(&nsock->result); isc__nmsocket_detach(&nsock); return (result); } @@ -333,7 +379,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); /* The socket was never opened, so no need for uv_close() */ atomic_store(&sock->closed, true); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->listen_error, true); goto done; } @@ -364,7 +410,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); uv_close(&sock->uv_handle.handle, tcp_close_cb); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->listen_error, true); goto done; } @@ -379,7 +425,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { &snamelen); if (r != 0) { uv_close(&sock->uv_handle.handle, tcp_close_cb); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->listen_error, true); goto done; } @@ -396,7 +442,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { "uv_listen failed: %s", isc_result_totext(isc__nm_uverr2result(r))); uv_close(&sock->uv_handle.handle, tcp_close_cb); - sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->listen_error, true); goto done; } @@ -600,6 +646,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { if (sock->timer_initialized) { uv_timer_stop(&sock->timer); + sock->timer_running = false; } if (sock->quota) { @@ -652,6 +699,7 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { } REQUIRE(sock->tid == isc_nm_tid()); + sock->recv_cb = cb; sock->recv_cbarg = cbarg; @@ -719,6 +767,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) { } uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout, 0); + sock->timer_running = true; } } @@ -756,8 +805,9 @@ isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(worker->id == isc_nm_tid()); - if (sock->timer_initialized) { + if (sock->timer_running) { uv_timer_stop(&sock->timer); + sock->timer_running = false; } uv_read_stop(&sock->uv_handle.stream); } @@ -897,6 +947,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { isc__netievent_tcpchildaccept_t *event = NULL; isc__networker_t *worker = NULL; uv_tcp_t *uvstream = NULL; + isc_nmsocket_t *csock = NULL; isc_mem_t *mctx = NULL; int r, w; @@ -973,8 +1024,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { return (result); } - isc_nmsocket_t *csock = isc_mem_get(ssock->mgr->mctx, - sizeof(isc_nmsocket_t)); + csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); csock->tid = w; csock->extrahandlesize = ssock->extrahandlesize; @@ -1148,12 +1198,14 @@ tcp_close_direct(isc_nmsocket_t *sock) { uv_read_stop((uv_stream_t *)&sock->uv_handle.handle); - if (sock->timer_initialized) { + if (sock->timer_running) { uv_timer_stop(&sock->timer); + sock->timer_running = false; } if (sock->timer_initialized) { sock->timer_initialized = false; + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); uv_close((uv_handle_t *)&sock->timer, timer_close_cb); } else { uv_close(&sock->uv_handle.handle, tcp_close_cb); @@ -1202,7 +1254,24 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { return; } - if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) { + if (atomic_load(&sock->connecting)) { + isc__nm_uvreq_t *req = NULL; + + atomic_store(&sock->connecting, false); + req = uv_handle_get_data((uv_handle_t *)&sock->timer); + uv_timer_stop(&sock->timer); + sock->timer_running = false; + + isc__nmsocket_clearcb(sock); + if (sock->connect_cb != NULL) { + sock->connect_cb(NULL, ISC_R_CANCELED, + sock->connect_cbarg); + } + + isc__nm_uvreq_put(&req, sock); + isc__nmsocket_detach(&sock); + } else if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) + { failed_read_cb(sock, ISC_R_CANCELED); } } @@ -1210,16 +1279,36 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { void isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; + isc__netievent_tcpcancel_t *ievent = NULL; REQUIRE(VALID_NMHANDLE(handle)); sock = handle->sock; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); + + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpcancel); + ievent->sock = sock; + isc_nmhandle_attach(handle, &ievent->handle); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); +} + +void +isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc_nmhandle_t *handle = ievent->handle; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(worker->id == sock->tid); REQUIRE(sock->tid == isc_nm_tid()); + uv_read_stop(&sock->uv_handle.stream); + if (atomic_load(&sock->client)) { failed_read_cb(sock, ISC_R_EOF); } + + isc_nmhandle_detach(&handle); } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 63391273cb1..255c44fe2c3 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -262,7 +262,11 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, if (region == NULL || eresult != ISC_R_SUCCESS) { /* Connection closed */ - dnssock->result = eresult; + atomic_store(&dnssock->result, eresult); + if (atomic_load(&dnssock->client) && dnssock->recv_cb != NULL) { + dnssock->recv_cb(dnssock->statichandle, eresult, NULL, + dnssock->recv_cbarg); + } if (dnssock->self != NULL) { isc__nmsocket_detach(&dnssock->self); } @@ -270,7 +274,15 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, if (dnssock->outerhandle != NULL) { isc_nmhandle_detach(&dnssock->outerhandle); } - isc_nmhandle_detach(&handle); + + /* + * Server connections will hold two handle references when + * shut down, but client (tcpdnsconnect) connections have + * only one. + */ + if (!atomic_load(&dnssock->client)) { + isc_nmhandle_detach(&handle); + } return; } @@ -677,3 +689,204 @@ isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) { tcpdns_close_direct(ievent->sock); } + +typedef struct { + isc_mem_t *mctx; + isc_nm_cb_t cb; + void *cbarg; + size_t extrahandlesize; +} tcpconnect_t; + +static void +tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { + tcpconnect_t *conn = (tcpconnect_t *)arg; + isc_nm_cb_t cb = conn->cb; + void *cbarg = conn->cbarg; + size_t extrahandlesize = conn->extrahandlesize; + isc_nmsocket_t *dnssock = NULL; + + REQUIRE(result != ISC_R_SUCCESS || VALID_NMHANDLE(handle)); + + isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); + + if (result != ISC_R_SUCCESS) { + cb(NULL, result, cbarg); + return; + } + + dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock)); + isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket, + handle->sock->iface); + + dnssock->extrahandlesize = extrahandlesize; + isc_nmhandle_attach(handle, &dnssock->outerhandle); + + dnssock->peer = handle->sock->peer; + dnssock->read_timeout = handle->sock->mgr->init; + dnssock->tid = isc_nm_tid(); + + atomic_init(&dnssock->client, true); + dnssock->statichandle = isc__nmhandle_get(dnssock, NULL, NULL); + + uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop, + &dnssock->timer); + dnssock->timer.data = dnssock; + dnssock->timer_initialized = true; + uv_timer_start(&dnssock->timer, dnstcp_readtimeout, + dnssock->read_timeout, 0); + + /* + * The connection is now established; we start reading immediately, + * before we've been asked to. We'll read and buffer at most one + * packet. + */ + isc_nm_read(handle, dnslisten_readcb, dnssock); + cb(dnssock->statichandle, ISC_R_SUCCESS, cbarg); + isc__nmsocket_detach(&dnssock); +} + +isc_result_t +isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize) { + tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); + + *conn = (tcpconnect_t){ .cb = cb, + .cbarg = cbarg, + .extrahandlesize = extrahandlesize }; + isc_mem_attach(mgr->mctx, &conn->mctx); + return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, + timeout, 0)); +} + +isc_result_t +isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { + isc_nmsocket_t *sock = handle->sock; + isc__netievent_tcpdnsread_t *ievent = NULL; + isc_nmhandle_t *eventhandle = NULL; + + REQUIRE(handle == sock->statichandle); + REQUIRE(sock->recv_cb == NULL); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(atomic_load(&sock->client)); + + /* + * This MUST be done asynchronously, no matter which thread we're + * in. The callback function for isc_nm_read() often calls + * isc_nm_read() again; if we tried to do that synchronously + * we'd clash in processbuffer() and grow the stack indefinitely. + */ + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnsread); + ievent->sock = sock; + + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + + /* + * Add a reference to the handle to keep it from being freed by + * the caller; it will be detached in in isc__nm_async_tcpdnsread(). + */ + isc_nmhandle_attach(handle, &eventhandle); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + return (ISC_R_SUCCESS); +} + +void +isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { + isc_result_t result; + isc__netievent_tcpdnsread_t *ievent = + (isc__netievent_tcpdnsclose_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc_nmhandle_t *handle = NULL, *newhandle = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(worker->id == sock->tid); + REQUIRE(sock->tid == isc_nm_tid()); + + handle = sock->statichandle; + + if (sock->type != isc_nm_tcpdnssocket || sock->outerhandle == NULL) { + if (sock->recv_cb != NULL) { + sock->recv_cb(handle, ISC_R_NOTCONNECTED, NULL, + sock->recv_cbarg); + } + isc_nmhandle_detach(&handle); + return; + } + + /* + * Maybe we have a packet already? + */ + result = processbuffer(sock, &newhandle); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->outerhandle->sock->processing, true); + if (sock->timer_initialized) { + uv_timer_stop(&sock->timer); + } + isc_nmhandle_detach(&newhandle); + } else if (sock->outerhandle != NULL) { + /* Restart reading, wait for the callback */ + atomic_store(&sock->outerhandle->sock->processing, false); + if (sock->timer_initialized) { + uv_timer_start(&sock->timer, dnstcp_readtimeout, + sock->read_timeout, 0); + } + isc_nm_resumeread(sock->outerhandle); + } else { + isc_nm_recv_cb_t cb = sock->recv_cb; + void *cbarg = sock->recv_cbarg; + + isc__nmsocket_clearcb(sock); + cb(handle, ISC_R_NOTCONNECTED, NULL, cbarg); + } + + isc_nmhandle_detach(&handle); +} + +void +isc__nm_tcpdns_cancelread(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + isc__netievent_tcpdnscancel_t *ievent = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + + sock = handle->sock; + + REQUIRE(sock->type == isc_nm_tcpdnssocket); + + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnscancel); + ievent->sock = sock; + isc_nmhandle_attach(handle, &ievent->handle); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); +} + +void +isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpdnscancel_t *ievent = + (isc__netievent_tcpdnscancel_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc_nmhandle_t *handle = ievent->handle; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(worker->id == sock->tid); + REQUIRE(sock->tid == isc_nm_tid()); + + if (atomic_load(&sock->client)) { + isc_nm_recv_cb_t cb; + void *cbarg = NULL; + + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); + + if (cb != NULL) { + cb(handle, ISC_R_EOF, NULL, cbarg); + } + + isc__nm_tcp_cancelread(sock->outerhandle); + } + + isc_nmhandle_detach(&handle); +} diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index d97a0b612a4..01974a53d15 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,12 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, static void udp_send_cb(uv_udp_send_t *req, int status); +static void +udp_close_cb(uv_handle_t *uvhandle); + +static void +udp_close_direct(isc_nmsocket_t *sock); + isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { @@ -336,6 +343,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, bool free_buf = true; isc_nm_recv_cb_t cb; void *cbarg; + bool connected; /* * Even though destruction of the socket can only happen from the @@ -371,12 +379,19 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); - nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); + connected = atomic_load(&sock->connected); + + if (!connected) { + nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); + } else { + nmhandle = sock->statichandle; + } region.base = (unsigned char *)buf->base; region.length = nrecv; INSIST(sock->tid == isc_nm_tid()); INSIST(sock->recv_cb != NULL); + cb = sock->recv_cb; cbarg = sock->recv_cbarg; @@ -395,7 +410,9 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * If the recv callback wants to hold on to the handle, * it needs to attach to it. */ - isc_nmhandle_detach(&nmhandle); + if (!connected) { + isc_nmhandle_detach(&nmhandle); + } } /* @@ -510,6 +527,12 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(worker->id == sock->tid); + if (!isc__nmsocket_active(ievent->sock)) { + uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg); + isc__nm_uvreq_put(&uvreq, sock); + return; + } + result = udp_send_direct(sock, uvreq, &ievent->peer); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); @@ -543,7 +566,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { static isc_result_t udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, isc_sockaddr_t *peer) { - const struct sockaddr *sa = NULL; + const struct sockaddr *sa = &peer->type.sa; int r; REQUIRE(VALID_NMSOCK(sock)); @@ -555,7 +578,17 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, return (ISC_R_CANCELED); } - sa = atomic_load(&sock->connected) ? NULL : &peer->type.sa; +#ifdef HAVE_UV_UDP_CONNECT + /* + * If we used uv_udp_connect() (and not the shim version for + * older versions of libuv), then the peer address has to be + * set to NULL or else uv_udp_send() could fail or assert, + * depending on the libuv version. + */ + if (atomic_load(&sock->connected)) { + sa = NULL; + } +#endif r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp, &req->uvbuf, 1, sa, udp_send_cb); if (r < 0) { @@ -564,3 +597,410 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, return (ISC_R_SUCCESS); } + +static int +udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { + isc__networker_t *worker = NULL; + int uv_bind_flags = UV_UDP_REUSEADDR; + int r; + + REQUIRE(isc__nm_in_netthread()); + + worker = &sock->mgr->workers[isc_nm_tid()]; + + atomic_store(&sock->connecting, true); + + r = uv_udp_init(&worker->loop, &sock->uv_handle.udp); + if (r != 0) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); + /* Socket was never opened; no need for udp_close_direct() */ + atomic_store(&sock->closed, true); + atomic_store(&sock->result, isc__nm_uverr2result(r)); + atomic_store(&sock->connect_error, true); + return (r); + } + + r = uv_udp_open(&sock->uv_handle.udp, sock->fd); + if (r != 0) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); + atomic_store(&sock->closed, true); + atomic_store(&sock->connect_error, true); + atomic_store(&sock->result, isc__nm_uverr2result(r)); + return (r); + } + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); + + if (sock->iface->addr.type.sa.sa_family == AF_INET6) { + uv_bind_flags |= UV_UDP_IPV6ONLY; + } + + r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface->addr.type.sa, + uv_bind_flags); + if (r != 0) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); + atomic_store(&sock->connect_error, true); + atomic_store(&sock->result, isc__nm_uverr2result(r)); + udp_close_direct(sock); + return (r); + } + + uv_handle_set_data(&sock->uv_handle.handle, sock); + + r = isc_uv_udp_connect(&sock->uv_handle.udp, &req->peer.type.sa); + if (r != 0) { + isc__nm_incstats(sock->mgr, + sock->statsindex[STATID_CONNECTFAIL]); + atomic_store(&sock->connect_error, true); + atomic_store(&sock->result, isc__nm_uverr2result(r)); + udp_close_direct(sock); + return (r); + } + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); + +#ifdef ISC_RECV_BUFFER_SIZE + uv_recv_buffer_size(&sock->uv_handle.handle, + &(int){ ISC_RECV_BUFFER_SIZE }); +#endif +#ifdef ISC_SEND_BUFFER_SIZE + uv_send_buffer_size(&sock->uv_handle.handle, + &(int){ ISC_SEND_BUFFER_SIZE }); +#endif + return (0); +} + +/* + * Asynchronous 'udpconnect' call handler: open a new UDP socket and call + * the 'open' callback with a handle. + */ +void +isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpconnect_t *ievent = + (isc__netievent_udpconnect_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = ievent->req; + isc_nmhandle_t *handle = NULL; + isc_nm_cb_t cb; + void *cbarg; + int r; + isc_result_t result; + + UNUSED(worker); + + REQUIRE(sock->type == isc_nm_udpsocket); + REQUIRE(sock->iface != NULL); + REQUIRE(sock->parent == NULL); + REQUIRE(sock->tid == isc_nm_tid()); + + cb = sock->connect_cb; + cbarg = sock->connect_cbarg; + + r = udp_connect_direct(sock, req); + if (r != 0) { + result = isc__nm_uverr2result(r); + } else { + atomic_store(&sock->connected, true); + atomic_store(&sock->result, ISC_R_SUCCESS); + result = atomic_load(&sock->result); + } + + handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); + cb(handle, result, cbarg); + + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); + + /* + * The sock is now attached to the handle. + */ + isc__nmsocket_detach(&sock); + + /* + * The connect callback should have attached to the handle. + * If it didn't, the socket will be closed now. + */ + isc_nmhandle_detach(&handle); +} + +isc_result_t +isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, + isc_nm_cb_t cb, void *cbarg, unsigned int timeout, + size_t extrahandlesize) { + isc_result_t result = ISC_R_SUCCESS; + isc_nmsocket_t *sock = NULL, *tmp = NULL; + isc__netievent_udpconnect_t *event = NULL; + isc__nm_uvreq_t *req = NULL; + sa_family_t sa_family; + + REQUIRE(VALID_NM(mgr)); + REQUIRE(local != NULL); + REQUIRE(peer != NULL); + + sa_family = peer->addr.type.sa.sa_family; + + sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t)); + isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local); + + INSIST(sock->connect_cb == NULL && sock->connect_cbarg == NULL); + sock->connect_cb = cb; + sock->connect_cbarg = cbarg; + sock->read_timeout = timeout; + sock->extrahandlesize = extrahandlesize; + sock->peer = peer->addr; + atomic_init(&sock->client, true); + + sock->fd = socket(sa_family, SOCK_DGRAM, 0); + RUNTIME_CHECK(sock->fd >= 0); + + result = isc__nm_socket_reuse(sock->fd); + RUNTIME_CHECK(result == ISC_R_SUCCESS || + result == ISC_R_NOTIMPLEMENTED); + + result = isc__nm_socket_reuse_lb(sock->fd); + RUNTIME_CHECK(result == ISC_R_SUCCESS || + result == ISC_R_NOTIMPLEMENTED); + + (void)isc__nm_socket_incoming_cpu(sock->fd); + + (void)isc__nm_socket_dontfrag(sock->fd, sa_family); + + req = isc__nm_uvreq_get(mgr, sock); + req->cb.connect = cb; + req->cbarg = cbarg; + req->peer = peer->addr; + req->local = local->addr; + + event = isc__nm_get_ievent(mgr, netievent_udpconnect); + event->sock = sock; + event->req = req; + + /* + * Hold an additional sock reference so async callbacks + * can't destroy it until we're ready. + */ + isc__nmsocket_attach(sock, &tmp); + + if (isc__nm_in_netthread()) { + sock->tid = isc_nm_tid(); + isc__nm_async_udpconnect(&mgr->workers[sock->tid], + (isc__netievent_t *)event); + isc__nm_put_ievent(mgr, event); + isc__nm_uvreq_put(&req, sock); + } else { + sock->tid = isc_random_uniform(mgr->nworkers); + isc__nm_enqueue_ievent(&mgr->workers[sock->tid], + (isc__netievent_t *)event); + + LOCK(&sock->lock); + while (!atomic_load(&sock->connected) && + !atomic_load(&sock->connect_error)) { + WAIT(&sock->cond, &sock->lock); + } + UNLOCK(&sock->lock); + isc__nm_uvreq_put(&req, sock); + } + + if (atomic_load(&sock->result) != ISC_R_SUCCESS) { + result = atomic_load(&sock->result); + } + + isc__nmsocket_detach(&tmp); + + return (result); +} + +static void +udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, + const struct sockaddr *addr, unsigned flags) { + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); + + udp_recv_cb(handle, nrecv, buf, addr, flags); + uv_udp_recv_stop(&sock->uv_handle.udp); +} + +static void +readtimeout_cb(uv_timer_t *handle) { + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); + isc_nm_recv_cb_t cb; + void *cbarg = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); + + /* + * Timeout; stop reading and process whatever we have. + */ + uv_udp_recv_stop(&sock->uv_handle.udp); + + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); + + if (cb != NULL) { + cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg); + } +} + +/* + * Asynchronous 'udpread' call handler: start or resume reading on a socket; + * pause reading and call the 'recv' callback after each datagram. + */ +void +isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(worker->id == isc_nm_tid()); + if (sock->read_timeout != 0) { + if (!sock->timer_initialized) { + uv_timer_init(&worker->loop, &sock->timer); + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); + sock->timer_initialized = true; + } + uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout, + 0); + sock->timer_running = true; + } + + uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb); +} + +isc_result_t +isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { + isc_nmsocket_t *sock = NULL; + isc__netievent_startread_t *ievent = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + REQUIRE(handle->sock->type == isc_nm_udpsocket); + + sock = handle->sock; + + REQUIRE(sock->tid == isc_nm_tid()); + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpread); + ievent->sock = sock; + + if (sock->tid == isc_nm_tid()) { + isc__nm_async_udpread(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } + + return (ISC_R_SUCCESS); +} + +static void +udp_close_cb(uv_handle_t *uvhandle) { + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); + + REQUIRE(VALID_NMSOCK(sock)); + + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); +} + +static void +timer_close_cb(uv_handle_t *uvhandle) { + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); + + REQUIRE(VALID_NMSOCK(sock)); + + uv_close(&sock->uv_handle.handle, udp_close_cb); +} + +static void +udp_close_direct(isc_nmsocket_t *sock) { + uv_udp_recv_stop(&sock->uv_handle.udp); + + if (sock->timer_running) { + uv_timer_stop(&sock->timer); + sock->timer_running = false; + } + + if (sock->timer_initialized) { + sock->timer_initialized = false; + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); + uv_close((uv_handle_t *)&sock->timer, timer_close_cb); + } else { + uv_close(&sock->uv_handle.handle, udp_close_cb); + } +} + +void +isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(worker->id == ievent->sock->tid); + + udp_close_direct(sock); +} + +void +isc__nm_udp_close(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_udpsocket); + + if (sock->tid == isc_nm_tid()) { + udp_close_direct(sock); + } else { + isc__netievent_udpclose_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_udpclose); + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_udp_cancelread(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + isc__netievent_udpcancel_t *ievent = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + + sock = handle->sock; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_udpsocket); + + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpcancel); + ievent->sock = sock; + isc_nmhandle_attach(handle, &ievent->handle); + + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); +} + +void +isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc_nmhandle_t *handle = ievent->handle; + + REQUIRE(worker->id == ievent->sock->tid); + + uv_udp_recv_stop(&sock->uv_handle.udp); + + if (atomic_load(&sock->client)) { + isc_nm_recv_cb_t cb; + void *cbarg = NULL; + + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); + + if (cb != NULL) { + cb(handle, ISC_R_EOF, NULL, cbarg); + } + } + + isc_nmhandle_detach(&handle); +} diff --git a/lib/isc/netmgr/uv-compat.c b/lib/isc/netmgr/uv-compat.c index 8ad3b1a0d18..51964477e66 100644 --- a/lib/isc/netmgr/uv-compat.c +++ b/lib/isc/netmgr/uv-compat.c @@ -187,3 +187,35 @@ isc_uv_import(uv_stream_t *stream, isc_uv_stream_info_t *info) { #endif /* ifdef WIN32 */ #endif /* ifndef HAVE_UV_IMPORT */ + +#ifndef HAVE_UV_UDP_CONNECT +int +isc_uv_udp_connect(uv_udp_t *handle, const struct sockaddr *addr) { + int err = 0; + + do { + int addrlen = (addr->sa_family == AF_INET) + ? sizeof(struct sockaddr_in) + : sizeof(struct sockaddr_in6); +#ifdef WIN32 + err = connect(handle->socket, addr, addrlen); +#else /* WIN32 */ + err = connect(handle->io_watcher.fd, addr, addrlen); +#endif /* WIN32 */ + } while (err == -1 && errno == EINTR); + + if (err) { +#ifdef WIN32 + return (uv_translate_sys_error(err)); +#else /* WIN32 */ +#ifdef HAVE_UV_TRANSLATE_SYS_ERROR + return (uv_translate_sys_error(errno)); +#else + return (-errno); +#endif /* HAVE_UV_TRANSLATE_SYS_ERROR */ +#endif /* WIN32 */ + } + + return (0); +} +#endif /* ifndef HAVE_UV_UDP_CONNECT */ diff --git a/lib/isc/netmgr/uv-compat.h b/lib/isc/netmgr/uv-compat.h index d02ddd3e8e9..334924588ca 100644 --- a/lib/isc/netmgr/uv-compat.h +++ b/lib/isc/netmgr/uv-compat.h @@ -79,3 +79,19 @@ isc_uv_import(uv_stream_t *stream, isc_uv_stream_info_t *info); */ #endif + +#ifdef HAVE_UV_UDP_CONNECT +#define isc_uv_udp_connect uv_udp_connect +#else + +int +isc_uv_udp_connect(uv_udp_t *handle, const struct sockaddr *addr); +/*%< + * Associate the UDP handle to a remote address and port, so every message sent + * by this handle is automatically sent to that destination. + * + * NOTE: This is just a limited shim for uv_udp_connect() as it requires the + * handle to be bound. + */ + +#endif diff --git a/lib/isc/netmgr/uverr2result.c b/lib/isc/netmgr/uverr2result.c index f1c0a74bf92..eb9dbea9b6a 100644 --- a/lib/isc/netmgr/uverr2result.c +++ b/lib/isc/netmgr/uverr2result.c @@ -79,6 +79,8 @@ isc___nm_uverr2result(int uverr, bool dolog, const char *file, return (ISC_R_ADDRNOTAVAIL); case UV_ECONNREFUSED: return (ISC_R_CONNREFUSED); + case UV_ECANCELED: + return (ISC_R_CANCELED); default: if (dolog) { UNEXPECTED_ERROR(file, line, diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 6ce39f6fef2..9ee446ca391 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -459,11 +459,13 @@ isc_nm_setstats isc_nm_start isc_nm_stoplistening isc_nm_tcpconnect +isc_nm_tcpdnsconnect isc_nm_tcp_gettimeouts isc_nm_tcp_settimeouts isc_nm_tcpdns_keepalive isc_nm_tcpdns_sequential isc_nm_tid +isc_nm_udpconnect isc_nmsocket_close isc__nm_acquire_interlocked isc__nm_drop_interlocked