From: Witold Kręcicki Date: Thu, 28 Nov 2019 09:21:34 +0000 (+0100) Subject: netmgr: make tcp listening multithreaded. X-Git-Tag: v9.15.7~20^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bc5aae1579c773833ad2c3d17c0e42610de2822b;p=thirdparty%2Fbind9.git netmgr: make tcp listening multithreaded. When listening for TCP connections we create a socket, bind it and then pass it over IPC to all threads - which then listen on it and accept connections. This sounds broken, but it's the official way of dealing with multithreaded TCP listeners in libuv, and works on all platforms supported by libuv. --- diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 3acacc756bc..129f4001c40 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -114,7 +114,9 @@ typedef enum isc__netievent_type { netievent_tcpstartread, netievent_tcppauseread, netievent_tcplisten, + netievent_tcpchildlisten, netievent_tcpstoplisten, + netievent_tcpstopchildlisten, netievent_tcpclose, netievent_closecb, netievent_shutdown, @@ -159,6 +161,7 @@ typedef struct isc__nm_uvreq { isc_sockaddr_t peer; /* peer address */ isc__nm_cb_t cb; /* callback */ void * cbarg; /* callback argument */ + uv_pipe_t pipe; union { uv_req_t req; uv_getaddrinfo_t getaddrinfo; @@ -180,6 +183,7 @@ typedef struct isc__netievent__socket { typedef isc__netievent__socket_t isc__netievent_udplisten_t; typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; +typedef isc__netievent__socket_t isc__netievent_tcpstopchildlisten_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; @@ -193,6 +197,7 @@ typedef struct isc__netievent__socket_req { typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t 
isc__netievent_tcplisten_t; +typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; typedef struct isc__netievent_udpsend { @@ -274,6 +279,7 @@ typedef enum isc_nmsocket_type { isc_nm_udplistener, /* Aggregate of nm_udpsocks */ isc_nm_tcpsocket, isc_nm_tcplistener, + isc_nm_tcpchildlistener, isc_nm_tcpdnslistener, isc_nm_tcpdnssocket } isc_nmsocket_type; @@ -322,6 +328,11 @@ struct isc_nmsocket { isc_nmiface_t *iface; isc_nmhandle_t *tcphandle; + /* used to send listening TCP sockets to children */ + uv_pipe_t ipc; + char ipc_pipe_name[32]; + atomic_int_fast32_t schildren; + /*% extra data allocated at the end of each isc_nmhandle_t */ size_t extrahandlesize; @@ -579,9 +590,14 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0); +void isc__nm_async_tcpstoplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void +isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0); +void isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 902fe2d1ccf..479d327469b 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -42,6 +42,12 @@ ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#ifdef WIN32 +#define NAMED_PIPE_PREFIX "\\\\.\\pipe\\named-ipc" +#else +#define NAMED_PIPE_PREFIX ".named-ipc" +#endif + static void nmsocket_maybe_destroy(isc_nmsocket_t *sock); static void @@ -497,6 +503,9 @@ async_cb(uv_async_t *handle) { case netievent_tcplisten: isc__nm_async_tcplisten(worker, ievent); break; + case netievent_tcpchildlisten: + isc__nm_async_tcpchildlisten(worker, 
ievent); + break; case netievent_tcpstartread: isc__nm_async_startread(worker, ievent); break; @@ -509,6 +518,9 @@ async_cb(uv_async_t *handle) { case netievent_tcpstoplisten: isc__nm_async_tcpstoplisten(worker, ievent); break; + case netievent_tcpstopchildlisten: + isc__nm_async_tcpstopchildlisten(worker, ievent); + break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; @@ -790,6 +802,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, sock->ah_handles[i] = NULL; } + /* + * XXXWPK Maybe it should be in tmp, maybe it should not + * be random? + */ + strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX); + for (int i=strlen(sock->ipc_pipe_name); i<31; i++) { + sock->ipc_pipe_name[i] = isc_random8()%24 + 'a'; + } + sock->ipc_pipe_name[31] = '\0'; + isc_mutex_init(&sock->lock); isc_condition_init(&sock->cond); isc_refcount_init(&sock->references, 1); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 75dcd3a7e27..c30db8fe995 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -50,6 +50,21 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); static void tcp_close_cb(uv_handle_t *uvhandle); +static void +ipc_connection_cb(uv_stream_t *stream, int status); +static void +ipc_write_cb(uv_write_t* uvreq, int status); +static void +parent_pipe_close_cb(uv_handle_t *handle); +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status); +static void +childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); +static void +stoplistening(isc_nmsocket_t *sock); +static void +tcp_listenclose_cb(uv_handle_t *handle); + static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker; @@ -71,7 +86,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (r); } } - + sock->uv_handle.tcp.data = sock; r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, &req->peer.type.sa, tcp_connect_cb); return (r); @@ -134,7 +149,6 @@ 
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_quota_t *quota, isc_nmsocket_t **sockp) { - isc__netievent_tcplisten_t *ievent = NULL; isc_nmsocket_t *nsock = NULL; REQUIRE(VALID_NM(mgr)); @@ -142,6 +156,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener); nsock->iface = iface; + nsock->nchildren = mgr->nworkers; + atomic_init(&nsock->rchildren, mgr->nworkers); + nsock->children = isc_mem_get(mgr->mctx, + mgr->nworkers * sizeof(*nsock)); + memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); nsock->rcb.accept = cb; nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; @@ -156,18 +175,27 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->tid = isc_random_uniform(mgr->nworkers); /* - * Listening to TCP is rare enough not to care about the - * added overhead from passing this to another thread. - */ - ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); + * Listening to TCP is rare enough not to care about the + * added overhead from passing this to another thread. + */ + isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], (isc__netievent_t *) ievent); + + *sockp = nsock; return (ISC_R_SUCCESS); } +/* + * For TCP listening we create a single socket, bind it, and then pass it + * to `ncpu` child sockets - the passing is done over IPC. + * XXXWPK This design pattern is ugly but it's "the way to do it" recommended + * by libuv documentation - which also mentions that there should be + * uv_export/uv_import functions which would simplify this greatly. 
+ */ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { isc__netievent_tcplisten_t *ievent = @@ -184,10 +212,56 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { } uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); - r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, - tcp_connection_cb); - if (r != 0) { - return; + sock->uv_handle.tcp.data = sock; + /* + * This is not properly documented in libuv, and the example + * (benchmark-multi-accept) is wrong: + * 'ipc' parameter must be '0' for 'listening' IPC socket, '1' + * only for the sockets are really passing the FDs between + * threads. This works without any problems on Unices, but + * breaks horribly on Windows. + */ + r = uv_pipe_init(&worker->loop, &sock->ipc, 0); + INSIST(r == 0); + sock->ipc.data = sock; + r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); + INSIST(r == 0); + r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, + ipc_connection_cb); + INSIST(r == 0); + + /* + * We launch n 'tcpchildlistener' that will receive + * sockets to be listened on over ipc. 
+ */ + for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_tcpchildlisten_t *event = NULL; + isc_nmsocket_t *csock = &sock->children[i]; + + isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); + csock->parent = sock; + csock->iface = sock->iface; + csock->tid = i; + csock->pquota = sock->pquota; + csock->backlog = sock->backlog; + csock->extrahandlesize = sock->extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.accept = sock->rcb.accept; + csock->rcbarg = sock->rcbarg; + csock->fd = -1; + + event = isc__nm_get_ievent(csock->mgr, + netievent_tcpchildlisten); + event->sock = csock; + if (csock->tid == isc_nm_tid()) { + isc__nm_async_tcpchildlisten(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } } atomic_store(&sock->listening, true); @@ -195,6 +269,117 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { return; } +/* Parent got an IPC connection from child */ +static void +ipc_connection_cb(uv_stream_t *stream, int status) { + int r; + REQUIRE(status == 0); + isc_nmsocket_t *sock = stream->data; + isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; + isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); + /* + * The buffer can be anything, it will be ignored, but it has to + * be something that won't disappear. 
+ */ + nreq->uvbuf = uv_buf_init((char *)nreq, 1); + uv_pipe_init(&worker->loop, &nreq->pipe, 1); + nreq->pipe.data = nreq; + + /* Failure here is critical */ + r = uv_accept((uv_stream_t *) &sock->ipc, + (uv_stream_t*) &nreq->pipe); + INSIST(r == 0); + r = uv_write2(&nreq->uv_req.write, + (uv_stream_t*) &nreq->pipe, + &nreq->uvbuf, + 1, + (uv_stream_t*) &sock->uv_handle.stream, + ipc_write_cb); + INSIST(r == 0); +} + +static void +ipc_write_cb(uv_write_t* uvreq, int status) { + UNUSED(status); + isc__nm_uvreq_t *req = uvreq->data; + /* + * We want all children to get the socket. If we're done we can stop + * listening on the IPC socket. + */ + if (atomic_fetch_add(&req->sock->schildren, 1) == + req->sock->nchildren - 1) { + uv_close((uv_handle_t*) &req->sock->ipc, NULL); + } + uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb); +} + +static void +parent_pipe_close_cb(uv_handle_t *handle) { + isc__nm_uvreq_t *req = handle->data; + isc__nm_uvreq_put(&req, req->sock); +} + +void +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcplisten_t *ievent = + (isc__netievent_tcplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + int r; + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + + r = uv_pipe_init(&worker->loop, &sock->ipc, 1); + INSIST(r == 0); + sock->ipc.data = sock; + isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock); + + uv_pipe_connect(&req->uv_req.connect, + &sock->ipc, + sock->parent->ipc_pipe_name, + childlisten_ipc_connect_cb); +} + +/* child connected to parent over IPC */ +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { + UNUSED(status); + isc__nm_uvreq_t *req = uvreq->data; + isc_nmsocket_t *sock = req->sock; + isc__nm_uvreq_put(&req, sock); + int r = uv_read_start((uv_stream_t*) &sock->ipc, + isc__nm_alloc_cb, + childlisten_read_cb); + INSIST(r == 0); +} + +/* child got the socket over IPC */ +static void 
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { + UNUSED(nread); + int r; + isc_nmsocket_t *sock = stream->data; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(buf != NULL); + uv_pipe_t* ipc = (uv_pipe_t*) stream; + uv_handle_type type = uv_pipe_pending_type(ipc); + INSIST(type == UV_TCP); + isc__nm_free_uvbuf(sock, buf); + isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp); + sock->uv_handle.tcp.data = sock; + uv_accept(stream, &sock->uv_handle.stream); + r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, + tcp_connection_cb); + uv_close((uv_handle_t*) ipc, NULL); + if (r != 0) { + /* XXX log it? */ + return; + } +} + + void isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { isc__netievent_tcpstoplisten_t *ievent = NULL; @@ -208,24 +393,71 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { (isc__netievent_t *) ievent); } +void +isc__nm_async_tcpstoplisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ + isc__netievent_tcpstoplisten_t *ievent = + (isc__netievent_tcpstoplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcplistener); + + /* + * If network manager is interlocked, re-enqueue the event for later. 
+ */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + isc__netievent_tcpstoplisten_t *event = NULL; + + event = isc__nm_get_ievent(sock->mgr, + netievent_tcpstoplisten); + event->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); + } else { + stoplistening(sock); + isc__nm_drop_interlocked(sock->mgr); + } +} + static void -stoplistening_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; +stoplistening(isc_nmsocket_t *sock) { + for (int i = 0; i < sock->nchildren; i++) { + /* + * Stoplistening is a rare event, we can ignore the overhead + * caused by allocating an event, and doing it this way + * simplifies sock reference counting. + */ + isc__netievent_tcpstopchildlisten_t *event = NULL; + event = isc__nm_get_ievent(sock->mgr, + netievent_tcpstopchildlisten); + isc_nmsocket_attach(&sock->children[i], &event->sock); + + if (i == sock->tid) { + isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } + } LOCK(&sock->lock); - atomic_store(&sock->listening, false); - atomic_store(&sock->closed, true); - SIGNAL(&sock->cond); + while (atomic_load_relaxed(&sock->rchildren) > 0) { + WAIT(&sock->cond, &sock->lock); + } UNLOCK(&sock->lock); - - sock->pquota = NULL; - - isc_nmsocket_detach(&sock); + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); } void -isc__nm_async_tcpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0) +isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0) { isc__netievent_tcpstoplisten_t *ievent = (isc__netievent_tcpstoplisten_t *) ievent0; @@ -233,11 +465,39 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, UNUSED(worker); - REQUIRE(isc__nm_in_netthread()); + REQUIRE(isc_nm_tid() == sock->tid); REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == 
isc_nm_tcplistener); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + REQUIRE(sock->parent != NULL); + + /* + * rchildren is atomic but we still need to change it + * under a lock as the parent is waiting on conditional + * and without it we might deadlock. + */ + LOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); + UNLOCK(&sock->parent->lock); + + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); + BROADCAST(&sock->parent->cond); +} - uv_close(&sock->uv_handle.handle, stoplistening_cb); +/* + * This callback is used for closing child and parent listening sockets - + * that's why we need to choose the proper lock. + */ +static void +tcp_listenclose_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = handle->data; + isc_mutex_t * lock = (sock->parent != NULL) ? + &sock->parent->lock : &sock->lock; + LOCK(lock); + atomic_store(&sock->closed, true); + atomic_store(&sock->listening, false); + sock->pquota = NULL; + UNLOCK(lock); + isc_nmsocket_detach(&sock); } static void diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 0aae5577e0d..8abdd0c46f9 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -171,7 +171,7 @@ stoplistening(isc_nmsocket_t *sock) { INSIST(sock->type == isc_nm_udplistener); for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_udplisten_t *event = NULL; + isc__netievent_udpstoplisten_t *event = NULL; if (i == sock->tid) { stop_udp_child(&sock->children[i]);