]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Offload AXFR and IXFR processing
authorOndřej Surý <ondrej@isc.org>
Fri, 13 Oct 2023 12:41:22 +0000 (14:41 +0200)
committerOndřej Surý <ondrej@isc.org>
Thu, 19 Oct 2023 12:57:25 +0000 (14:57 +0200)
Instead of processing received data synchronously, store the incoming
differences in the list and process them asynchronously when we need to
commit the data into the database and/or journal.

lib/dns/xfrin.c

index 359e1cc6f9543e22b538e8ef04fb33a6201eed42..d389f7dc4404f68de861cddd7f00103ed7cfd397 100644 (file)
@@ -22,6 +22,7 @@
 #include <isc/result.h>
 #include <isc/string.h>
 #include <isc/util.h>
+#include <isc/work.h>
 
 #include <dns/callbacks.h>
 #include <dns/catz.h>
@@ -129,6 +130,12 @@ struct dns_xfrin {
        dns_dbversion_t *ver;
        dns_diff_t diff; /*%< Pending database changes */
 
+       /* Diff queue */
+       bool diff_running;
+       struct __cds_wfcq_head diff_head;
+       struct cds_wfcq_tail diff_tail;
+       isc_result_t result;
+
        _Atomic xfrin_state_t state;
        uint32_t expireopt;
        bool edns, expireoptset;
@@ -136,7 +143,7 @@ struct dns_xfrin {
 
        /*
         * Following variable were made atomic only for loading the values for
-        * the statistics channelr, thus all accesses can be **relaxed** because
+        * the statistics channel, thus all accesses can be **relaxed** because
         * all store and load operations that affect XFR are done on the same
         * thread and only the statistics channel thread could perform a load
         * operation from a different thread and it's ok to not be precise in
@@ -210,9 +217,7 @@ axfr_makedb(dns_xfrin_t *xfr, dns_db_t **dbp);
 static isc_result_t
 axfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
             dns_rdata_t *rdata);
-static isc_result_t
-axfr_apply(dns_xfrin_t *xfr);
-static isc_result_t
+static void
 axfr_commit(dns_xfrin_t *xfr);
 static isc_result_t
 axfr_finalize(dns_xfrin_t *xfr);
@@ -220,8 +225,6 @@ axfr_finalize(dns_xfrin_t *xfr);
 static isc_result_t
 ixfr_init(dns_xfrin_t *xfr);
 static isc_result_t
-ixfr_apply(dns_xfrin_t *xfr);
-static isc_result_t
 ixfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
             dns_rdata_t *rdata);
 static isc_result_t
@@ -242,6 +245,9 @@ xfrin_send_done(isc_result_t eresult, isc_region_t *region, void *arg);
 static void
 xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg);
 
+static void
+xfrin_end(dns_xfrin_t *xfr, isc_result_t result);
+
 static void
 xfrin_destroy(dns_xfrin_t *xfr);
 
@@ -320,13 +326,18 @@ failure:
 /*
  * Store a set of AXFR RRs in the database.
  */
-static isc_result_t
-axfr_apply(dns_xfrin_t *xfr) {
-       isc_result_t result;
+static void
+axfr_apply(void *arg) {
+       dns_xfrin_t *xfr = arg;
+       isc_result_t result = ISC_R_SUCCESS;
        uint64_t records;
 
+       if (atomic_load(&xfr->shuttingdown)) {
+               result = ISC_R_SHUTTINGDOWN;
+               goto failure;
+       }
+
        CHECK(dns_diff_load(&xfr->diff, xfr->axfr.add, xfr->axfr.add_private));
-       dns_diff_clear(&xfr->diff);
        if (xfr->maxrecords != 0U) {
                result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
                if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
@@ -334,22 +345,50 @@ axfr_apply(dns_xfrin_t *xfr) {
                        goto failure;
                }
        }
-       result = ISC_R_SUCCESS;
+
 failure:
-       return (result);
+       dns_diff_clear(&xfr->diff);
+       xfr->result = result;
 }
 
-static isc_result_t
-axfr_commit(dns_xfrin_t *xfr) {
-       isc_result_t result;
+static void
+axfr_apply_done(void *arg) {
+       dns_xfrin_t *xfr = arg;
+       isc_result_t result = xfr->result;
 
-       CHECK(axfr_apply(xfr));
-       CHECK(dns_db_endload(xfr->db, &xfr->axfr));
-       CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
+       if (atomic_load(&xfr->shuttingdown)) {
+               result = ISC_R_SHUTTINGDOWN;
+       }
+
+       if (result == ISC_R_SUCCESS) {
+               CHECK(dns_db_endload(xfr->db, &xfr->axfr));
+               CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
+               CHECK(axfr_finalize(xfr));
+       } else {
+               (void)dns_db_endload(xfr->db, &xfr->axfr);
+       }
 
-       result = ISC_R_SUCCESS;
 failure:
-       return (result);
+       xfr->diff_running = false;
+
+       if (result == ISC_R_SUCCESS) {
+               if (atomic_load(&xfr->state) == XFRST_AXFR_END) {
+                       xfrin_end(xfr, result);
+               }
+       } else {
+               xfrin_fail(xfr, result, "failed while processing responses");
+       }
+
+       dns_xfrin_detach(&xfr);
+}
+
+static void
+axfr_commit(dns_xfrin_t *xfr) {
+       INSIST(!xfr->diff_running);
+       xfr->diff_running = true;
+       dns_xfrin_ref(xfr);
+       isc_work_enqueue(dns_zone_getloop(xfr->zone), axfr_apply,
+                        axfr_apply_done, xfr);
 }
 
 static isc_result_t
@@ -368,6 +407,12 @@ axfr_finalize(dns_xfrin_t *xfr) {
  * IXFR handling
  */
 
+typedef struct ixfr_apply_data {
+       dns_diff_t diff; /*%< Pending database changes */
+       isc_result_t result;
+       struct cds_wfcq_node wfcq_node;
+} ixfr_apply_data_t;
+
 static isc_result_t
 ixfr_init(dns_xfrin_t *xfr) {
        isc_result_t result;
@@ -414,21 +459,38 @@ failure:
        return (result);
 }
 
-/*
- * Apply a set of IXFR changes to the database.
- */
 static isc_result_t
-ixfr_apply(dns_xfrin_t *xfr) {
-       isc_result_t result;
-       uint64_t records;
+ixfr_begin_transaction(dns_xfrin_t *xfr) {
+       isc_result_t result = ISC_R_SUCCESS;
 
-       if (xfr->ver == NULL) {
-               CHECK(dns_db_newversion(xfr->db, &xfr->ver));
-               if (xfr->ixfr.journal != NULL) {
-                       CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
-               }
+       if (xfr->ixfr.journal != NULL) {
+               CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
        }
-       CHECK(dns_diff_apply(&xfr->diff, xfr->db, xfr->ver));
+failure:
+       return (result);
+}
+
+static isc_result_t
+ixfr_end_transaction(dns_xfrin_t *xfr) {
+       isc_result_t result = ISC_R_SUCCESS;
+
+       CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
+       /* XXX enter ready-to-commit state here */
+       if (xfr->ixfr.journal != NULL) {
+               CHECK(dns_journal_commit(xfr->ixfr.journal));
+       }
+failure:
+       return (result);
+}
+
+static isc_result_t
+ixfr_apply_one(dns_xfrin_t *xfr, ixfr_apply_data_t *data) {
+       isc_result_t result = ISC_R_SUCCESS;
+       uint64_t records;
+
+       CHECK(ixfr_begin_transaction(xfr));
+
+       CHECK(dns_diff_apply(&data->diff, xfr->db, xfr->ver));
        if (xfr->maxrecords != 0U) {
                result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
                if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
@@ -437,29 +499,126 @@ ixfr_apply(dns_xfrin_t *xfr) {
                }
        }
        if (xfr->ixfr.journal != NULL) {
-               CHECK(dns_journal_writediff(xfr->ixfr.journal, &xfr->diff));
+               CHECK(dns_journal_writediff(xfr->ixfr.journal, &data->diff));
        }
-       dns_diff_clear(&xfr->diff);
-       result = ISC_R_SUCCESS;
+
+       result = ixfr_end_transaction(xfr);
+
+       return (result);
 failure:
+       /* We need to end the transaction, but keep the previous error */
+       (void)ixfr_end_transaction(xfr);
+
        return (result);
 }
 
-static isc_result_t
-ixfr_commit(dns_xfrin_t *xfr) {
-       isc_result_t result;
+static void
+ixfr_apply(void *arg) {
+       dns_xfrin_t *xfr = arg;
+       isc_result_t result = ISC_R_SUCCESS;
 
-       CHECK(ixfr_apply(xfr));
-       if (xfr->ver != NULL) {
-               CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
-               /* XXX enter ready-to-commit state here */
-               if (xfr->ixfr.journal != NULL) {
-                       CHECK(dns_journal_commit(xfr->ixfr.journal));
+       struct __cds_wfcq_head diff_head;
+       struct cds_wfcq_tail diff_tail;
+
+       /* Initialize local wfcqueue */
+       __cds_wfcq_init(&diff_head, &diff_tail);
+
+       enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+               &diff_head, &diff_tail, &xfr->diff_head, &xfr->diff_tail);
+       INSIST(ret == CDS_WFCQ_RET_DEST_EMPTY);
+
+       struct cds_wfcq_node *node, *next;
+       __cds_wfcq_for_each_blocking_safe(&diff_head, &diff_tail, node, next) {
+               ixfr_apply_data_t *data =
+                       caa_container_of(node, ixfr_apply_data_t, wfcq_node);
+
+               if (atomic_load(&xfr->shuttingdown)) {
+                       result = ISC_R_SHUTTINGDOWN;
+               }
+
+               /* Apply only until first failure */
+               if (result == ISC_R_SUCCESS) {
+                       /* This also checks for shuttingdown condition */
+                       result = ixfr_apply_one(xfr, data);
                }
+
+               /* We need to clear and free all data chunks */
+               dns_diff_clear(&data->diff);
+               isc_mem_put(xfr->mctx, data, sizeof(*data));
+       }
+
+       /* FIXME: This might need a barrier or smth */
+       xfr->result = result;
+}
+
+static void
+ixfr_apply_done(void *arg) {
+       dns_xfrin_t *xfr = arg;
+       isc_result_t result = xfr->result;
+
+       if (atomic_load(&xfr->shuttingdown)) {
+               result = ISC_R_SHUTTINGDOWN;
+       }
+
+       if (result != ISC_R_SUCCESS) {
+               goto failure;
+       }
+
+       /* Reschedule */
+       if (!cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail)) {
+               isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
+                                ixfr_apply_done, xfr);
+               return;
+       }
+
+failure:
+       xfr->diff_running = false;
+
+       if (result == ISC_R_SUCCESS) {
                dns_db_closeversion(xfr->db, &xfr->ver, true);
                dns_zone_markdirty(xfr->zone);
+
+               if (atomic_load(&xfr->state) == XFRST_IXFR_END) {
+                       xfrin_end(xfr, result);
+               }
+       } else {
+               dns_db_closeversion(xfr->db, &xfr->ver, false);
+
+               xfrin_fail(xfr, result, "failed while processing responses");
        }
-       result = ISC_R_SUCCESS;
+
+       dns_xfrin_detach(&xfr);
+}
+
+/*
+ * Apply a set of IXFR changes to the database.
+ */
+static isc_result_t
+ixfr_commit(dns_xfrin_t *xfr) {
+       isc_result_t result = ISC_R_SUCCESS;
+       ixfr_apply_data_t *data = isc_mem_get(xfr->mctx, sizeof(*data));
+
+       *data = (ixfr_apply_data_t){ 0 };
+       cds_wfcq_node_init(&data->wfcq_node);
+
+       if (xfr->ver == NULL) {
+               CHECK(dns_db_newversion(xfr->db, &xfr->ver));
+       }
+
+       dns_diff_init(xfr->mctx, &data->diff);
+       /* FIXME: Should we add dns_diff_move() */
+       ISC_LIST_MOVE(data->diff.tuples, xfr->diff.tuples);
+
+       (void)cds_wfcq_enqueue(&xfr->diff_head, &xfr->diff_tail,
+                              &data->wfcq_node);
+
+       if (!xfr->diff_running) {
+               dns_xfrin_ref(xfr);
+               xfr->diff_running = true;
+               isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
+                                ixfr_apply_done, xfr);
+       }
+
 failure:
        return (result);
 }
@@ -672,7 +831,7 @@ redo:
                                result = DNS_R_FORMERR;
                                goto failure;
                        }
-                       CHECK(axfr_commit(xfr));
+                       axfr_commit(xfr);
                        atomic_store(&xfr->state, XFRST_AXFR_END);
                        break;
                }
@@ -764,13 +923,9 @@ xfrin_idledout(void *xfr) {
 
 isc_time_t
 dns_xfrin_getstarttime(dns_xfrin_t *xfr) {
-       isc_time_t start;
-
        REQUIRE(VALID_XFRIN(xfr));
 
        return (atomic_load_relaxed(&xfr->start));
-
-       return (start);
 }
 
 void
@@ -895,8 +1050,12 @@ ISC_REFCOUNT_IMPL(dns_xfrin, xfrin_destroy);
 
 static void
 xfrin_cancelio(dns_xfrin_t *xfr) {
-       dns_dispatch_done(&xfr->dispentry);
-       dns_dispatch_detach(&xfr->disp);
+       if (xfr->dispentry != NULL) {
+               dns_dispatch_done(&xfr->dispentry);
+       }
+       if (xfr->disp != NULL) {
+               dns_dispatch_detach(&xfr->disp);
+       }
 }
 
 static void
@@ -946,21 +1105,10 @@ xfrin_fail(dns_xfrin_t *xfr, isc_result_t result, const char *msg) {
                                result = DNS_R_BADIXFR;
                        }
                }
+
                xfrin_cancelio(xfr);
 
-               /*
-                * Close the journal.
-                */
-               if (xfr->ixfr.journal != NULL) {
-                       dns_journal_destroy(&xfr->ixfr.journal);
-               }
-               if (xfr->done != NULL) {
-                       (xfr->done)(xfr->zone,
-                                   xfr->expireoptset ? &xfr->expireopt : NULL,
-                                   result);
-                       xfr->done = NULL;
-               }
-               xfr->shutdown_result = result;
+               xfrin_end(xfr, result);
        }
 
        dns_xfrin_detach(&xfr);
@@ -995,6 +1143,8 @@ xfrin_create(isc_mem_t *mctx, dns_zone_t *zone, dns_db_t *db,
        dns_view_weakattach(dns_zone_getview(zone), &xfr->view);
        dns_name_init(&xfr->name, NULL);
 
+       __cds_wfcq_init(&xfr->diff_head, &xfr->diff_tail);
+
        atomic_init(&xfr->shuttingdown, false);
        atomic_init(&xfr->is_ixfr, false);
 
@@ -1128,12 +1278,7 @@ xfrin_start(dns_xfrin_t *xfr) {
        return (ISC_R_SUCCESS);
 
 failure:
-       if (xfr->dispentry != NULL) {
-               dns_dispatch_done(&xfr->dispentry);
-       }
-       if (xfr->disp != NULL) {
-               dns_dispatch_detach(&xfr->disp);
-       }
+       xfrin_cancelio(xfr);
        dns_xfrin_detach(&xfr);
 
        return (result);
@@ -1501,6 +1646,31 @@ get_edns_expire(dns_xfrin_t *xfr, dns_message_t *msg) {
        }
 }
 
+static void
+xfrin_end(dns_xfrin_t *xfr, isc_result_t result) {
+       /* Close the journal. */
+       if (xfr->ixfr.journal != NULL) {
+               LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info, result);
+               dns_journal_destroy(&xfr->ixfr.journal);
+               LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info, result);
+       }
+
+       /* Inform the caller. */
+       if (xfr->done != NULL) {
+               LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info, result);
+               (xfr->done)(xfr->zone,
+                           xfr->expireoptset ? &xfr->expireopt : NULL, result);
+               xfr->done = NULL;
+               LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
+       }
+
+       atomic_store(&xfr->shuttingdown, true);
+       isc_timer_stop(xfr->max_time_timer);
+       if (xfr->shutdown_result == ISC_R_UNSET) {
+               xfr->shutdown_result = result;
+       }
+}
+
 static void
 xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
        dns_xfrin_t *xfr = (dns_xfrin_t *)arg;
@@ -1715,9 +1885,10 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
                        }
                }
        }
-       if (result != ISC_R_NOMORE) {
-               goto failure;
+       if (result == ISC_R_NOMORE) {
+               result = ISC_R_SUCCESS;
        }
+       CHECK(result);
 
        if (dns_message_gettsig(msg, &tsigowner) != NULL) {
                /*
@@ -1772,36 +1943,11 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
                CHECK(xfrin_send_request(xfr));
                break;
        case XFRST_AXFR_END:
-               CHECK(axfr_finalize(xfr));
-               FALLTHROUGH;
        case XFRST_IXFR_END:
-               /*
-                * Close the journal.
-                */
-               if (xfr->ixfr.journal != NULL) {
-                       LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info,
-                                                          result);
-                       dns_journal_destroy(&xfr->ixfr.journal);
-                       LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info,
-                                                        result);
-               }
-
-               /*
-                * Inform the caller we succeeded.
-                */
-               if (xfr->done != NULL) {
-                       LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info,
-                                                        result);
-                       (xfr->done)(xfr->zone,
-                                   xfr->expireoptset ? &xfr->expireopt : NULL,
-                                   ISC_R_SUCCESS);
-                       xfr->done = NULL;
-                       LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
-               }
-
-               atomic_store(&xfr->shuttingdown, true);
+               /* We are at the end, cancel the timers and IO */
+               isc_timer_stop(xfr->max_idle_timer);
                isc_timer_stop(xfr->max_time_timer);
-               xfr->shutdown_result = ISC_R_SUCCESS;
+               xfrin_cancelio(xfr);
                break;
        default:
                /*
@@ -1821,6 +1967,7 @@ xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
 
 failure:
        if (result != ISC_R_SUCCESS) {
+               xfr->result = result;
                xfrin_fail(xfr, result, "failed while receiving responses");
        }
 
@@ -1872,13 +2019,22 @@ xfrin_destroy(dns_xfrin_t *xfr) {
                  (unsigned int)(msecs / 1000), (unsigned int)(msecs % 1000),
                  (unsigned int)persec, atomic_load_relaxed(&xfr->end_serial));
 
-       if (xfr->dispentry != NULL) {
-               dns_dispatch_done(&xfr->dispentry);
-       }
-       if (xfr->disp != NULL) {
-               dns_dispatch_detach(&xfr->disp);
+       /* Cleanup unprocessed IXFR data */
+       struct cds_wfcq_node *node, *next;
+       __cds_wfcq_for_each_blocking_safe(&xfr->diff_head, &xfr->diff_tail,
+                                         node, next) {
+               ixfr_apply_data_t *data =
+                       caa_container_of(node, ixfr_apply_data_t, wfcq_node);
+               /* We need to clear and free all data chunks */
+               dns_diff_clear(&data->diff);
+               isc_mem_put(xfr->mctx, data, sizeof(*data));
        }
 
+       /* Cleanup unprocessed AXFR data */
+       dns_diff_clear(&xfr->diff);
+
+       xfrin_cancelio(xfr);
+
        if (xfr->transport != NULL) {
                dns_transport_detach(&xfr->transport);
        }
@@ -1891,8 +2047,6 @@ xfrin_destroy(dns_xfrin_t *xfr) {
                isc_buffer_free(&xfr->lasttsig);
        }
 
-       dns_diff_clear(&xfr->diff);
-
        if (xfr->ixfr.journal != NULL) {
                dns_journal_destroy(&xfr->ixfr.journal);
        }