]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/session2: fix asynchronous layer iterations
authorOto Šťáva <oto.stava@nic.cz>
Fri, 31 May 2024 10:09:42 +0000 (12:09 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Tue, 4 Jun 2024 11:05:00 +0000 (13:05 +0200)
There were a few bugs in the protolayer system that prevented us from
pausing iteration and resuming it properly. This commit should hopefully
resolve them.

daemon/io.c
daemon/proxyv2.c
daemon/session2.c
daemon/session2.h
daemon/worker.c

index d219f592bbee51437e292e6b4a094136e2aca76c..b6b289aea8cc3f1c72f1fc70f90a5886e66b36a1 100644 (file)
@@ -151,6 +151,17 @@ static enum protolayer_event_cb_result pl_udp_event_wrap(
        return PROTOLAYER_EVENT_PROPAGATE;
 }
 
+static int pl_tcp_sess_init(struct session2 *session,
+                            void *data, void *param)
+{
+       struct sockaddr *peer = session2_get_peer(session);
+       session->comm_storage = (struct comm_info) {
+               .comm_addr = peer,
+               .src_addr = peer
+       };
+       return 0;
+}
+
 static enum protolayer_event_cb_result pl_tcp_event_wrap(
                enum protolayer_event_type event, void **baton,
                struct session2 *session, void *sess_data)
@@ -173,6 +184,7 @@ void io_protolayers_init(void)
        };
 
        protolayer_globals[PROTOLAYER_TYPE_TCP] = (struct protolayer_globals){
+               .sess_init = pl_tcp_sess_init,
                .event_wrap = pl_tcp_event_wrap,
        };
 }
index 5f2147ce8bfcc4bd611fc259db845218430acbd1..d080d0c6595c08a400524c369fa691af95b9b07d 100644 (file)
@@ -330,7 +330,7 @@ static enum protolayer_iter_cb_result pl_proxyv2_dgram_unwrap(
 
        char *data = ctx->payload.buffer.buf;
        ssize_t data_len = ctx->payload.buffer.len;
-       struct comm_info *comm = &ctx->comm;
+       struct comm_info *comm = ctx->comm;
        if (!s->outgoing && proxy_header_present(data, data_len)) {
                if (!proxy_allowed(comm->comm_addr)) {
                        kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
@@ -385,17 +385,6 @@ struct pl_proxyv2_stream_sess_data {
        bool has_proxy : 1;
 };
 
-static int pl_proxyv2_stream_sess_init(struct session2 *session,
-                                       void *data, void *param)
-{
-       struct sockaddr *peer = session2_get_peer(session);
-       session->comm = (struct comm_info) {
-               .comm_addr = peer,
-               .src_addr = peer
-       };
-       return 0;
-}
-
 static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap(
                void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
 {
@@ -410,7 +399,7 @@ static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap(
 
        char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */
        ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf);
-       struct comm_info *comm = &ctx->session->comm;
+       struct comm_info *comm = ctx->comm;
        if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) {
                if (!proxy_allowed(comm->src_addr)) {
                        if (kr_log_is_debug(IO, NULL)) {
@@ -458,7 +447,6 @@ static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap(
        }
 
        tcp->had_data = true;
-       ctx->comm = ctx->session->comm;
        return protolayer_continue(ctx);
 }
 
@@ -472,7 +460,6 @@ void proxy_protolayers_init(void)
 
        protolayer_globals[PROTOLAYER_TYPE_PROXYV2_STREAM] = (struct protolayer_globals){
                .sess_size = sizeof(struct pl_proxyv2_stream_sess_data),
-               .sess_init = pl_proxyv2_stream_sess_init,
                .unwrap = pl_proxyv2_stream_unwrap,
        };
 }
index 1a9068e4a4d7857c6ffefd9850a84d91c6187d6b..19ea42dc3de0632c24d65718516da0c717fea406 100644 (file)
@@ -406,7 +406,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
                                ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
 
        if (ctx->finished_cb)
-               ctx->finished_cb(ret, s, &ctx->comm,
+               ctx->finished_cb(ret, s, ctx->comm,
                                ctx->finished_cb_baton);
 
        mm_ctx_delete(&ctx->pool);
@@ -440,12 +440,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
                session2_transport_push(session,
                                ctx->payload.buffer.buf, ctx->payload.buffer.len,
                                ctx->payload.short_lived,
-                               &ctx->comm, protolayer_push_finished, ctx);
+                               ctx->comm, protolayer_push_finished, ctx);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
                session2_transport_pushv(session,
                                ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
                                ctx->payload.short_lived,
-                               &ctx->comm, protolayer_push_finished, ctx);
+                               ctx->comm, protolayer_push_finished, ctx);
        } else {
                kr_assert(false && "Invalid payload type");
                return kr_error(EINVAL);
@@ -473,8 +473,9 @@ static void protolayer_payload_ensure_long_lived(struct protolayer_iter_ctx *ctx
 /** Processes as many layers as possible synchronously, returning when either
  * a layer has gone asynchronous, or when the whole sequence has finished.
  *
- * May be called multiple times on the same `ctx` to continue processing
- * after an asynchronous operation. */
+ * May be called multiple times on the same `ctx` to continue processing after
+ * an asynchronous operation - user code will do this via *layer sequence return
+ * functions*. */
 static int protolayer_step(struct protolayer_iter_ctx *ctx)
 {
        while (true) {
@@ -484,38 +485,52 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
                enum protolayer_type protocol = protolayer_grps[ctx->session->proto].layers[ctx->layer_ix];
                struct protolayer_globals *globals = &protolayer_globals[protocol];
 
+               bool was_async = ctx->async_mode;
                ctx->async_mode = false;
-               ctx->status = 0;
-               ctx->action = PROTOLAYER_ITER_ACTION_NULL;
 
-               protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP)
-                       ? globals->unwrap : globals->wrap;
+               /* Basically if we went asynchronous, we want to "resume" from
+                * underneath this `if` block. */
+               if (!was_async) {
+                       ctx->status = 0;
+                       ctx->action = PROTOLAYER_ITER_ACTION_NULL;
 
-               if (ctx->session->closing) {
-                       return protolayer_iter_ctx_finish(
-                                       ctx, kr_error(ECANCELED));
-               }
+                       protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP)
+                               ? globals->unwrap : globals->wrap;
 
-               if (cb) {
-                       struct protolayer_data *sess_data = protolayer_sess_data_get(
-                                       ctx->session, ctx->layer_ix);
-                       struct protolayer_data *iter_data = protolayer_iter_data_get(
-                                       ctx, ctx->layer_ix);
-                       enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx);
-                       if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) {
-                               /* Callback did not use a continuation function to return. */
-                               return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
+                       if (ctx->session->closing) {
+                               return protolayer_iter_ctx_finish(
+                                               ctx, kr_error(ECANCELED));
                        }
-               } else {
-                       ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
-               }
 
+                       if (cb) {
+                               struct protolayer_data *sess_data = protolayer_sess_data_get(
+                                               ctx->session, ctx->layer_ix);
+                               struct protolayer_data *iter_data = protolayer_iter_data_get(
+                                               ctx, ctx->layer_ix);
+                               enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx);
+                               if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) {
+                                       /* Callback did not use a *layer
+                                        * sequence return function* (see
+                                        * glossary). */
+                                       return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
+                               }
+                       } else {
+                               ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
+                       }
+
+                       if (!ctx->action) {
+                               /* We're going asynchronous - the next step is
+                                * probably going to be from some sort of a
+                                * callback and we will "resume" from underneath
+                                * this `if` block. */
+                               ctx->async_mode = true;
+                               protolayer_payload_ensure_long_lived(ctx);
+                               return PROTOLAYER_RET_ASYNC;
+                       }
+               }
 
-               if (!ctx->action) {
-                       /* Next step is from a callback */
-                       ctx->async_mode = true;
-                       protolayer_payload_ensure_long_lived(ctx);
-                       return PROTOLAYER_RET_ASYNC;
+               if (kr_fails_assert(ctx->action)) {
+                       return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
                }
 
                if (ctx->action == PROTOLAYER_ITER_ACTION_BREAK) {
@@ -525,7 +540,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
 
                if (kr_fails_assert(ctx->status == 0)) {
                        /* Status should be zero without a BREAK. */
-                       return protolayer_iter_ctx_finish(ctx, kr_error(ECANCELED));
+                       return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
                }
 
                if (ctx->action == PROTOLAYER_ITER_ACTION_CONTINUE) {
@@ -566,6 +581,10 @@ static int session2_submit(
        if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
                return kr_error(EFAULT);
 
+       bool had_comm_param = (comm != NULL);
+       if (!had_comm_param)
+               comm = &session->comm_storage;
+
        struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
        kr_require(ctx);
 
@@ -578,13 +597,33 @@ static int session2_submit(
 
        *ctx = (struct protolayer_iter_ctx) {
                .payload = payload,
-               .comm = (comm) ? *comm : session->comm,
                .direction = direction,
                .layer_ix = layer_ix,
                .session = session,
                .finished_cb = cb,
                .finished_cb_baton = baton
        };
+       if (had_comm_param) {
+               struct comm_addr_storage *addrst = &ctx->comm_addr_storage;
+               if (comm->src_addr) {
+                       memcpy(&addrst->src_addr.ip, comm->src_addr,
+                               kr_sockaddr_len(comm->src_addr));
+                       ctx->comm_storage.src_addr = &addrst->src_addr.ip;
+               }
+               if (comm->comm_addr) {
+                       memcpy(&addrst->comm_addr.ip, comm->comm_addr,
+                               kr_sockaddr_len(comm->comm_addr));
+                       ctx->comm_storage.comm_addr = &addrst->comm_addr.ip;
+               }
+               if (comm->dst_addr) {
+                       memcpy(&addrst->dst_addr.ip, comm->dst_addr,
+                               kr_sockaddr_len(comm->dst_addr));
+                       ctx->comm_storage.dst_addr = &addrst->dst_addr.ip;
+               }
+               ctx->comm = &ctx->comm_storage;
+       } else {
+               ctx->comm = &session->comm_storage;
+       }
        mm_ctx_mempool(&ctx->pool, CPU_PAGE_SIZE);
 
        const struct protolayer_grp *grp = &protolayer_grps[session->proto];
@@ -616,25 +655,26 @@ static void *get_init_param(enum protolayer_type p,
        return NULL;
 }
 
-enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx)
+/** Called by *Layer sequence return functions* to proceed with protolayer
+ * processing. If the  */
+static inline void maybe_async_do_step(struct protolayer_iter_ctx *ctx)
 {
-       if (ctx->async_mode) {
-               protolayer_iter_ctx_next(ctx);
+       if (ctx->async_mode)
                protolayer_step(ctx);
-       } else {
-               ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
-       }
+}
+
+enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx)
+{
+       ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
+       maybe_async_do_step(ctx);
        return PROTOLAYER_ITER_CB_RESULT_MAGIC;
 }
 
 enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status)
 {
        ctx->status = status;
-       if (ctx->async_mode) {
-               protolayer_iter_ctx_finish(ctx, PROTOLAYER_RET_NORMAL);
-       } else {
-               ctx->action = PROTOLAYER_ITER_ACTION_BREAK;
-       }
+       ctx->action = PROTOLAYER_ITER_ACTION_BREAK;
+       maybe_async_do_step(ctx);
        return PROTOLAYER_ITER_CB_RESULT_MAGIC;
 }
 
index b43d8fc13a999782c09c268c326d4fb9bfbe7304..931dd6ee0b1ecd658d118e483e8b172256448fea 100644 (file)
  *   processing, it is also the lifetime of `struct protolayer_iter_ctx` and
  *   layer-specific data contained therein.
  *
+ * Layer sequence return function:
+ *   - One of `protolayer_break()`, `protolayer_continue()`, or
+ *   `protolayer_async()` - a function that a protolayer's `_wrap` or `_unwrap`
+ *   callback should call to get its return value. They may either be called
+ *   synchronously directly in the callback to end/pause the processing, or, if
+ *   the processing went asynchronous, called to resume the iteration of layers.
+ *
  * Payload:
  *   - Data processed by protocol layers in a particular sequence. In the wrap
  *   direction, this data generally starts as a DNS packet, which is then
@@ -105,6 +112,14 @@ struct comm_info {
        bool xdp:1;
 };
 
+/** Just a simple struct able to hold three IPv6 or IPv4 addresses, so that we
+ * can hold them somewhere. */
+struct comm_addr_storage {
+       union kr_sockaddr src_addr;
+       union kr_sockaddr comm_addr;
+       union kr_sockaddr dst_addr;
+};
+
 
 /** A buffer control struct, with indices marking a chunk containing received
  * but as of yet unprocessed data - the data in this chunk is called "valid
@@ -399,9 +414,14 @@ struct protolayer_iter_ctx {
 /* read-write for layers: */
        /** The payload */
        struct protolayer_payload payload;
-       /** Communication information. Typically written into by one of the
-        * first layers facilitating transport protocol processing. */
-       struct comm_info comm;
+       /** Pointer to communication information. For TCP, this will generally
+        * point to the storage in the session. For UDP, this will generally
+        * point to the storage in this context. */
+       struct comm_info *comm;
+       /** Communication information storage. This will generally be set by one
+        * of the first layers in the sequence, if used, e.g. UDP PROXYv2. */
+       struct comm_info comm_storage;
+       struct comm_addr_storage comm_addr_storage;
        /** Per-iter memory pool. Has no `free` procedure, gets freed as a whole
         * when the context is being destroyed. Initialized and destroyed
         * automatically - layers may use it to allocate memory. */
@@ -414,8 +434,9 @@ struct protolayer_iter_ctx {
 /* internal information for the manager - should only be used by the protolayer
  * system, never by layers: */
        enum protolayer_direction direction;
-       /** If `true`, the processing of layers has been paused and is waiting
-        * to be resumed or canceled. */
+       /** If `true`, the processing of the layer sequence has been paused and
+        * is waiting to be resumed (`protolayer_continue()`) or cancelled
+        * (`protolayer_break()`). */
        bool async_mode;
        /** The index of the layer that is currently being (or has just been)
         * processed. */
@@ -509,8 +530,8 @@ struct protolayer_data {
 };
 
 /** Return value of `protolayer_iter_cb` callbacks. To be returned by *layer
- * sequence return functions* as a sanity check. Not to be used directly by
- * user code. */
+ * sequence return functions* (see glossary) as a sanity check. Not to be used
+ * directly by user code. */
 enum protolayer_iter_cb_result {
        PROTOLAYER_ITER_CB_RESULT_MAGIC = 0x364F392E,
 };
@@ -522,9 +543,9 @@ enum protolayer_iter_cb_result {
  *
  * The function (or another function, that the pointed-to function causes to be
  * called, directly or through an asynchronous operation), must call one of the
- * *layer sequence return functions* (e.g. `protolayer_continue()`,
- * `protolayer_async()`, ...) to advance (or end) the layer sequence. The
- * function must return the result of such a return function. */
+ * *layer sequence return functions* (see glossary) to advance (or end) the
+ * layer sequence. The function must return the result of such a return
+ * function. */
 typedef enum protolayer_iter_cb_result (*protolayer_iter_cb)(
                void *sess_data,
                void *iter_data,
@@ -693,25 +714,25 @@ struct protolayer_globals {
 extern struct protolayer_globals protolayer_globals[PROTOLAYER_TYPE_COUNT];
 
 
-/** *Layer sequence return function* - signalizes the protolayer manager to
- * continue processing the next layer. */
+/** *Layer sequence return function* (see glossary) - signalizes the protolayer
+ * manager to continue processing the next layer. */
 enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx);
 
-/** *Layer sequence return function* - signalizes that the layer wants to stop
- * processing of the buffer and clean up, possibly due to an error (indicated
- * by a non-zero `status`). */
+/** *Layer sequence return function* (see glossary) - signalizes that the layer
+ * wants to stop processing of the buffer and clean up, possibly due to an error
+ * (indicated by a non-zero `status`). */
 enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status);
 
-/** *Layer sequence return function* - signalizes that the current sequence
- * will continue in an asynchronous manner. The layer should store the context
- * and call another sequence return function at another point. This may be used
- * in layers that work through libraries whose operation is asynchronous, like
- * GnuTLS.
+/** *Layer sequence return function* (see glossary) - signalizes that the
+ * current sequence will continue in an asynchronous manner. The layer should
+ * store the context and call another sequence return function at another point.
+ * This may be used in layers that work through libraries whose operation is
+ * asynchronous, like GnuTLS.
  *
- * Note that this return function is just a readability hint - another return
- * function may be called in another stack frame before it (generally during a
- * call to an external library function, e.g. GnuTLS or nghttp2) and the
- * sequence will continue correctly. */
+ * Note that this one is basically just a readability hint - another return
+ * function may be actually called before it (generally during a call to an
+ * external library function, e.g. GnuTLS or nghttp2). This is completely legal
+ * and the sequence will continue correctly. */
 static inline enum protolayer_iter_cb_result protolayer_async(void)
 {
        return PROTOLAYER_ITER_CB_RESULT_MAGIC;
@@ -776,7 +797,7 @@ struct session2 {
        /** Communication information. Typically written into by one of the
         * first layers facilitating transport protocol processing.
         * Zero-initialized by default. */
-       struct comm_info comm;
+       struct comm_info comm_storage;
 
        /** Time of last IO activity (if any occurs). Otherwise session
         * creation time. */
@@ -996,7 +1017,7 @@ void session2_penalize(struct session2 *session);
  * indicating an error. */
 int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
                     const struct comm_info *comm, protolayer_finished_cb cb,
-                   void *baton);
+                    void *baton);
 
 /** Same as `session2_unwrap`, but looks up the specified `protocol` in the
  * session's assigned protocol group and sends the `payload` to the layer that
index 5473c8b89dec05a9c3f3522b63f693c3b96466f6..562ebf07912a30fd9db7b5224a88a675af1fbf36 100644 (file)
@@ -633,9 +633,6 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
 
        int ret = 0;
 
-       if (comm == NULL)
-               comm = &session->comm;
-
        if (pkt == NULL)
                pkt = worker_task_get_pktbuf(task);
 
@@ -1771,7 +1768,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                                break;
                        }
 
-                       ret = worker_submit(session, &ctx->comm, pkt);
+                       ret = worker_submit(session, ctx->comm, pkt);
                        if (ret)
                                break;
                }
@@ -1789,7 +1786,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                if (!pkt)
                        return protolayer_break(ctx, KNOT_EMALF);
 
-               int ret = worker_submit(session, &ctx->comm, pkt);
+               int ret = worker_submit(session, ctx->comm, pkt);
                mp_flush(the_worker->pkt_pool.ctx);
                return protolayer_break(ctx, ret);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
@@ -1805,7 +1802,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                if (!pkt)
                        return protolayer_break(ctx, KNOT_EMALF);
 
-               int ret = worker_submit(session, &ctx->comm, pkt);
+               int ret = worker_submit(session, ctx->comm, pkt);
                wire_buf_reset(ctx->payload.wire_buf);
                mp_flush(the_worker->pkt_pool.ctx);
                return protolayer_break(ctx, ret);
@@ -2173,14 +2170,14 @@ static enum protolayer_iter_cb_result pl_dns_stream_unwrap(
                if (stream_sess->single && stream_sess->produced) {
                        if (kr_log_is_debug(WORKER, NULL)) {
                                kr_log_debug(WORKER, "Unexpected extra data from %s\n",
-                                               kr_straddr(ctx->comm.src_addr));
+                                               kr_straddr(ctx->comm->src_addr));
                        }
                        status = KNOT_EMALF;
                        goto exit;
                }
 
                stream_sess->produced = true;
-               int ret = worker_submit(session, &ctx->comm, pkt);
+               int ret = worker_submit(session, ctx->comm, pkt);
 
                /* Errors from worker_submit() are intentionally *not* handled
                 * in order to ensure the entire wire buffer is processed. */