From d047b22d300230ad994e51c4fbd0c4dffbd3892d Mon Sep 17 00:00:00 2001
From: =?utf8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?=
Date: Wed, 10 Jul 2024 17:28:45 +0200
Subject: [PATCH] defer: add request and idle timeouts, limit on waiting queries

---
 daemon/defer.c     | 232 +++++++++++++++++++++++++++++++++------
 daemon/defer.h     |  22 ++++-
 daemon/mmapped.h   |   2 +-
 daemon/session2.c  |   5 +
 daemon/session2.h  |   5 +
 daemon/udp_queue.c |  11 ++-
 daemon/udp_queue.h |   2 +
 7 files changed, 213 insertions(+), 66 deletions(-)

diff --git a/daemon/defer.c b/daemon/defer.c
index a4a58e5be..134ce3a1e 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -1,6 +1,7 @@
 #include "daemon/defer.h"
 #include "daemon/mmapped.h"
 #include "daemon/session2.h"
+#include "daemon/udp_queue.h"
 #include "lib/kru.h"
 #include "lib/utils.h"
 
@@ -14,11 +15,20 @@
 #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
-#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1}
+#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
 #define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
 
-#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
-#define TIME_MULT 1/1 // max fraction of rate limit filled by one cpu (multiplies large int) // TODO divide by #cpus?
+#define KRU_CAPACITY (1<<10)
+#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
+#define TIME_MULT 1/1 // max fraction of rate limit filled by one cpu (multiplies large int)
+		// TODO divide by #cpus?
+
+#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediately in poll phase
+
+#define VERBOSE_LOG(...) kr_log_notice(DEVEL, "defer | " __VA_ARGS__)
+// #define VERBOSE_LOG(...)
 
 struct defer {
 	size_t capacity;
@@ -29,13 +39,23 @@ struct defer {
 struct defer *defer = NULL;
 struct mmapped defer_mmapped = {0};
 
-uv_check_t check_handle;
-protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
-
 defer_sample_state_t defer_sample_state = {
 	.is_accounting = 0,
 };
 
+uv_idle_t idle_handle;
+static void defer_queues_idle(uv_idle_t *handle);
+
+protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
+int waiting_requests = 0;
+int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )
+
+struct pl_defer_iter_data {
+	struct protolayer_data h;
+	uint64_t req_stamp; // time when request was received, uses get_stamp()
+		// TODO use different clock than CLOCK_THREAD_CPUTIME_ID?
+};
+
 /// Return whether we're using optimized variant right now.
 static bool using_avx2(void)
 {
@@ -45,61 +65,51 @@ static bool using_avx2(void)
 }
 
 /// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr addr) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
 	_Alignas(16) uint8_t key[16] = {0, };
 	uint16_t max_load = 0;
 	uint8_t prefix = 0;
 
-	if (defer_sample_state.addr.ip.sa_family == AF_INET6) {
-		struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)&defer_sample_state.addr.ip;
-		memcpy(key, &ipv6->sin6_addr, 16);
+	kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll; // TODO adjust
+
+	if (addr->ip.sa_family == AF_INET6) {
+		memcpy(key, &addr->ip6.sin6_addr, 16);
 
 		kru_price_t prices[V6_PREFIXES_CNT];
 		for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
-			prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V6_RATE_MULT[i]; // TODO adjust
+			prices[i] = base_price / V6_RATE_MULT[i];
 		}
 		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), 1, key,
 				V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
-	} else if (defer_sample_state.addr.ip.sa_family == AF_INET) {
-		struct sockaddr_in *ipv4 = (struct sockaddr_in *)&defer_sample_state.addr.ip;
-		memcpy(key, &ipv4->sin_addr, 4); // TODO append port?
+	} else if (addr->ip.sa_family == AF_INET) {
+		memcpy(key, &addr->ip4.sin_addr, 4);
 
 		kru_price_t prices[V4_PREFIXES_CNT];
 		for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
-			prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V4_RATE_MULT[i]; // TODO adjust
+			prices[i] = base_price / V4_RATE_MULT[i];
 		}
 		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), 0, key,
 				V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
 	}
 
-	kr_log_notice(DEVEL, "%8.3f ms for %s, load: %d on /%d\n", nsec / 1000000.0,
-		kr_straddr(&defer_sample_state.addr.ip), max_load, prefix);
+	VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
+			kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
 }
 
-/// Determine whether the request should be deferred during unwrapping.
-static enum protolayer_iter_cb_result pl_defer_unwrap(
-		void *sess_data, void *iter_data,
-		struct protolayer_iter_ctx *ctx)
+/// Determine priority of the request in [-1, QUEUES_CNT - 1].
+/// Lower value has higher priority, -1 should be synchronous.
+static inline int classify(const union kr_sockaddr *addr)
 {
-	if (ctx->session->outgoing)
-		return protolayer_continue(ctx);
-
-	defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
-
 	_Alignas(16) uint8_t key[16] = {0, };
 	uint16_t max_load = 0;
 	uint8_t prefix = 0;
 
-	if (ctx->comm->comm_addr->sa_family == AF_INET6) {
-		struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)ctx->comm->comm_addr;
-		memcpy(key, &ipv6->sin6_addr, 16);
-
+	if (addr->ip.sa_family == AF_INET6) {
+		memcpy(key, &addr->ip6.sin6_addr, 16);
 		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), 1, key,
 				V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
-	} else if (ctx->comm->comm_addr->sa_family == AF_INET) {
-		struct sockaddr_in *ipv4 = (struct sockaddr_in *)ctx->comm->comm_addr;
-		memcpy(key, &ipv4->sin_addr, 4); // TODO append port?
-
+	} else if (addr->ip.sa_family == AF_INET) {
+		memcpy(key, &addr->ip4.sin_addr, 4);
 		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), 0, key,
 				V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
 	}
@@ -107,39 +117,133 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 	int threshold_index = 0; // 0: synchronous
 	for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
 
-	kr_log_notice(DEVEL, "DEFER | addr: %s, load: %d on /%d, queue: %d\n",
-			kr_straddr(ctx->comm->src_addr),
-			max_load, prefix, threshold_index);
+	VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
+
+	return threshold_index - 1;
+}
+
+
+
+/// Push query to a queue according to its priority and activate idle.
+static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
+{
+	queue_push(queues[priority], ctx);
+	queue_ix = MIN(queue_ix, priority);
+	if (waiting_requests++ <= 0) {
+		kr_assert(waiting_requests == 1);
+		uv_idle_start(&idle_handle, defer_queues_idle);
+		VERBOSE_LOG(" activating idle\n");
+	}
+}
+
+/// Pop and return the query with the highest priority, deactivate idle if not needed.
+static inline struct protolayer_iter_ctx *pop_query(void)
+{
+	for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+	if (queue_ix >= QUEUES_CNT) return NULL;
+
+	struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
+	queue_pop(queues[queue_ix]);
+	if (--waiting_requests <= 0) {
+		kr_assert(waiting_requests == 0);
+		uv_idle_stop(&idle_handle);
+		VERBOSE_LOG(" deactivating idle\n");
+	}
+	return ctx;
+}
+
+
+/// Process a single deferred query (or defer again) if there is any.
+/// Time accounting should have been just started, the stamp is used, accounted address is set.
+static inline void process_single_deferred(void) {
+	struct protolayer_iter_ctx *ctx = pop_query();
+	if (ctx == NULL) return;
+
+	defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+
+	struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
+	uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
+
+	VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
+			kr_straddr(ctx->comm->comm_addr),
+			queue_ix,
+			age_ns / 1000000.0);
+
+	if (age_ns >= REQ_TIMEOUT) {
+		VERBOSE_LOG(" BREAK\n");
+		protolayer_break(ctx, kr_error(ETIME));
+		return;
+	}
+
+	int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+	if (priority > queue_ix) { // priority dropped (got higher value)
+		VERBOSE_LOG(" PUSH to %d\n", priority);
+		push_query(ctx, priority);
+		return;
+	}
+
+	VERBOSE_LOG(" CONTINUE\n");
+	protolayer_continue(ctx);
+}
+
+/// Unwrap: defer or process the query synchronously.
+/// Time accounting should have been started, the stamp is used, accounted address is set.
+static enum protolayer_iter_cb_result pl_defer_unwrap(
+		void *sess_data, void *iter_data,
+		struct protolayer_iter_ctx *ctx)
+{
+	if (ctx->session->outgoing)
+		return protolayer_continue(ctx);
+
+	defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+	struct pl_defer_iter_data *data = iter_data;
+	data->req_stamp = defer_sample_state.stamp;
+
+	VERBOSE_LOG(" %s UNWRAP\n",
+			kr_straddr(ctx->comm->comm_addr));
+	int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
 
-	if (threshold_index == 0)
+	if (priority == -1) {
+		VERBOSE_LOG(" CONTINUE\n");
 		return protolayer_continue(ctx);
+	}
 
-	queue_push(queues[threshold_index - 1], ctx);
+	VERBOSE_LOG(" PUSH to %d\n", priority);
+	push_query(ctx, priority);
+	while (waiting_requests > MAX_WAITING_REQS) {
+		defer_sample_restart();
+		process_single_deferred(); // possibly defers again without decreasing waiting_requests
+			// defer_sample_stop should be called soon outside
+	}
 
 	return protolayer_async();
 }
 
-/// Continue processing deferred requests in libuv check phase.
-static void defer_queues_check(uv_check_t *handle) {
-	// TODO drop too old requests and/or break processing if it lasts too long (keeping some work to another check phase)
-	for (size_t i = 0; i < QUEUES_CNT; i++) {
-		while (queue_len(queues[i]) > 0) {
-			defer_sample_start();
-			struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
-			queue_pop(queues[i]);
-			defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
-			kr_log_notice(DEVEL, "DEFER continue: %s\n",
-					kr_straddr(ctx->comm->comm_addr));
-			protolayer_continue(ctx);
-			defer_sample_stop();
-		}
+/// Idle: continue processing deferred requests.
+static void defer_queues_idle(uv_idle_t *handle) {
+	kr_assert(waiting_requests > 0);
+	VERBOSE_LOG("IDLE\n");
+	VERBOSE_LOG(" %d waiting\n", waiting_requests);
+	defer_sample_start();
+	uint64_t idle_stamp = defer_sample_state.stamp;
+	while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
+		process_single_deferred();
+		defer_sample_restart();
+	}
+	defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
+	udp_queue_send_all();
+	if (waiting_requests > 0) {
+		VERBOSE_LOG(" %d waiting\n", waiting_requests);
 	}
+	VERBOSE_LOG("POLL\n");
 }
 
-/// Initialize defer, incl. shared memory with KRU.
-int defer_init(uv_loop_t *loop) {
-	struct defer header = { // TODO adjust hardcoded values
-		.capacity = 1 << 10,
+
+/// Initialize shared memory, queues, idle.
+int defer_init(uv_loop_t *loop)
+{
+	struct defer header = {
+		.capacity = KRU_CAPACITY,
 		.max_decay = MAX_DECAY,
 		.using_avx2 = using_avx2(),
 	};
@@ -175,9 +279,7 @@ int defer_init(uv_loop_t *loop) {
 	for (size_t i = 0; i < QUEUES_CNT; i++)
 		queue_init(queues[i]);
 
-	protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap;
-	uv_check_init(loop, &check_handle);
-	uv_check_start(&check_handle, defer_queues_check);
+	uv_idle_init(loop, &idle_handle);
 	return 0;
 
 fail:
@@ -192,3 +294,13 @@ void defer_deinit(void)
 	mmapped_deinit(&defer_mmapped);
 	defer = NULL;
 }
+
+/// Initialize protolayer.
+__attribute__((constructor))
+static void defer_protolayers_init(void)
+{
+	protolayer_globals[PROTOLAYER_TYPE_DEFER] = (struct protolayer_globals){
+		.iter_size = sizeof(struct pl_defer_iter_data),
+		.unwrap = pl_defer_unwrap,
+	};
+}
diff --git a/daemon/defer.h b/daemon/defer.h
index 7ded12880..1856972a8 100644
--- a/daemon/defer.h
+++ b/daemon/defer.h
@@ -10,7 +10,7 @@ int defer_init(uv_loop_t *loop);
 void defer_deinit(void);
 
 /// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr addr);
+void defer_account(uint64_t nsec, union kr_sockaddr *addr);
 
 typedef struct {
 	int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
@@ -74,6 +74,7 @@ static inline void defer_sample_stop(void)
 
 	if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
 	if (--defer_sample_state.is_accounting) return;
+	if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
 
 	const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
 
@@ -82,5 +83,22 @@ static inline void defer_sample_stop(void)
 
 	// TODO: some queries of internal origin have suspicioiusly high numbers.
 	// We won't be really accounting those, but it might suggest some other issue.
-	defer_account(elapsed, defer_sample_state.addr);
+	defer_account(elapsed, &defer_sample_state.addr);
+}
+
+/// Stop accounting if active, then start again. Uses just one stamp.
+static inline void defer_sample_restart(void)
+{
+	if (!defer) return;
+
+	uint64_t stamp = get_stamp();
+
+	if (defer_sample_state.is_accounting > 0) {
+		const uint64_t elapsed = stamp - defer_sample_state.stamp;
+		defer_account(elapsed, &defer_sample_state.addr);
+	}
+
+	defer_sample_state.stamp = stamp;
+	defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+	defer_sample_state.is_accounting = 1;
 }
diff --git a/daemon/mmapped.h b/daemon/mmapped.h
index dd55903a5..41414cba9 100644
--- a/daemon/mmapped.h
+++ b/daemon/mmapped.h
@@ -22,5 +22,5 @@ int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, vo
  * Returns zero on success and kr_error(errno) on system error. */
 int mmapped_init_continue(struct mmapped *mmapped);
 
-/* Free mmapped memory and truncate underlying file to zero size unless it is used by other processes. */
+/* Free mmapped memory and, unless the underlying file is used by other processes, truncate it to zero size. */
 void mmapped_deinit(struct mmapped *mmapped);
diff --git a/daemon/session2.c b/daemon/session2.c
index 39617b0ab..798d6f4d4 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -348,6 +348,11 @@ static inline struct protolayer_data *protolayer_iter_data_get(
 	return (struct protolayer_data *)(ctx->data + offset);
 }
 
+void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
+{
+	return protolayer_iter_data_get(ctx, ctx->layer_ix);
+}
+
 static inline ssize_t session2_get_protocol(
 		struct session2 *s, enum protolayer_type protocol)
 {
diff --git a/daemon/session2.h b/daemon/session2.h
index 37a9ec9c5..f10ed387b 100644
--- a/daemon/session2.h
+++ b/daemon/session2.h
@@ -525,6 +525,11 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue);
  * queue iterators, as it does not need to iterate through the whole queue. */
 bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);
 
+/** Gets layer-specific iteration data for the last processed layer.
+ * To be used after returning from its callback for async continuation but before calling protolayer_continue. */
+void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);
+
+
 /** Layer-specific data - the generic struct. To be added as the first member of
  * each specific struct. */
 struct protolayer_data {
diff --git a/daemon/udp_queue.c b/daemon/udp_queue.c
index 68d67ec60..0e829d2d6 100644
--- a/daemon/udp_queue.c
+++ b/daemon/udp_queue.c
@@ -86,15 +86,20 @@ static void udp_queue_send(int fd)
 	q->len = 0;
 }
 
-/** Periodical callback to send all queued packets. */
-static void udp_queue_check(uv_check_t *handle)
-{
+/** Send all queued packets. */
+void udp_queue_send_all(void) {
 	for (int i = 0; i < state.waiting_fds.len; ++i) {
 		udp_queue_send(state.waiting_fds.at[i]);
 	}
 	state.waiting_fds.len = 0;
 }
+
+/** Periodical callback to send all queued packets. */
+static void udp_queue_check(uv_check_t *handle)
+{
+	udp_queue_send_all();
+}
 
 int udp_queue_init_global(uv_loop_t *loop)
 {
 	int ret = uv_check_init(loop, &state.check_handle);
diff --git a/daemon/udp_queue.h b/daemon/udp_queue.h
index ed0a32699..cb01f90a2 100644
--- a/daemon/udp_queue.h
+++ b/daemon/udp_queue.h
@@ -17,3 +17,5 @@ int udp_queue_init_global(uv_loop_t *loop);
 void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
 		udp_queue_cb cb, void *baton);
 
+/** Send all queued packets immediately. */
+void udp_queue_send_all(void);
-- 
2.47.2
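
For illustration only, appended after the patch rather than being part of it: a minimal sketch of how the defer_sample_* helpers touched above are expected to wrap the handling of one incoming query, so that the consumed CPU time gets charged to the client address via defer_account(). The caller handle_one_query() and its argument are hypothetical; only defer_sample_start(), defer_sample_addr(), defer_sample_stop() and the pricing arithmetic come from this patch.

#include "daemon/defer.h"

/* Hypothetical call site; the real call sites live in the session/worker
 * code outside this patch. */
static void handle_one_query(const union kr_sockaddr *src)
{
	defer_sample_start();    /* take the initial THREAD_CPUTIME stamp */
	defer_sample_addr(src);  /* remember whom to charge for this work */

	/* ... parse and resolve the query here ... */

	/* defer_sample_stop() feeds the elapsed time to defer_account(), which
	 * derives per-prefix prices from
	 * base_price = MAX_DECAY * nsec * TIME_MULT / 1000000
	 * (so 1 ms of CPU time corresponds to a base price of MAX_DECAY)
	 * and loads them into the shared KRU. */
	defer_sample_stop();
}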