]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: use total accounted time as KRU time docs-develop-rrl-8r8r8r/deployments/5260
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 2 Oct 2024 15:13:10 +0000 (17:13 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 2 Oct 2024 15:14:55 +0000 (17:14 +0200)
daemon/defer.c

index 91fb1295b38e3ed19781c1db5a139ad74bc8decc..d86cd83776da3a4b2c9d13ec6ba1ee8799da71cd 100644 (file)
@@ -1,3 +1,4 @@
+#include <stdatomic.h>
 #include "daemon/defer.h"
 #include "daemon/mmapped.h"
 #include "daemon/session2.h"
 #define QUEUES_CNT           (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS))  // -1 for synchronous, +1 for unverified
 #define UNVERIFIED_PRIORITY  1  // -1 synchronous, 1 async UDP, {0, 2, 3} other async
 
-#define KRU_CAPACITY  (1<<10)
-#define MAX_DECAY     (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s
-#define TIME_MULT     1/1   // NOLINT for now, TODO improve readability
-       // max fraction of rate limit filled by one cpu (multiplies large int)
-       // TODO divide by #cpus?
+#define KRU_CAPACITY         (1<<10)
+#define MAX_DECAY            (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s of accounted time
+#define BASE_PRICE(nsec)     ((uint64_t)MAX_DECAY * nsec / 1000000ll)
+// TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
 
 #define REQ_TIMEOUT        5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
 #define IDLE_TIMEOUT       1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
@@ -35,10 +35,12 @@ struct defer {
        size_t capacity;
        kru_price_t max_decay;
        bool using_avx2;
+       _Atomic uint32_t time_now;  // shared counter incremented each msec by each process, used for kru decay only
        _Alignas(64) uint8_t kru[];
 };
 struct defer *defer = NULL;
 struct mmapped defer_mmapped = {0};
+uint64_t kru_time_now_nsec = 0;  // total time contribution of the current process
 
 defer_sample_state_t defer_sample_state = {
        .is_accounting = 0,
@@ -70,7 +72,16 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
-       kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll;  // TODO adjust
+       kru_price_t base_price = BASE_PRICE(nsec);
+
+       uint32_t time_now;
+       uint32_t time_now_incr = (kru_time_now_nsec + nsec) / 1000000 - kru_time_now_nsec / 1000000;
+       if (time_now_incr > 0) {
+               time_now = atomic_fetch_add_explicit(&defer->time_now, time_now_incr, memory_order_relaxed) + time_now_incr;
+       } else {
+               time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
+       }
+       kru_time_now_nsec += nsec;
 
        if (addr->ip.sa_family == AF_INET6) {
                memcpy(key, &addr->ip6.sin6_addr, 16);
@@ -80,7 +91,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
                        prices[i] = base_price / V6_RATE_MULT[i];
                }
 
-               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
                                1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
        } else if (addr->ip.sa_family == AF_INET) {
                memcpy(key, &addr->ip4.sin_addr, 4);
@@ -90,14 +101,14 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
                        prices[i] = base_price / V4_RATE_MULT[i];
                }
 
-               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
                                0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
        } else {
                return;
        }
 
-       VERBOSE_LOG("  %s ADD %4.3f ms -> load: %d on /%d\n",
-                       kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+       VERBOSE_LOG("  %s ADD %4.3f ms -> load: %d on /%d  (kru time: %d)\n",
+                       kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix, time_now);
 }
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
@@ -109,16 +120,17 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
                return UNVERIFIED_PRIORITY; // UDP
        }
 
+       uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
        if (addr->ip.sa_family == AF_INET6) {
                memcpy(key, &addr->ip6.sin6_addr, 16);
-               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
                                1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
        } else if (addr->ip.sa_family == AF_INET) {
                memcpy(key, &addr->ip4.sin_addr, 4);
-               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
                                0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
        }
 
@@ -281,6 +293,7 @@ int defer_init(uv_loop_t *loop)
                kr_log_info(SYSTEM, "Initializing prioritization...\n");
 
                defer = defer_mmapped.mem;
+               defer->time_now = 0;
 
                bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay);
                if (!succ) {