]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
allow updating of records in a qpzone database
authorEvan Hunt <each@isc.org>
Sun, 19 Nov 2023 08:00:49 +0000 (00:00 -0800)
committerEvan Hunt <each@isc.org>
Fri, 8 Mar 2024 23:36:56 +0000 (15:36 -0800)
add database API methods needed to apply updates to an existing zone
database (newversion, addrdataset, subtractrdataset and deleterdataset).

it is now possible to apply journals to zone databases after loading, so
named-checkzone -J works correctly.

lib/dns/qpzone.c

index 5c73b4c9f4d65689a0ae8a19dea579c0e321071d..2740d3ad6a33d818e1db31a233a408318f6e6a9f 100644 (file)
@@ -81,9 +81,9 @@
 #define OPTOUT(header)                                 \
        ((atomic_load_acquire(&(header)->attributes) & \
          DNS_SLABHEADERATTR_OPTOUT) != 0)
-#define NEGATIVE(header)                               \
+#define STATCOUNT(header)                              \
        ((atomic_load_acquire(&(header)->attributes) & \
-         DNS_SLABHEADERATTR_NEGATIVE) != 0)
+         DNS_SLABHEADERATTR_STATCOUNT) != 0)
 
 #define HEADERNODE(h) ((qpdata_t *)((h)->node))
 
@@ -174,9 +174,7 @@ struct qpzonedb {
        db_nodelock_t *node_locks;
        qpdata_t *origin;
        qpdata_t *nsec3_origin;
-       dns_stats_t *rrsetstats;     /* cache DB only */
-       isc_stats_t *cachestats;     /* cache DB only */
-       isc_stats_t *gluecachestats; /* zone DB only */
+       isc_stats_t *gluecachestats;
        /* Locked by lock. */
        unsigned int active;
        unsigned int attributes;
@@ -326,6 +324,13 @@ typedef struct qpdb_dbiterator {
        enum { full, nonsec3, nsec3only } nsec3mode;
 } qpdb_dbiterator_t;
 
+/*%
+ * 'init_count' is used to initialize 'newheader->count' which inturn
+ * is used to determine where in the cycle rrset-order cyclic starts.
+ * We don't lock this as we don't care about simultaneous updates.
+ */
+static atomic_uint_fast16_t init_count = 0;
+
 static bool
 prio_type(dns_typepair_t type) {
        switch (type) {
@@ -357,8 +362,6 @@ prio_type(dns_typepair_t type) {
  * If a routine is going to lock more than one lock in this module, then
  * the locking must be done in the following order:
  *
- *      Tree Lock
- *
  *      Node Lock       (Only one from the set may be locked at one time by
  *                       any caller)
  *
@@ -473,12 +476,6 @@ free_db_rcu(struct rcu_head *rcu_head) {
                             qpdb->node_lock_count, sizeof(isc_heap_t *));
        }
 
-       if (qpdb->rrsetstats != NULL) {
-               dns_stats_detach(&qpdb->rrsetstats);
-       }
-       if (qpdb->cachestats != NULL) {
-               isc_stats_detach(&qpdb->cachestats);
-       }
        if (qpdb->gluecachestats != NULL) {
                isc_stats_detach(&qpdb->gluecachestats);
        }
@@ -635,7 +632,7 @@ allocate_version(isc_mem_t *mctx, uint32_t serial, unsigned int references,
        };
 
        cds_wfs_init(&version->glue_stack);
-
+       isc_rwlock_init(&version->rwlock);
        isc_refcount_init(&version->references, references);
 
        return (version);
@@ -650,8 +647,6 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
        isc_result_t result;
        dns_qp_t *qp = NULL;
 
-       REQUIRE(type != dns_dbtype_cache);
-
        qpdb = isc_mem_get(mctx, sizeof(*qpdb));
        *qpdb = (qpzonedb_t){
                .common.origin = DNS_NAME_INITEMPTY,
@@ -765,7 +760,6 @@ dns__qpzone_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
         */
        qpdb->current_version = allocate_version(mctx, 1, 1, false);
        qpdb->current_version->qpdb = qpdb;
-       isc_rwlock_init(&qpdb->current_version->rwlock);
 
        /*
         * Keep the current version in the open list so that list operation
@@ -1044,9 +1038,6 @@ bindrdataset(qpzonedb_t *qpdb, qpdata_t *node, dns_slabheader_t *header,
        rdataset->ttl = header->ttl - now;
        rdataset->trust = header->trust;
 
-       if (NEGATIVE(header)) {
-               rdataset->attributes |= DNS_RDATASETATTR_NEGATIVE;
-       }
        if (OPTOUT(header)) {
                rdataset->attributes |= DNS_RDATASETATTR_OPTOUT;
        }
@@ -1082,6 +1073,171 @@ bindrdataset(qpzonedb_t *qpdb, qpdata_t *node, dns_slabheader_t *header,
        }
 }
 
+static void
+setnsec3parameters(dns_db_t *db, qpdb_version_t *version) {
+       qpdata_t *node = NULL;
+       dns_rdata_nsec3param_t nsec3param;
+       dns_rdata_t rdata = DNS_RDATA_INIT;
+       isc_region_t region;
+       isc_result_t result;
+       dns_slabheader_t *header = NULL, *header_next = NULL;
+       unsigned char *raw; /* RDATASLAB */
+       unsigned int count, length;
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+       version->havensec3 = false;
+       node = qpdb->origin;
+       NODE_RDLOCK(&(qpdb->node_locks[node->locknum].lock), &nlocktype);
+       for (header = node->data; header != NULL; header = header_next) {
+               header_next = header->next;
+               do {
+                       if (header->serial <= version->serial &&
+                           !IGNORE(header))
+                       {
+                               if (NONEXISTENT(header)) {
+                                       header = NULL;
+                               }
+                               break;
+                       } else {
+                               header = header->down;
+                       }
+               } while (header != NULL);
+
+               if (header != NULL &&
+                   (header->type == dns_rdatatype_nsec3param))
+               {
+                       /*
+                        * Find an NSEC3PARAM with a supported algorithm.
+                        */
+                       raw = dns_slabheader_raw(header);
+                       count = raw[0] * 256 + raw[1]; /* count */
+                       raw += DNS_RDATASET_COUNT + DNS_RDATASET_LENGTH;
+                       while (count-- > 0U) {
+                               length = raw[0] * 256 + raw[1];
+                               raw += DNS_RDATASET_ORDER + DNS_RDATASET_LENGTH;
+                               region.base = raw;
+                               region.length = length;
+                               raw += length;
+                               dns_rdata_fromregion(
+                                       &rdata, qpdb->common.rdclass,
+                                       dns_rdatatype_nsec3param, &region);
+                               result = dns_rdata_tostruct(&rdata, &nsec3param,
+                                                           NULL);
+                               INSIST(result == ISC_R_SUCCESS);
+                               dns_rdata_reset(&rdata);
+
+                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG &&
+                                   !dns_nsec3_supportedhash(nsec3param.hash))
+                               {
+                                       continue;
+                               }
+
+                               if (nsec3param.flags != 0) {
+                                       continue;
+                               }
+
+                               memmove(version->salt, nsec3param.salt,
+                                       nsec3param.salt_length);
+                               version->hash = nsec3param.hash;
+                               version->salt_length = nsec3param.salt_length;
+                               version->iterations = nsec3param.iterations;
+                               version->flags = nsec3param.flags;
+                               version->havensec3 = true;
+                               /*
+                                * Look for a better algorithm than the
+                                * unknown test algorithm.
+                                */
+                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG) {
+                                       goto unlock;
+                               }
+                       }
+               }
+       }
+unlock:
+       NODE_UNLOCK(&(qpdb->node_locks[node->locknum].lock), &nlocktype);
+}
+
+static void
+cleanup_nondirty(qpdb_version_t *version, qpdb_changedlist_t *cleanup_list) {
+       qpdb_changed_t *changed = NULL, *next_changed = NULL;
+
+       /*
+        * If the changed record is dirty, then an update created multiple
+        * versions of a given rdataset.  We keep this list until we're the
+        * least open version, at which point it's safe to get rid of any
+        * older versions.
+        *
+        * If the changed record isn't dirty, then we don't need it anymore
+        * since we're committing and not rolling back.
+        *
+        * The caller must be holding the database lock.
+        */
+       for (changed = HEAD(version->changed_list); changed != NULL;
+            changed = next_changed)
+       {
+               next_changed = NEXT(changed, link);
+               if (!changed->dirty) {
+                       UNLINK(version->changed_list, changed, link);
+                       APPEND(*cleanup_list, changed, link);
+               }
+       }
+}
+
+static void
+setsecure(dns_db_t *db, qpdb_version_t *version, dns_dbnode_t *origin) {
+       dns_rdataset_t keyset;
+       dns_rdataset_t nsecset, signsecset;
+       bool haszonekey = false;
+       bool hasnsec = false;
+       isc_result_t result;
+
+       dns_rdataset_init(&keyset);
+       result = dns_db_findrdataset(db, origin, version, dns_rdatatype_dnskey,
+                                    0, 0, &keyset, NULL);
+       if (result == ISC_R_SUCCESS) {
+               result = dns_rdataset_first(&keyset);
+               while (result == ISC_R_SUCCESS) {
+                       dns_rdata_t keyrdata = DNS_RDATA_INIT;
+                       dns_rdataset_current(&keyset, &keyrdata);
+                       if (dns_zonekey_iszonekey(&keyrdata)) {
+                               haszonekey = true;
+                               break;
+                       }
+                       result = dns_rdataset_next(&keyset);
+               }
+               dns_rdataset_disassociate(&keyset);
+       }
+       if (!haszonekey) {
+               version->secure = false;
+               version->havensec3 = false;
+               return;
+       }
+
+       dns_rdataset_init(&nsecset);
+       dns_rdataset_init(&signsecset);
+       result = dns_db_findrdataset(db, origin, version, dns_rdatatype_nsec, 0,
+                                    0, &nsecset, &signsecset);
+       if (result == ISC_R_SUCCESS) {
+               if (dns_rdataset_isassociated(&signsecset)) {
+                       hasnsec = true;
+                       dns_rdataset_disassociate(&signsecset);
+               }
+               dns_rdataset_disassociate(&nsecset);
+       }
+
+       setnsec3parameters(db, version);
+
+       /*
+        * Do we have a valid NSEC/NSEC3 chain?
+        */
+       if (version->havensec3 || hasnsec) {
+               version->secure = true;
+       } else {
+               version->secure = false;
+       }
+}
+
 static void
 currentversion(dns_db_t *db, dns_dbversion_t **versionp) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
@@ -1111,71 +1267,376 @@ attachversion(dns_db_t *db, dns_dbversion_t *source,
        *targetp = version;
 }
 
-static void
-closeversion(dns_db_t *db, dns_dbversion_t **versionp,
-            bool commit ISC_ATTR_UNUSED DNS__DB_FLARG) {
+static isc_result_t
+newversion(dns_db_t *db, dns_dbversion_t **versionp) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
        qpdb_version_t *version = NULL;
 
        REQUIRE(VALID_QPZONE(qpdb));
-       version = (qpdb_version_t *)*versionp;
-       INSIST(version->qpdb == qpdb);
+       REQUIRE(versionp != NULL && *versionp == NULL);
+       REQUIRE(qpdb->future_version == NULL);
 
-       /*
-        * XXX: currently only current_version works.
-        */
-       INSIST(version == qpdb->current_version);
+       RWLOCK(&qpdb->lock, isc_rwlocktype_write);
+       INSIST(qpdb->next_serial != 0);
+       version = allocate_version(qpdb->common.mctx, qpdb->next_serial, 1,
+                                  true);
+       version->qpdb = qpdb;
+       version->secure = qpdb->current_version->secure;
+       version->havensec3 = qpdb->current_version->havensec3;
+       if (version->havensec3) {
+               version->flags = qpdb->current_version->flags;
+               version->iterations = qpdb->current_version->iterations;
+               version->hash = qpdb->current_version->hash;
+               version->salt_length = qpdb->current_version->salt_length;
+               memmove(version->salt, qpdb->current_version->salt,
+                       version->salt_length);
+       }
+
+       version->records = qpdb->current_version->records;
+       version->xfrsize = qpdb->current_version->xfrsize;
+
+       qpdb->next_serial++;
+       qpdb->future_version = version;
+       RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
 
-       if (isc_refcount_decrement(&version->references) > 1) {
-               *versionp = NULL;
-               return;
+       *versionp = version;
+
+       return (ISC_R_SUCCESS);
+}
+
+static void
+resigninsert(qpzonedb_t *qpdb, int idx, dns_slabheader_t *newheader) {
+       INSIST(newheader->heap_index == 0);
+       INSIST(!ISC_LINK_LINKED(newheader, link));
+
+       isc_heap_insert(qpdb->heaps[idx], newheader);
+       newheader->heap = qpdb->heaps[idx];
+}
+
+static void
+resigndelete(qpzonedb_t *qpdb, qpdb_version_t *version,
+            dns_slabheader_t *header DNS__DB_FLARG) {
+       if (header != NULL && header->heap_index != 0) {
+               isc_heap_delete(qpdb->heaps[HEADERNODE(header)->locknum],
+                               header->heap_index);
+               header->heap_index = 0;
+               if (version != NULL) {
+                       newref(qpdb, HEADERNODE(header) DNS__DB_FLARG_PASS);
+                       ISC_LIST_APPEND(version->resigned_list, header, link);
+               }
        }
+}
 
-       INSIST(EMPTY(version->changed_list));
+static void
+make_least_version(qpzonedb_t *qpdb, qpdb_version_t *version,
+                  qpdb_changedlist_t *cleanup_list) {
+       qpdb->least_serial = version->serial;
+       *cleanup_list = version->changed_list;
+       ISC_LIST_INIT(version->changed_list);
 }
 
-static isc_result_t
-findrdataset(dns_db_t *db, dns_dbnode_t *dbnode, dns_dbversion_t *dbversion,
-            dns_rdatatype_t type, dns_rdatatype_t covers,
-            isc_stdtime_t now ISC_ATTR_UNUSED, dns_rdataset_t *rdataset,
-            dns_rdataset_t *sigrdataset DNS__DB_FLARG) {
+static void
+rollback_node(qpdata_t *node, uint32_t serial) {
+       dns_slabheader_t *header = NULL, *dcurrent = NULL;
+       bool make_dirty = false;
+
+       /*
+        * We set the IGNORE attribute on rdatasets with serial number
+        * 'serial'.  When the reference count goes to zero, these rdatasets
+        * will be cleaned up; until that time, they will be ignored.
+        */
+       for (header = node->data; header != NULL; header = header->next) {
+               if (header->serial == serial) {
+                       DNS_SLABHEADER_SETATTR(header,
+                                              DNS_SLABHEADERATTR_IGNORE);
+                       make_dirty = true;
+               }
+               for (dcurrent = header->down; dcurrent != NULL;
+                    dcurrent = dcurrent->down)
+               {
+                       if (dcurrent->serial == serial) {
+                               DNS_SLABHEADER_SETATTR(
+                                       dcurrent, DNS_SLABHEADERATTR_IGNORE);
+                               make_dirty = true;
+                       }
+               }
+       }
+       if (make_dirty) {
+               node->dirty = 1;
+       }
+}
+
+static void
+closeversion(dns_db_t *db, dns_dbversion_t **versionp,
+            bool commit DNS__DB_FLARG) {
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
-       qpdata_t *node = (qpdata_t *)dbnode;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
-       dns_slabheader_t *found = NULL, *foundsig = NULL;
-       uint32_t serial;
-       qpdb_version_t *version = dbversion;
-       bool close_version = false;
-       dns_typepair_t matchtype, sigmatchtype;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+       qpdb_version_t *version = NULL, *cleanup_version = NULL;
+       qpdb_version_t *least_greater = NULL;
+       qpdata_t *node = NULL;
+       bool rollback = false;
+       qpdb_changed_t *changed = NULL, *next_changed = NULL;
+       qpdb_changedlist_t cleanup_list;
+       dns_slabheaderlist_t resigned_list;
+       dns_slabheader_t *header = NULL;
+       uint32_t serial, least_serial;
 
        REQUIRE(VALID_QPZONE(qpdb));
-       REQUIRE(type != dns_rdatatype_any);
-       INSIST(version == NULL || version->qpdb == qpdb);
+       version = (qpdb_version_t *)*versionp;
+       INSIST(version->qpdb == qpdb);
 
-       if (version == NULL) {
-               currentversion(db, (dns_dbversion_t **)&version);
-               close_version = true;
+       if (isc_refcount_decrement(&version->references) > 1) {
+               *versionp = NULL;
+               return;
        }
-       serial = version->serial;
 
-       NODE_RDLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+       ISC_LIST_INIT(cleanup_list);
+       ISC_LIST_INIT(resigned_list);
 
-       matchtype = DNS_TYPEPAIR_VALUE(type, covers);
-       if (covers == 0) {
-               sigmatchtype = DNS_SIGTYPE(type);
-       } else {
-               sigmatchtype = 0;
+       /*
+        * Update the zone's secure status in version before making
+        * it the current version.
+        */
+       if (version->writer && commit) {
+               setsecure(db, version, qpdb->origin);
        }
 
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               do {
-                       if (header->serial <= serial && !IGNORE(header)) {
-                               if (NONEXISTENT(header)) {
-                                       header = NULL;
-                               }
-                               break;
+       RWLOCK(&qpdb->lock, isc_rwlocktype_write);
+       serial = version->serial;
+       if (version->writer) {
+               if (commit) {
+                       unsigned int cur_ref;
+                       qpdb_version_t *cur_version = NULL;
+
+                       INSIST(version == qpdb->future_version);
+                       /*
+                        * The current version is going to be replaced.
+                        * Release the (likely last) reference to it from the
+                        * DB itself and unlink it from the open list.
+                        */
+                       cur_version = qpdb->current_version;
+                       cur_ref = isc_refcount_decrement(
+                               &cur_version->references);
+                       if (cur_ref == 1) {
+                               (void)isc_refcount_current(
+                                       &cur_version->references);
+                               if (cur_version->serial == qpdb->least_serial) {
+                                       INSIST(EMPTY(
+                                               cur_version->changed_list));
+                               }
+                               UNLINK(qpdb->open_versions, cur_version, link);
+                       }
+                       if (EMPTY(qpdb->open_versions)) {
+                               /*
+                                * We're going to become the least open
+                                * version.
+                                */
+                               make_least_version(qpdb, version,
+                                                  &cleanup_list);
+                       } else {
+                               /*
+                                * Some other open version is the
+                                * least version.  We can't cleanup
+                                * records that were changed in this
+                                * version because the older versions
+                                * may still be in use by an open
+                                * version.
+                                *
+                                * We can, however, discard the
+                                * changed records for things that
+                                * we've added that didn't exist in
+                                * prior versions.
+                                */
+                               cleanup_nondirty(version, &cleanup_list);
+                       }
+                       /*
+                        * If the (soon to be former) current version
+                        * isn't being used by anyone, we can clean
+                        * it up.
+                        */
+                       if (cur_ref == 1) {
+                               cleanup_version = cur_version;
+                               APPENDLIST(version->changed_list,
+                                          cleanup_version->changed_list, link);
+                       }
+                       /*
+                        * Become the current version.
+                        */
+                       version->writer = false;
+                       qpdb->current_version = version;
+                       qpdb->current_serial = version->serial;
+                       qpdb->future_version = NULL;
+
+                       /*
+                        * Keep the current version in the open list, and
+                        * gain a reference for the DB itself (see the DB
+                        * creation function below).  This must be the only
+                        * case where we need to increment the counter from
+                        * zero and need to use isc_refcount_increment0().
+                        */
+                       INSIST(isc_refcount_increment0(&version->references) ==
+                              0);
+                       PREPEND(qpdb->open_versions, qpdb->current_version,
+                               link);
+                       resigned_list = version->resigned_list;
+                       ISC_LIST_INIT(version->resigned_list);
+               } else {
+                       /*
+                        * We're rolling back this transaction.
+                        */
+                       cleanup_list = version->changed_list;
+                       ISC_LIST_INIT(version->changed_list);
+                       resigned_list = version->resigned_list;
+                       ISC_LIST_INIT(version->resigned_list);
+                       rollback = true;
+                       cleanup_version = version;
+                       qpdb->future_version = NULL;
+               }
+       } else {
+               if (version != qpdb->current_version) {
+                       /*
+                        * There are no external or internal references
+                        * to this version and it can be cleaned up.
+                        */
+                       cleanup_version = version;
+
+                       /*
+                        * Find the version with the least serial
+                        * number greater than ours.
+                        */
+                       least_greater = PREV(version, link);
+                       if (least_greater == NULL) {
+                               least_greater = qpdb->current_version;
+                       }
+
+                       INSIST(version->serial < least_greater->serial);
+                       /*
+                        * Is this the least open version?
+                        */
+                       if (version->serial == qpdb->least_serial) {
+                               /*
+                                * Yes.  Install the new least open
+                                * version.
+                                */
+                               make_least_version(qpdb, least_greater,
+                                                  &cleanup_list);
+                       } else {
+                               /*
+                                * Add any unexecuted cleanups to
+                                * those of the least greater version.
+                                */
+                               APPENDLIST(least_greater->changed_list,
+                                          version->changed_list, link);
+                       }
+               } else if (version->serial == qpdb->least_serial) {
+                       INSIST(EMPTY(version->changed_list));
+               }
+               UNLINK(qpdb->open_versions, version, link);
+       }
+       least_serial = qpdb->least_serial;
+       RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
+
+       if (cleanup_version != NULL) {
+               isc_refcount_destroy(&cleanup_version->references);
+               INSIST(EMPTY(cleanup_version->changed_list));
+               free_gluetable(&cleanup_version->glue_stack);
+               cds_wfs_destroy(&cleanup_version->glue_stack);
+               isc_rwlock_destroy(&cleanup_version->rwlock);
+               isc_mem_put(qpdb->common.mctx, cleanup_version,
+                           sizeof(*cleanup_version));
+       }
+
+       /*
+        * Commit/rollback re-signed headers.
+        */
+       for (header = HEAD(resigned_list); header != NULL;
+            header = HEAD(resigned_list))
+       {
+               isc_rwlock_t *lock = NULL;
+               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+               ISC_LIST_UNLINK(resigned_list, header, link);
+
+               lock = &qpdb->node_locks[HEADERNODE(header)->locknum].lock;
+               NODE_WRLOCK(lock, &nlocktype);
+               if (rollback && !IGNORE(header)) {
+                       resigninsert(qpdb, HEADERNODE(header)->locknum, header);
+               }
+               decref(qpdb, HEADERNODE(header), least_serial,
+                      &nlocktype DNS__DB_FLARG_PASS);
+               NODE_UNLOCK(lock, &nlocktype);
+       }
+
+       if (EMPTY(cleanup_list)) {
+               *versionp = NULL;
+               return;
+       }
+
+       for (changed = HEAD(cleanup_list); changed != NULL;
+            changed = next_changed)
+       {
+               isc_rwlock_t *lock = NULL;
+               isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+               next_changed = NEXT(changed, link);
+               node = changed->node;
+               lock = &qpdb->node_locks[node->locknum].lock;
+
+               NODE_WRLOCK(lock, &nlocktype);
+               if (rollback) {
+                       rollback_node(node, serial);
+               }
+               decref(qpdb, node, least_serial, &nlocktype DNS__DB_FILELINE);
+
+               NODE_UNLOCK(lock, &nlocktype);
+
+               isc_mem_put(qpdb->common.mctx, changed, sizeof(*changed));
+       }
+
+       *versionp = NULL;
+}
+
+static isc_result_t
+findrdataset(dns_db_t *db, dns_dbnode_t *dbnode, dns_dbversion_t *dbversion,
+            dns_rdatatype_t type, dns_rdatatype_t covers,
+            isc_stdtime_t now ISC_ATTR_UNUSED, dns_rdataset_t *rdataset,
+            dns_rdataset_t *sigrdataset DNS__DB_FLARG) {
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       qpdata_t *node = (qpdata_t *)dbnode;
+       dns_slabheader_t *header = NULL, *header_next = NULL;
+       dns_slabheader_t *found = NULL, *foundsig = NULL;
+       uint32_t serial;
+       qpdb_version_t *version = dbversion;
+       bool close_version = false;
+       dns_typepair_t matchtype, sigmatchtype;
+       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+       REQUIRE(VALID_QPZONE(qpdb));
+       REQUIRE(type != dns_rdatatype_any);
+       INSIST(version == NULL || version->qpdb == qpdb);
+
+       if (version == NULL) {
+               currentversion(db, (dns_dbversion_t **)&version);
+               close_version = true;
+       }
+       serial = version->serial;
+
+       NODE_RDLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       matchtype = DNS_TYPEPAIR_VALUE(type, covers);
+       if (covers == 0) {
+               sigmatchtype = DNS_SIGTYPE(type);
+       } else {
+               sigmatchtype = 0;
+       }
+
+       for (header = node->data; header != NULL; header = header_next) {
+               header_next = header->next;
+               do {
+                       if (header->serial <= serial && !IGNORE(header)) {
+                               if (NONEXISTENT(header)) {
+                                       header = NULL;
+                               }
+                               break;
                        } else {
                                header = header->down;
                        }
@@ -1386,29 +1847,6 @@ add_changed(dns_slabheader_t *header, qpdb_version_t *version DNS__DB_FLARG) {
        return (changed);
 }
 
-static void
-resigninsert(qpzonedb_t *qpdb, int idx, dns_slabheader_t *newheader) {
-       INSIST(newheader->heap_index == 0);
-       INSIST(!ISC_LINK_LINKED(newheader, link));
-
-       isc_heap_insert(qpdb->heaps[idx], newheader);
-       newheader->heap = qpdb->heaps[idx];
-}
-
-static void
-resigndelete(qpzonedb_t *qpdb, qpdb_version_t *version,
-            dns_slabheader_t *header DNS__DB_FLARG) {
-       if (header != NULL && header->heap_index != 0) {
-               isc_heap_delete(qpdb->heaps[HEADERNODE(header)->locknum],
-                               header->heap_index);
-               header->heap_index = 0;
-               if (version != NULL) {
-                       newref(qpdb, HEADERNODE(header) DNS__DB_FLARG_PASS);
-                       ISC_LIST_APPEND(version->resigned_list, header, link);
-               }
-       }
-}
-
 static uint64_t
 recordsize(dns_slabheader_t *header, unsigned int namelen) {
        return (dns_rdataslab_rdatasize((unsigned char *)header,
@@ -1854,182 +2292,46 @@ beginload(dns_db_t *db, dns_rdatacallbacks_t *callbacks) {
        return (ISC_R_SUCCESS);
 }
 
-static void
-setnsec3parameters(dns_db_t *db, qpdb_version_t *version) {
-       qpdata_t *node = NULL;
-       dns_rdata_nsec3param_t nsec3param;
-       dns_rdata_t rdata = DNS_RDATA_INIT;
-       isc_region_t region;
-       isc_result_t result;
-       dns_slabheader_t *header = NULL, *header_next = NULL;
-       unsigned char *raw; /* RDATASLAB */
-       unsigned int count, length;
+static isc_result_t
+endload(dns_db_t *db, dns_rdatacallbacks_t *callbacks) {
+       qpdb_load_t *loadctx = NULL;
        qpzonedb_t *qpdb = (qpzonedb_t *)db;
-       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
 
-       version->havensec3 = false;
-       node = qpdb->origin;
-       NODE_RDLOCK(&(qpdb->node_locks[node->locknum].lock), &nlocktype);
-       for (header = node->data; header != NULL; header = header_next) {
-               header_next = header->next;
-               do {
-                       if (header->serial <= version->serial &&
-                           !IGNORE(header))
-                       {
-                               break;
-                       } else {
-                               header = header->down;
-                       }
-               } while (header != NULL);
+       REQUIRE(VALID_QPZONE(qpdb));
+       REQUIRE(DNS_CALLBACK_VALID(callbacks));
+       loadctx = callbacks->add_private;
+       REQUIRE(loadctx != NULL);
+       REQUIRE(loadctx->db == db);
 
-               if (header != NULL &&
-                   (header->type == dns_rdatatype_nsec3param))
-               {
-                       /*
-                        * Find an NSEC3PARAM with a supported algorithm.
-                        */
-                       raw = dns_slabheader_raw(header);
-                       count = raw[0] * 256 + raw[1]; /* count */
-                       raw += DNS_RDATASET_COUNT + DNS_RDATASET_LENGTH;
-                       while (count-- > 0U) {
-                               length = raw[0] * 256 + raw[1];
-                               raw += DNS_RDATASET_ORDER + DNS_RDATASET_LENGTH;
-                               region.base = raw;
-                               region.length = length;
-                               raw += length;
-                               dns_rdata_fromregion(
-                                       &rdata, qpdb->common.rdclass,
-                                       dns_rdatatype_nsec3param, &region);
-                               result = dns_rdata_tostruct(&rdata, &nsec3param,
-                                                           NULL);
-                               INSIST(result == ISC_R_SUCCESS);
-                               dns_rdata_reset(&rdata);
+       RWLOCK(&qpdb->lock, isc_rwlocktype_write);
 
-                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG &&
-                                   !dns_nsec3_supportedhash(nsec3param.hash))
-                               {
-                                       continue;
-                               }
+       REQUIRE((qpdb->attributes & QPDB_ATTR_LOADING) != 0);
+       REQUIRE((qpdb->attributes & QPDB_ATTR_LOADED) == 0);
 
-                               if (nsec3param.flags != 0) {
-                                       continue;
-                               }
+       qpdb->attributes &= ~QPDB_ATTR_LOADING;
+       qpdb->attributes |= QPDB_ATTR_LOADED;
 
-                               memmove(version->salt, nsec3param.salt,
-                                       nsec3param.salt_length);
-                               version->hash = nsec3param.hash;
-                               version->salt_length = nsec3param.salt_length;
-                               version->iterations = nsec3param.iterations;
-                               version->flags = nsec3param.flags;
-                               version->havensec3 = true;
-                               /*
-                                * Look for a better algorithm than the
-                                * unknown test algorithm.
-                                */
-                               if (nsec3param.hash != DNS_NSEC3_UNKNOWNALG) {
-                                       goto unlock;
-                               }
-                       }
-               }
+       if (qpdb->origin != NULL) {
+               dns_dbversion_t *version = qpdb->current_version;
+               RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
+               setsecure(db, version, qpdb->origin);
+       } else {
+               RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
        }
-unlock:
-       NODE_UNLOCK(&(qpdb->node_locks[node->locknum].lock), &nlocktype);
+
+       callbacks->add = NULL;
+       callbacks->add_private = NULL;
+
+       isc_mem_put(qpdb->common.mctx, loadctx, sizeof(*loadctx));
+
+       return (ISC_R_SUCCESS);
 }
 
-static void
-setsecure(dns_db_t *db, qpdb_version_t *version, dns_dbnode_t *origin) {
-       dns_rdataset_t keyset;
-       dns_rdataset_t nsecset, signsecset;
-       bool haszonekey = false;
-       bool hasnsec = false;
+static isc_result_t
+findnodeintree(qpzonedb_t *qpdb, dns_qp_t *qp, const dns_name_t *name,
+              bool create, bool nsec3, dns_dbnode_t **nodep DNS__DB_FLARG) {
        isc_result_t result;
-
-       dns_rdataset_init(&keyset);
-       result = dns_db_findrdataset(db, origin, version, dns_rdatatype_dnskey,
-                                    0, 0, &keyset, NULL);
-       if (result == ISC_R_SUCCESS) {
-               result = dns_rdataset_first(&keyset);
-               while (result == ISC_R_SUCCESS) {
-                       dns_rdata_t keyrdata = DNS_RDATA_INIT;
-                       dns_rdataset_current(&keyset, &keyrdata);
-                       if (dns_zonekey_iszonekey(&keyrdata)) {
-                               haszonekey = true;
-                               break;
-                       }
-                       result = dns_rdataset_next(&keyset);
-               }
-               dns_rdataset_disassociate(&keyset);
-       }
-       if (!haszonekey) {
-               version->secure = false;
-               version->havensec3 = false;
-               return;
-       }
-
-       dns_rdataset_init(&nsecset);
-       dns_rdataset_init(&signsecset);
-       result = dns_db_findrdataset(db, origin, version, dns_rdatatype_nsec, 0,
-                                    0, &nsecset, &signsecset);
-       if (result == ISC_R_SUCCESS) {
-               if (dns_rdataset_isassociated(&signsecset)) {
-                       hasnsec = true;
-                       dns_rdataset_disassociate(&signsecset);
-               }
-               dns_rdataset_disassociate(&nsecset);
-       }
-
-       setnsec3parameters(db, version);
-
-       /*
-        * Do we have a valid NSEC/NSEC3 chain?
-        */
-       if (version->havensec3 || hasnsec) {
-               version->secure = true;
-       } else {
-               version->secure = false;
-       }
-}
-
-static isc_result_t
-endload(dns_db_t *db, dns_rdatacallbacks_t *callbacks) {
-       qpdb_load_t *loadctx = NULL;
-       qpzonedb_t *qpdb = (qpzonedb_t *)db;
-
-       REQUIRE(VALID_QPZONE(qpdb));
-       REQUIRE(DNS_CALLBACK_VALID(callbacks));
-       loadctx = callbacks->add_private;
-       REQUIRE(loadctx != NULL);
-       REQUIRE(loadctx->db == db);
-
-       RWLOCK(&qpdb->lock, isc_rwlocktype_write);
-
-       REQUIRE((qpdb->attributes & QPDB_ATTR_LOADING) != 0);
-       REQUIRE((qpdb->attributes & QPDB_ATTR_LOADED) == 0);
-
-       qpdb->attributes &= ~QPDB_ATTR_LOADING;
-       qpdb->attributes |= QPDB_ATTR_LOADED;
-
-       if (qpdb->origin != NULL) {
-               dns_dbversion_t *version = qpdb->current_version;
-               RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
-               setsecure(db, version, qpdb->origin);
-       } else {
-               RWUNLOCK(&qpdb->lock, isc_rwlocktype_write);
-       }
-
-       callbacks->add = NULL;
-       callbacks->add_private = NULL;
-
-       isc_mem_put(qpdb->common.mctx, loadctx, sizeof(*loadctx));
-
-       return (ISC_R_SUCCESS);
-}
-
-static isc_result_t
-findnodeintree(qpzonedb_t *qpdb, dns_qp_t *qp, const dns_name_t *name,
-              bool create, bool nsec3, dns_dbnode_t **nodep DNS__DB_FLARG) {
-       isc_result_t result;
-       qpdata_t *node = NULL;
+       qpdata_t *node = NULL;
 
        result = dns_qp_getname(qp, name, (void **)&node, NULL);
        if (result != ISC_R_SUCCESS) {
@@ -2539,7 +2841,7 @@ previous_closest_nsec(dns_rdatatype_t type, qpdb_search_t *search,
                 */
                if (result != DNS_R_PARTIALMATCH && result != ISC_R_NOTFOUND) {
                        isc_log_write(dns_lctx, DNS_LOGCATEGORY_DATABASE,
-                                     DNS_LOGMODULE_CACHE, ISC_LOG_ERROR,
+                                     DNS_LOGMODULE_DB, ISC_LOG_ERROR,
                                      "previous_closest_nsec(): %s",
                                      isc_result_totext(result));
                        result = DNS_R_BADDB;
@@ -3486,7 +3788,7 @@ detachnode(dns_db_t *db, dns_dbnode_t **targetp DNS__DB_FLARG) {
                                strlcpy(buf, "<UNKNOWN>", sizeof(buf));
                        }
                        isc_log_write(dns_lctx, DNS_LOGCATEGORY_DATABASE,
-                                     DNS_LOGMODULE_CACHE, ISC_LOG_DEBUG(1),
+                                     DNS_LOGMODULE_DB, ISC_LOG_DEBUG(1),
                                      "calling free_qpdb(%s)", buf);
                        free_qpdb(qpdb, true);
                }
@@ -3564,18 +3866,12 @@ rdatasetiter_destroy(dns_rdatasetiter_t **iteratorp DNS__DB_FLARG) {
        *iteratorp = NULL;
 }
 
-static bool
-iterator_active(qpzonedb_t *qpdb ISC_ATTR_UNUSED,
-               qpdb_rdatasetiter_t *qrditer ISC_ATTR_UNUSED,
-               dns_slabheader_t *header ISC_ATTR_UNUSED) {
-       return (true);
-}
-
 static isc_result_t
 rdatasetiter_first(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        qpdb_rdatasetiter_t *qrditer = (qpdb_rdatasetiter_t *)iterator;
        qpzonedb_t *qpdb = (qpzonedb_t *)(qrditer->common.db);
        qpdata_t *node = qrditer->common.node;
+       qpdb_version_t *version = qrditer->common.version;
        dns_slabheader_t *header = NULL, *top_next = NULL;
        isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
 
@@ -3584,8 +3880,10 @@ rdatasetiter_first(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        for (header = node->data; header != NULL; header = top_next) {
                top_next = header->next;
                do {
-                       if (header->serial <= 1 && !IGNORE(header)) {
-                               if (!iterator_active(qpdb, qrditer, header)) {
+                       if (header->serial <= version->serial &&
+                           !IGNORE(header))
+                       {
+                               if (NONEXISTENT(header)) {
                                        header = NULL;
                                }
                                break;
@@ -3614,9 +3912,10 @@ rdatasetiter_next(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        qpdb_rdatasetiter_t *qrditer = (qpdb_rdatasetiter_t *)iterator;
        qpzonedb_t *qpdb = (qpzonedb_t *)(qrditer->common.db);
        qpdata_t *node = qrditer->common.node;
+       qpdb_version_t *version = qrditer->common.version;
        dns_slabheader_t *header = NULL, *top_next = NULL;
        dns_typepair_t type, negtype;
-       dns_rdatatype_t rdtype, covers;
+       dns_rdatatype_t rdtype;
        isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
 
        header = qrditer->current;
@@ -3628,12 +3927,7 @@ rdatasetiter_next(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
 
        type = header->type;
        rdtype = DNS_TYPEPAIR_TYPE(header->type);
-       if (NEGATIVE(header)) {
-               covers = DNS_TYPEPAIR_COVERS(header->type);
-               negtype = DNS_TYPEPAIR_VALUE(covers, 0);
-       } else {
-               negtype = DNS_TYPEPAIR_VALUE(0, rdtype);
-       }
+       negtype = DNS_TYPEPAIR_VALUE(0, rdtype);
 
        /*
         * Find the start of the header chain for the next type
@@ -3648,8 +3942,10 @@ rdatasetiter_next(dns_rdatasetiter_t *iterator DNS__DB_FLARG) {
        for (header = top_next; header != NULL; header = top_next) {
                top_next = header->next;
                do {
-                       if (header->serial <= 1 && !IGNORE(header)) {
-                               if (!iterator_active(qpdb, qrditer, header)) {
+                       if (header->serial <= version->serial &&
+                           !IGNORE(header))
+                       {
+                               if (NONEXISTENT(header)) {
                                        header = NULL;
                                }
                                break;
@@ -4139,37 +4435,554 @@ createiterator(dns_db_t *db, unsigned int options,
        return (ISC_R_SUCCESS);
 }
 
+static isc_result_t
+addnoqname(isc_mem_t *mctx, dns_slabheader_t *newheader,
+          dns_rdataset_t *rdataset) {
+       isc_result_t result;
+       dns_slabheader_proof_t *noqname = NULL;
+       dns_name_t name = DNS_NAME_INITEMPTY;
+       dns_rdataset_t neg = DNS_RDATASET_INIT, negsig = DNS_RDATASET_INIT;
+       isc_region_t r1, r2;
+
+       result = dns_rdataset_getnoqname(rdataset, &name, &neg, &negsig);
+       RUNTIME_CHECK(result == ISC_R_SUCCESS);
+
+       result = dns_rdataslab_fromrdataset(&neg, mctx, &r1, 0);
+       if (result != ISC_R_SUCCESS) {
+               goto cleanup;
+       }
+
+       result = dns_rdataslab_fromrdataset(&negsig, mctx, &r2, 0);
+       if (result != ISC_R_SUCCESS) {
+               goto cleanup;
+       }
+
+       noqname = isc_mem_get(mctx, sizeof(*noqname));
+       *noqname = (dns_slabheader_proof_t){
+               .neg = r1.base,
+               .negsig = r2.base,
+               .type = neg.type,
+               .name = DNS_NAME_INITEMPTY,
+       };
+       dns_name_dup(&name, mctx, &noqname->name);
+       newheader->noqname = noqname;
+
+cleanup:
+       dns_rdataset_disassociate(&neg);
+       dns_rdataset_disassociate(&negsig);
+
+       return (result);
+}
+
+static isc_result_t
+addclosest(isc_mem_t *mctx, dns_slabheader_t *newheader,
+          dns_rdataset_t *rdataset) {
+       isc_result_t result;
+       dns_slabheader_proof_t *closest = NULL;
+       dns_name_t name = DNS_NAME_INITEMPTY;
+       dns_rdataset_t neg = DNS_RDATASET_INIT, negsig = DNS_RDATASET_INIT;
+       isc_region_t r1, r2;
+
+       result = dns_rdataset_getclosest(rdataset, &name, &neg, &negsig);
+       RUNTIME_CHECK(result == ISC_R_SUCCESS);
+
+       result = dns_rdataslab_fromrdataset(&neg, mctx, &r1, 0);
+       if (result != ISC_R_SUCCESS) {
+               goto cleanup;
+       }
+
+       result = dns_rdataslab_fromrdataset(&negsig, mctx, &r2, 0);
+       if (result != ISC_R_SUCCESS) {
+               goto cleanup;
+       }
+
+       closest = isc_mem_get(mctx, sizeof(*closest));
+       *closest = (dns_slabheader_proof_t){
+               .neg = r1.base,
+               .negsig = r2.base,
+               .name = DNS_NAME_INITEMPTY,
+               .type = neg.type,
+       };
+       dns_name_dup(&name, mctx, &closest->name);
+       newheader->closest = closest;
+
+cleanup:
+       dns_rdataset_disassociate(&neg);
+       dns_rdataset_disassociate(&negsig);
+       return (result);
+}
+
+static isc_result_t
+addrdataset(dns_db_t *db, dns_dbnode_t *dbnode, dns_dbversion_t *dbversion,
+           isc_stdtime_t now, dns_rdataset_t *rdataset, unsigned int options,
+           dns_rdataset_t *addedrdataset DNS__DB_FLARG) {
+       isc_result_t result;
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       qpdata_t *node = (qpdata_t *)dbnode;
+       qpdb_version_t *version = dbversion;
+       isc_region_t region;
+       dns_slabheader_t *newheader = NULL;
+       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+       dns_fixedname_t fn;
+       dns_name_t *name = dns_fixedname_initname(&fn);
+       dns_qp_t *nsec = NULL;
+
+       REQUIRE(VALID_QPZONE(qpdb));
+       INSIST(version == NULL || version->qpdb == qpdb);
+
+       /*
+        * SOA records are only allowed at top of zone.
+        */
+       if (rdataset->type == dns_rdatatype_soa && node != qpdb->origin) {
+               return (DNS_R_NOTZONETOP);
+       }
+
+       REQUIRE(((node->nsec == DNS_DB_NSEC_NSEC3 &&
+                 (rdataset->type == dns_rdatatype_nsec3 ||
+                  rdataset->covers == dns_rdatatype_nsec3)) ||
+                (node->nsec != DNS_DB_NSEC_NSEC3 &&
+                 rdataset->type != dns_rdatatype_nsec3 &&
+                 rdataset->covers != dns_rdatatype_nsec3)));
+
+       if (version == NULL) {
+               if (now == 0) {
+                       now = isc_stdtime_now();
+               }
+       } else {
+               now = 0;
+       }
+
+       result = dns_rdataslab_fromrdataset(rdataset, qpdb->common.mctx,
+                                           &region, sizeof(dns_slabheader_t));
+       if (result != ISC_R_SUCCESS) {
+               return (result);
+       }
+
+       dns_name_copy(node->name, name);
+       dns_rdataset_getownercase(rdataset, name);
+
+       newheader = (dns_slabheader_t *)region.base;
+       *newheader = (dns_slabheader_t){
+               .type = DNS_TYPEPAIR_VALUE(rdataset->type, rdataset->covers),
+               .trust = rdataset->trust,
+               .last_used = now,
+               .node = node,
+       };
+
+       dns_slabheader_reset(newheader, db, node);
+       newheader->ttl = rdataset->ttl + now;
+       if (rdataset->ttl == 0U) {
+               DNS_SLABHEADER_SETATTR(newheader, DNS_SLABHEADERATTR_ZEROTTL);
+       }
+       atomic_init(&newheader->count,
+                   atomic_fetch_add_relaxed(&init_count, 1));
+       if (version != NULL) {
+               newheader->serial = version->serial;
+               now = 0;
+
+               if ((rdataset->attributes & DNS_RDATASETATTR_RESIGN) != 0) {
+                       DNS_SLABHEADER_SETATTR(newheader,
+                                              DNS_SLABHEADERATTR_RESIGN);
+                       newheader->resign =
+                               (isc_stdtime_t)(dns_time64_from32(
+                                                       rdataset->resign) >>
+                                               1);
+                       newheader->resign_lsb = rdataset->resign & 0x1;
+               }
+       } else {
+               newheader->serial = 1;
+               if ((rdataset->attributes & DNS_RDATASETATTR_OPTOUT) != 0) {
+                       DNS_SLABHEADER_SETATTR(newheader,
+                                              DNS_SLABHEADERATTR_OPTOUT);
+               }
+               if ((rdataset->attributes & DNS_RDATASETATTR_NOQNAME) != 0) {
+                       result = addnoqname(qpdb->common.mctx, newheader,
+                                           rdataset);
+                       if (result != ISC_R_SUCCESS) {
+                               dns_slabheader_destroy(&newheader);
+                               return (result);
+                       }
+               }
+               if ((rdataset->attributes & DNS_RDATASETATTR_CLOSEST) != 0) {
+                       result = addclosest(qpdb->common.mctx, newheader,
+                                           rdataset);
+                       if (result != ISC_R_SUCCESS) {
+                               dns_slabheader_destroy(&newheader);
+                               return (result);
+                       }
+               }
+       }
+
+       /*
+        * Add to the auxiliary NSEC tree if we're adding an NSEC record.
+        */
+       if (node->nsec != DNS_DB_NSEC_HAS_NSEC &&
+           rdataset->type == dns_rdatatype_nsec)
+       {
+               dns_qpmulti_write(qpdb->nsec, &nsec);
+       }
+
+       /*
+        * If we're adding a delegation type or adding to the auxiliary NSEC
+        * tree hold an exclusive lock on the tree.  In the latter case the
+        * lock does not necessarily have to be acquired but it will help
+        * purge ancient entries more effectively.
+        *
+        * (Note: node lock must be acquired after starting
+        * the QPDB transaction and released before committing.)
+        */
+       NODE_WRLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       result = ISC_R_SUCCESS;
+       if (nsec != NULL) {
+               qpdata_t *nsecnode = new_qpdata(qpdb, name);
+               result = dns_qp_insert(nsec, nsecnode, 0);
+               if (result == ISC_R_SUCCESS) {
+                       nsecnode->nsec = DNS_DB_NSEC_NSEC;
+                       node->nsec = DNS_DB_NSEC_HAS_NSEC;
+               } else if (result == ISC_R_EXISTS) {
+                       node->nsec = DNS_DB_NSEC_HAS_NSEC;
+                       result = ISC_R_SUCCESS;
+               }
+               qpdata_detach(&nsecnode);
+       }
+
+       if (result == ISC_R_SUCCESS) {
+               result = add(qpdb, node, name, version, newheader, options,
+                            false, addedrdataset, now DNS__DB_FLARG_PASS);
+       }
+
+       /*
+        * If we're adding a delegation type (e.g. NS or DNAME),
+        * then we need to set the callback bit on the node.
+        */
+       if (result == ISC_R_SUCCESS &&
+           delegating_type(qpdb, node, rdataset->type))
+       {
+               node->delegating = 1;
+       }
+
+       NODE_UNLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       if (nsec != NULL) {
+               dns_qpmulti_commit(qpdb->nsec, &nsec);
+       }
+
+       /*
+        * Update the zone's secure status.  If version is non-NULL
+        * this is deferred until closeversion() is called.
+        */
+       if (result == ISC_R_SUCCESS && version == NULL) {
+               setsecure(db, version, qpdb->origin);
+       }
+
+       return (result);
+}
+
+static isc_result_t
+subtractrdataset(dns_db_t *db, dns_dbnode_t *dbnode, dns_dbversion_t *dbversion,
+                dns_rdataset_t *rdataset, unsigned int options,
+                dns_rdataset_t *newrdataset DNS__DB_FLARG) {
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       qpdata_t *node = (qpdata_t *)dbnode;
+       qpdb_version_t *version = dbversion;
+       dns_fixedname_t fname;
+       dns_name_t *nodename = dns_fixedname_initname(&fname);
+       dns_slabheader_t *topheader = NULL, *topheader_prev = NULL;
+       dns_slabheader_t *header = NULL, *newheader = NULL;
+       unsigned char *subresult = NULL;
+       isc_region_t region;
+       isc_result_t result;
+       qpdb_changed_t *changed = NULL;
+       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+       REQUIRE(VALID_QPZONE(qpdb));
+       REQUIRE(version != NULL && version->qpdb == qpdb);
+
+       REQUIRE(((node->nsec == DNS_DB_NSEC_NSEC3 &&
+                 (rdataset->type == dns_rdatatype_nsec3 ||
+                  rdataset->covers == dns_rdatatype_nsec3)) ||
+                (node->nsec != DNS_DB_NSEC_NSEC3 &&
+                 rdataset->type != dns_rdatatype_nsec3 &&
+                 rdataset->covers != dns_rdatatype_nsec3)));
+
+       dns_name_copy(node->name, nodename);
+       result = dns_rdataslab_fromrdataset(rdataset, qpdb->common.mctx,
+                                           &region, sizeof(dns_slabheader_t));
+       if (result != ISC_R_SUCCESS) {
+               return (result);
+       }
+
+       newheader = (dns_slabheader_t *)region.base;
+       dns_slabheader_reset(newheader, db, node);
+       newheader->ttl = rdataset->ttl;
+       newheader->type = DNS_TYPEPAIR_VALUE(rdataset->type, rdataset->covers);
+       atomic_init(&newheader->attributes, 0);
+       newheader->serial = version->serial;
+       newheader->trust = 0;
+       newheader->noqname = NULL;
+       newheader->closest = NULL;
+       atomic_init(&newheader->count,
+                   atomic_fetch_add_relaxed(&init_count, 1));
+       newheader->last_used = 0;
+       newheader->node = node;
+       newheader->db = (dns_db_t *)qpdb;
+       if ((rdataset->attributes & DNS_RDATASETATTR_RESIGN) != 0) {
+               DNS_SLABHEADER_SETATTR(newheader, DNS_SLABHEADERATTR_RESIGN);
+               newheader->resign =
+                       (isc_stdtime_t)(dns_time64_from32(rdataset->resign) >>
+                                       1);
+               newheader->resign_lsb = rdataset->resign & 0x1;
+       } else {
+               newheader->resign = 0;
+               newheader->resign_lsb = 0;
+       }
+
+       NODE_WRLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       changed = add_changed(newheader, version DNS__DB_FLARG_PASS);
+       for (topheader = node->data; topheader != NULL;
+            topheader = topheader->next)
+       {
+               if (topheader->type == newheader->type) {
+                       break;
+               }
+               topheader_prev = topheader;
+       }
+       /*
+        * If header isn't NULL, we've found the right type.  There may be
+        * IGNORE rdatasets between the top of the chain and the first real
+        * data.  We skip over them.
+        */
+       header = topheader;
+       while (header != NULL && IGNORE(header)) {
+               header = header->down;
+       }
+       if (header != NULL && !NONEXISTENT(header)) {
+               unsigned int flags = 0;
+               subresult = NULL;
+               result = ISC_R_SUCCESS;
+               if ((options & DNS_DBSUB_EXACT) != 0) {
+                       flags |= DNS_RDATASLAB_EXACT;
+                       if (newheader->ttl != header->ttl) {
+                               result = DNS_R_NOTEXACT;
+                       }
+               }
+               if (result == ISC_R_SUCCESS) {
+                       result = dns_rdataslab_subtract(
+                               (unsigned char *)header,
+                               (unsigned char *)newheader,
+                               (unsigned int)(sizeof(*newheader)),
+                               qpdb->common.mctx, qpdb->common.rdclass,
+                               (dns_rdatatype_t)header->type, flags,
+                               &subresult);
+               }
+               if (result == ISC_R_SUCCESS) {
+                       dns_slabheader_destroy(&newheader);
+                       newheader = (dns_slabheader_t *)subresult;
+                       dns_slabheader_reset(newheader, db, node);
+                       dns_slabheader_copycase(newheader, header);
+                       if (RESIGN(header)) {
+                               DNS_SLABHEADER_SETATTR(
+                                       newheader, DNS_SLABHEADERATTR_RESIGN);
+                               newheader->resign = header->resign;
+                               newheader->resign_lsb = header->resign_lsb;
+                               resigninsert(qpdb, node->locknum, newheader);
+                       }
+                       /*
+                        * We have to set the serial since the rdataslab
+                        * subtraction routine copies the reserved portion of
+                        * header, not newheader.
+                        */
+                       newheader->serial = version->serial;
+                       /*
+                        * XXXJT: dns_rdataslab_subtract() copied the pointers
+                        * to additional info.  We need to clear these fields
+                        * to avoid having duplicated references.
+                        */
+                       maybe_update_recordsandsize(true, version, newheader,
+                                                   nodename->length);
+               } else if (result == DNS_R_NXRRSET) {
+                       /*
+                        * This subtraction would remove all of the rdata;
+                        * add a nonexistent header instead.
+                        */
+                       dns_slabheader_destroy(&newheader);
+                       newheader = dns_slabheader_new((dns_db_t *)qpdb,
+                                                      (dns_dbnode_t *)node);
+                       newheader->ttl = 0;
+                       newheader->type = topheader->type;
+                       atomic_init(&newheader->attributes,
+                                   DNS_SLABHEADERATTR_NONEXISTENT);
+                       newheader->serial = version->serial;
+               } else {
+                       dns_slabheader_destroy(&newheader);
+                       goto unlock;
+               }
+
+               /*
+                * If we're here, we want to link newheader in front of
+                * topheader.
+                */
+               INSIST(version->serial >= topheader->serial);
+               maybe_update_recordsandsize(false, version, header,
+                                           nodename->length);
+               if (topheader_prev != NULL) {
+                       topheader_prev->next = newheader;
+               } else {
+                       node->data = newheader;
+               }
+               newheader->next = topheader->next;
+               newheader->down = topheader;
+               topheader->next = newheader;
+               node->dirty = 1;
+               changed->dirty = true;
+               resigndelete(qpdb, version, header DNS__DB_FLARG_PASS);
+       } else {
+               /*
+                * The rdataset doesn't exist, so we don't need to do anything
+                * to satisfy the deletion request.
+                */
+               dns_slabheader_destroy(&newheader);
+               if ((options & DNS_DBSUB_EXACT) != 0) {
+                       result = DNS_R_NOTEXACT;
+               } else {
+                       result = DNS_R_UNCHANGED;
+               }
+       }
+
+       if (result == ISC_R_SUCCESS && newrdataset != NULL) {
+               bindrdataset(qpdb, node, newheader, 0,
+                            newrdataset DNS__DB_FLARG_PASS);
+       }
+
+       if (result == DNS_R_NXRRSET && newrdataset != NULL &&
+           (options & DNS_DBSUB_WANTOLD) != 0)
+       {
+               bindrdataset(qpdb, node, header, 0,
+                            newrdataset DNS__DB_FLARG_PASS);
+       }
+
+unlock:
+       NODE_UNLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       /*
+        * Update the zone's secure status.  If version is non-NULL
+        * this is deferred until closeversion() is called.
+        */
+       if (result == ISC_R_SUCCESS && version == NULL) {
+               RWLOCK(&qpdb->lock, isc_rwlocktype_read);
+               version = qpdb->current_version;
+               RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
+               setsecure(db, version, qpdb->origin);
+       }
+
+       return (result);
+}
+
+static isc_result_t
+deleterdataset(dns_db_t *db, dns_dbnode_t *dbnode, dns_dbversion_t *dbversion,
+              dns_rdatatype_t type, dns_rdatatype_t covers DNS__DB_FLARG) {
+       qpzonedb_t *qpdb = (qpzonedb_t *)db;
+       qpdata_t *node = (qpdata_t *)dbnode;
+       qpdb_version_t *version = dbversion;
+       dns_fixedname_t fname;
+       dns_name_t *nodename = dns_fixedname_initname(&fname);
+       isc_result_t result;
+       dns_slabheader_t *newheader = NULL;
+       isc_rwlocktype_t nlocktype = isc_rwlocktype_none;
+
+       REQUIRE(VALID_QPZONE(qpdb));
+       INSIST(version == NULL || version->qpdb == qpdb);
+
+       if (type == dns_rdatatype_any) {
+               return (ISC_R_NOTIMPLEMENTED);
+       }
+       if (type == dns_rdatatype_rrsig && covers == 0) {
+               return (ISC_R_NOTIMPLEMENTED);
+       }
+
+       newheader = dns_slabheader_new(db, node);
+       newheader->type = DNS_TYPEPAIR_VALUE(type, covers);
+       newheader->ttl = 0;
+       atomic_init(&newheader->attributes, DNS_SLABHEADERATTR_NONEXISTENT);
+       if (version != NULL) {
+               newheader->serial = version->serial;
+       }
+
+       dns_name_copy(node->name, nodename);
+
+       NODE_WRLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+       result = add(qpdb, node, nodename, version, newheader, DNS_DBADD_FORCE,
+                    false, NULL, 0 DNS__DB_FLARG_PASS);
+       NODE_UNLOCK(&qpdb->node_locks[node->locknum].lock, &nlocktype);
+
+       /*
+        * Update the zone's secure status.  If version is non-NULL
+        * this is deferred until closeversion() is called.
+        */
+       if (result == ISC_R_SUCCESS && version == NULL) {
+               RWLOCK(&qpdb->lock, isc_rwlocktype_read);
+               version = qpdb->current_version;
+               RWUNLOCK(&qpdb->lock, isc_rwlocktype_read);
+               setsecure(db, version, qpdb->origin);
+       }
+
+       return (result);
+}
+
 static dns_dbmethods_t qpdb_zonemethods = {
        .destroy = qpdb_destroy,
        .beginload = beginload,
        .endload = endload,
-       .setloop = setloop,
        .currentversion = currentversion,
-       .closeversion = closeversion,
+       .newversion = newversion,
        .attachversion = attachversion,
-       .findrdataset = findrdataset,
+       .closeversion = closeversion,
        .findnode = findnode,
        .find = find,
        .attachnode = attachnode,
        .detachnode = detachnode,
        .createiterator = createiterator,
-       .getoriginnode = getoriginnode,
+       .findrdataset = findrdataset,
        .allrdatasets = allrdatasets,
+       .addrdataset = addrdataset,
+       .subtractrdataset = subtractrdataset,
+       .deleterdataset = deleterdataset,
+       .issecure = NULL,
+       .nodecount = NULL,
+       .setloop = setloop,
+       .getoriginnode = getoriginnode,
+       .getnsec3parameters = NULL,
+       .findnsec3node = NULL,
+       .setsigningtime = NULL,
+       .getsigningtime = NULL,
+       .getsize = NULL,
+       .setgluecachestats = NULL,
+       .locknode = NULL,
+       .unlocknode = NULL,
+       .addglue = NULL,
        .deletedata = deletedata,
 };
 
 static void
-destroy_qpdata(qpdata_t *data) {
-       dns_slabheader_t *current = data->data;
-       dns_slabheader_t *next = NULL;
+destroy_qpdata(qpdata_t *node) {
+       dns_slabheader_t *current = NULL, *next = NULL;
+
+       for (current = node->data; current != NULL; current = next) {
+               dns_slabheader_t *down = current->down, *down_next = NULL;
 
-       while (current != NULL) {
                next = current->next;
+
+               for (down = current->down; down != NULL; down = down_next) {
+                       down_next = down->down;
+                       dns_slabheader_destroy(&down);
+               }
+
                dns_slabheader_destroy(&current);
-               current = next;
        }
 
-       isc_mem_putanddetach(&data->mctx, data, sizeof(qpdata_t));
+       isc_mem_putanddetach(&node->mctx, node, sizeof(qpdata_t));
 }
 
 #if DNS_DB_NODETRACE