From 9d4ad75d103f1fe5811a1bd2765d5fddaaab0a27 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?= Date: Wed, 2 Oct 2024 17:13:10 +0200 Subject: [PATCH] daemon/defer: use total accounted time as KRU time --- daemon/defer.c | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/daemon/defer.c b/daemon/defer.c index 91fb1295b..d86cd8377 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -1,3 +1,4 @@ +#include #include "daemon/defer.h" #include "daemon/mmapped.h" #include "daemon/session2.h" @@ -19,11 +20,10 @@ #define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified #define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async -#define KRU_CAPACITY (1<<10) -#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s -#define TIME_MULT 1/1 // NOLINT for now, TODO improve readability - // max fraction of rate limit filled by one cpu (multiplies large int) - // TODO divide by #cpus? +#define KRU_CAPACITY (1<<10) +#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s of accounted time +#define BASE_PRICE(nsec) ((uint64_t)MAX_DECAY * nsec / 1000000ll) +// TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time) #define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped #define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase @@ -35,10 +35,12 @@ struct defer { size_t capacity; kru_price_t max_decay; bool using_avx2; + _Atomic uint32_t time_now; // shared counter incremented each msec by each process, used for kru decay only _Alignas(64) uint8_t kru[]; }; struct defer *defer = NULL; struct mmapped defer_mmapped = {0}; +uint64_t kru_time_now_nsec = 0; // total time contribution of the current process defer_sample_state_t defer_sample_state = { .is_accounting = 0, @@ -70,7 +72,16 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) { _Alignas(16) uint8_t key[16] = {0, }; uint16_t max_load = 0; uint8_t prefix = 0; - kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll; // TODO adjust + kru_price_t base_price = BASE_PRICE(nsec); + + uint32_t time_now; + uint32_t time_now_incr = (kru_time_now_nsec + nsec) / 1000000 - kru_time_now_nsec / 1000000; + if (time_now_incr > 0) { + time_now = atomic_fetch_add_explicit(&defer->time_now, time_now_incr, memory_order_relaxed) + time_now_incr; + } else { + time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed); + } + kru_time_now_nsec += nsec; if (addr->ip.sa_family == AF_INET6) { memcpy(key, &addr->ip6.sin6_addr, 16); @@ -80,7 +91,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) { prices[i] = base_price / V6_RATE_MULT[i]; } - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now, 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix); } else if (addr->ip.sa_family == AF_INET) { memcpy(key, &addr->ip4.sin_addr, 4); @@ -90,14 +101,14 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) { prices[i] = base_price / V4_RATE_MULT[i]; } - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now, 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix); } else { return; } - VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n", - kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix); + VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d (kru time: %d)\n", + kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix, time_now); } /// Determine priority of the request in [-1, QUEUES_CNT - 1]. @@ -109,16 +120,17 @@ static inline int classify(const union kr_sockaddr *addr, bool stream) return UNVERIFIED_PRIORITY; // UDP } + uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed); _Alignas(16) uint8_t key[16] = {0, }; uint16_t max_load = 0; uint8_t prefix = 0; if (addr->ip.sa_family == AF_INET6) { memcpy(key, &addr->ip6.sin6_addr, 16); - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now, 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix); } else if (addr->ip.sa_family == AF_INET) { memcpy(key, &addr->ip4.sin_addr, 4); - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now, 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix); } @@ -281,6 +293,7 @@ int defer_init(uv_loop_t *loop) kr_log_info(SYSTEM, "Initializing prioritization...\n"); defer = defer_mmapped.mem; + defer->time_now = 0; bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay); if (!succ) { -- 2.47.2