From: Witold Kręcicki Date: Mon, 2 Dec 2019 12:54:44 +0000 (+0100) Subject: - Add separate priority event queue for events that must be processed X-Git-Tag: v9.15.7~20^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8c5aaacbefa769454ddc2c2e7ad934415fcd933b;p=thirdparty%2Fbind9.git - Add separate priority event queue for events that must be processed even when worker is paused (e.g. interface reconfiguration). This is needed to prevent deadlocks when reconfiguring interfaces - as network manager is paused then, but we still need to stop/start listening. - Proper handling of TCP listen errors in netmgr - bind to the socket first, then return the error code. --- diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 129f4001c40..4a8f10aff7c 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -46,7 +46,11 @@ typedef struct isc__networker { bool paused; bool finished; isc_thread_t thread; - isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents_prio; /* priority async events + * used for listening etc.
+ * can be processed while + * worker is paused */ isc_refcount_t references; atomic_int_fast64_t pktcount; char recvbuf[65536]; @@ -103,9 +107,6 @@ struct isc_nmiface { }; typedef enum isc__netievent_type { - netievent_stop, - netievent_udplisten, - netievent_udpstoplisten, netievent_udpsend, netievent_udprecv, netievent_tcpconnect, @@ -113,13 +114,17 @@ netievent_tcprecv, netievent_tcpstartread, netievent_tcppauseread, - netievent_tcplisten, netievent_tcpchildlisten, - netievent_tcpstoplisten, netievent_tcpstopchildlisten, - netievent_tcpclose, netievent_closecb, netievent_shutdown, + netievent_stop, + netievent_udpstoplisten, + netievent_tcpstoplisten, + netievent_tcpclose, + netievent_prio = 0xff, + netievent_udplisten, + netievent_tcplisten, } isc__netievent_type; /* @@ -402,10 +407,15 @@ struct isc_nmsocket { isc_astack_t *inactivehandles; isc_astack_t *inactivereqs; - /* Used for active/rchildren during shutdown */ + /* + * Used to wait for listening event to be done and active/rchildren + * during shutdown. + */ isc_mutex_t lock; isc_condition_t cond; + isc_result_t result; + /*% * List of active handles.
* ah - current position in 'ah_frees'; this represents the diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 9ef847bf2b8..f91651c0093 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -57,6 +57,8 @@ static void * nm_thread(void *worker0); static void async_cb(uv_async_t *handle); +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue); int isc_nm_tid() { @@ -135,6 +137,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); + worker->ievents_prio = isc_queue_new(mgr->mctx, 128); /* * We need to do this here and not in nm_thread to avoid a @@ -182,17 +185,24 @@ nm_destroy(isc_nm_t **mgr0) { UNLOCK(&mgr->lock); for (size_t i = 0; i < mgr->nworkers; i++) { + isc__networker_t *worker = &mgr->workers[i]; /* Empty the async event queue */ isc__netievent_t *ievent; while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(mgr->workers[i].ievents)) != NULL) + isc_queue_dequeue(worker->ievents)) != NULL) + { + isc_mempool_put(mgr->evpool, ievent); + } + while ((ievent = (isc__netievent_t *) + isc_queue_dequeue(worker->ievents_prio)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - int r = uv_loop_close(&mgr->workers[i].loop); + int r = uv_loop_close(&worker->loop); INSIST(r == 0); - isc_queue_destroy(mgr->workers[i].ievents); - isc_thread_join(mgr->workers[i].thread, NULL); + isc_queue_destroy(worker->ievents); + isc_queue_destroy(worker->ievents_prio); + isc_thread_join(worker->thread, NULL); } isc_condition_destroy(&mgr->wkstatecond); @@ -410,6 +420,9 @@ nm_thread(void *worker0) { UNLOCK(&worker->mgr->lock); WAIT(&worker->cond, &worker->lock); + + /* Process priority events */ + process_queue(worker, worker->ievents_prio); } if (pausing) { uint32_t wp = atomic_fetch_sub_explicit( @@ -459,7 +472,8 @@ nm_thread(void *worker0) { /* * Empty the async queue.
*/ - async_cb(&worker->async); + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); } LOCK(&worker->mgr->lock); @@ -479,14 +493,20 @@ nm_thread(void *worker0) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *) handle->loop->data; + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); +} + +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue) { isc__netievent_t *ievent; while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(worker->ievents)) != NULL) + isc_queue_dequeue(queue)) != NULL) { switch (ievent->type) { case netievent_stop: - uv_stop(handle->loop); + uv_stop(&worker->loop); isc_mempool_put(worker->mgr->evpool, ievent); return; case netievent_udplisten: @@ -557,7 +577,18 @@ isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) { void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { - isc_queue_enqueue(worker->ievents, (uintptr_t)event); + if (event->type > netievent_prio) { + /* + * We need to make sure this signal will be delivered and + * the queue will be processed.
+ */ + LOCK(&worker->lock); + isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); + SIGNAL(&worker->cond); + UNLOCK(&worker->lock); + } else { + isc_queue_enqueue(worker->ievents, (uintptr_t)event); + } uv_async_send(&worker->async); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 9a4746cc40a..63573d8921c 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -167,6 +167,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; nsock->backlog = backlog; + nsock->result = ISC_R_SUCCESS; if (quota != NULL) { /* * We don't attach to quota, just assign - to avoid @@ -174,21 +175,32 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, */ nsock->pquota = quota; } - nsock->tid = isc_random_uniform(mgr->nworkers); - /* - * Listening to TCP is rare enough not to care about the - * added overhead from passing this to another thread. - */ - isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); + isc__netievent_tcplisten_t *ievent; + ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; - isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], - (isc__netievent_t *) ievent); - - - *sockp = nsock; + if (isc__nm_in_netthread()) { + nsock->tid = isc_nm_tid(); + isc__nm_async_tcplisten(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(mgr, ievent); + } else { + nsock->tid = isc_random_uniform(mgr->nworkers); + LOCK(&nsock->lock); + isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + WAIT(&nsock->cond, &nsock->lock); + UNLOCK(&nsock->lock); + } - return (ISC_R_SUCCESS); + if (nsock->result == ISC_R_SUCCESS) { + *sockp = nsock; + return (ISC_R_SUCCESS); + } else { + isc_result_t result = nsock->result; + isc_nmsocket_detach(&nsock); + return (result); + } } /* @@ -208,14 +220,37 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->type == isc_nm_tcplistener); + /* Initialize children now to make cleaning up easier */ + for (int i = 0; i < sock->nchildren; i++) { + isc_nmsocket_t *csock = &sock->children[i]; + + isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); + csock->parent = sock; + csock->iface = sock->iface; + csock->tid = i; + csock->pquota = sock->pquota; + csock->backlog = sock->backlog; + csock->extrahandlesize = sock->extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.accept = sock->rcb.accept; + csock->rcbarg = sock->rcbarg; + csock->fd = -1; + } + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { - return; + /* It was never opened */ + atomic_store(&sock->closed, true); + sock->result = isc__nm_uverr2result(r); + goto fini; } r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); if (r != 0) { - return; + uv_close(&sock->uv_handle.handle, tcp_close_cb); + sock->result = isc__nm_uverr2result(r); + goto fini; } uv_handle_set_data(&sock->uv_handle.handle, sock); /* @@ -240,22 +275,9 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { * sockets to be listened on over ipc.
*/ for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpchildlisten_t *event = NULL; isc_nmsocket_t *csock = &sock->children[i]; - isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); - csock->parent = sock; - csock->iface = sock->iface; - csock->tid = i; - csock->pquota = sock->pquota; - csock->backlog = sock->backlog; - csock->extrahandlesize = sock->extrahandlesize; - - INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); - csock->rcb.accept = sock->rcb.accept; - csock->rcbarg = sock->rcbarg; - csock->fd = -1; - + isc__netievent_tcpchildlisten_t *event; event = isc__nm_get_ievent(csock->mgr, netievent_tcpchildlisten); event->sock = csock; @@ -271,6 +293,10 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { atomic_store(&sock->listening, true); +fini: + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); return; } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index def3993800c..cf3456c175c 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -298,11 +298,15 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock, extrahandlesize, backlog, quota, &dnslistensock->outer); - - atomic_store(&dnslistensock->listening, true); - *sockp = dnslistensock; - - return (result); + if (result == ISC_R_SUCCESS) { + atomic_store(&dnslistensock->listening, true); + *sockp = dnslistensock; + return (ISC_R_SUCCESS); + } else { + atomic_store(&dnslistensock->closed, true); + isc_nmsocket_detach(&dnslistensock); + return (result); + } } void