From: Vsevolod Stakhov Date: Sun, 8 Feb 2026 12:18:16 +0000 (+0000) Subject: [Feature] protocol: Zero-copy piecewise writev for v3 multipart responses X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=205b019740a1c45da9b70d76d002618edabd23a8;p=thirdparty%2Frspamd.git [Feature] protocol: Zero-copy piecewise writev for v3 multipart responses Add body_iov support to the HTTP message layer so the write path can use writev with multiple iovec segments instead of a single contiguous buffer. The v3 multipart response now builds its boundary/header strings and data pointers as separate iovecs, avoiding extra copies of the UCL result and rewritten message body. The cryptobox encryption path handles multiple body segments via encryptv_nm_inplace seamlessly. --- diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c index d998d8ae35..1835071658 100644 --- a/src/libserver/http/http_connection.c +++ b/src/libserver/http/http_connection.c @@ -1702,9 +1702,10 @@ rspamd_http_connection_encrypt_message( outlen = priv->out[0].iov_len + priv->out[1].iov_len; /* * Create segments from the following: - * Method, [URL], CRLF, nheaders, CRLF, body + * Method, [URL], CRLF, nheaders, CRLF, body segment(s) */ - segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 5); + gsize body_seg_count = (msg->body_iov_count > 0) ? msg->body_iov_count : (pbody ? 
1 : 0); + segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 4 + body_seg_count); segments[0].data = pmethod; segments[0].len = methodlen; @@ -1739,7 +1740,13 @@ rspamd_http_connection_encrypt_message( segments[i].data = crlfp; segments[i++].len = 2; -if (pbody) { +if (msg->body_iov_count > 0) { + for (gsize j = 0; j < msg->body_iov_count; j++) { + segments[i].data = msg->body_iov[j].iov_base; + segments[i++].len = msg->body_iov[j].iov_len; + } +} +else if (pbody) { segments[i].data = pbody; segments[i++].len = bodylen; } @@ -2267,7 +2274,12 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn, } if (encrypted) { - if (msg->body_buf.len == 0) { + if (msg->body_iov_count > 0) { + pbody = NULL; + bodylen = msg->body_buf.len; + msg->method = HTTP_POST; + } + else if (msg->body_buf.len == 0) { pbody = NULL; bodylen = 0; msg->method = HTTP_GET; @@ -2288,7 +2300,7 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn, * iov[6] = encrypted crlf * iov[7..n] = encrypted headers * iov[n + 1] = encrypted crlf - * [iov[n + 2] = encrypted body] + * [iov[n + 2..] = encrypted body segment(s)] */ priv->outlen = 7; enclen = crypto_box_noncebytes() + @@ -2307,7 +2319,7 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn, * iov[7] = encrypted prelude * iov[8..n] = encrypted headers * iov[n + 1] = encrypted crlf - * [iov[n + 2] = encrypted body] + * [iov[n + 2..] 
= encrypted body segment(s)] */ priv->outlen = 8; @@ -2341,12 +2353,26 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn, } if (bodylen > 0) { - priv->outlen++; + if (msg->body_iov_count > 0) { + priv->outlen += msg->body_iov_count; + } + else { + priv->outlen++; + } } } else { if (msg->method < HTTP_SYMBOLS) { - if (msg->body_buf.len == 0 || allow_shared) { + if (msg->body_iov_count > 0) { + pbody = NULL; + bodylen = msg->body_buf.len; + priv->outlen = 2 + msg->body_iov_count; + + if (msg->method == HTTP_INVALID) { + msg->method = HTTP_POST; + } + } + else if (msg->body_buf.len == 0 || allow_shared) { pbody = NULL; bodylen = 0; priv->outlen = 2; @@ -2475,7 +2501,13 @@ else priv->wr_total -= 2; } -if (pbody != NULL) { +if (msg->body_iov_count > 0) { + for (gsize j = 0; j < msg->body_iov_count; j++) { + priv->out[i].iov_base = msg->body_iov[j].iov_base; + priv->out[i++].iov_len = msg->body_iov[j].iov_len; + } +} +else if (pbody != NULL) { priv->out[i].iov_base = pbody; priv->out[i++].iov_len = bodylen; } diff --git a/src/libserver/http/http_message.c b/src/libserver/http/http_message.c index e5e4a0469a..92ca8334ca 100644 --- a/src/libserver/http/http_message.c +++ b/src/libserver/http/http_message.c @@ -440,11 +440,33 @@ rspamd_http_message_append_body(struct rspamd_http_message *msg, return TRUE; } +void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg, + struct iovec *iov, gsize count, + gsize total_len) +{ + rspamd_http_message_storage_cleanup(msg); + + msg->body_iov = iov; + msg->body_iov_count = count; + msg->body_buf.len = total_len; + msg->body_buf.begin = NULL; + msg->body_buf.str = NULL; + msg->body_buf.allocated_len = 0; + msg->flags |= RSPAMD_HTTP_FLAG_HAS_BODY; +} + void rspamd_http_message_storage_cleanup(struct rspamd_http_message *msg) { union _rspamd_storage_u *storage; struct stat st; + /* Free piecewise body iov if present */ + if (msg->body_iov) { + g_free(msg->body_iov); + msg->body_iov = NULL; + 
msg->body_iov_count = 0; + } + if (msg->flags & RSPAMD_HTTP_FLAG_SHMEM) { storage = &msg->body_buf.c; diff --git a/src/libserver/http/http_message.h b/src/libserver/http/http_message.h index 45ca3a2f92..0b1c9441c6 100644 --- a/src/libserver/http/http_message.h +++ b/src/libserver/http/http_message.h @@ -21,6 +21,7 @@ #include "keypairs_cache.h" #include "fstring.h" #include "ref.h" +#include <sys/uio.h> #ifdef __cplusplus @@ -149,6 +150,19 @@ gboolean rspamd_http_message_set_body_from_fstring_copy(struct rspamd_http_messa gboolean rspamd_http_message_append_body(struct rspamd_http_message *msg, const char *data, gsize len); +/** + * Set message body from an array of iovec segments (piecewise/scatter body). + * The message takes ownership of the iov array (will g_free it). + * The data pointers inside iovecs are NOT owned — caller must keep data alive. + * @param msg message + * @param iov array of iovec segments (ownership transferred) + * @param count number of segments + * @param total_len sum of all iov_len values + */ +void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg, + struct iovec *iov, gsize count, + gsize total_len); + /** * Append a header to http message * @param rep diff --git a/src/libserver/http/http_private.h b/src/libserver/http/http_private.h index de683a91b9..bbdeb7e0a4 100644 --- a/src/libserver/http/http_private.h +++ b/src/libserver/http/http_private.h @@ -72,6 +72,12 @@ struct rspamd_http_message { } c; } body_buf; + /* Piecewise body: multiple iovec segments instead of single buffer. + * When body_iov_count > 0, the write path uses these instead of body_buf.begin. + * body_buf.len still holds total length (for Content-Length). 
*/ + struct iovec *body_iov; + gsize body_iov_count; + struct rspamd_cryptobox_pubkey *peer_key; time_t date; time_t last_modified; diff --git a/src/libserver/multipart_response.cxx b/src/libserver/multipart_response.cxx index 99856bf702..8143b47084 100644 --- a/src/libserver/multipart_response.cxx +++ b/src/libserver/multipart_response.cxx @@ -129,6 +129,118 @@ auto multipart_response::serialize(void *zstream) const -> std::string return out; } +void multipart_response::prepare_iov(void *zstream) +{ + body_iov_.clear(); + owned_headers_.clear(); + owned_compressed_.clear(); + body_total_len_ = 0; + + for (size_t idx = 0; idx < parts_.size(); idx++) { + const auto &part = parts_[idx]; + + /* Build part header string */ + std::string hdr; + if (idx == 0) { + hdr.append("--"); + } + else { + hdr.append("\r\n--"); + } + hdr.append(boundary_); + hdr.append("\r\n"); + hdr.append("Content-Disposition: form-data; name=\""); + hdr.append(part.name); + hdr.append("\"\r\n"); + if (!part.content_type.empty()) { + hdr.append("Content-Type: "); + hdr.append(part.content_type); + hdr.append("\r\n"); + } + + /* Handle compression */ + bool compressed = false; + if (part.compress && zstream && !part.data.empty()) { + auto *cstream = static_cast<ZSTD_CCtx *>(zstream); + size_t bound = ZSTD_compressBound(part.data.size()); + std::string compressed_buf(bound, '\0'); + + ZSTD_inBuffer zin{}; + zin.src = part.data.data(); + zin.size = part.data.size(); + zin.pos = 0; + + ZSTD_outBuffer zout{}; + zout.dst = compressed_buf.data(); + zout.size = compressed_buf.size(); + zout.pos = 0; + + ZSTD_CCtx_reset(cstream, ZSTD_reset_session_only); + + bool ok = true; + while (zin.pos < zin.size) { + size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_continue); + if (ZSTD_isError(r)) { + ok = false; + break; + } + } + + if (ok) { + size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_end); + if (ZSTD_isError(r)) { + ok = false; + } + } + + if (ok) { + compressed_buf.resize(zout.pos); + 
compressed = true; + hdr.append("Content-Encoding: zstd\r\n"); + hdr.append("\r\n"); + owned_compressed_.push_back(std::move(compressed_buf)); + } + } + if (!compressed) { + hdr.append("\r\n"); + } + + /* Store owned header, add iov for it */ + owned_headers_.push_back(std::move(hdr)); + body_iov_.push_back({ + const_cast<char *>(owned_headers_.back().data()), + owned_headers_.back().size(), + }); + body_total_len_ += owned_headers_.back().size(); + + /* Add iov for data (compressed or original — zero-copy for original) */ + if (compressed) { + const auto &comp = owned_compressed_.back(); + body_iov_.push_back({ + const_cast<char *>(comp.data()), + comp.size(), + }); + body_total_len_ += comp.size(); + } + else if (!part.data.empty()) { + body_iov_.push_back({ + const_cast<char *>(part.data.data()), + part.data.size(), + }); + body_total_len_ += part.data.size(); + } + } + + /* Closing boundary: "\r\n--boundary--\r\n" */ + std::string trailer = "\r\n--" + boundary_ + "--\r\n"; + owned_headers_.push_back(std::move(trailer)); + body_iov_.push_back({ + const_cast<char *>(owned_headers_.back().data()), + owned_headers_.back().size(), + }); + body_total_len_ += owned_headers_.back().size(); +} + auto multipart_response::content_type() const -> std::string { return "multipart/mixed; boundary=\"" + boundary_ + "\""; @@ -183,6 +295,32 @@ rspamd_multipart_response_serialize( return rspamd_fstring_new_init(serialized.data(), serialized.size()); } +void rspamd_multipart_response_prepare_iov( + struct rspamd_multipart_response_c *resp, + void *zstream) +{ + if (!resp) { + return; + } + resp->resp.prepare_iov(zstream); +} + +const struct iovec * +rspamd_multipart_response_body_iov( + struct rspamd_multipart_response_c *resp, + gsize *count, + gsize *total_len) +{ + if (!resp) { + if (count) *count = 0; + if (total_len) *total_len = 0; + return NULL; + } + if (count) *count = resp->resp.body_iov_count(); + if (total_len) *total_len = resp->resp.body_total_len(); + return resp->resp.body_iov(); +} + const char 
* rspamd_multipart_response_content_type( struct rspamd_multipart_response_c *resp) diff --git a/src/libserver/multipart_response.h b/src/libserver/multipart_response.h index 6b7e97f7b6..62b3422005 100644 --- a/src/libserver/multipart_response.h +++ b/src/libserver/multipart_response.h @@ -19,6 +19,7 @@ #include "config.h" #include "libutil/fstring.h" +#include <sys/uio.h> #ifdef __cplusplus extern "C" { @@ -45,6 +46,28 @@ rspamd_fstring_t *rspamd_multipart_response_serialize( struct rspamd_multipart_response_c *resp, void *zstream); +/** + * Prepare piecewise iov segments for zero-copy writev. + * @param resp response handle + * @param zstream ZSTD compression stream (may be NULL) + */ +void rspamd_multipart_response_prepare_iov( + struct rspamd_multipart_response_c *resp, + void *zstream); + +/** + * Get the prepared body iov segments. + * Returned pointer is valid until resp is freed. + * @param resp response handle + * @param count [out] number of iov segments + * @param total_len [out] total byte length across all segments + * @return pointer to iov array + */ +const struct iovec *rspamd_multipart_response_body_iov( + struct rspamd_multipart_response_c *resp, + gsize *count, + gsize *total_len); + /** * Get the Content-Type header value (includes boundary). * The returned string is valid until resp is freed. diff --git a/src/libserver/multipart_response.hxx b/src/libserver/multipart_response.hxx index a8611d1b0d..a43c38df9c 100644 --- a/src/libserver/multipart_response.hxx +++ b/src/libserver/multipart_response.hxx @@ -20,6 +20,7 @@ #include #include #include +#include <sys/uio.h> namespace rspamd::http { @@ -44,6 +45,27 @@ public: */ auto serialize(void *zstream = nullptr) const -> std::string; + /** + * Prepare body as iovec segments for zero-copy writev. + * Boundary/header strings and compressed buffers are owned internally. + * Data pointers for uncompressed parts reference the original part.data. 
+ * @param zstream ZSTD compression stream (may be null) + */ + void prepare_iov(void *zstream = nullptr); + + const struct iovec *body_iov() const + { + return body_iov_.data(); + } + std::size_t body_iov_count() const + { + return body_iov_.size(); + } + std::size_t body_total_len() const + { + return body_total_len_; + } + /** * Get the Content-Type header value including boundary. */ @@ -57,6 +79,12 @@ public: private: std::string boundary_; std::vector parts_; + + /* Iov support (populated by prepare_iov) */ + std::vector<struct iovec> body_iov_; + std::vector<std::string> owned_headers_; /* boundary+headers per part */ + std::vector<std::string> owned_compressed_; /* compressed data buffers */ + std::size_t body_total_len_ = 0; }; }// namespace rspamd::http diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index f59a04be95..fda9b9a243 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -2711,15 +2711,29 @@ rspamd_protocol_http_reply_v3(struct rspamd_http_message *msg, zstream = task->cfg->libs_ctx->out_zstream; } - rspamd_fstring_t *reply = rspamd_multipart_response_serialize(resp, zstream); - const char *resp_ctype = rspamd_multipart_response_content_type(resp); + rspamd_multipart_response_prepare_iov(resp, zstream); + + gsize niov, total_len; + const struct iovec *body_segments = + rspamd_multipart_response_body_iov(resp, &niov, &total_len); + + /* Copy iov array — message takes ownership of the copy */ + struct iovec *iov_copy = g_new(struct iovec, niov); + memcpy(iov_copy, body_segments, sizeof(struct iovec) * niov); + rspamd_http_message_set_body_iov(msg, iov_copy, niov, total_len); + const char *resp_ctype = rspamd_multipart_response_content_type(resp); /* Copy Content-Type to task pool so it survives after response is freed */ const char *pool_ctype = rspamd_mempool_strdup(task->task_pool, resp_ctype); - rspamd_http_message_set_body_from_fstring_steal(msg, reply); - rspamd_fstring_free(result_data); - rspamd_multipart_response_free(resp); + /* Keep data 
alive until after HTTP write: + * - resp owns boundary/header strings and compressed buffers + * - result_data fstring owns the UCL result data + * Both freed when task_pool is destroyed (after write completes) */ + rspamd_mempool_add_destructor(task->task_pool, + (rspamd_mempool_destruct_t) rspamd_multipart_response_free, resp); + rspamd_mempool_add_destructor(task->task_pool, + (rspamd_mempool_destruct_t) rspamd_fstring_free, result_data); rspamd_protocol_update_stats(task); diff --git a/test/rspamd_cxx_unit_multipart.hxx b/test/rspamd_cxx_unit_multipart.hxx index 768290c099..f13089f494 100644 --- a/test/rspamd_cxx_unit_multipart.hxx +++ b/test/rspamd_cxx_unit_multipart.hxx @@ -508,6 +508,81 @@ TEST_SUITE("multipart_roundtrip") REQUIRE(result_part != nullptr); CHECK(result_part->data == data); } + + TEST_CASE("prepare_iov matches serialize for two parts") + { + rspamd::http::multipart_response resp; + std::string result_data = "{\"action\":\"reject\",\"score\":15.0}"; + std::string body_data = "Subject: test\r\n\r\nRewritten body content here"; + resp.add_part("result", "application/json", result_data); + resp.add_part("body", "application/octet-stream", body_data); + + auto serialized = resp.serialize(); + + resp.prepare_iov(); + + /* Reassemble iov into a single string */ + std::string reassembled; + reassembled.reserve(resp.body_total_len()); + for (gsize i = 0; i < resp.body_iov_count(); i++) { + const auto *iov = &resp.body_iov()[i]; + reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len); + } + + CHECK(reassembled.size() == resp.body_total_len()); + CHECK(reassembled == serialized); + } + + TEST_CASE("prepare_iov matches serialize for single part") + { + rspamd::http::multipart_response resp; + std::string data = "{\"action\":\"no action\"}"; + resp.add_part("result", "application/json", data); + + auto serialized = resp.serialize(); + + resp.prepare_iov(); + + std::string reassembled; + reassembled.reserve(resp.body_total_len()); + for (gsize i = 0; 
i < resp.body_iov_count(); i++) { + const auto *iov = &resp.body_iov()[i]; + reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len); + } + + CHECK(reassembled == serialized); + } + + TEST_CASE("prepare_iov roundtrip through parser") + { + rspamd::http::multipart_response resp; + std::string result_data = "{\"score\":42}"; + std::string body_data = "Hello world body"; + resp.add_part("result", "application/json", result_data); + resp.add_part("body", "application/octet-stream", body_data); + + resp.prepare_iov(); + + /* Reassemble and parse */ + std::string reassembled; + for (gsize i = 0; i < resp.body_iov_count(); i++) { + const auto *iov = &resp.body_iov()[i]; + reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len); + } + + auto boundary = std::string(resp.get_boundary()); + auto parsed = rspamd::http::parse_multipart_form(reassembled, boundary); + REQUIRE(parsed.has_value()); + CHECK(parsed->parts.size() == 2); + + auto *rp = rspamd::http::find_part(*parsed, "result"); + REQUIRE(rp != nullptr); + CHECK(rp->data == result_data); + + auto *bp = rspamd::http::find_part(*parsed, "body"); + REQUIRE(bp != nullptr); + CHECK(bp->data == body_data); + } } #endif// RSPAMD_CXX_UNIT_MULTIPART_HXX