bool paused;
bool finished;
isc_thread_t thread;
- isc_queue_t *ievents; /* incoming async events */
+ isc_queue_t *ievents; /* incoming async events */
+ isc_queue_t *ievents_prio; /* priority async events
+ * used for listening etc.
+ * can be processed while
+ * worker is paused */
isc_refcount_t references;
atomic_int_fast64_t pktcount;
char recvbuf[65536];
};
typedef enum isc__netievent_type {
- netievent_stop,
- netievent_udplisten,
- netievent_udpstoplisten,
netievent_udpsend,
netievent_udprecv,
netievent_tcpconnect,
netievent_tcprecv,
netievent_tcpstartread,
netievent_tcppauseread,
- netievent_tcplisten,
netievent_tcpchildlisten,
- netievent_tcpstoplisten,
netievent_tcpstopchildlisten,
- netievent_tcpclose,
netievent_closecb,
netievent_shutdown,
+ netievent_stop,
+ netievent_udpstoplisten,
+ netievent_tcpstoplisten,
+ netievent_tcpclose,
+ netievent_prio = 0xff, /* sentinel, not a real event: types above this
+                         * are "priority" events, queued on
+                         * ievents_prio and processed even while the
+                         * worker is paused */
+ netievent_udplisten, /* priority: see netievent_prio above */
+ netievent_tcplisten, /* priority: see netievent_prio above */
} isc__netievent_type;
/*
isc_astack_t *inactivehandles;
isc_astack_t *inactivereqs;
- /* Used for active/rchildren during shutdown */
+ /*
+ * Used to wait for listening event to be done and active/rchildren
+ * during shutdown.
+ */
isc_mutex_t lock;
isc_condition_t cond;
+ isc_result_t result;
+
/*%
* List of active handles.
* ah - current position in 'ah_frees'; this represents the
nm_thread(void *worker0);
static void
async_cb(uv_async_t *handle);
+/* Dequeue and dispatch every pending event on 'queue' for 'worker'. */
+static void
+process_queue(isc__networker_t *worker, isc_queue_t *queue);
int
isc_nm_tid() {
isc_condition_init(&worker->cond);
worker->ievents = isc_queue_new(mgr->mctx, 128);
+ worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
/*
* We need to do this here and not in nm_thread to avoid a
UNLOCK(&mgr->lock);
for (size_t i = 0; i < mgr->nworkers; i++) {
+ isc__networker_t *worker = &mgr->workers[i];
/* Empty the async event queue */
isc__netievent_t *ievent;
while ((ievent = (isc__netievent_t *)
- isc_queue_dequeue(mgr->workers[i].ievents)) != NULL)
+ isc_queue_dequeue(worker->ievents)) != NULL)
+ {
+ isc_mempool_put(mgr->evpool, ievent);
+ }
+ while ((ievent = (isc__netievent_t *)
+ isc_queue_dequeue(worker->ievents_prio)) != NULL)
{
isc_mempool_put(mgr->evpool, ievent);
}
- int r = uv_loop_close(&mgr->workers[i].loop);
+ int r = uv_loop_close(&worker->loop);
INSIST(r == 0);
- isc_queue_destroy(mgr->workers[i].ievents);
- isc_thread_join(mgr->workers[i].thread, NULL);
+ isc_queue_destroy(worker->ievents);
+ isc_queue_destroy(worker->ievents_prio);
+ isc_thread_join(worker->thread, NULL);
}
isc_condition_destroy(&mgr->wkstatecond);
UNLOCK(&worker->mgr->lock);
WAIT(&worker->cond, &worker->lock);
+
+ /* Process priority events */
+ process_queue(worker, worker->ievents_prio);
}
if (pausing) {
uint32_t wp = atomic_fetch_sub_explicit(
/*
* Empty the async queue.
*/
- async_cb(&worker->async);
+ process_queue(worker, worker->ievents_prio);
+ process_queue(worker, worker->ievents);
}
LOCK(&worker->mgr->lock);
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *) handle->loop->data;
+ /*
+ * Drain the priority queue first so that listen/stop events are
+ * never starved by a backlog of ordinary send/recv events.
+ */
+ process_queue(worker, worker->ievents_prio);
+ process_queue(worker, worker->ievents);
+}
+
+static void
+process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__netievent_t *ievent;
while ((ievent = (isc__netievent_t *)
- isc_queue_dequeue(worker->ievents)) != NULL)
+ isc_queue_dequeue(queue)) != NULL)
{
switch (ievent->type) {
case netievent_stop:
- uv_stop(handle->loop);
+ uv_stop(&worker->loop);
isc_mempool_put(worker->mgr->evpool, ievent);
return;
case netievent_udplisten:
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
- isc_queue_enqueue(worker->ievents, (uintptr_t)event);
+ if (event->type > netievent_prio) {
+ /*
+ * Priority events (listening etc.) may have to be processed
+ * while the worker is paused: a paused worker is not running
+ * its uv loop, so uv_async_send() alone would not wake it.
+ * Enqueue under the worker lock and SIGNAL its condition
+ * variable so the pause loop is guaranteed to notice the
+ * event; the uv_async_send() below covers the running case.
+ */
+ LOCK(&worker->lock);
+ isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
+ SIGNAL(&worker->cond);
+ UNLOCK(&worker->lock);
+ } else {
+ isc_queue_enqueue(worker->ievents, (uintptr_t)event);
+ }
uv_async_send(&worker->async);
}
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
nsock->backlog = backlog;
+ nsock->result = ISC_R_SUCCESS;
if (quota != NULL) {
/*
* We don't attach to quota, just assign - to avoid
*/
nsock->pquota = quota;
}
- nsock->tid = isc_random_uniform(mgr->nworkers);
- /*
- * Listening to TCP is rare enough not to care about the
- * added overhead from passing this to another thread.
- */
- isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
+ isc__netievent_tcplisten_t *ievent;
+ ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
ievent->sock = nsock;
- isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
- (isc__netievent_t *) ievent);
-
-
- *sockp = nsock;
+ if (isc__nm_in_netthread()) {
+ nsock->tid = isc_nm_tid();
+ isc__nm_async_tcplisten(&mgr->workers[nsock->tid],
+ (isc__netievent_t *) ievent);
+ isc__nm_put_ievent(mgr, ievent);
+ } else {
+ nsock->tid = isc_random_uniform(mgr->nworkers);
+ LOCK(&nsock->lock);
+ isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
+ (isc__netievent_t *) ievent);
+ WAIT(&nsock->cond, &nsock->lock);
+ UNLOCK(&nsock->lock);
+ }
- return (ISC_R_SUCCESS);
+ if (nsock->result == ISC_R_SUCCESS) {
+ *sockp = nsock;
+ return (ISC_R_SUCCESS);
+ } else {
+ isc_result_t result = nsock->result;
+ isc_nmsocket_detach(&nsock);
+ return (result);
+ }
}
/*
REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcplistener);
+ /* Initialize children now to make cleaning up easier */
+ for (int i = 0; i < sock->nchildren; i++) {
+ isc_nmsocket_t *csock = &sock->children[i];
+
+ isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener);
+ csock->parent = sock;
+ csock->iface = sock->iface;
+ csock->tid = i;
+ csock->pquota = sock->pquota;
+ csock->backlog = sock->backlog;
+ csock->extrahandlesize = sock->extrahandlesize;
+
+ INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
+ csock->rcb.accept = sock->rcb.accept;
+ csock->rcbarg = sock->rcbarg;
+ csock->fd = -1;
+ }
+
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) {
- return;
+ /* It was never opened */
+ atomic_store(&sock->closed, true);
+ sock->result = isc__nm_uverr2result(r);
+ goto fini;
}
r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
if (r != 0) {
- return;
+ uv_close(&sock->uv_handle.handle, tcp_close_cb);
+ sock->result = isc__nm_uverr2result(r);
+ goto fini;
}
uv_handle_set_data(&sock->uv_handle.handle, sock);
/*
* sockets to be listened on over ipc.
*/
for (int i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcpchildlisten_t *event = NULL;
isc_nmsocket_t *csock = &sock->children[i];
- isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener);
- csock->parent = sock;
- csock->iface = sock->iface;
- csock->tid = i;
- csock->pquota = sock->pquota;
- csock->backlog = sock->backlog;
- csock->extrahandlesize = sock->extrahandlesize;
-
- INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
- csock->rcb.accept = sock->rcb.accept;
- csock->rcbarg = sock->rcbarg;
- csock->fd = -1;
-
+ isc__netievent_tcpchildlisten_t *event;
event = isc__nm_get_ievent(csock->mgr,
netievent_tcpchildlisten);
event->sock = csock;
atomic_store(&sock->listening, true);
+fini:
+ LOCK(&sock->lock);
+ SIGNAL(&sock->cond);
+ UNLOCK(&sock->lock);
return;
}
result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb,
dnslistensock, extrahandlesize, backlog,
quota, &dnslistensock->outer);
-
- atomic_store(&dnslistensock->listening, true);
- *sockp = dnslistensock;
-
- return (result);
+ if (result == ISC_R_SUCCESS) {
+ atomic_store(&dnslistensock->listening, true);
+ *sockp = dnslistensock;
+ return (ISC_R_SUCCESS);
+ } else {
+ atomic_store(&dnslistensock->closed, true);
+ isc_nmsocket_detach(&dnslistensock);
+ return (result);
+ }
}
void