From: Lukáš Ondráček Date: Mon, 22 Jul 2024 16:21:16 +0000 (+0200) Subject: defer: add special queue for UDP X-Git-Tag: v6.0.9~1^2~38 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1bed71934c07c9facca640d40ae9619ee91b91c6;p=thirdparty%2Fknot-resolver.git defer: add special queue for UDP --- diff --git a/daemon/defer.c b/daemon/defer.c index 134ce3a1e..884ae8727 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -15,8 +15,9 @@ #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES)) #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT) -#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX -#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) +#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX +#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified +#define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async #define KRU_CAPACITY (1<<10) #define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s @@ -28,7 +29,7 @@ #define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediatelly in poll phase #define VERBOSE_LOG(...) kr_log_notice(DEVEL, "defer | " __VA_ARGS__) -// #define VERBOSE_LOG(...) +//#define VERBOSE_LOG(...) struct defer { size_t capacity; @@ -91,6 +92,8 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) { max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix); + } else { + return; } VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n", @@ -99,8 +102,13 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) { /// Determine priority of the request in [-1, QUEUES_CNT - 1]. /// Lower value has higher priority, -1 should be synchronous. -static inline int classify(const union kr_sockaddr *addr) +static inline int classify(const union kr_sockaddr *addr, bool stream) { + if (!stream) { + VERBOSE_LOG(" unverified address\n"); + return UNVERIFIED_PRIORITY; // UDP + } + _Alignas(16) uint8_t key[16] = {0, }; uint16_t max_load = 0; uint8_t prefix = 0; @@ -119,7 +127,11 @@ static inline int classify(const union kr_sockaddr *addr) VERBOSE_LOG(" load %d on /%d\n", max_load, prefix); - return threshold_index - 1; + int priority = threshold_index - 1; + if (priority >= UNVERIFIED_PRIORITY) + priority++; + + return priority; } @@ -159,7 +171,7 @@ static inline void process_single_deferred(void) { struct protolayer_iter_ctx *ctx = pop_query(); if (ctx == NULL) return; - defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr); + defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx); uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp; @@ -175,7 +187,7 @@ static inline void process_single_deferred(void) { return; } - int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr); + int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); if (priority > queue_ix) { // priority dropped (got higher value) VERBOSE_LOG(" PUSH to %d\n", priority); push_query(ctx, priority); @@ -195,13 +207,13 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( if (ctx->session->outgoing) return protolayer_continue(ctx); - defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr); + defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); struct pl_defer_iter_data *data = iter_data; data->req_stamp = defer_sample_state.stamp; VERBOSE_LOG(" %s UNWRAP\n", kr_straddr(ctx->comm->comm_addr)); - int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr); + int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); if (priority == -1) { VERBOSE_LOG(" CONTINUE\n"); @@ -231,7 +243,8 @@ static void defer_queues_idle(uv_idle_t *handle) { defer_sample_restart(); } defer_sample_stop(); // TODO skip calling and use just restart elsewhere? - udp_queue_send_all(); + udp_queue_send_all(); // TODO keep here or call after processing each priority level? + // (or after UNVERIFIED_PRIORITY but beware future QUIC) if (waiting_requests > 0) { VERBOSE_LOG(" %d waiting\n", waiting_requests); } diff --git a/daemon/defer.h b/daemon/defer.h index 1856972a8..3d7980779 100644 --- a/daemon/defer.h +++ b/daemon/defer.h @@ -43,10 +43,11 @@ static inline void defer_sample_start(void) } /// Annotate the work currently being accounted by an IP address. -static inline void defer_sample_addr(const union kr_sockaddr *addr) +static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream) { if (!defer || kr_fails_assert(addr)) return; if (!defer_sample_state.is_accounting) return; + if (!stream) return; // UDP is not counted if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) { // TODO: this costs performance, so only in some debug mode? diff --git a/daemon/worker.c b/daemon/worker.c index 58ca58395..99b5f1405 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1214,7 +1214,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { if (task && task->ctx->source.session) - defer_sample_addr(&task->ctx->source.addr); + defer_sample_addr(&task->ctx->source.addr, task->ctx->source.session->stream); /* No more steps after we're finished. */ if (!task || task->finished) {