#include <isc/result.h>
#include <isc/string.h>
#include <isc/util.h>
+#include <isc/work.h>
#include <dns/callbacks.h>
#include <dns/catz.h>
dns_dbversion_t *ver;
dns_diff_t diff; /*%< Pending database changes */
+ /* Diff queue */
+ bool diff_running;
+ struct __cds_wfcq_head diff_head;
+ struct cds_wfcq_tail diff_tail;
+ isc_result_t result;
+
_Atomic xfrin_state_t state;
uint32_t expireopt;
bool edns, expireoptset;
/*
* Following variable were made atomic only for loading the values for
- * the statistics channelr, thus all accesses can be **relaxed** because
+ * the statistics channel, thus all accesses can be **relaxed** because
* all store and load operations that affect XFR are done on the same
* thread and only the statistics channel thread could perform a load
* operation from a different thread and it's ok to not be precise in
static isc_result_t
axfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
dns_rdata_t *rdata);
-static isc_result_t
-axfr_apply(dns_xfrin_t *xfr);
-static isc_result_t
+static void
axfr_commit(dns_xfrin_t *xfr);
static isc_result_t
axfr_finalize(dns_xfrin_t *xfr);
static isc_result_t
ixfr_init(dns_xfrin_t *xfr);
static isc_result_t
-ixfr_apply(dns_xfrin_t *xfr);
-static isc_result_t
ixfr_putdata(dns_xfrin_t *xfr, dns_diffop_t op, dns_name_t *name, dns_ttl_t ttl,
dns_rdata_t *rdata);
static isc_result_t
static void
xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg);
+static void
+xfrin_end(dns_xfrin_t *xfr, isc_result_t result);
+
static void
xfrin_destroy(dns_xfrin_t *xfr);
/*
* Store a set of AXFR RRs in the database.
*/
-static isc_result_t
-axfr_apply(dns_xfrin_t *xfr) {
- isc_result_t result;
+static void
+axfr_apply(void *arg) {
+ dns_xfrin_t *xfr = arg;
+ isc_result_t result = ISC_R_SUCCESS;
uint64_t records;
+ if (atomic_load(&xfr->shuttingdown)) {
+ result = ISC_R_SHUTTINGDOWN;
+ goto failure;
+ }
+
CHECK(dns_diff_load(&xfr->diff, xfr->axfr.add, xfr->axfr.add_private));
- dns_diff_clear(&xfr->diff);
if (xfr->maxrecords != 0U) {
result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
goto failure;
}
}
- result = ISC_R_SUCCESS;
+
failure:
- return (result);
+ dns_diff_clear(&xfr->diff);
+ xfr->result = result;
}
-static isc_result_t
-axfr_commit(dns_xfrin_t *xfr) {
- isc_result_t result;
+static void
+axfr_apply_done(void *arg) {
+ dns_xfrin_t *xfr = arg;
+ isc_result_t result = xfr->result;
- CHECK(axfr_apply(xfr));
- CHECK(dns_db_endload(xfr->db, &xfr->axfr));
- CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
+ if (atomic_load(&xfr->shuttingdown)) {
+ result = ISC_R_SHUTTINGDOWN;
+ }
+
+ if (result == ISC_R_SUCCESS) {
+ CHECK(dns_db_endload(xfr->db, &xfr->axfr));
+ CHECK(dns_zone_verifydb(xfr->zone, xfr->db, NULL));
+ CHECK(axfr_finalize(xfr));
+ } else {
+ (void)dns_db_endload(xfr->db, &xfr->axfr);
+ }
- result = ISC_R_SUCCESS;
failure:
- return (result);
+ xfr->diff_running = false;
+
+ if (result == ISC_R_SUCCESS) {
+ if (atomic_load(&xfr->state) == XFRST_AXFR_END) {
+ xfrin_end(xfr, result);
+ }
+ } else {
+ xfrin_fail(xfr, result, "failed while processing responses");
+ }
+
+ dns_xfrin_detach(&xfr);
+}
+
+static void
+axfr_commit(dns_xfrin_t *xfr) {
+ INSIST(!xfr->diff_running);
+ xfr->diff_running = true;
+ dns_xfrin_ref(xfr);
+ isc_work_enqueue(dns_zone_getloop(xfr->zone), axfr_apply,
+ axfr_apply_done, xfr);
}
static isc_result_t
* IXFR handling
*/
+typedef struct ixfr_apply_data {
+ dns_diff_t diff; /*%< Pending database changes */
+ isc_result_t result;
+ struct cds_wfcq_node wfcq_node;
+} ixfr_apply_data_t;
+
static isc_result_t
ixfr_init(dns_xfrin_t *xfr) {
isc_result_t result;
return (result);
}
-/*
- * Apply a set of IXFR changes to the database.
- */
static isc_result_t
-ixfr_apply(dns_xfrin_t *xfr) {
- isc_result_t result;
- uint64_t records;
+ixfr_begin_transaction(dns_xfrin_t *xfr) {
+ isc_result_t result = ISC_R_SUCCESS;
- if (xfr->ver == NULL) {
- CHECK(dns_db_newversion(xfr->db, &xfr->ver));
- if (xfr->ixfr.journal != NULL) {
- CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
- }
+ if (xfr->ixfr.journal != NULL) {
+ CHECK(dns_journal_begin_transaction(xfr->ixfr.journal));
}
- CHECK(dns_diff_apply(&xfr->diff, xfr->db, xfr->ver));
+failure:
+ return (result);
+}
+
+static isc_result_t
+ixfr_end_transaction(dns_xfrin_t *xfr) {
+ isc_result_t result = ISC_R_SUCCESS;
+
+ CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
+ /* XXX enter ready-to-commit state here */
+ if (xfr->ixfr.journal != NULL) {
+ CHECK(dns_journal_commit(xfr->ixfr.journal));
+ }
+failure:
+ return (result);
+}
+
+static isc_result_t
+ixfr_apply_one(dns_xfrin_t *xfr, ixfr_apply_data_t *data) {
+ isc_result_t result = ISC_R_SUCCESS;
+ uint64_t records;
+
+ CHECK(ixfr_begin_transaction(xfr));
+
+ CHECK(dns_diff_apply(&data->diff, xfr->db, xfr->ver));
if (xfr->maxrecords != 0U) {
result = dns_db_getsize(xfr->db, xfr->ver, &records, NULL);
if (result == ISC_R_SUCCESS && records > xfr->maxrecords) {
}
}
if (xfr->ixfr.journal != NULL) {
- CHECK(dns_journal_writediff(xfr->ixfr.journal, &xfr->diff));
+ CHECK(dns_journal_writediff(xfr->ixfr.journal, &data->diff));
}
- dns_diff_clear(&xfr->diff);
- result = ISC_R_SUCCESS;
+
+ result = ixfr_end_transaction(xfr);
+
+ return (result);
failure:
+ /* We need to end the transaction, but keep the previous error */
+ (void)ixfr_end_transaction(xfr);
+
return (result);
}
-static isc_result_t
-ixfr_commit(dns_xfrin_t *xfr) {
- isc_result_t result;
+static void
+ixfr_apply(void *arg) {
+ dns_xfrin_t *xfr = arg;
+ isc_result_t result = ISC_R_SUCCESS;
- CHECK(ixfr_apply(xfr));
- if (xfr->ver != NULL) {
- CHECK(dns_zone_verifydb(xfr->zone, xfr->db, xfr->ver));
- /* XXX enter ready-to-commit state here */
- if (xfr->ixfr.journal != NULL) {
- CHECK(dns_journal_commit(xfr->ixfr.journal));
+ struct __cds_wfcq_head diff_head;
+ struct cds_wfcq_tail diff_tail;
+
+ /* Initialize local wfcqueue */
+ __cds_wfcq_init(&diff_head, &diff_tail);
+
+ enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
+ &diff_head, &diff_tail, &xfr->diff_head, &xfr->diff_tail);
+ INSIST(ret == CDS_WFCQ_RET_DEST_EMPTY);
+
+ struct cds_wfcq_node *node, *next;
+ __cds_wfcq_for_each_blocking_safe(&diff_head, &diff_tail, node, next) {
+ ixfr_apply_data_t *data =
+ caa_container_of(node, ixfr_apply_data_t, wfcq_node);
+
+ if (atomic_load(&xfr->shuttingdown)) {
+ result = ISC_R_SHUTTINGDOWN;
+ }
+
+ /* Apply only until first failure */
+ if (result == ISC_R_SUCCESS) {
+ /* This also checks for shuttingdown condition */
+ result = ixfr_apply_one(xfr, data);
}
+
+ /* We need to clear and free all data chunks */
+ dns_diff_clear(&data->diff);
+ isc_mem_put(xfr->mctx, data, sizeof(*data));
+ }
+
+ /* FIXME: This might need a barrier or smth */
+ xfr->result = result;
+}
+
+static void
+ixfr_apply_done(void *arg) {
+ dns_xfrin_t *xfr = arg;
+ isc_result_t result = xfr->result;
+
+ if (atomic_load(&xfr->shuttingdown)) {
+ result = ISC_R_SHUTTINGDOWN;
+ }
+
+ if (result != ISC_R_SUCCESS) {
+ goto failure;
+ }
+
+ /* Reschedule */
+ if (!cds_wfcq_empty(&xfr->diff_head, &xfr->diff_tail)) {
+ isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
+ ixfr_apply_done, xfr);
+ return;
+ }
+
+failure:
+ xfr->diff_running = false;
+
+ if (result == ISC_R_SUCCESS) {
dns_db_closeversion(xfr->db, &xfr->ver, true);
dns_zone_markdirty(xfr->zone);
+
+ if (atomic_load(&xfr->state) == XFRST_IXFR_END) {
+ xfrin_end(xfr, result);
+ }
+ } else {
+ dns_db_closeversion(xfr->db, &xfr->ver, false);
+
+ xfrin_fail(xfr, result, "failed while processing responses");
}
- result = ISC_R_SUCCESS;
+
+ dns_xfrin_detach(&xfr);
+}
+
+/*
+ * Apply a set of IXFR changes to the database.
+ */
+static isc_result_t
+ixfr_commit(dns_xfrin_t *xfr) {
+ isc_result_t result = ISC_R_SUCCESS;
+ ixfr_apply_data_t *data = isc_mem_get(xfr->mctx, sizeof(*data));
+
+ *data = (ixfr_apply_data_t){ 0 };
+ cds_wfcq_node_init(&data->wfcq_node);
+
+ if (xfr->ver == NULL) {
+ CHECK(dns_db_newversion(xfr->db, &xfr->ver));
+ }
+
+ dns_diff_init(xfr->mctx, &data->diff);
+ /* FIXME: Should we add dns_diff_move() */
+ ISC_LIST_MOVE(data->diff.tuples, xfr->diff.tuples);
+
+ (void)cds_wfcq_enqueue(&xfr->diff_head, &xfr->diff_tail,
+ &data->wfcq_node);
+
+ if (!xfr->diff_running) {
+ dns_xfrin_ref(xfr);
+ xfr->diff_running = true;
+ isc_work_enqueue(dns_zone_getloop(xfr->zone), ixfr_apply,
+ ixfr_apply_done, xfr);
+ }
+
failure:
return (result);
}
result = DNS_R_FORMERR;
goto failure;
}
- CHECK(axfr_commit(xfr));
+ axfr_commit(xfr);
atomic_store(&xfr->state, XFRST_AXFR_END);
break;
}
isc_time_t
dns_xfrin_getstarttime(dns_xfrin_t *xfr) {
- isc_time_t start;
-
REQUIRE(VALID_XFRIN(xfr));
return (atomic_load_relaxed(&xfr->start));
-
- return (start);
}
void
static void
xfrin_cancelio(dns_xfrin_t *xfr) {
- dns_dispatch_done(&xfr->dispentry);
- dns_dispatch_detach(&xfr->disp);
+ if (xfr->dispentry != NULL) {
+ dns_dispatch_done(&xfr->dispentry);
+ }
+ if (xfr->disp != NULL) {
+ dns_dispatch_detach(&xfr->disp);
+ }
}
static void
result = DNS_R_BADIXFR;
}
}
+
xfrin_cancelio(xfr);
- /*
- * Close the journal.
- */
- if (xfr->ixfr.journal != NULL) {
- dns_journal_destroy(&xfr->ixfr.journal);
- }
- if (xfr->done != NULL) {
- (xfr->done)(xfr->zone,
- xfr->expireoptset ? &xfr->expireopt : NULL,
- result);
- xfr->done = NULL;
- }
- xfr->shutdown_result = result;
+ xfrin_end(xfr, result);
}
dns_xfrin_detach(&xfr);
dns_view_weakattach(dns_zone_getview(zone), &xfr->view);
dns_name_init(&xfr->name, NULL);
+ __cds_wfcq_init(&xfr->diff_head, &xfr->diff_tail);
+
atomic_init(&xfr->shuttingdown, false);
atomic_init(&xfr->is_ixfr, false);
return (ISC_R_SUCCESS);
failure:
- if (xfr->dispentry != NULL) {
- dns_dispatch_done(&xfr->dispentry);
- }
- if (xfr->disp != NULL) {
- dns_dispatch_detach(&xfr->disp);
- }
+ xfrin_cancelio(xfr);
dns_xfrin_detach(&xfr);
return (result);
}
}
+static void
+xfrin_end(dns_xfrin_t *xfr, isc_result_t result) {
+ /* Close the journal. */
+ if (xfr->ixfr.journal != NULL) {
+ LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info, result);
+ dns_journal_destroy(&xfr->ixfr.journal);
+ LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info, result);
+ }
+
+ /* Inform the caller. */
+ if (xfr->done != NULL) {
+ LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info, result);
+ (xfr->done)(xfr->zone,
+ xfr->expireoptset ? &xfr->expireopt : NULL, result);
+ xfr->done = NULL;
+ LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
+ }
+
+ atomic_store(&xfr->shuttingdown, true);
+ isc_timer_stop(xfr->max_time_timer);
+ if (xfr->shutdown_result == ISC_R_UNSET) {
+ xfr->shutdown_result = result;
+ }
+}
+
static void
xfrin_recv_done(isc_result_t result, isc_region_t *region, void *arg) {
dns_xfrin_t *xfr = (dns_xfrin_t *)arg;
}
}
}
- if (result != ISC_R_NOMORE) {
- goto failure;
+ if (result == ISC_R_NOMORE) {
+ result = ISC_R_SUCCESS;
}
+ CHECK(result);
if (dns_message_gettsig(msg, &tsigowner) != NULL) {
/*
CHECK(xfrin_send_request(xfr));
break;
case XFRST_AXFR_END:
- CHECK(axfr_finalize(xfr));
- FALLTHROUGH;
case XFRST_IXFR_END:
- /*
- * Close the journal.
- */
- if (xfr->ixfr.journal != NULL) {
- LIBDNS_XFRIN_JOURNAL_DESTROY_BEGIN(xfr, xfr->info,
- result);
- dns_journal_destroy(&xfr->ixfr.journal);
- LIBDNS_XFRIN_JOURNAL_DESTROY_END(xfr, xfr->info,
- result);
- }
-
- /*
- * Inform the caller we succeeded.
- */
- if (xfr->done != NULL) {
- LIBDNS_XFRIN_DONE_CALLBACK_BEGIN(xfr, xfr->info,
- result);
- (xfr->done)(xfr->zone,
- xfr->expireoptset ? &xfr->expireopt : NULL,
- ISC_R_SUCCESS);
- xfr->done = NULL;
- LIBDNS_XFRIN_DONE_CALLBACK_END(xfr, xfr->info, result);
- }
-
- atomic_store(&xfr->shuttingdown, true);
+ /* We are at the end, cancel the timers and IO */
+ isc_timer_stop(xfr->max_idle_timer);
isc_timer_stop(xfr->max_time_timer);
- xfr->shutdown_result = ISC_R_SUCCESS;
+ xfrin_cancelio(xfr);
break;
default:
/*
failure:
if (result != ISC_R_SUCCESS) {
+ xfr->result = result;
xfrin_fail(xfr, result, "failed while receiving responses");
}
(unsigned int)(msecs / 1000), (unsigned int)(msecs % 1000),
(unsigned int)persec, atomic_load_relaxed(&xfr->end_serial));
- if (xfr->dispentry != NULL) {
- dns_dispatch_done(&xfr->dispentry);
- }
- if (xfr->disp != NULL) {
- dns_dispatch_detach(&xfr->disp);
+ /* Cleanup unprocessed IXFR data */
+ struct cds_wfcq_node *node, *next;
+ __cds_wfcq_for_each_blocking_safe(&xfr->diff_head, &xfr->diff_tail,
+ node, next) {
+ ixfr_apply_data_t *data =
+ caa_container_of(node, ixfr_apply_data_t, wfcq_node);
+ /* We need to clear and free all data chunks */
+ dns_diff_clear(&data->diff);
+ isc_mem_put(xfr->mctx, data, sizeof(*data));
}
+ /* Cleanup unprocessed AXFR data */
+ dns_diff_clear(&xfr->diff);
+
+ xfrin_cancelio(xfr);
+
if (xfr->transport != NULL) {
dns_transport_detach(&xfr->transport);
}
isc_buffer_free(&xfr->lasttsig);
}
- dns_diff_clear(&xfr->diff);
-
if (xfr->ixfr.journal != NULL) {
dns_journal_destroy(&xfr->ixfr.journal);
}