--- /dev/null
+#include "daemon/defer.h"
+#include "lib/kru.h"
+
+// TODO: move kru_defer to another file
+
+#include "daemon/session2.h"
+
+defer_sample_state_t defer_sample_state = {
+ .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU
+ .is_accounting = 0,
+};
+
+
+static enum protolayer_iter_cb_result pl_defer_unwrap(
+ void *sess_data, void *iter_data,
+ struct protolayer_iter_ctx *ctx)
+{
+
+ kr_log_notice(DEVEL, "DEFER: %s\n",
+ kr_straddr(&defer_sample_state.addr.ip));
+
+ return protolayer_continue(ctx);
+ //return protolayer_async();
+}
+
+void defer_init(void) {
+ protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap;
+}
--- /dev/null
+#include <stdbool.h>
+#include "lib/defines.h"
+#include "lib/utils.h"
+
+// TODO: reconsider `static inline` cases below
+
+
+typedef struct {
+ bool do_sample; /// whether to sample; could be important if _COARSE isn't available
+ int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
+ union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
+ uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
+} defer_sample_state_t;
+extern defer_sample_state_t defer_sample_state;
+
+#include <time.h>
+static inline uint64_t get_stamp(void)
+{
+ struct timespec now_ts = {0};
+ clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
+ return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
+}
+
+/// Start accounting work, if not doing it already.
+static inline void defer_sample_start(void)
+{
+ if (!defer_sample_state.do_sample) return;
+ kr_assert(!defer_sample_state.is_accounting);
+ ++defer_sample_state.is_accounting;
+ defer_sample_state.stamp = get_stamp();
+ defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+}
+
+/// Annotate the work currently being accounted by an IP address.
+static inline void defer_sample_addr(const union kr_sockaddr *addr)
+{
+ if (!defer_sample_state.do_sample || kr_fails_assert(addr)) return;
+ if (!defer_sample_state.is_accounting) return;
+
+ if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
+ // TODO: this costs performance, so only in some debug mode?
+ kr_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok());
+ return;
+ }
+
+ switch (addr->ip.sa_family) {
+ case AF_INET:
+ defer_sample_state.addr.ip4 = addr->ip4;
+ break;
+ case AF_INET6:
+ defer_sample_state.addr.ip6 = addr->ip6;
+ break;
+ default:
+ defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+ break;
+ }
+}
+
+/// Stop accounting work - and change the source if applicable.
+static inline void defer_sample_stop(void)
+{
+ if (!defer_sample_state.do_sample) return;
+
+ if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
+ if (--defer_sample_state.is_accounting) return;
+
+ const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
+
+ // we accounted something
+ // FIXME: drop the log, add KRU, etc.
+ kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0,
+ kr_straddr(&defer_sample_state.addr.ip));
+ // TODO: some queries of internal origin have suspicioiusly high numbers.
+ // We won't be really accounting those, but it might suggest some other issue.
+}
+
+
+void defer_init(void);
knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
-_Bool kr_rrl_request_begin(struct kr_request *);
-void kr_rrl_init(const char *, size_t, uint32_t, uint32_t, int);
+_Bool ratelimiting_request_begin(struct kr_request *);
+void ratelimiting_init(const char *, size_t, uint32_t, uint32_t, int);
struct engine {
char _stub[];
};
knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
-_Bool kr_rrl_request_begin(struct kr_request *);
-void kr_rrl_init(const char *, size_t, uint32_t, uint32_t, int);
+_Bool ratelimiting_request_begin(struct kr_request *);
+void ratelimiting_init(const char *, size_t, uint32_t, uint32_t, int);
struct engine {
char _stub[];
};
knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
-_Bool kr_rrl_request_begin(struct kr_request *);
-void kr_rrl_init(const char *, size_t, uint32_t, uint32_t, int);
+_Bool ratelimiting_request_begin(struct kr_request *);
+void ratelimiting_init(const char *, size_t, uint32_t, uint32_t, int);
struct engine {
char _stub[];
};
worker_resolve_mk_pkt
worker_resolve_start
zi_zone_import
- kr_rrl_request_begin
- kr_rrl_init
+ ratelimiting_request_begin
+ ratelimiting_init
EOF
echo "struct engine" | ${CDEFS} ${KRESD} types | sed '/module_array_t/,$ d'
#include "daemon/network.h"
#include "daemon/udp_queue.h"
#include "daemon/worker.h"
-#include "daemon/rrl/api.h"
+#include "daemon/ratelimiting.h"
+#include "daemon/defer.h"
#ifdef ENABLE_DOH2
#include "daemon/http.h"
io_protolayers_init();
tls_protolayers_init();
proxy_protolayers_init();
+ defer_init();
#ifdef ENABLE_DOH2
http_protolayers_init();
#endif
cleanup:/* Cleanup. */
network_unregister();
- kr_rrl_deinit();
+ ratelimiting_deinit();
kr_resolver_deinit();
worker_deinit();
engine_deinit();
'bindings/modules.c',
'bindings/net.c',
'bindings/worker.c',
+ 'defer.c',
'engine.c',
'ffimodule.c',
'io.c',
'main.c',
'network.c',
'proxyv2.c',
+ 'ratelimiting.c',
'session2.c',
'tls.c',
'tls_ephemeral_credentials.c',
c_src_lint += kresd_src
+unit_tests += [
+ ['ratelimiting', files('ratelimiting.test/tests.c') + libkres_src ],
+
+ # parallel tests timeouts under valgrind; they checks mainly for race conditions, which is not needed there
+ ['ratelimiting-parallel', files('ratelimiting.test/tests-parallel.c') + libkres_src, ['skip_valgrind']]
+]
+
config_tests += [
['cache.clear', files('cache.test/clear.test.lua')],
['zimport', files('zimport.test/zimport.test.lua')],
subdir('lua')
-subdir('rrl')
-
kresd = executable(
'kresd',
#include <fcntl.h>
#include <sys/mman.h>
-#include "daemon/rrl/api.h"
-#include "daemon/rrl/kru.h"
+#include "daemon/ratelimiting.h"
+#include "lib/kru.h"
#include "lib/utils.h"
#include "lib/resolve.h"
#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES))
#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT)
-struct rrl {
+struct rrl { // TODO rename?
size_t capacity;
uint32_t instant_limit;
uint32_t rate_limit;
int the_rrl_fd = -1;
char *the_rrl_mmap_file = NULL;
-kr_rrl_sample_state_t kr_rrl_sample_state = {
- .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU
- .is_accounting = 0,
-};
-
/// return whether we're using optimized variant right now
static bool using_avx2(void)
{
return result;
}
-void kr_rrl_init(const char *mmap_file, size_t capacity, uint32_t instant_limit, uint32_t rate_limit, int tc_limit_perc)
+void ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_limit, uint32_t rate_limit, int tc_limit_perc)
{
int fd = the_rrl_fd = open(mmap_file, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (fd == -1) {
abort();
}
-void kr_rrl_deinit(void)
+void ratelimiting_deinit(void)
{
if (the_rrl == NULL) return;
int fd = the_rrl_fd;
the_rrl = NULL;
}
-bool kr_rrl_request_begin(struct kr_request *req)
+bool ratelimiting_request_begin(struct kr_request *req)
{
if (!req->qsource.addr)
return false; // don't consider internal requests
--- /dev/null
+#include <stdbool.h>
+#include "lib/defines.h"
+#include "lib/utils.h"
+struct kr_request;
+
+/** Initialize rate-limiting with shared mmapped memory.
+ * The existing data are used if another instance is already using the file
+ * and it was initialized with the same parameters; it fails on mismatch. */
+KR_EXPORT
+void ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_limit, uint32_t rate_limit, int tc_limit_perc);
+
+/** Do rate-limiting, during knot_layer_api::begin. */
+KR_EXPORT
+bool ratelimiting_request_begin(struct kr_request *req);
+
+/** Remove mmapped file data if not used by other processes. */
+KR_EXPORT
+void ratelimiting_deinit(void);
static void the_tests(void **state);
-#include "daemon/rrl/tests.inc.c"
+#include "./tests.inc.c"
#define THREADS 4
hqi % 0xff, (hqi >> 8) % 0xff, (hqi >> 16) % 0xff);
kr_straddr_socket_set((struct sockaddr *)&addr, addr_str, 0);
- if (!kr_rrl_request_begin(&req)) {
+ if (!ratelimiting_request_begin(&req)) {
atomic_fetch_add(&d->stages[si].hosts[hi].passed, 1);
}
static void the_tests(void **state);
-#include "daemon/rrl/tests.inc.c"
+#include "./tests.inc.c"
// defining count_test as macro to let it print usable line number on failure
#define count_test(DESC, EXPECTED_PASSING, MARGIN_FRACT, ...) { \
i % (max_value - min_value + 1) + min_value,
i / (max_value - min_value + 1) % 256);
kr_straddr_socket_set((struct sockaddr *) &addr, addr_str, 0);
- if (kr_rrl_request_begin(&req)) {
+ if (ratelimiting_request_begin(&req)) {
cnt = i;
break;
}
#include "lib/utils.h"
uint64_t fakeclock_now(void);
#define kr_now fakeclock_now
-#include "daemon/rrl/api.c"
+#include "daemon/ratelimiting.c"
#undef kr_now
#define RRL_TABLE_SIZE (1 << 20)
const char *tmpdir = test_tmpdir_create();
char mmap_file[64];
stpcpy(stpcpy(mmap_file, tmpdir), "/rrl");
- kr_rrl_init(mmap_file, RRL_TABLE_SIZE, RRL_INSTANT_LIMIT, RRL_RATE_LIMIT, 100);
+ ratelimiting_init(mmap_file, RRL_TABLE_SIZE, RRL_INSTANT_LIMIT, RRL_RATE_LIMIT, 100);
if (KRU.initialize == KRU_GENERIC.initialize) {
struct kru_generic *kru = (struct kru_generic *) the_rrl->kru;
the_tests(state);
- kr_rrl_deinit();
+ ratelimiting_deinit();
test_tmpdir_remove(tmpdir);
dnssec_crypto_cleanup();
}
+++ /dev/null
-
-#include <stdbool.h>
-#include "lib/defines.h"
-#include "lib/utils.h"
-struct kr_request;
-
-/** Initialize rate-limiting with shared mmapped memory.
- * The existing data are used if another instance is already using the file
- * and it was initialized with the same parameters; it fails on mismatch. */
-KR_EXPORT
-void kr_rrl_init(const char *mmap_file, size_t capacity, uint32_t instant_limit, uint32_t rate_limit, int tc_limit_perc);
-
-/** Do rate-limiting, during knot_layer_api::begin. */
-KR_EXPORT
-bool kr_rrl_request_begin(struct kr_request *req);
-
-/** Remove mmapped file data if not used by other processes. */
-KR_EXPORT
-void kr_rrl_deinit(void);
-
-
-// TODO: reconsider `static inline` cases below
-
-
-typedef struct {
- bool do_sample; /// whether to sample; could be important if _COARSE isn't available
- int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
- union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
- uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
-} kr_rrl_sample_state_t;
-extern kr_rrl_sample_state_t kr_rrl_sample_state;
-
-#include <time.h>
-static inline uint64_t get_stamp(void)
-{
- struct timespec now_ts = {0};
- clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
- return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
-}
-
-/// Start accounting work, if not doing it already.
-static inline void kr_rrl_sample_start(void)
-{
- if (!kr_rrl_sample_state.do_sample) return;
- kr_assert(!kr_rrl_sample_state.is_accounting);
- ++kr_rrl_sample_state.is_accounting;
- kr_rrl_sample_state.stamp = get_stamp();
- kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC;
-}
-
-/// Annotate the work currently being accounted by an IP address.
-static inline void kr_rrl_sample_addr(const union kr_sockaddr *addr)
-{
- if (!kr_rrl_sample_state.do_sample || kr_fails_assert(addr)) return;
- if (!kr_rrl_sample_state.is_accounting) return;
-
- if (kr_rrl_sample_state.addr.ip.sa_family != AF_UNSPEC) {
- // TODO: this costs performance, so only in some debug mode?
- kr_assert(kr_sockaddr_cmp(&addr->ip, &kr_rrl_sample_state.addr.ip) == kr_ok());
- return;
- }
-
- switch (addr->ip.sa_family) {
- case AF_INET:
- kr_rrl_sample_state.addr.ip4 = addr->ip4;
- break;
- case AF_INET6:
- kr_rrl_sample_state.addr.ip6 = addr->ip6;
- break;
- default:
- kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC;
- break;
- }
-}
-
-/// Stop accounting work - and change the source if applicable.
-static inline void kr_rrl_sample_stop(void)
-{
- if (!kr_rrl_sample_state.do_sample) return;
-
- if (kr_fails_assert(kr_rrl_sample_state.is_accounting > 0)) return; // weird
- if (--kr_rrl_sample_state.is_accounting) return;
-
- const uint64_t elapsed = get_stamp() - kr_rrl_sample_state.stamp;
-
- // we accounted something
- // FIXME: drop the log, add KRU, etc.
- kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0,
- kr_straddr(&kr_rrl_sample_state.addr.ip));
- // TODO: some queries of internal origin have suspicioiusly high numbers.
- // We won't be really accounting those, but it might suggest some other issue.
-}
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# rate limiting code
-
-kresd_src += files([
- 'api.c',
- 'kru-generic.c',
- 'kru-avx2.c',
- '../../contrib/openbsd/siphash.c',
-])
-
-kresd_deps += [
- # https://mesonbuild.com/howtox.html#add-math-library-lm-portably
- (meson.get_compiler('c').find_library('m', required : false))
-]
-
-unit_tests += [
- ['rrl', files('tests.c', 'kru-generic.c', 'kru-avx2.c', '../../contrib/openbsd/siphash.c') + libkres_src ],
-
- # parallel tests timeouts under valgrind; they checks mainly for race conditions, which is not needed there
- ['rrl-parallel', files('tests-parallel.c', 'kru-generic.c', 'kru-avx2.c', '../../contrib/openbsd/siphash.c') + libkres_src, ['skip_valgrind']]
-]
#include "daemon/io.h"
#include "daemon/udp_queue.h"
#include "daemon/worker.h"
-#include "daemon/rrl/api.h"
+#include "daemon/defer.h"
#include "daemon/proxyv2.h"
#include "daemon/session2.h"
static const enum protolayer_type protolayer_grp_udp53[] = {
PROTOLAYER_TYPE_UDP,
PROTOLAYER_TYPE_PROXYV2_DGRAM,
+ PROTOLAYER_TYPE_DEFER,
PROTOLAYER_TYPE_DNS_DGRAM,
};
static const enum protolayer_type protolayer_grp_tcp53[] = {
PROTOLAYER_TYPE_TCP,
PROTOLAYER_TYPE_PROXYV2_STREAM,
+ PROTOLAYER_TYPE_DEFER,
PROTOLAYER_TYPE_DNS_MULTI_STREAM,
};
static const enum protolayer_type protolayer_grp_dot[] = {
PROTOLAYER_TYPE_TCP,
PROTOLAYER_TYPE_PROXYV2_STREAM,
+ PROTOLAYER_TYPE_DEFER,
PROTOLAYER_TYPE_TLS,
PROTOLAYER_TYPE_DNS_MULTI_STREAM,
};
static const enum protolayer_type protolayer_grp_doh[] = {
PROTOLAYER_TYPE_TCP,
PROTOLAYER_TYPE_PROXYV2_STREAM,
+ PROTOLAYER_TYPE_DEFER,
PROTOLAYER_TYPE_TLS,
PROTOLAYER_TYPE_HTTP,
PROTOLAYER_TYPE_DNS_UNSIZED_STREAM,
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
if (direction == PROTOLAYER_UNWRAP) {
- kr_rrl_sample_start();
+ defer_sample_start();
// In particular we don't want to miss en/decryption work
// for regular connections from clients.
if (!session->outgoing && session->secure && !proxy_allowed(comm->comm_addr))
- kr_rrl_sample_addr((const union kr_sockaddr *)comm->comm_addr);
+ defer_sample_addr((const union kr_sockaddr *)comm->comm_addr);
}
int ret;
ret = protolayer_step(ctx);
if (direction == PROTOLAYER_UNWRAP)
- kr_rrl_sample_stop();
+ defer_sample_stop();
return ret;
}
static void session2_on_timeout(uv_timer_t *timer)
{
- kr_rrl_sample_start();
+ defer_sample_start();
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
- kr_rrl_sample_stop();
+ defer_sample_stop();
}
int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
XX(DNS_MULTI_STREAM) /**< Multiple packets WITH prepended sizes in a
* stream (may span multiple (un)wraps). */\
XX(DNS_SINGLE_STREAM) /**< Singular packet WITH prepended size in a
- * stream (may span multiple (un)wraps). */
+ * stream (may span multiple (un)wraps). */\
+ /* Requests prioritization */\
+ XX(DEFER)
/** The identifiers of protocol layer types. */
enum protolayer_type {
/** Global data for a specific layered protocol. This is to be initialized in
* the `protolayer_globals` global array (below) during the the resolver's
* startup. It contains pointers to functions implementing a particular
- * protocol, as well as other importand data.
+ * protocol, as well as other important data.
*
* Every member of this struct is allowed to be zero/NULL if a particular
* protocol has no use for it. */
#include "lib/layer.h"
#include "lib/layer/iterate.h" /* kr_response_classify */
#include "lib/utils.h"
-#include "daemon/rrl/api.h"
+#include "daemon/defer.h"
/* Magic defaults for the worker. */
} source;
};
+
/** Query resolution task. */
struct qr_task
{
/* We need to store a copy of peer address. */
memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr));
req->qsource.addr = &ctx->source.addr.ip;
- kr_rrl_sample_addr(&ctx->source.addr);
+ defer_sample_addr(&ctx->source.addr);
if (!comm_addr)
comm_addr = src_addr;
const struct sockaddr *packet_source, knot_pkt_t *packet)
{
if (task && task->ctx->source.session)
- kr_rrl_sample_addr(&task->ctx->source.addr);
+ defer_sample_addr(&task->ctx->source.addr);
/* No more steps after we're finished. */
if (!task || task->finished) {
'generic/lru.c',
'generic/queue.c',
'generic/trie.c',
+ 'kru-avx2.c',
+ 'kru-generic.c',
'layer/cache.c',
'layer/iterate.c',
'layer/validate.c',
'selection_iter.c',
'utils.c',
'zonecut.c',
+ '../contrib/openbsd/siphash.c', # needed for kru
])
c_src_lint += libkres_src
'generic/pack.h',
'generic/queue.h',
'generic/trie.h',
+ 'kru.h',
'layer.h',
'layer/iterate.h',
'log.h',
gnutls,
luajit,
libsystemd,
+ libm
],
install: true,
)
{% from 'macros/common_macros.lua.j2' import boolean %}
{% if cfg.rate_limiting.rate_limit -%}
-C.kr_rrl_init(
- '{{ cfg.rundir }}/rrl',
+C.ratelimiting_init(
+ '{{ cfg.rundir }}/ratelimiting',
{{ cfg.rate_limiting.capacity }},
{{ cfg.rate_limiting.instant_limit }},
{{ cfg.rate_limiting.rate_limit }},
endif
gnutls = dependency('gnutls')
luajit = dependency('luajit')
+# https://mesonbuild.com/howtox.html#add-math-library-lm-portably
+libm = meson.get_compiler('c').find_library('m', required : false)
message('------------------------------')
loadstring('return ' .. act_str)()(state, req)
end
- if ffi.C.kr_rrl_request_begin(req) then return end
+ if ffi.C.ratelimiting_request_begin(req) then return end
local qry = req:initial() -- same as :current() but more descriptive
return policy.evaluate(policy.rules, req, qry, state)