-#include <stdatomic.h>
#include "daemon/defer.h"
#include "daemon/mmapped.h"
#include "daemon/session2.h"
#define PRIORITY_SYNC (-1) // no queue
#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
-#define KRU_CAPACITY (1<<10)
-#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s of accounted time
-#define BASE_PRICE(nsec) ((uint64_t)MAX_DECAY * nsec / 1000000ll)
-// TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
+#define KRU_CAPACITY (1<<10)
+#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
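+	// 0.0006929 ~ ln(2)/1000, i.e. per-ms decay rate giving counters a half-life of ~1000 ms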
+#define TIME_MULT 1/1 // NOLINT for now; TODO improve readability
+	// max fraction of the rate limit that a single cpu can fill (a fraction multiplying a large int)
+	// TODO divide by #cpus?
#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
size_t capacity;
kru_price_t max_decay;
bool using_avx2;
- _Atomic uint32_t time_now; // shared counter incremented each msec by each process, used for kru decay only
_Alignas(64) uint8_t kru[];
};
struct defer *defer = NULL;
struct mmapped defer_mmapped = {0};
-uint64_t kru_time_now_nsec = 0; // total time contribution of the current process
defer_sample_state_t defer_sample_state = {
.is_accounting = 0,
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
- kru_price_t base_price = BASE_PRICE(nsec);
-
- uint32_t time_now;
- uint32_t time_now_incr = (kru_time_now_nsec + nsec) / 1000000 - kru_time_now_nsec / 1000000;
- if (time_now_incr > 0) {
- time_now = atomic_fetch_add_explicit(&defer->time_now, time_now_incr, memory_order_relaxed) + time_now_incr;
- } else {
- time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
- }
- kru_time_now_nsec += nsec;
+ kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll; // TODO adjust
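+	// e.g. with TIME_MULT == 1, one ms of CPU time (nsec == 1000000) prices as MAX_DECAY,
+	// i.e. a source saturating one cpu charges at roughly the rate a maxed counter decays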
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
prices[i] = base_price / V6_RATE_MULT[i];
}
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
prices[i] = base_price / V4_RATE_MULT[i];
}
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
} else {
return;
}
- VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d (kru time: %d)\n",
- kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix, time_now);
+ VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
+ kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
}
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
return PRIORITY_UDP;
}
- uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, time_now,
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
}
kr_log_info(SYSTEM, "Initializing prioritization...\n");
defer = defer_mmapped.mem;
- defer->time_now = 0;
bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay);
if (!succ) {