]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: defer stream EOF if data are deferred
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 11 Nov 2024 18:13:27 +0000 (19:13 +0100)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 20 Nov 2024 22:54:03 +0000 (23:54 +0100)
daemon/defer.c
daemon/session2.c

index 20f8fb557b2e2207cd97c36a0b29e36c8628950c..c561bc52303f9f7d238077465285ec71d7a8c133 100644 (file)
@@ -168,7 +168,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
        }
 
        VERBOSE_LOG("  %s ADD %4.3f ms -> load: %d on /%d\n",
-                       kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+                       kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
 }
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
@@ -299,13 +299,14 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
 static inline void process_single_deferred(void)
 {
        struct protolayer_iter_ctx *ctx = pop_query();
-       if (ctx == NULL) return;
+       if (kr_fails_assert(ctx)) return;
 
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
        phase_accounting = true;
 
        struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
        struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+       struct session2 *session = ctx->session;
        uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
 
        VERBOSE_LOG("  %s POP from %d after %4.3f ms\n",
@@ -332,17 +333,34 @@ static inline void process_single_deferred(void)
                return;
        }
 
+       bool eof = false;
        if (ctx->session->stream) {
                kr_assert(queue_head(sdata->queue) == ctx);
                queue_pop(sdata->queue);
+               while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event
+                       eof = true;
+                       queue_pop(sdata->queue);
+               }
                if (queue_len(sdata->queue) > 0) {
                        VERBOSE_LOG("    PUSH follow-up to head of %d\n", priority);
                        push_query(queue_head(sdata->queue), priority, true);
                }
        }
 
+       if (eof) {
+               // Keep session alive even if it is somehow force-closed during continuation.
+               // TODO Is it needed?
+               session->ref_count++;
+       }
+
        VERBOSE_LOG("    CONTINUE\n");
        protolayer_continue(ctx);
+
+       if (eof) {
+               VERBOSE_LOG("    CONTINUE EOF event\n");
+               session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL);
+               session2_unhandle(session); // decrease ref_count
+       }
 }
 
 /// Break expired requests at the beginning of queues, uses current stamp.
@@ -371,10 +389,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
                void *sess_data, void *iter_data,
                struct protolayer_iter_ctx *ctx)
 {
-       if (!defer)
-               return protolayer_continue(ctx);
-
-       if (ctx->session->outgoing)
+       if (!defer || ctx->session->outgoing)
                return protolayer_continue(ctx);
 
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
@@ -413,6 +428,32 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        return protolayer_async();
 }
 
+/// Unwrap event: EOF event may be deferred here, other events pass synchronously.
+static enum protolayer_event_cb_result pl_defer_event_unwrap(
+               enum protolayer_event_type event, void **baton,
+               struct session2 *session, void *sess_data)
+{
+       if (!defer || session->outgoing)
+               return PROTOLAYER_EVENT_PROPAGATE;
+
+       struct pl_defer_sess_data *sdata = sess_data;
+       if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) {
+               // defer EOF event if unprocessed data remain, baton is dropped if any
+               queue_push(sdata->queue, NULL);
+               VERBOSE_LOG("  %s event %s deferred\n",
+                               session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+                               protolayer_event_name(event));
+               return PROTOLAYER_EVENT_CONSUME;
+       }
+
+       VERBOSE_LOG("  %s event %s passes through synchronously%s%s\n",
+                       session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+                       protolayer_event_name(event),
+                       queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "",
+                       *baton ? " (with baton)" : "");
+       return PROTOLAYER_EVENT_PROPAGATE;
+}
+
 /// Idle: continue processing deferred requests.
 static void defer_queues_idle(uv_idle_t *handle)
 {
@@ -535,5 +576,6 @@ static void defer_protolayers_init(void)
                .sess_size = sizeof(struct pl_defer_sess_data),
                .sess_init = pl_defer_sess_init,
                .unwrap = pl_defer_unwrap,
+               .event_unwrap = pl_defer_event_unwrap,
        };
 }
index c372bfb53f309145af316b44b2593302ea5ec489..6eb91bf457a8d88a92a97910dbffbbcbbd1cff07 100644 (file)
@@ -603,7 +603,7 @@ static int session2_submit(
 {
        if (session->closing)
                return kr_error(ECANCELED);
-       if (session->ref_count >= INT_MAX)
+       if (session->ref_count >= INT_MAX - 1)
                return kr_error(ETOOMANYREFS);
        if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
                return kr_error(EFAULT);