async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
-static bool
-process_queue(isc__networker_t *worker, isc_queue_t *queue,
- unsigned int *quantump);
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type);
static void
wait_for_priority_queue(isc__networker_t *worker);
-static bool
-process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_task_queue(isc__networker_t *worker, unsigned int *quantump);
-static bool
-process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
-
-#define drain_priority_queue(worker) \
- (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_privilege_queue(worker) \
- (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_task_queue(worker) \
- (void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
-#define drain_normal_queue(worker) \
- (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type);
+
+#define ENQUEUE_NETIEVENT(worker, queue, event) \
+ isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)
+#define DEQUEUE_NETIEVENT(worker, queue) \
+ (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])
+
+#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
+ ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
+#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
+ ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
+#define ENQUEUE_TASK_NETIEVENT(worker, event) \
+ ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
+#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
+ ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
+
+#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
+ DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
+ DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
+#define DEQUEUE_NORMAL_NETIEVENT(worker) \
+ DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
+
+#define INCREMENT_NETIEVENT(worker, queue) \
+ atomic_fetch_add_release(&worker->nievents[queue], 1)
+#define DECREMENT_NETIEVENT(worker, queue) \
+ atomic_fetch_sub_release(&worker->nievents[queue], 1)
+
+#define INCREMENT_PRIORITY_NETIEVENT(worker) \
+ INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
+ INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define INCREMENT_TASK_NETIEVENT(worker) \
+ INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
+#define INCREMENT_NORMAL_NETIEVENT(worker) \
+ INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
+
+#define DECREMENT_PRIORITY_NETIEVENT(worker) \
+ DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
+#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
+ DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
+#define DECREMENT_TASK_NETIEVENT(worker) \
+ DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
+#define DECREMENT_NORMAL_NETIEVENT(worker) \
+ DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
*worker = (isc__networker_t){
.mgr = mgr,
.id = i,
- .quantum = ISC_NETMGR_QUANTUM_DEFAULT,
};
r = uv_loop_init(&worker->loop);
isc_mutex_init(&worker->lock);
- worker->ievents = isc_queue_new(mgr->mctx, 128);
- worker->ievents_task = isc_queue_new(mgr->mctx, 128);
- worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
- worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
- isc_condition_init(&worker->cond_prio);
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
+ atomic_init(&worker->nievents[type], 0);
+ }
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
int r;
/* Empty the async event queues */
- while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
- worker->ievents)) != NULL)
- {
+ while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc_mempool_put(mgr->evpool, ievent);
}
- INSIST(isc_queue_dequeue(worker->ievents_priv) ==
- (uintptr_t)NULL);
- INSIST(isc_queue_dequeue(worker->ievents_task) ==
- (uintptr_t)NULL);
+ INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
+ INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
- while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
- worker->ievents_prio)) != NULL)
- {
+ while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc_mempool_put(mgr->evpool, ievent);
}
isc_condition_destroy(&worker->cond_prio);
r = uv_loop_close(&worker->loop);
INSIST(r == 0);
- isc_queue_destroy(worker->ievents);
- isc_queue_destroy(worker->ievents_priv);
- isc_queue_destroy(worker->ievents_task);
- isc_queue_destroy(worker->ievents_prio);
- isc_mutex_destroy(&worker->lock);
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ isc_queue_destroy(worker->ievents[type]);
+ }
isc_mem_put(mgr->mctx, worker->sendbuf,
ISC_NETMGR_SENDBUF_SIZE);
if (isc__nm_in_netthread()) {
REQUIRE(isc_nm_tid() == 0);
- drain_priority_queue(&mgr->workers[isc_nm_tid()]);
+ drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
}
for (int i = 0; i < mgr->nworkers; i++) {
}
if (isc__nm_in_netthread()) {
- drain_privilege_queue(&mgr->workers[isc_nm_tid()]);
+ drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
isc_barrier_wait(&mgr->resuming);
* All workers must drain the privileged event
* queue before we resume from pause.
*/
- drain_privilege_queue(worker);
+ drain_queue(worker, NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
if (isc_barrier_wait(&mgr->resuming) != 0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
- drain_privilege_queue(worker);
- drain_task_queue(worker);
+ drain_queue(worker, NETIEVENT_PRIVILEGED);
+ drain_queue(worker, NETIEVENT_TASK);
LOCK(&mgr->lock);
mgr->workers_running--;
}
static bool
-process_all_queues(isc__networker_t *worker, unsigned int quantum) {
+process_all_queues(isc__networker_t *worker) {
+ bool reschedule = false;
/*
* The queue processing functions will return false when the
- * system is pausing or stopping, or if we have completed
- * 'quantum' events.
- *
- * We don't want to proceed to a new queue until the previous one
- * has been fully drained, so whenever one queue is interrupted,
- * we skip all the later ones.
+ * system is pausing or stopping and we don't want to process
+ * the other queues in such case, but we need the async event
+ * to be rescheduled in the next uv_run().
*/
- return (process_priority_queue(worker, &quantum) &&
- process_privilege_queue(worker, &quantum) &&
- process_task_queue(worker, &quantum) &&
- process_normal_queue(worker, &quantum));
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ isc_result_t result = process_queue(worker, type);
+ switch (result) {
+ case ISC_R_SUSPEND:
+ return (true);
+ case ISC_R_EMPTY:
+ /* empty queue */
+ break;
+ case ISC_R_SUCCESS:
+ reschedule = true;
+ break;
+ default:
+ INSIST(0);
+ ISC_UNREACHABLE();
+ }
+ }
+
+ return (reschedule);
}
/*
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
- unsigned int quantum = worker->quantum;
- if (!process_all_queues(worker, quantum)) {
+ if (process_all_queues(worker)) {
/* If we didn't process all the events, we need to enqueue
* async_cb to be run in the next iteration of the uv_loop
*/
static void
wait_for_priority_queue(isc__networker_t *worker) {
- isc_queue_t *queue = worker->ievents_prio;
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
- ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
+ ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
- ievent = (isc__netievent_t *)isc_queue_dequeue(
- queue);
+ ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
}
}
UNLOCK(&worker->lock);
if (ievent == NULL) {
return;
}
+ DECREMENT_PRIORITY_NETIEVENT(worker);
(void)process_netievent(worker, ievent);
}
}
-static bool
-process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
- return (process_queue(worker, worker->ievents_prio, quantump));
-}
-
-static bool
-process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
- return (process_queue(worker, worker->ievents_priv, quantump));
-}
-
-static bool
-process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
- return (process_queue(worker, worker->ievents_task, quantump));
-}
-
-static bool
-process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
- return (process_queue(worker, worker->ievents, quantump));
+static void
+drain_queue(isc__networker_t *worker, netievent_type_t type) {
+ while (process_queue(worker, type) != ISC_R_EMPTY) {
+ ;
+ }
}
/*
return (true);
}
-static bool
-process_queue(isc__networker_t *worker, isc_queue_t *queue,
- unsigned int *quantump) {
- while (*quantump > 0) {
- isc__netievent_t *ievent =
- (isc__netievent_t *)isc_queue_dequeue(queue);
-
- if (ievent == NULL) {
- /* We fully drained this queue */
- return (true);
- }
+static isc_result_t
+process_queue(isc__networker_t *worker, netievent_type_t type) {
+ /*
+ * The number of items on the queue is only loosely synchronized with
+ * the items on the queue. But there's a guarantee that if there's an
+ * item on the queue, it will be accounted for. However there's a
+ * possibility that the counter might be higher than the items on the
+ * queue stored.
+ */
+ uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
+ isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
+
+ if (ievent == NULL && waiting == 0) {
+ /* There's nothing scheduled */
+ return (ISC_R_EMPTY);
+ } else if (ievent == NULL) {
+ /* There's at least one item scheduled, but not on the queue yet
+ */
+ return (ISC_R_SUCCESS);
+ }
- (*quantump)--;
+ while (ievent != NULL) {
+ DECREMENT_NETIEVENT(worker, type);
+ bool stop = !process_netievent(worker, ievent);
- if (!process_netievent(worker, ievent)) {
+ if (stop) {
/* Netievent told us to stop */
- return (false);
+ return (ISC_R_SUSPEND);
+ }
+
+ if (waiting-- == 0) {
+ /* We reached this round "quota" */
+ break;
}
+
+ ievent = DEQUEUE_NETIEVENT(worker, type);
}
- /* No more quantum */
- return (false);
+ /* We processed at least one */
+ return (ISC_R_SUCCESS);
}
void *
* the queue will be processed.
*/
LOCK(&worker->lock);
- isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
+ INCREMENT_PRIORITY_NETIEVENT(worker);
+ ENQUEUE_PRIORITY_NETIEVENT(worker, event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
- isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
+ INCREMENT_PRIVILEGED_NETIEVENT(worker);
+ ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
} else if (event->type == netievent_task) {
- isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
+ INCREMENT_TASK_NETIEVENT(worker);
+ ENQUEUE_TASK_NETIEVENT(worker, event);
} else {
- isc_queue_enqueue(worker->ievents, (uintptr_t)event);
+ INCREMENT_NORMAL_NETIEVENT(worker);
+ ENQUEUE_NORMAL_NETIEVENT(worker, event);
}
uv_async_send(&worker->async);
}