]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: defer or close whole streams at once docs-develop-rrl-8r8r8r/deployments/5316
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Thu, 10 Oct 2024 17:08:44 +0000 (19:08 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Thu, 10 Oct 2024 17:08:44 +0000 (19:08 +0200)
daemon/defer.c
daemon/session2.c
daemon/session2.h

index 38e97074e83b5e1864c147fc10b35383abc368d6..2d78786adc82f4d9a4fb1332aaf159f807a3a722 100644 (file)
@@ -80,6 +80,12 @@ static inline void phase_account(uint64_t nsec) {
        }
 }
 
+struct pl_defer_sess_data {
+       struct protolayer_data h;
+       protolayer_iter_ctx_queue_t queue;  // properly ordered sequence of deferred packets, for stream only
+               // the first ctx in the queue is also in a defer queue
+};
+
 struct pl_defer_iter_data {
        struct protolayer_data h;
        uint64_t req_stamp;   // time when request was received, uses get_stamp()
@@ -189,9 +195,13 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
 
 
 /// Push query to a queue according to its priority and activate idle.
-static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
+static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
 {
-       queue_push(queues[priority], ctx);
+       if (to_head_end) {
+               queue_push_head(queues[priority], ctx);
+       } else {
+               queue_push(queues[priority], ctx);
+       }
        queue_ix = MIN(queue_ix, priority);
        if (waiting_requests++ <= 0) {
                kr_assert(waiting_requests == 1);
@@ -248,8 +258,9 @@ static inline void process_single_deferred(void) {
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        phase_accounting = true;
 
-       struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
-       uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
+       struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+       struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+       uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
 
        VERBOSE_LOG("  %s POP from %d after %4.3f ms\n",
                        kr_straddr(ctx->comm->comm_addr),
@@ -258,22 +269,49 @@ static inline void process_single_deferred(void) {
 
        if (ctx->session->closing) {
                VERBOSE_LOG("    BREAK (session is closing)\n");
+               if (ctx->session->stream) {
+                       queue_pop(sdata->queue);
+                       while (queue_len(sdata->queue) > 0) {
+                               protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist
+                               ctx = queue_head(sdata->queue);
+                               queue_pop(sdata->queue);
+                       }
+               }
                protolayer_break(ctx, kr_error(ECANCELED));
                return;
        }
+
        if (age_ns >= REQ_TIMEOUT) {
                VERBOSE_LOG("    BREAK (timeout)\n");
-               protolayer_break(ctx, kr_error(ETIME));
+               if (ctx->session->stream) {
+                       while (queue_len(sdata->queue) > 0) {
+                               ctx = queue_head(sdata->queue);
+                               queue_pop(sdata->queue);
+                               protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet
+                       }
+                       session2_force_close(ctx->session);
+               } else {
+                       protolayer_break(ctx, kr_error(ETIME));
+               }
                return;
        }
 
        int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        if (priority > queue_ix) {  // priority dropped (got higher value)
                VERBOSE_LOG("    PUSH to %d\n", priority);
-               push_query(ctx, priority);
+               push_query(ctx, priority, false);
                return;
        }
 
+       if (ctx->session->stream) {
+               kr_assert(queue_head(sdata->queue) == ctx);
+               queue_pop(sdata->queue);
+               if (queue_len(sdata->queue) > 0) {
+                       VERBOSE_LOG("    PUSH follow-up to head of %d\n", priority);
+                       push_query(queue_head(sdata->queue), priority, true);
+               }
+       }
+
        VERBOSE_LOG("    CONTINUE\n");
        protolayer_continue(ctx);
 }
@@ -289,10 +327,18 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        struct pl_defer_iter_data *data = iter_data;
+       struct pl_defer_sess_data *sdata = sess_data;
        data->req_stamp = defer_sample_state.stamp;
 
        VERBOSE_LOG("  %s UNWRAP\n",
                        kr_straddr(ctx->comm->comm_addr));
+
+       if (queue_len(sdata->queue) > 0) {  // stream with preceding packet already deferred
+               queue_push(sdata->queue, ctx);
+               VERBOSE_LOG("    PUSH as follow-up\n");
+               return protolayer_async();
+       }
+
        int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
 
        if (priority == -1) {
@@ -302,8 +348,11 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        }
 
        VERBOSE_LOG("    PUSH to %d\n", priority);
-       push_query(ctx, priority);
-       while (waiting_requests > MAX_WAITING_REQS) {
+       if (ctx->session->stream) {
+               queue_push(sdata->queue, ctx);
+       }
+       push_query(ctx, priority, false);
+       while (waiting_requests > MAX_WAITING_REQS) {  // TODO follow-up stream packets are not counted here
                defer_sample_restart();
                process_single_deferred();  // possibly defers again without decreasing waiting_requests
                // defer_sample_stop should be called soon outside
@@ -389,6 +438,13 @@ fail:
        return ret;
 }
 
+/// Initialize session queue
+int pl_defer_sess_init(struct session2 *session, void *data, void *param) {
+       struct pl_defer_sess_data *sdata = data;
+       queue_init(sdata->queue);
+       return 0;
+}
+
 /// Deinitialize shared memory.
 void defer_deinit(void)
 {
@@ -402,6 +458,8 @@ static void defer_protolayers_init(void)
 {
        protolayer_globals[PROTOLAYER_TYPE_DEFER] = (struct protolayer_globals){
                .iter_size = sizeof(struct pl_defer_iter_data),
+               .sess_size = sizeof(struct pl_defer_sess_data),
+               .sess_init = pl_defer_sess_init,
                .unwrap = pl_defer_unwrap,
        };
 }
index 6761127a78309c61f76ce8bd56cbc6322cb33558..780a2fec0e6ad0400b49b98ee9dc40962c6126ab 100644 (file)
@@ -353,6 +353,11 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
        return protolayer_iter_data_get(ctx, ctx->layer_ix);
 }
 
+void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx)
+{
+       return protolayer_sess_data_get(ctx->session, ctx->layer_ix);
+}
+
 static inline ssize_t session2_get_protocol(
                struct session2 *s, enum protolayer_type protocol)
 {
index 4ea42c307f7a750a089a3e8f1ca9e70bd0f5d8f8..a95a54c1d0299ed21fb45036839071d4a57e7148 100644 (file)
@@ -539,6 +539,10 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);
  * To be used after returning from its callback for async continuation but before calling protolayer_continue. */
 void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);
 
+/** Gets layer-specific session data for the last processed layer.
+ * To be used after returning from its callback for async continuation but before calling protolayer_continue. */
+void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
+
 
 /** Layer-specific data - the generic struct. To be added as the first member of
  * each specific struct. */