+#include <stdatomic.h>
#include "daemon/defer.h"
#include "daemon/mmapped.h"
#include "daemon/session2.h"
#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified
#define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async
-#define KRU_CAPACITY (1<<10)
-#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
-#define TIME_MULT 1/1 // NOLINT for now, TODO improve readability
- // max fraction of rate limit filled by one cpu (multiplies large int)
- // TODO divide by #cpus?
+#define KRU_CAPACITY (1<<10)
+#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s of accounted time
+#define BASE_PRICE(nsec) ((uint64_t)MAX_DECAY * nsec / 1000000ll)
+// TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
size_t capacity;
kru_price_t max_decay;
bool using_avx2;
+ _Atomic uint32_t time_now; // shared counter incremented each msec by each process, used for kru decay only
_Alignas(64) uint8_t kru[];
};
struct defer *defer = NULL;
struct mmapped defer_mmapped = {0};
+uint64_t kru_time_now_nsec = 0; // total time contribution of the current process
defer_sample_state_t defer_sample_state = {
.is_accounting = 0,
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
- kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll; // TODO adjust
+ kru_price_t base_price = BASE_PRICE(nsec);
+
+ uint32_t time_now;
+ uint32_t time_now_incr = (kru_time_now_nsec + nsec) / 1000000 - kru_time_now_nsec / 1000000;
+ if (time_now_incr > 0) {
+ time_now = atomic_fetch_add_explicit(&defer->time_now, time_now_incr, memory_order_relaxed) + time_now_incr;
+ } else {
+ time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
+ }
+ kru_time_now_nsec += nsec;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
prices[i] = base_price / V6_RATE_MULT[i];
}
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
prices[i] = base_price / V4_RATE_MULT[i];
}
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
} else {
return;
}
- VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
- kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+ VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d (kru time: %d)\n",
+ kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix, time_now);
}
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
return UNVERIFIED_PRIORITY; // UDP
}
+ uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
}
kr_log_info(SYSTEM, "Initializing prioritization...\n");
defer = defer_mmapped.mem;
+ defer->time_now = 0;
bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay);
if (!succ) {