*** Imports.
***/
+#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/job.h>
#include <isc/lang.h>
#include <isc/magic.h>
#include <isc/mutex.h>
+#include <isc/os.h>
#include <isc/refcount.h>
#include <isc/types.h>
+#include <isc/urcu.h>
/*****
***** Types.
ISC_LANG_BEGINDECLS
-/*% isc_quota structure */
+/*%
+ * isc_quota structure
+ *
+ * NOTE: We are using struct cds_wfcq_head which has an internal
+ * mutex, because we are using enqueue and dequeue, and dequeues need
+ * synchronization between multiple threads (see urcu/wfcqueue.h for
+ * detailed description).
+ */
struct isc_quota {
int magic;
atomic_uint_fast32_t max;
atomic_uint_fast32_t used;
atomic_uint_fast32_t soft;
- atomic_uint_fast32_t waiting;
- isc_mutex_t cblock;
- ISC_LIST(isc_job_t) jobs;
+ struct {
+ alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head;
+ alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
+ } jobs;
ISC_LINK(isc_quota_t) link;
};
#include <isc/atomic.h>
#include <isc/quota.h>
+#include <isc/urcu.h>
#include <isc/util.h>
#define QUOTA_MAGIC ISC_MAGIC('Q', 'U', 'O', 'T')
atomic_init("a->max, max);
atomic_init("a->used, 0);
atomic_init("a->soft, 0);
- atomic_init("a->waiting, 0);
- ISC_LIST_INIT(quota->jobs);
- isc_mutex_init("a->cblock);
+ cds_wfcq_init("a->jobs.head, "a->jobs.tail);
ISC_LINK_INIT(quota, link);
quota->magic = QUOTA_MAGIC;
}
void
isc_quota_soft(isc_quota_t *quota, unsigned int soft) {
REQUIRE(VALID_QUOTA(quota));
+ REQUIRE(atomic_load_relaxed("a->max) > soft);
atomic_store_relaxed("a->soft, soft);
}
return (atomic_load_relaxed("a->used));
}
-/* Must be quota->cblock locked */
-static void
-enqueue(isc_quota_t *quota, isc_job_t *cb) {
- REQUIRE(cb != NULL);
- ISC_LIST_ENQUEUE(quota->jobs, cb, link);
- atomic_fetch_add_relaxed("a->waiting, 1);
-}
-
-/* Must be quota->cblock locked */
-static isc_job_t *
-dequeue(isc_quota_t *quota) {
- isc_job_t *cb = ISC_LIST_HEAD(quota->jobs);
- if (cb != NULL) {
- ISC_LIST_DEQUEUE(quota->jobs, cb, link);
- atomic_fetch_sub_relaxed("a->waiting, 1);
- }
- return (cb);
-}
-
void
isc_quota_release(isc_quota_t *quota) {
- uint_fast32_t used;
-
/*
- * This is opportunistic - we might race with a failing quota_attach_cb
- * and not detect that something is waiting, but eventually someone will
- * be releasing quota and will detect it, so we don't need to worry -
- * and we're saving a lot by not locking cblock every time.
+ * We are using the cds_wfcq_dequeue_blocking() variant here that
+ * has an internal mutex because we need synchronization on
+ * multiple dequeues running from different threads.
*/
-
- if (atomic_load_acquire("a->waiting) > 0) {
- isc_job_t *cb = NULL;
- LOCK("a->cblock);
- cb = dequeue(quota);
- UNLOCK("a->cblock);
- if (cb != NULL) {
- cb->cb(cb->cbarg);
- return;
- }
+ struct cds_wfcq_node *node =
+ cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail);
+ if (node == NULL) {
+ uint_fast32_t used = atomic_fetch_sub_relaxed("a->used, 1);
+ INSIST(used > 0);
+ return;
}
- used = atomic_fetch_sub_release("a->used, 1);
- INSIST(used > 0);
+ isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
+ __tsan_acquire(job);
+ job->cb(job->cbarg);
}
isc_result_t
REQUIRE(VALID_QUOTA(quota));
REQUIRE(job == NULL || cb != NULL);
- uint_fast32_t used = atomic_fetch_add_release("a->used, 1);
-
+ uint_fast32_t used = atomic_fetch_add_relaxed("a->used, 1);
uint_fast32_t max = atomic_load_relaxed("a->max);
if (max != 0 && used >= max) {
(void)atomic_fetch_sub_relaxed("a->used, 1);
if (job != NULL) {
job->cb = cb;
job->cbarg = cbarg;
- LOCK("a->cblock);
- enqueue(quota, job);
- UNLOCK("a->cblock);
+ cds_wfcq_node_init(&job->wfcq_node);
+
+ /*
+ * The cds_wfcq_enqueue() is non-blocking (no internal
+ * mutex involved), so it offers a slight advantage.
+ */
+ __tsan_release(job);
+ cds_wfcq_enqueue("a->jobs.head, "a->jobs.tail,
+ &job->wfcq_node);
}
return (ISC_R_QUOTA);
}
quota->magic = 0;
INSIST(atomic_load("a->used) == 0);
- INSIST(atomic_load("a->waiting) == 0);
- INSIST(ISC_LIST_EMPTY(quota->jobs));
- isc_mutex_destroy("a->cblock);
+ INSIST(cds_wfcq_empty("a->jobs.head, "a->jobs.tail));
+
+ cds_wfcq_destroy("a->jobs.head, "a->jobs.tail);
}