include/isc/atomic.h \
include/isc/attributes.h \
include/isc/backtrace.h \
+ include/isc/barrier.h \
include/isc/base32.h \
include/isc/base64.h \
include/isc/bind9.h \
--- /dev/null
+/*
+ * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, you can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * See the COPYRIGHT file distributed with this work for additional
+ * information regarding copyright ownership.
+ */
+
+#pragma once
+
+#include <isc/util.h>
+
+#if __SANITIZE_THREAD__ && !defined(WIN32)
+
+#include <pthread.h>
+
+#define isc_barrier_t pthread_barrier_t
+
+#define isc_barrier_init(barrier, count) \
+ pthread_barrier_init(barrier, NULL, count)
+#define isc_barrier_destroy(barrier) pthread_barrier_destroy(barrier)
+#define isc_barrier_wait(barrier) pthread_barrier_wait(barrier)
+
+#else
+
+#include <uv.h>
+
+#define isc_barrier_t uv_barrier_t
+
+#define isc_barrier_init(barrier, count) uv_barrier_init(barrier, count)
+#define isc_barrier_destroy(barrier) uv_barrier_destroy(barrier)
+#define isc_barrier_wait(barrier) uv_barrier_wait(barrier)
+
+#endif /* __SANITIZE_THREAD__ */
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
isc__nmsocket_attach(sock, &sock->outer->h2.httpserver);
sock->nchildren = sock->outer->nchildren;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
sock->tid = isc_random_uniform(sock->nchildren);
sock->fd = (uv_os_sock_t)-1;
#include <isc/astack.h>
#include <isc/atomic.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/magic.h>
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
- isc_condition_t cond;
bool paused;
bool finished;
isc_thread_t thread;
* used for listening etc.
* can be processed while
* worker is paused */
+ isc_condition_t cond_prio;
+
isc_refcount_t references;
atomic_int_fast64_t pktcount;
char *recvbuf;
isc_mutex_t evlock;
uint_fast32_t workers_running;
- uint_fast32_t workers_paused;
+ atomic_uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
atomic_bool paused;
atomic_uint_fast32_t keepalive;
atomic_uint_fast32_t advertised;
+ isc_barrier_t pausing;
+ isc_barrier_t resuming;
+
#ifdef NETMGR_TRACE
ISC_LIST(isc_nmsocket_t) active_sockets;
#endif
/*% Self socket */
isc_nmsocket_t *self;
+ isc_barrier_t startlistening;
+ isc_barrier_t stoplistening;
+
/*% TLS stuff */
struct tls {
isc_tls_t *tls;
/* Atomic */
/*% Number of running (e.g. listening) child sockets */
- uint_fast32_t rchildren;
+ atomic_uint_fast32_t rchildren;
/*%
* Socket is active if it's listening, working, etc. If it's
#include <isc/atomic.h>
#include <isc/backtrace.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
static void
async_cb(uv_async_t *handle);
static bool
+process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
+static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
+static void
+wait_for_priority_queue(isc__networker_t *worker);
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
+#define drain_priority_queue(worker) \
+ (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
+#define drain_privilege_queue(worker) \
+ (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
+#define drain_task_queue(worker) \
+ (void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
+#define drain_normal_queue(worker) \
+ (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
+
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc_refcount_init(&mgr->references, 1);
atomic_init(&mgr->maxudp, 0);
atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED);
+ atomic_init(&mgr->workers_paused, 0);
#ifdef NETMGR_TRACE
ISC_LIST_INIT(mgr->active_sockets);
isc_mempool_associatelock(mgr->evpool, &mgr->evlock);
isc_mempool_setfillcount(mgr->evpool, 32);
+ isc_barrier_init(&mgr->pausing, workers);
+ isc_barrier_init(&mgr->resuming, workers);
+
mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
for (size_t i = 0; i < workers; i++) {
int r;
RUNTIME_CHECK(r == 0);
isc_mutex_init(&worker->lock);
- isc_condition_init(&worker->cond);
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
+ isc_condition_init(&worker->cond_prio);
+
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
{
isc_mempool_put(mgr->evpool, ievent);
}
+ isc_condition_destroy(&worker->cond_prio);
r = uv_loop_close(&worker->loop);
INSIST(r == 0);
isc_queue_destroy(worker->ievents_task);
isc_queue_destroy(worker->ievents_prio);
isc_mutex_destroy(&worker->lock);
- isc_condition_destroy(&worker->cond);
isc_mem_put(mgr->mctx, worker->sendbuf,
ISC_NETMGR_SENDBUF_SIZE);
isc_stats_detach(&mgr->stats);
}
+ isc_barrier_destroy(&mgr->resuming);
+ isc_barrier_destroy(&mgr->pausing);
+
isc_condition_destroy(&mgr->wkstatecond);
isc_condition_destroy(&mgr->wkpausecond);
isc_mutex_destroy(&mgr->lock);
}
}
+ if (isc__nm_in_netthread()) {
+ isc_barrier_wait(&mgr->pausing);
+ }
+
LOCK(&mgr->lock);
- while (mgr->workers_paused != pausing) {
+ while (atomic_load(&mgr->workers_paused) != pausing) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
+ UNLOCK(&mgr->lock);
+
REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false },
true));
- UNLOCK(&mgr->lock);
}
void
}
}
+ if (isc__nm_in_netthread()) {
+ isc_barrier_wait(&mgr->resuming);
+ }
+
LOCK(&mgr->lock);
- while (mgr->workers_paused != 0) {
+ while (atomic_load(&mgr->workers_paused) != 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
+ UNLOCK(&mgr->lock);
+
REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true },
false));
- BROADCAST(&mgr->wkpausecond);
- UNLOCK(&mgr->lock);
+
isc__nm_drop_interlocked(mgr);
}
if (worker->paused) {
INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid());
- /*
- * We need to lock the worker first; otherwise
- * isc_nm_resume() might slip in before WAIT() in
- * the while loop starts, then the signal never
- * gets delivered and we are stuck forever in the
- * paused loop.
- */
- LOCK(&worker->lock);
- LOCK(&mgr->lock);
- mgr->workers_paused++;
- SIGNAL(&mgr->wkstatecond);
- UNLOCK(&mgr->lock);
+ atomic_fetch_add(&mgr->workers_paused, 1);
+ if (isc_barrier_wait(&mgr->pausing) != 0) {
+ LOCK(&mgr->lock);
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
+ }
while (worker->paused) {
- WAIT(&worker->cond, &worker->lock);
- UNLOCK(&worker->lock);
- (void)process_priority_queue(
- worker, &(unsigned int){ UINT_MAX });
- LOCK(&worker->lock);
+ wait_for_priority_queue(worker);
}
- LOCK(&mgr->lock);
- mgr->workers_paused--;
- SIGNAL(&mgr->wkstatecond);
- UNLOCK(&mgr->lock);
- UNLOCK(&worker->lock);
-
/*
* All workers must drain the privileged event
* queue before we resume from pause.
*/
- (void)process_privilege_queue(
- worker, &(unsigned int){ UINT_MAX });
+ drain_privilege_queue(worker);
- LOCK(&mgr->lock);
- while (atomic_load(&mgr->paused)) {
- WAIT(&mgr->wkpausecond, &mgr->lock);
+ atomic_fetch_sub(&mgr->workers_paused, 1);
+ if (isc_barrier_wait(&mgr->resuming) != 0) {
+ LOCK(&mgr->lock);
+ SIGNAL(&mgr->wkstatecond);
+ UNLOCK(&mgr->lock);
}
- UNLOCK(&mgr->lock);
}
if (r == 0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
- (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX });
- (void)process_task_queue(worker, &(unsigned int){ UINT_MAX });
+ drain_privilege_queue(worker);
+ drain_task_queue(worker);
LOCK(&mgr->lock);
mgr->workers_running--;
}
}
+static void
+wait_for_priority_queue(isc__networker_t *worker) {
+ isc_queue_t *queue = worker->ievents_prio;
+ isc_condition_t *cond = &worker->cond_prio;
+ bool wait_for_work = true;
+
+ while (true) {
+ isc__netievent_t *ievent;
+ LOCK(&worker->lock);
+ ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
+ if (wait_for_work) {
+ while (ievent == NULL) {
+ WAIT(cond, &worker->lock);
+ ievent = (isc__netievent_t *)isc_queue_dequeue(
+ queue);
+ }
+ }
+ UNLOCK(&worker->lock);
+ wait_for_work = false;
+
+ if (ievent == NULL) {
+ return;
+ }
+
+ (void)process_netievent(worker, ievent);
+ }
+}
+
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
- (*quantump)--;
-
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
+ (*quantump)--;
+
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
*/
LOCK(&worker->lock);
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
- SIGNAL(&worker->cond);
+ SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
}
/*
- * This was a parent socket; free the children.
+ * This was a parent socket: destroy the listening
+ * barriers that synchronized the children.
+ */
+ isc_barrier_destroy(&sock->startlistening);
+ isc_barrier_destroy(&sock->stoplistening);
+
+ /*
+ * Now free them.
*/
isc_mem_put(sock->mgr->mctx, sock->children,
sock->nchildren * sizeof(*sock));
isc_mem_free(sock->mgr->mctx, sock->ah_frees);
isc_mem_free(sock->mgr->mctx, sock->ah_handles);
isc_mutex_destroy(&sock->lock);
- isc_condition_destroy(&sock->cond);
isc_condition_destroy(&sock->scond);
isc__nm_tls_cleanup_data(sock);
isc__nm_http_cleanup_data(sock);
}
isc_mutex_init(&sock->lock);
- isc_condition_init(&sock->cond);
isc_condition_init(&sock->scond);
isc_refcount_init(&sock->references, 1);
#include <uv.h>
#include <isc/atomic.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
sock->fd = (uv_os_sock_t)-1;
atomic_init(&sock->client, true);
(isc__netievent_t *)ievent);
}
LOCK(&sock->lock);
- while (sock->result == ISC_R_DEFAULT) {
+ while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
return (sock);
}
+static void
+start_tcp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
+ uv_os_sock_t fd, int tid) {
+ isc__netievent_tcplisten_t *ievent = NULL;
+ isc_nmsocket_t *csock = &sock->children[tid];
+
+ isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
+ csock->parent = sock;
+ csock->accept_cb = sock->accept_cb;
+ csock->accept_cbarg = sock->accept_cbarg;
+ csock->extrahandlesize = sock->extrahandlesize;
+ csock->backlog = sock->backlog;
+ csock->tid = tid;
+ /*
+ * We don't attach to quota, just assign - to avoid
+ * increasing quota unnecessarily.
+ */
+ csock->pquota = sock->pquota;
+ isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+
+#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
+ UNUSED(fd);
+ csock->fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family);
+#else
+ csock->fd = dup(fd);
+#endif
+ REQUIRE(csock->fd >= 0);
+
+ ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
+ (isc__netievent_t *)ievent);
+}
+
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
- sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
-#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
-#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface);
- sock->rchildren = 0;
+ atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
- sock->result = ISC_R_DEFAULT;
- sock->tid = isc_random_uniform(sock->nchildren);
+ sock->result = ISC_R_UNSET;
+
+ sock->accept_cb = accept_cb;
+ sock->accept_cbarg = accept_cbarg;
+ sock->extrahandlesize = extrahandlesize;
+ sock->backlog = backlog;
+ sock->pquota = quota;
+
+ if (isc__nm_in_netthread()) {
+ sock->tid = isc_nm_tid();
+ } else {
+ sock->tid = isc_random_uniform(sock->nchildren);
+ }
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
- fd = isc__nm_tcp_lb_socket(sa_family);
+ fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family);
#endif
- for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcplisten_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
-
- isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
- csock->parent = sock;
- csock->accept_cb = accept_cb;
- csock->accept_cbarg = accept_cbarg;
- csock->extrahandlesize = extrahandlesize;
- csock->backlog = backlog;
- csock->tid = i;
- /*
- * We don't attach to quota, just assign - to avoid
- * increasing quota unnecessarily.
- */
- csock->pquota = quota;
- isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+ isc_barrier_init(&sock->startlistening, sock->nchildren);
-#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
- csock->fd = isc__nm_tcp_lb_socket(sa_family);
-#else
- csock->fd = dup(fd);
-#endif
- REQUIRE(csock->fd >= 0);
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ if ((int)i == isc_nm_tid()) {
+ continue;
+ }
+ start_tcp_child(mgr, iface, sock, fd, i);
+ }
- ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
- isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ if (isc__nm_in_netthread()) {
+ start_tcp_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
#endif
LOCK(&sock->lock);
- while (sock->rchildren != sock->nchildren) {
+ while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
- BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
- INSIST(result != ISC_R_DEFAULT);
+
+ INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
- REQUIRE(sock->rchildren == sock->nchildren);
+ REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
- isc__nm_tcp_stoplistening(sock);
+ isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
sock->pquota = NULL;
}
- sock->parent->rchildren += 1;
- if (sock->parent->result == ISC_R_DEFAULT) {
+ atomic_fetch_add(&sock->parent->rchildren, 1);
+ if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
- if (!atomic_load(&sock->parent->active)) {
- WAIT(&sock->parent->scond, &sock->parent->lock);
- }
- INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
+
+ isc_barrier_wait(&sock->parent->startlistening);
}
static void
INSIST(0);
ISC_UNREACHABLE();
}
- enqueue_stoplistening(sock);
+
+ if (!isc__nm_in_netthread()) {
+ enqueue_stoplistening(sock);
+ } else if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tcp_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
void
return;
}
- stop_tcp_parent(sock);
+ if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tcp_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
void
static void
stop_tcp_child(isc_nmsocket_t *sock) {
- bool last_child = false;
-
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_nm_tid());
tcp_close_direct(sock);
- LOCK(&sock->parent->lock);
- sock->parent->rchildren -= 1;
- last_child = (sock->parent->rchildren == 0);
- UNLOCK(&sock->parent->lock);
+ atomic_fetch_sub(&sock->parent->rchildren, 1);
- if (last_child) {
- atomic_store(&sock->parent->closed, true);
- isc__nmsocket_prep_destroy(sock->parent);
- }
+ isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tcp_parent(isc_nmsocket_t *sock) {
+ isc_nmsocket_t *csock = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
+ isc_barrier_init(&sock->stoplistening, sock->nchildren);
+
for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcpstop_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
+ csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
- atomic_store(&csock->active, false);
+ if ((int)i == isc_nm_tid()) {
+ /*
+ * We need to schedule closing the other sockets first
+ */
+ continue;
+ }
- ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock);
- isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
- (isc__netievent_t *)ievent);
+ atomic_store(&csock->active, false);
+ enqueue_stoplistening(csock);
}
+
+ csock = &sock->children[isc_nm_tid()];
+ atomic_store(&csock->active, false);
+ stop_tcp_child(csock);
+
+ atomic_store(&sock->closed, true);
+ isc__nmsocket_prep_destroy(sock);
}
static void
#include <uv.h>
#include <isc/atomic.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
static isc_result_t
tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
atomic_init(&sock->client, true);
req = isc__nm_uvreq_get(mgr, sock);
}
LOCK(&sock->lock);
- while (sock->result == ISC_R_DEFAULT) {
+ while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
return (sock);
}
+static void
+start_tcpdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
+ uv_os_sock_t fd, int tid) {
+ isc__netievent_tcpdnslisten_t *ievent = NULL;
+ isc_nmsocket_t *csock = &sock->children[tid];
+
+ isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface);
+ csock->parent = sock;
+ csock->accept_cb = sock->accept_cb;
+ csock->accept_cbarg = sock->accept_cbarg;
+ csock->recv_cb = sock->recv_cb;
+ csock->recv_cbarg = sock->recv_cbarg;
+ csock->extrahandlesize = sock->extrahandlesize;
+ csock->backlog = sock->backlog;
+ csock->tid = tid;
+ /*
+ * We don't attach to quota, just assign - to avoid
+ * increasing quota unnecessarily.
+ */
+ csock->pquota = sock->pquota;
+ isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+
+#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
+ UNUSED(fd);
+ csock->fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family);
+#else
+ csock->fd = dup(fd);
+#endif
+ REQUIRE(csock->fd >= 0);
+
+ ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
+ (isc__netievent_t *)ievent);
+}
isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t recv_cb, void *recv_cbarg,
isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
- sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
-#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
-#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tcpdnslistener, iface);
- sock->rchildren = 0;
+ atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
- sock->result = ISC_R_DEFAULT;
- sock->tid = isc_random_uniform(sock->nchildren);
+ sock->result = ISC_R_UNSET;
+ sock->accept_cb = accept_cb;
+ sock->accept_cbarg = accept_cbarg;
+ sock->recv_cb = recv_cb;
+ sock->recv_cbarg = recv_cbarg;
+ sock->extrahandlesize = extrahandlesize;
+ sock->backlog = backlog;
+ sock->pquota = quota;
+
+ if (isc__nm_in_netthread()) {
+ sock->tid = isc_nm_tid();
+ } else {
+ sock->tid = isc_random_uniform(sock->nchildren);
+ }
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
- fd = isc__nm_tcpdns_lb_socket(sa_family);
+ fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family);
#endif
- for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcpdnslisten_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
-
- isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface);
- csock->parent = sock;
- csock->accept_cb = accept_cb;
- csock->accept_cbarg = accept_cbarg;
- csock->recv_cb = recv_cb;
- csock->recv_cbarg = recv_cbarg;
- csock->extrahandlesize = extrahandlesize;
- csock->backlog = backlog;
- csock->tid = i;
- /*
- * We don't attach to quota, just assign - to avoid
- * increasing quota unnecessarily.
- */
- csock->pquota = quota;
- isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+ isc_barrier_init(&sock->startlistening, sock->nchildren);
-#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
- csock->fd = isc__nm_tcpdns_lb_socket(sa_family);
-#else
- csock->fd = dup(fd);
-#endif
- REQUIRE(csock->fd >= 0);
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ if ((int)i == isc_nm_tid()) {
+ continue;
+ }
+ start_tcpdns_child(mgr, iface, sock, fd, i);
+ }
- ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock);
- isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ if (isc__nm_in_netthread()) {
+ start_tcpdns_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
#endif
LOCK(&sock->lock);
- while (sock->rchildren != sock->nchildren) {
+ while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
- BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
- INSIST(result != ISC_R_DEFAULT);
+
+ INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
- REQUIRE(sock->rchildren == sock->nchildren);
+ REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
- isc__nm_tcpdns_stoplistening(sock);
+ isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
int r;
int flags = 0;
isc_nmsocket_t *sock = NULL;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
sock->pquota = NULL;
}
- sock->parent->rchildren += 1;
- if (sock->parent->result == ISC_R_DEFAULT) {
+ atomic_fetch_add(&sock->parent->rchildren, 1);
+ if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
- if (!atomic_load(&sock->parent->active)) {
- WAIT(&sock->parent->scond, &sock->parent->lock);
- }
- INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
+
+ isc_barrier_wait(&sock->parent->startlistening);
}
static void
INSIST(0);
ISC_UNREACHABLE();
}
- enqueue_stoplistening(sock);
+
+ if (!isc__nm_in_netthread()) {
+ enqueue_stoplistening(sock);
+ } else if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tcpdns_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
void
return;
}
- stop_tcpdns_parent(sock);
+ /*
+ * If network manager is paused, re-enqueue the event for later.
+ */
+ if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tcpdns_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
void
static void
stop_tcpdns_child(isc_nmsocket_t *sock) {
- bool last_child = false;
-
REQUIRE(sock->type == isc_nm_tcpdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
tcpdns_close_direct(sock);
- LOCK(&sock->parent->lock);
- sock->parent->rchildren -= 1;
- last_child = (sock->parent->rchildren == 0);
- UNLOCK(&sock->parent->lock);
+ atomic_fetch_sub(&sock->parent->rchildren, 1);
- if (last_child) {
- atomic_store(&sock->parent->closed, true);
- isc__nmsocket_prep_destroy(sock->parent);
- }
+ isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tcpdns_parent(isc_nmsocket_t *sock) {
+ isc_nmsocket_t *csock = NULL;
+
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnslistener);
+ isc_barrier_init(&sock->stoplistening, sock->nchildren);
+
for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tcpdnsstop_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
+ csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
- atomic_store(&csock->active, false);
+ if ((int)i == isc_nm_tid()) {
+ /*
+ * We need to schedule closing the other sockets first
+ */
+ continue;
+ }
- ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock);
- isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
- (isc__netievent_t *)ievent);
+ atomic_store(&csock->active, false);
+ enqueue_stoplistening(csock);
}
+
+ csock = &sock->children[isc_nm_tid()];
+ atomic_store(&csock->active, false);
+ stop_tcpdns_child(csock);
+
+ atomic_store(&sock->closed, true);
+ isc__nmsocket_prep_destroy(sock);
}
static void
#include <uv.h>
#include <isc/atomic.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
static isc_result_t
tlsdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
sock->tls.ctx = sslctx;
atomic_init(&sock->client, true);
atomic_init(&sock->connecting, true);
(isc__netievent_t *)ievent);
}
LOCK(&sock->lock);
- while (sock->result == ISC_R_DEFAULT) {
+ while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
return (sock);
}
+static void
+start_tlsdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
+ uv_os_sock_t fd, int tid) {
+ isc__netievent_tlsdnslisten_t *ievent = NULL;
+ isc_nmsocket_t *csock = &sock->children[tid];
+
+ isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface);
+ csock->parent = sock;
+ csock->accept_cb = sock->accept_cb;
+ csock->accept_cbarg = sock->accept_cbarg;
+ csock->recv_cb = sock->recv_cb;
+ csock->recv_cbarg = sock->recv_cbarg;
+ csock->extrahandlesize = sock->extrahandlesize;
+ csock->backlog = sock->backlog;
+ csock->tid = tid;
+ csock->tls.ctx = sock->tls.ctx;
+
+ /*
+ * We don't attach to quota, just assign - to avoid
+ * increasing quota unnecessarily.
+ */
+ csock->pquota = sock->pquota;
+ isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+
+#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
+ UNUSED(fd);
+ csock->fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family);
+#else
+ csock->fd = dup(fd);
+#endif
+ REQUIRE(csock->fd >= 0);
+
+ ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
+ (isc__netievent_t *)ievent);
+}
+
isc_result_t
isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t recv_cb, void *recv_cbarg,
isc_tlsctx_t *sslctx, isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
- sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
-#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
-#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tlsdnslistener, iface);
- sock->rchildren = 0;
+ atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
- sock->result = ISC_R_DEFAULT;
- sock->tid = isc_random_uniform(sock->nchildren);
- sock->fd = -1;
+ sock->result = ISC_R_UNSET;
+ sock->accept_cb = accept_cb;
+ sock->accept_cbarg = accept_cbarg;
+ sock->recv_cb = recv_cb;
+ sock->recv_cbarg = recv_cbarg;
+ sock->extrahandlesize = extrahandlesize;
+ sock->backlog = backlog;
+ sock->pquota = quota;
+
+ if (isc__nm_in_netthread()) {
+ sock->tid = isc_nm_tid();
+ } else {
+ sock->tid = isc_random_uniform(sock->nchildren);
+ }
+
sock->tls.ctx = sslctx;
+ sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
- fd = isc__nm_tlsdns_lb_socket(sa_family);
+ fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family);
#endif
- for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tlsdnslisten_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
-
- isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface);
- csock->parent = sock;
- csock->accept_cb = accept_cb;
- csock->accept_cbarg = accept_cbarg;
- csock->recv_cb = recv_cb;
- csock->recv_cbarg = recv_cbarg;
- csock->extrahandlesize = extrahandlesize;
- csock->backlog = backlog;
- csock->tid = i;
- csock->tls.ctx = sslctx;
-
- /*
- * We don't attach to quota, just assign - to avoid
- * increasing quota unnecessarily.
- */
- csock->pquota = quota;
- isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
+ isc_barrier_init(&sock->startlistening, sock->nchildren);
-#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
- csock->fd = isc__nm_tlsdns_lb_socket(sa_family);
-#else
- csock->fd = dup(fd);
-#endif
- REQUIRE(csock->fd >= 0);
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ if ((int)i == isc_nm_tid()) {
+ continue;
+ }
+ start_tlsdns_child(mgr, iface, sock, fd, i);
+ }
- ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock);
- isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ if (isc__nm_in_netthread()) {
+ start_tlsdns_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
#endif
LOCK(&sock->lock);
- while (sock->rchildren != sock->nchildren) {
+ while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
- BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
- INSIST(result != ISC_R_DEFAULT);
+
+ INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
- REQUIRE(sock->rchildren == sock->nchildren);
+ REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
- isc__nm_tlsdns_stoplistening(sock);
+ isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
int r;
int flags = 0;
isc_nmsocket_t *sock = NULL;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
sock->pquota = NULL;
}
- sock->parent->rchildren += 1;
- if (sock->parent->result == ISC_R_DEFAULT) {
+ atomic_fetch_add(&sock->parent->rchildren, 1);
+ if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
- if (!atomic_load(&sock->parent->active)) {
- WAIT(&sock->parent->scond, &sock->parent->lock);
- }
- INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
+
+ isc_barrier_wait(&sock->parent->startlistening);
}
static void
INSIST(0);
ISC_UNREACHABLE();
}
- enqueue_stoplistening(sock);
+
+ if (!isc__nm_in_netthread()) {
+ enqueue_stoplistening(sock);
+ } else if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tlsdns_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
static void
return;
}
- stop_tlsdns_parent(sock);
+ /*
+ * If network manager is paused, re-enqueue the event for later.
+ */
+ if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_tlsdns_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
void
static void
stop_tlsdns_child(isc_nmsocket_t *sock) {
- bool last_child = false;
-
REQUIRE(sock->type == isc_nm_tlsdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
tlsdns_close_direct(sock);
- LOCK(&sock->parent->lock);
- sock->parent->rchildren -= 1;
- last_child = (sock->parent->rchildren == 0);
- UNLOCK(&sock->parent->lock);
+ atomic_fetch_sub(&sock->parent->rchildren, 1);
- if (last_child) {
- atomic_store(&sock->parent->closed, true);
- isc__nmsocket_prep_destroy(sock->parent);
- }
+ isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tlsdns_parent(isc_nmsocket_t *sock) {
+ isc_nmsocket_t *csock = NULL;
+
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlsdnslistener);
+ isc_barrier_init(&sock->stoplistening, sock->nchildren);
+
for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_tlsdnsstop_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
+ csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
- atomic_store(&csock->active, false);
+ if ((int)i == isc_nm_tid()) {
+ /*
+ * We need to schedule closing the other sockets first
+ */
+ continue;
+ }
- ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock);
- isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
- (isc__netievent_t *)ievent);
+ atomic_store(&csock->active, false);
+ enqueue_stoplistening(csock);
}
+
+ csock = &sock->children[isc_nm_tid()];
+ atomic_store(&csock->active, false);
+ stop_tlsdns_child(csock);
+
+ atomic_store(&sock->closed, true);
+ isc__nmsocket_prep_destroy(sock);
}
static void
tlssock->tlsstream.server_iface = *iface;
ISC_LINK_INIT(&tlssock->tlsstream.server_iface.addr, link);
tlssock->iface = &tlssock->tlsstream.server_iface;
- tlssock->result = ISC_R_DEFAULT;
+ tlssock->result = ISC_R_UNSET;
tlssock->accept_cb = accept_cb;
tlssock->accept_cbarg = accept_cbarg;
tlssock->extrahandlesize = extrahandlesize;
/* wait for listen result */
isc__nmsocket_attach(tlssock->outer, &tsock);
- LOCK(&tlssock->outer->lock);
- while (tlssock->outer->rchildren != tlssock->outer->nchildren) {
- WAIT(&tlssock->outer->cond, &tlssock->outer->lock);
- }
- result = tlssock->outer->result;
tlssock->result = result;
atomic_store(&tlssock->active, true);
INSIST(tlssock->outer->tlsstream.tlslistener == NULL);
isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener);
- BROADCAST(&tlssock->outer->scond);
- UNLOCK(&tlssock->outer->lock);
isc__nmsocket_detach(&tsock);
- INSIST(result != ISC_R_DEFAULT);
+ INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
atomic_store(&tlssock->listening, true);
ISC_LINK_INIT(&nsock->tlsstream.local_iface.addr, link);
nsock->iface = &nsock->tlsstream.local_iface;
nsock->extrahandlesize = extrahandlesize;
- nsock->result = ISC_R_DEFAULT;
+ nsock->result = ISC_R_UNSET;
nsock->connect_cb = cb;
nsock->connect_cbarg = cbarg;
nsock->connect_timeout = timeout;
#include <uv.h>
#include <isc/atomic.h>
+#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
return (sock);
}
+static void
+start_udp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
+ uv_os_sock_t fd, int tid) {
+ isc_nmsocket_t *csock;
+ isc__netievent_udplisten_t *ievent = NULL;
+
+ csock = &sock->children[tid];
+
+ isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
+ csock->parent = sock;
+ csock->iface = sock->iface;
+ csock->reading = true;
+ csock->recv_cb = sock->recv_cb;
+ csock->recv_cbarg = sock->recv_cbarg;
+ csock->extrahandlesize = sock->extrahandlesize;
+ csock->tid = tid;
+
+#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
+ UNUSED(fd);
+ csock->fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family);
+#else
+ csock->fd = dup(fd);
+#endif
+ REQUIRE(csock->fd >= 0);
+
+ ievent = isc__nm_get_netievent_udplisten(mgr, csock);
+ isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
+ (isc__netievent_t *)ievent);
+}
+
isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
- sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
-#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
- uv_os_sock_t fd = -1;
-#endif
-
REQUIRE(VALID_NM(mgr));
+ uv_os_sock_t fd = -1;
/*
* We are creating mgr->nworkers duplicated sockets, one
sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);
- sock->rchildren = 0;
+ atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->extrahandlesize = extrahandlesize;
- sock->result = ISC_R_DEFAULT;
- sock->tid = isc_random_uniform(sock->nchildren);
+ sock->result = ISC_R_UNSET;
+ if (isc__nm_in_netthread()) {
+ sock->tid = isc_nm_tid();
+ } else {
+ sock->tid = isc_random_uniform(sock->nchildren);
+ }
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
- fd = isc__nm_udp_lb_socket(sa_family);
+ fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family);
#endif
- for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_udplisten_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
-
- isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
- csock->parent = sock;
- csock->iface = sock->iface;
- csock->reading = true;
- csock->recv_cb = cb;
- csock->recv_cbarg = cbarg;
- csock->extrahandlesize = sock->extrahandlesize;
- csock->tid = i;
+ isc_barrier_init(&sock->startlistening, sock->nchildren);
-#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
- csock->fd = isc__nm_udp_lb_socket(sa_family);
-#else
- csock->fd = dup(fd);
-#endif
- REQUIRE(csock->fd >= 0);
+ for (size_t i = 0; i < sock->nchildren; i++) {
+ if ((int)i == isc_nm_tid()) {
+ continue;
+ }
+ start_udp_child(mgr, iface, sock, fd, i);
+ }
- ievent = isc__nm_get_netievent_udplisten(mgr, csock);
- isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
- (isc__netievent_t *)ievent);
+ if (isc__nm_in_netthread()) {
+ start_udp_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
#endif
LOCK(&sock->lock);
- while (sock->rchildren != sock->nchildren) {
+ while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
- BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
- INSIST(result != ISC_R_DEFAULT);
+
+ INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
- REQUIRE(sock->rchildren == sock->nchildren);
+ REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
- isc__nm_udp_stoplistening(sock);
+ isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
int r, uv_bind_flags = 0;
int uv_init_flags = 0;
sa_family_t sa_family;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
done:
result = isc__nm_uverr2result(r);
- sock->parent->rchildren += 1;
- if (sock->parent->result == ISC_R_DEFAULT) {
+ atomic_fetch_add(&sock->parent->rchildren, 1);
+ if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
- if (!atomic_load(&sock->parent->active)) {
- WAIT(&sock->parent->scond, &sock->parent->lock);
- }
- INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
+
+ isc_barrier_wait(&sock->parent->startlistening);
}
static void
ISC_UNREACHABLE();
}
- enqueue_stoplistening(sock);
+ if (!isc__nm_in_netthread()) {
+ enqueue_stoplistening(sock);
+ } else if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_udp_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
/*
/*
* If network manager is paused, re-enqueue the event for later.
*/
- stop_udp_parent(sock);
+ if (!isc__nm_acquire_interlocked(sock->mgr)) {
+ enqueue_stoplistening(sock);
+ } else {
+ stop_udp_parent(sock);
+ isc__nm_drop_interlocked(sock->mgr);
+ }
}
/*
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
int uv_bind_flags = UV_UDP_REUSEADDR;
- isc_result_t result = ISC_R_DEFAULT;
+ isc_result_t result = ISC_R_UNSET;
int tries = 3;
int r;
sock->read_timeout = timeout;
sock->extrahandlesize = extrahandlesize;
sock->peer = peer->addr;
- sock->result = ISC_R_DEFAULT;
+ sock->result = ISC_R_UNSET;
atomic_init(&sock->client, true);
req = isc__nm_uvreq_get(mgr, sock);
(isc__netievent_t *)event);
}
LOCK(&sock->lock);
- while (sock->result == ISC_R_DEFAULT) {
+ while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->tid == isc_nm_tid());
- bool last_child = false;
-
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true)) {
return;
udp_close_direct(sock);
- LOCK(&sock->parent->lock);
- sock->parent->rchildren -= 1;
- last_child = (sock->parent->rchildren == 0);
- UNLOCK(&sock->parent->lock);
+ atomic_fetch_sub(&sock->parent->rchildren, 1);
- if (last_child) {
- atomic_store(&sock->parent->closed, true);
- isc__nmsocket_prep_destroy(sock->parent);
- }
+ isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_udp_parent(isc_nmsocket_t *sock) {
+ isc_nmsocket_t *csock = NULL;
+
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udplistener);
+ isc_barrier_init(&sock->stoplistening, sock->nchildren);
+
for (size_t i = 0; i < sock->nchildren; i++) {
- isc__netievent_udpstop_t *ievent = NULL;
- isc_nmsocket_t *csock = &sock->children[i];
+ csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
- atomic_store(&csock->active, false);
+ if ((int)i == isc_nm_tid()) {
+ /*
+ * We need to schedule closing the other sockets first
+ */
+ continue;
+ }
- ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock);
- isc__nm_enqueue_ievent(&sock->mgr->workers[i],
- (isc__netievent_t *)ievent);
+ atomic_store(&csock->active, false);
+ enqueue_stoplistening(csock);
}
+
+ csock = &sock->children[isc_nm_tid()];
+ atomic_store(&csock->active, false);
+ stop_udp_child(csock);
+
+ atomic_store(&sock->closed, true);
+ isc__nmsocket_prep_destroy(sock);
}
static void
REQUIRE(VALID_MANAGER(manager));
- XTHREADTRACE(e "isc_taskmgr_shutdown");
+ XTHREADTRACE("isc_taskmgr_shutdown");
/*
* Only one non-worker thread may ever call this routine.
* If a worker thread wants to initiate shutdown of the
<ClInclude Include="..\include\isc\backtrace.h">
<Filter>Library Header Files</Filter>
</ClInclude>
+ <ClInclude Include="..\include\isc\barrier.h">
+ <Filter>Library Header Files</Filter>
+ </ClInclude>
<ClInclude Include="..\include\isc\base32.h">
<Filter>Library Header Files</Filter>
</ClInclude>
<ClInclude Include="..\include\isc\astack.h" />
<ClInclude Include="..\include\isc\atomic.h" />
<ClInclude Include="..\include\isc\backtrace.h" />
+ <ClInclude Include="..\include\isc\barrier.h" />
<ClInclude Include="..\include\isc\base32.h" />
<ClInclude Include="..\include\isc\base64.h" />
<ClInclude Include="..\include\isc\bind9.h" />
./lib/isc/include/isc/atomic.h C 2018,2019,2020,2021
./lib/isc/include/isc/attributes.h C 2020,2021
./lib/isc/include/isc/backtrace.h C 2009,2016,2018,2019,2020,2021
+./lib/isc/include/isc/barrier.h C 2021
./lib/isc/include/isc/base32.h C 2008,2014,2016,2018,2019,2020,2021
./lib/isc/include/isc/base64.h C 1999,2000,2001,2004,2005,2006,2007,2016,2018,2019,2020,2021
./lib/isc/include/isc/bind9.h C 2009,2013,2016,2018,2019,2020,2021