From: Oto Šťáva Date: Fri, 23 Jun 2023 09:02:34 +0000 (+0200) Subject: daemon/session2: make copies short-lived buffers when needed X-Git-Tag: v6.0.2~42^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bba209bf92fb71b18482e7248a2a40a71237e6e9;p=thirdparty%2Fknot-resolver.git daemon/session2: make copies short-lived buffers when needed --- diff --git a/daemon/http.c b/daemon/http.c index 2d51b7dab..7d1f0899c 100644 --- a/daemon/http.c +++ b/daemon/http.c @@ -390,7 +390,7 @@ static ssize_t send_callback(nghttp2_session *h2, const uint8_t *data, size_t le kr_log_debug(DOH, "[%p] send_callback: %p\n", (void *)h2, (void *)send_ctx->data); session2_wrap_after(http->h.session, PROTOLAYER_PROTOCOL_HTTP, - protolayer_buffer(send_ctx->data, length), NULL, + protolayer_buffer(send_ctx->data, length, false), NULL, callback_finished_free_baton, send_ctx); return length; @@ -506,7 +506,7 @@ static int send_data_callback(nghttp2_session *h2, nghttp2_frame *frame, const u kr_assert(cur == iovcnt); int ret = session2_wrap_after(http->h.session, PROTOLAYER_PROTOCOL_HTTP, - protolayer_iovec(dest_iov, cur), + protolayer_iovec(dest_iov, cur, false), NULL, callback_finished_free_baton, sdctx); if (ret < 0) @@ -733,7 +733,7 @@ static int submit_to_wirebuffer(struct pl_http_sess_data *ctx) ret = 0; session2_unwrap_after(ctx->h.session, PROTOLAYER_PROTOCOL_HTTP, - protolayer_wire_buf(wb), NULL, NULL, NULL); + protolayer_wire_buf(wb, false), NULL, NULL, NULL); cleanup: http_cleanup_stream(ctx); return ret; diff --git a/daemon/io.c b/daemon/io.c index 4f61a0a9c..ea0330044 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -90,8 +90,8 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, .comm_addr = comm_addr, .src_addr = comm_addr }; - session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), &in_comm, - udp_on_unwrapped, NULL); + session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf, false), + &in_comm, udp_on_unwrapped, NULL); } static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name) @@ -189,7 +189,8 @@ static enum protolayer_iter_cb_result pl_udp_unwrap( } } - ctx->payload = protolayer_buffer(data + trimmed, data_len - trimmed); + ctx->payload = protolayer_buffer( + data + trimmed, data_len - trimmed, false); } return protolayer_continue(ctx); @@ -271,7 +272,7 @@ static enum protolayer_iter_cb_result pl_tcp_unwrap( memcpy(wire_buf_free_space(&tcp->wire_buf), buf, len); wire_buf_consume(&tcp->wire_buf, ctx->payload.buffer.len); - ctx->payload = protolayer_wire_buf(&tcp->wire_buf); + ctx->payload = protolayer_wire_buf(&tcp->wire_buf, false); } if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) { @@ -523,7 +524,8 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), NULL, NULL, NULL); + session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf, false), + NULL, NULL, NULL); } static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp) @@ -936,7 +938,9 @@ static void xdp_rx(uv_poll_t* handle, int status, int events) memcpy(comm.eth_from, msg->eth_from, sizeof(comm.eth_from)); memcpy(comm.eth_to, msg->eth_to, sizeof(comm.eth_to)); session2_unwrap(xhd->session, - protolayer_buffer(msg->payload.iov_base, msg->payload.iov_len), + protolayer_buffer( + msg->payload.iov_base, + msg->payload.iov_len, false), &comm, NULL, NULL); if (ret) kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret)); diff --git a/daemon/session2.c b/daemon/session2.c index 10534cba5..491f0e6a0 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -135,32 +135,58 @@ const char *protolayer_payload_name(enum protolayer_payload_type p) /* Forward decls. */ static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, + bool iov_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, + bool buf_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static int session2_transport_event(struct session2 *s, enum protolayer_event_type event, void *baton); +static size_t iovecs_size(const struct iovec *iov, int cnt) +{ + size_t sum = 0; + for (int i = 0; i < cnt; i++) { + sum += iov[i].iov_len; + } + return sum; +} + +static size_t iovecs_copy(void *dest, const struct iovec *iov, int cnt, + size_t max_len) +{ + const size_t pld_size = iovecs_size(iov, cnt); + const size_t copy_size = MIN(max_len, pld_size); + char *cur = dest; + size_t remaining = copy_size; + for (int i = 0; i < cnt && remaining; i++) { + size_t l = iov[i].iov_len; + size_t to_copy = MIN(l, remaining); + memcpy(cur, iov[i].iov_base, to_copy); + remaining -= l; + cur += l; + } + + kr_assert(remaining == 0 && (cur - (char *)dest) == copy_size); + return copy_size; +} size_t protolayer_payload_size(const struct protolayer_payload *payload) { - if (payload->type == PROTOLAYER_PAYLOAD_BUFFER) { + switch (payload->type) { + case PROTOLAYER_PAYLOAD_BUFFER: return payload->buffer.len; - } else if (payload->type == PROTOLAYER_PAYLOAD_IOVEC) { - size_t sum = 0; - for (int i = 0; i < payload->iovec.cnt; i++) { - sum += payload->iovec.iov[i].iov_len; - } - return sum; - } else if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) { + case PROTOLAYER_PAYLOAD_IOVEC: + return iovecs_size(payload->iovec.iov, payload->iovec.cnt); + case PROTOLAYER_PAYLOAD_WIRE_BUF: return wire_buf_data_length(payload->wire_buf); - } else if(!payload->type) { + case PROTOLAYER_PAYLOAD_NULL: return 0; - } else { + default: kr_assert(false && "Invalid payload type"); return 0; } @@ -208,6 +234,7 @@ struct protolayer_payload protolayer_as_buffer(const struct protolayer_payload * if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) { struct protolayer_payload new_payload = { .type = PROTOLAYER_PAYLOAD_BUFFER, + .short_lived = payload->short_lived, .ttl = payload->ttl, .buffer = { .buf = wire_buf_data(payload->wire_buf), @@ -373,6 +400,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret) ctx->finished_cb(ret, session, &ctx->comm, ctx->finished_cb_baton); + free(ctx->async_buffer); free(ctx); return ret; @@ -402,10 +430,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) { session2_transport_push(session, ctx->payload.buffer.buf, ctx->payload.buffer.len, + ctx->payload.short_lived, &ctx->comm, protolayer_push_finished, ctx); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { session2_transport_pushv(session, ctx->payload.iovec.iov, ctx->payload.iovec.cnt, + ctx->payload.short_lived, &ctx->comm, protolayer_push_finished, ctx); } else { kr_assert(false && "Invalid payload type"); @@ -415,6 +445,23 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) return PROTOLAYER_RET_ASYNC; } +static void protolayer_ensure_long_lived(struct protolayer_iter_ctx *ctx) +{ + if (!ctx->payload.short_lived) + return; + + size_t buf_len = protolayer_payload_size(&ctx->payload); + if (kr_fails_assert(buf_len)) + return; + + void *buf = malloc(buf_len); + kr_require(buf); + protolayer_payload_copy(buf, &ctx->payload, buf_len); + + ctx->async_buffer = buf; + ctx->payload = protolayer_buffer(buf, buf_len, false); +} + /** Processes as many layers as possible synchronously, returning when either * a layer has gone asynchronous, or when the whole sequence has finished. * @@ -459,6 +506,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) if (!ctx->action) { /* Next step is from a callback */ ctx->async_mode = true; + protolayer_ensure_long_lived(ctx); return PROTOLAYER_RET_ASYNC; } @@ -1229,8 +1277,7 @@ struct session2_pushv_ctx { const struct comm_info *comm; void *baton; - char *buf; - size_t buf_len; + char *async_buf; }; static void session2_transport_parent_pushv_finished(int status, @@ -1241,36 +1288,32 @@ static void session2_transport_parent_pushv_finished(int status, struct session2_pushv_ctx *ctx = baton; if (ctx->cb) ctx->cb(status, ctx->session, comm, ctx->baton); - free(ctx->buf); + free(ctx->async_buf); free(ctx); } -static void session2_transport_udp_queue_pushv_finished(int status, void *baton) +static void session2_transport_pushv_finished(int status, struct session2_pushv_ctx *ctx) { - struct session2_pushv_ctx *ctx = baton; if (ctx->cb) ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); + free(ctx->async_buf); free(ctx); } +static void session2_transport_udp_queue_pushv_finished(int status, void *baton) +{ + session2_transport_pushv_finished(status, baton); +} + static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status) { - struct session2_pushv_ctx *ctx = req->data; - if (ctx->cb) - ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); - free(ctx); + session2_transport_pushv_finished(status, req->data); free(req); } static void session2_transport_stream_pushv_finished(uv_write_t *req, int status) { - struct session2_pushv_ctx *ctx = req->data; - if (ctx->cb) - ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); - free(ctx); + session2_transport_pushv_finished(status, req->data); free(req); } @@ -1298,11 +1341,35 @@ static void xdp_tx_waker(uv_idle_t *handle) } #endif +static void session2_transport_pushv_ensure_long_lived( + struct iovec **iov, int *iovcnt, bool iov_short_lived, + struct iovec *out_iovecmem, struct session2_pushv_ctx *ctx) +{ + if (!iov_short_lived) + return; + + size_t iovsize = iovecs_size(*iov, *iovcnt); + if (kr_fails_assert(iovsize)) + return; + + void *buf = malloc(iovsize); + kr_require(buf); + iovecs_copy(buf, *iov, *iovcnt, iovsize); + + ctx->async_buf = buf; + out_iovecmem->iov_base = buf; + out_iovecmem->iov_len = iovsize; + *iov = out_iovecmem; + *iovcnt = 1; +} + static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, + bool iov_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { + struct iovec iovecmem; if (kr_fails_assert(s)) return kr_error(EINVAL); @@ -1336,34 +1403,46 @@ static int session2_transport_pushv(struct session2 *s, if (kr_fails_assert(iovcnt == 1)) return kr_error(EINVAL); + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len, session2_transport_udp_queue_pushv_finished, ctx); return kr_ok(); } else { - uv_udp_send_t *req = malloc(sizeof(*req)); - req->data = ctx; - int ret = uv_udp_send(req, (uv_udp_t *)handle, - (uv_buf_t *)iov, iovcnt, comm->comm_addr, - session2_transport_udp_pushv_finished); - if (ret) { - if (cb) - cb(ret, s, comm, baton); - free(req); - free(ctx); + int ret = uv_udp_try_send((uv_udp_t*)handle, + (uv_buf_t *)iov, iovcnt, comm->comm_addr); + if (ret == UV_EAGAIN) { + uv_udp_send_t *req = malloc(sizeof(*req)); + req->data = ctx; + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + ret = uv_udp_send(req, (uv_udp_t *)handle, + (uv_buf_t *)iov, iovcnt, comm->comm_addr, + session2_transport_udp_pushv_finished); + if (ret) + session2_transport_udp_pushv_finished(req, ret); + } else { + session2_transport_pushv_finished(ret, ctx); } return ret; } } else if (handle->type == UV_TCP) { - uv_write_t *req = malloc(sizeof(*req)); - req->data = ctx; - int ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt, - session2_transport_stream_pushv_finished); - if (ret) { - if (cb) - cb(ret, s, comm, baton); - free(req); - free(ctx); + int ret = uv_try_write((uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt); + if (ret == UV_EAGAIN) { + uv_write_t *req = malloc(sizeof(*req)); + req->data = ctx; + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt, + session2_transport_stream_pushv_finished); + if (ret) + session2_transport_stream_pushv_finished(req, ret); + } else { + session2_transport_pushv_finished(ret, ctx); } return ret; #if ENABLE_XDP @@ -1376,6 +1455,10 @@ static int session2_transport_pushv(struct session2 *s, if (kr_fails_assert(iovcnt == 1)) return kr_error(EINVAL); + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + knot_xdp_msg_t msg; #if KNOT_VERSION_HEX >= 0x030100 /* We don't have a nice way of preserving the _msg_t from frame allocation, @@ -1415,7 +1498,8 @@ static int session2_transport_pushv(struct session2 *s, free(ctx); return kr_error(EINVAL); } - int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt), + int ret = session2_wrap(parent, + protolayer_iovec(iov, iovcnt, iov_short_lived), comm, session2_transport_parent_pushv_finished, ctx); return (ret < 0) ? ret : kr_ok(); @@ -1446,6 +1530,7 @@ static void session2_transport_single_push_finished(int status, static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, + bool buf_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { @@ -1460,7 +1545,7 @@ static inline int session2_transport_push(struct session2 *s, .baton = baton }; - return session2_transport_pushv(s, &ctx->iov, 1, comm, + return session2_transport_pushv(s, &ctx->iov, 1, buf_short_lived, comm, session2_transport_single_push_finished, ctx); } diff --git a/daemon/session2.h b/daemon/session2.h index 00fa388cb..133bb10ff 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -385,7 +385,15 @@ const char *protolayer_payload_name(enum protolayer_payload_type p); * is ever (de-)allocated by the protolayer manager! */ struct protolayer_payload { enum protolayer_payload_type type; - unsigned int ttl; /**< time-to-live hint (e.g. for HTTP Cache-Control) */ + + /** Time-to-live hint (e.g. for HTTP Cache-Control) */ + unsigned int ttl; + + /** If `true`, the payload's memory may be freed early as kresd does not + * completely control its lifetime. When going asynchronous, it needs to + * be copied. */ + bool short_lived; + union { /** Only valid if `type` is `_BUFFER`. */ struct { @@ -426,6 +434,7 @@ struct protolayer_iter_ctx { struct protolayer_manager *manager; int status; enum protolayer_iter_action action; + void *async_buffer; /** Contains a sequence of variably-sized CPU-aligned layer-specific * structs. See `struct protolayer_manager::data`. */ @@ -444,10 +453,12 @@ size_t protolayer_payload_copy(void *dest, size_t max_len); /** Convenience function to get a buffer-type payload. */ -static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len) +static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len, + bool short_lived) { return (struct protolayer_payload){ .type = PROTOLAYER_PAYLOAD_BUFFER, + .short_lived = short_lived, .buffer = { .buf = buf, .len = len @@ -457,10 +468,11 @@ static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len) /** Convenience function to get an iovec-type payload. */ static inline struct protolayer_payload protolayer_iovec( - struct iovec *iov, int iovcnt) + struct iovec *iov, int iovcnt, bool short_lived) { return (struct protolayer_payload){ .type = PROTOLAYER_PAYLOAD_IOVEC, + .short_lived = short_lived, .iovec = { .iov = iov, .cnt = iovcnt @@ -469,10 +481,12 @@ static inline struct protolayer_payload protolayer_iovec( } /** Convenience function to get a wire-buf-type payload. */ -static inline struct protolayer_payload protolayer_wire_buf(struct wire_buf *wire_buf) +static inline struct protolayer_payload protolayer_wire_buf( + struct wire_buf *wire_buf, bool short_lived) { return (struct protolayer_payload){ .type = PROTOLAYER_PAYLOAD_WIRE_BUF, + .short_lived = short_lived, .wire_buf = wire_buf }; } diff --git a/daemon/tls.c b/daemon/tls.c index 6cddfb132..1512f0451 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -227,7 +227,7 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i memcpy(push_ctx->iov, iov, sizeof(struct iovec[iovcnt])); session2_wrap_after(tls->h.session, PROTOLAYER_PROTOCOL_TLS, - protolayer_iovec(push_ctx->iov, iovcnt), NULL, + protolayer_iovec(push_ctx->iov, iovcnt, true), NULL, kres_gnutls_push_finished, push_ctx); return total_len; @@ -1158,7 +1158,7 @@ static enum protolayer_iter_cb_result pl_tls_unwrap(void *sess_data, void *iter_ struct protolayer_iter_ctx *ctx_head = queue_head(tls->unwrap_queue); if (!kr_fails_assert(ctx == ctx_head)) queue_pop(tls->unwrap_queue); - ctx->payload = protolayer_wire_buf(&tls->unwrap_buf); + ctx->payload = protolayer_wire_buf(&tls->unwrap_buf, false); return protolayer_continue(ctx); exit_break: diff --git a/daemon/worker.c b/daemon/worker.c index 813df6c75..8d0a6b28d 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -660,7 +660,8 @@ static int qr_task_send(struct qr_task *task, struct session2 *session, /* Pending '_finished' callback on current task */ qr_task_ref(task); - struct protolayer_payload payload = protolayer_buffer((char *)pkt->wire, pkt->size); + struct protolayer_payload payload = protolayer_buffer( + (char *)pkt->wire, pkt->size, false); payload.ttl = packet_ttl(pkt); ret = session2_wrap(session, payload, comm, qr_task_wrap_finished, task); @@ -2241,7 +2242,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_wrap( stream->sent.mem = siov; stream->sent.pool = &s->pool; - ctx->payload = protolayer_iovec(siov->iovs, iovcnt); + ctx->payload = protolayer_iovec(siov->iovs, iovcnt, false); return protolayer_continue(ctx); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { const int iovcnt = 1 + ctx->payload.iovec.cnt; @@ -2267,7 +2268,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_wrap( stream->sent.mem = siov; stream->sent.pool = &s->pool; - ctx->payload = protolayer_iovec(siov->iovs, iovcnt); + ctx->payload = protolayer_iovec(siov->iovs, iovcnt, false); return protolayer_continue(ctx); } else { kr_assert(false && "Invalid payload");