#include "daemon/defer.h"
+#include "daemon/mmapped.h"
+#include "daemon/session2.h"
#include "lib/kru.h"
+#include "lib/utils.h"
-// TODO: move kru_defer to another file
+#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
+#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
-#include "daemon/session2.h"
+#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
+#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
+
+#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
+#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
+#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
+
+#define LOADS_THRESHOLDS (uint16_t[]) { 1<<4, 1<<8, 1<<11, -1 }
+#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
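+// Loads up to the first threshold are processed synchronously; higher loads are
+// deferred into queues[0 .. QUEUES_CNT-1] according to the remaining thresholds
+// (the last one, (uint16_t)-1, is a sentinel that no load can exceed).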
+
+#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
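+// (0.0006929 ≈ 1 - 2^(-1/1000), i.e., halving per 1000 millisecond ticks)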
+#define TIME_MULT 1/1 // max fraction of the rate limit filled by one CPU (kept as a fraction to be applied to a large integer) // TODO divide by #cpus?
+
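+/// Shared memory layout: configuration header followed by the KRU structure.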
+struct defer {
+ size_t capacity;
+ kru_price_t max_decay;
+ bool using_avx2;
+ uint8_t kru[] ALIGNED(64);
+};
+struct defer *defer = NULL;
+struct mmapped defer_mmapped = {0};
+
+uv_check_t check_handle;
+protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
defer_sample_state_t defer_sample_state = {
- .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU
.is_accounting = 0,
};
+/// Return whether we're using the optimized (AVX2) variant right now.
+static bool using_avx2(void)
+{
+ bool result = (KRU.initialize == KRU_AVX2.initialize);
+ kr_require(result || KRU.initialize == KRU_GENERIC.initialize);
+ return result;
+}
+
+/// Increment KRU counters by the given time.
+void defer_account(uint64_t nsec, union kr_sockaddr addr)
+{
+ uint8_t key[16] ALIGNED(16) = {0, };
+ uint16_t max_load = 0;
+ if (addr.ip.sa_family == AF_INET6) {
+ struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)&addr.ip;
+ memcpy(key, &ipv6->sin6_addr, 16);
+
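+ // Price per prefix: nsec / 1000000 converts nanoseconds to KRU ticks (ms);
+ // at these prices a client occupying a whole CPU roughly keeps its counters saturated.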
+ kru_price_t prices[V6_PREFIXES_CNT];
+ for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
+ prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V6_RATE_MULT[i]; // TODO adjust
+ }
+
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT);
+ } else if (addr.ip.sa_family == AF_INET) {
+ struct sockaddr_in *ipv4 = (struct sockaddr_in *)&addr.ip;
+ memcpy(key, &ipv4->sin_addr, 4); // TODO append port?
+
+ kru_price_t prices[V4_PREFIXES_CNT];
+ for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
+ prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V4_RATE_MULT[i]; // TODO adjust
+ }
+
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT);
+ }
+ kr_log_notice(DEVEL, "%8.3f ms for %s, load: %d\n", nsec / 1000000.0,
+ kr_straddr(&addr.ip), max_load);
+}
+
+/// Determine whether the request should be deferred during unwrapping.
static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
+ if (ctx->session->outgoing)
+ return protolayer_continue(ctx);
+
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+
+ uint8_t key[16] ALIGNED(16) = {0, };
+ uint16_t max_load = 0;
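+ // NULL prices: just read the current load here; charging happens in defer_account().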
+ if (ctx->comm->comm_addr->sa_family == AF_INET6) {
+ struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)ctx->comm->comm_addr;
+ memcpy(key, &ipv6->sin6_addr, 16);
+
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT);
+ } else if (ctx->comm->comm_addr->sa_family == AF_INET) {
+ struct sockaddr_in *ipv4 = (struct sockaddr_in *)ctx->comm->comm_addr;
+ memcpy(key, &ipv4->sin_addr, 4); // TODO append port?
- kr_log_notice(DEVEL, "DEFER: %s\n",
- kr_straddr(&defer_sample_state.addr.ip));
+ max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+ 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT);
+ }
- return protolayer_continue(ctx);
- //return protolayer_async();
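+ // The scan below always terminates: the last threshold is (uint16_t)-1,
+ // which no uint16_t load can exceed.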
+ int threshold_index = 0; // 0: synchronous
+ for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+
+ kr_log_notice(DEVEL, "DEFER | addr: %s, load: %d, queue: %d\n",
+ kr_straddr(ctx->comm->src_addr),
+ max_load, threshold_index);
+
+ if (threshold_index == 0)
+ return protolayer_continue(ctx);
+
+ queue_push(queues[threshold_index - 1], ctx);
+
+ return protolayer_async();
+}
+
+/// Continue processing deferred requests in libuv check phase.
+static void defer_queues_check(uv_check_t *handle)
+{
+ // TODO drop too old requests and/or break processing if it lasts too long (keeping some work for another check phase)
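+ // Queues are drained in priority order: the queue of least-loaded sources first.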
+ for (size_t i = 0; i < QUEUES_CNT; i++) {
+ while (queue_len(queues[i]) > 0) {
+ defer_sample_start();
+ struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+ queue_pop(queues[i]);
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+ kr_log_notice(DEVEL, "DEFER continue: %s\n",
+ kr_straddr(ctx->comm->comm_addr));
+ protolayer_continue(ctx);
+ defer_sample_stop();
+ }
+ }
}
-void defer_init(void) {
+/// Initialize defer, incl. shared memory with KRU.
+int defer_init(uv_loop_t *loop)
+{
+ struct defer header = { // TODO adjust hardcoded values
+ .capacity = 1 << 10,
+ .max_decay = MAX_DECAY,
+ .using_avx2 = using_avx2(),
+ };
+
+ size_t capacity_log = 0;
+ for (size_t c = header.capacity - 1; c > 0; c >>= 1) capacity_log++;
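+ // capacity_log is now ceil(log2(capacity)), e.g., 10 for the current 1 << 10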
+
+ size_t header_size = offsetof(struct defer, kru);
+ size_t size = header_size + KRU.get_size(capacity_log);
+
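+ // Create the shared file or attach to an existing one;
+ // the stored header is expected to match ours (capacity, decay, KRU variant).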
+ int ret = mmapped_init(&defer_mmapped, "defer", size, &header, header_size);
+ if (ret == MMAPPED_WAS_FIRST) {
+ kr_log_info(SYSTEM, "Initializing prioritization...\n");
+
+ defer = defer_mmapped.mem;
+
+ bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay);
+ if (!succ) {
+ defer = NULL;
+ ret = kr_error(EINVAL);
+ goto fail;
+ }
+
+ ret = mmapped_init_continue(&defer_mmapped);
+ if (ret != 0) goto fail;
+
+ kr_log_info(SYSTEM, "Prioritization initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+ } else if (ret == 0) {
+ defer = defer_mmapped.mem;
+ kr_log_info(SYSTEM, "Using existing prioritization data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+ } else goto fail;
+
+ for (size_t i = 0; i < QUEUES_CNT; i++)
+ queue_init(queues[i]);
+
protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap;
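+ // Drain the queues in the libuv check phase, i.e., after the I/O callbacks of each loop iteration.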
+ uv_check_init(loop, &check_handle);
+ uv_check_start(&check_handle, defer_queues_check);
+ return 0;
+
+fail:
+ kr_log_crit(SYSTEM, "Initialization of shared prioritization data failed.\n");
+ return ret;
+}
+
+/// Deinitialize shared memory.
+void defer_deinit(void)
+{
+ mmapped_deinit(&defer_mmapped);
+ defer = NULL;
}
#include <stdbool.h>
#include "lib/defines.h"
#include "lib/utils.h"
+#include "lib/kru.h"
+#include <uv.h>
-// TODO: reconsider `static inline` cases below
+/// Initialize defer, incl. shared memory with KRU.
+int defer_init(uv_loop_t *loop);
+
+/// Deinitialize shared memory.
+void defer_deinit(void);
+
+/// Increment KRU counters by the given time.
+void defer_account(uint64_t nsec, union kr_sockaddr addr);
typedef struct {
- bool do_sample; /// whether to sample; could be important if _COARSE isn't available
int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
} defer_sample_state_t;
extern defer_sample_state_t defer_sample_state;
+extern struct defer *defer; /// skip sampling/deferring if NULL
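+
+// Typical usage of the sampling API (a sketch):
+//   defer_sample_start();
+//   defer_sample_addr(addr);  // once the client address is known
+//   ...                       // the work being accounted
+//   defer_sample_stop();      // charges the elapsed time via defer_account()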
+
+// TODO: reconsider `static inline` cases below
+
#include <time.h>
static inline uint64_t get_stamp(void)
{
/// Start accounting work, if not doing it already.
static inline void defer_sample_start(void)
{
- if (!defer_sample_state.do_sample) return;
+ if (!defer) return;
kr_assert(!defer_sample_state.is_accounting);
++defer_sample_state.is_accounting;
defer_sample_state.stamp = get_stamp();
/// Annotate the work currently being accounted by an IP address.
static inline void defer_sample_addr(const union kr_sockaddr *addr)
{
- if (!defer_sample_state.do_sample || kr_fails_assert(addr)) return;
+ if (!defer || kr_fails_assert(addr)) return;
if (!defer_sample_state.is_accounting) return;
if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
/// Stop accounting work - and change the source if applicable.
static inline void defer_sample_stop(void)
{
- if (!defer_sample_state.do_sample) return;
+ if (!defer) return;
if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
if (--defer_sample_state.is_accounting) return;
const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
// we accounted something
- // FIXME: drop the log, add KRU, etc.
- kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0,
- kr_straddr(&defer_sample_state.addr.ip));
+
// TODO: some queries of internal origin have suspiciously high numbers.
// We won't be really accounting those, but it might suggest some other issue.
-}
-
-void defer_init(void);
+ defer_account(elapsed, defer_sample_state.addr);
+}
io_protolayers_init();
tls_protolayers_init();
proxy_protolayers_init();
- defer_init();
#ifdef ENABLE_DOH2
http_protolayers_init();
#endif
lua_settop(the_engine->L, 0);
}
+ if (defer_init(loop) != 0) {
+ ret = EXIT_FAILURE;
+ goto cleanup;
+ }
+
ret = kr_rules_init_ensure();
if (ret) {
kr_log_error(RULES, "failed to initialize policy rule engine: %s\n",
worker_deinit();
engine_deinit();
network_deinit();
+ defer_deinit();
kr_rules_commit(false);
kr_rules_deinit();
if (loop != NULL) {
#include "lib/utils.h"
#include "lib/resolve.h"
-#define RRL_V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
-#define RRL_V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
+#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
+#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
-#define RRL_V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
-#define RRL_V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
+#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
+#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
-#define RRL_V4_PREFIXES_CNT (sizeof(RRL_V4_PREFIXES) / sizeof(*RRL_V4_PREFIXES))
-#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES))
-#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT)
+#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
+#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
+#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
struct ratelimiting {
size_t capacity;
uint32_t rate_limit;
uint16_t tc_limit;
bool using_avx2;
- kru_price_t v4_prices[RRL_V4_PREFIXES_CNT];
- kru_price_t v6_prices[RRL_V6_PREFIXES_CNT];
+ kru_price_t v4_prices[V4_PREFIXES_CNT];
+ kru_price_t v6_prices[V6_PREFIXES_CNT];
uint8_t kru[] ALIGNED(64);
};
struct ratelimiting *ratelimiting = NULL;
(uint64_t) base_price * rate_limit / 1000;
bool succ = KRU.initialize((struct kru *)ratelimiting->kru, capacity_log, max_decay);
- kr_require(succ);
+ if (!succ) {
+ ratelimiting = NULL;
+ ret = kr_error(EINVAL);
+ goto fail;
+ }
- for (size_t i = 0; i < RRL_V4_PREFIXES_CNT; i++) {
- ratelimiting->v4_prices[i] = base_price / RRL_V4_RATE_MULT[i];
+ for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
+ ratelimiting->v4_prices[i] = base_price / V4_RATE_MULT[i];
}
- for (size_t i = 0; i < RRL_V6_PREFIXES_CNT; i++) {
- ratelimiting->v6_prices[i] = base_price / RRL_V6_RATE_MULT[i];
+ for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
+ ratelimiting->v6_prices[i] = base_price / V6_RATE_MULT[i];
}
ret = mmapped_init_continue(&ratelimiting_mmapped);
fail:
kr_log_crit(SYSTEM, "Initialization of shared rate-limiting data failed.\n");
-
return ret;
}
void ratelimiting_deinit(void)
{
- if (ratelimiting == NULL) return;
-
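+ // no NULL check needed: mmapped_deinit() copes with the uninitialized state itself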
mmapped_deinit(&ratelimiting_mmapped);
ratelimiting = NULL;
}
memcpy(key, &ipv6->sin6_addr, 16);
limited_prefix = KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(),
- 1, key, RRL_V6_PREFIXES, ratelimiting->v6_prices, RRL_V6_PREFIXES_CNT, &max_final_load);
+ 1, key, V6_PREFIXES, ratelimiting->v6_prices, V6_PREFIXES_CNT, &max_final_load);
} else {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)req->qsource.addr;
memcpy(key, &ipv4->sin_addr, 4); // TODO append port?
limited_prefix = KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(),
- 0, key, RRL_V4_PREFIXES, ratelimiting->v4_prices, RRL_V4_PREFIXES_CNT, &max_final_load);
+ 0, key, V4_PREFIXES, ratelimiting->v4_prices, V4_PREFIXES_CNT, &max_final_load);
}
limited = (limited_prefix ? 2 : (max_final_load > ratelimiting->tc_limit ? 1 : 0));
}
static void the_tests(void **state)
{
/* IPv4 multi-prefix tests */
- static_assert(RRL_V4_PREFIXES_CNT == 4,
+ static_assert(V4_PREFIXES_CNT == 4,
"There are no more IPv4 limited prefixes (/32, /24, /20, /18 will be tested).");
count_test("IPv4 instant limit /32", INST(V4, 32), 0,
AF_INET, "128.0.64.0", 0, 0);
/* IPv6 multi-prefix tests */
- static_assert(RRL_V6_PREFIXES_CNT == 5,
+ static_assert(V6_PREFIXES_CNT == 5,
"There are no more IPv6 limited prefixes (/128, /64, /56, /48, /32 will be tested).");
count_test("IPv6 instant limit /128, independent to IPv4", INST(V6, 128), 0,
// Accessing RRL configuration of INSTANT/RATE limits for V4/V6 and specific prefix.
#define LIMIT(type, Vx, prefix) (RRL_MULT(Vx, prefix) * RRL_ ## type ## _LIMIT)
-#define RRL_CONFIG(Vx, name) RRL_ ## Vx ## _ ## name
+#define RRL_CONFIG(Vx, name) Vx ## _ ## name
#define RRL_MULT(Vx, prefix) get_mult(RRL_CONFIG(Vx, PREFIXES), RRL_CONFIG(Vx, RATE_MULT), RRL_CONFIG(Vx, PREFIXES_CNT), prefix)
static inline kru_price_t get_mult(uint8_t prefixes[], kru_price_t mults[], size_t cnt, uint8_t wanted_prefix) {
for (size_t i = 0; i < cnt; i++)
if (!had_comm_param)
comm = &session->comm_storage;
- // RRL: at this point we might start doing nontrivial work,
+ // DEFER: at this point we might start doing nontrivial work,
// but we may not know the client's IP yet.
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
- if (direction == PROTOLAYER_UNWRAP) {
+ if (direction == PROTOLAYER_UNWRAP)
defer_sample_start();
- // In particular we don't want to miss en/decryption work
- // for regular connections from clients.
- if (!session->outgoing && session->secure && !proxy_allowed(comm->comm_addr))
- defer_sample_addr((const union kr_sockaddr *)comm->comm_addr);
- }
+
int ret;
struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
* stream (may span multiple (un)wraps). */\
XX(DNS_SINGLE_STREAM) /**< Singular packet WITH prepended size in a
* stream (may span multiple (un)wraps). */\
- /* Requests prioritization */\
+ /* Prioritization of requests */\
XX(DEFER)
/** The identifiers of protocol layer types. */
/* We need to store a copy of peer address. */
memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr));
req->qsource.addr = &ctx->source.addr.ip;
- defer_sample_addr(&ctx->source.addr);
if (!comm_addr)
comm_addr = src_addr;