static void
tcp_close_cb(uv_handle_t *uvhandle);
+static void /* parent: a child connected over IPC */
+ipc_connection_cb(uv_stream_t *stream, int status);
+static void /* parent: uv_write2() towards a child completed */
+ipc_write_cb(uv_write_t* uvreq, int status);
+static void /* parent: per-child pipe closed */
+parent_pipe_close_cb(uv_handle_t *handle);
+static void /* child: connected to parent over IPC */
+childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status);
+static void /* child: got the socket over IPC */
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
+static void /* stops the children, then closes the parent handle */
+stoplistening(isc_nmsocket_t *sock);
+static void /* uv_close() callback for parent and child listeners */
+tcp_listenclose_cb(uv_handle_t *handle);
+
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker;
return (r);
}
}
-
+ sock->uv_handle.tcp.data = sock;
r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
&req->peer.type.sa, tcp_connect_cb);
return (r);
isc_quota_t *quota,
isc_nmsocket_t **sockp)
{
- isc__netievent_tcplisten_t *ievent = NULL;
isc_nmsocket_t *nsock = NULL;
REQUIRE(VALID_NM(mgr));
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener);
nsock->iface = iface;
+ nsock->nchildren = mgr->nworkers;
+ atomic_init(&nsock->rchildren, mgr->nworkers);
+ nsock->children = isc_mem_get(mgr->mctx,
+ mgr->nworkers * sizeof(*nsock));
+ memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock));
nsock->rcb.accept = cb;
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
nsock->tid = isc_random_uniform(mgr->nworkers);
/*
- * Listening to TCP is rare enough not to care about the
- * added overhead from passing this to another thread.
- */
- ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
+ * Listening to TCP is rare enough not to care about the
+ * added overhead from passing this to another thread.
+ */
+ isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
ievent->sock = nsock;
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
(isc__netievent_t *) ievent);
+
+
*sockp = nsock;
return (ISC_R_SUCCESS);
}
+/*
+ * For TCP listening we create a single socket, bind it, and then pass it
+ * to `ncpu` child sockets - the passing is done over IPC.
+ * XXXWPK This design pattern is ugly but it's "the way to do it" recommended
+ * by libuv documentation - which also mentions that there should be
+ * uv_export/uv_import functions which would simplify this greatly.
+ */
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc__netievent_tcplisten_t *ievent =
}
uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
- r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
- tcp_connection_cb);
- if (r != 0) {
- return;
+ sock->uv_handle.tcp.data = sock;
+ /*
+ * This is not properly documented in libuv, and the example
+ * (benchmark-multi-accept) is wrong:
+ * 'ipc' parameter must be '0' for 'listening' IPC socket, '1'
+ * only for the sockets that are really passing the FDs between
+ * threads. This works without any problems on Unices, but
+ * breaks horribly on Windows.
+ */
+ r = uv_pipe_init(&worker->loop, &sock->ipc, 0);
+ INSIST(r == 0);
+ sock->ipc.data = sock;
+ r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name);
+ INSIST(r == 0);
+ r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren,
+ ipc_connection_cb);
+ INSIST(r == 0);
+
+ /*
+ * We launch n 'tcpchildlistener' that will receive
+ * sockets to be listened on over ipc.
+ */
+ for (int i = 0; i < sock->nchildren; i++) {
+ isc__netievent_tcpchildlisten_t *event = NULL;
+ isc_nmsocket_t *csock = &sock->children[i];
+
+ isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener);
+ csock->parent = sock;
+ csock->iface = sock->iface;
+ csock->tid = i;
+ csock->pquota = sock->pquota;
+ csock->backlog = sock->backlog;
+ csock->extrahandlesize = sock->extrahandlesize;
+
+ INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
+ csock->rcb.accept = sock->rcb.accept;
+ csock->rcbarg = sock->rcbarg;
+ csock->fd = -1;
+
+ event = isc__nm_get_ievent(csock->mgr,
+ netievent_tcpchildlisten);
+ event->sock = csock;
+ if (csock->tid == isc_nm_tid()) {
+ isc__nm_async_tcpchildlisten(&sock->mgr->workers[i],
+ (isc__netievent_t *) event);
+ isc__nm_put_ievent(sock->mgr, event);
+ } else {
+ isc__nm_enqueue_ievent(&sock->mgr->workers[i],
+ (isc__netievent_t *) event);
+ }
}
atomic_store(&sock->listening, true);
return;
}
+/*
+ * Parent got an IPC connection from child.
+ *
+ * Connection callback of the parent's listening IPC pipe.  The child's
+ * connection is accepted on a pipe embedded in a fresh uvreq, and the
+ * listening TCP handle is sent over that pipe with uv_write2();
+ * ipc_write_cb() runs once the write has completed.
+ *
+ * 'stream->data' is the parent listener socket; 'status' must be 0.
+ */
+static void
+ipc_connection_cb(uv_stream_t *stream, int status) {
+	int r;
+	REQUIRE(status == 0);
+	isc_nmsocket_t *sock = stream->data;
+	isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()];
+	isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock);
+	/*
+	 * The buffer can be anything, it will be ignored, but it has to
+	 * be something that won't disappear.
+	 */
+	nreq->uvbuf = uv_buf_init((char *)nreq, 1);
+	/*
+	 * Check the result like every other uv_pipe_init() call here does;
+	 * 'ipc' is 1 because this pipe is the one that carries the handle.
+	 */
+	r = uv_pipe_init(&worker->loop, &nreq->pipe, 1);
+	INSIST(r == 0);
+	nreq->pipe.data = nreq;
+
+	/* Failure here is critical */
+	r = uv_accept((uv_stream_t *) &sock->ipc,
+		      (uv_stream_t *) &nreq->pipe);
+	INSIST(r == 0);
+	r = uv_write2(&nreq->uv_req.write,
+		      (uv_stream_t *) &nreq->pipe,
+		      &nreq->uvbuf,
+		      1,
+		      (uv_stream_t *) &sock->uv_handle.stream,
+		      ipc_write_cb);
+	INSIST(r == 0);
+}
+
+/*
+ * Write callback for the uv_write2() issued in ipc_connection_cb():
+ * counts the child as served and closes the per-child pipe.  'status'
+ * is not examined.
+ */
+static void
+ipc_write_cb(uv_write_t *uvreq, int status) {
+	isc__nm_uvreq_t *wreq = uvreq->data;
+	isc_nmsocket_t *parent = wreq->sock;
+
+	UNUSED(status);
+
+	/*
+	 * Every child has to receive the socket.  Once the last one
+	 * has been served the listening IPC pipe is no longer needed.
+	 */
+	if (atomic_fetch_add(&parent->schildren, 1) ==
+	    parent->nchildren - 1) {
+		uv_close((uv_handle_t *) &parent->ipc, NULL);
+	}
+
+	/* The request is released in parent_pipe_close_cb(). */
+	uv_close((uv_handle_t *) &wreq->pipe, parent_pipe_close_cb);
+}
+
+/*
+ * Close callback for the per-child pipe: releases the uvreq that the
+ * handle points to ('handle->data') with isc__nm_uvreq_put().
+ */
+static void
+parent_pipe_close_cb(uv_handle_t *handle) {
+	isc__nm_uvreq_t *uvreq = handle->data;
+	isc_nmsocket_t *owner = uvreq->sock;
+
+	isc__nm_uvreq_put(&uvreq, owner);
+}
+
+/*
+ * Handler for a 'tcpchildlisten' event; must run in a network thread.
+ * Initializes the child's IPC pipe and starts an asynchronous connect
+ * to the parent's IPC pipe name; childlisten_ipc_connect_cb() is called
+ * with the result.
+ *
+ * 'ievent0' carries the child socket, which must be a valid socket of
+ * type isc_nm_tcpchildlistener.
+ */
+void
+isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
+	/* Use the event type this handler is actually dispatched with. */
+	isc__netievent_tcpchildlisten_t *ievent =
+		(isc__netievent_tcpchildlisten_t *) ievent0;
+	isc_nmsocket_t *sock = ievent->sock;
+	isc__nm_uvreq_t *req = NULL;
+	int r;
+
+	REQUIRE(isc__nm_in_netthread());
+	/* Validate the socket before looking inside it. */
+	REQUIRE(VALID_NMSOCK(sock));
+	REQUIRE(sock->type == isc_nm_tcpchildlistener);
+
+	/* 'ipc' is 1: the listening TCP handle arrives over this pipe. */
+	r = uv_pipe_init(&worker->loop, &sock->ipc, 1);
+	INSIST(r == 0);
+	sock->ipc.data = sock;
+	req = isc__nm_uvreq_get(sock->mgr, sock);
+
+	uv_pipe_connect(&req->uv_req.connect,
+			&sock->ipc,
+			sock->parent->ipc_pipe_name,
+			childlisten_ipc_connect_cb);
+}
+
+/*
+ * child connected to parent over IPC.
+ *
+ * Connect callback for the uv_pipe_connect() issued in
+ * isc__nm_async_tcpchildlisten().  Releases the connect request and
+ * starts reading on the child's IPC pipe; childlisten_read_cb() is
+ * called when data arrives.
+ */
+static void
+childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) {
+	isc__nm_uvreq_t *req = uvreq->data;
+	isc_nmsocket_t *sock = req->sock;
+	int r;
+
+	/*
+	 * A failed connect means this child will never be handed the
+	 * listening socket; fail here, as the parent side does in its
+	 * own connection callback, rather than read from a dead pipe.
+	 */
+	INSIST(status == 0);
+
+	isc__nm_uvreq_put(&req, sock);
+
+	r = uv_read_start((uv_stream_t *) &sock->ipc,
+			  isc__nm_alloc_cb,
+			  childlisten_read_cb);
+	INSIST(r == 0);
+}
+
+/*
+ * child got the socket over IPC.
+ *
+ * Read callback for the child's IPC pipe.  Takes the pending TCP handle
+ * off the pipe into 'sock->uv_handle', starts listening on it with
+ * tcp_connection_cb() as the connection callback, and closes the pipe.
+ *
+ * 'stream->data' must be a valid socket.  'nread' is not examined.
+ */
+static void
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
+	isc_nmsocket_t *sock = stream->data;
+	uv_pipe_t *ipc = (uv_pipe_t *) stream;
+	isc__networker_t *worker = NULL;
+	uv_handle_type type;
+	int r;
+
+	UNUSED(nread);
+
+	REQUIRE(VALID_NMSOCK(sock));
+	REQUIRE(buf != NULL);
+
+	type = uv_pipe_pending_type(ipc);
+	INSIST(type == UV_TCP);
+	isc__nm_free_uvbuf(sock, buf);
+
+	worker = &sock->mgr->workers[isc_nm_tid()];
+	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
+	INSIST(r == 0);
+	sock->uv_handle.tcp.data = sock;
+
+	/*
+	 * Only listen on a handle that was really received: if uv_accept()
+	 * fails, 'uv_handle.tcp' is not the socket the parent has bound.
+	 */
+	r = uv_accept(stream, &sock->uv_handle.stream);
+	if (r == 0) {
+		r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp,
+			      sock->backlog, tcp_connection_cb);
+	}
+	uv_close((uv_handle_t *) ipc, NULL);
+	if (r != 0) {
+		/* XXX log it? */
+		return;
+	}
+}
+
+
void
isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) {
isc__netievent_tcpstoplisten_t *ievent = NULL;
(isc__netievent_t *) ievent);
}
+/*
+ * Handler for a 'tcpstoplisten' event on a parent listener socket.
+ * Calls stoplistening() while holding the network manager interlock;
+ * when the interlock cannot be acquired, a new event is queued on the
+ * socket's own worker so that the attempt is repeated later.
+ */
+void
+isc__nm_async_tcpstoplisten(isc__networker_t *worker,
+			    isc__netievent_t *ievent0)
+{
+	isc__netievent_tcpstoplisten_t *stopev =
+		(isc__netievent_tcpstoplisten_t *) ievent0;
+	isc_nmsocket_t *sock = stopev->sock;
+	isc__netievent_tcpstoplisten_t *retry = NULL;
+
+	UNUSED(worker);
+
+	REQUIRE(isc__nm_in_netthread());
+	REQUIRE(VALID_NMSOCK(sock));
+	REQUIRE(sock->type == isc_nm_tcplistener);
+
+	if (isc__nm_acquire_interlocked(sock->mgr)) {
+		stoplistening(sock);
+		isc__nm_drop_interlocked(sock->mgr);
+		return;
+	}
+
+	/*
+	 * The network manager is interlocked: re-enqueue the event for
+	 * later.
+	 */
+	retry = isc__nm_get_ievent(sock->mgr, netievent_tcpstoplisten);
+	retry->sock = sock;
+	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+			       (isc__netievent_t *) retry);
+}
+
static void
-stoplistening_cb(uv_handle_t *handle) {
- isc_nmsocket_t *sock = handle->data;
+stoplistening(isc_nmsocket_t *sock) { /* stop the children, then close the parent */
+ for (int i = 0; i < sock->nchildren; i++) {
+ /*
+ * Stoplistening is a rare event, we can ignore the overhead
+ * caused by allocating an event, and doing it this way
+ * simplifies sock reference counting.
+ *
+ * Each child gets its own 'tcpstopchildlisten' event that
+ * holds an attached reference to it. The child whose index
+ * equals this socket's tid is handled by a direct call and
+ * its event is put back right away; the others are enqueued
+ * on their own workers.
+ */
+ isc__netievent_tcpstopchildlisten_t *event = NULL;
+ event = isc__nm_get_ievent(sock->mgr,
+ netievent_tcpstopchildlisten);
+ isc_nmsocket_attach(&sock->children[i], &event->sock);
+
+ if (i == sock->tid) {
+ isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i],
+ (isc__netievent_t *) event);
+ isc__nm_put_ievent(sock->mgr, event);
+ } else {
+ isc__nm_enqueue_ievent(&sock->mgr->workers[i],
+ (isc__netievent_t *) event);
+ }
+ }
LOCK(&sock->lock);
- atomic_store(&sock->listening, false);
- atomic_store(&sock->closed, true);
- SIGNAL(&sock->cond);
+ while (atomic_load_relaxed(&sock->rchildren) > 0) { /* each child decrements rchildren under this lock */
+ WAIT(&sock->cond, &sock->lock); /* children BROADCAST on this condition */
+ }
UNLOCK(&sock->lock);
-
- sock->pquota = NULL;
-
- isc_nmsocket_detach(&sock);
+ uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); /* parent handle is closed last */
}
void
-isc__nm_async_tcpstoplisten(isc__networker_t *worker,
- isc__netievent_t *ievent0)
+isc__nm_async_tcpstopchildlisten(isc__networker_t *worker,
+ isc__netievent_t *ievent0)
{
isc__netievent_tcpstoplisten_t *ievent =
(isc__netievent_tcpstoplisten_t *) ievent0;
UNUSED(worker);
- REQUIRE(isc__nm_in_netthread());
+ REQUIRE(isc_nm_tid() == sock->tid);
REQUIRE(VALID_NMSOCK(sock));
- REQUIRE(sock->type == isc_nm_tcplistener);
+ REQUIRE(sock->type == isc_nm_tcpchildlistener);
+ REQUIRE(sock->parent != NULL);
+
+ /*
+ * rchildren is atomic but we still need to change it
+ * under a lock as the parent is waiting on a condition
+ * variable and without the lock we might deadlock.
+ */
+ LOCK(&sock->parent->lock);
+ atomic_fetch_sub(&sock->parent->rchildren, 1);
+ UNLOCK(&sock->parent->lock);
+
+ uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb);
+ BROADCAST(&sock->parent->cond);
+}
- uv_close(&sock->uv_handle.handle, stoplistening_cb);
+/*
+ * Close callback shared by parent and child listening sockets.  A child
+ * (a socket with a parent) is updated under its parent's lock, a parent
+ * under its own lock.  Marks the socket closed and not listening, clears
+ * its quota pointer and drops the reference held in 'handle->data'.
+ */
+static void
+tcp_listenclose_cb(uv_handle_t *handle) {
+	isc_nmsocket_t *sock = handle->data;
+	isc_mutex_t *guard = &sock->lock;
+
+	if (sock->parent != NULL) {
+		guard = &sock->parent->lock;
+	}
+
+	LOCK(guard);
+	atomic_store(&sock->closed, true);
+	atomic_store(&sock->listening, false);
+	sock->pquota = NULL;
+	UNLOCK(guard);
+
+	isc_nmsocket_detach(&sock);
}
static void