From: Oto Šťáva Date: Fri, 31 May 2024 10:09:42 +0000 (+0200) Subject: daemon/session2: fix asynchronous layer iterations X-Git-Tag: v6.0.8~14^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0a1f0024e0cdc3d18ac508b7ce759316338252bb;p=thirdparty%2Fknot-resolver.git daemon/session2: fix asynchronous layer iterations There were a few bugs in the protolayer system that prevented us from pausing iteration and resuming it properly. This commit should hopefully resolve them. --- diff --git a/daemon/io.c b/daemon/io.c index d219f592b..b6b289aea 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -151,6 +151,17 @@ static enum protolayer_event_cb_result pl_udp_event_wrap( return PROTOLAYER_EVENT_PROPAGATE; } +static int pl_tcp_sess_init(struct session2 *session, + void *data, void *param) +{ + struct sockaddr *peer = session2_get_peer(session); + session->comm_storage = (struct comm_info) { + .comm_addr = peer, + .src_addr = peer + }; + return 0; +} + static enum protolayer_event_cb_result pl_tcp_event_wrap( enum protolayer_event_type event, void **baton, struct session2 *session, void *sess_data) @@ -173,6 +184,7 @@ void io_protolayers_init(void) }; protolayer_globals[PROTOLAYER_TYPE_TCP] = (struct protolayer_globals){ + .sess_init = pl_tcp_sess_init, .event_wrap = pl_tcp_event_wrap, }; } diff --git a/daemon/proxyv2.c b/daemon/proxyv2.c index 5f2147ce8..d080d0c65 100644 --- a/daemon/proxyv2.c +++ b/daemon/proxyv2.c @@ -330,7 +330,7 @@ static enum protolayer_iter_cb_result pl_proxyv2_dgram_unwrap( char *data = ctx->payload.buffer.buf; ssize_t data_len = ctx->payload.buffer.len; - struct comm_info *comm = &ctx->comm; + struct comm_info *comm = ctx->comm; if (!s->outgoing && proxy_header_present(data, data_len)) { if (!proxy_allowed(comm->comm_addr)) { kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n", @@ -385,17 +385,6 @@ struct pl_proxyv2_stream_sess_data { bool has_proxy : 1; }; -static int pl_proxyv2_stream_sess_init(struct session2 *session, - void *data, void *param) -{ - struct sockaddr *peer = session2_get_peer(session); - session->comm = (struct comm_info) { - .comm_addr = peer, - .src_addr = peer - }; - return 0; -} - static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { @@ -410,7 +399,7 @@ static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap( char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */ ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf); - struct comm_info *comm = &ctx->session->comm; + struct comm_info *comm = ctx->comm; if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) { if (!proxy_allowed(comm->src_addr)) { if (kr_log_is_debug(IO, NULL)) { @@ -458,7 +447,6 @@ static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap( } tcp->had_data = true; - ctx->comm = ctx->session->comm; return protolayer_continue(ctx); } @@ -472,7 +460,6 @@ void proxy_protolayers_init(void) protolayer_globals[PROTOLAYER_TYPE_PROXYV2_STREAM] = (struct protolayer_globals){ .sess_size = sizeof(struct pl_proxyv2_stream_sess_data), - .sess_init = pl_proxyv2_stream_sess_init, .unwrap = pl_proxyv2_stream_unwrap, }; } diff --git a/daemon/session2.c b/daemon/session2.c index 1a9068e4a..19ea42dc3 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -406,7 +406,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret) ctx->layer_ix, layer_name_ctx(ctx), ctx->status); if (ctx->finished_cb) - ctx->finished_cb(ret, s, &ctx->comm, + ctx->finished_cb(ret, s, ctx->comm, ctx->finished_cb_baton); mm_ctx_delete(&ctx->pool); @@ -440,12 +440,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) session2_transport_push(session, ctx->payload.buffer.buf, ctx->payload.buffer.len, ctx->payload.short_lived, - &ctx->comm, protolayer_push_finished, ctx); + ctx->comm, protolayer_push_finished, ctx); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { session2_transport_pushv(session, ctx->payload.iovec.iov, ctx->payload.iovec.cnt, ctx->payload.short_lived, - &ctx->comm, protolayer_push_finished, ctx); + ctx->comm, protolayer_push_finished, ctx); } else { kr_assert(false && "Invalid payload type"); return kr_error(EINVAL); @@ -473,8 +473,9 @@ static void protolayer_payload_ensure_long_lived(struct protolayer_iter_ctx *ctx /** Processes as many layers as possible synchronously, returning when either * a layer has gone asynchronous, or when the whole sequence has finished. * - * May be called multiple times on the same `ctx` to continue processing - * after an asynchronous operation. */ + * May be called multiple times on the same `ctx` to continue processing after + * an asynchronous operation - user code will do this via *layer sequence return + * functions*. */ static int protolayer_step(struct protolayer_iter_ctx *ctx) { while (true) { @@ -484,38 +485,52 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) enum protolayer_type protocol = protolayer_grps[ctx->session->proto].layers[ctx->layer_ix]; struct protolayer_globals *globals = &protolayer_globals[protocol]; + bool was_async = ctx->async_mode; ctx->async_mode = false; - ctx->status = 0; - ctx->action = PROTOLAYER_ITER_ACTION_NULL; - protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP) - ? globals->unwrap : globals->wrap; + /* Basically if we went asynchronous, we want to "resume" from + * underneath this `if` block. */ + if (!was_async) { + ctx->status = 0; + ctx->action = PROTOLAYER_ITER_ACTION_NULL; - if (ctx->session->closing) { - return protolayer_iter_ctx_finish( - ctx, kr_error(ECANCELED)); - } + protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP) + ? globals->unwrap : globals->wrap; - if (cb) { - struct protolayer_data *sess_data = protolayer_sess_data_get( - ctx->session, ctx->layer_ix); - struct protolayer_data *iter_data = protolayer_iter_data_get( - ctx, ctx->layer_ix); - enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx); - if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) { - /* Callback did not use a continuation function to return. */ - return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL)); + if (ctx->session->closing) { + return protolayer_iter_ctx_finish( + ctx, kr_error(ECANCELED)); } - } else { - ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE; - } + if (cb) { + struct protolayer_data *sess_data = protolayer_sess_data_get( + ctx->session, ctx->layer_ix); + struct protolayer_data *iter_data = protolayer_iter_data_get( + ctx, ctx->layer_ix); + enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx); + if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) { + /* Callback did not use a *layer + * sequence return function* (see + * glossary). */ + return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL)); + } + } else { + ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE; + } + + if (!ctx->action) { + /* We're going asynchronous - the next step is + * probably going to be from some sort of a + * callback and we will "resume" from underneath + * this `if` block. */ + ctx->async_mode = true; + protolayer_payload_ensure_long_lived(ctx); + return PROTOLAYER_RET_ASYNC; + } + } - if (!ctx->action) { - /* Next step is from a callback */ - ctx->async_mode = true; - protolayer_payload_ensure_long_lived(ctx); - return PROTOLAYER_RET_ASYNC; + if (kr_fails_assert(ctx->action)) { + return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL)); } if (ctx->action == PROTOLAYER_ITER_ACTION_BREAK) { @@ -525,7 +540,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) if (kr_fails_assert(ctx->status == 0)) { /* Status should be zero without a BREAK. */ - return protolayer_iter_ctx_finish(ctx, kr_error(ECANCELED)); + return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL)); } if (ctx->action == PROTOLAYER_ITER_ACTION_CONTINUE) { @@ -566,6 +581,10 @@ static int session2_submit( if (kr_fails_assert(session->proto < KR_PROTO_COUNT)) return kr_error(EFAULT); + bool had_comm_param = (comm != NULL); + if (!had_comm_param) + comm = &session->comm_storage; + struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size); kr_require(ctx); @@ -578,13 +597,33 @@ static int session2_submit( *ctx = (struct protolayer_iter_ctx) { .payload = payload, - .comm = (comm) ? *comm : session->comm, .direction = direction, .layer_ix = layer_ix, .session = session, .finished_cb = cb, .finished_cb_baton = baton }; + if (had_comm_param) { + struct comm_addr_storage *addrst = &ctx->comm_addr_storage; + if (comm->src_addr) { + memcpy(&addrst->src_addr.ip, comm->src_addr, + kr_sockaddr_len(comm->src_addr)); + ctx->comm_storage.src_addr = &addrst->src_addr.ip; + } + if (comm->comm_addr) { + memcpy(&addrst->comm_addr.ip, comm->comm_addr, + kr_sockaddr_len(comm->comm_addr)); + ctx->comm_storage.comm_addr = &addrst->comm_addr.ip; + } + if (comm->dst_addr) { + memcpy(&addrst->dst_addr.ip, comm->dst_addr, + kr_sockaddr_len(comm->dst_addr)); + ctx->comm_storage.dst_addr = &addrst->dst_addr.ip; + } + ctx->comm = &ctx->comm_storage; + } else { + ctx->comm = &session->comm_storage; + } mm_ctx_mempool(&ctx->pool, CPU_PAGE_SIZE); const struct protolayer_grp *grp = &protolayer_grps[session->proto]; @@ -616,25 +655,26 @@ static void *get_init_param(enum protolayer_type p, return NULL; } -enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx) +/** Called by *Layer sequence return functions* to proceed with protolayer + * processing. If the */ +static inline void maybe_async_do_step(struct protolayer_iter_ctx *ctx) { - if (ctx->async_mode) { - protolayer_iter_ctx_next(ctx); + if (ctx->async_mode) protolayer_step(ctx); - } else { - ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE; - } +} + +enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx) +{ + ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE; + maybe_async_do_step(ctx); return PROTOLAYER_ITER_CB_RESULT_MAGIC; } enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status) { ctx->status = status; - if (ctx->async_mode) { - protolayer_iter_ctx_finish(ctx, PROTOLAYER_RET_NORMAL); - } else { - ctx->action = PROTOLAYER_ITER_ACTION_BREAK; - } + ctx->action = PROTOLAYER_ITER_ACTION_BREAK; + maybe_async_do_step(ctx); return PROTOLAYER_ITER_CB_RESULT_MAGIC; } diff --git a/daemon/session2.h b/daemon/session2.h index b43d8fc13..931dd6ee0 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -22,6 +22,13 @@ * processing, it is also the lifetime of `struct protolayer_iter_ctx` and * layer-specific data contained therein. * + * Layer sequence return function: + * - One of `protolayer_break()`, `protolayer_continue()`, or + * `protolayer_async()` - a function that a protolayer's `_wrap` or `_unwrap` + * callback should call to get its return value. They may either be called + * synchronously directly in the callback to end/pause the processing, or, if + * the processing went asynchronous, called to resume the iteration of layers. + * * Payload: * - Data processed by protocol layers in a particular sequence. In the wrap * direction, this data generally starts as a DNS packet, which is then @@ -105,6 +112,14 @@ struct comm_info { bool xdp:1; }; +/** Just a simple struct able to hold three IPv6 or IPv4 addresses, so that we + * can hold them somewhere. */ +struct comm_addr_storage { + union kr_sockaddr src_addr; + union kr_sockaddr comm_addr; + union kr_sockaddr dst_addr; +}; + /** A buffer control struct, with indices marking a chunk containing received * but as of yet unprocessed data - the data in this chunk is called "valid @@ -399,9 +414,14 @@ struct protolayer_iter_ctx { /* read-write for layers: */ /** The payload */ struct protolayer_payload payload; - /** Communication information. Typically written into by one of the - * first layers facilitating transport protocol processing. */ - struct comm_info comm; + /** Pointer to communication information. For TCP, this will generally + * point to the storage in the session. For UDP, this will generally + * point to the storage in this context. */ + struct comm_info *comm; + /** Communication information storage. This will generally be set by one + * of the first layers in the sequence, if used, e.g. UDP PROXYv2. */ + struct comm_info comm_storage; + struct comm_addr_storage comm_addr_storage; /** Per-iter memory pool. Has no `free` procedure, gets freed as a whole * when the context is being destroyed. Initialized and destroyed * automatically - layers may use it to allocate memory. */ @@ -414,8 +434,9 @@ struct protolayer_iter_ctx { /* internal information for the manager - should only be used by the protolayer * system, never by layers: */ enum protolayer_direction direction; - /** If `true`, the processing of layers has been paused and is waiting - * to be resumed or canceled. */ + /** If `true`, the processing of the layer sequence has been paused and + * is waiting to be resumed (`protolayer_continue()`) or cancelled + * (`protolayer_break()`). */ bool async_mode; /** The index of the layer that is currently being (or has just been) * processed. */ @@ -509,8 +530,8 @@ struct protolayer_data { }; /** Return value of `protolayer_iter_cb` callbacks. To be returned by *layer - * sequence return functions* as a sanity check. Not to be used directly by - * user code. */ + * sequence return functions* (see glossary) as a sanity check. Not to be used + * directly by user code. */ enum protolayer_iter_cb_result { PROTOLAYER_ITER_CB_RESULT_MAGIC = 0x364F392E, }; @@ -522,9 +543,9 @@ enum protolayer_iter_cb_result { * * The function (or another function, that the pointed-to function causes to be * called, directly or through an asynchronous operation), must call one of the - * *layer sequence return functions* (e.g. `protolayer_continue()`, - * `protolayer_async()`, ...) to advance (or end) the layer sequence. The - * function must return the result of such a return function. */ + * *layer sequence return functions* (see glossary) to advance (or end) the + * layer sequence. The function must return the result of such a return + * function. */ typedef enum protolayer_iter_cb_result (*protolayer_iter_cb)( void *sess_data, void *iter_data, @@ -693,25 +714,25 @@ struct protolayer_globals { extern struct protolayer_globals protolayer_globals[PROTOLAYER_TYPE_COUNT]; -/** *Layer sequence return function* - signalizes the protolayer manager to - * continue processing the next layer. */ +/** *Layer sequence return function* (see glossary) - signalizes the protolayer + * manager to continue processing the next layer. */ enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx); -/** *Layer sequence return function* - signalizes that the layer wants to stop - * processing of the buffer and clean up, possibly due to an error (indicated - * by a non-zero `status`). */ +/** *Layer sequence return function* (see glossary) - signalizes that the layer + * wants to stop processing of the buffer and clean up, possibly due to an error + * (indicated by a non-zero `status`). */ enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status); -/** *Layer sequence return function* - signalizes that the current sequence - * will continue in an asynchronous manner. The layer should store the context - * and call another sequence return function at another point. This may be used - * in layers that work through libraries whose operation is asynchronous, like - * GnuTLS. +/** *Layer sequence return function* (see glossary) - signalizes that the + * current sequence will continue in an asynchronous manner. The layer should + * store the context and call another sequence return function at another point. + * This may be used in layers that work through libraries whose operation is + * asynchronous, like GnuTLS. * - * Note that this return function is just a readability hint - another return - * function may be called in another stack frame before it (generally during a - * call to an external library function, e.g. GnuTLS or nghttp2) and the - * sequence will continue correctly. */ + * Note that this one is basically just a readability hint - another return + * function may be actually called before it (generally during a call to an + * external library function, e.g. GnuTLS or nghttp2). This is completely legal + * and the sequence will continue correctly. */ static inline enum protolayer_iter_cb_result protolayer_async(void) { return PROTOLAYER_ITER_CB_RESULT_MAGIC; @@ -776,7 +797,7 @@ struct session2 { /** Communication information. Typically written into by one of the * first layers facilitating transport protocol processing. * Zero-initialized by default. */ - struct comm_info comm; + struct comm_info comm_storage; /** Time of last IO activity (if any occurs). Otherwise session * creation time. */ @@ -996,7 +1017,7 @@ void session2_penalize(struct session2 *session); * indicating an error. */ int session2_unwrap(struct session2 *s, struct protolayer_payload payload, const struct comm_info *comm, protolayer_finished_cb cb, - void *baton); + void *baton); /** Same as `session2_unwrap`, but looks up the specified `protocol` in the * session's assigned protocol group and sends the `payload` to the layer that diff --git a/daemon/worker.c b/daemon/worker.c index 5473c8b89..562ebf079 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -633,9 +633,6 @@ static int qr_task_send(struct qr_task *task, struct session2 *session, int ret = 0; - if (comm == NULL) - comm = &session->comm; - if (pkt == NULL) pkt = worker_task_get_pktbuf(task); @@ -1771,7 +1768,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap( break; } - ret = worker_submit(session, &ctx->comm, pkt); + ret = worker_submit(session, ctx->comm, pkt); if (ret) break; } @@ -1789,7 +1786,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap( if (!pkt) return protolayer_break(ctx, KNOT_EMALF); - int ret = worker_submit(session, &ctx->comm, pkt); + int ret = worker_submit(session, ctx->comm, pkt); mp_flush(the_worker->pkt_pool.ctx); return protolayer_break(ctx, ret); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) { @@ -1805,7 +1802,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap( if (!pkt) return protolayer_break(ctx, KNOT_EMALF); - int ret = worker_submit(session, &ctx->comm, pkt); + int ret = worker_submit(session, ctx->comm, pkt); wire_buf_reset(ctx->payload.wire_buf); mp_flush(the_worker->pkt_pool.ctx); return protolayer_break(ctx, ret); @@ -2173,14 +2170,14 @@ static enum protolayer_iter_cb_result pl_dns_stream_unwrap( if (stream_sess->single && stream_sess->produced) { if (kr_log_is_debug(WORKER, NULL)) { kr_log_debug(WORKER, "Unexpected extra data from %s\n", - kr_straddr(ctx->comm.src_addr)); + kr_straddr(ctx->comm->src_addr)); } status = KNOT_EMALF; goto exit; } stream_sess->produced = true; - int ret = worker_submit(session, &ctx->comm, pkt); + int ret = worker_submit(session, ctx->comm, pkt); /* Errors from worker_submit() are intentionally *not* handled * in order to ensure the entire wire buffer is processed. */