]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
EXP: Lock-free QPzone ondrej/lock-free-qpzone-reads-v1
authorOndřej Surý <ondrej@isc.org>
Tue, 4 Mar 2025 16:39:14 +0000 (17:39 +0100)
committerOndřej Surý <ondrej@sury.org>
Wed, 5 Mar 2025 06:55:51 +0000 (07:55 +0100)
.clang-format
lib/dns/include/dns/rdataslab.h
lib/dns/include/dns/trace.h
lib/dns/qpzone.c
lib/dns/rdataslab.c
lib/isc/include/isc/urcu.h
tests/dns/qpzone_test.c

index 7bd90052abb56d790ae79d47c45721a657a68222..bc121f3896f26dc6a4708f046359b14e5d95cc7c 100644 (file)
@@ -78,6 +78,6 @@ PenaltyBreakString: 80
 PenaltyExcessCharacter: 100
 Standard: Cpp11
 ContinuationIndentWidth: 8
-ForEachMacros: [ 'cds_lfs_for_each', 'cds_lfs_for_each_safe', 'cds_list_for_each_entry_safe', 'ISC_LIST_FOREACH', 'ISC_LIST_FOREACH_SAFE', 'ISC_LIST_FOREACH_REV', 'ISC_LIST_FOREACH_REV_SAFE' ]
+ForEachMacros: [ 'cds_lfs_for_each', 'cds_lfs_for_each_safe', 'cds_list_for_each_entry_safe', 'cds_list_for_each_entry_rcu', 'ISC_LIST_FOREACH', 'ISC_LIST_FOREACH_SAFE', 'ISC_LIST_FOREACH_REV', 'ISC_LIST_FOREACH_REV_SAFE' ]
 RemoveParentheses: ReturnStatement
 RemoveSemicolon: true
index 7f393b1e56f740054321c3f2fc9681d051090a3a..ebe7d7f0bd6726add391a95e23fb1e37f297b755 100644 (file)
@@ -64,6 +64,14 @@ struct dns_slabheader_proof {
        dns_rdatatype_t type;
 };
 
+typedef struct dns_slabheader_list {
+       isc_mem_t           *mctx;
+       struct cds_list_head nnode;
+       struct cds_list_head versions;
+       struct rcu_head      rcu_head;
+       dns_typepair_t       type;
+} dns_slabheader_list_t;
+
 struct dns_slabheader {
        _Atomic(uint16_t) attributes;
 
@@ -115,12 +123,14 @@ struct dns_slabheader {
         */
 
        struct dns_slabheader *down;
+       struct cds_list_head   dnode;
        /*%<
         * Points to the header for the next older version of
         * this rdataset.
         */
 
        dns_db_t     *db;
+       isc_mem_t    *mctx; /* FIXME: Experiment */
        dns_dbnode_t *node;
        /*%<
         * The database and database node objects containing
@@ -139,6 +149,8 @@ struct dns_slabheader {
        isc_heap_t *heap;
 
        dns_gluelist_t *gluelist;
+
+       struct rcu_head rcu_head;
 };
 
 enum {
@@ -307,7 +319,7 @@ dns_slabheader_new(dns_db_t *db, dns_dbnode_t *node);
 void
 dns_slabheader_destroy(dns_slabheader_t **headerp);
 /*%<
- * Free all memory associated with '*headerp'.
+ * Free all memory associated with '*headerp', either directly or with RCU.
  */
 
 void
@@ -321,3 +333,9 @@ dns_slabheader_top(dns_slabheader_t *header);
 /*%<
  * Return the top header for the type or the negtype
  */
+
+void
+dns_slabheader_createlist(isc_mem_t *mctx, dns_slabheader_list_t **headp);
+
+void
+dns_slabheader_destroylist(dns_slabheader_list_t **headp);
index e7326917a2d070bec7e1c10154d2f932a5dcac1a..d20dd2101c44b78f0223fd759d3354fb5668aca0 100644 (file)
@@ -13,7 +13,7 @@
 
 #pragma once
 
-#undef DNS_DB_NODETRACE
+/* Add -DDNS_DB_NODETRACE=1 to CFLAGS for detailed node reference tracing */
 
 #if DNS_DB_NODETRACE
 
index 72050bf96b7baf49daeff01607444d36dcaa5242..edf82ff2ff68b432c799b30625320a4371356b92 100644 (file)
@@ -34,6 +34,7 @@
 #include <isc/result.h>
 #include <isc/rwlock.h>
 #include <isc/serial.h>
+#include <isc/spinlock.h>
 #include <isc/stdio.h>
 #include <isc/string.h>
 #include <isc/time.h>
 #include "db_p.h"
 #include "qpzone_p.h"
 
+/*
+ * FIXME: Undefine the macros to be sure there's no usage here.
+ */
+#undef NODE_INITLOCK
+#undef NODE_DESTROYLOCK
+#undef NODE_LOCK
+#undef NODE_UNLOCK
+#undef NODE_RDLOCK
+#undef NODE_WRLOCK
+#undef NODE_TRYRDLOCK
+#undef NODE_TRYWRLOCK
+#undef NODE_TRYUPGRADE
+#undef NODE_FORCEUPGRADE
+
 #define CHECK(op)                            \
        do {                                 \
                result = (op);               \
 #define QPDB_ATTR_LOADED  0x01
 #define QPDB_ATTR_LOADING 0x02
 
-#define DEFAULT_BUCKETS_COUNT 17 /*%< Should be prime. */
-
 #define QPDBITER_NSEC3_ORIGIN_NODE(qpdb, iterator)        \
        ((iterator)->current == &(iterator)->nsec3iter && \
         (iterator)->node == (qpdb)->nsec3_origin)
@@ -177,22 +190,18 @@ struct qpznode {
        isc_refcount_t references;
        isc_refcount_t erefs;
 
-       uint16_t locknum;
        atomic_uint_fast8_t nsec;
-       atomic_bool wild;
-       atomic_bool delegating;
-       atomic_bool dirty;
-       void *data;
-};
 
-typedef struct qpzone_bucket {
-       /* Per-bucket lock. */
-       isc_rwlock_t lock;
+       bool wild;
+       bool delegating;
+       bool dirty;
+
+       isc_spinlock_t spinlock;
+
+       struct cds_list_head headers;
 
-       /* Padding to prevent false sharing between locks. */
-       uint8_t __padding[ISC_OS_CACHELINE_SIZE -
-                         (sizeof(isc_rwlock_t)) % ISC_OS_CACHELINE_SIZE];
-} qpzone_bucket_t;
+       struct rcu_head rcu_head;
+};
 
 struct qpzonedb {
        /* Unlocked. */
@@ -238,9 +247,6 @@ struct qpzonedb {
        dns_qpmulti_t *tree;  /* Main QP trie for data storage */
        dns_qpmulti_t *nsec;  /* NSEC nodes only */
        dns_qpmulti_t *nsec3; /* NSEC3 nodes only */
-
-       size_t buckets_count;
-       qpzone_bucket_t buckets[]; /* attribute((counted_by(buckets_count))) */
 };
 
 #ifdef DNS_DB_NODETRACE
@@ -332,7 +338,8 @@ static dns_rdatasetitermethods_t rdatasetiter_methods = {
 
 typedef struct qpdb_rdatasetiter {
        dns_rdatasetiter_t common;
-       dns_slabheader_t *current;
+       dns_slabheader_list_t *hlist;
+       dns_slabheader_t *header;
 } qpdb_rdatasetiter_t;
 
 /*
@@ -390,20 +397,6 @@ typedef struct qpdb_dbiterator {
  */
 static atomic_uint_fast16_t init_count = 0;
 
-/*
- * Locking
- *
- * If a routine is going to lock more than one lock in this module, then
- * the locking must be done in the following order:
- *
- *      Node Lock       (Only one from the set may be locked at one time by
- *                       any caller)
- *
- *      Database Lock
- *
- * Failure to follow this hierarchy can result in deadlock.
- */
-
 /*%
  * Return which RRset should be resigned sooner.  If the RRsets have the
  * same signing time, prefer the other RRset over the SOA RRset.
@@ -426,6 +419,7 @@ static void
 set_index(void *what, unsigned int idx) {
        dns_slabheader_t *h = what;
 
+       /* Locked by the heap lock */
        h->heap_index = idx;
 }
 
@@ -501,15 +495,12 @@ cleanup_gluelists(struct cds_wfs_stack *glue_stack) {
 }
 
 static void
-free_db_rcu(struct rcu_head *rcu_head) {
+qpzonedb__destroy_rcu(struct rcu_head *rcu_head) {
        qpzonedb_t *qpdb = caa_container_of(rcu_head, qpzonedb_t, rcu_head);
 
        if (dns_name_dynamic(&qpdb->common.origin)) {
                dns_name_free(&qpdb->common.origin, qpdb->common.mctx);
        }
-       for (size_t i = 0; i < qpdb->buckets_count; i++) {
-               NODE_DESTROYLOCK(&qpdb->buckets[i].lock);
-       }
 
        isc_heap_destroy(&qpdb->heap);
 
@@ -532,13 +523,11 @@ free_db_rcu(struct rcu_head *rcu_head) {
                INSIST(!cds_lfht_destroy(qpdb->common.update_listeners, NULL));
        }
 
-       isc_mem_putanddetach(&qpdb->common.mctx, qpdb,
-                            sizeof(*qpdb) + qpdb->buckets_count *
-                                                    sizeof(qpdb->buckets[0]));
+       isc_mem_putanddetach(&qpdb->common.mctx, qpdb, sizeof(*qpdb));
 }
 
 static void
-qpzone_destroy(qpzonedb_t *qpdb) {
+qpzonedb__destroy(qpzonedb_t *qpdb) {
        REQUIRE(qpdb->future_version == NULL);
 
        isc_refcount_decrementz(&qpdb->current_version->references);
@@ -563,11 +552,11 @@ qpzone_destroy(qpzonedb_t *qpdb) {
        isc_log_write(DNS_LOGCATEGORY_DATABASE, DNS_LOGMODULE_DB,
                      ISC_LOG_DEBUG(1), "called %s(%s)", __func__, buf);
 
-       call_rcu(&qpdb->rcu_head, free_db_rcu);
+       call_rcu(&qpdb->rcu_head, qpzonedb__destroy_rcu);
 }
 
 static void
-qpdb_destroy(dns_db_t *arg) {
+qpzonedb_destroy(dns_db_t *arg) {
        qpzonedb_t *qpdb = (qpzonedb_t *)arg;
 
        if (qpdb->origin != NULL) {
@@ -589,23 +578,26 @@ qpdb_destroy(dns_db_t *arg) {
        qpzonedb_detach(&qpdb);
 }
 
-static qpznode_t *
-new_qpznode(qpzonedb_t *qpdb, const dns_name_t *name) {
-       qpznode_t *newdata = isc_mem_get(qpdb->common.mctx, sizeof(*newdata));
-       *newdata = (qpznode_t){
+static void
+qpznode_create(qpzonedb_t *qpdb, const dns_name_t *name,
+              qpznode_t **qpznodep DNS__DB_FLARG) {
+       REQUIRE(qpznodep != NULL && *qpznodep == NULL);
+       qpznode_t *qpznode = isc_mem_get(qpdb->common.mctx, sizeof(*qpznode));
+       *qpznode = (qpznode_t){
                .name = DNS_NAME_INITEMPTY,
                .references = ISC_REFCOUNT_INITIALIZER(1),
-               .locknum = isc_random_uniform(qpdb->buckets_count),
+               .headers = CDS_LIST_HEAD_INIT(qpznode->headers),
        };
 
-       isc_mem_attach(qpdb->common.mctx, &newdata->mctx);
-       dns_name_dup(name, qpdb->common.mctx, &newdata->name);
+       isc_spinlock_init(&qpznode->spinlock);
+       isc_mem_attach(qpdb->common.mctx, &qpznode->mctx);
+       dns_name_dup(name, qpdb->common.mctx, &qpznode->name);
 
 #if DNS_DB_NODETRACE
-       fprintf(stderr, "new_qpznode:%s:%s:%d:%p->references = 1\n", __func__,
-               __FILE__, __LINE__ + 1, name);
+       fprintf(stderr, "new:qpznode:%s:%s:%d:%p->references = 1\n", func, file,
+               line, qpznode);
 #endif
-       return newdata;
+       *qpznodep = qpznode;
 }
 
 static qpz_version_t *
@@ -636,14 +628,11 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
        isc_result_t result;
        dns_qp_t *qp = NULL;
 
-       qpdb = isc_mem_get(mctx,
-                          sizeof(*qpdb) + DEFAULT_BUCKETS_COUNT *
-                                                  sizeof(qpdb->buckets[0]));
+       qpdb = isc_mem_get(mctx, sizeof(*qpdb));
        *qpdb = (qpzonedb_t){
                .common.origin = DNS_NAME_INITEMPTY,
                .common.rdclass = rdclass,
                .common.references = ISC_REFCOUNT_INITIALIZER(1),
-               .buckets_count = DEFAULT_BUCKETS_COUNT,
                .current_serial = 1,
                .least_serial = 1,
                .next_serial = 2,
@@ -662,10 +651,6 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
 
        isc_heap_create(mctx, resign_sooner, set_index, 0, &qpdb->heap);
 
-       for (size_t i = 0; i < qpdb->buckets_count; i++) {
-               NODE_INITLOCK(&qpdb->buckets[i].lock);
-       }
-
        /*
         * Attach to the mctx.  The database will persist so long as there
         * are references to it, and attaching to the mctx ensures that our
@@ -701,7 +686,7 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
         */
 
        dns_qpmulti_write(qpdb->tree, &qp);
-       qpdb->origin = new_qpznode(qpdb, &qpdb->common.origin);
+       qpznode_create(qpdb, &qpdb->common.origin, &qpdb->origin);
        result = dns_qp_insert(qp, qpdb->origin, 0);
        INSIST(result == ISC_R_SUCCESS);
        qpdb->origin->nsec = DNS_DB_NSEC_NORMAL;
@@ -713,7 +698,7 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
         * record in the tree.
         */
        dns_qpmulti_write(qpdb->nsec3, &qp);
-       qpdb->nsec3_origin = new_qpznode(qpdb, &qpdb->common.origin);
+       qpznode_create(qpdb, &qpdb->common.origin, &qpdb->nsec3_origin);
        qpdb->nsec3_origin->nsec = DNS_DB_NSEC_NSEC3;
        result = dns_qp_insert(qp, qpdb->nsec3_origin, 0);
        INSIST(result == ISC_R_SUCCESS);
@@ -733,6 +718,75 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
        return ISC_R_SUCCESS;
 }
 
+static void
+clean_zone_node(qpznode_t *node, uint32_t least_serial) {
+       REQUIRE(least_serial != 0);
+
+       /*
+        * When cleaning up the node headers, we need to keep all the active
+        * (non-IGNORE) headers with the serial larger or equal to the
+        * `least_serial`.
+        *
+        * If no such header exists we need to keep a single header with the
+        * serial smaller than least_version.
+        */
+
+       /*
+        * When cleaning up the node headers, we need to keep all the active
+        * (non-IGNORE) headers with the serial larger or equal to the
+        * `least_serial`.
+        *
+        * If no such header exists we need to keep a single header with the
+        * serial smaller than least_version.
+        */
+
+       dns_slabheader_list_t *hlist = NULL, *hnext = NULL;
+       cds_list_for_each_entry_safe (hlist, hnext, &node->headers, nnode) {
+               bool empty = true;
+
+               dns_slabheader_t *header = NULL, *next = NULL;
+               dns_slabheader_t *parent = NULL;
+               cds_list_for_each_entry_safe (header, next, &hlist->versions,
+                                             dnode)
+               {
+                       if (IGNORE(header)) {
+                               /* were rolled back */
+                               cds_list_del_rcu(&header->dnode);
+                               dns_slabheader_destroy(&header);
+                       } else if (parent != NULL &&
+                                  header->serial == parent->serial)
+                       {
+                               /* with the same serial number */
+                               cds_list_del_rcu(&header->dnode);
+                               dns_slabheader_destroy(&header);
+                       } else if (parent != NULL &&
+                                  header->serial < least_serial)
+                       {
+                               /*
+                                * with the serial number less than least_serial
+                                * except when this would be the last header
+                                * left
+                                */
+                               cds_list_del_rcu(&header->dnode);
+                               dns_slabheader_destroy(&header);
+                       } else {
+                               parent = header;
+                               empty = false;
+                       }
+               }
+
+               if (empty) {
+                       cds_list_del_rcu(&hlist->nnode);
+                       dns_slabheader_destroylist(&hlist);
+               }
+       }
+
+       /*
+        * The code above has cleaned everything it could.
+        */
+       CMM_STORE_SHARED(node->dirty, false);
+}
+
 /*
  * If incrementing erefs from zero, we also increment the node use counter
  * in the qpzonedb object.
@@ -743,13 +797,14 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
  */
 static void
 qpznode_erefs_increment(qpzonedb_t *qpdb, qpznode_t *node DNS__DB_FLARG) {
-       uint_fast32_t refs = isc_refcount_increment0(&node->erefs);
+       uint_fast32_t erefs = atomic_fetch_add_release(&node->erefs, 1);
+       INSIST(erefs < UINT32_MAX);
 #if DNS_DB_NODETRACE
        fprintf(stderr, "incr:node:%s:%s:%u:%p->erefs = %" PRIuFAST32 "\n",
-               func, file, line, node, refs + 1);
+               func, file, line, node, erefs + 1);
 #endif
 
-       if (refs > 0) {
+       if (erefs > 0) {
                return;
        }
 
@@ -762,125 +817,6 @@ qpznode_acquire(qpzonedb_t *qpdb, qpznode_t *node DNS__DB_FLARG) {
        qpznode_erefs_increment(qpdb, node DNS__DB_FLARG_PASS);
 }
 
-static void
-clean_zone_node(qpznode_t *node, uint32_t least_serial) {
-       dns_slabheader_t *current = NULL, *dcurrent = NULL;
-       dns_slabheader_t *dcurrent_down = NULL, *dparent = NULL;
-       dns_slabheader_t *top_prev = NULL, *top_next = NULL;
-       bool still_dirty = false;
-
-       /*
-        * Caller must be holding the node lock.
-        */
-       REQUIRE(least_serial != 0);
-
-       for (current = node->data; current != NULL; current = top_next) {
-               top_next = current->next;
-
-               /*
-                * First, we clean up any instances of multiple rdatasets
-                * with the same serial number, or that have the IGNORE
-                * attribute.
-                */
-               dparent = current;
-               for (dcurrent = current->down; dcurrent != NULL;
-                    dcurrent = dcurrent_down)
-               {
-                       dcurrent_down = dcurrent->down;
-                       INSIST(dcurrent->serial <= dparent->serial);
-                       if (dcurrent->serial == dparent->serial ||
-                           IGNORE(dcurrent))
-                       {
-                               if (dcurrent_down != NULL) {
-                                       dcurrent_down->up = dparent;
-                               }
-                               dparent->down = dcurrent_down;
-                               dns_slabheader_destroy(&dcurrent);
-                       } else {
-                               dparent = dcurrent;
-                       }
-               }
-
-               /*
-                * We've now eliminated all IGNORE datasets with the possible
-                * exception of current, which we now check.
-                */
-               dcurrent = current;
-               if (IGNORE(dcurrent)) {
-                       dcurrent_down = current->down;
-                       if (dcurrent_down == NULL) {
-                               if (top_prev != NULL) {
-                                       top_prev->next = current->next;
-                               } else {
-                                       node->data = current->next;
-                               }
-                               dns_slabheader_destroy(&current);
-                               /*
-                                * current no longer exists, so we can
-                                * just continue with the loop.
-                                */
-                               continue;
-                       } else {
-                               /*
-                                * Pull up current->down, making it the new
-                                * current.
-                                */
-                               if (top_prev != NULL) {
-                                       top_prev->next = dcurrent_down;
-                               } else {
-                                       node->data = dcurrent_down;
-                               }
-                               dcurrent_down->next = top_next;
-                               dns_slabheader_destroy(&current);
-                               current = dcurrent_down;
-                       }
-               }
-
-               /*
-                * We now try to find the first down node less than the
-                * least serial.
-                */
-               dparent = current;
-               for (dcurrent = current->down; dcurrent != NULL;
-                    dcurrent = dcurrent_down)
-               {
-                       dcurrent_down = dcurrent->down;
-                       if (dcurrent->serial < least_serial) {
-                               break;
-                       }
-                       dparent = dcurrent;
-               }
-
-               /*
-                * If there is a such an rdataset, delete it and any older
-                * versions.
-                */
-               if (dcurrent != NULL) {
-                       do {
-                               dcurrent_down = dcurrent->down;
-                               INSIST(dcurrent->serial <= least_serial);
-                               dns_slabheader_destroy(&dcurrent);
-                               dcurrent = dcurrent_down;
-                       } while (dcurrent != NULL);
-                       dparent->down = NULL;
-               }
-
-               /*
-                * Note.  The serial number of 'current' might be less than
-                * least_serial too, but we cannot delete it because it is
-                * the most recent version.
-                */
-               if (current->down != NULL) {
-                       still_dirty = true;
-               }
-               top_prev = current;
-       }
-
-       if (!still_dirty) {
-               node->dirty = false;
-       }
-}
-
 /*
  * Decrement the external references to a node. If the counter
  * goes to zero, decrement the node use counter in the qpzonedb object
@@ -916,57 +852,20 @@ qpznode_erefs_decrement(qpzonedb_t *qpdb, qpznode_t *node DNS__DB_FLARG) {
  * if necessary, then decrements the internal reference counter as well.
  */
 static void
-qpznode_release(qpzonedb_t *qpdb, qpznode_t *node, uint32_t least_serial,
-               isc_rwlocktype_t *nlocktypep DNS__DB_FLARG) {
-       REQUIRE(*nlocktypep != isc_rwlocktype_none);
-
-       if (!qpznode_erefs_decrement(qpdb, node DNS__DB_FLARG_PASS)) {
-               goto unref;
-       }
-
-       /* Handle easy and typical case first. */
-       if (!node->dirty && (node->data != NULL || node == qpdb->origin ||
-                            node == qpdb->nsec3_origin))
+qpznode_release(qpzonedb_t *qpdb, qpznode_t *node,
+               uint32_t least_serial DNS__DB_FLARG) {
+       if (qpznode_erefs_decrement(qpdb, node DNS__DB_FLARG_PASS) &&
+           CMM_LOAD_SHARED(node->dirty))
        {
-               goto unref;
-       }
+               isc_spinlock_lock(&node->spinlock);
 
-       if (*nlocktypep == isc_rwlocktype_read) {
-               /*
-                * The external reference count went to zero and the node
-                * is dirty or has no data, so we might want to delete it.
-                * To do that, we'll need a write lock. If we don't already
-                * have one, we have to make sure nobody else has
-                * acquired a reference in the meantime, so we increment
-                * erefs (but NOT references!), upgrade the node lock,
-                * decrement erefs again, and see if it's still zero.
-                *
-                * We can't really assume anything about the result code of
-                * erefs_increment.  If another thread acquires reference it
-                * will be larger than 0, if it doesn't it is going to be 0.
-                */
-               isc_rwlock_t *nlock = &qpdb->buckets[node->locknum].lock;
-               qpznode_erefs_increment(qpdb, node DNS__DB_FLARG_PASS);
-               NODE_FORCEUPGRADE(nlock, nlocktypep);
-               if (!qpznode_erefs_decrement(qpdb, node DNS__DB_FLARG_PASS)) {
-                       goto unref;
-               }
-       }
-
-       if (node->dirty) {
                if (least_serial == 0) {
-                       /*
-                        * Caller doesn't know the least serial.
-                        * Get it.
-                        */
-                       RWLOCK(&qpdb->lock, isc_rwlocktype_read);
-                       least_serial = qpdb->least_serial;
-                       RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
+                       least_serial = CMM_LOAD_SHARED(qpdb->least_serial);
                }
                clean_zone_node(node, least_serial);
+               isc_spinlock_unlock(&node->spinlock);
        }
 
-unref:
        qpznode_unref(node);
 }
 
@@ -1030,84 +929,81 @@ setnsec3parameters(dns_db_t *db, qpz_version_t *version) {
        dns_rdata_t rdata = DNS_RDATA_INIT;
        isc_region_t region;
        isc_result_t result;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        unsigned char *raw; /* RDATASLAB */
        unsigned int count, length;
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        version->havensec3 = false;
        node = qpdb->origin;
-       nlock = &qpdb->buckets[node->locknum].lock;
-       NODE_RDLOCK(nlock, &nlocktype);
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               do {
-                       if (header->serial <= version->serial &&
-                           !IGNORE(header))
+
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               if (hlist->type != dns_rdatatype_nsec3param) {
+                       continue;
+               }
+
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= version->serial &&
+                           !IGNORE(current))
                        {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
+                               header = current;
                                break;
-                       } else {
-                               header = header->down;
                        }
-               } while (header != NULL);
+               }
 
-               if (header != NULL &&
-                   (header->type == dns_rdatatype_nsec3param))
-               {
-                       /*
-                        * Find an NSEC3PARAM with a supported algorithm.
-                        */
-                       raw = dns_slabheader_raw(header);
-                       count = raw[0] * 256 + raw[1]; /* count */
-                       raw += DNS_RDATASET_COUNT + DNS_RDATASET_LENGTH;
-                       while (count-- > 0U) {
-                               length = raw[0] * 256 + raw[1];
-                               raw += DNS_RDATASET_ORDER + DNS_RDATASET_LENGTH;
-                               region.base = raw;
-                               region.length = length;
-                               raw += length;
-                               dns_rdata_fromregion(
-                                       &rdata, qpdb->common.rdclass,
-                                       dns_rdatatype_nsec3param, &region);
-                               result = dns_rdata_tostruct(&rdata, &nsec3param,
-                                                           NULL);
-                               INSIST(result == ISC_R_SUCCESS);
-                               dns_rdata_reset(&rdata);
+               if (header == NULL || NONEXISTENT(current)) {
+                       /* There is no extant NSEC3PARAM header */
+                       break;
+               }
 
-                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG &&
-                                   !dns_nsec3_supportedhash(nsec3param.hash))
-                               {
-                                       continue;
-                               }
+               /*
+                * Find an NSEC3PARAM with a supported algorithm.
+                */
+               raw = dns_slabheader_raw(header);
+               count = raw[0] * 256 + raw[1]; /* count */
+               raw += DNS_RDATASET_COUNT + DNS_RDATASET_LENGTH;
+               while (count-- > 0U) {
+                       length = raw[0] * 256 + raw[1];
+                       raw += DNS_RDATASET_ORDER + DNS_RDATASET_LENGTH;
+                       region.base = raw;
+                       region.length = length;
+                       raw += length;
+                       dns_rdata_fromregion(&rdata, qpdb->common.rdclass,
+                                            dns_rdatatype_nsec3param, &region);
+                       result = dns_rdata_tostruct(&rdata, &nsec3param, NULL);
+                       INSIST(result == ISC_R_SUCCESS);
+                       dns_rdata_reset(&rdata);
 
-                               if (nsec3param.flags != 0) {
-                                       continue;
-                               }
+                       if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG &&
+                           !dns_nsec3_supportedhash(nsec3param.hash))
+                       {
+                               continue;
+                       }
 
-                               memmove(version->salt, nsec3param.salt,
-                                       nsec3param.salt_length);
-                               version->hash = nsec3param.hash;
-                               version->salt_length = nsec3param.salt_length;
-                               version->iterations = nsec3param.iterations;
-                               version->flags = nsec3param.flags;
-                               version->havensec3 = true;
-                               /*
-                                * Look for a better algorithm than the
-                                * unknown test algorithm.
-                                */
-                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG) {
-                                       goto unlock;
-                               }
+                       if (nsec3param.flags != 0) {
+                               continue;
+                       }
+
+                       memmove(version->salt, nsec3param.salt,
+                               nsec3param.salt_length);
+                       version->hash = nsec3param.hash;
+                       version->salt_length = nsec3param.salt_length;
+                       version->iterations = nsec3param.iterations;
+                       version->flags = nsec3param.flags;
+                       version->havensec3 = true;
+                       /*
+                        * Look for a better algorithm than the
+                        * unknown test algorithm.
+                        */
+                       if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG) {
+                               goto unlock;
                        }
                }
        }
 unlock:
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
 }
 
 static void
@@ -1295,7 +1191,7 @@ make_least_version(qpzonedb_t *qpdb, qpz_version_t *version,
 
 static void
 rollback_node(qpznode_t *node, uint32_t serial) {
-       dns_slabheader_t *header = NULL, *dcurrent = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        bool make_dirty = false;
 
        /*
@@ -1303,24 +1199,23 @@ rollback_node(qpznode_t *node, uint32_t serial) {
         * 'serial'.  When the reference count goes to zero, these rdatasets
         * will be cleaned up; until that time, they will be ignored.
         */
-       for (header = node->data; header != NULL; header = header->next) {
-               if (header->serial == serial) {
-                       DNS_SLABHEADER_SETATTR(header,
-                                              DNS_SLABHEADERATTR_IGNORE);
-                       make_dirty = true;
-               }
-               for (dcurrent = header->down; dcurrent != NULL;
-                    dcurrent = dcurrent->down)
-               {
-                       if (dcurrent->serial == serial) {
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               dns_slabheader_t *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial == serial) {
                                DNS_SLABHEADER_SETATTR(
-                                       dcurrent, DNS_SLABHEADERATTR_IGNORE);
+                                       current, DNS_SLABHEADERATTR_IGNORE);
                                make_dirty = true;
+                       } else if (current->serial < serial) {
+                               break;
                        }
                }
        }
+       rcu_read_unlock();
+
        if (make_dirty) {
-               node->dirty = true;
+               CMM_STORE_SHARED(node->dirty, true);
        }
 }
 
@@ -1360,139 +1255,132 @@ closeversion(dns_db_t *db, dns_dbversion_t **versionp,
 
        RWLOCK(&qpdb->lock, isc_rwlocktype_write);
        serial = version->serial;
-       if (version->writer) {
-               if (commit) {
-                       unsigned int cur_ref;
-                       qpz_version_t *cur_version = NULL;
+       if (version->writer && commit) {
+               unsigned int cur_ref;
+               qpz_version_t *cur_version = NULL;
 
-                       INSIST(version == qpdb->future_version);
-                       /*
-                        * The current version is going to be replaced.
-                        * Release the (likely last) reference to it from the
-                        * DB itself and unlink it from the open list.
-                        */
-                       cur_version = qpdb->current_version;
-                       cur_ref = isc_refcount_decrement(
-                               &cur_version->references);
-                       if (cur_ref == 1) {
-                               (void)isc_refcount_current(
-                                       &cur_version->references);
-                               if (cur_version->serial == qpdb->least_serial) {
-                                       INSIST(ISC_LIST_EMPTY(
-                                               cur_version->changed_list));
-                               }
-                               ISC_LIST_UNLINK(qpdb->open_versions,
-                                               cur_version, link);
-                       }
-                       if (ISC_LIST_EMPTY(qpdb->open_versions)) {
-                               /*
-                                * We're going to become the least open
-                                * version.
-                                */
-                               make_least_version(qpdb, version,
-                                                  &cleanup_list);
-                       } else {
-                               /*
-                                * Some other open version is the
-                                * least version.  We can't cleanup
-                                * records that were changed in this
-                                * version because the older versions
-                                * may still be in use by an open
-                                * version.
-                                *
-                                * We can, however, discard the
-                                * changed records for things that
-                                * we've added that didn't exist in
-                                * prior versions.
-                                */
-                               cleanup_nondirty(version, &cleanup_list);
-                       }
-                       /*
-                        * If the (soon to be former) current version
-                        * isn't being used by anyone, we can clean
-                        * it up.
-                        */
-                       if (cur_ref == 1) {
-                               cleanup_version = cur_version;
-                               ISC_LIST_APPENDLIST(
-                                       version->changed_list,
-                                       cleanup_version->changed_list, link);
+               INSIST(version == qpdb->future_version);
+               /*
+                * The current version is going to be replaced.
+                * Release the (likely last) reference to it from the
+                * DB itself and unlink it from the open list.
+                */
+               cur_version = qpdb->current_version;
+               cur_ref = isc_refcount_decrement(&cur_version->references);
+               if (cur_ref == 1) {
+                       (void)isc_refcount_current(&cur_version->references);
+                       if (cur_version->serial == qpdb->least_serial) {
+                               INSIST(ISC_LIST_EMPTY(
+                                       cur_version->changed_list));
                        }
+                       ISC_LIST_UNLINK(qpdb->open_versions, cur_version, link);
+               }
+               if (ISC_LIST_EMPTY(qpdb->open_versions)) {
                        /*
-                        * Become the current version.
-                        */
-                       version->writer = false;
-                       qpdb->current_version = version;
-                       qpdb->current_serial = version->serial;
-                       qpdb->future_version = NULL;
-
-                       /*
-                        * Keep the current version in the open list, and
-                        * gain a reference for the DB itself (see the DB
-                        * creation function below).  This must be the only
-                        * case where we need to increment the counter from
-                        * zero and need to use isc_refcount_increment0().
+                        * We're going to become the least open
+                        * version.
                         */
-                       INSIST(isc_refcount_increment0(&version->references) ==
-                              0);
-                       ISC_LIST_PREPEND(qpdb->open_versions,
-                                        qpdb->current_version, link);
-                       resigned_list = version->resigned_list;
-                       ISC_LIST_INIT(version->resigned_list);
+                       make_least_version(qpdb, version, &cleanup_list);
                } else {
                        /*
-                        * We're rolling back this transaction.
+                        * Some other open version is the
+                        * least version.  We can't cleanup
+                        * records that were changed in this
+                        * version because the older versions
+                        * may still be in use by an open
+                        * version.
+                        *
+                        * We can, however, discard the
+                        * changed records for things that
+                        * we've added that didn't exist in
+                        * prior versions.
                         */
-                       cleanup_list = version->changed_list;
-                       ISC_LIST_INIT(version->changed_list);
-                       resigned_list = version->resigned_list;
-                       ISC_LIST_INIT(version->resigned_list);
-                       rollback = true;
-                       cleanup_version = version;
-                       qpdb->future_version = NULL;
+                       cleanup_nondirty(version, &cleanup_list);
+               }
+               /*
+                * If the (soon to be former) current version
+                * isn't being used by anyone, we can clean
+                * it up.
+                */
+               if (cur_ref == 1) {
+                       cleanup_version = cur_version;
+                       ISC_LIST_APPENDLIST(version->changed_list,
+                                           cleanup_version->changed_list,
+                                           link);
+               }
+               /*
+                * Become the current version.
+                */
+               version->writer = false;
+               qpdb->current_version = version;
+               qpdb->current_serial = version->serial;
+               qpdb->future_version = NULL;
+
+               /*
+                * Keep the current version in the open list, and
+                * gain a reference for the DB itself (see the DB
+                * creation function below).  This must be the only
+                * case where we need to increment the counter from
+                * zero and need to use isc_refcount_increment0().
+                */
+               INSIST(isc_refcount_increment0(&version->references) == 0);
+               ISC_LIST_PREPEND(qpdb->open_versions, qpdb->current_version,
+                                link);
+               resigned_list = version->resigned_list;
+               ISC_LIST_INIT(version->resigned_list);
+       } else if (version->writer && !commit) {
+               /*
+                * We're rolling back this transaction.
+                */
+               cleanup_list = version->changed_list;
+               ISC_LIST_INIT(version->changed_list);
+               resigned_list = version->resigned_list;
+               ISC_LIST_INIT(version->resigned_list);
+               rollback = true;
+               cleanup_version = version;
+               qpdb->future_version = NULL;
+       } else if (version != qpdb->current_version) {
+               /*
+                * There are no external or internal references
+                * to this version and it can be cleaned up.
+                */
+               cleanup_version = version;
+
+               /*
+                * Find the version with the least serial
+                * number greater than ours.
+                */
+               least_greater = ISC_LIST_PREV(version, link);
+               if (least_greater == NULL) {
+                       least_greater = qpdb->current_version;
                }
-       } else {
-               if (version != qpdb->current_version) {
-                       /*
-                        * There are no external or internal references
-                        * to this version and it can be cleaned up.
-                        */
-                       cleanup_version = version;
 
+               INSIST(version->serial < least_greater->serial);
+               /*
+                * Is this the least open version?
+                */
+               if (version->serial == qpdb->least_serial) {
                        /*
-                        * Find the version with the least serial
-                        * number greater than ours.
+                        * Yes.  Install the new least open
+                        * version.
                         */
-                       least_greater = ISC_LIST_PREV(version, link);
-                       if (least_greater == NULL) {
-                               least_greater = qpdb->current_version;
-                       }
-
-                       INSIST(version->serial < least_greater->serial);
+                       make_least_version(qpdb, least_greater, &cleanup_list);
+               } else {
                        /*
-                        * Is this the least open version?
+                        * Add any unexecuted cleanups to
+                        * those of the least greater version.
                         */
-                       if (version->serial == qpdb->least_serial) {
-                               /*
-                                * Yes.  Install the new least open
-                                * version.
-                                */
-                               make_least_version(qpdb, least_greater,
-                                                  &cleanup_list);
-                       } else {
-                               /*
-                                * Add any unexecuted cleanups to
-                                * those of the least greater version.
-                                */
-                               ISC_LIST_APPENDLIST(least_greater->changed_list,
-                                                   version->changed_list,
-                                                   link);
-                       }
-               } else if (version->serial == qpdb->least_serial) {
-                       INSIST(ISC_LIST_EMPTY(version->changed_list));
+                       ISC_LIST_APPENDLIST(least_greater->changed_list,
+                                           version->changed_list, link);
                }
+       } else if (version->serial == qpdb->least_serial) {
+               INSIST(ISC_LIST_EMPTY(version->changed_list));
+       }
+
+       if (!version->writer && version != qpdb->current_version) {
                ISC_LIST_UNLINK(qpdb->open_versions, version, link);
        }
+
        least_serial = qpdb->least_serial;
        RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
 
@@ -1512,19 +1400,13 @@ closeversion(dns_db_t *db, dns_dbversion_t **versionp,
        for (header = ISC_LIST_HEAD(resigned_list); header != NULL;
             header = ISC_LIST_HEAD(resigned_list))
        {
-               isc_rwlock_t *nlock = NULL;
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-
                ISC_LIST_UNLINK(resigned_list, header, link);
 
-               nlock = &qpdb->buckets[HEADERNODE(header)->locknum].lock;
-               NODE_WRLOCK(nlock, &nlocktype);
                if (rollback && !IGNORE(header)) {
                        resigninsert(qpdb, header);
                }
-               qpznode_release(qpdb, HEADERNODE(header), least_serial,
-                               &nlocktype DNS__DB_FLARG_PASS);
-               NODE_UNLOCK(nlock, &nlocktype);
+               qpznode_release(qpdb, HEADERNODE(header),
+                               least_serial DNS__DB_FLARG_PASS);
        }
 
        dns_qp_t *tree = NULL, *nsec = NULL, *nsec3 = NULL;
@@ -1533,24 +1415,18 @@ closeversion(dns_db_t *db, dns_dbversion_t **versionp,
        for (changed = ISC_LIST_HEAD(cleanup_list); changed != NULL;
             changed = next_changed)
        {
-               isc_rwlock_t *nlock = NULL;
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-
                next_changed = ISC_LIST_NEXT(changed, link);
                node = changed->node;
-               nlock = &qpdb->buckets[node->locknum].lock;
 
-               NODE_WRLOCK(nlock, &nlocktype);
                if (rollback) {
                        rollback_node(node, serial);
                }
 
                qpznode_ref(node);
-               qpznode_release(qpdb, node, least_serial,
-                               &nlocktype DNS__DB_FILELINE);
+               qpznode_release(qpdb, node, least_serial DNS__DB_FILELINE);
 
                /* If the node is now empty, we can delete it. */
-               if (commit && node->data == NULL) {
+               if (commit && cds_list_empty_rcu(&node->headers)) {
                        switch ((int)node->nsec) {
                        case DNS_DB_NSEC_HAS_NSEC:
                                /*
@@ -1599,8 +1475,6 @@ closeversion(dns_db_t *db, dns_dbversion_t **versionp,
 
                qpznode_detach(&node);
 
-               NODE_UNLOCK(nlock, &nlocktype);
-
                if (next_changed == changed) {
                        /*
                         * We found a node to delete but didn't have a
@@ -1649,14 +1523,12 @@ qpzone_findrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                    dns_rdataset_t *sigrdataset DNS__DB_FLARG) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
        qpznode_t *node = (qpznode_t *)dbnode;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        dns_slabheader_t *found = NULL, *foundsig = NULL;
        uint32_t serial;
        qpz_version_t *version = (qpz_version_t *)dbversion;
        bool close_version = false;
        dns_typepair_t matchtype, sigmatchtype;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
        REQUIRE(type != dns_rdatatype_any);
@@ -1668,9 +1540,6 @@ qpzone_findrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        }
        serial = version->serial;
 
-       nlock = &qpdb->buckets[node->locknum].lock;
-       NODE_RDLOCK(nlock, &nlocktype);
-
        matchtype = DNS_TYPEPAIR_VALUE(type, covers);
        if (covers == 0) {
                sigmatchtype = DNS_SIGTYPE(type);
@@ -1678,36 +1547,36 @@ qpzone_findrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                sigmatchtype = 0;
        }
 
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               do {
-                       if (header->serial <= serial && !IGNORE(header)) {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               if (hlist->type != matchtype && hlist->type != sigmatchtype) {
+                       continue;
+               }
+
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= serial && !IGNORE(current)) {
+                               header = current;
                                break;
-                       } else {
-                               header = header->down;
                        }
-               } while (header != NULL);
-               if (header != NULL) {
-                       /*
-                        * We have an active, extant rdataset.  If it's a
-                        * type we're looking for, remember it.
-                        */
-                       if (header->type == matchtype) {
-                               found = header;
-                               if (foundsig != NULL) {
-                                       break;
-                               }
-                       } else if (header->type == sigmatchtype) {
-                               foundsig = header;
-                               if (found != NULL) {
-                                       break;
-                               }
+               }
+               if (header == NULL || NONEXISTENT(header)) {
+                       continue;
+               }
+
+               if (hlist->type == matchtype) {
+                       found = header;
+                       if (foundsig != NULL) {
+                               break;
+                       }
+               } else if (hlist->type == sigmatchtype) {
+                       foundsig = header;
+                       if (found != NULL) {
+                               break;
                        }
                }
        }
+
        if (found != NULL) {
                bindrdataset(qpdb, node, found, rdataset DNS__DB_FLARG_PASS);
                if (foundsig != NULL) {
@@ -1716,7 +1585,7 @@ qpzone_findrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                }
        }
 
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
 
        if (close_version) {
                closeversion(db, (dns_dbversion_t **)&version,
@@ -1751,7 +1620,7 @@ loading_addnode(qpz_load_t *loadctx, const dns_name_t *name,
                if (result == ISC_R_SUCCESS) {
                        *nodep = node;
                } else {
-                       node = new_qpznode(qpdb, name);
+                       qpznode_create(qpdb, name, &node);
                        result = dns_qp_insert(loadctx->nsec3, node, 0);
                        INSIST(result == ISC_R_SUCCESS);
                        node->nsec = DNS_DB_NSEC_NSEC3;
@@ -1769,8 +1638,7 @@ loading_addnode(qpz_load_t *loadctx, const dns_name_t *name,
                        goto done;
                }
        } else {
-               INSIST(node == NULL);
-               node = new_qpznode(qpdb, name);
+               qpznode_create(qpdb, name, &node);
                result = dns_qp_insert(loadctx->tree, node, 0);
                INSIST(result == ISC_R_SUCCESS);
                qpznode_unref(node);
@@ -1788,7 +1656,7 @@ loading_addnode(qpz_load_t *loadctx, const dns_name_t *name,
         * move on.
         */
        node->nsec = DNS_DB_NSEC_HAS_NSEC;
-       nsecnode = new_qpznode(qpdb, name);
+       qpznode_create(qpdb, name, &nsecnode);
        nsecnode->nsec = DNS_DB_NSEC_NSEC;
        (void)dns_qp_insert(loadctx->nsec, nsecnode, 0);
        qpznode_detach(&nsecnode);
@@ -1799,7 +1667,7 @@ done:
 
 static bool
 cname_and_other(qpznode_t *node, uint32_t serial) {
-       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        bool cname = false, other = false;
        dns_rdatatype_t rdtype;
 
@@ -1808,54 +1676,36 @@ cname_and_other(qpznode_t *node, uint32_t serial) {
         * ("Other data" is any rdataset whose type is not KEY, NSEC, SIG
         * or RRSIG.
         */
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-
-               rdtype = DNS_TYPEPAIR_TYPE(header->type);
-               if (rdtype == dns_rdatatype_cname) {
-                       do {
-                               if (header->serial <= serial && !IGNORE(header))
-                               {
-                                       if (NONEXISTENT(header)) {
-                                               header = NULL;
-                                       }
-                                       break;
-                               }
-                               header = header->down;
-                       } while (header != NULL);
-                       if (header != NULL) {
-                               cname = true;
-                       }
-               } else if (rdtype != dns_rdatatype_key &&
-                          rdtype != dns_rdatatype_sig &&
-                          rdtype != dns_rdatatype_nsec &&
-                          rdtype != dns_rdatatype_rrsig)
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               rdtype = DNS_TYPEPAIR_TYPE(hlist->type);
+               if (rdtype == dns_rdatatype_key ||
+                   rdtype == dns_rdatatype_sig ||
+                   rdtype == dns_rdatatype_nsec ||
+                   rdtype == dns_rdatatype_rrsig)
                {
-                       do {
-                               if (header->serial <= serial && !IGNORE(header))
-                               {
-                                       if (NONEXISTENT(header)) {
-                                               header = NULL;
-                                       }
-                                       break;
-                               }
-                               header = header->down;
-                       } while (header != NULL);
-                       if (header != NULL) {
-                               if (!prio_type(header->type)) {
-                                       /*
-                                        * CNAME is in the priority list, so if
-                                        * we are done with priority types, we
-                                        * know there will not be a CNAME, and
-                                        * are safe to skip the rest.
-                                        */
-                                       return cname;
-                               }
-                               other = true;
+                       /* These types can coexist with CNAME, skip them. */
+                       continue;
+               }
+
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= serial && !IGNORE(current)) {
+                               header = current;
+                               break;
                        }
                }
+               if (header == NULL || NONEXISTENT(header)) {
+                       continue;
+               }
+
+               if (rdtype == dns_rdatatype_cname) {
+                       cname = true;
+               } else {
+                       other = true;
+               }
 
                if (cname && other) {
+                       /* We've seen CNAME, but now found other type. */
                        return true;
                }
        }
@@ -1874,8 +1724,12 @@ add_changed(dns_slabheader_t *header, qpz_version_t *version DNS__DB_FLARG) {
        RWLOCK(&qpdb->lock, isc_rwlocktype_write);
        REQUIRE(version->writer);
 
-       *changed = (qpz_changed_t){ .node = node };
-       ISC_LIST_INITANDAPPEND(version->changed_list, changed, link);
+       *changed = (qpz_changed_t){
+               .node = node,
+               .link = ISC_LINK_INITIALIZER,
+       };
+       ISC_LIST_APPEND(version->changed_list, changed, link);
+
        qpznode_acquire(qpdb, node DNS__DB_FLARG_PASS);
        RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
 
@@ -1912,9 +1766,8 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
     bool loading, dns_rdataset_t *addedrdataset,
     isc_stdtime_t now ISC_ATTR_UNUSED DNS__DB_FLARG) {
        qpz_changed_t *changed = NULL;
-       dns_slabheader_t *topheader = NULL, *topheader_prev = NULL;
-       dns_slabheader_t *prioheader = NULL;
-       dns_slabheader_t *header = NULL;
+       dns_slabheader_list_t *hlist = NULL, *hcurrent = NULL;
+       dns_slabheader_t *topheader = NULL, *header = NULL, *current = NULL;
        dns_slabheader_t *merged = NULL;
        isc_result_t result;
        bool merge = false;
@@ -1935,28 +1788,38 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
        }
 
        ntypes = 0;
-       for (topheader = node->data; topheader != NULL;
-            topheader = topheader->next)
-       {
+       cds_list_for_each_entry_rcu (hcurrent, &node->headers, nnode) {
                ++ntypes;
-               if (prio_type(topheader->type)) {
-                       prioheader = topheader;
-               }
-               if (topheader->type == newheader->type) {
+
+               if (hcurrent->type == newheader->type) {
+                       hlist = hcurrent;
+
+                       /*
+                        * We can break early because `ntypes` is only used when
+                        * the slabheader list for the type is not found, e.g.
+                        * hlist == NULL && header == NULL below.
+                        */
                        break;
                }
-               topheader_prev = topheader;
        }
 
        /*
-        * If topheader isn't NULL, we've found the right type.  There may be
-        * IGNORE rdatasets between the top of the chain and the first real
-        * data.  We skip over them.
+        * If hlist isn't NULL, we've found the right type.
         */
-       header = topheader;
-       while (header != NULL && IGNORE(header)) {
-               header = header->down;
+       if (hlist != NULL) {
+               /*
+                * There may be IGNORE rdatasets between the top of the
+                * chain and the first real data.  We skip over them.
+                */
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (!IGNORE(current)) {
+                               header = current;
+                               break;
+                       }
+               }
        }
+
+       topheader = header;
        if (header != NULL) {
                /*
                 * If 'merge' is true and header isn't empty/nonexistent,
@@ -2023,7 +1886,6 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
 
                INSIST(version->serial >= topheader->serial);
                if (loading) {
-                       newheader->down = NULL;
                        if (RESIGN(newheader)) {
                                resigninsert(qpdb, newheader);
                                /* resigndelete not needed here */
@@ -2035,12 +1897,8 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
                         * Since we don't generate changed records when
                         * loading, we MUST clean up 'header' now.
                         */
-                       if (topheader_prev != NULL) {
-                               topheader_prev->next = newheader;
-                       } else {
-                               node->data = newheader;
-                       }
-                       newheader->next = topheader->next;
+                       cds_list_del_rcu(&header->dnode);
+                       cds_list_add_rcu(&newheader->dnode, &hlist->versions);
                        maybe_update_recordsandsize(false, version, header,
                                                    nodename->length);
                        dns_slabheader_destroy(&header);
@@ -2050,18 +1908,15 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
                                resigndelete(qpdb, version,
                                             header DNS__DB_FLARG_PASS);
                        }
-                       if (topheader_prev != NULL) {
-                               topheader_prev->next = newheader;
-                       } else {
-                               node->data = newheader;
-                       }
-                       newheader->next = topheader->next;
-                       newheader->down = topheader;
-                       topheader->up = newheader;
-                       node->dirty = true;
+
+                       cds_list_add_rcu(&newheader->dnode, &hlist->versions);
+
+                       CMM_STORE_SHARED(node->dirty, true);
+
                        if (changed != NULL) {
                                changed->dirty = true;
                        }
+
                        maybe_update_recordsandsize(false, version, header,
                                                    nodename->length);
                }
@@ -2077,12 +1932,7 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
                        return DNS_R_UNCHANGED;
                }
 
-               if (RESIGN(newheader)) {
-                       resigninsert(qpdb, newheader);
-                       resigndelete(qpdb, version, header DNS__DB_FLARG_PASS);
-               }
-
-               if (topheader != NULL) {
+               if (hlist != NULL) {
                        /*
                         * We have a list of rdatasets of the given type,
                         * but they're all marked IGNORE.  We simply insert
@@ -2092,58 +1942,47 @@ add(qpzonedb_t *qpdb, qpznode_t *node, const dns_name_t *nodename,
                         * we INSIST on it.
                         */
                        INSIST(!loading);
-                       INSIST(version->serial >= topheader->serial);
-                       if (topheader_prev != NULL) {
-                               topheader_prev->next = newheader;
-                       } else {
-                               node->data = newheader;
-                       }
-                       newheader->next = topheader->next;
-                       newheader->down = topheader;
-                       topheader->up = newheader;
-                       if (changed != NULL) {
-                               changed->dirty = true;
-                       }
-                       node->dirty = true;
+               } else if (qpdb->maxtypepername > 0 &&
+                          ntypes >= qpdb->maxtypepername)
+               {
+                       dns_slabheader_destroy(&newheader);
+                       return DNS_R_TOOMANYRECORDS;
                } else {
                        /*
                         * No rdatasets of the given type exist at the node.
                         */
 
-                       if (qpdb->maxtypepername > 0 &&
-                           ntypes >= qpdb->maxtypepername)
-                       {
-                               dns_slabheader_destroy(&newheader);
-                               return DNS_R_TOOMANYRECORDS;
-                       }
+                       dns_slabheader_createlist(qpdb->common.mctx, &hlist);
+                       hlist->type = newheader->type;
+                       cds_list_add_rcu(&hlist->nnode, &node->headers);
+               }
 
-                       INSIST(newheader->down == NULL);
+               if (RESIGN(newheader)) {
+                       resigninsert(qpdb, newheader);
+                       resigndelete(qpdb, version, header DNS__DB_FLARG_PASS);
+               }
 
-                       if (prio_type(newheader->type)) {
-                               /* This is a priority type, prepend it */
-                               newheader->next = node->data;
-                               node->data = newheader;
-                       } else if (prioheader != NULL) {
-                               /* Append after the priority headers */
-                               newheader->next = prioheader->next;
-                               prioheader->next = newheader;
-                       } else {
-                               /* There were no priority headers */
-                               newheader->next = node->data;
-                               node->data = newheader;
-                       }
+               cds_list_add_rcu(&newheader->dnode, &hlist->versions);
+
+               CMM_STORE_SHARED(node->dirty, true);
+
+               if (changed != NULL) {
+                       changed->dirty = true;
+               }
+
+               /*
+                * Check if the node now contains CNAME and other data.
+                *
+                * FIXME: Do this earlier before adding the new header
+                * to the node.
+                */
+               if (cname_and_other(node, version->serial)) {
+                       return DNS_R_CNAMEANDOTHER;
                }
        }
 
        maybe_update_recordsandsize(true, version, newheader, nodename->length);
 
-       /*
-        * Check if the node now contains CNAME and other data.
-        */
-       if (cname_and_other(node, version->serial)) {
-               return DNS_R_CNAMEANDOTHER;
-       }
-
        if (addedrdataset != NULL) {
                bindrdataset(qpdb, node, newheader,
                             addedrdataset DNS__DB_FLARG_PASS);
@@ -2168,14 +2007,13 @@ wildcardmagic(qpzonedb_t *qpdb, dns_qp_t *qp, const dns_name_t *name) {
        /* insert an empty node, if needed, to hold the wildcard bit */
        result = dns_qp_getname(qp, &foundname, (void **)&node, NULL);
        if (result != ISC_R_SUCCESS) {
-               INSIST(node == NULL);
-               node = new_qpznode(qpdb, &foundname);
+               qpznode_create(qpdb, &foundname, &node);
                result = dns_qp_insert(qp, node, 0);
                INSIST(result == ISC_R_SUCCESS);
                qpznode_unref(node);
        }
 
-       node->wild = true;
+       CMM_STORE_SHARED(node->wild, true);
 }
 
 static void
@@ -2206,8 +2044,6 @@ loading_addrdataset(void *arg, const dns_name_t *name,
        isc_result_t result = ISC_R_SUCCESS;
        isc_region_t region;
        dns_slabheader_t *newheader = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        REQUIRE(rdataset->rdclass == qpdb->common.rdclass);
 
@@ -2274,16 +2110,17 @@ loading_addrdataset(void *arg, const dns_name_t *name,
                newheader->resign_lsb = rdataset->resign & 0x1;
        }
 
-       nlock = &qpdb->buckets[node->locknum].lock;
-       NODE_WRLOCK(nlock, &nlocktype);
+       isc_spinlock_lock(&node->spinlock);
+       rcu_read_lock();
        result = add(qpdb, node, name, qpdb->current_version, newheader,
                     DNS_DBADD_MERGE, true, NULL, 0 DNS__DB_FLARG_PASS);
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
+       isc_spinlock_unlock(&node->spinlock);
 
        if (result == ISC_R_SUCCESS &&
            delegating_type(qpdb, node, rdataset->type))
        {
-               node->delegating = true;
+               CMM_STORE_SHARED(node->delegating, true);
        } else if (result == DNS_R_UNCHANGED) {
                result = ISC_R_SUCCESS;
        }
@@ -2474,18 +2311,16 @@ static isc_result_t
 setsigningtime(dns_db_t *db, dns_rdataset_t *rdataset, isc_stdtime_t resign) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
        dns_slabheader_t *header = NULL, oldheader;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
        REQUIRE(rdataset != NULL);
        REQUIRE(rdataset->methods == &dns_rdataslab_rdatasetmethods);
 
-       header = dns_rdataset_getheader(rdataset);
+       /* FIXME: This should be (in theory) serialized via zone TID */
 
-       nlock = &qpdb->buckets[HEADERNODE(header)->locknum].lock;
-       NODE_WRLOCK(nlock, &nlocktype);
+       header = dns_rdataset_getheader(rdataset);
 
+       /* FIXME: This is so so so weird */
        oldheader = *header;
 
        /*
@@ -2515,7 +2350,7 @@ setsigningtime(dns_db_t *db, dns_rdataset_t *rdataset, isc_stdtime_t resign) {
                DNS_SLABHEADER_SETATTR(header, DNS_SLABHEADERATTR_RESIGN);
                resigninsert(qpdb, header);
        }
-       NODE_UNLOCK(nlock, &nlocktype);
+
        return ISC_R_SUCCESS;
 }
 
@@ -2524,9 +2359,6 @@ getsigningtime(dns_db_t *db, isc_stdtime_t *resign, dns_name_t *foundname,
               dns_typepair_t *typepair) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
        dns_slabheader_t *header = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
-       uint16_t locknum;
        isc_result_t result = ISC_R_NOTFOUND;
 
        REQUIRE(VALID_QPZONE(qpdb));
@@ -2536,27 +2368,6 @@ getsigningtime(dns_db_t *db, isc_stdtime_t *resign, dns_name_t *foundname,
 
        RWLOCK(&qpdb->lock, isc_rwlocktype_read);
        header = isc_heap_element(qpdb->heap, 1);
-       if (header == NULL) {
-               RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
-               return ISC_R_NOTFOUND;
-       }
-       locknum = HEADERNODE(header)->locknum;
-       RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
-
-again:
-       nlock = &qpdb->buckets[locknum].lock;
-
-       NODE_RDLOCK(nlock, &nlocktype);
-
-       RWLOCK(&qpdb->lock, isc_rwlocktype_read);
-       header = isc_heap_element(qpdb->heap, 1);
-       if (header != NULL && HEADERNODE(header)->locknum != locknum) {
-               RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
-               NODE_UNLOCK(nlock, &nlocktype);
-               locknum = HEADERNODE(header)->locknum;
-               goto again;
-       }
-
        if (header != NULL) {
                *resign = RESIGN(header)
                                  ? (header->resign << 1) | header->resign_lsb
@@ -2566,7 +2377,6 @@ again:
                result = ISC_R_SUCCESS;
        }
        RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
-       NODE_UNLOCK(nlock, &nlocktype);
 
        return result;
 }
@@ -2606,7 +2416,7 @@ findnodeintree(qpzonedb_t *qpdb, const dns_name_t *name, bool create,
                        return result;
                }
 
-               node = new_qpznode(qpdb, name);
+               qpznode_create(qpdb, name, &node);
                result = dns_qp_insert(qp, node, 0);
                INSIST(result == ISC_R_SUCCESS);
                qpznode_unref(node);
@@ -2737,10 +2547,6 @@ qpzone_setup_delegation(qpz_search_t *search, dns_dbnode_t **nodep,
                search->need_cleanup = false;
        }
        if (rdataset != NULL) {
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-               isc_rwlock_t *nlock =
-                       &search->qpdb->buckets[node->locknum].lock;
-               NODE_RDLOCK(nlock, &nlocktype);
                bindrdataset(search->qpdb, node, search->zonecut_header,
                             rdataset DNS__DB_FLARG_PASS);
                if (sigrdataset != NULL && search->zonecut_sigheader != NULL) {
@@ -2748,7 +2554,6 @@ qpzone_setup_delegation(qpz_search_t *search, dns_dbnode_t **nodep,
                                     search->zonecut_sigheader,
                                     sigrdataset DNS__DB_FLARG_PASS);
                }
-               NODE_UNLOCK(nlock, &nlocktype);
        }
 
        if (type == dns_rdatatype_dname) {
@@ -2770,29 +2575,35 @@ step(qpz_search_t *search, dns_qpiter_t *it, direction_t direction,
      dns_name_t *nextname) {
        dns_fixedname_t fnodename;
        dns_name_t *nodename = dns_fixedname_initname(&fnodename);
-       qpzonedb_t *qpdb = NULL;
        qpznode_t *node = NULL;
-       isc_result_t result = ISC_R_SUCCESS;
-       dns_slabheader_t *header = NULL;
-
-       qpdb = search->qpdb;
+       isc_result_t result;
 
        result = dns_qpiter_current(it, nodename, (void **)&node, NULL);
        while (result == ISC_R_SUCCESS) {
-               isc_rwlock_t *nlock = &qpdb->buckets[node->locknum].lock;
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+               dns_slabheader_list_t *hlist = NULL;
+               dns_slabheader_t *found = NULL;
 
-               NODE_RDLOCK(nlock, &nlocktype);
-               for (header = node->data; header != NULL; header = header->next)
-               {
-                       if (header->serial <= search->serial &&
-                           !IGNORE(header) && !NONEXISTENT(header))
+               cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+                       dns_slabheader_t *header = NULL, *current = NULL;
+                       cds_list_for_each_entry_rcu (current, &hlist->versions,
+                                                    dnode)
                        {
-                               break;
+                               if (current->serial <= search->serial &&
+                                   !IGNORE(current))
+                               {
+                                       header = current;
+                                       break;
+                               }
+                       }
+                       if (header == NULL || NONEXISTENT(header)) {
+                               continue;
                        }
+
+                       found = header;
+                       break;
                }
-               NODE_UNLOCK(nlock, &nlocktype);
-               if (header != NULL) {
+
+               if (found != NULL) {
                        break;
                }
 
@@ -2909,122 +2720,114 @@ wildcard_blocked(qpz_search_t *search, const dns_name_t *qname,
 }
 
 static isc_result_t
-find_wildcard(qpz_search_t *search, qpznode_t **nodep,
-             const dns_name_t *qname) {
-       dns_slabheader_t *header = NULL;
-       isc_result_t result = ISC_R_NOTFOUND;
-       qpzonedb_t *qpdb = search->qpdb;
+find__wildcard(qpz_search_t *search, qpznode_t *snode, qpznode_t **nodep,
+              const dns_name_t *qname) {
+       dns_slabheader_list_t *hlist = NULL;
+       qpznode_t *node = NULL;
+       dns_fixedname_t fname;
+       dns_name_t *name = dns_fixedname_initname(&fname);
+       dns_qpiter_t it;
+       isc_result_t result;
+       dns_slabheader_t *found = NULL;
 
        /*
-        * Examine each ancestor level.  If the level's wild bit
-        * is set, then construct the corresponding wildcard name and
-        * search for it.  If the wildcard node exists, and is active in
-        * this version, we're done.  If not, then we next check to see
-        * if the ancestor is active in this version.  If so, then there
-        * can be no possible wildcard match and again we're done.  If not,
-        * continue the search.
+        * Construct the wildcard name for this level.
         */
-       for (int i = dns_qpchain_length(&search->chain) - 1; i >= 0; i--) {
-               qpznode_t *node = NULL;
-               isc_rwlock_t *nlock = NULL;
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-               bool wild, active;
+       result = dns_name_concatenate(dns_wildcardname, &snode->name, name);
+       if (result != ISC_R_SUCCESS) {
+               return result;
+       }
 
-               dns_qpchain_node(&search->chain, i, NULL, (void **)&node, NULL);
+       result = dns_qp_lookup(&search->qpr, name, NULL, &it, NULL,
+                              (void **)&node, NULL);
+       if (result != ISC_R_NOTFOUND && result != DNS_R_PARTIALMATCH) {
+               return result;
+       }
 
-               nlock = &qpdb->buckets[node->locknum].lock;
-               NODE_RDLOCK(nlock, &nlocktype);
-               /*
-                * First we try to figure out if this node is active in
-                * the search's version.  We do this now, even though we
-                * may not need the information, because it simplifies the
-                * locking and code flow.
-                */
-               for (header = node->data; header != NULL; header = header->next)
-               {
-                       if (header->serial <= search->serial &&
-                           !IGNORE(header) && !NONEXISTENT(header))
+       INSIST(result == ISC_R_SUCCESS);
+       /*
+        * We have found the wildcard node.  If it is active in
+        * the search's version, we're done.
+        */
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= search->serial &&
+                           !IGNORE(current))
                        {
-                               break;
+                               header = current;
                        }
                }
+               if (header == NULL || NONEXISTENT(header)) {
+                       continue;
+               }
+
+               found = header;
+       }
+       if (found != NULL || activeempty(search, &it, name)) {
+               if (wildcard_blocked(search, qname, name)) {
+                       return ISC_R_NOTFOUND;
+               }
+
+               /*
+                * The wildcard node is active!
+                */
+               *nodep = node;
+               return ISC_R_SUCCESS;
+       }
+
+       return DNS_R_CONTINUE;
+}
+
+static isc_result_t
+find_wildcard(qpz_search_t *search, qpznode_t **nodep,
+             const dns_name_t *qname) {
+       isc_result_t result = ISC_R_NOTFOUND;
 
-               active = (header != NULL);
-               wild = node->wild;
-               NODE_UNLOCK(nlock, &nlocktype);
+       /*
+        * Examine each ancestor level.  If the level's wild bit
+        * is set, then construct the corresponding wildcard name and
+        * search for it.  If the wildcard node exists, and is active in
+        * this version, we're done.  If not, then we next check to see
+        * if the ancestor is active in this version.  If so, then there
+        * can be no possible wildcard match and again we're done.  If
+        * not, continue the search.
+        */
+       for (int i = dns_qpchain_length(&search->chain) - 1; i >= 0; i--) {
+               qpznode_t *node = NULL;
+               dns_slabheader_list_t *hlist = NULL;
 
-               if (wild) {
-                       qpznode_t *wnode = NULL;
-                       dns_fixedname_t fwname;
-                       dns_name_t *wname = dns_fixedname_initname(&fwname);
-                       dns_qpiter_t wit;
+               dns_qpchain_node(&search->chain, i, NULL, (void **)&node, NULL);
 
-                       /*
-                        * Construct the wildcard name for this level.
-                        */
-                       result = dns_name_concatenate(dns_wildcardname,
-                                                     &node->name, wname);
-                       if (result != ISC_R_SUCCESS) {
+               if (CMM_LOAD_SHARED(node->wild)) {
+                       result = find__wildcard(search, node, nodep, qname);
+                       if (result != DNS_R_CONTINUE) {
                                break;
                        }
+               }
 
-                       result = dns_qp_lookup(&search->qpr, wname, NULL, &wit,
-                                              NULL, (void **)&wnode, NULL);
-                       if (result == ISC_R_SUCCESS) {
-                               /*
-                                * We have found the wildcard node.  If it
-                                * is active in the search's version, we're
-                                * done.
-                                */
-                               nlock = &qpdb->buckets[wnode->locknum].lock;
-                               NODE_RDLOCK(nlock, &nlocktype);
-                               for (header = wnode->data; header != NULL;
-                                    header = header->next)
-                               {
-                                       if (header->serial <= search->serial &&
-                                           !IGNORE(header) &&
-                                           !NONEXISTENT(header))
-                                       {
-                                               break;
-                                       }
-                               }
-                               NODE_UNLOCK(nlock, &nlocktype);
-                               if (header != NULL ||
-                                   activeempty(search, &wit, wname))
+               cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+                       dns_slabheader_t *header = NULL, *current = NULL;
+                       cds_list_for_each_entry_rcu (current, &hlist->versions,
+                                                    dnode)
+                       {
+                               if (current->serial <= search->serial &&
+                                   !IGNORE(current))
                                {
-                                       if (wildcard_blocked(search, qname,
-                                                            wname))
-                                       {
-                                               return ISC_R_NOTFOUND;
-                                       }
-
-                                       /*
-                                        * The wildcard node is active!
-                                        *
-                                        * Note: result is still ISC_R_SUCCESS
-                                        * so we don't have to set it.
-                                        */
-                                       *nodep = wnode;
+                                       header = current;
                                        break;
                                }
-                       } else if (result != ISC_R_NOTFOUND &&
-                                  result != DNS_R_PARTIALMATCH)
-                       {
-                               /*
-                                * An error has occurred.  Bail out.
-                                */
-                               break;
                        }
-               }
+                       if (header == NULL || NONEXISTENT(header)) {
+                               continue;
+                       }
 
-               if (active) {
                        /*
                         * The level node is active.  Any wildcarding
                         * present at higher levels has no
                         * effect and we're done.
                         */
-                       result = ISC_R_NOTFOUND;
-                       break;
+                       return ISC_R_NOTFOUND;
                }
        }
 
@@ -3129,7 +2932,6 @@ find_closest_nsec(qpz_search_t *search, dns_dbnode_t **nodep,
                  dns_rdataset_t *sigrdataset, bool nsec3,
                  bool secure DNS__DB_FLARG) {
        qpznode_t *node = NULL, *prevnode = NULL;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
        dns_qpiter_t nseciter;
        bool empty_node;
        isc_result_t result;
@@ -3158,45 +2960,41 @@ find_closest_nsec(qpz_search_t *search, dns_dbnode_t **nodep,
 again:
        do {
                dns_slabheader_t *found = NULL, *foundsig = NULL;
-               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-               isc_rwlock_t *nlock =
-                       &search->qpdb->buckets[node->locknum].lock;
-               NODE_RDLOCK(nlock, &nlocktype);
+               dns_slabheader_list_t *hlist = NULL;
+
                empty_node = true;
-               for (header = node->data; header != NULL; header = header_next)
-               {
-                       header_next = header->next;
+
+               rcu_read_lock();
+               cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+                       dns_slabheader_t *header = NULL, *current = NULL;
+                       cds_list_for_each_entry_rcu (current, &hlist->versions,
+                                                    dnode)
+                       {
+                               if (current->serial <= search->serial &&
+                                   !IGNORE(current))
+                               {
+                                       header = current;
+                                       break;
+                               }
+                       }
+                       if (header == NULL || NONEXISTENT(header)) {
+                               continue;
+                       }
+
                        /*
-                        * Look for an active, extant NSEC or RRSIG NSEC.
+                        * We now know that there is at least one
+                        * active rdataset at this node.
                         */
-                       do {
-                               if (header->serial <= search->serial &&
-                                   !IGNORE(header))
-                               {
-                                       if (NONEXISTENT(header)) {
-                                               header = NULL;
-                                       }
+                       empty_node = false;
+                       if (header->type == type) {
+                               found = header;
+                               if (foundsig != NULL) {
                                        break;
-                               } else {
-                                       header = header->down;
                                }
-                       } while (header != NULL);
-                       if (header != NULL) {
-                               /*
-                                * We now know that there is at least one
-                                * active rdataset at this node.
-                                */
-                               empty_node = false;
-                               if (header->type == type) {
-                                       found = header;
-                                       if (foundsig != NULL) {
-                                               break;
-                                       }
-                               } else if (header->type == sigtype) {
-                                       foundsig = header;
-                                       if (found != NULL) {
-                                               break;
-                                       }
+                       } else if (header->type == sigtype) {
+                               foundsig = header;
+                               if (found != NULL) {
+                                       break;
                                }
                        }
                }
@@ -3215,13 +3013,14 @@ again:
                                   (foundsig != NULL || !need_sig))
                        {
                                /*
-                                * We've found the right NSEC/NSEC3 record.
+                                * We've found the right NSEC/NSEC3
+                                * record.
                                 *
                                 * Note: for this to really be the right
-                                * NSEC record, it's essential that the NSEC
-                                * records of any nodes obscured by a zone
-                                * cut have been removed; we assume this is
-                                * the case.
+                                * NSEC record, it's essential that the
+                                * NSEC records of any nodes obscured by
+                                * a zone cut have been removed; we
+                                * assume this is the case.
                                 */
                                dns_name_copy(name, foundname);
                                if (nodep != NULL) {
@@ -3241,11 +3040,12 @@ again:
                                }
                        } else if (found == NULL && foundsig == NULL) {
                                /*
-                                * This node is active, but has no NSEC or
-                                * RRSIG NSEC.  That means it's glue or
-                                * other obscured zone data that isn't
-                                * relevant for our search.  Treat the
-                                * node as if it were empty and keep looking.
+                                * This node is active, but has no NSEC
+                                * or RRSIG NSEC.  That means it's glue
+                                * or other obscured zone data that
+                                * isn't relevant for our search.  Treat
+                                * the node as if it were empty and keep
+                                * looking.
                                 */
                                empty_node = true;
                                result = previous_closest_nsec(
@@ -3253,9 +3053,9 @@ again:
                                        &nseciter, &first);
                        } else {
                                /*
-                                * We found an active node, but either the
-                                * NSEC or the RRSIG NSEC is missing.  This
-                                * shouldn't happen.
+                                * We found an active node, but either
+                                * the NSEC or the RRSIG NSEC is
+                                * missing.  This shouldn't happen.
                                 */
                                result = DNS_R_BADDB;
                        }
@@ -3268,7 +3068,8 @@ again:
                                                       &prevnode, &nseciter,
                                                       &first);
                }
-               NODE_UNLOCK(nlock, &nlocktype);
+               rcu_read_unlock();
+
                node = prevnode;
                prevnode = NULL;
        } while (empty_node && result == ISC_R_SUCCESS);
@@ -3297,55 +3098,48 @@ again:
 static isc_result_t
 qpzone_check_zonecut(qpznode_t *node, void *arg DNS__DB_FLARG) {
        qpz_search_t *search = arg;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        dns_slabheader_t *dname_header = NULL, *sigdname_header = NULL;
        dns_slabheader_t *ns_header = NULL;
        dns_slabheader_t *found = NULL;
        isc_result_t result = DNS_R_CONTINUE;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = &search->qpdb->buckets[node->locknum].lock;
-
-       NODE_RDLOCK(nlock, &nlocktype);
 
        /*
         * Look for an NS or DNAME rdataset active in our version.
         */
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               if (header->type == dns_rdatatype_ns ||
-                   header->type == dns_rdatatype_dname ||
-                   header->type == DNS_SIGTYPE(dns_rdatatype_dname))
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               if (hlist->type != dns_rdatatype_ns &&
+                   hlist->type != dns_rdatatype_dname &&
+                   hlist->type == DNS_SIGTYPE(dns_rdatatype_dname))
                {
-                       do {
-                               if (header->serial <= search->serial &&
-                                   !IGNORE(header))
-                               {
-                                       if (NONEXISTENT(header)) {
-                                               header = NULL;
-                                       }
-                                       break;
-                               } else {
-                                       header = header->down;
-                               }
-                       } while (header != NULL);
-                       if (header != NULL) {
-                               if (header->type == dns_rdatatype_dname) {
-                                       dname_header = header;
-                               } else if (header->type ==
-                                          DNS_SIGTYPE(dns_rdatatype_dname))
-                               {
-                                       sigdname_header = header;
-                               } else if (node != search->qpdb->origin ||
-                                          IS_STUB(search->qpdb))
-                               {
-                                       /*
-                                        * We've found an NS rdataset that
-                                        * isn't at the origin node.
-                                        */
-                                       ns_header = header;
-                               }
+                       continue;
+               }
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= search->serial &&
+                           !IGNORE(current))
+                       {
+                               header = current;
+                               break;
                        }
                }
+               if (header == NULL || NONEXISTENT(current)) {
+                       continue;
+               }
+               if (header->type == dns_rdatatype_dname) {
+                       dname_header = header;
+               } else if (header->type == DNS_SIGTYPE(dns_rdatatype_dname)) {
+                       sigdname_header = header;
+               } else if (node != search->qpdb->origin ||
+                          IS_STUB(search->qpdb))
+               {
+                       /*
+                        * We've found an NS rdataset that
+                        * isn't at the origin node.
+                        */
+                       ns_header = header;
+               }
        }
 
        /*
@@ -3380,7 +3174,7 @@ qpzone_check_zonecut(qpznode_t *node, void *arg DNS__DB_FLARG) {
                 * glue and is not subject to wildcard matching, so we
                 * may clear search->wild.
                 */
-               search->wild = false;
+               CMM_STORE_SHARED(search->wild, false);
                if ((search->options & DNS_DBFIND_GLUEOK) == 0) {
                        /*
                         * If the caller does not want to find glue, then
@@ -3410,12 +3204,14 @@ qpzone_check_zonecut(qpznode_t *node, void *arg DNS__DB_FLARG) {
                 * in case we need to go searching for wildcard matches
                 * later on.
                 */
-               if (node->wild && (search->options & DNS_DBFIND_NOWILD) == 0) {
+               if (CMM_LOAD_SHARED(node->wild) &&
+                   (search->options & DNS_DBFIND_NOWILD) == 0)
+               {
                        search->wild = true;
                }
        }
 
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
 
        return result;
 }
@@ -3434,13 +3230,11 @@ qpzone_find(dns_db_t *db, const dns_name_t *name, dns_dbversion_t *version,
        bool maybe_zonecut = false, at_zonecut = false;
        bool wild = false, empty_node = false;
        bool nsec3 = false;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_list_t *hlist = NULL;
        dns_slabheader_t *found = NULL, *nsecheader = NULL;
        dns_slabheader_t *foundsig = NULL, *cnamesig = NULL, *nsecsig = NULL;
        dns_typepair_t sigtype;
        bool active;
-       isc_rwlock_t *nlock = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
 
        REQUIRE(VALID_QPZONE((qpzonedb_t *)db));
        INSIST(version == NULL ||
@@ -3504,6 +3298,7 @@ qpzone_find(dns_db_t *db, const dns_name_t *name, dns_dbversion_t *version,
                }
        }
 
+       rcu_read_lock();
        if (result == DNS_R_PARTIALMATCH) {
        partial_match:
                if (search.zonecut != NULL) {
@@ -3569,9 +3364,6 @@ found:
         * have matched a wildcard.
         */
 
-       nlock = &search.qpdb->buckets[node->locknum].lock;
-       NODE_RDLOCK(nlock, &nlocktype);
-
        if (search.zonecut != NULL) {
                /*
                 * If we're beneath a zone cut, we don't want to look for
@@ -3589,9 +3381,10 @@ found:
                 * Stub zones don't have anything "above" the delegation so
                 * we always return a referral.
                 */
-               if (node->delegating && ((node != search.qpdb->origin &&
-                                         !dns_rdatatype_atparent(type)) ||
-                                        IS_STUB(search.qpdb)))
+               if (CMM_LOAD_SHARED(node->delegating) &&
+                   ((node != search.qpdb->origin &&
+                     !dns_rdatatype_atparent(type)) ||
+                    IS_STUB(search.qpdb)))
                {
                        maybe_zonecut = true;
                }
@@ -3614,154 +3407,145 @@ found:
 
        sigtype = DNS_SIGTYPE(type);
        empty_node = true;
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               /*
-                * Look for an active, extant rdataset.
-                */
-               do {
-                       if (header->serial <= search.serial && !IGNORE(header))
+
+       cds_list_for_each_entry_rcu (hlist, &node->headers, nnode) {
+               dns_slabheader_t *header = NULL, *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (current->serial <= search.serial &&
+                           !IGNORE(current))
                        {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
+                               header = current;
                                break;
-                       } else {
-                               header = header->down;
                        }
-               } while (header != NULL);
-               if (header != NULL) {
+               }
+               if (header == NULL || NONEXISTENT(header)) {
+                       continue;
+               }
+
+               /*
+                * We now know that there is at least one active
+                * rdataset at this node.
+                */
+               empty_node = false;
+
+               /*
+                * Do special zone cut handling, if requested.
+                */
+               if (maybe_zonecut && header->type == dns_rdatatype_ns) {
                        /*
-                        * We now know that there is at least one active
-                        * rdataset at this node.
+                        * We increment the reference count on node to
+                        * ensure that search->zonecut_header will
+                        * still be valid later.
                         */
-                       empty_node = false;
-
+                       qpznode_acquire(search.qpdb, node DNS__DB_FLARG_PASS);
+                       search.zonecut = node;
+                       search.zonecut_header = header;
+                       search.zonecut_sigheader = NULL;
+                       search.need_cleanup = true;
+                       maybe_zonecut = false;
+                       at_zonecut = true;
                        /*
-                        * Do special zone cut handling, if requested.
+                        * It is not clear if KEY should still be
+                        * allowed at the parent side of the zone
+                        * cut or not.  It is needed for RFC3007
+                        * validated updates.
                         */
-                       if (maybe_zonecut && header->type == dns_rdatatype_ns) {
+                       if ((search.options & DNS_DBFIND_GLUEOK) == 0 &&
+                           type != dns_rdatatype_nsec &&
+                           type != dns_rdatatype_key)
+                       {
                                /*
-                                * We increment the reference count on node to
-                                * ensure that search->zonecut_header will
-                                * still be valid later.
+                                * Glue is not OK, but any answer we
+                                * could return would be glue.  Return
+                                * the delegation.
                                 */
-                               qpznode_acquire(search.qpdb,
-                                               node DNS__DB_FLARG_PASS);
-                               search.zonecut = node;
-                               search.zonecut_header = header;
-                               search.zonecut_sigheader = NULL;
-                               search.need_cleanup = true;
-                               maybe_zonecut = false;
-                               at_zonecut = true;
+                               found = NULL;
+                               break;
+                       }
+                       if (found != NULL && foundsig != NULL) {
+                               break;
+                       }
+               }
+
+               /*
+                * If the NSEC3 record doesn't match the chain
+                * we are using behave as if it isn't here.
+                */
+               if (header->type == dns_rdatatype_nsec3 &&
+                   !matchparams(header, &search))
+               {
+                       goto partial_match;
+               }
+               /*
+                * If we found a type we were looking for,
+                * remember it.
+                */
+               if (header->type == type || type == dns_rdatatype_any ||
+                   (header->type == dns_rdatatype_cname && cname_ok))
+               {
+                       /*
+                        * We've found the answer!
+                        */
+                       found = header;
+                       if (header->type == dns_rdatatype_cname && cname_ok) {
                                /*
-                                * It is not clear if KEY should still be
-                                * allowed at the parent side of the zone
-                                * cut or not.  It is needed for RFC3007
-                                * validated updates.
+                                * We may be finding a CNAME instead
+                                * of the desired type.
+                                *
+                                * If we've already got the CNAME RRSIG,
+                                * use it, otherwise change sigtype
+                                * so that we find it.
                                 */
-                               if ((search.options & DNS_DBFIND_GLUEOK) == 0 &&
-                                   type != dns_rdatatype_nsec &&
-                                   type != dns_rdatatype_key)
-                               {
-                                       /*
-                                        * Glue is not OK, but any answer we
-                                        * could return would be glue.  Return
-                                        * the delegation.
-                                        */
-                                       found = NULL;
-                                       break;
-                               }
-                               if (found != NULL && foundsig != NULL) {
-                                       break;
+                               if (cnamesig != NULL) {
+                                       foundsig = cnamesig;
+                               } else {
+                                       sigtype = DNS_SIGTYPE(
+                                               dns_rdatatype_cname);
                                }
                        }
-
                        /*
-                        * If the NSEC3 record doesn't match the chain
-                        * we are using behave as if it isn't here.
+                        * If we've got all we need, end the search.
                         */
-                       if (header->type == dns_rdatatype_nsec3 &&
-                           !matchparams(header, &search))
-                       {
-                               NODE_UNLOCK(nlock, &nlocktype);
-                               goto partial_match;
+                       if (!maybe_zonecut && foundsig != NULL) {
+                               break;
                        }
+               } else if (header->type == sigtype) {
                        /*
-                        * If we found a type we were looking for,
-                        * remember it.
+                        * We've found the RRSIG rdataset for our
+                        * target type.  Remember it.
                         */
-                       if (header->type == type || type == dns_rdatatype_any ||
-                           (header->type == dns_rdatatype_cname && cname_ok))
-                       {
-                               /*
-                                * We've found the answer!
-                                */
-                               found = header;
-                               if (header->type == dns_rdatatype_cname &&
-                                   cname_ok)
-                               {
-                                       /*
-                                        * We may be finding a CNAME instead
-                                        * of the desired type.
-                                        *
-                                        * If we've already got the CNAME RRSIG,
-                                        * use it, otherwise change sigtype
-                                        * so that we find it.
-                                        */
-                                       if (cnamesig != NULL) {
-                                               foundsig = cnamesig;
-                                       } else {
-                                               sigtype = DNS_SIGTYPE(
-                                                       dns_rdatatype_cname);
-                                       }
-                               }
-                               /*
-                                * If we've got all we need, end the search.
-                                */
-                               if (!maybe_zonecut && foundsig != NULL) {
-                                       break;
-                               }
-                       } else if (header->type == sigtype) {
-                               /*
-                                * We've found the RRSIG rdataset for our
-                                * target type.  Remember it.
-                                */
-                               foundsig = header;
-                               /*
-                                * If we've got all we need, end the search.
-                                */
-                               if (!maybe_zonecut && found != NULL) {
-                                       break;
-                               }
-                       } else if (header->type == dns_rdatatype_nsec &&
-                                  !search.version->havensec3)
-                       {
-                               /*
-                                * Remember a NSEC rdataset even if we're
-                                * not specifically looking for it, because
-                                * we might need it later.
-                                */
-                               nsecheader = header;
-                       } else if (header->type ==
-                                          DNS_SIGTYPE(dns_rdatatype_nsec) &&
-                                  !search.version->havensec3)
-                       {
-                               /*
-                                * If we need the NSEC rdataset, we'll also
-                                * need its signature.
-                                */
-                               nsecsig = header;
-                       } else if (cname_ok &&
-                                  header->type ==
-                                          DNS_SIGTYPE(dns_rdatatype_cname))
-                       {
-                               /*
-                                * If we get a CNAME match, we'll also need
-                                * its signature.
-                                */
-                               cnamesig = header;
+                       foundsig = header;
+                       /*
+                        * If we've got all we need, end the search.
+                        */
+                       if (!maybe_zonecut && found != NULL) {
+                               break;
                        }
+               } else if (header->type == dns_rdatatype_nsec &&
+                          !search.version->havensec3)
+               {
+                       /*
+                        * Remember a NSEC rdataset even if we're
+                        * not specifically looking for it, because
+                        * we might need it later.
+                        */
+                       nsecheader = header;
+               } else if (header->type == DNS_SIGTYPE(dns_rdatatype_nsec) &&
+                          !search.version->havensec3)
+               {
+                       /*
+                        * If we need the NSEC rdataset, we'll also
+                        * need its signature.
+                        */
+                       nsecsig = header;
+               } else if (cname_ok &&
+                          header->type == DNS_SIGTYPE(dns_rdatatype_cname))
+               {
+                       /*
+                        * If we get a CNAME match, we'll also need
+                        * its signature.
+                        */
+                       cnamesig = header;
                }
        }
 
@@ -3777,7 +3561,6 @@ found:
                if (!wild) {
                        unsigned int len = search.chain.len - 1;
                        if (len > 0) {
-                               NODE_UNLOCK(nlock, &nlocktype);
                                dns_qpchain_node(&search.chain, len - 1, NULL,
                                                 (void **)&node, NULL);
                                dns_name_copy(&node->name, foundname);
@@ -3797,7 +3580,6 @@ found:
                         *
                         * Return the delegation.
                         */
-                       NODE_UNLOCK(nlock, &nlocktype);
                        result = qpzone_setup_delegation(
                                &search, nodep, foundname, rdataset,
                                sigrdataset DNS__DB_FLARG_PASS);
@@ -3816,10 +3598,9 @@ found:
                         */
                        if (!wild) {
                                result = DNS_R_BADDB;
-                               goto node_exit;
+                               goto tree_exit;
                        }
 
-                       NODE_UNLOCK(nlock, &nlocktype);
                        result = find_closest_nsec(
                                &search, nodep, foundname, rdataset,
                                sigrdataset, false,
@@ -3844,7 +3625,7 @@ found:
                if (wild) {
                        foundname->attributes.wildcard = true;
                }
-               goto node_exit;
+               goto tree_exit;
        }
 
        /*
@@ -3914,10 +3695,9 @@ found:
                foundname->attributes.wildcard = true;
        }
 
-node_exit:
-       NODE_UNLOCK(nlock, &nlocktype);
-
 tree_exit:
+       rcu_read_unlock();
+
        if (nsec3) {
                dns_qpread_destroy(qpdb->nsec3, &search.qpr);
        } else {
@@ -3931,12 +3711,7 @@ tree_exit:
        if (search.need_cleanup) {
                node = search.zonecut;
                INSIST(node != NULL);
-               nlock = &search.qpdb->buckets[node->locknum].lock;
-
-               NODE_RDLOCK(nlock, &nlocktype);
-               qpznode_release(search.qpdb, node, 0,
-                               &nlocktype DNS__DB_FLARG_PASS);
-               NODE_UNLOCK(nlock, &nlocktype);
+               qpznode_release(search.qpdb, node, 0 DNS__DB_FLARG_PASS);
        }
 
        if (close_version) {
@@ -3977,6 +3752,22 @@ qpzone_allrdatasets(dns_db_t *db, dns_dbnode_t *dbnode,
 
        qpznode_acquire(qpdb, node DNS__DB_FLARG_PASS);
 
+       /*
+        * Clean the dirty node when acquiring the reference.  This is
+        * required because 'clean_zone_node()' might be running at the
+        * same time as acquiring the new reference and this would lead
+        * to an inconsistent state later.
+        *
+        * Thus either the qpznode_acquire() or qpznode_release() will
+        * clean the dirty node.
+        */
+       if (CMM_LOAD_SHARED(node->dirty)) {
+               isc_spinlock_lock(&node->spinlock);
+               uint32_t least_serial = CMM_LOAD_SHARED(qpdb->least_serial);
+               clean_zone_node(node, least_serial);
+               isc_spinlock_unlock(&node->spinlock);
+       }
+
        *iteratorp = (dns_rdatasetiter_t *)iterator;
        return ISC_R_SUCCESS;
 }
@@ -3997,17 +3788,12 @@ qpzone_attachnode(dns_db_t *db, dns_dbnode_t *source,
 
 static void
 qpzone_detachnode(dns_db_t *db, dns_dbnode_t **nodep DNS__DB_FLARG) {
-       qpzonedb_t *qpdb = (qpzonedb_t *)db;
-       qpznode_t *node = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
-
-       REQUIRE(VALID_QPZONE(qpdb));
+       REQUIRE(VALID_QPZONE((qpzonedb_t *)db));
        REQUIRE(nodep != NULL && *nodep != NULL);
 
-       node = (qpznode_t *)(*nodep);
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       qpznode_t *node = (qpznode_t *)(*nodep);
        *nodep = NULL;
-       nlock = &qpdb->buckets[node->locknum].lock;
 
        /*
         * qpzone_destroy() uses call_rcu() API to destroy the node locks, so it
@@ -4019,9 +3805,7 @@ qpzone_detachnode(dns_db_t *db, dns_dbnode_t **nodep DNS__DB_FLARG) {
        qpzonedb_ref(qpdb);
 
        rcu_read_lock();
-       NODE_RDLOCK(nlock, &nlocktype);
-       qpznode_release(qpdb, node, 0, &nlocktype DNS__DB_FLARG_PASS);
-       NODE_UNLOCK(nlock, &nlocktype);
+       qpznode_release(qpdb, node, 0 DNS__DB_FLARG_PASS);
        rcu_read_unlock();
 
        qpzonedb_unref(qpdb);
@@ -4087,19 +3871,19 @@ getoriginnode(dns_db_t *db, dns_dbnode_t **nodep DNS__DB_FLARG) {
 }
 
 static void
-locknode(dns_db_t *db, dns_dbnode_t *dbnode, isc_rwlocktype_t type) {
-       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+locknode(dns_db_t *db ISC_ATTR_UNUSED, dns_dbnode_t *dbnode,
+        isc_rwlocktype_t type ISC_ATTR_UNUSED) {
        qpznode_t *node = (qpznode_t *)dbnode;
 
-       RWLOCK(&qpdb->buckets[node->locknum].lock, type);
+       SPINLOCK(&node->spinlock);
 }
 
 static void
-unlocknode(dns_db_t *db, dns_dbnode_t *dbnode, isc_rwlocktype_t type) {
-       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+unlocknode(dns_db_t *db ISC_ATTR_UNUSED, dns_dbnode_t *dbnode,
+          isc_rwlocktype_t type ISC_ATTR_UNUSED) {
        qpznode_t *node = (qpznode_t *)dbnode;
 
-       RWUNLOCK(&qpdb->buckets[node->locknum].lock, type);
+       SPINUNLOCK(&node->spinlock);
 }
 
 static void
@@ -4122,9 +3906,8 @@ deletedata(dns_db_t *db ISC_ATTR_UNUSED, dns_dbnode_t *node ISC_ATTR_UNUSED,
 
 static void
 rdatasetiter_destroy(dns_rdatasetiter_t **iteratorp DNS__DB_FLARG) {
-       qpdb_rdatasetiter_t *qrditer = NULL;
-
-       qrditer = (qpdb_rdatasetiter_t *)(*iteratorp);
+       qpdb_rdatasetiter_t *qrditer = (qpdb_rdatasetiter_t *)(*iteratorp);
+       *iteratorp = NULL;
 
        if (qrditer->common.version != NULL) {
                closeversion(qrditer->common.db, &qrditer->common.version,
@@ -4133,109 +3916,83 @@ rdatasetiter_destroy(dns_rdatasetiter_t **iteratorp DNS__DB_FLARG) {
        dns__db_detachnode(qrditer->common.db,
                           &qrditer->common.node DNS__DB_FLARG_PASS);
        isc_mem_put(qrditer->common.db->mctx, qrditer, sizeof(*qrditer));
-
-       *iteratorp = NULL;
 }
 
 static isc_result_t
 rdatasetiter_first(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        qpdb_rdatasetiter_t *qrditer = (qpdb_rdatasetiter_t *)iterator;
-       qpzonedb_t *qpdb = (qpzonedb_t *)(qrditer->common.db);
        qpznode_t *node = (qpznode_t *)qrditer->common.node;
        qpz_version_t *version = (qpz_version_t *)qrditer->common.version;
-       dns_slabheader_t *header = NULL, *top_next = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = &qpdb->buckets[node->locknum].lock;
+       dns_slabheader_list_t *hcurrent = NULL;
 
-       NODE_RDLOCK(nlock, &nlocktype);
-
-       for (header = node->data; header != NULL; header = top_next) {
-               top_next = header->next;
-               do {
-                       if (header->serial <= version->serial &&
-                           !IGNORE(header))
+       rcu_read_lock();
+       cds_list_for_each_entry_rcu (hcurrent, &node->headers, nnode) {
+               dns_slabheader_t *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hcurrent->versions,
+                                            dnode)
+               {
+                       if (current->serial <= version->serial &&
+                           !IGNORE(current) && !NONEXISTENT(current))
                        {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
-                               break;
-                       } else {
-                               header = header->down;
+                               qrditer->hlist = hcurrent;
+                               qrditer->header = current;
+                               rcu_read_unlock();
+                               return ISC_R_SUCCESS;
                        }
-               } while (header != NULL);
-               if (header != NULL) {
-                       break;
                }
        }
+       rcu_read_unlock();
 
-       NODE_UNLOCK(nlock, &nlocktype);
-
-       qrditer->current = header;
-
-       if (header == NULL) {
-               return ISC_R_NOMORE;
-       }
-
-       return ISC_R_SUCCESS;
+       return ISC_R_NOMORE;
 }
 
 static isc_result_t
 rdatasetiter_next(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        qpdb_rdatasetiter_t *qrditer = (qpdb_rdatasetiter_t *)iterator;
-       qpzonedb_t *qpdb = (qpzonedb_t *)(qrditer->common.db);
        qpznode_t *node = (qpznode_t *)qrditer->common.node;
        qpz_version_t *version = (qpz_version_t *)qrditer->common.version;
-       dns_slabheader_t *header = NULL;
-       dns_slabheader_t *topheader, *topheader_next = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = &qpdb->buckets[node->locknum].lock;
+       dns_slabheader_list_t *hcurrent = NULL;
 
-       header = qrditer->current;
-       if (header == NULL) {
+       if (qrditer->hlist == NULL || qrditer->header == NULL) {
                return ISC_R_NOMORE;
        }
 
-       NODE_RDLOCK(nlock, &nlocktype);
-
        /*
         * Find the start of the header chain for the next type.
         */
-       topheader = dns_slabheader_top(header);
 
-       for (header = topheader->next; header != NULL; header = topheader_next)
+       /*
+        * NOTE: No header will be removed during iterator as we hold to node
+        * reference, so it should be safe to just enter critical section while
+        * we are reading the list.  This of course means that any new header
+        * that gets inserted while iterating will show up here.  That said -
+        * any new inserted header will have larger serial number.
+        */
+       rcu_read_lock();
+       for (hcurrent =
+                    cds_list_entry(rcu_dereference(qrditer->hlist->nnode.next),
+                                   dns_slabheader_list_t, nnode);
+            &hcurrent->nnode != &node->headers;
+            hcurrent = cds_list_entry(rcu_dereference(hcurrent->nnode.next),
+                                      dns_slabheader_list_t, nnode))
        {
-               topheader_next = header->next;
-               do {
-                       if (header->serial <= version->serial &&
-                           !IGNORE(header))
+               dns_slabheader_t *current = NULL;
+               cds_list_for_each_entry_rcu (current, &hcurrent->versions,
+                                            dnode)
+               {
+                       if (current->serial <= version->serial &&
+                           !IGNORE(current) && !NONEXISTENT(current))
                        {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
-                               break;
-                       } else {
-                               header = header->down;
+                               qrditer->hlist = hcurrent;
+                               qrditer->header = current;
+                               rcu_read_unlock();
+                               return ISC_R_SUCCESS;
                        }
-               } while (header != NULL);
-               if (header != NULL) {
-                       break;
                }
-
-               /*
-                * Find the start of the header chain for the next type.
-                */
-               topheader = topheader->next;
-       }
-
-       NODE_UNLOCK(nlock, &nlocktype);
-
-       qrditer->current = header;
-
-       if (header == NULL) {
-               return ISC_R_NOMORE;
        }
+       rcu_read_unlock();
 
-       return ISC_R_SUCCESS;
+       return ISC_R_NOMORE;
 }
 
 static void
@@ -4245,17 +4002,15 @@ rdatasetiter_current(dns_rdatasetiter_t *iterator,
        qpzonedb_t *qpdb = (qpzonedb_t *)(qrditer->common.db);
        qpznode_t *node = (qpznode_t *)qrditer->common.node;
        dns_slabheader_t *header = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = &qpdb->buckets[node->locknum].lock;
 
-       header = qrditer->current;
+       header = qrditer->header;
        REQUIRE(header != NULL);
 
-       NODE_RDLOCK(nlock, &nlocktype);
-
+       /*
+        * The node is reference counted, so no header will be removed
+        * when any iterator is active.
+        */
        bindrdataset(qpdb, node, header, rdataset DNS__DB_FLARG_PASS);
-
-       NODE_UNLOCK(nlock, &nlocktype);
 }
 
 /*
@@ -4277,19 +4032,13 @@ static void
 dereference_iter_node(qpdb_dbiterator_t *iter DNS__DB_FLARG) {
        qpzonedb_t *qpdb = (qpzonedb_t *)iter->common.db;
        qpznode_t *node = iter->node;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        if (node == NULL) {
                return;
        }
 
        iter->node = NULL;
-       nlock = &qpdb->buckets[node->locknum].lock;
-
-       NODE_RDLOCK(nlock, &nlocktype);
-       qpznode_release(qpdb, node, 0, &nlocktype DNS__DB_FLARG_PASS);
-       NODE_UNLOCK(nlock, &nlocktype);
+       qpznode_release(qpdb, node, 0 DNS__DB_FLARG_PASS);
 }
 
 static void
@@ -4707,10 +4456,8 @@ qpzone_addrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        qpz_version_t *version = (qpz_version_t *)dbversion;
        isc_region_t region;
        dns_slabheader_t *newheader = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
-       dns_fixedname_t fn;
-       dns_name_t *name = dns_fixedname_initname(&fn);
+       dns_fixedname_t fname;
+       dns_name_t *name = dns_fixedname_initname(&fname);
        dns_qp_t *nsec = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
@@ -4777,13 +4524,7 @@ qpzone_addrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
         * tree hold an exclusive lock on the tree.  In the latter case the
         * lock does not necessarily have to be acquired but it will help
         * purge ancient entries more effectively.
-        *
-        * (Note: node lock must be acquired after starting
-        * the QPDB transaction and released before committing.)
         */
-       nlock = &qpdb->buckets[node->locknum].lock;
-
-       NODE_WRLOCK(nlock, &nlocktype);
 
        result = ISC_R_SUCCESS;
        if (nsec != NULL) {
@@ -4794,15 +4535,20 @@ qpzone_addrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                 * so we can detach the new one we created and
                 * move on.
                 */
-               qpznode_t *nsecnode = new_qpznode(qpdb, name);
+               qpznode_t *nsecnode = NULL;
+               qpznode_create(qpdb, name, &nsecnode);
                nsecnode->nsec = DNS_DB_NSEC_NSEC;
                (void)dns_qp_insert(nsec, nsecnode, 0);
                qpznode_detach(&nsecnode);
        }
 
        if (result == ISC_R_SUCCESS) {
+               isc_spinlock_lock(&node->spinlock);
+               rcu_read_lock();
                result = add(qpdb, node, name, version, newheader, options,
                             false, addedrdataset, 0 DNS__DB_FLARG_PASS);
+               rcu_read_unlock();
+               isc_spinlock_unlock(&node->spinlock);
        }
 
        /*
@@ -4812,11 +4558,9 @@ qpzone_addrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        if (result == ISC_R_SUCCESS &&
            delegating_type(qpdb, node, rdataset->type))
        {
-               node->delegating = true;
+               CMM_STORE_SHARED(node->delegating, true);
        }
 
-       NODE_UNLOCK(nlock, &nlocktype);
-
        if (nsec != NULL) {
                dns_qpmulti_commit(qpdb->nsec, &nsec);
        }
@@ -4834,14 +4578,13 @@ qpzone_subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        qpz_version_t *version = (qpz_version_t *)dbversion;
        dns_fixedname_t fname;
        dns_name_t *nodename = dns_fixedname_initname(&fname);
-       dns_slabheader_t *topheader = NULL, *topheader_prev = NULL;
-       dns_slabheader_t *header = NULL, *newheader = NULL;
+       dns_slabheader_list_t *hlist = NULL, *hcurrent = NULL;
+       dns_slabheader_t *header = NULL, *current = NULL;
+       dns_slabheader_t *newheader = NULL;
        dns_slabheader_t *subresult = NULL;
        isc_region_t region;
        isc_result_t result;
        qpz_changed_t *changed = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock;
 
        REQUIRE(VALID_QPZONE(qpdb));
        REQUIRE(version != NULL && version->qpdb == qpdb);
@@ -4875,27 +4618,30 @@ qpzone_subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                newheader->resign_lsb = rdataset->resign & 0x1;
        }
 
-       nlock = &qpdb->buckets[node->locknum].lock;
-       NODE_WRLOCK(nlock, &nlocktype);
+       isc_spinlock_lock(&node->spinlock);
+       rcu_read_lock();
 
        changed = add_changed(newheader, version DNS__DB_FLARG_PASS);
-       for (topheader = node->data; topheader != NULL;
-            topheader = topheader->next)
-       {
-               if (topheader->type == newheader->type) {
+       cds_list_for_each_entry_rcu (hcurrent, &node->headers, nnode) {
+               if (hcurrent->type == newheader->type) {
+                       hlist = hcurrent;
                        break;
                }
-               topheader_prev = topheader;
        }
        /*
         * If header isn't NULL, we've found the right type.  There may be
         * IGNORE rdatasets between the top of the chain and the first real
         * data.  We skip over them.
         */
-       header = topheader;
-       while (header != NULL && IGNORE(header)) {
-               header = header->down;
+       if (hlist != NULL) {
+               cds_list_for_each_entry_rcu (current, &hlist->versions, dnode) {
+                       if (!IGNORE(current)) {
+                               header = current;
+                               break;
+                       }
+               }
        }
+
        if (header != NULL && !NONEXISTENT(header)) {
                unsigned int flags = 0;
                subresult = NULL;
@@ -4948,7 +4694,7 @@ qpzone_subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                        newheader = dns_slabheader_new((dns_db_t *)qpdb,
                                                       (dns_dbnode_t *)node);
                        newheader->ttl = 0;
-                       newheader->type = topheader->type;
+                       newheader->type = hlist->type;
                        atomic_init(&newheader->attributes,
                                    DNS_SLABHEADERATTR_NONEXISTENT);
                        newheader->serial = version->serial;
@@ -4961,19 +4707,17 @@ qpzone_subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
                 * If we're here, we want to link newheader in front of
                 * topheader.
                 */
-               INSIST(version->serial >= topheader->serial);
+               INSIST(version->serial >= header->serial);
                maybe_update_recordsandsize(false, version, header,
                                            nodename->length);
-               if (topheader_prev != NULL) {
-                       topheader_prev->next = newheader;
-               } else {
-                       node->data = newheader;
+
+               cds_list_add_rcu(&newheader->dnode, &hlist->versions);
+
+               CMM_STORE_SHARED(node->dirty, true);
+               if (changed != NULL) {
+                       changed->dirty = true;
                }
-               newheader->next = topheader->next;
-               newheader->down = topheader;
-               topheader->next = newheader;
-               node->dirty = true;
-               changed->dirty = true;
+
                resigndelete(qpdb, version, header DNS__DB_FLARG_PASS);
        } else {
                /*
@@ -5001,7 +4745,9 @@ qpzone_subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        }
 
 unlock:
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
+       isc_spinlock_unlock(&node->spinlock);
+
        return result;
 }
 
@@ -5016,8 +4762,6 @@ qpzone_deleterdataset(dns_db_t *db, dns_dbnode_t *dbnode,
        dns_name_t *nodename = dns_fixedname_initname(&fname);
        isc_result_t result;
        dns_slabheader_t *newheader = NULL;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
        REQUIRE(version != NULL && version->qpdb == qpdb);
@@ -5037,11 +4781,12 @@ qpzone_deleterdataset(dns_db_t *db, dns_dbnode_t *dbnode,
 
        dns_name_copy(&node->name, nodename);
 
-       nlock = &qpdb->buckets[node->locknum].lock;
-       NODE_WRLOCK(nlock, &nlocktype);
+       isc_spinlock_lock(&node->spinlock);
+       rcu_read_lock();
        result = add(qpdb, node, nodename, version, newheader, DNS_DBADD_FORCE,
                     false, NULL, 0 DNS__DB_FLARG_PASS);
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
+       isc_spinlock_unlock(&node->spinlock);
        return result;
 }
 
@@ -5049,18 +4794,15 @@ static isc_result_t
 nodefullname(dns_db_t *db, dns_dbnode_t *node, dns_name_t *name) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
        qpznode_t *qpnode = (qpznode_t *)node;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
-       isc_rwlock_t *nlock = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
        REQUIRE(node != NULL);
        REQUIRE(name != NULL);
 
-       nlock = &qpdb->buckets[qpnode->locknum].lock;
-
-       NODE_RDLOCK(nlock, &nlocktype);
+       /* FIXME: is RCU read lock enough? */
+       rcu_read_lock();
        dns_name_copy(&qpnode->name, name);
-       NODE_UNLOCK(nlock, &nlocktype);
+       rcu_read_unlock();
 
        return ISC_R_SUCCESS;
 }
@@ -5407,7 +5149,7 @@ setmaxtypepername(dns_db_t *db, uint32_t value) {
 }
 
 static dns_dbmethods_t qpdb_zonemethods = {
-       .destroy = qpdb_destroy,
+       .destroy = qpzonedb_destroy,
        .beginload = beginload,
        .endload = endload,
        .currentversion = currentversion,
@@ -5444,36 +5186,44 @@ static dns_dbmethods_t qpdb_zonemethods = {
 };
 
 static void
-destroy_qpznode(qpznode_t *node) {
-       dns_slabheader_t *current = NULL, *next = NULL;
+qpznode_destroy_rcu(struct rcu_head *rcu_head) {
+       qpznode_t *node = caa_container_of(rcu_head, qpznode_t, rcu_head);
 
-       for (current = node->data; current != NULL; current = next) {
-               dns_slabheader_t *down = current->down, *down_next = NULL;
+       isc_spinlock_destroy(&node->spinlock);
+       dns_name_free(&node->name, node->mctx);
+       isc_mem_putanddetach(&node->mctx, node, sizeof(qpznode_t));
+}
 
-               next = current->next;
+static void
+qpznode_destroy(qpznode_t *node) {
+       dns_slabheader_list_t *hlist = NULL, *hnext = NULL;
+       dns_slabheader_t *header = NULL, *next = NULL;
 
-               for (down = current->down; down != NULL; down = down_next) {
-                       down_next = down->down;
-                       dns_slabheader_destroy(&down);
+       cds_list_for_each_entry_safe (hlist, hnext, &node->headers, nnode) {
+               cds_list_for_each_entry_safe (header, next, &hlist->versions,
+                                             dnode)
+               {
+                       cds_list_del_rcu(&header->dnode);
+                       dns_slabheader_destroy(&header);
                }
 
-               dns_slabheader_destroy(&current);
+               cds_list_del_rcu(&hlist->nnode);
+               dns_slabheader_destroylist(&hlist);
        }
 
-       dns_name_free(&node->name, node->mctx);
-       isc_mem_putanddetach(&node->mctx, node, sizeof(qpznode_t));
+       call_rcu(&node->rcu_head, qpznode_destroy_rcu);
 }
 
 #if DNS_DB_NODETRACE
-ISC_REFCOUNT_STATIC_TRACE_IMPL(qpznode, destroy_qpznode);
+ISC_REFCOUNT_STATIC_TRACE_IMPL(qpznode, qpznode_destroy);
 #else
-ISC_REFCOUNT_STATIC_IMPL(qpznode, destroy_qpznode);
+ISC_REFCOUNT_STATIC_IMPL(qpznode, qpznode_destroy);
 #endif
 
 #ifdef DNS_DB_NODETRACE
-ISC_REFCOUNT_STATIC_TRACE_IMPL(qpzonedb, qpzone_destroy);
+ISC_REFCOUNT_STATIC_TRACE_IMPL(qpzonedb, qpzonedb__destroy);
 #else
-ISC_REFCOUNT_STATIC_IMPL(qpzonedb, qpzone_destroy);
+ISC_REFCOUNT_STATIC_IMPL(qpzonedb, qpzonedb__destroy);
 #endif
 
 static void
index 1aaafd0ba20a6f7e2d18fd1b458896ccdf66162e..421057d39e73d687bc6ea7d39f14a989e6889b3d 100644 (file)
@@ -351,6 +351,7 @@ dns_rdataslab_fromrdataset(dns_rdataset_t *rdataset, isc_mem_t *mctx,
                        .trust = rdataset->trust,
                        .ttl = rdataset->ttl,
                        .link = ISC_LINK_INITIALIZER,
+                       .dnode = CDS_LIST_HEAD_INIT(new->dnode),
                };
        }
 
@@ -881,14 +882,24 @@ dns_slabheader_new(dns_db_t *db, dns_dbnode_t *node) {
        h = isc_mem_get(db->mctx, sizeof(*h));
        *h = (dns_slabheader_t){
                .link = ISC_LINK_INITIALIZER,
+               .dnode = CDS_LIST_HEAD_INIT(h->dnode),
        };
        dns_slabheader_reset(h, db, node);
        return h;
 }
 
+static void
+dns__slabheader_destroy_rcu(struct rcu_head *rcu_head) {
+       dns_slabheader_t *header = caa_container_of(rcu_head, dns_slabheader_t,
+                                                   rcu_head);
+       size_t size = NONEXISTENT(header) ? sizeof(*header)
+                                         : dns_rdataslab_size(header);
+
+       isc_mem_putanddetach(&header->mctx, header, size);
+}
+
 void
 dns_slabheader_destroy(dns_slabheader_t **headerp) {
-       unsigned int size;
        dns_slabheader_t *header = *headerp;
 
        *headerp = NULL;
@@ -897,13 +908,16 @@ dns_slabheader_destroy(dns_slabheader_t **headerp) {
 
        dns_db_deletedata(header->db, header->node, header);
 
-       if (NONEXISTENT(header)) {
-               size = sizeof(*header);
+       header->mctx = NULL;
+
+       /* FIXME: bleh, this is so ugly */
+       isc_mem_attach(mctx, &header->mctx);
+
+       if (rcu_read_ongoing()) {
+               call_rcu(&header->rcu_head, dns__slabheader_destroy_rcu);
        } else {
-               size = dns_rdataslab_size(header);
+               dns__slabheader_destroy_rcu(&header->rcu_head);
        }
-
-       isc_mem_put(mctx, header, size);
 }
 
 void
@@ -1265,3 +1279,31 @@ rdataset_equals(const dns_rdataset_t *rdataset1,
        return dns_rdataslab_equalx(header1, header2, rdataset1->rdclass,
                                    rdataset2->type);
 }
+
+void
+dns_slabheader_createlist(isc_mem_t *mctx, dns_slabheader_list_t **headp) {
+       dns_slabheader_list_t *head = isc_mem_get(mctx, sizeof(*head));
+       *head = (dns_slabheader_list_t){
+               .nnode = CDS_LIST_HEAD_INIT(head->nnode),
+               .versions = CDS_LIST_HEAD_INIT(head->versions),
+       };
+
+       isc_mem_attach(mctx, &head->mctx);
+
+       *headp = head;
+}
+
+static void
+dns__slabheader_destroylist_rcu(struct rcu_head *rcu_head) {
+       dns_slabheader_list_t *head =
+               caa_container_of(rcu_head, dns_slabheader_list_t, rcu_head);
+       isc_mem_putanddetach(&head->mctx, head, sizeof(*head));
+}
+
+void
+dns_slabheader_destroylist(dns_slabheader_list_t **headp) {
+       dns_slabheader_list_t *head = *headp;
+       *headp = NULL;
+
+       call_rcu(&head->rcu_head, dns__slabheader_destroylist_rcu);
+}
index cf62934632be1d50d35c8fac30f13f955f38ac71..285e84a69edbf56b70dd7641fcc0d3abdce956f4 100644 (file)
 
 #endif /* !defined(caa_container_of_check_null) */
 /* clang-format on */
+
+static inline int
+cds_list_empty_rcu(const struct cds_list_head *head) {
+       return head == rcu_dereference(head->next);
+}
index 7e716bf024d6710ea95c31316964124a43feff36..7beb61b322aaeb45af3963717f6d2d95bff4fdf0 100644 (file)
@@ -103,14 +103,13 @@ const char *ownercase_vectors[12][2] = {
 static bool
 ownercase_test_one(const char *str1, const char *str2) {
        isc_result_t result;
-       uint8_t qpdb_s[sizeof(qpzonedb_t) + sizeof(qpzone_bucket_t)];
-       qpzonedb_t *qpdb = (qpzonedb_t *)&qpdb_s;
+       qpzonedb_t qpdb_s;
+       qpzonedb_t *qpdb = &qpdb_s;
        *qpdb = (qpzonedb_t){
                .common.methods = &qpdb_zonemethods,
                .common.mctx = mctx,
-               .buckets_count = 1,
        };
-       qpznode_t node = { .locknum = 0 };
+       qpznode_t node = { 0 };
        dns_slabheader_t header = {
                .node = (dns_dbnode_t *)&node,
                .db = (dns_db_t *)qpdb,
@@ -130,7 +129,7 @@ ownercase_test_one(const char *str1, const char *str2) {
        dns_name_t *name2 = dns_fixedname_initname(&fname2);
 
        /* Minimal initialization of the mock objects */
-       NODE_INITLOCK(&qpdb->buckets[0].lock);
+       isc_spinlock_init(&node.spinlock);
 
        isc_buffer_constinit(&b, str1, strlen(str1));
        isc_buffer_add(&b, strlen(str1));
@@ -150,7 +149,7 @@ ownercase_test_one(const char *str1, const char *str2) {
        /* Retrieve the case to name2 */
        dns_rdataset_getownercase(&rdataset, name2);
 
-       NODE_DESTROYLOCK(&qpdb->buckets[0].lock);
+       isc_spinlock_destroy(&node.spinlock);
 
        return dns_name_caseequal(name1, name2);
 }
@@ -171,14 +170,13 @@ ISC_RUN_TEST_IMPL(ownercase) {
 
 ISC_RUN_TEST_IMPL(setownercase) {
        isc_result_t result;
-       uint8_t qpdb_s[sizeof(qpzonedb_t) + sizeof(qpzone_bucket_t)];
+       qpzonedb_t qpdb_s;
        qpzonedb_t *qpdb = (qpzonedb_t *)&qpdb_s;
        *qpdb = (qpzonedb_t){
                .common.methods = &qpdb_zonemethods,
                .common.mctx = mctx,
-               .buckets_count = 1,
        };
-       qpznode_t node = { .locknum = 0 };
+       qpznode_t node = { 0 };
        dns_slabheader_t header = {
                .node = (dns_dbnode_t *)&node,
                .db = (dns_db_t *)qpdb,
@@ -202,7 +200,7 @@ ISC_RUN_TEST_IMPL(setownercase) {
        UNUSED(state);
 
        /* Minimal initialization of the mock objects */
-       NODE_INITLOCK(&qpdb->buckets[0].lock);
+       isc_spinlock_init(&node.spinlock);
 
        isc_buffer_constinit(&b, str1, strlen(str1));
        isc_buffer_add(&b, strlen(str1));
@@ -219,7 +217,7 @@ ISC_RUN_TEST_IMPL(setownercase) {
        /* Retrieve the case to name2 */
        dns_rdataset_getownercase(&rdataset, name2);
 
-       NODE_DESTROYLOCK(&qpdb->buckets[0].lock);
+       isc_spinlock_destroy(&node.spinlock);
 
        assert_true(dns_name_caseequal(name1, name2));
 }