]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
netmgr: make tcp listening multithreaded.
authorWitold Kręcicki <wpk@isc.org>
Thu, 28 Nov 2019 09:21:34 +0000 (10:21 +0100)
committerEvan Hunt <each@isc.org>
Mon, 9 Dec 2019 19:15:27 +0000 (11:15 -0800)
When listening for TCP connections we create a socket, bind it
and then pass it over IPC to all threads - which then listen on
in and accept connections. This sounds broken, but it's the
official way of dealing with multithreaded TCP listeners in libuv,
and works on all platforms supported by libuv.

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/udp.c

index 3acacc756bca9a8c4db32a3b4e5e14adfc72b9be..129f4001c40640ad56f60013969f4c7b105eebe9 100644 (file)
@@ -114,7 +114,9 @@ typedef enum isc__netievent_type {
        netievent_tcpstartread,
        netievent_tcppauseread,
        netievent_tcplisten,
+       netievent_tcpchildlisten,
        netievent_tcpstoplisten,
+       netievent_tcpstopchildlisten,
        netievent_tcpclose,
        netievent_closecb,
        netievent_shutdown,
@@ -159,6 +161,7 @@ typedef struct isc__nm_uvreq {
        isc_sockaddr_t          peer;   /* peer address */
        isc__nm_cb_t            cb;     /* callback */
        void *                  cbarg;  /* callback argument */
+       uv_pipe_t               pipe;
        union {
                uv_req_t                req;
                uv_getaddrinfo_t        getaddrinfo;
@@ -180,6 +183,7 @@ typedef struct isc__netievent__socket {
 typedef isc__netievent__socket_t isc__netievent_udplisten_t;
 typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t;
 typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t;
+typedef isc__netievent__socket_t isc__netievent_tcpstopchildlisten_t;
 typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
 typedef isc__netievent__socket_t isc__netievent_startread_t;
 typedef isc__netievent__socket_t isc__netievent_pauseread_t;
@@ -193,6 +197,7 @@ typedef struct isc__netievent__socket_req {
 
 typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
+typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t;
 typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
 
 typedef struct isc__netievent_udpsend {
@@ -274,6 +279,7 @@ typedef enum isc_nmsocket_type {
        isc_nm_udplistener, /* Aggregate of nm_udpsocks */
        isc_nm_tcpsocket,
        isc_nm_tcplistener,
+       isc_nm_tcpchildlistener,
        isc_nm_tcpdnslistener,
        isc_nm_tcpdnssocket
 } isc_nmsocket_type;
@@ -322,6 +328,11 @@ struct isc_nmsocket {
        isc_nmiface_t           *iface;
        isc_nmhandle_t          *tcphandle;
 
+       /* used to send listening TCP sockets to children */
+       uv_pipe_t               ipc;
+       char                    ipc_pipe_name[32];
+       atomic_int_fast32_t     schildren;
+
        /*% extra data allocated at the end of each isc_nmhandle_t */
        size_t                  extrahandlesize;
 
@@ -579,9 +590,14 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0);
 void
 isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0);
 void
+isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0);
+void
 isc__nm_async_tcpstoplisten(isc__networker_t *worker,
                            isc__netievent_t *ievent0);
 void
+isc__nm_async_tcpstopchildlisten(isc__networker_t *worker,
+                                isc__netievent_t *ievent0);
+void
 isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0);
 void
 isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0);
index 902fe2d1ccf60ec3f79891761ee1f8774f7f42c6..479d327469b19b073c1a80e70fa277ee8af2c4b0 100644 (file)
 
 ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
 
+#ifdef WIN32
+#define NAMED_PIPE_PREFIX "\\\\.\\pipe\\named-ipc"
+#else
+#define NAMED_PIPE_PREFIX ".named-ipc"
+#endif
+
 static void
 nmsocket_maybe_destroy(isc_nmsocket_t *sock);
 static void
@@ -497,6 +503,9 @@ async_cb(uv_async_t *handle) {
                case netievent_tcplisten:
                        isc__nm_async_tcplisten(worker, ievent);
                        break;
+               case netievent_tcpchildlisten:
+                       isc__nm_async_tcpchildlisten(worker, ievent);
+                       break;
                case netievent_tcpstartread:
                        isc__nm_async_startread(worker, ievent);
                        break;
@@ -509,6 +518,9 @@ async_cb(uv_async_t *handle) {
                case netievent_tcpstoplisten:
                        isc__nm_async_tcpstoplisten(worker, ievent);
                        break;
+               case netievent_tcpstopchildlisten:
+                       isc__nm_async_tcpstopchildlisten(worker, ievent);
+                       break;
                case netievent_tcpclose:
                        isc__nm_async_tcpclose(worker, ievent);
                        break;
@@ -790,6 +802,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr,
                sock->ah_handles[i] = NULL;
        }
 
+       /*
+        * XXXWPK Maybe it should be in tmp, maybe it should not
+        * be random?
+        */
+       strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX);
+       for (int i=strlen(sock->ipc_pipe_name); i<31; i++) {
+               sock->ipc_pipe_name[i] = isc_random8()%24 + 'a';
+       }
+       sock->ipc_pipe_name[31] = '\0';
+
        isc_mutex_init(&sock->lock);
        isc_condition_init(&sock->cond);
        isc_refcount_init(&sock->references, 1);
index 75dcd3a7e27985977cf2247f08b21602d62b070f..c30db8fe99519289e96af82531bda543791d5813 100644 (file)
@@ -50,6 +50,21 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
 static void
 tcp_close_cb(uv_handle_t *uvhandle);
 
+static void
+ipc_connection_cb(uv_stream_t *stream, int status);
+static void
+ipc_write_cb(uv_write_t* uvreq, int status);
+static void
+parent_pipe_close_cb(uv_handle_t *handle);
+static void
+childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status);
+static void
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
+static void
+stoplistening(isc_nmsocket_t *sock);
+static void
+tcp_listenclose_cb(uv_handle_t *handle);
+
 static int
 tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
        isc__networker_t *worker;
@@ -71,7 +86,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
                        return (r);
                }
        }
-
+       sock->uv_handle.tcp.data = sock;
        r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
                           &req->peer.type.sa, tcp_connect_cb);
        return (r);
@@ -134,7 +149,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
                 isc_quota_t *quota,
                 isc_nmsocket_t **sockp)
 {
-       isc__netievent_tcplisten_t *ievent = NULL;
        isc_nmsocket_t *nsock = NULL;
 
        REQUIRE(VALID_NM(mgr));
@@ -142,6 +156,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
        nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
        isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener);
        nsock->iface = iface;
+       nsock->nchildren = mgr->nworkers;
+       atomic_init(&nsock->rchildren, mgr->nworkers);
+       nsock->children = isc_mem_get(mgr->mctx,
+                                     mgr->nworkers * sizeof(*nsock));
+       memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock));
        nsock->rcb.accept = cb;
        nsock->rcbarg = cbarg;
        nsock->extrahandlesize = extrahandlesize;
@@ -156,18 +175,27 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
        nsock->tid = isc_random_uniform(mgr->nworkers);
 
        /*
-        * Listening to TCP is rare enough not to care about the
-        * added overhead from passing this to another thread.
-        */
-       ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
+       * Listening to TCP is rare enough not to care about the
+       * added overhead from passing this to another thread.
+       */
+       isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
        ievent->sock = nsock;
        isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
                               (isc__netievent_t *) ievent);
+
+
        *sockp = nsock;
 
        return (ISC_R_SUCCESS);
 }
 
+/*
+ * For TCP listening we create a single socket, bind it, and then pass it
+ * to `ncpu` child sockets - the passing is done over IPC.
+ * XXXWPK This design pattern is ugly but it's "the way to do it" recommended
+ * by libuv documentation - which also mentions that there should be
+ * uv_export/uv_import functions which would simplify this greatly.
+ */
 void
 isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
        isc__netievent_tcplisten_t *ievent =
@@ -184,10 +212,56 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
        }
 
        uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
-       r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
-                     tcp_connection_cb);
-       if (r != 0) {
-               return;
+       sock->uv_handle.tcp.data = sock;
+       /*
+        * This is not properly documented in libuv, and the example
+        * (benchmark-multi-accept) is wrong:
+        * 'ipc' parameter must be '0' for 'listening' IPC socket, '1'
+        * only for the sockets are really passing the FDs between
+        * threads. This works without any problems on Unices, but
+        * breaks horribly on Windows.
+        */
+       r = uv_pipe_init(&worker->loop, &sock->ipc, 0);
+       INSIST(r == 0);
+       sock->ipc.data = sock;
+       r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name);
+       INSIST(r == 0);
+       r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren,
+                     ipc_connection_cb);
+       INSIST(r == 0);
+
+       /*
+        * We launch n 'tcpchildlistener' that will receive
+        * sockets to be listened on over ipc.
+        */
+       for (int i = 0; i < sock->nchildren; i++) {
+               isc__netievent_tcpchildlisten_t *event = NULL;
+               isc_nmsocket_t *csock = &sock->children[i];
+
+               isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener);
+               csock->parent = sock;
+               csock->iface = sock->iface;
+               csock->tid = i;
+               csock->pquota = sock->pquota;
+               csock->backlog = sock->backlog;
+               csock->extrahandlesize = sock->extrahandlesize;
+
+               INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
+               csock->rcb.accept = sock->rcb.accept;
+               csock->rcbarg = sock->rcbarg;
+               csock->fd = -1;
+
+               event = isc__nm_get_ievent(csock->mgr,
+                                          netievent_tcpchildlisten);
+               event->sock = csock;
+               if (csock->tid == isc_nm_tid()) {
+                       isc__nm_async_tcpchildlisten(&sock->mgr->workers[i],
+                                                 (isc__netievent_t *) event);
+                       isc__nm_put_ievent(sock->mgr, event);
+               } else {
+                       isc__nm_enqueue_ievent(&sock->mgr->workers[i],
+                                              (isc__netievent_t *) event);
+               }
        }
 
        atomic_store(&sock->listening, true);
@@ -195,6 +269,117 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
        return;
 }
 
+/* Parent got an IPC connection from child */
+static void
+ipc_connection_cb(uv_stream_t *stream, int status) {
+       int r;
+       REQUIRE(status == 0);
+       isc_nmsocket_t *sock = stream->data;
+       isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()];
+       isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock);
+       /*
+        * The buffer can be anything, it will be ignored, but it has to
+        * be something that won't disappear.
+        */
+       nreq->uvbuf = uv_buf_init((char *)nreq, 1);
+       uv_pipe_init(&worker->loop, &nreq->pipe, 1);
+       nreq->pipe.data = nreq;
+
+       /* Failure here is critical */
+       r = uv_accept((uv_stream_t *) &sock->ipc,
+                     (uv_stream_t*) &nreq->pipe);
+       INSIST(r == 0);
+       r = uv_write2(&nreq->uv_req.write,
+                     (uv_stream_t*) &nreq->pipe,
+                     &nreq->uvbuf,
+                     1,
+                     (uv_stream_t*) &sock->uv_handle.stream,
+                     ipc_write_cb);
+       INSIST(r == 0);
+}
+
+static void
+ipc_write_cb(uv_write_t* uvreq, int status) {
+       UNUSED(status);
+       isc__nm_uvreq_t *req = uvreq->data;
+       /*
+        * We want all children to get the socket. If we're done we can stop
+        * listening on the IPC socket.
+        */
+       if (atomic_fetch_add(&req->sock->schildren, 1) ==
+           req->sock->nchildren - 1) {
+               uv_close((uv_handle_t*) &req->sock->ipc, NULL);
+       }
+       uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb);
+}
+
+static void
+parent_pipe_close_cb(uv_handle_t *handle) {
+       isc__nm_uvreq_t *req = handle->data;
+       isc__nm_uvreq_put(&req, req->sock);
+}
+
+void
+isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
+       isc__netievent_tcplisten_t *ievent =
+               (isc__netievent_tcplisten_t *) ievent0;
+       isc_nmsocket_t *sock = ievent->sock;
+       int r;
+
+       REQUIRE(isc__nm_in_netthread());
+       REQUIRE(sock->type == isc_nm_tcpchildlistener);
+
+       r = uv_pipe_init(&worker->loop, &sock->ipc, 1);
+       INSIST(r == 0);
+       sock->ipc.data = sock;
+       isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock);
+
+       uv_pipe_connect(&req->uv_req.connect,
+                       &sock->ipc,
+                       sock->parent->ipc_pipe_name,
+                       childlisten_ipc_connect_cb);
+}
+
+/* child connected to parent over IPC */
+static void
+childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) {
+       UNUSED(status);
+       isc__nm_uvreq_t *req = uvreq->data;
+       isc_nmsocket_t *sock = req->sock;
+       isc__nm_uvreq_put(&req, sock);
+       int r = uv_read_start((uv_stream_t*) &sock->ipc,
+                             isc__nm_alloc_cb,
+                             childlisten_read_cb);
+       INSIST(r == 0);
+}
+
+/* child got the socket over IPC */
+static void
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
+       UNUSED(nread);
+       int r;
+       isc_nmsocket_t *sock = stream->data;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(buf != NULL);
+       uv_pipe_t* ipc = (uv_pipe_t*) stream;
+       uv_handle_type type = uv_pipe_pending_type(ipc);
+       INSIST(type == UV_TCP);
+       isc__nm_free_uvbuf(sock, buf);
+       isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()];
+       uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp);
+       sock->uv_handle.tcp.data = sock;
+       uv_accept(stream, &sock->uv_handle.stream);
+       r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
+                     tcp_connection_cb);
+       uv_close((uv_handle_t*) ipc, NULL);
+       if (r != 0) {
+               /* XXX log it? */
+               return;
+       }
+}
+
+
 void
 isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) {
        isc__netievent_tcpstoplisten_t *ievent = NULL;
@@ -208,24 +393,71 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) {
                               (isc__netievent_t *) ievent);
 }
 
+void
+isc__nm_async_tcpstoplisten(isc__networker_t *worker,
+                           isc__netievent_t *ievent0)
+{
+       isc__netievent_tcpstoplisten_t *ievent =
+               (isc__netievent_tcpstoplisten_t *) ievent0;
+       isc_nmsocket_t *sock = ievent->sock;
+
+       UNUSED(worker);
+
+       REQUIRE(isc__nm_in_netthread());
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->type == isc_nm_tcplistener);
+
+       /*
+        * If network manager is interlocked, re-enqueue the event for later.
+        */
+       if (!isc__nm_acquire_interlocked(sock->mgr)) {
+               isc__netievent_tcpstoplisten_t *event = NULL;
+
+               event = isc__nm_get_ievent(sock->mgr,
+                                          netievent_tcpstoplisten);
+               event->sock = sock;
+               isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *) event);
+       } else {
+               stoplistening(sock);
+               isc__nm_drop_interlocked(sock->mgr);
+       }
+}
+
 static void
-stoplistening_cb(uv_handle_t *handle) {
-       isc_nmsocket_t *sock = handle->data;
+stoplistening(isc_nmsocket_t *sock) {
+       for (int i = 0; i < sock->nchildren; i++) {
+               /*
+                * Stoplistening is a rare event, we can ignore the overhead
+                * caused by allocating an event, and doing it this way
+                * simplifies sock reference counting.
+                */
+               isc__netievent_tcpstopchildlisten_t *event = NULL;
+               event = isc__nm_get_ievent(sock->mgr,
+                                          netievent_tcpstopchildlisten);
+               isc_nmsocket_attach(&sock->children[i], &event->sock);
+
+               if (i == sock->tid) {
+                       isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i],
+                                                        (isc__netievent_t *) event);
+                       isc__nm_put_ievent(sock->mgr, event);
+               } else {
+                       isc__nm_enqueue_ievent(&sock->mgr->workers[i],
+                                              (isc__netievent_t *) event);
+               }
+       }
 
        LOCK(&sock->lock);
-       atomic_store(&sock->listening, false);
-       atomic_store(&sock->closed, true);
-       SIGNAL(&sock->cond);
+       while (atomic_load_relaxed(&sock->rchildren) > 0) {
+               WAIT(&sock->cond, &sock->lock);
+       }
        UNLOCK(&sock->lock);
-
-       sock->pquota = NULL;
-
-       isc_nmsocket_detach(&sock);
+       uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb);
 }
 
 void
-isc__nm_async_tcpstoplisten(isc__networker_t *worker,
-                           isc__netievent_t *ievent0)
+isc__nm_async_tcpstopchildlisten(isc__networker_t *worker,
+                                isc__netievent_t *ievent0)
 {
        isc__netievent_tcpstoplisten_t *ievent =
                (isc__netievent_tcpstoplisten_t *) ievent0;
@@ -233,11 +465,39 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker,
 
        UNUSED(worker);
 
-       REQUIRE(isc__nm_in_netthread());
+       REQUIRE(isc_nm_tid() == sock->tid);
        REQUIRE(VALID_NMSOCK(sock));
-       REQUIRE(sock->type == isc_nm_tcplistener);
+       REQUIRE(sock->type == isc_nm_tcpchildlistener);
+       REQUIRE(sock->parent != NULL);
+
+       /*
+        * rchildren is atomic but we still need to change it
+        * under a lock as the parent is waiting on conditional
+        * and without it we might deadlock.
+        */
+       LOCK(&sock->parent->lock);
+       atomic_fetch_sub(&sock->parent->rchildren, 1);
+       UNLOCK(&sock->parent->lock);
+
+       uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb);
+       BROADCAST(&sock->parent->cond);
+}
 
-       uv_close(&sock->uv_handle.handle, stoplistening_cb);
+/*
+ * This callback is used for closing child and parent listening sockets -
+ * that's why we need to choose the proper lock.
+ */
+static void
+tcp_listenclose_cb(uv_handle_t *handle) {
+       isc_nmsocket_t *sock = handle->data;
+       isc_mutex_t * lock = (sock->parent != NULL) ?
+                             &sock->parent->lock : &sock->lock;
+       LOCK(lock);
+       atomic_store(&sock->closed, true);
+       atomic_store(&sock->listening, false);
+       sock->pquota = NULL;
+       UNLOCK(lock);
+       isc_nmsocket_detach(&sock);
 }
 
 static void
index 0aae5577e0decdf59ab4c9ef804064ae7f4663ab..8abdd0c46f9c14d2bc17b636b2e935918a1a6c1e 100644 (file)
@@ -171,7 +171,7 @@ stoplistening(isc_nmsocket_t *sock) {
        INSIST(sock->type == isc_nm_udplistener);
 
        for (int i = 0; i < sock->nchildren; i++) {
-               isc__netievent_udplisten_t *event = NULL;
+               isc__netievent_udpstoplisten_t *event = NULL;
 
                if (i == sock->tid) {
                        stop_udp_child(&sock->children[i]);