]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Change the isc_quota API to use cds_wfcqueue internally
authorOndřej Surý <ondrej@isc.org>
Mon, 8 May 2023 21:31:54 +0000 (23:31 +0200)
committerOndřej Surý <ondrej@isc.org>
Fri, 12 May 2023 12:16:25 +0000 (14:16 +0200)
The isc_quota API was using locked list of isc_job_t objects to keep the
waiting TCP accepts.  Change the isc_quota implementation to use
cds_wfcqueue internally - the enqueue is wait-free and only dequeue
needs to be locked.

lib/isc/include/isc/quota.h
lib/isc/quota.c

index 22e653345c01f145089b95589d43df53b2adcdca..85654dd66fd022f876700ee18f1e6d76a02d03fb 100644 (file)
  *** Imports.
  ***/
 
+#include <isc/align.h>
 #include <isc/atomic.h>
 #include <isc/job.h>
 #include <isc/lang.h>
 #include <isc/magic.h>
 #include <isc/mutex.h>
+#include <isc/os.h>
 #include <isc/refcount.h>
 #include <isc/types.h>
+#include <isc/urcu.h>
 
 /*****
 ***** Types.
 
 ISC_LANG_BEGINDECLS
 
-/*% isc_quota structure */
+/*%
+ * isc_quota structure
+ *
+ * NOTE: We are using struct cds_wfcq_head which has an internal
+ * mutex, because we are using enqueue and dequeue, and dequeues need
+ * synchronization between multiple threads (see urcu/wfcqueue.h for
+ * detailed description).
+ */
 struct isc_quota {
        int                  magic;
        atomic_uint_fast32_t max;
        atomic_uint_fast32_t used;
        atomic_uint_fast32_t soft;
-       atomic_uint_fast32_t waiting;
-       isc_mutex_t          cblock;
-       ISC_LIST(isc_job_t) jobs;
+       struct {
+               alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head;
+               alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
+       } jobs;
        ISC_LINK(isc_quota_t) link;
 };
 
index 4dbdbc5781629205c5191ef02f28a200fec375ca..4816333e1cc476c744591ebe4f628ebd92a469fd 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <isc/atomic.h>
 #include <isc/quota.h>
+#include <isc/urcu.h>
 #include <isc/util.h>
 
 #define QUOTA_MAGIC    ISC_MAGIC('Q', 'U', 'O', 'T')
@@ -27,9 +28,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
        atomic_init(&quota->max, max);
        atomic_init(&quota->used, 0);
        atomic_init(&quota->soft, 0);
-       atomic_init(&quota->waiting, 0);
-       ISC_LIST_INIT(quota->jobs);
-       isc_mutex_init(&quota->cblock);
+       cds_wfcq_init(&quota->jobs.head, &quota->jobs.tail);
        ISC_LINK_INIT(quota, link);
        quota->magic = QUOTA_MAGIC;
 }
@@ -37,6 +36,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
 void
 isc_quota_soft(isc_quota_t *quota, unsigned int soft) {
        REQUIRE(VALID_QUOTA(quota));
+       REQUIRE(atomic_load_relaxed(&quota->max) > soft);
        atomic_store_relaxed(&quota->soft, soft);
 }
 
@@ -64,49 +64,24 @@ isc_quota_getused(isc_quota_t *quota) {
        return (atomic_load_relaxed(&quota->used));
 }
 
-/* Must be quota->cblock locked */
-static void
-enqueue(isc_quota_t *quota, isc_job_t *cb) {
-       REQUIRE(cb != NULL);
-       ISC_LIST_ENQUEUE(quota->jobs, cb, link);
-       atomic_fetch_add_relaxed(&quota->waiting, 1);
-}
-
-/* Must be quota->cblock locked */
-static isc_job_t *
-dequeue(isc_quota_t *quota) {
-       isc_job_t *cb = ISC_LIST_HEAD(quota->jobs);
-       if (cb != NULL) {
-               ISC_LIST_DEQUEUE(quota->jobs, cb, link);
-               atomic_fetch_sub_relaxed(&quota->waiting, 1);
-       }
-       return (cb);
-}
-
 void
 isc_quota_release(isc_quota_t *quota) {
-       uint_fast32_t used;
-
        /*
-        * This is opportunistic - we might race with a failing quota_attach_cb
-        * and not detect that something is waiting, but eventually someone will
-        * be releasing quota and will detect it, so we don't need to worry -
-        * and we're saving a lot by not locking cblock every time.
+        * We are using the cds_wfcq_dequeue_blocking() variant here that
+        * has an internal mutex because we need synchronization on
+        * multiple dequeues running from different threads.
         */
-
-       if (atomic_load_acquire(&quota->waiting) > 0) {
-               isc_job_t *cb = NULL;
-               LOCK(&quota->cblock);
-               cb = dequeue(quota);
-               UNLOCK(&quota->cblock);
-               if (cb != NULL) {
-                       cb->cb(cb->cbarg);
-                       return;
-               }
+       struct cds_wfcq_node *node =
+               cds_wfcq_dequeue_blocking(&quota->jobs.head, &quota->jobs.tail);
+       if (node == NULL) {
+               uint_fast32_t used = atomic_fetch_sub_relaxed(&quota->used, 1);
+               INSIST(used > 0);
+               return;
        }
 
-       used = atomic_fetch_sub_release(&quota->used, 1);
-       INSIST(used > 0);
+       isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
+       __tsan_acquire(job);
+       job->cb(job->cbarg);
 }
 
 isc_result_t
@@ -115,17 +90,22 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb,
        REQUIRE(VALID_QUOTA(quota));
        REQUIRE(job == NULL || cb != NULL);
 
-       uint_fast32_t used = atomic_fetch_add_release(&quota->used, 1);
-
+       uint_fast32_t used = atomic_fetch_add_relaxed(&quota->used, 1);
        uint_fast32_t max = atomic_load_relaxed(&quota->max);
        if (max != 0 && used >= max) {
                (void)atomic_fetch_sub_relaxed(&quota->used, 1);
                if (job != NULL) {
                        job->cb = cb;
                        job->cbarg = cbarg;
-                       LOCK(&quota->cblock);
-                       enqueue(quota, job);
-                       UNLOCK(&quota->cblock);
+                       cds_wfcq_node_init(&job->wfcq_node);
+
+                       /*
+                        * The cds_wfcq_enqueue() is non-blocking (no internal
+                        * mutex involved), so it offers a slight advantage.
+                        */
+                       __tsan_release(job);
+                       cds_wfcq_enqueue(&quota->jobs.head, &quota->jobs.tail,
+                                        &job->wfcq_node);
                }
                return (ISC_R_QUOTA);
        }
@@ -144,7 +124,7 @@ isc_quota_destroy(isc_quota_t *quota) {
        quota->magic = 0;
 
        INSIST(atomic_load(&quota->used) == 0);
-       INSIST(atomic_load(&quota->waiting) == 0);
-       INSIST(ISC_LIST_EMPTY(quota->jobs));
-       isc_mutex_destroy(&quota->cblock);
+       INSIST(cds_wfcq_empty(&quota->jobs.head, &quota->jobs.tail));
+
+       cds_wfcq_destroy(&quota->jobs.head, &quota->jobs.tail);
 }