#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
-#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
-#define PRIORITY_SYNC (-1) // no queue
-#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
-
-#define KRU_CAPACITY (1<<10)
-#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
-#define TIME_MULT 1/1 // NOLINT for now, TODO improve readability
- // max fraction of rate limit filled by one cpu (multiplies large int)
- // TODO divide by #cpus?
+#define LOADS_THRESHOLDS      (uint16_t[]) {1<<4, 1<<8, 1<<11, -1}    // the last one is UINT16_MAX (-1 wraps); keep it as the sentinel
+#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
+#define PRIORITY_SYNC (-1) // no queue
+#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
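+	// A hypothetical helper (not part of this patch) illustrating how a KRU
+	// load could be bucketed into a priority by LOADS_THRESHOLDS; e.g. load
+	// 100 lies between 1<<4 and 1<<8 and would map to priority 1:
+	//
+	//   static inline int load_to_priority(uint16_t load)
+	//   {
+	//           int i = 0;
+	//           while (i < (int)QUEUES_CNT - 2 && load >= LOADS_THRESHOLDS[i])
+	//                   i++;
+	//           return i;  // 0 .. QUEUES_CNT-2; the last queue is PRIORITY_UDP
+	//   }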
+
+#define KRU_CAPACITY (1<<19)
+	// same as the rate-limiting default
+#define MAX_DECAY (KRU_LIMIT * 0.0006929)
+ // halving counters in 1s
+ // 5s from max to 2^11 (priority 3) // TODO change 2^11 to 2^12 to make the times equal?
+ // 3s from 2^11 to 2^8 (priority 2)
+ // 4s from 2^8 to 2^4 (priority 1)
+ // 4s from 2^4 to zero (priority 0)
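+	// (0.0006929 ~= ln(2)/1000, i.e. a full counter loses MAX_DECAY per 1 ms
+	// decay tick, halving in 1 s; each duration above is then just the number
+	// of halvings between the two loads, counting from the 16-bit maximum:
+	// e.g. max -> 2^11 is 16 - 11 = 5 halvings = 5 s.)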
+#define BASE_PRICE(nsec, cpus)  ((uint64_t)MAX_DECAY * 10 * (nsec) / 1000000ll / (cpus))
+	// the counter reaches its max value when a single host uses 1/10 of all cpus' time;
+	// cpu utilization (rate limit) needed to reach the other thresholds, per prefix:
+	//          single      v6/48      v4/24      v6/32      v4/20      v4/18
+	//   max:  10.000 %    40.00 %      -          -          -          -
+	//  2^11:   0.312 %     1.25 %    10.00 %    20.00 %    80.00 %      -       (priority 3)
+	//   2^8:   0.039 %     0.16 %     1.25 %     2.50 %    10.00 %    30.00 %   (priority 2)
+	//   2^4:   0.002 %     0.01 %     0.08 %     0.16 %     0.63 %     1.87 %   (priority 1)
+	// instant limits for a single host and 1 cpu (greater for larger networks and more cpus):
+	//   35 us for 2^4, 0.56 ms for 2^8, 4.5 ms for 2^11, 144 ms for the max value
+	// TODO adjust somehow
+	// a simple DoT query may cost 1 ms and a DoH query 2.5 ms; either gets
+	//   priority 2 already during the handshake (measured on a laptop);
+	// the instant limits can be doubled by:
+	//   doubling the half-life (approx.),
+	//   doubling the percentages in the table above, or
+	//   doubling the number of cpus
+	// possible solution:
+	//   half-life 5 s and BASE_PRICE /= 2.5 -> with 4 cpus, 1.75 ms fits below 2^4;
+	//   still not enough for home routers -> TODO make something configurable, perhaps the BASE_PRICE multiplier
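+	// worked example for the 1/10 rule above: decay removes ~MAX_DECAY from a
+	// full counter per 1 ms of wall time, while BASE_PRICE charges
+	// MAX_DECAY * 10 * nsec / 1e6 / cpus, so the counter stays at the maximum
+	// iff nsec = cpus * 1e6 / 10 per wall ms, i.e. the host consuming 1/10 of
+	// all cpus' time; similarly for the instant limits, assuming loads scale
+	// linearly to the limit: 2^4 is 2^-12 of the 16-bit maximum, so with 1 cpu
+	// a burst reaches it after 1e6 / (0.0006929 * 10 * 4096) ~= 35 us of CPU time.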
#define REQ_TIMEOUT            5000000 // ns (THREAD_CPUTIME); older deferred queries are dropped
#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
#define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
#define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue
#define MAX_WAITING_REQS 10000 // if exceeded, process a single deferred request immediately in the poll phase
+ // TODO measure memory usage instead
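+	// e.g. under sustained load a worker then alternates between at most
+	// ~400 us of UDP and ~400 us of non-UDP processing, yields back to the
+	// poll phase after 1 ms of continuous deferred work, and drops deferred
+	// queries that have aged past 5 ms (all measured in THREAD_CPUTIME)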
#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
struct defer {
size_t capacity;
kru_price_t max_decay;
+ int cpus;
bool using_avx2;
_Alignas(64) uint8_t kru[];
};
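+// Layout note: this header is placed at the start of the shared mmapped file,
+// followed by the KRU itself in the _Alignas(64) flexible tail; defer_init()
+// below asserts that the header contains no undefined padding, as its bytes
+// are apparently compared verbatim when other processes attach.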
uint64_t phase_elapsed = 0; // ns
bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account
-static inline void phase_set(enum phase p) {
+static inline void phase_set(enum phase p)
+{
if (phase != p) {
phase_elapsed = 0;
phase = p;
}
}
-static inline void phase_account(uint64_t nsec) {
+static inline void phase_account(uint64_t nsec)
+{
kr_assert(phase != PHASE_ANY);
phase_elapsed += nsec;
if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
}
/// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
+{
if (phase_accounting) {
phase_account(nsec);
phase_accounting = false;
}

_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
- kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll; // TODO adjust
+ kru_price_t base_price = BASE_PRICE(nsec, defer->cpus);
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
/// Process a single deferred query (or defer again) if there is any.
/// Time accounting should have just been started; its stamp is used and the accounted address gets set.
-static inline void process_single_deferred(void) {
+static inline void process_single_deferred(void)
+{
struct protolayer_iter_ctx *ctx = pop_query();
if (ctx == NULL) return;
}
/// Break expired requests at the beginning of queues; uses the current stamp.
-static inline void cleanup_queues(void) {
+static inline void cleanup_queues(void)
+{
for (int i = 0; i < QUEUES_CNT; i++) {
int cnt = 0;
while (queue_len(queues[i]) > 0) {
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
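+	// defer is NULL unless defer_init() succeeded (prioritization enabled)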
+ if (!defer)
+ return protolayer_continue(ctx);
+
if (ctx->session->outgoing)
return protolayer_continue(ctx);
}
/// Idle: continue processing deferred requests.
-static void defer_queues_idle(uv_idle_t *handle) {
+static void defer_queues_idle(uv_idle_t *handle)
+{
kr_assert(waiting_requests > 0);
VERBOSE_LOG("IDLE\n");
VERBOSE_LOG(" %d waiting\n", waiting_requests);
}
-/// Initialize shared memory, queues, idle.
-int defer_init(uv_loop_t *loop)
+/// Initialize shared memory and queues; to be called from Lua.
+int defer_init(const char *mmap_file, int cpus)
{
+ int ret = 0;
+ if (cpus < 1) {
+ ret = EINVAL;
+ goto fail;
+ }
+
struct defer header = {
.capacity = KRU_CAPACITY,
.max_decay = MAX_DECAY,
+ .cpus = cpus,
.using_avx2 = using_avx2(),
};
kr_assert(header_size ==
sizeof(header.capacity) +
sizeof(header.max_decay) +
+ sizeof(header.cpus) +
sizeof(header.using_avx2)); // no undefined padding inside
- int ret = mmapped_init(&defer_mmapped, "defer", size, &header, header_size);
+ ret = mmapped_init(&defer_mmapped, mmap_file, size, &header, header_size);
if (ret == MMAPPED_WAS_FIRST) {
kr_log_info(SYSTEM, "Initializing prioritization...\n");
for (size_t i = 0; i < QUEUES_CNT; i++)
queue_init(queues[i]);
- uv_idle_init(loop, &idle_handle);
return 0;
fail:
return ret;
}
+/// Initialize idle.
+int defer_init_idle(uv_loop_t *loop)
+{
+ return uv_idle_init(loop, &idle_handle);
+}
+
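+// Usage sketch (caller side; the file name and cpu count are illustrative):
+//
+//	if (defer_init("defer", 4) == 0)
+//		defer_init_idle(uv_default_loop());
+//
+// defer_init() is expected to be called once from Lua before the event loops
+// register their idle handles.
+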
/// Initialize session queue
-int pl_defer_sess_init(struct session2 *session, void *data, void *param) {
+int pl_defer_sess_init(struct session2 *session, void *data, void *param)
+{
struct pl_defer_sess_data *sdata = data;
queue_init(sdata->queue);
return 0;