]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/session2: make copies short-lived buffers when needed
authorOto Šťáva <oto.stava@nic.cz>
Fri, 23 Jun 2023 09:02:34 +0000 (11:02 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Fri, 23 Jun 2023 09:11:36 +0000 (11:11 +0200)
daemon/http.c
daemon/io.c
daemon/session2.c
daemon/session2.h
daemon/tls.c
daemon/worker.c

index 2d51b7dab09cfeeb06587967380c46507e3ec27d..7d1f0899c5857cfd38ac51423f113a1e6364283f 100644 (file)
@@ -390,7 +390,7 @@ static ssize_t send_callback(nghttp2_session *h2, const uint8_t *data, size_t le
 
        kr_log_debug(DOH, "[%p] send_callback: %p\n", (void *)h2, (void *)send_ctx->data);
        session2_wrap_after(http->h.session, PROTOLAYER_PROTOCOL_HTTP,
-                       protolayer_buffer(send_ctx->data, length), NULL,
+                       protolayer_buffer(send_ctx->data, length, false), NULL,
                        callback_finished_free_baton, send_ctx);
 
        return length;
@@ -506,7 +506,7 @@ static int send_data_callback(nghttp2_session *h2, nghttp2_frame *frame, const u
 
        kr_assert(cur == iovcnt);
        int ret = session2_wrap_after(http->h.session, PROTOLAYER_PROTOCOL_HTTP,
-                       protolayer_iovec(dest_iov, cur),
+                       protolayer_iovec(dest_iov, cur, false),
                        NULL, callback_finished_free_baton, sdctx);
 
        if (ret < 0)
@@ -733,7 +733,7 @@ static int submit_to_wirebuffer(struct pl_http_sess_data *ctx)
 
        ret = 0;
        session2_unwrap_after(ctx->h.session, PROTOLAYER_PROTOCOL_HTTP,
-                       protolayer_wire_buf(wb), NULL, NULL, NULL);
+                       protolayer_wire_buf(wb, false), NULL, NULL, NULL);
 cleanup:
        http_cleanup_stream(ctx);
        return ret;
index 4f61a0a9c86318984d2a8e683040fdec26d4b606..ea0330044d926617437bfd536408f228ac92c568 100644 (file)
@@ -90,8 +90,8 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
                .comm_addr = comm_addr,
                .src_addr = comm_addr
        };
-       session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), &in_comm,
-                       udp_on_unwrapped, NULL);
+       session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf, false),
+                       &in_comm, udp_on_unwrapped, NULL);
 }
 
 static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name)
@@ -189,7 +189,8 @@ static enum protolayer_iter_cb_result pl_udp_unwrap(
                        }
                }
 
-               ctx->payload = protolayer_buffer(data + trimmed, data_len - trimmed);
+               ctx->payload = protolayer_buffer(
+                               data + trimmed, data_len - trimmed, false);
        }
 
        return protolayer_continue(ctx);
@@ -271,7 +272,7 @@ static enum protolayer_iter_cb_result pl_tcp_unwrap(
 
                memcpy(wire_buf_free_space(&tcp->wire_buf), buf, len);
                wire_buf_consume(&tcp->wire_buf, ctx->payload.buffer.len);
-               ctx->payload = protolayer_wire_buf(&tcp->wire_buf);
+               ctx->payload = protolayer_wire_buf(&tcp->wire_buf, false);
        }
 
        if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) {
@@ -523,7 +524,8 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
                return;
        }
 
-       session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), NULL, NULL, NULL);
+       session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf, false),
+                       NULL, NULL, NULL);
 }
 
 static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp)
@@ -936,7 +938,9 @@ static void xdp_rx(uv_poll_t* handle, int status, int events)
                memcpy(comm.eth_from, msg->eth_from, sizeof(comm.eth_from));
                memcpy(comm.eth_to, msg->eth_to, sizeof(comm.eth_to));
                session2_unwrap(xhd->session,
-                               protolayer_buffer(msg->payload.iov_base, msg->payload.iov_len),
+                               protolayer_buffer(
+                                       msg->payload.iov_base,
+                                       msg->payload.iov_len, false),
                                &comm, NULL, NULL);
                if (ret)
                        kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
index 10534cba5df654b95575457b0c41d17751e6baca..491f0e6a02056f53260ce26c7c990437adf4d657 100644 (file)
@@ -135,32 +135,58 @@ const char *protolayer_payload_name(enum protolayer_payload_type p)
 /* Forward decls. */
 static int session2_transport_pushv(struct session2 *s,
                                     struct iovec *iov, int iovcnt,
+                                    bool iov_short_lived,
                                     const struct comm_info *comm,
                                     protolayer_finished_cb cb, void *baton);
 static inline int session2_transport_push(struct session2 *s,
                                           char *buf, size_t buf_len,
+                                          bool buf_short_lived,
                                           const struct comm_info *comm,
                                           protolayer_finished_cb cb, void *baton);
 static int session2_transport_event(struct session2 *s,
                                     enum protolayer_event_type event,
                                     void *baton);
 
+static size_t iovecs_size(const struct iovec *iov, int cnt)
+{
+       size_t sum = 0;
+       for (int i = 0; i < cnt; i++) {
+               sum += iov[i].iov_len;
+       }
+       return sum;
+}
+
+static size_t iovecs_copy(void *dest, const struct iovec *iov, int cnt,
+                          size_t max_len)
+{
+       const size_t pld_size = iovecs_size(iov, cnt);
+       const size_t copy_size = MIN(max_len, pld_size);
+       char *cur = dest;
+       size_t remaining = copy_size;
+       for (int i = 0; i < cnt && remaining; i++) {
+               size_t l = iov[i].iov_len;
+               size_t to_copy = MIN(l, remaining);
+               memcpy(cur, iov[i].iov_base, to_copy);
+               remaining -= l;
+               cur += l;
+       }
+
+       kr_assert(remaining == 0 && (cur - (char *)dest) == copy_size);
+       return copy_size;
+}
 
 size_t protolayer_payload_size(const struct protolayer_payload *payload)
 {
-       if (payload->type == PROTOLAYER_PAYLOAD_BUFFER) {
+       switch (payload->type) {
+       case PROTOLAYER_PAYLOAD_BUFFER:
                return payload->buffer.len;
-       } else if (payload->type == PROTOLAYER_PAYLOAD_IOVEC) {
-               size_t sum = 0;
-               for (int i = 0; i < payload->iovec.cnt; i++) {
-                       sum += payload->iovec.iov[i].iov_len;
-               }
-               return sum;
-       } else if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
+       case PROTOLAYER_PAYLOAD_IOVEC:
+               return iovecs_size(payload->iovec.iov, payload->iovec.cnt);
+       case PROTOLAYER_PAYLOAD_WIRE_BUF:
                return wire_buf_data_length(payload->wire_buf);
-       } else if(!payload->type) {
+       case PROTOLAYER_PAYLOAD_NULL:
                return 0;
-       } else {
+       default:
                kr_assert(false && "Invalid payload type");
                return 0;
        }
@@ -208,6 +234,7 @@ struct protolayer_payload protolayer_as_buffer(const struct protolayer_payload *
        if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
                struct protolayer_payload new_payload = {
                        .type = PROTOLAYER_PAYLOAD_BUFFER,
+                       .short_lived = payload->short_lived,
                        .ttl = payload->ttl,
                        .buffer = {
                                .buf = wire_buf_data(payload->wire_buf),
@@ -373,6 +400,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
                ctx->finished_cb(ret, session, &ctx->comm,
                                ctx->finished_cb_baton);
 
+       free(ctx->async_buffer);
        free(ctx);
 
        return ret;
@@ -402,10 +430,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
                session2_transport_push(session,
                                ctx->payload.buffer.buf, ctx->payload.buffer.len,
+                               ctx->payload.short_lived,
                                &ctx->comm, protolayer_push_finished, ctx);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
                session2_transport_pushv(session,
                                ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
+                               ctx->payload.short_lived,
                                &ctx->comm, protolayer_push_finished, ctx);
        } else {
                kr_assert(false && "Invalid payload type");
@@ -415,6 +445,23 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
        return PROTOLAYER_RET_ASYNC;
 }
 
+static void protolayer_ensure_long_lived(struct protolayer_iter_ctx *ctx)
+{
+       if (!ctx->payload.short_lived)
+               return;
+
+       size_t buf_len = protolayer_payload_size(&ctx->payload);
+       if (kr_fails_assert(buf_len))
+               return;
+
+       void *buf = malloc(buf_len);
+       kr_require(buf);
+       protolayer_payload_copy(buf, &ctx->payload, buf_len);
+
+       ctx->async_buffer = buf;
+       ctx->payload = protolayer_buffer(buf, buf_len, false);
+}
+
 /** Processes as many layers as possible synchronously, returning when either
  * a layer has gone asynchronous, or when the whole sequence has finished.
  *
@@ -459,6 +506,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
                if (!ctx->action) {
                        /* Next step is from a callback */
                        ctx->async_mode = true;
+                       protolayer_ensure_long_lived(ctx);
                        return PROTOLAYER_RET_ASYNC;
                }
 
@@ -1229,8 +1277,7 @@ struct session2_pushv_ctx {
        const struct comm_info *comm;
        void *baton;
 
-       char *buf;
-       size_t buf_len;
+       char *async_buf;
 };
 
 static void session2_transport_parent_pushv_finished(int status,
@@ -1241,36 +1288,32 @@ static void session2_transport_parent_pushv_finished(int status,
        struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
                ctx->cb(status, ctx->session, comm, ctx->baton);
-       free(ctx->buf);
+       free(ctx->async_buf);
        free(ctx);
 }
 
-static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
+static void session2_transport_pushv_finished(int status, struct session2_pushv_ctx *ctx)
 {
-       struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
                ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
-       free(ctx->buf);
+       free(ctx->async_buf);
        free(ctx);
 }
 
+static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
+{
+       session2_transport_pushv_finished(status, baton);
+}
+
 static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
 {
-       struct session2_pushv_ctx *ctx = req->data;
-       if (ctx->cb)
-               ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
-       free(ctx->buf);
-       free(ctx);
+       session2_transport_pushv_finished(status, req->data);
        free(req);
 }
 
 static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
 {
-       struct session2_pushv_ctx *ctx = req->data;
-       if (ctx->cb)
-               ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
-       free(ctx->buf);
-       free(ctx);
+       session2_transport_pushv_finished(status, req->data);
        free(req);
 }
 
@@ -1298,11 +1341,35 @@ static void xdp_tx_waker(uv_idle_t *handle)
 }
 #endif
 
+static void session2_transport_pushv_ensure_long_lived(
+               struct iovec **iov, int *iovcnt, bool iov_short_lived,
+               struct iovec *out_iovecmem, struct session2_pushv_ctx *ctx)
+{
+       if (!iov_short_lived)
+               return;
+
+       size_t iovsize = iovecs_size(*iov, *iovcnt);
+       if (kr_fails_assert(iovsize))
+               return;
+
+       void *buf = malloc(iovsize);
+       kr_require(buf);
+       iovecs_copy(buf, *iov, *iovcnt, iovsize);
+
+       ctx->async_buf = buf;
+       out_iovecmem->iov_base = buf;
+       out_iovecmem->iov_len = iovsize;
+       *iov = out_iovecmem;
+       *iovcnt = 1;
+}
+
 static int session2_transport_pushv(struct session2 *s,
                                     struct iovec *iov, int iovcnt,
+                                    bool iov_short_lived,
                                     const struct comm_info *comm,
                                     protolayer_finished_cb cb, void *baton)
 {
+       struct iovec iovecmem;
        if (kr_fails_assert(s))
                return kr_error(EINVAL);
 
@@ -1336,34 +1403,46 @@ static int session2_transport_pushv(struct session2 *s,
                                if (kr_fails_assert(iovcnt == 1))
                                        return kr_error(EINVAL);
 
+                               session2_transport_pushv_ensure_long_lived(
+                                               &iov, &iovcnt, iov_short_lived,
+                                               &iovecmem, ctx);
                                udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len,
                                                session2_transport_udp_queue_pushv_finished,
                                                ctx);
                                return kr_ok();
                        } else {
-                               uv_udp_send_t *req = malloc(sizeof(*req));
-                               req->data = ctx;
-                               int ret = uv_udp_send(req, (uv_udp_t *)handle,
-                                               (uv_buf_t *)iov, iovcnt, comm->comm_addr,
-                                               session2_transport_udp_pushv_finished);
-                               if (ret) {
-                                       if (cb)
-                                               cb(ret, s, comm, baton);
-                                       free(req);
-                                       free(ctx);
+                               int ret = uv_udp_try_send((uv_udp_t*)handle,
+                                               (uv_buf_t *)iov, iovcnt, comm->comm_addr);
+                               if (ret == UV_EAGAIN) {
+                                       uv_udp_send_t *req = malloc(sizeof(*req));
+                                       req->data = ctx;
+                                       session2_transport_pushv_ensure_long_lived(
+                                                       &iov, &iovcnt, iov_short_lived,
+                                                       &iovecmem, ctx);
+                                       ret = uv_udp_send(req, (uv_udp_t *)handle,
+                                                       (uv_buf_t *)iov, iovcnt, comm->comm_addr,
+                                                       session2_transport_udp_pushv_finished);
+                                       if (ret)
+                                               session2_transport_udp_pushv_finished(req, ret);
+                               } else {
+                                       session2_transport_pushv_finished(ret, ctx);
                                }
                                return ret;
                        }
                } else if (handle->type == UV_TCP) {
-                       uv_write_t *req = malloc(sizeof(*req));
-                       req->data = ctx;
-                       int ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt,
-                                       session2_transport_stream_pushv_finished);
-                       if (ret) {
-                               if (cb)
-                                       cb(ret, s, comm, baton);
-                               free(req);
-                               free(ctx);
+                       int ret = uv_try_write((uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt);
+                       if (ret == UV_EAGAIN) {
+                               uv_write_t *req = malloc(sizeof(*req));
+                               req->data = ctx;
+                               session2_transport_pushv_ensure_long_lived(
+                                               &iov, &iovcnt, iov_short_lived,
+                                               &iovecmem, ctx);
+                               ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt,
+                                               session2_transport_stream_pushv_finished);
+                               if (ret)
+                                       session2_transport_stream_pushv_finished(req, ret);
+                       } else {
+                               session2_transport_pushv_finished(ret, ctx);
                        }
                        return ret;
 #if ENABLE_XDP
@@ -1376,6 +1455,10 @@ static int session2_transport_pushv(struct session2 *s,
                        if (kr_fails_assert(iovcnt == 1))
                                return kr_error(EINVAL);
 
+                       session2_transport_pushv_ensure_long_lived(
+                                       &iov, &iovcnt, iov_short_lived,
+                                       &iovecmem, ctx);
+
                        knot_xdp_msg_t msg;
 #if KNOT_VERSION_HEX >= 0x030100
                        /* We don't have a nice way of preserving the _msg_t from frame allocation,
@@ -1415,7 +1498,8 @@ static int session2_transport_pushv(struct session2 *s,
                        free(ctx);
                        return kr_error(EINVAL);
                }
-               int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
+               int ret = session2_wrap(parent,
+                               protolayer_iovec(iov, iovcnt, iov_short_lived),
                                comm, session2_transport_parent_pushv_finished,
                                ctx);
                return (ret < 0) ? ret : kr_ok();
@@ -1446,6 +1530,7 @@ static void session2_transport_single_push_finished(int status,
 
 static inline int session2_transport_push(struct session2 *s,
                                           char *buf, size_t buf_len,
+                                          bool buf_short_lived,
                                           const struct comm_info *comm,
                                           protolayer_finished_cb cb, void *baton)
 {
@@ -1460,7 +1545,7 @@ static inline int session2_transport_push(struct session2 *s,
                .baton = baton
        };
 
-       return session2_transport_pushv(s, &ctx->iov, 1, comm,
+       return session2_transport_pushv(s, &ctx->iov, 1, buf_short_lived, comm,
                        session2_transport_single_push_finished, ctx);
 }
 
index 00fa388cb4d7178aaa0825d72cf7738064b4bb07..133bb10fffbbba3aaccb7d290741d8583044f25c 100644 (file)
@@ -385,7 +385,15 @@ const char *protolayer_payload_name(enum protolayer_payload_type p);
  * is ever (de-)allocated by the protolayer manager! */
 struct protolayer_payload {
        enum protolayer_payload_type type;
-       unsigned int ttl; /**< time-to-live hint (e.g. for HTTP Cache-Control) */
+
+       /** Time-to-live hint (e.g. for HTTP Cache-Control) */
+       unsigned int ttl;
+
+       /** If `true`, the payload's memory may be freed early as kresd does not
+        * completely control its lifetime. When going asynchronous, it needs to
+        * be copied. */
+       bool short_lived;
+
        union {
                /** Only valid if `type` is `_BUFFER`. */
                struct {
@@ -426,6 +434,7 @@ struct protolayer_iter_ctx {
        struct protolayer_manager *manager;
        int status;
        enum protolayer_iter_action action;
+       void *async_buffer;
 
        /** Contains a sequence of variably-sized CPU-aligned layer-specific
         * structs. See `struct protolayer_manager::data`. */
@@ -444,10 +453,12 @@ size_t protolayer_payload_copy(void *dest,
                                size_t max_len);
 
 /** Convenience function to get a buffer-type payload. */
-static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len)
+static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len,
+                                                          bool short_lived)
 {
        return (struct protolayer_payload){
                .type = PROTOLAYER_PAYLOAD_BUFFER,
+               .short_lived = short_lived,
                .buffer = {
                        .buf = buf,
                        .len = len
@@ -457,10 +468,11 @@ static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len)
 
 /** Convenience function to get an iovec-type payload. */
 static inline struct protolayer_payload protolayer_iovec(
-               struct iovec *iov, int iovcnt)
+               struct iovec *iov, int iovcnt, bool short_lived)
 {
        return (struct protolayer_payload){
                .type = PROTOLAYER_PAYLOAD_IOVEC,
+               .short_lived = short_lived,
                .iovec = {
                        .iov = iov,
                        .cnt = iovcnt
@@ -469,10 +481,12 @@ static inline struct protolayer_payload protolayer_iovec(
 }
 
 /** Convenience function to get a wire-buf-type payload. */
-static inline struct protolayer_payload protolayer_wire_buf(struct wire_buf *wire_buf)
+static inline struct protolayer_payload protolayer_wire_buf(
+               struct wire_buf *wire_buf, bool short_lived)
 {
        return (struct protolayer_payload){
                .type = PROTOLAYER_PAYLOAD_WIRE_BUF,
+               .short_lived = short_lived,
                .wire_buf = wire_buf
        };
 }
index 6cddfb132802cbc3c87070ef8059e659df1ee160..1512f04516cf8a9156fb75edf4a052452c043868 100644 (file)
@@ -227,7 +227,7 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i
        memcpy(push_ctx->iov, iov, sizeof(struct iovec[iovcnt]));
 
        session2_wrap_after(tls->h.session, PROTOLAYER_PROTOCOL_TLS,
-                       protolayer_iovec(push_ctx->iov, iovcnt), NULL,
+                       protolayer_iovec(push_ctx->iov, iovcnt, true), NULL,
                        kres_gnutls_push_finished, push_ctx);
 
        return total_len;
@@ -1158,7 +1158,7 @@ static enum protolayer_iter_cb_result pl_tls_unwrap(void *sess_data, void *iter_
        struct protolayer_iter_ctx *ctx_head = queue_head(tls->unwrap_queue);
        if (!kr_fails_assert(ctx == ctx_head))
                queue_pop(tls->unwrap_queue);
-       ctx->payload = protolayer_wire_buf(&tls->unwrap_buf);
+       ctx->payload = protolayer_wire_buf(&tls->unwrap_buf, false);
        return protolayer_continue(ctx);
 
 exit_break:
index 813df6c7598de66cfba0538a82f9e4b4fb08e5f6..8d0a6b28d320e5c7e64fc7799ab4e94035bd7899 100644 (file)
@@ -660,7 +660,8 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
 
        /* Pending '_finished' callback on current task */
        qr_task_ref(task);
-       struct protolayer_payload payload = protolayer_buffer((char *)pkt->wire, pkt->size);
+       struct protolayer_payload payload = protolayer_buffer(
+                       (char *)pkt->wire, pkt->size, false);
        payload.ttl = packet_ttl(pkt);
        ret = session2_wrap(session, payload, comm, qr_task_wrap_finished, task);
 
@@ -2241,7 +2242,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_wrap(
                stream->sent.mem = siov;
                stream->sent.pool = &s->pool;
 
-               ctx->payload = protolayer_iovec(siov->iovs, iovcnt);
+               ctx->payload = protolayer_iovec(siov->iovs, iovcnt, false);
                return protolayer_continue(ctx);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
                const int iovcnt = 1 + ctx->payload.iovec.cnt;
@@ -2267,7 +2268,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_wrap(
                stream->sent.mem = siov;
                stream->sent.pool = &s->pool;
 
-               ctx->payload = protolayer_iovec(siov->iovs, iovcnt);
+               ctx->payload = protolayer_iovec(siov->iovs, iovcnt, false);
                return protolayer_continue(ctx);
        } else {
                kr_assert(false && "Invalid payload");