static void
tcp_close_cb(uv_handle_t *uvhandle);
-static void
-stoplistening(isc_nmsocket_t *sock);
static void
tcp_listenclose_cb(uv_handle_t *handle);
+static isc_result_t
+accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
- nsock->nchildren = mgr->nworkers;
- atomic_init(&nsock->rchildren, mgr->nworkers);
- nsock->children = isc_mem_get(mgr->mctx,
- mgr->nworkers * sizeof(*nsock));
- memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock));
nsock->rcb.accept = cb;
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
}
/*
- * For multi-threaded TCP listening, we create a single "parent" socket,
- * bind to it, and then pass its uv_handle to a set of child sockets, one
- * per worker. For thread safety, the passing of the socket's uv_handle has
- * to be done via IPC socket.
- *
- * This design pattern is ugly but it's what's recommended by the libuv
- * documentation. (A prior version of libuv had uv_export() and
- * uv_import() functions which would have simplified this greatly, but
- * they have been deprecated and removed.)
+ * For multi-threaded TCP listening, we create a single socket,
+ * bind to it, and start listening. On an incoming connection we accept
+ * it, and then pass the accepted socket using the uv_export/uv_import
+ * mechanism to a child thread.
*/
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcplistener);
- /* Initialize children now to make cleaning up easier */
- for (int i = 0; i < sock->nchildren; i++) {
- isc_nmsocket_t *csock = &sock->children[i];
-
- isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener,
- sock->iface);
- csock->parent = sock;
- csock->tid = i;
- csock->pquota = sock->pquota;
- csock->backlog = sock->backlog;
- csock->extrahandlesize = sock->extrahandlesize;
-
- INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
- csock->rcb.accept = sock->rcb.accept;
- csock->rcbarg = sock->rcbarg;
- csock->fd = -1;
- }
-
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) {
/* It was never opened */
goto done;
}
- uv_handle_set_data(&sock->uv_handle.handle, sock);
-
/*
- * For each worker, we send a 'tcpchildlisten' event with
- * the exported socket.
+ * The callback will run in the same thread uv_listen() was called
+ * from, so a race with tcp_connection_cb() isn't possible.
+ *
+ * On failure the error is logged, the handle is closed, the result is
+ * stored in sock->result and sock->listen_error is raised.
+ *
+ * NOTE(review): the handle data is set only after uv_listen() has
+ * succeeded, while the failure branch below already hands the handle
+ * to uv_close() with tcp_close_cb -- confirm that tcp_close_cb() does
+ * not rely on the handle data being set.
*/
- for (int i = 0; i < sock->nchildren; i++) {
- isc_nmsocket_t *csock = &sock->children[i];
- isc__netievent_tcpchildlisten_t *event = NULL;
-
- event = isc__nm_get_ievent(csock->mgr,
- netievent_tcpchildlisten);
- r = isc_uv_export(&sock->uv_handle.stream, &event->streaminfo);
- if (r != 0) {
- isc_log_write(
- isc_lctx, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
- "uv_export failed: %s",
- isc_result_totext(isc__nm_uverr2result(r)));
- isc__nm_put_ievent(sock->mgr, event);
- continue;
- }
- event->sock = csock;
- if (csock->tid == isc_nm_tid()) {
- isc__nm_async_tcpchildlisten(&sock->mgr->workers[i],
- (isc__netievent_t *)event);
- isc__nm_put_ievent(sock->mgr, event);
- } else {
- isc__nm_enqueue_ievent(&sock->mgr->workers[i],
- (isc__netievent_t *)event);
- }
+ r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
+ tcp_connection_cb);
+ if (r != 0) {
+ isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
+ "uv_listen failed: %s",
+ isc_result_totext(isc__nm_uverr2result(r)));
+ uv_close(&sock->uv_handle.handle, tcp_close_cb);
+ sock->result = isc__nm_uverr2result(r);
+ atomic_store(&sock->listen_error, true);
+ goto done;
}
+ uv_handle_set_data(&sock->uv_handle.handle, sock);
+
atomic_store(&sock->listening, true);
done:
return;
}
-/*
- * Connect to the parent socket and be ready to receive the uv_handle
- * for the socket we'll be listening on.
- */
+/*
+ * libuv connection callback installed by uv_listen() on the TCP
+ * listening socket.
+ *
+ * 'server' is the listening stream; the isc_nmsocket_t it belongs to
+ * is read back from the handle data.  'status' is the value libuv
+ * hands to the callback: when it is non-zero there is nothing to
+ * accept, so it is converted with isc__nm_uverr2result() and reported
+ * instead of calling accept_connection().  Otherwise the connection is
+ * passed to accept_connection() with no pre-acquired quota.
+ *
+ * Failures other than ISC_R_NOCONN are logged; quota-related failures
+ * are logged only when can_log_tcp_quota() allows it.
+ */
+static void
+tcp_connection_cb(uv_stream_t *server, int status) {
+	isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server);
+	isc_result_t result;
+
+	if (status != 0) {
+		/* libuv reported a failed accept; do not try uv_accept(). */
+		result = isc__nm_uverr2result(status);
+	} else {
+		result = accept_connection(psock, NULL);
+	}
+
+	if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
+		if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
+		    can_log_tcp_quota()) {
+			isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+				      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
+				      "TCP connection failed: %s",
+				      isc_result_totext(result));
+		}
+	}
+}
+
+
void
-isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0) {
- isc__netievent_tcpchildlisten_t *ievent =
- (isc__netievent_tcpchildlisten_t *)ev0;
- isc_nmsocket_t *sock = ievent->sock;
+isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
+ isc__netievent_tcpchildaccept_t *ievent =
+ (isc__netievent_tcpchildaccept_t *)ev0;
+ isc_nmsocket_t *ssock = ievent->sock;
+ isc_nmsocket_t *csock = NULL;
+ isc_nmhandle_t *handle;
+ isc_result_t result;
+ struct sockaddr_storage ss;
+ isc_sockaddr_t local;
int r;
REQUIRE(isc__nm_in_netthread());
- REQUIRE(sock->type == isc_nm_tcpchildlistener);
+ REQUIRE(ssock->type == isc_nm_tcplistener);
- worker = &sock->mgr->workers[isc_nm_tid()];
+ csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
+ isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
+ csock->tid = isc_nm_tid();
+ csock->extrahandlesize = ssock->extrahandlesize;
- uv_tcp_init(&worker->loop, (uv_tcp_t *)&sock->uv_handle.tcp);
- uv_handle_set_data(&sock->uv_handle.handle, sock);
- r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo);
+ csock->quota = ievent->quota;
+ ievent->quota = NULL;
+
+ worker = &ssock->mgr->workers[isc_nm_tid()];
+ uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
+
+ r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
if (r != 0) {
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"uv_import failed: %s",
isc_result_totext(isc__nm_uverr2result(r)));
- return;
+ result = isc__nm_uverr2result(r);
+ goto error;
}
- r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
- tcp_connection_cb);
+ r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
+ &(int){ sizeof(ss) });
+ if (r != 0) {
+ result = isc__nm_uverr2result(r);
+ goto error;
+ }
+ result = isc_sockaddr_fromsockaddr(&csock->peer,
+ (struct sockaddr *)&ss);
+ if (result != ISC_R_SUCCESS) {
+ goto error;
+ }
+
+ r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
+ &(int){ sizeof(ss) });
if (r != 0) {
- isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
- "uv_listen failed: %s",
- isc_result_totext(isc__nm_uverr2result(r)));
- return;
+ result = isc__nm_uverr2result(r);
+ goto error;
+ }
+
+ result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
+ if (result != ISC_R_SUCCESS) {
+ goto error;
}
+
+ isc_nmsocket_attach(ssock, &csock->server);
+
+ handle = isc__nmhandle_get(csock, NULL, &local);
+
+ INSIST(ssock->rcb.accept != NULL);
+ csock->read_timeout = ssock->mgr->init;
+ ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
+ isc_nmsocket_detach(&csock);
+ return;
+
+error:
+ /*
+ * Every failure after csock has been created (stream import,
+ * peer or local address lookup, address conversion) ends up here
+ * with 'result' holding the error.
+ *
+ * Detach the quota early to make room for other connections;
+ * otherwise it'd be detached later asynchronously, and clog
+ * the quota unnecessarily.
+ *
+ * NOTE(review): a uv_import failure has already been logged above
+ * and is logged a second time by the message below -- confirm the
+ * duplicate is intended.
+ */
+ if (csock->quota != NULL) {
+ isc_quota_detach(&csock->quota);
+ }
+ isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
+ ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
+ isc_result_totext(result));
+
+ /*
+ * Detach the socket properly to make sure uv_close() is called.
+ */
+ isc_nmsocket_detach(&csock);
}
void
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)event);
} else {
- stoplistening(sock);
+ uv_close((uv_handle_t *)&sock->uv_handle.tcp,
+ tcp_listenclose_cb);
isc__nm_drop_interlocked(sock->mgr);
}
}
-static void
-stoplistening(isc_nmsocket_t *sock) {
- for (int i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcpchildstop_t *event = NULL;
-
- /*
- * We can ignore the overhead of event allocation because
- * stoplistening is a rare event, and doing it this way
- * simplifies sock reference counting.
- */
- event = isc__nm_get_ievent(sock->mgr, netievent_tcpchildstop);
- isc_nmsocket_attach(&sock->children[i], &event->sock);
-
- if (isc_nm_tid() == sock->children[i].tid) {
- isc__nm_async_tcpchildstop(&sock->mgr->workers[i],
- (isc__netievent_t *)event);
- isc__nm_put_ievent(sock->mgr, event);
- } else {
- isc__nm_enqueue_ievent(&sock->mgr->workers[i],
- (isc__netievent_t *)event);
- }
- }
-
- LOCK(&sock->lock);
- while (atomic_load_relaxed(&sock->rchildren) > 0) {
- WAIT(&sock->cond, &sock->lock);
- }
- UNLOCK(&sock->lock);
- uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb);
-}
-
-void
-isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0) {
- isc__netievent_tcpchildstop_t *ievent =
- (isc__netievent_tcpchildstop_t *)ev0;
- isc_nmsocket_t *sock = ievent->sock;
-
- UNUSED(worker);
-
- REQUIRE(VALID_NMSOCK(sock));
- REQUIRE(isc_nm_tid() == sock->tid);
- REQUIRE(sock->type == isc_nm_tcpchildlistener);
- REQUIRE(sock->parent != NULL);
-
- /*
- * rchildren is atomic, but we still need to change it
- * under a lock because the parent is waiting on conditional
- * and without it we might deadlock.
- */
- LOCK(&sock->parent->lock);
- atomic_fetch_sub(&sock->parent->rchildren, 1);
- UNLOCK(&sock->parent->lock);
-
- uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb);
- BROADCAST(&sock->parent->cond);
-}
-
/*
- * This callback is used for closing both child and parent listening
- * sockets; that's why we need to choose the proper lock.
+ * This callback is used for closing listening sockets.
+ *
+ * The socket is read back from the handle data.  While holding the
+ * socket's own lock it is marked closed and no longer listening and
+ * its pquota pointer is cleared; afterwards the reference is dropped
+ * with isc_nmsocket_detach().
*/
static void
tcp_listenclose_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
- isc_mutex_t *lock = ((sock->parent != NULL) ? &sock->parent->lock
- : &sock->lock);
- LOCK(lock);
+ LOCK(&sock->lock);
atomic_store(&sock->closed, true);
atomic_store(&sock->listening, false);
sock->pquota = NULL;
- UNLOCK(lock);
+ UNLOCK(&sock->lock);
isc_nmsocket_detach(&sock);
}
*/
}
+/*
+ * Quota callback registered through isc_quota_cb_init().
+ *
+ * 'quota' is the quota object handed to the callback and 'sock0' is
+ * the socket that was registered with it.  The function does no accept
+ * work itself: it packages both into a 'tcpaccept' event and queues
+ * that event on the worker whose index is the socket's tid.
+ */
+static void
+quota_accept_cb(isc_quota_t *quota, void *sock0) {
+	isc_nmsocket_t *listener = sock0;
+	isc__netievent_tcpaccept_t *accept_ev = NULL;
+	isc__networker_t *owner = NULL;
+
+	REQUIRE(VALID_NMSOCK(listener));
+
+	/* Build the event carrying the socket and its quota. */
+	accept_ev = isc__nm_get_ievent(listener->mgr, netievent_tcpaccept);
+	accept_ev->sock = listener;
+	accept_ev->quota = quota;
+
+	/* Deliver it over the async channel of the socket's worker. */
+	owner = &listener->mgr->workers[listener->tid];
+	isc__nm_enqueue_ievent(owner, (isc__netievent_t *)accept_ev);
+}
+
+
+/*
+ * Handler for the 'tcpaccept' event queued by quota_accept_cb().
+ *
+ * Must run on the worker whose id equals the event socket's tid.  It
+ * retries accept_connection() with the quota carried by the event,
+ * logs a failure (quota-related ones only when can_log_tcp_quota()
+ * permits), and finally drops the socket reference held by the event.
+ */
+void
+isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
+	isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;
+	isc_result_t result;
+	bool failed;
+	bool quota_related;
+
+	REQUIRE(worker->id == ievent->sock->tid);
+
+	result = accept_connection(ievent->sock, ievent->quota);
+
+	failed = (result != ISC_R_SUCCESS && result != ISC_R_NOCONN);
+	quota_related = (result == ISC_R_QUOTA || result == ISC_R_SOFTQUOTA);
+	if (failed && (!quota_related || can_log_tcp_quota())) {
+		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
+			      "TCP connection failed: %s",
+			      isc_result_totext(result));
+	}
+
+	/*
+	 * Release the reference that was taken on the socket just before
+	 * isc_quota_attach_cb() was called.
+	 */
+	isc_nmsocket_detach(&ievent->sock);
+}
+
+
+/*
+ * Close callback for the uv_tcp_t structures created in
+ * accept_connection().  The handle data holds the memory context the
+ * structure was allocated from; the structure is returned to that
+ * context and the context reference is released in the same call.
+ */
+static void
+free_uvtcpt(uv_handle_t *uvs) {
+	isc_mem_t *owner_mctx = uv_handle_get_data(uvs);
+
+	isc_mem_putanddetach(&owner_mctx, uvs, sizeof(uv_tcp_t));
+}
+
+
+/*
+ * Accept one pending connection on the listening socket 'ssock' and
+ * hand it to a randomly chosen worker.
+ *
+ * 'quota' is NULL when called directly from the connection callback and
+ * is the already-attached quota when called again after the quota
+ * callback fired.  When 'ssock' has a quota pool and no quota was passed
+ * in, one is requested with isc_quota_attach_cb(); on ISC_R_QUOTA the
+ * function returns and the accept is retried from the quota callback.
+ *
+ * The accepted stream lives in a temporary uv_tcp_t that is exported
+ * into a 'tcpchildaccept' event and then closed (freed by
+ * free_uvtcpt()).  The event is run inline when the chosen worker is the
+ * current thread and queued otherwise.
+ *
+ * Returns ISC_R_SUCCESS once the event has been dispatched,
+ * ISC_R_CANCELED when the bail-out check at the top fires, ISC_R_QUOTA
+ * when no quota slot is available, or the converted libuv error when
+ * uv_accept() fails.  A quota held at the time of an early return is
+ * detached first.
+ */
static isc_result_t
-accept_connection(isc_nmsocket_t *ssock) {
+accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
isc_result_t result;
- isc_quota_t *quota = NULL;
- isc_nmsocket_t *csock = NULL;
+ isc__netievent_tcpchildaccept_t *event = NULL;
isc__networker_t *worker = NULL;
- isc_nmhandle_t *handle = NULL;
- struct sockaddr_storage ss;
- isc_sockaddr_t local;
- int r;
- bool overquota = false;
+ uv_tcp_t *uvstream = NULL;
+ isc_mem_t *mctx = NULL;
+ int r, w;
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
atomic_load_relaxed(&ssock->mgr->closing))
{
/* We're closing, bail */
+ if (quota != NULL) {
+ isc_quota_detach(&quota);
+ }
return (ISC_R_CANCELED);
}
- if (ssock->pquota != NULL) {
- result = isc_quota_attach(ssock->pquota, &quota);
-
+ /* We can be called directly or as a callback from quota */
+ if (ssock->pquota != NULL && quota == NULL) {
/*
- * We share the quota between all TCP sockets. Others
- * may have used up all the quota slots, in which case
- * this socket could starve. So we only fail here if we
- * already had at least one active connection on this
- * socket. This guarantees that we'll maintain some level
- * of service while over quota, and will resume normal
- * service when the quota comes back down.
+ * We need to attach to ssock, because it might be queued
+ * waiting for a TCP quota slot. If so, then we'll detach it
+ * later when the connection is accepted. (XXX: This may be
+ * suboptimal, it might be better not to attach unless
+ * we need to.)
*/
- if (result != ISC_R_SUCCESS) {
- ssock->overquota++;
- overquota = true;
- if (ssock->conns > 0) {
- isc__nm_incstats(
- ssock->mgr,
- ssock->statsindex[STATID_ACCEPTFAIL]);
- return (result);
- }
+ isc_nmsocket_t *tsock = NULL;
+ isc_nmsocket_attach(ssock, &tsock);
+ isc_quota_cb_init(&ssock->quotacb, quota_accept_cb, tsock);
+ result = isc_quota_attach_cb(ssock->pquota, &quota,
+ &ssock->quotacb);
+ if (result == ISC_R_QUOTA) {
+ isc__nm_incstats(ssock->mgr,
+ ssock->statsindex[STATID_ACCEPTFAIL]);
+ return (result);
}
+
+ /*
+ * We're under quota, so there's no need to wait;
+ * clear the quota callback and detach the socket.
+ */
+ isc_quota_cb_init(&ssock->quotacb, NULL, NULL);
+ isc_nmsocket_detach(&tsock);
}
isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);
- csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
- isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
- csock->tid = isc_nm_tid();
- csock->extrahandlesize = ssock->extrahandlesize;
- csock->quota = quota;
- quota = NULL;
-
worker = &ssock->mgr->workers[isc_nm_tid()];
- uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
+ uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t));
- r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream);
- if (r != 0) {
- result = isc__nm_uverr2result(r);
- goto error;
- }
+ isc_mem_attach(ssock->mgr->mctx, &mctx);
+ uv_handle_set_data((uv_handle_t *)uvstream, mctx);
+ mctx = NULL; /* Detached later in free_uvtcpt() */
- r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
- &(int){ sizeof(ss) });
- if (r != 0) {
- result = isc__nm_uverr2result(r);
- goto error;
- }
+ uv_tcp_init(&worker->loop, uvstream);
- result = isc_sockaddr_fromsockaddr(&csock->peer,
- (struct sockaddr *)&ss);
- if (result != ISC_R_SUCCESS) {
- goto error;
- }
-
- r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
- &(int){ sizeof(ss) });
+ r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream);
if (r != 0) {
result = isc__nm_uverr2result(r);
- goto error;
- }
- result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
- if (result != ISC_R_SUCCESS) {
- goto error;
+ uv_close((uv_handle_t *)uvstream, free_uvtcpt);
+ /*
+ * quota is still NULL here when the listener has no quota
+ * pool and we were called directly, so guard the detach the
+ * same way the other call sites do.
+ */
+ if (quota != NULL) {
+ isc_quota_detach(&quota);
+ }
+ return (result);
}
- isc_nmsocket_attach(ssock, &csock->server);
- ssock->conns++;
-
- handle = isc__nmhandle_get(csock, NULL, &local);
+ /* We have an accepted TCP socket, pass it to a random worker */
+ w = isc_random_uniform(ssock->mgr->nworkers);
+ event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
+ event->sock = ssock;
+ event->quota = quota;
- INSIST(ssock->rcb.accept != NULL);
- csock->read_timeout = ssock->mgr->init;
- ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
- isc_nmsocket_detach(&csock);
+ r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
+ RUNTIME_CHECK(r == 0);
- return (ISC_R_SUCCESS);
+ uv_close((uv_handle_t *)uvstream, free_uvtcpt);
-error:
- /*
- * Detach it early to make room for other connections, otherwise
- * it'd be detached later asynchronously clogging the quota.
- */
- if (csock->quota != NULL) {
- isc_quota_detach(&csock->quota);
- }
- if (overquota) {
- ssock->overquota--;
+ if (w == isc_nm_tid()) {
+ isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w],
+ (isc__netievent_t *)event);
+ isc__nm_put_ievent(ssock->mgr, event);
+ } else {
+ isc__nm_enqueue_ievent(&ssock->mgr->workers[w],
+ (isc__netievent_t *)event);
}
- /* We need to detach it properly to make sure uv_close is called. */
- isc_nmsocket_detach(&csock);
- return (result);
-}
-
-static void
-tcp_connection_cb(uv_stream_t *server, int status) {
- isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *)server);
- isc_result_t result;
- UNUSED(status);
-
- result = accept_connection(ssock);
- if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
- if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
- can_log_tcp_quota()) {
- isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
- "TCP connection failed: %s",
- isc_result_totext(result));
- }
- }
+ return (ISC_R_SUCCESS);
}
isc_result_t
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_tcpsocket);
- isc_nmsocket_t *ssock = sock->server;
-
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
- if (ssock != NULL) {
- ssock->conns--;
- while (ssock->conns == 0 && ssock->overquota > 0) {
- ssock->overquota--;
- isc_result_t result = accept_connection(ssock);
- if (result == ISC_R_SUCCESS || result == ISC_R_NOCONN) {
- continue;
- }
- if ((result != ISC_R_QUOTA &&
- result != ISC_R_SOFTQUOTA) ||
- can_log_tcp_quota()) {
- isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_NETMGR,
- ISC_LOG_ERROR,
- "TCP connection failed: %s",
- isc_result_totext(result));
- }
- }
- }
if (sock->timer_initialized) {
sock->timer_initialized = false;
uv_timer_stop(&sock->timer);