From: Lukáš Ondráček Date: Thu, 10 Oct 2024 17:08:44 +0000 (+0200) Subject: daemon/defer: defer or close whole streams at once X-Git-Tag: v6.0.9~1^2~18 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=ad604a79a141f605d39251e609b4e11e17918673;p=thirdparty%2Fknot-resolver.git daemon/defer: defer or close whole streams at once --- diff --git a/daemon/defer.c b/daemon/defer.c index 38e97074e..2d78786ad 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -80,6 +80,12 @@ static inline void phase_account(uint64_t nsec) { } } +struct pl_defer_sess_data { + struct protolayer_data h; + protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only + // the first ctx in the queue is also in a defer queue +}; + struct pl_defer_iter_data { struct protolayer_data h; uint64_t req_stamp; // time when request was received, uses get_stamp() @@ -189,9 +195,13 @@ static inline int classify(const union kr_sockaddr *addr, bool stream) /// Push query to a queue according to its priority and activate idle. -static inline void push_query(struct protolayer_iter_ctx *ctx, int priority) +static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end) { - queue_push(queues[priority], ctx); + if (to_head_end) { + queue_push_head(queues[priority], ctx); + } else { + queue_push(queues[priority], ctx); + } queue_ix = MIN(queue_ix, priority); if (waiting_requests++ <= 0) { kr_assert(waiting_requests == 1); @@ -248,8 +258,9 @@ static inline void process_single_deferred(void) { defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); phase_accounting = true; - struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx); - uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp; + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp; VERBOSE_LOG(" %s POP from %d after %4.3f ms\n", kr_straddr(ctx->comm->comm_addr), @@ -258,22 +269,49 @@ static inline void process_single_deferred(void) { if (ctx->session->closing) { VERBOSE_LOG(" BREAK (session is closing)\n"); + if (ctx->session->stream) { + queue_pop(sdata->queue); + while (queue_len(sdata->queue) > 0) { + protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist + ctx = queue_head(sdata->queue); + queue_pop(sdata->queue); + } + } protolayer_break(ctx, kr_error(ECANCELED)); return; } + if (age_ns >= REQ_TIMEOUT) { VERBOSE_LOG(" BREAK (timeout)\n"); - protolayer_break(ctx, kr_error(ETIME)); + if (ctx->session->stream) { + while (queue_len(sdata->queue) > 0) { + ctx = queue_head(sdata->queue); + queue_pop(sdata->queue); + protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet + } + session2_force_close(ctx->session); + } else { + protolayer_break(ctx, kr_error(ETIME)); + } return; } int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); if (priority > queue_ix) { // priority dropped (got higher value) VERBOSE_LOG(" PUSH to %d\n", priority); - push_query(ctx, priority); + push_query(ctx, priority, false); return; } + if (ctx->session->stream) { + kr_assert(queue_head(sdata->queue) == ctx); + queue_pop(sdata->queue); + if (queue_len(sdata->queue) > 0) { + VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority); + push_query(queue_head(sdata->queue), priority, true); + } + } + VERBOSE_LOG(" CONTINUE\n"); protolayer_continue(ctx); } @@ -289,10 +327,18 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); struct pl_defer_iter_data *data = iter_data; + struct pl_defer_sess_data *sdata = sess_data; data->req_stamp = defer_sample_state.stamp; VERBOSE_LOG(" %s UNWRAP\n", kr_straddr(ctx->comm->comm_addr)); + + if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred + queue_push(sdata->queue, ctx); + VERBOSE_LOG(" PUSH as follow-up\n"); + return protolayer_async(); + } + int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); if (priority == -1) { @@ -302,8 +348,11 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( } VERBOSE_LOG(" PUSH to %d\n", priority); - push_query(ctx, priority); - while (waiting_requests > MAX_WAITING_REQS) { + if (ctx->session->stream) { + queue_push(sdata->queue, ctx); + } + push_query(ctx, priority, false); + while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here defer_sample_restart(); process_single_deferred(); // possibly defers again without decreasing waiting_requests // defer_sample_stop should be called soon outside @@ -389,6 +438,13 @@ fail: return ret; } +/// Initialize session queue +int pl_defer_sess_init(struct session2 *session, void *data, void *param) { + struct pl_defer_sess_data *sdata = data; + queue_init(sdata->queue); + return 0; +} + /// Deinitialize shared memory. void defer_deinit(void) { @@ -402,6 +458,8 @@ static void defer_protolayers_init(void) { protolayer_globals[PROTOLAYER_TYPE_DEFER] = (struct protolayer_globals){ .iter_size = sizeof(struct pl_defer_iter_data), + .sess_size = sizeof(struct pl_defer_sess_data), + .sess_init = pl_defer_sess_init, .unwrap = pl_defer_unwrap, }; } diff --git a/daemon/session2.c b/daemon/session2.c index 6761127a7..780a2fec0 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -353,6 +353,11 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_iter_data_get(ctx, ctx->layer_ix); } +void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx) +{ + return protolayer_sess_data_get(ctx->session, ctx->layer_ix); +} + static inline ssize_t session2_get_protocol( struct session2 *s, enum protolayer_type protocol) { diff --git a/daemon/session2.h b/daemon/session2.h index 4ea42c307..a95a54c1d 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -539,6 +539,10 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue); * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx); +/** Gets layer-specific session data for the last processed layer. + * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ +void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx); + /** Layer-specific data - the generic struct. To be added as the first member of * each specific struct. */