netievent_udpstoplisten,
netievent_tcpstoplisten,
netievent_tcpclose,
- netievent_prio = 0xff,
+ netievent_prio = 0xff, /* event type values higher than this
+ * will be treated as high-priority
+ * events, which can be processed
+ * while the netmgr is paused.
+ */
netievent_udplisten,
netievent_tcplisten,
} isc__netievent_type;
isc_nm_t *mgr;
isc_nmsocket_t *parent;
- /*
+ /*%
* quota is the TCP client, attached when a TCP connection
* is established. pquota is a non-attached pointer to the
* TCP client quota, stored in listening sockets but only
isc_quota_t *pquota;
bool overquota;
- /*
+ /*%
* TCP read timeout timer.
*/
uv_timer_t timer;
/*% server socket for connections */
isc_nmsocket_t *server;
- /*% children sockets for multi-socket setups */
+ /*% Child sockets for multi-socket setups */
isc_nmsocket_t *children;
int nchildren;
isc_nmiface_t *iface;
isc_nmhandle_t *tcphandle;
- /* used to send listening TCP sockets to children */
+ /*% Used to transfer listening TCP sockets to children */
uv_pipe_t ipc;
char ipc_pipe_name[32];
atomic_int_fast32_t schildren;
- /*% extra data allocated at the end of each isc_nmhandle_t */
+ /*% Extra data allocated at the end of each isc_nmhandle_t */
size_t extrahandlesize;
/*% TCP backlog */
uv_os_sock_t fd;
union uv_any_handle uv_handle;
+ /*% Peer address */
isc_sockaddr_t peer;
/* Atomic */
- /*% Number of running (e.g. listening) children sockets */
+ /*% Number of running (e.g. listening) child sockets */
atomic_int_fast32_t rchildren;
/*%
- * Socket if active if it's listening, working, etc., if we're
- * closing a socket it doesn't make any sense to e.g. still
- * push handles or reqs for reuse
+ * Socket is active if it's listening, working, etc. If it's
+ * closing, then it doesn't make a sense, for example, to
+ * push handles or reqs for reuse.
*/
atomic_bool active;
atomic_bool destroying;
/*%
* Socket is closed if it's not active and all the possible
* callbacks were fired, there are no active handles, etc.
- * active==false, closed==false means the socket is closing.
+ * If active==false but closed==false, that means the socket
+ * is closing.
*/
atomic_bool closed;
atomic_bool listening;
isc_astack_t *inactivehandles;
isc_astack_t *inactivereqs;
- /*
- * Used to wait for listening event to be done and active/rchildren
- * during shutdown.
+ /*%
+ * Used to wait for TCP listening events to complete, and
+ * for the number of running children to reach zero during
+ * shutdown.
*/
isc_mutex_t lock;
isc_condition_t cond;
+ /*%
+ * Used to pass a result back from TCP listening events.
+ */
isc_result_t result;
/*%
size_t *ah_frees;
isc_nmhandle_t **ah_handles;
- /* Buffer for TCPDNS processing, optional */
+ /*% Buffer for TCPDNS processing */
size_t buf_size;
size_t buf_len;
unsigned char *buf;
- /*
+ /*%
* This function will be called with handle->sock
* as the argument whenever a handle's references drop
* to zero, after its reset callback has been called.
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0);
void
-isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0);
+isc__nm_async_tcpchildlisten(isc__networker_t *worker,
+ isc__netievent_t *ievent0);
void
isc__nm_async_tcpstoplisten(isc__networker_t *worker,
isc__netievent_t *ievent0);
static void
ipc_connection_cb(uv_stream_t *stream, int status);
static void
-ipc_write_cb(uv_write_t* uvreq, int status);
+ipc_write_cb(uv_write_t *uvreq, int status);
static void
parent_pipe_close_cb(uv_handle_t *handle);
static void
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
- isc__networker_t *worker;
+ isc__networker_t *worker = NULL;
int r;
REQUIRE(isc__nm_in_netthread());
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) uvreq->data;
- isc_nmsocket_t *sock;
+ isc_nmsocket_t *sock = NULL;
sock = uv_handle_get_data((uv_handle_t *) uvreq->handle);
REQUIRE(VALID_UVREQ(req));
isc_nmsocket_t **sockp)
{
isc_nmsocket_t *nsock = NULL;
+ isc__netievent_tcplisten_t *ievent = NULL;
REQUIRE(VALID_NM(mgr));
nsock->pquota = quota;
}
- isc__netievent_tcplisten_t *ievent;
ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
ievent->sock = nsock;
if (isc__nm_in_netthread()) {
}
/*
- * For TCP listening we create a single socket, bind it, and then pass it
- * to `ncpu` child sockets - the passing is done over IPC.
+ * For TCP listening, we create a single socket, bind it, and then
+ * pass it to `ncpu` child sockets - the passing is done over IPC.
+ *
* XXXWPK This design pattern is ugly but it's "the way to do it" recommended
* by libuv documentation - which also mentions that there should be
- * uv_export/uv_import functions which would simplify this greatly.
+ * uv_export/uv_import functions, which would simplify this greatly.
*/
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
/* It was never opened */
atomic_store(&sock->closed, true);
sock->result = isc__nm_uverr2result(r);
- goto fini;
+ goto done;
}
r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
if (r != 0) {
uv_close(&sock->uv_handle.handle, tcp_close_cb);
sock->result = isc__nm_uverr2result(r);
- goto fini;
+ goto done;
}
uv_handle_set_data(&sock->uv_handle.handle, sock);
+
/*
* This is not properly documented in libuv, and the example
* (benchmark-multi-accept) is wrong:
+ *
* 'ipc' parameter must be '0' for 'listening' IPC socket, '1'
* only for the sockets are really passing the FDs between
* threads. This works without any problems on Unices, but
*/
r = uv_pipe_init(&worker->loop, &sock->ipc, 0);
INSIST(r == 0);
+
uv_handle_set_data((uv_handle_t *)&sock->ipc, sock);
r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name);
INSIST(r == 0);
+
r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren,
ipc_connection_cb);
INSIST(r == 0);
*/
for (int i = 0; i < sock->nchildren; i++) {
isc_nmsocket_t *csock = &sock->children[i];
+ isc__netievent_tcpchildlisten_t *event = NULL;
- isc__netievent_tcpchildlisten_t *event;
event = isc__nm_get_ievent(csock->mgr,
netievent_tcpchildlisten);
event->sock = csock;
atomic_store(&sock->listening, true);
-fini:
+ done:
LOCK(&sock->lock);
SIGNAL(&sock->cond);
UNLOCK(&sock->lock);
return;
}
-/* Parent got an IPC connection from child */
+/*
+ * Parent received an IPC connection from child
+ */
static void
ipc_connection_cb(uv_stream_t *stream, int status) {
- int r;
- REQUIRE(status == 0);
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream);
isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()];
isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock);
+ int r;
+
+ REQUIRE(status == 0);
+
/*
* The buffer can be anything, it will be ignored, but it has to
* be something that won't disappear.
/* Failure here is critical */
r = uv_accept((uv_stream_t *) &sock->ipc,
- (uv_stream_t*) &nreq->pipe);
+ (uv_stream_t *) &nreq->pipe);
INSIST(r == 0);
+
r = uv_write2(&nreq->uv_req.write,
- (uv_stream_t*) &nreq->pipe,
- &nreq->uvbuf,
- 1,
- (uv_stream_t*) &sock->uv_handle.stream,
+ (uv_stream_t *) &nreq->pipe, &nreq->uvbuf, 1,
+ (uv_stream_t *) &sock->uv_handle.stream,
ipc_write_cb);
INSIST(r == 0);
}
static void
-ipc_write_cb(uv_write_t* uvreq, int status) {
- UNUSED(status);
+ipc_write_cb(uv_write_t *uvreq, int status) {
isc__nm_uvreq_t *req = uvreq->data;
+
+ UNUSED(status);
+
/*
- * We want all children to get the socket. If we're done we can stop
- * listening on the IPC socket.
+ * We want all children to get the socket. If we're done, we
+ * can stop listening on the IPC socket.
*/
if (atomic_fetch_add(&req->sock->schildren, 1) ==
- req->sock->nchildren - 1) {
- uv_close((uv_handle_t*) &req->sock->ipc, NULL);
+ req->sock->nchildren - 1)
+ {
+ uv_close((uv_handle_t *) &req->sock->ipc, NULL);
}
- uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb);
+ uv_close((uv_handle_t *) &req->pipe, parent_pipe_close_cb);
}
static void
}
void
-isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
+isc__nm_async_tcpchildlisten(isc__networker_t *worker,
+ isc__netievent_t *ievent0)
+{
isc__netievent_tcplisten_t *ievent =
(isc__netievent_tcplisten_t *) ievent0;
isc_nmsocket_t *sock = ievent->sock;
+ isc__nm_uvreq_t *req = NULL;
int r;
REQUIRE(isc__nm_in_netthread());
r = uv_pipe_init(&worker->loop, &sock->ipc, 1);
INSIST(r == 0);
+
uv_handle_set_data((uv_handle_t *) &sock->ipc, sock);
- isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock);
- uv_pipe_connect(&req->uv_req.connect,
- &sock->ipc,
+ req = isc__nm_uvreq_get(sock->mgr, sock);
+ uv_pipe_connect(&req->uv_req.connect, &sock->ipc,
sock->parent->ipc_pipe_name,
childlisten_ipc_connect_cb);
}
-/* child connected to parent over IPC */
+/* Child connected to parent over IPC */
static void
childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) {
- UNUSED(status);
isc__nm_uvreq_t *req = uvreq->data;
isc_nmsocket_t *sock = req->sock;
+ int r;
+
+ UNUSED(status);
+
isc__nm_uvreq_put(&req, sock);
- int r = uv_read_start((uv_stream_t*) &sock->ipc,
- isc__nm_alloc_cb,
- childlisten_read_cb);
+
+ r = uv_read_start((uv_stream_t *) &sock->ipc, isc__nm_alloc_cb,
+ childlisten_read_cb);
INSIST(r == 0);
}
-/* child got the socket over IPC */
+/* Child received the socket via IPC */
static void
childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
- UNUSED(nread);
- int r;
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream);
+ isc__networker_t *worker = NULL;
+ uv_pipe_t *ipc = NULL;
+ uv_handle_type type;
+ int r;
+
+ UNUSED(nread);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(buf != NULL);
- uv_pipe_t* ipc = (uv_pipe_t*) stream;
- uv_handle_type type = uv_pipe_pending_type(ipc);
+
+ ipc = (uv_pipe_t *) stream;
+ type = uv_pipe_pending_type(ipc);
INSIST(type == UV_TCP);
+
isc__nm_free_uvbuf(sock, buf);
- isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()];
- uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp);
+ worker = &sock->mgr->workers[isc_nm_tid()];
+ uv_tcp_init(&worker->loop, (uv_tcp_t *) &sock->uv_handle.tcp);
uv_handle_set_data(&sock->uv_handle.handle, sock);
+
uv_accept(stream, &sock->uv_handle.stream);
r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
- uv_close((uv_handle_t*) ipc, NULL);
+ uv_close((uv_handle_t *) ipc, NULL);
if (r != 0) {
/* XXX log it? */
return;
static void
stoplistening(isc_nmsocket_t *sock) {
for (int i = 0; i < sock->nchildren; i++) {
+ isc__netievent_tcpstopchildlisten_t *event = NULL;
+
/*
- * Stoplistening is a rare event, we can ignore the overhead
- * caused by allocating an event, and doing it this way
+ * We can ignore the overhead of event allocation because
+ * stoplistening is a rare event, and doing it this way
* simplifies sock reference counting.
*/
- isc__netievent_tcpstopchildlisten_t *event = NULL;
event = isc__nm_get_ievent(sock->mgr,
netievent_tcpstopchildlisten);
isc_nmsocket_attach(&sock->children[i], &event->sock);
if (i == sock->tid) {
- isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i],
- (isc__netievent_t *) event);
+ isc__nm_async_tcpstopchildlisten(
+ &sock->mgr->workers[i],
+ (isc__netievent_t *) event);
isc__nm_put_ievent(sock->mgr, event);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
REQUIRE(sock->parent != NULL);
/*
- * rchildren is atomic but we still need to change it
- * under a lock as the parent is waiting on conditional
+ * rchildren is atomic, but we still need to change it
+ * under a lock because the parent is waiting on conditional
* and without it we might deadlock.
*/
LOCK(&sock->parent->lock);
}
/*
- * This callback is used for closing child and parent listening sockets -
- * that's why we need to choose the proper lock.
+ * This callback is used for closing both child and parent listening
+ * sockets; that's why we need to choose the proper lock.
*/
static void
tcp_listenclose_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
- isc_mutex_t * lock = (sock->parent != NULL) ?
- &sock->parent->lock : &sock->lock;
+ isc_mutex_t *lock = ((sock->parent != NULL)
+ ? &sock->parent->lock
+ : &sock->lock);
+
LOCK(lock);
atomic_store(&sock->closed, true);
atomic_store(&sock->listening, false);
sock->pquota = NULL;
UNLOCK(lock);
+
isc_nmsocket_detach(&sock);
}
static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
- isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t*) stream);
+ isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(buf != NULL);
static void
tcp_connection_cb(uv_stream_t *server, int status) {
- isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t*) server);
+ isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *) server);
isc_result_t result;
UNUSED(status);