]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: limit deferred queries by memory usage docs-develop-defe-x6j6qe/deployments/5712
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 20 Nov 2024 23:15:29 +0000 (00:15 +0100)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 20 Nov 2024 23:25:34 +0000 (00:25 +0100)
daemon/defer.c
daemon/session2.c
daemon/session2.h

index c561bc52303f9f7d238077465285ec71d7a8c133..3835bceb6ef67680d46072dfbea700cfaeb5dcc7 100644 (file)
 #define IDLE_TIMEOUT          1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
 #define PHASE_UDP_TIMEOUT      400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
 #define PHASE_NON_UDP_TIMEOUT  400000 // ns (THREAD_CPUTIME);    after timeout or emptying queue
-#define MAX_WAITING_REQS        10000 // if exceeded, process single deferred request immediatelly in poll phase
-       // TODO measure memory usage instead
+#define MAX_WAITING_REQS_SIZE (64 * 1024 * 1024)  // bytes; if exceeded, some deferred requests are processed in poll phase
+       // single TCP allocates more than 64KiB wire buffer
+       // TODO check whether all important allocations are counted;
+       //   different things are not counted: tasks and subsessions (not deferred after created), uv handles, queues overhead, ...;
+       //   payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams)
 
 #define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
 
@@ -81,6 +84,7 @@ static void defer_queues_idle(uv_idle_t *handle);
 
 protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
 int waiting_requests = 0;
+ptrdiff_t waiting_requests_size = 0;  // signed for non-negativeness asserts
 int queue_ix = QUEUES_CNT;  // MIN( last popped queue, first non-empty queue )
 
 enum phase {
@@ -113,11 +117,13 @@ struct pl_defer_sess_data {
        struct protolayer_data h;
        protolayer_iter_ctx_queue_t queue;  // properly ordered sequence of deferred packets, for stream only
                // the first ctx in the queue is also in a defer queue
+       size_t size;
 };
 
 struct pl_defer_iter_data {
        struct protolayer_data h;
        uint64_t req_stamp;   // time when request was received, uses get_stamp()
+       size_t size;
 };
 
 /// Return whether we're using optimized variant right now.
@@ -281,17 +287,23 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
 {
        if (ctx->session->stream) {
                struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+               waiting_requests_size -= sdata->size;
                if (!ctx->session->closing) {
                        session2_force_close(ctx->session); // session is not freed here as iter contexts exist
                }
                queue_pop(sdata->queue);
                while (queue_len(sdata->queue) > 0) {
+                       struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+                       waiting_requests_size -= idata->size;
                        protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
                        ctx = queue_head(sdata->queue);
                        queue_pop(sdata->queue);
                }
        }
+       struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+       waiting_requests_size -= idata->size;
        protolayer_break(ctx, kr_error(err));
+       kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
 }
 
 /// Process a single deferred query (or defer again) if there is any.
@@ -344,9 +356,14 @@ static inline void process_single_deferred(void)
                if (queue_len(sdata->queue) > 0) {
                        VERBOSE_LOG("    PUSH follow-up to head of %d\n", priority);
                        push_query(queue_head(sdata->queue), priority, true);
+               } else {
+                       waiting_requests_size -= sdata->size;
                }
        }
 
+       waiting_requests_size -= idata->size;
+       kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
+
        if (eof) {
                // Keep session alive even if it is somehow force-closed during continuation.
                // TODO Is it needed?
@@ -402,6 +419,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
        if (queue_len(sdata->queue) > 0) {  // stream with preceding packet already deferred
                queue_push(sdata->queue, ctx);
+               waiting_requests_size += data->size = protolayer_iter_size_est(ctx, false);
+                       // payload counted in session wire buffer
                VERBOSE_LOG("    PUSH as follow-up\n");
                return protolayer_async();
        }
@@ -417,11 +436,14 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        VERBOSE_LOG("    PUSH to %d\n", priority);
        if (ctx->session->stream) {
                queue_push(sdata->queue, ctx);
+               waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session);
        }
        push_query(ctx, priority, false);
-       while (waiting_requests > MAX_WAITING_REQS) {  // TODO follow-up stream packets are not counted here
+       waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
+               // for stream, payload is counted in session wire buffer
+       while (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
                defer_sample_restart();
-               process_single_deferred();  // possibly defers again without decreasing waiting_requests
+               process_single_deferred();  // possibly defers again without decreasing waiting_requests_size
                // defer_sample_stop should be called soon outside
        }
 
index 6eb91bf457a8d88a92a97910dbffbbcbbd1cff07..eb3bedb3c98911987a645b8a4a5f342bc9d878a8 100644 (file)
@@ -378,6 +378,19 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
        return protolayer_iter_data_get(ctx, ctx->layer_ix);
 }
 
+size_t protolayer_sess_size_est(struct session2 *s)
+{
+       return s->session_size + s->wire_buf.size;
+}
+
+size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload)
+{
+       size_t size = ctx->session->iter_ctx_size;
+       if (incl_payload)
+               size += protolayer_payload_size(&ctx->payload);
+       return size;
+}
+
 static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
 {
        unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP)
@@ -852,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type,
 
                .proto = proto,
                .iter_ctx_size = iter_ctx_size,
+               .session_size = session_size,
        };
 
        memcpy(&s->layer_data, offsets, sizeof(offsets));
index 73b88d32e4f09c4823503c26cc95b4ee91927f2c..5228ad864dfdb6362b41f26c3c81fa76f290e6ca 100644 (file)
@@ -549,6 +549,13 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
  * To be used after returning from its callback for async continuation but before calling protolayer_continue. */
 void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);
 
+/** Gets rough memory footprint estimate of session/iteration for use in defer.
+ * Different, hopefully minor, allocations are not counted here;
+ * tasks and subsessions are also not counted;
+ * read the code before using elsewhere. */
+size_t protolayer_sess_size_est(struct session2 *s);
+size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload);
+
 /** Layer-specific data - the generic struct. To be added as the first member of
  * each specific struct. */
 struct protolayer_data {
@@ -874,6 +881,9 @@ struct session2 {
         * (`struct protolayer_iter_ctx`), including layer-specific data. */
        size_t iter_ctx_size;
 
+       /** The size of this session struct. */
+       size_t session_size;
+
        /** The following flexible array has basically this structure:
         *
         * struct {