]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
defer: add special queue for UDP docs-develop-rrl-8r8r8r/deployments/4605
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 22 Jul 2024 16:21:16 +0000 (18:21 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 22 Jul 2024 16:21:16 +0000 (18:21 +0200)
daemon/defer.c
daemon/defer.h
daemon/worker.c

index 134ce3a1ee4e9c99b975d717b1e791c7e8ff2749..884ae8727d15f2db75095f633bc98e926a71cdb1 100644 (file)
@@ -15,8 +15,9 @@
 #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
-#define LOADS_THRESHOLDS  (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}    // the last one should be UINT16_MAX
-#define QUEUES_CNT        (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
+#define LOADS_THRESHOLDS     (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}    // the last one should be UINT16_MAX
+#define QUEUES_CNT           (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS))  // -1 for synchronous, +1 for unverified
+#define UNVERIFIED_PRIORITY  1  // -1 synchronous, 1 async UDP, {0, 2, 3} other async
 
 #define KRU_CAPACITY  (1<<10)
 #define MAX_DECAY     (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s
@@ -28,7 +29,7 @@
 #define MAX_WAITING_REQS     10000 // if exceeded, process single deferred request immediatelly in poll phase
 
 #define VERBOSE_LOG(...) kr_log_notice(DEVEL, "defer |  " __VA_ARGS__)
-// #define VERBOSE_LOG(...)
+//#define VERBOSE_LOG(...)
 
 struct defer {
        size_t capacity;
@@ -91,6 +92,8 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
 
                max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
                                0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
+       } else {
+               return;
        }
 
        VERBOSE_LOG("  %s ADD %4.3f ms -> load: %d on /%d\n",
@@ -99,8 +102,13 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
 /// Lower value has higher priority, -1 should be synchronous.
-static inline int classify(const union kr_sockaddr *addr)
+static inline int classify(const union kr_sockaddr *addr, bool stream)
 {
+       if (!stream) {
+               VERBOSE_LOG("    unverified address\n");
+               return UNVERIFIED_PRIORITY; // UDP
+       }
+
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
@@ -119,7 +127,11 @@ static inline int classify(const union kr_sockaddr *addr)
 
        VERBOSE_LOG("    load %d on /%d\n", max_load, prefix);
 
-       return threshold_index - 1;
+       int priority = threshold_index - 1;
+       if (priority >= UNVERIFIED_PRIORITY)
+               priority++;
+
+       return priority;
 }
 
 
@@ -159,7 +171,7 @@ static inline void process_single_deferred(void) {
        struct protolayer_iter_ctx *ctx = pop_query();
        if (ctx == NULL) return;
 
-       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
 
        struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
        uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
@@ -175,7 +187,7 @@ static inline void process_single_deferred(void) {
                return;
        }
 
-       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        if (priority > queue_ix) {  // priority dropped (got higher value)
                VERBOSE_LOG("    PUSH to %d\n", priority);
                push_query(ctx, priority);
@@ -195,13 +207,13 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        if (ctx->session->outgoing)
                return protolayer_continue(ctx);
 
-       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        struct pl_defer_iter_data *data = iter_data;
        data->req_stamp = defer_sample_state.stamp;
 
        VERBOSE_LOG("  %s UNWRAP\n",
                        kr_straddr(ctx->comm->comm_addr));
-       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
 
        if (priority == -1) {
                VERBOSE_LOG("    CONTINUE\n");
@@ -231,7 +243,8 @@ static void defer_queues_idle(uv_idle_t *handle) {
                defer_sample_restart();
        }
        defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
-       udp_queue_send_all();
+       udp_queue_send_all(); // TODO keep here or call after processing each priority level?
+                             //      (or after UNVERIFIED_PRIORITY but beware future QUIC)
        if (waiting_requests > 0) {
                VERBOSE_LOG("  %d waiting\n", waiting_requests);
        }
index 1856972a8315c8fa3c6f53de825164e1e831bcfa..3d7980779c27184e8ca7ba680a7558fd26bdcdfc 100644 (file)
@@ -43,10 +43,11 @@ static inline void defer_sample_start(void)
 }
 
 /// Annotate the work currently being accounted by an IP address.
-static inline void defer_sample_addr(const union kr_sockaddr *addr)
+static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
 {
        if (!defer || kr_fails_assert(addr)) return;
        if (!defer_sample_state.is_accounting) return;
+       if (!stream) return;  // UDP is not counted
 
        if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
                // TODO: this costs performance, so only in some debug mode?
index 58ca5839571609d28ff3b5c0a9275f55614745dc..99b5f14057aafc9eeffabe493a3fc08d54115c12 100644 (file)
@@ -1214,7 +1214,7 @@ static int qr_task_step(struct qr_task *task,
                        const struct sockaddr *packet_source, knot_pkt_t *packet)
 {
        if (task && task->ctx->source.session)
-               defer_sample_addr(&task->ctx->source.addr);
+               defer_sample_addr(&task->ctx->source.addr, task->ctx->source.session->stream);
 
        /* No more steps after we're finished. */
        if (!task || task->finished) {