#include <isc/dir.h>
#include <isc/file.h>
#include <isc/hash.h>
-#include <isc/hp.h>
#include <isc/httpd.h>
#include <isc/managers.h>
#include <isc/netmgr.h>
bind9.@O@ buffer.@O@ bufferlist.@O@ \
commandline.@O@ counter.@O@ crc64.@O@ error.@O@ entropy.@O@ \
event.@O@ hash.@O@ ht.@O@ heap.@O@ hex.@O@ \
- hmac.@O@ hp.@O@ httpd.@O@ iterated_hash.@O@ \
+ hmac.@O@ httpd.@O@ iterated_hash.@O@ \
lex.@O@ lfsr.@O@ lib.@O@ log.@O@ \
managers.@O@ md.@O@ mem.@O@ mutexblock.@O@ \
netmgr/netmgr.@O@ netmgr/tcp.@O@ netmgr/udp.@O@ \
netmgr/tcpdns.@O@ \
netmgr/uverr2result.@O@ netmgr/uv-compat.@O@ \
netaddr.@O@ netscope.@O@ nonce.@O@ openssl_shim.@O@ pool.@O@ \
- parseint.@O@ portset.@O@ queue.@O@ quota.@O@ \
+ parseint.@O@ portset.@O@ quota.@O@ \
radix.@O@ random.@O@ ratelimiter.@O@ \
region.@O@ regex.@O@ result.@O@ rwlock.@O@ \
safe.@O@ serial.@O@ siphash.@O@ sockaddr.@O@ stats.@O@ \
backtrace.c base32.c base64.c bind9.c \
buffer.c bufferlist.c commandline.c counter.c crc64.c \
entropy.c error.c event.c hash.c ht.c heap.c \
- hex.c hmac.c hp.c httpd.c iterated_hash.c \
+ hex.c hmac.c httpd.c iterated_hash.c \
lex.c lfsr.c lib.c log.c \
managers.c md.c mem.c mutexblock.c \
netaddr.c netscope.c nonce.c openssl_shim.c pool.c \
- parseint.c portset.c queue.c quota.c radix.c random.c \
+ parseint.c portset.c quota.c radix.c random.c \
ratelimiter.c region.c regex.c result.c rwlock.c \
safe.c serial.c siphash.c sockaddr.c stats.c string.c \
symtab.c task.c taskpool.c timer.c tls.c \
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-/*
- * Hazard Pointer implementation.
- *
- * This work is based on C++ code available from:
- * https://github.com/pramalhe/ConcurrencyFreaks/
- *
- * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of Concurrency Freaks nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
- * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
- * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
- * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
- * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <inttypes.h>
-
-#include <isc/atomic.h>
-#include <isc/hp.h>
-#include <isc/mem.h>
-#include <isc/once.h>
-#include <isc/string.h>
-#include <isc/thread.h>
-#include <isc/util.h>
-
-#define HP_MAX_THREADS 128
-static int isc__hp_max_threads = HP_MAX_THREADS;
-#define HP_MAX_HPS 4 /* This is named 'K' in the HP paper */
-#define CLPAD (128 / sizeof(uintptr_t))
-#define HP_THRESHOLD_R 0 /* This is named 'R' in the HP paper */
-
-/* Maximum number of retired objects per thread */
-static int isc__hp_max_retired = HP_MAX_THREADS * HP_MAX_HPS;
-
-typedef struct retirelist {
- int size;
- uintptr_t *list;
-} retirelist_t;
-
-struct isc_hp {
- int max_hps;
- isc_mem_t *mctx;
- atomic_uintptr_t **hp;
- retirelist_t **rl;
- isc_hp_deletefunc_t *deletefunc;
-};
-
-static int
-tid(void) {
- return (isc_tid_v);
-}
-
-void
-isc_hp_init(int max_threads) {
- isc__hp_max_threads = max_threads;
- isc__hp_max_retired = max_threads * HP_MAX_HPS;
-}
-
-isc_hp_t *
-isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc) {
- isc_hp_t *hp = isc_mem_get(mctx, sizeof(*hp));
-
- if (max_hps == 0) {
- max_hps = HP_MAX_HPS;
- }
-
- *hp = (isc_hp_t){ .max_hps = max_hps, .deletefunc = deletefunc };
-
- isc_mem_attach(mctx, &hp->mctx);
-
- hp->hp = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->hp[0]));
- hp->rl = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->rl[0]));
-
- for (int i = 0; i < isc__hp_max_threads; i++) {
- hp->hp[i] = isc_mem_get(mctx, CLPAD * 2 * sizeof(hp->hp[i][0]));
- hp->rl[i] = isc_mem_get(mctx, sizeof(*hp->rl[0]));
- *hp->rl[i] = (retirelist_t){ .size = 0 };
-
- for (int j = 0; j < hp->max_hps; j++) {
- atomic_init(&hp->hp[i][j], 0);
- }
- hp->rl[i]->list = isc_mem_get(
- hp->mctx, isc__hp_max_retired * sizeof(uintptr_t));
- }
-
- return (hp);
-}
-
-void
-isc_hp_destroy(isc_hp_t *hp) {
- for (int i = 0; i < isc__hp_max_threads; i++) {
- isc_mem_put(hp->mctx, hp->hp[i],
- CLPAD * 2 * sizeof(hp->hp[i][0]));
-
- for (int j = 0; j < hp->rl[i]->size; j++) {
- void *data = (void *)hp->rl[i]->list[j];
- hp->deletefunc(data);
- }
- isc_mem_put(hp->mctx, hp->rl[i]->list,
- isc__hp_max_retired * sizeof(uintptr_t));
- isc_mem_put(hp->mctx, hp->rl[i], sizeof(*hp->rl[0]));
- }
- isc_mem_put(hp->mctx, hp->hp, isc__hp_max_threads * sizeof(hp->hp[0]));
- isc_mem_put(hp->mctx, hp->rl, isc__hp_max_threads * sizeof(hp->rl[0]));
-
- isc_mem_putanddetach(&hp->mctx, hp, sizeof(*hp));
-}
-
-void
-isc_hp_clear(isc_hp_t *hp) {
- for (int i = 0; i < hp->max_hps; i++) {
- atomic_store_release(&hp->hp[tid()][i], 0);
- }
-}
-
-void
-isc_hp_clear_one(isc_hp_t *hp, int ihp) {
- atomic_store_release(&hp->hp[tid()][ihp], 0);
-}
-
-uintptr_t
-isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom) {
- uintptr_t n = 0;
- uintptr_t ret;
- while ((ret = atomic_load(atom)) != n) {
- atomic_store(&hp->hp[tid()][ihp], ret);
- n = ret;
- }
- return (ret);
-}
-
-uintptr_t
-isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
- atomic_store(&hp->hp[tid()][ihp], atomic_load(&ptr));
- return (atomic_load(&ptr));
-}
-
-uintptr_t
-isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
- atomic_store_release(&hp->hp[tid()][ihp], atomic_load(&ptr));
- return (atomic_load(&ptr));
-}
-
-void
-isc_hp_retire(isc_hp_t *hp, uintptr_t ptr) {
- hp->rl[tid()]->list[hp->rl[tid()]->size++] = ptr;
- INSIST(hp->rl[tid()]->size < isc__hp_max_retired);
-
- if (hp->rl[tid()]->size < HP_THRESHOLD_R) {
- return;
- }
-
- for (int iret = 0; iret < hp->rl[tid()]->size; iret++) {
- uintptr_t obj = hp->rl[tid()]->list[iret];
- bool can_delete = true;
- for (int itid = 0; itid < isc__hp_max_threads && can_delete;
- itid++) {
- for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
- if (atomic_load(&hp->hp[itid][ihp]) == obj) {
- can_delete = false;
- break;
- }
- }
- }
-
- if (can_delete) {
- size_t bytes = (hp->rl[tid()]->size - iret) *
- sizeof(hp->rl[tid()]->list[0]);
- memmove(&hp->rl[tid()]->list[iret],
- &hp->rl[tid()]->list[iret + 1], bytes);
- hp->rl[tid()]->size--;
- hp->deletefunc((void *)obj);
- }
- }
-}
cmocka.h commandline.h counter.h crc64.h deprecated.h \
endian.h errno.h error.h event.h eventclass.h \
file.h formatcheck.h fsaccess.h fuzz.h \
- hash.h heap.h hex.h hmac.h hp.h ht.h httpd.h \
+ hash.h heap.h hex.h hmac.h ht.h httpd.h \
interfaceiter.h iterated_hash.h \
lang.h lex.h lfsr.h lib.h likely.h list.h log.h \
magic.h managers.h md.h mem.h meminfo.h \
mutexblock.h \
netaddr.h netmgr.h netscope.h nonce.h os.h parseint.h \
- pool.h portset.h print.h queue.h quota.h \
+ pool.h portset.h print.h quota.h \
radix.h random.h ratelimiter.h refcount.h regex.h \
region.h resource.h result.h resultclass.h rwlock.h \
safe.h serial.h siphash.h sockaddr.h socket.h \
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-/*
- * Hazard Pointer implementation.
- *
- * This work is based on C++ code available from:
- * https://github.com/pramalhe/ConcurrencyFreaks/
- *
- * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of Concurrency Freaks nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
- * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
- * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
- * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
- * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include <isc/atomic.h>
-#include <isc/mem.h>
-#include <isc/string.h>
-#include <isc/types.h>
-#include <isc/util.h>
-
-/*%
- * Hazard pointers are a mechanism for protecting objects in memory
- * from being deleted by other threads while in use. This allows
- * safe lock-free data structures.
- *
- * This is an adaptation of the ConcurrencyFreaks implementation in C.
- * More details available at https://github.com/pramalhe/ConcurrencyFreaks,
- * in the file HazardPointers.hpp.
- */
-
-typedef void(isc_hp_deletefunc_t)(void *);
-
-void
-isc_hp_init(int max_threads);
-/*%<
- * Initialize hazard pointer constants - isc__hp_max_threads. If more threads
- * will try to access hp it will assert.
- */
-
-isc_hp_t *
-isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc);
-/*%<
- * Create a new hazard pointer array of size 'max_hps' (or a reasonable
- * default value if 'max_hps' is 0). The function 'deletefunc' will be
- * used to delete objects protected by hazard pointers when it becomes
- * safe to retire them.
- */
-
-void
-isc_hp_destroy(isc_hp_t *hp);
-/*%<
- * Destroy a hazard pointer array and clean up all objects protected
- * by hazard pointers.
- */
-
-void
-isc_hp_clear(isc_hp_t *hp);
-/*%<
- * Clear all hazard pointers in the array for the current thread.
- *
- * Progress condition: wait-free bounded (by max_hps)
- */
-
-void
-isc_hp_clear_one(isc_hp_t *hp, int ihp);
-/*%<
- * Clear a specified hazard pointer in the array for the current thread.
- *
- * Progress condition: wait-free population oblivious.
- */
-
-uintptr_t
-isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom);
-/*%<
- * Protect an object referenced by 'atom' with a hazard pointer for the
- * current thread.
- *
- * Progress condition: lock-free.
- */
-
-uintptr_t
-isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
-/*%<
- * This returns the same value that is passed as ptr, which is sometimes
- * useful.
- *
- * Progress condition: wait-free population oblivious.
- */
-
-uintptr_t
-isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
-/*%<
- * Same as isc_hp_protect_ptr(), but explicitly uses memory_order_release.
- *
- * Progress condition: wait-free population oblivious.
- */
-
-void
-isc_hp_retire(isc_hp_t *hp, uintptr_t ptr);
-/*%<
- * Retire an object that is no longer in use by any thread, calling
- * the delete function that was specified in isc_hp_new().
- *
- * Progress condition: wait-free bounded (by the number of threads squared)
- */
* information regarding copyright ownership.
*/
-#ifndef ISC_LIST_H
-#define ISC_LIST_H 1
+#pragma once
#include <isc/assertions.h>
#define __ISC_LIST_DEQUEUEUNSAFE_TYPE(list, elt, link, type) \
__ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, type)
-#endif /* ISC_LIST_H */
+#define ISC_LIST_MOVEUNSAFE(dest, src) \
+ { \
+ (dest).head = (src).head; \
+ (dest).tail = (src).tail; \
+ (src).head = NULL; \
+ (src).tail = NULL; \
+ }
+
+#define ISC_LIST_MOVE(dest, src) \
+ { \
+ INSIST(ISC_LIST_EMPTY(dest)); \
+ ISC_LIST_MOVEUNSAFE(dest, src); \
+ }
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-#pragma once
-#include <isc/mem.h>
-
-typedef struct isc_queue isc_queue_t;
-
-isc_queue_t *
-isc_queue_new(isc_mem_t *mctx, int max_threads);
-/*%<
- * Create a new fetch-and-add array queue.
- *
- * 'max_threads' is currently unused. In the future it can be used
- * to pass a maximum threads parameter when creating hazard pointers,
- * but currently `isc_hp_t` uses a hard-coded value.
- */
-
-void
-isc_queue_enqueue(isc_queue_t *queue, uintptr_t item);
-/*%<
- * Enqueue an object pointer 'item' at the tail of the queue.
- *
- * Requires:
- * \li 'item' is not null.
- */
-
-uintptr_t
-isc_queue_dequeue(isc_queue_t *queue);
-/*%<
- * Remove an object pointer from the head of the queue and return the
- * pointer. If the queue is empty, return `nulluintptr` (the uintptr_t
- * representation of NULL).
- *
- * Requires:
- * \li 'queue' is not null.
- */
-
-void
-isc_queue_destroy(isc_queue_t *queue);
-/*%<
- * Destroy a queue.
- *
- * Requires:
- * \li 'queue' is not null.
- */
typedef int16_t isc_dscp_t; /*%< Diffserv code point */
typedef struct isc_event isc_event_t; /*%< Event */
typedef ISC_LIST(isc_event_t) isc_eventlist_t; /*%< Event List */
-typedef unsigned int isc_eventtype_t; /*%< Event Type */
-typedef uint32_t isc_fsaccess_t; /*%< FS Access */
-typedef struct isc_hash isc_hash_t; /*%< Hash */
-typedef struct isc_hp isc_hp_t; /*%< Hazard
- * pointer */
+typedef unsigned int isc_eventtype_t; /*%< Event Type */
+typedef uint32_t isc_fsaccess_t; /*%< FS Access */
+typedef struct isc_hash isc_hash_t; /*%< Hash */
typedef struct isc_httpd isc_httpd_t; /*%< HTTP client */
typedef void(isc_httpdfree_t)(isc_buffer_t *, void *); /*%< HTTP free function
*/
* information regarding copyright ownership.
*/
-#include <isc/hp.h>
#include <isc/managers.h>
#include <isc/util.h>
isc_taskmgr_t *taskmgr = NULL;
isc_nm_t *netmgr = NULL;
- /*
- * We have ncpus network threads, ncpus old network threads - make
- * it 4x just to be on the safe side.
- */
- isc_hp_init(4 * workers);
-
REQUIRE(netmgrp != NULL && *netmgrp == NULL);
isc__netmgr_create(mctx, workers, &netmgr);
*netmgrp = netmgr;
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
-#include <isc/queue.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
NETIEVENT_MAX = 4,
} netievent_type_t;
+typedef struct isc__nm_uvreq isc__nm_uvreq_t;
+typedef struct isc__netievent isc__netievent_t;
+
+typedef ISC_LIST(isc__netievent_t) isc__netievent_list_t;
+
+typedef struct ievent {
+ isc_mutex_t lock;
+ isc_condition_t cond;
+ isc__netievent_list_t list;
+} ievent_t;
+
/*
* Single network event loop worker.
*/
uv_loop_t loop; /* libuv loop structure */
uv_async_t async; /* async channel to send
* data to this networker */
- isc_mutex_t lock;
bool paused;
bool finished;
isc_thread_t thread;
- isc_queue_t *ievents[NETIEVENT_MAX];
- atomic_uint_fast32_t nievents[NETIEVENT_MAX];
- isc_condition_t cond_prio;
+ ievent_t ievents[NETIEVENT_MAX];
isc_refcount_t references;
atomic_int_fast64_t pktcount;
* either in netmgr.c or matching protocol file (e.g. udp.c, tcp.c, etc.)
*/
-#define NETIEVENT__SOCKET \
- isc__netievent_type type; \
- isc_nmsocket_t *sock; \
- const char *file; \
- unsigned int line; \
+#define NETIEVENT__SOCKET \
+ isc__netievent_type type; \
+ ISC_LINK(isc__netievent_t) link; \
+ isc_nmsocket_t *sock; \
+ const char *file; \
+ unsigned int line; \
const char *func
typedef struct isc__netievent__socket {
}
typedef struct isc__netievent__socket_req_result {
- isc__netievent_type type;
- isc_nmsocket_t *sock;
+ NETIEVENT__SOCKET;
isc__nm_uvreq_t *req;
isc_result_t result;
} isc__netievent__socket_req_result_t;
typedef struct isc__netievent__task {
isc__netievent_type type;
+ ISC_LINK(isc__netievent_t) link;
isc_task_t *task;
} isc__netievent__task_t;
typedef struct isc__netievent {
isc__netievent_type type;
+ ISC_LINK(isc__netievent_t) link;
} isc__netievent_t;
#define NETIEVENT_TYPE(type) typedef isc__netievent_t isc__netievent_##type##_t;
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
+#include <isc/list.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
+
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static isc_result_t
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);
-#define ENQUEUE_NETIEVENT(worker, queue, event) \
- isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)
-#define DEQUEUE_NETIEVENT(worker, queue) \
- (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])
-
-#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
- ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
-#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
- ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
-#define ENQUEUE_TASK_NETIEVENT(worker, event) \
- ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
-#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
- ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
-
-#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
- DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
-#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
- DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
-#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
-#define DEQUEUE_NORMAL_NETIEVENT(worker) \
- DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
-
-#define INCREMENT_NETIEVENT(worker, queue) \
- atomic_fetch_add_release(&worker->nievents[queue], 1)
-#define DECREMENT_NETIEVENT(worker, queue) \
- atomic_fetch_sub_release(&worker->nievents[queue], 1)
-
-#define INCREMENT_PRIORITY_NETIEVENT(worker) \
- INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
-#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
- INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
-#define INCREMENT_TASK_NETIEVENT(worker) \
- INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
-#define INCREMENT_NORMAL_NETIEVENT(worker) \
- INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
-
-#define DECREMENT_PRIORITY_NETIEVENT(worker) \
- DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
-#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
- DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
-#define DECREMENT_TASK_NETIEVENT(worker) \
- DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
-#define DECREMENT_NORMAL_NETIEVENT(worker) \
- DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
-
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
r = uv_async_init(&worker->loop, &worker->async, async_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
- isc_mutex_init(&worker->lock);
- isc_condition_init(&worker->cond_prio);
-
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
- worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
- atomic_init(&worker->nievents[type], 0);
+ isc_mutex_init(&worker->ievents[type].lock);
+ isc_condition_init(&worker->ievents[type].cond);
+ ISC_LIST_INIT(worker->ievents[type].list);
}
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
for (int i = 0; i < mgr->nworkers; i++) {
isc__networker_t *worker = &mgr->workers[i];
- isc__netievent_t *ievent = NULL;
int r;
- /* Empty the async event queues */
- while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
- isc__nm_put_netievent(mgr, ievent);
- }
-
- INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
- INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
-
- while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) {
- isc__nm_put_netievent(mgr, ievent);
- }
- isc_condition_destroy(&worker->cond_prio);
- isc_mutex_destroy(&worker->lock);
-
r = uv_loop_close(&worker->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
- isc_queue_destroy(worker->ievents[type]);
+ INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
+ isc_condition_destroy(&worker->ievents[type].cond);
+ isc_mutex_destroy(&worker->ievents[type].lock);
}
isc_mem_put(mgr->mctx, worker->sendbuf,
}
/*
- * We are shutting down. Process the task queues
- * (they may include shutdown events) but do not process
- * the netmgr event queue.
+ * We are shutting down. Drain the queues.
*/
drain_queue(worker, NETIEVENT_PRIVILEGED);
drain_queue(worker, NETIEVENT_TASK);
+ for (size_t type = 0; type < NETIEVENT_MAX; type++) {
+ LOCK(&worker->ievents[type].lock);
+ INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
+ UNLOCK(&worker->ievents[type].lock);
+ }
+
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
isc_result_t result = process_queue(worker, type);
switch (result) {
case ISC_R_SUSPEND:
- return (true);
+ reschedule = true;
+ break;
case ISC_R_EMPTY:
/* empty queue */
break;
static void
wait_for_priority_queue(isc__networker_t *worker) {
- isc_condition_t *cond = &worker->cond_prio;
- bool wait_for_work = true;
+ isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
+ isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
+ isc__netievent_list_t *list =
+ &(worker->ievents[NETIEVENT_PRIORITY].list);
- while (true) {
- isc__netievent_t *ievent;
- LOCK(&worker->lock);
- ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
- if (wait_for_work) {
- while (ievent == NULL) {
- WAIT(cond, &worker->lock);
- ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
- }
- }
- UNLOCK(&worker->lock);
- wait_for_work = false;
-
- if (ievent == NULL) {
- return;
- }
- DECREMENT_PRIORITY_NETIEVENT(worker);
-
- (void)process_netievent(worker, ievent);
+ LOCK(lock);
+ while (ISC_LIST_EMPTY(*list)) {
+ WAIT(cond, lock);
}
+ UNLOCK(lock);
+
+ drain_queue(worker, NETIEVENT_PRIORITY);
}
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
- while (process_queue(worker, type) != ISC_R_EMPTY) {
- ;
+ bool empty = false;
+ while (!empty) {
+ if (process_queue(worker, type) == ISC_R_EMPTY) {
+ LOCK(&worker->ievents[type].lock);
+ empty = ISC_LIST_EMPTY(worker->ievents[type].list);
+ UNLOCK(&worker->ievents[type].lock);
+ }
}
}
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
- /*
- * The number of items on the queue is only loosely synchronized with
- * the items on the queue. But there's a guarantee that if there's an
- * item on the queue, it will be accounted for. However there's a
- * possibility that the counter might be higher than the items on the
- * queue stored.
- */
- uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
- isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
+ isc__netievent_t *ievent = NULL;
+ isc__netievent_list_t list;
+
+ ISC_LIST_INIT(list);
- if (ievent == NULL && waiting == 0) {
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_MOVE(list, worker->ievents[type].list);
+ UNLOCK(&worker->ievents[type].lock);
+
+ ievent = ISC_LIST_HEAD(list);
+ if (ievent == NULL) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
- } else if (ievent == NULL) {
- /* There's at least one item scheduled, but not on the queue yet
- */
- return (ISC_R_SUCCESS);
}
while (ievent != NULL) {
- DECREMENT_NETIEVENT(worker, type);
- bool stop = !process_netievent(worker, ievent);
-
- if (stop) {
- /* Netievent told us to stop */
+ isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
+ ISC_LIST_DEQUEUE(list, ievent, link);
+
+ if (!process_netievent(worker, ievent)) {
+ /* The netievent told us to stop */
+ if (!ISC_LIST_EMPTY(list)) {
+ /*
+ * Reschedule the rest of the unprocessed
+ * events.
+ */
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_PREPENDLIST(worker->ievents[type].list,
+ list, link);
+ UNLOCK(&worker->ievents[type].lock);
+ }
return (ISC_R_SUSPEND);
}
- if (waiting-- == 0) {
- /* We reached this round "quota" */
- break;
- }
-
- ievent = DEQUEUE_NETIEVENT(worker, type);
+ ievent = next;
}
/* We processed at least one */
sizeof(*event));
*event = (isc__netievent_storage_t){ .ni.type = type };
+ ISC_LINK_INIT(&(event->ni), link);
return (event);
}
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
+ netievent_type_t type;
+
if (event->type > netievent_prio) {
- /*
- * We need to make sure this signal will be delivered and
- * the queue will be processed.
- */
- LOCK(&worker->lock);
- INCREMENT_PRIORITY_NETIEVENT(worker);
- ENQUEUE_PRIORITY_NETIEVENT(worker, event);
- SIGNAL(&worker->cond_prio);
- UNLOCK(&worker->lock);
- } else if (event->type == netievent_privilegedtask) {
- INCREMENT_PRIVILEGED_NETIEVENT(worker);
- ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
- } else if (event->type == netievent_task) {
- INCREMENT_TASK_NETIEVENT(worker);
- ENQUEUE_TASK_NETIEVENT(worker, event);
+ type = NETIEVENT_PRIORITY;
} else {
- INCREMENT_NORMAL_NETIEVENT(worker);
- ENQUEUE_NORMAL_NETIEVENT(worker, event);
+ switch (event->type) {
+ case netievent_prio:
+ UNREACHABLE();
+ break;
+ case netievent_privilegedtask:
+ type = NETIEVENT_PRIVILEGED;
+ break;
+ case netievent_task:
+ type = NETIEVENT_TASK;
+ break;
+ default:
+ type = NETIEVENT_NORMAL;
+ break;
+ }
}
+
+ /*
+ * We need to make sure this signal will be delivered and
+ * the queue will be processed.
+ */
+ LOCK(&worker->ievents[type].lock);
+ ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
+ if (type == NETIEVENT_PRIORITY) {
+ SIGNAL(&worker->ievents[type].cond);
+ }
+ UNLOCK(&worker->ievents[type].lock);
+
uv_async_send(&worker->async);
}
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
+
uv_walk(&worker->loop, shutdown_walk_cb, NULL);
}
+++ /dev/null
-/*
- * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
- *
- * SPDX-License-Identifier: MPL-2.0
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, you can obtain one at https://mozilla.org/MPL/2.0/.
- *
- * See the COPYRIGHT file distributed with this work for additional
- * information regarding copyright ownership.
- */
-
-#include <inttypes.h>
-
-#include <isc/align.h>
-#include <isc/atomic.h>
-#include <isc/hp.h>
-#include <isc/mem.h>
-#include <isc/queue.h>
-#include <isc/string.h>
-
-#define BUFFER_SIZE 1024
-
-#define MAX_THREADS 128
-
-#define ALIGNMENT 128
-
-static uintptr_t nulluintptr = (uintptr_t)NULL;
-
-typedef struct node {
- atomic_uint_fast32_t deqidx;
- atomic_uintptr_t items[BUFFER_SIZE];
- atomic_uint_fast32_t enqidx;
- atomic_uintptr_t next;
- isc_mem_t *mctx;
-} node_t;
-
-/* we just need one Hazard Pointer */
-#define HP_TAIL 0
-#define HP_HEAD 0
-
-struct isc_queue {
- alignas(ALIGNMENT) atomic_uintptr_t head;
- alignas(ALIGNMENT) atomic_uintptr_t tail;
- isc_mem_t *mctx;
- int max_threads;
- int taken;
- isc_hp_t *hp;
- void *alloced_ptr;
-};
-
-static node_t *
-node_new(isc_mem_t *mctx, uintptr_t item) {
- node_t *node = isc_mem_get(mctx, sizeof(*node));
- *node = (node_t){ .mctx = NULL };
-
- atomic_init(&node->deqidx, 0);
- atomic_init(&node->enqidx, 1);
- atomic_init(&node->next, 0);
- atomic_init(&node->items[0], item);
-
- for (int i = 1; i < BUFFER_SIZE; i++) {
- atomic_init(&node->items[i], 0);
- }
-
- isc_mem_attach(mctx, &node->mctx);
-
- return (node);
-}
-
-static void
-node_destroy(void *node0) {
- node_t *node = (node_t *)node0;
-
- isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
-}
-
-static bool
-node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
- return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp,
- (uintptr_t)val));
-}
-
-static bool
-queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
- return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp,
- (uintptr_t)val));
-}
-
-static bool
-queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
- return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp,
- (uintptr_t)val));
-}
-
-isc_queue_t *
-isc_queue_new(isc_mem_t *mctx, int max_threads) {
- isc_queue_t *queue = NULL;
- node_t *sentinel = NULL;
- void *qbuf = NULL;
- uintptr_t qptr;
-
- /*
- * A trick to allocate an aligned isc_queue_t structure
- */
- qbuf = isc_mem_get(mctx, sizeof(*queue) + ALIGNMENT);
- qptr = (uintptr_t)qbuf;
- queue = (isc_queue_t *)(qptr + (ALIGNMENT - (qptr % ALIGNMENT)));
-
- if (max_threads == 0) {
- max_threads = MAX_THREADS;
- }
-
- *queue = (isc_queue_t){
- .max_threads = max_threads,
- .alloced_ptr = qbuf,
- };
-
- isc_mem_attach(mctx, &queue->mctx);
-
- queue->hp = isc_hp_new(mctx, 1, node_destroy);
-
- sentinel = node_new(mctx, nulluintptr);
- atomic_init(&sentinel->enqidx, 0);
-
- atomic_init(&queue->head, (uintptr_t)sentinel);
- atomic_init(&queue->tail, (uintptr_t)sentinel);
-
- return (queue);
-}
-
-void
-isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
- REQUIRE(item != nulluintptr);
-
- while (true) {
- node_t *lt = NULL;
- uint_fast32_t idx;
- uintptr_t n = nulluintptr;
-
- lt = (node_t *)isc_hp_protect(queue->hp, 0, &queue->tail);
- idx = atomic_fetch_add(<->enqidx, 1);
- if (idx > BUFFER_SIZE - 1) {
- node_t *lnext = NULL;
-
- if (lt != (node_t *)atomic_load(&queue->tail)) {
- continue;
- }
-
- lnext = (node_t *)atomic_load(<->next);
- if (lnext == NULL) {
- node_t *newnode = node_new(queue->mctx, item);
- if (node_cas_next(lt, NULL, newnode)) {
- queue_cas_tail(queue, lt, newnode);
- isc_hp_clear(queue->hp);
- return;
- }
- node_destroy(newnode);
- } else {
- queue_cas_tail(queue, lt, lnext);
- }
-
- continue;
- }
-
- if (atomic_compare_exchange_strong(<->items[idx], &n, item)) {
- isc_hp_clear(queue->hp);
- return;
- }
- }
-}
-
-uintptr_t
-isc_queue_dequeue(isc_queue_t *queue) {
- REQUIRE(queue != NULL);
-
- while (true) {
- node_t *lh = NULL;
- uint_fast32_t idx;
- uintptr_t item;
-
- lh = (node_t *)isc_hp_protect(queue->hp, 0, &queue->head);
- if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
- atomic_load(&lh->next) == nulluintptr)
- {
- break;
- }
-
- idx = atomic_fetch_add(&lh->deqidx, 1);
- if (idx > BUFFER_SIZE - 1) {
- node_t *lnext = (node_t *)atomic_load(&lh->next);
- if (lnext == NULL) {
- break;
- }
- if (queue_cas_head(queue, lh, lnext)) {
- isc_hp_retire(queue->hp, (uintptr_t)lh);
- }
-
- continue;
- }
-
- item = atomic_exchange(&(lh->items[idx]),
- (uintptr_t)&queue->taken);
- if (item == nulluintptr) {
- continue;
- }
-
- isc_hp_clear(queue->hp);
- return (item);
- }
-
- isc_hp_clear(queue->hp);
- return (nulluintptr);
-}
-
-void
-isc_queue_destroy(isc_queue_t *queue) {
- node_t *last = NULL;
- void *alloced = NULL;
-
- REQUIRE(queue != NULL);
-
- while (isc_queue_dequeue(queue) != nulluintptr) {
- /* do nothing */
- }
-
- last = (node_t *)atomic_load_relaxed(&queue->head);
- node_destroy(last);
- isc_hp_destroy(queue->hp);
-
- alloced = queue->alloced_ptr;
- isc_mem_putanddetach(&queue->mctx, alloced, sizeof(*queue) + ALIGNMENT);
-}
isc_result_t
isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp, int threadid) {
- isc_task_t *task;
+ isc_task_t *task = NULL;
bool exiting;
REQUIRE(VALID_MANAGER(manager));
#define UNIT_TESTING
#include <cmocka.h>
-#include <isc/hp.h>
#include <isc/nonce.h>
#include <isc/os.h>
#include <isc/quota.h>
return (-1);
}
- isc_hp_init(4 * workers);
-
signal(SIGPIPE, SIG_IGN);
if (getenv("CI") == NULL || getenv("CI_ENABLE_ALL_TESTS") != NULL) {
isc_hex_decodestring
isc_hex_tobuffer
isc_hex_totext
-isc_hp_clear
-isc_hp_clear_one
-isc_hp_destroy
-isc_hp_init
-isc_hp_protect
-isc_hp_protect_ptr
-isc_hp_protect_release
-isc_hp_new
-isc_hp_retire
isc_hmac
isc_hmac_new
isc_hmac_free
isc_hmac_get_md_type
isc_hmac_get_size
isc_hmac_get_block_size
-isc_hp_new
-isc_hp_destroy
-isc_hp_clear
-isc_hp_protect
-isc_hp_protect_ptr
-isc_hp_protect_release
-isc_hp_retire
isc_ht_add
isc_ht_count
isc_ht_delete
isc_portset_nports
isc_portset_remove
isc_portset_removerange
-isc_queue_enqueue
-isc_queue_dequeue
-isc_queue_destroy
-isc_queue_new
isc_quota_attach
isc_quota_attach_cb
isc_quota_cb_init
<ClInclude Include="..\include\isc\hmac.h">
<Filter>Library Header Files</Filter>
</ClInclude>
- <ClInclude Include="..\include\isc\hp.h">
- <Filter>Library Header Files</Filter>
- </ClInclude>
<ClInclude Include="..\include\isc\ht.h">
<Filter>Library Header Files</Filter>
</ClInclude>
<ClInclude Include="..\include\isc\print.h">
<Filter>Library Header Files</Filter>
</ClInclude>
- <ClInclude Include="..\include\isc\queue.h">
- <Filter>Library Header Files</Filter>
- </ClInclude>
<ClInclude Include="..\include\isc\quota.h">
<Filter>Library Header Files</Filter>
</ClInclude>
<ClCompile Include="..\hmac.c">
<Filter>Library Source Files</Filter>
</ClCompile>
- <ClCompile Include="..\hp.c">
- <Filter>Library Source Files</Filter>
- </ClCompile>
<ClCompile Include="..\ht.c">
<Filter>Library Source Files</Filter>
</ClCompile>
<ClCompile Include="..\portset.c">
<Filter>Library Source Files</Filter>
</ClCompile>
- <ClCompile Include="..\queue.c">
- <Filter>Library Source Files</Filter>
- </ClCompile>
<ClCompile Include="..\quota.c">
<Filter>Library Source Files</Filter>
</ClCompile>
<ClInclude Include="..\include\isc\heap.h" />
<ClInclude Include="..\include\isc\hex.h" />
<ClInclude Include="..\include\isc\hmac.h" />
- <ClInclude Include="..\include\isc\hp.h" />
<ClInclude Include="..\include\isc\ht.h" />
<ClInclude Include="..\include\isc\httpd.h" />
<ClInclude Include="..\include\isc\interfaceiter.h" />
<ClInclude Include="..\include\isc\pool.h" />
<ClInclude Include="..\include\isc\portset.h" />
<ClInclude Include="..\include\isc\print.h" />
- <ClInclude Include="..\include\isc\queue.h" />
<ClInclude Include="..\include\isc\quota.h" />
<ClInclude Include="..\include\isc\radix.h" />
<ClInclude Include="..\include\isc\random.h" />
<ClCompile Include="..\heap.c" />
<ClCompile Include="..\hex.c" />
<ClCompile Include="..\hmac.c" />
- <ClCompile Include="..\hp.c" />
<ClCompile Include="..\ht.c" />
<ClCompile Include="..\httpd.c" />
<ClCompile Include="..\iterated_hash.c" />
<ClCompile Include="..\parseint.c" />
<ClCompile Include="..\pool.c" />
<ClCompile Include="..\portset.c" />
- <ClCompile Include="..\queue.c" />
<ClCompile Include="..\quota.c" />
<ClCompile Include="..\radix.c" />
<ClCompile Include="..\random.c" />