]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] protocol: Zero-copy piecewise writev for v3 multipart responses
author Vsevolod Stakhov <vsevolod@rspamd.com>
Sun, 8 Feb 2026 12:18:16 +0000 (12:18 +0000)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Sun, 8 Feb 2026 13:21:04 +0000 (13:21 +0000)
Add body_iov support to the HTTP message layer so the write path can use
writev with multiple iovec segments instead of a single contiguous buffer.
The v3 multipart response now builds its boundary/header strings and data
pointers as separate iovecs, avoiding extra copies of the UCL result and
rewritten message body. The cryptobox encryption path handles multiple body
segments via encryptv_nm_inplace seamlessly.

src/libserver/http/http_connection.c
src/libserver/http/http_message.c
src/libserver/http/http_message.h
src/libserver/http/http_private.h
src/libserver/multipart_response.cxx
src/libserver/multipart_response.h
src/libserver/multipart_response.hxx
src/libserver/protocol.c
test/rspamd_cxx_unit_multipart.hxx

index d998d8ae35346974ad08156748c6ce559e444397..18350716585b3c078fb030340d250bc5beedd978 100644 (file)
@@ -1702,9 +1702,10 @@ rspamd_http_connection_encrypt_message(
        outlen = priv->out[0].iov_len + priv->out[1].iov_len;
        /*
         * Create segments from the following:
-        * Method, [URL], CRLF, nheaders, CRLF, body
+        * Method, [URL], CRLF, nheaders, CRLF, body segment(s)
         */
-       segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 5);
+       gsize body_seg_count = (msg->body_iov_count > 0) ? msg->body_iov_count : (pbody ? 1 : 0);
+       segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 4 + body_seg_count);
 
        segments[0].data = pmethod;
        segments[0].len = methodlen;
@@ -1739,7 +1740,13 @@ rspamd_http_connection_encrypt_message(
 segments[i].data = crlfp;
 segments[i++].len = 2;
 
-if (pbody) {
+if (msg->body_iov_count > 0) {
+       for (gsize j = 0; j < msg->body_iov_count; j++) {
+               segments[i].data = msg->body_iov[j].iov_base;
+               segments[i++].len = msg->body_iov[j].iov_len;
+       }
+}
+else if (pbody) {
        segments[i].data = pbody;
        segments[i++].len = bodylen;
 }
@@ -2267,7 +2274,12 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn,
        }
 
        if (encrypted) {
-               if (msg->body_buf.len == 0) {
+               if (msg->body_iov_count > 0) {
+                       pbody = NULL;
+                       bodylen = msg->body_buf.len;
+                       msg->method = HTTP_POST;
+               }
+               else if (msg->body_buf.len == 0) {
                        pbody = NULL;
                        bodylen = 0;
                        msg->method = HTTP_GET;
@@ -2288,7 +2300,7 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn,
                         * iov[6] = encrypted crlf
                         * iov[7..n] = encrypted headers
                         * iov[n + 1] = encrypted crlf
-                        * [iov[n + 2] = encrypted body]
+                        * [iov[n + 2..] = encrypted body segment(s)]
                         */
                        priv->outlen = 7;
                        enclen = crypto_box_noncebytes() +
@@ -2307,7 +2319,7 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn,
                         * iov[7] = encrypted prelude
                         * iov[8..n] = encrypted headers
                         * iov[n + 1] = encrypted crlf
-                        * [iov[n + 2] = encrypted body]
+                        * [iov[n + 2..] = encrypted body segment(s)]
                         */
                        priv->outlen = 8;
 
@@ -2341,12 +2353,26 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn,
                }
 
                if (bodylen > 0) {
-                       priv->outlen++;
+                       if (msg->body_iov_count > 0) {
+                               priv->outlen += msg->body_iov_count;
+                       }
+                       else {
+                               priv->outlen++;
+                       }
                }
        }
        else {
                if (msg->method < HTTP_SYMBOLS) {
-                       if (msg->body_buf.len == 0 || allow_shared) {
+                       if (msg->body_iov_count > 0) {
+                               pbody = NULL;
+                               bodylen = msg->body_buf.len;
+                               priv->outlen = 2 + msg->body_iov_count;
+
+                               if (msg->method == HTTP_INVALID) {
+                                       msg->method = HTTP_POST;
+                               }
+                       }
+                       else if (msg->body_buf.len == 0 || allow_shared) {
                                pbody = NULL;
                                bodylen = 0;
                                priv->outlen = 2;
@@ -2475,7 +2501,13 @@ else
        priv->wr_total -= 2;
 }
 
-if (pbody != NULL) {
+if (msg->body_iov_count > 0) {
+       for (gsize j = 0; j < msg->body_iov_count; j++) {
+               priv->out[i].iov_base = msg->body_iov[j].iov_base;
+               priv->out[i++].iov_len = msg->body_iov[j].iov_len;
+       }
+}
+else if (pbody != NULL) {
        priv->out[i].iov_base = pbody;
        priv->out[i++].iov_len = bodylen;
 }
index e5e4a0469a1fdf442b1ff3c105175ece8d1e8f52..92ca8334ca338e3bfdd626b319fab44a2555e240 100644 (file)
@@ -440,11 +440,33 @@ rspamd_http_message_append_body(struct rspamd_http_message *msg,
        return TRUE;
 }
 
+/*
+ * Install a piecewise (scatter/gather) body on `msg`.
+ *
+ * Any body storage currently attached to the message is released first, then
+ * the message takes ownership of the `iov` array itself (the cleanup routine
+ * g_free()s it). The buffers the iovecs point at are not owned.
+ *
+ * msg       - message to modify
+ * iov       - array of segments, ownership transferred to msg
+ * count     - number of elements in iov
+ * total_len - sum of all iov_len values, stored in body_buf.len
+ */
+void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg,
+                                                                         struct iovec *iov, gsize count,
+                                                                         gsize total_len)
+{
+       if (msg->body_iov == iov) {
+               /*
+                * The caller re-installs the array the message already owns.
+                * Detach it so the cleanup below does not g_free() the very
+                * pointer that is stored again afterwards.
+                */
+               msg->body_iov = NULL;
+               msg->body_iov_count = 0;
+       }
+
+       rspamd_http_message_storage_cleanup(msg);
+
+       if (iov == NULL || count == 0) {
+               /* No segments to send: do not advertise a length with no data */
+               count = 0;
+               total_len = 0;
+       }
+
+       msg->body_iov = iov;
+       msg->body_iov_count = count;
+
+       /* The single-buffer view is unused, only its length is kept */
+       msg->body_buf.len = total_len;
+       msg->body_buf.begin = NULL;
+       msg->body_buf.str = NULL;
+       msg->body_buf.allocated_len = 0;
+       msg->flags |= RSPAMD_HTTP_FLAG_HAS_BODY;
+}
+
 void rspamd_http_message_storage_cleanup(struct rspamd_http_message *msg)
 {
        union _rspamd_storage_u *storage;
        struct stat st;
 
+       /* Free piecewise body iov if present */
+       if (msg->body_iov) {
+               g_free(msg->body_iov);
+               msg->body_iov = NULL;
+               msg->body_iov_count = 0;
+       }
+
        if (msg->flags & RSPAMD_HTTP_FLAG_SHMEM) {
                storage = &msg->body_buf.c;
 
index 45ca3a2f92cfee75fdb11ac6a0d004bbbed8c6b6..0b1c9441c6f2a11a1f155e82b56c8c7bc3bee0ed 100644 (file)
@@ -21,6 +21,7 @@
 #include "keypairs_cache.h"
 #include "fstring.h"
 #include "ref.h"
+#include <sys/uio.h>
 
 
 #ifdef __cplusplus
@@ -149,6 +150,19 @@ gboolean rspamd_http_message_set_body_from_fstring_copy(struct rspamd_http_messa
 gboolean rspamd_http_message_append_body(struct rspamd_http_message *msg,
                                                                                 const char *data, gsize len);
 
+/**
+ * Set message body from an array of iovec segments (piecewise/scatter body).
+ * Any body storage previously attached to the message is released first and
+ * the message is flagged as having a body.
+ * The message takes ownership of the iov array (will g_free it).
+ * The data pointers inside iovecs are NOT owned — caller must keep data alive.
+ * @param msg message
+ * @param iov array of iovec segments (ownership transferred)
+ * @param count number of segments
+ * @param total_len sum of all iov_len values (stored as the body length)
+ */
+void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg,
+                                                                         struct iovec *iov, gsize count,
+                                                                         gsize total_len);
+
 /**
  * Append a header to http message
  * @param rep
index de683a91b9396d770bbaffabccd126281b79077d..bbdeb7e0a44187c3f955c6a3a873afe5e6cbefe6 100644 (file)
@@ -72,6 +72,12 @@ struct rspamd_http_message {
                } c;
        } body_buf;
 
+       /* Piecewise body: multiple iovec segments instead of single buffer.
+        * When body_iov_count > 0, the write path uses these instead of body_buf.begin.
+        * body_buf.len still holds total length (for Content-Length). */
+       struct iovec *body_iov;
+       gsize body_iov_count;
+
        struct rspamd_cryptobox_pubkey *peer_key;
        time_t date;
        time_t last_modified;
index 99856bf7028f631ced6c951520a357df2cdaaee7..8143b47084c0dde09c73e408a17a2a695674c863 100644 (file)
@@ -129,6 +129,118 @@ auto multipart_response::serialize(void *zstream) const -> std::string
        return out;
 }
 
+void multipart_response::prepare_iov(void *zstream)
+{
+       body_iov_.clear();
+       owned_headers_.clear();
+       owned_compressed_.clear();
+       body_total_len_ = 0;
+
+       for (size_t idx = 0; idx < parts_.size(); idx++) {
+               const auto &part = parts_[idx];
+
+               /* Build part header string */
+               std::string hdr;
+               if (idx == 0) {
+                       hdr.append("--");
+               }
+               else {
+                       hdr.append("\r\n--");
+               }
+               hdr.append(boundary_);
+               hdr.append("\r\n");
+               hdr.append("Content-Disposition: form-data; name=\"");
+               hdr.append(part.name);
+               hdr.append("\"\r\n");
+               if (!part.content_type.empty()) {
+                       hdr.append("Content-Type: ");
+                       hdr.append(part.content_type);
+                       hdr.append("\r\n");
+               }
+
+               /* Handle compression */
+               bool compressed = false;
+               if (part.compress && zstream && !part.data.empty()) {
+                       auto *cstream = static_cast<ZSTD_CStream *>(zstream);
+                       size_t bound = ZSTD_compressBound(part.data.size());
+                       std::string compressed_buf(bound, '\0');
+
+                       ZSTD_inBuffer zin{};
+                       zin.src = part.data.data();
+                       zin.size = part.data.size();
+                       zin.pos = 0;
+
+                       ZSTD_outBuffer zout{};
+                       zout.dst = compressed_buf.data();
+                       zout.size = compressed_buf.size();
+                       zout.pos = 0;
+
+                       ZSTD_CCtx_reset(cstream, ZSTD_reset_session_only);
+
+                       bool ok = true;
+                       while (zin.pos < zin.size) {
+                               size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_continue);
+                               if (ZSTD_isError(r)) {
+                                       ok = false;
+                                       break;
+                               }
+                       }
+
+                       if (ok) {
+                               size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_end);
+                               if (ZSTD_isError(r)) {
+                                       ok = false;
+                               }
+                       }
+
+                       if (ok) {
+                               compressed_buf.resize(zout.pos);
+                               compressed = true;
+                               hdr.append("Content-Encoding: zstd\r\n");
+                               hdr.append("\r\n");
+                               owned_compressed_.push_back(std::move(compressed_buf));
+                       }
+               }
+               if (!compressed) {
+                       hdr.append("\r\n");
+               }
+
+               /* Store owned header, add iov for it */
+               owned_headers_.push_back(std::move(hdr));
+               body_iov_.push_back({
+                       const_cast<char *>(owned_headers_.back().data()),
+                       owned_headers_.back().size(),
+               });
+               body_total_len_ += owned_headers_.back().size();
+
+               /* Add iov for data (compressed or original — zero-copy for original) */
+               if (compressed) {
+                       const auto &comp = owned_compressed_.back();
+                       body_iov_.push_back({
+                               const_cast<char *>(comp.data()),
+                               comp.size(),
+                       });
+                       body_total_len_ += comp.size();
+               }
+               else if (!part.data.empty()) {
+                       body_iov_.push_back({
+                               const_cast<char *>(part.data.data()),
+                               part.data.size(),
+                       });
+                       body_total_len_ += part.data.size();
+               }
+       }
+
+       /* Closing boundary: "\r\n--boundary--\r\n" */
+       std::string trailer = "\r\n--" + boundary_ + "--\r\n";
+       owned_headers_.push_back(std::move(trailer));
+       body_iov_.push_back({
+               const_cast<char *>(owned_headers_.back().data()),
+               owned_headers_.back().size(),
+       });
+       body_total_len_ += owned_headers_.back().size();
+}
+
 auto multipart_response::content_type() const -> std::string
 {
        return "multipart/mixed; boundary=\"" + boundary_ + "\"";
@@ -183,6 +295,32 @@ rspamd_multipart_response_serialize(
        return rspamd_fstring_new_init(serialized.data(), serialized.size());
 }
 
+/*
+ * C wrapper: asks the wrapped multipart_response to build its iovec segments.
+ * A NULL handle is ignored.
+ */
+void rspamd_multipart_response_prepare_iov(
+       struct rspamd_multipart_response_c *resp,
+       void *zstream)
+{
+       if (resp != nullptr) {
+               resp->resp.prepare_iov(zstream);
+       }
+}
+
+const struct iovec *
+rspamd_multipart_response_body_iov(
+       struct rspamd_multipart_response_c *resp,
+       gsize *count,
+       gsize *total_len)
+{
+       if (!resp) {
+               if (count) *count = 0;
+               if (total_len) *total_len = 0;
+               return NULL;
+       }
+       if (count) *count = resp->resp.body_iov_count();
+       if (total_len) *total_len = resp->resp.body_total_len();
+       return resp->resp.body_iov();
+}
+
 const char *
 rspamd_multipart_response_content_type(
        struct rspamd_multipart_response_c *resp)
index 6b7e97f7b60b2bf2d2da691a126b7926b9040d19..62b3422005746f03a10f1b95974b29a0510b7902 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "config.h"
 #include "libutil/fstring.h"
+#include <sys/uio.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,6 +46,28 @@ rspamd_fstring_t *rspamd_multipart_response_serialize(
        struct rspamd_multipart_response_c *resp,
        void *zstream);
 
+/**
+ * Prepare piecewise iov segments for zero-copy writev.
+ * Does nothing when resp is NULL.
+ * @param resp response handle
+ * @param zstream ZSTD compression stream (may be NULL)
+ */
+void rspamd_multipart_response_prepare_iov(
+       struct rspamd_multipart_response_c *resp,
+       void *zstream);
+
+/**
+ * Get the prepared body iov segments.
+ * Returned pointer is valid until resp is freed or the segments are
+ * prepared again (preparing clears and rebuilds the array).
+ * When resp is NULL, the outputs are set to 0 and NULL is returned.
+ * @param resp response handle
+ * @param count [out] number of iov segments (may be NULL)
+ * @param total_len [out] total byte length across all segments (may be NULL)
+ * @return pointer to iov array
+ */
+const struct iovec *rspamd_multipart_response_body_iov(
+       struct rspamd_multipart_response_c *resp,
+       gsize *count,
+       gsize *total_len);
+
 /**
  * Get the Content-Type header value (includes boundary).
  * The returned string is valid until resp is freed.
index a8611d1b0d4794735a711d29ac07bfa8dae01cdf..a43c38df9ccd3eb12d403f576bb6b78c59c748b6 100644 (file)
@@ -20,6 +20,7 @@
 #include <string>
 #include <string_view>
 #include <vector>
+#include <sys/uio.h>
 
 namespace rspamd::http {
 
@@ -44,6 +45,27 @@ public:
         */
        auto serialize(void *zstream = nullptr) const -> std::string;
 
+       /**
+        * Prepare body as iovec segments for zero-copy writev.
+        * Boundary/header strings and compressed buffers are owned internally.
+        * Data pointers for uncompressed parts reference the original part.data.
+        * Each call discards the previously prepared segments, so pointers
+        * obtained from body_iov() before the call must not be reused.
+        * @param zstream ZSTD compression stream (may be null)
+        */
+       void prepare_iov(void *zstream = nullptr);
+
+       /** Start of the segment array filled by prepare_iov(). */
+       auto body_iov() const -> const struct iovec *
+       {
+               return body_iov_.data();
+       }
+       /** Number of segments currently held in the iov array. */
+       auto body_iov_count() const -> std::size_t
+       {
+               return body_iov_.size();
+       }
+       /** Sum of the lengths of all prepared segments, in bytes. */
+       auto body_total_len() const -> std::size_t
+       {
+               return body_total_len_;
+       }
+
        /**
         * Get the Content-Type header value including boundary.
         */
@@ -57,6 +79,12 @@ public:
 private:
        std::string boundary_;
        std::vector<response_part> parts_;
+
+       /* Iov support (populated by prepare_iov) */
+       std::vector<struct iovec> body_iov_;
+       std::vector<std::string> owned_headers_;    /* boundary+headers per part */
+       std::vector<std::string> owned_compressed_; /* compressed data buffers */
+       std::size_t body_total_len_ = 0;
 };
 
 }// namespace rspamd::http
index f59a04be955f7da605658cc702a3b0e314197d95..fda9b9a243b32bcd249c2729784a780a98abe645 100644 (file)
@@ -2711,15 +2711,29 @@ rspamd_protocol_http_reply_v3(struct rspamd_http_message *msg,
                zstream = task->cfg->libs_ctx->out_zstream;
        }
 
-       rspamd_fstring_t *reply = rspamd_multipart_response_serialize(resp, zstream);
-       const char *resp_ctype = rspamd_multipart_response_content_type(resp);
+       rspamd_multipart_response_prepare_iov(resp, zstream);
+
+       gsize niov, total_len;
+       const struct iovec *body_segments =
+               rspamd_multipart_response_body_iov(resp, &niov, &total_len);
+
+       /* Copy iov array — message takes ownership of the copy */
+       struct iovec *iov_copy = g_new(struct iovec, niov);
+       memcpy(iov_copy, body_segments, sizeof(struct iovec) * niov);
+       rspamd_http_message_set_body_iov(msg, iov_copy, niov, total_len);
 
+       const char *resp_ctype = rspamd_multipart_response_content_type(resp);
        /* Copy Content-Type to task pool so it survives after response is freed */
        const char *pool_ctype = rspamd_mempool_strdup(task->task_pool, resp_ctype);
 
-       rspamd_http_message_set_body_from_fstring_steal(msg, reply);
-       rspamd_fstring_free(result_data);
-       rspamd_multipart_response_free(resp);
+       /* Keep data alive until after HTTP write:
+        * - resp owns boundary/header strings and compressed buffers
+        * - result_data fstring owns the UCL result data
+        * Both freed when task_pool is destroyed (after write completes) */
+       rspamd_mempool_add_destructor(task->task_pool,
+                                                                 (rspamd_mempool_destruct_t) rspamd_multipart_response_free, resp);
+       rspamd_mempool_add_destructor(task->task_pool,
+                                                                 (rspamd_mempool_destruct_t) rspamd_fstring_free, result_data);
 
        rspamd_protocol_update_stats(task);
 
index 768290c09905f2232a89fde4c3e72a032764db7b..f13089f49487cd15a8ce2cdeb4164eaf1e1149cf 100644 (file)
@@ -508,6 +508,81 @@ TEST_SUITE("multipart_roundtrip")
                REQUIRE(result_part != nullptr);
                CHECK(result_part->data == data);
        }
+
+       TEST_CASE("prepare_iov matches serialize for two parts")
+       {
+               rspamd::http::multipart_response resp;
+               std::string result_data = "{\"action\":\"reject\",\"score\":15.0}";
+               std::string body_data = "Subject: test\r\n\r\nRewritten body content here";
+               resp.add_part("result", "application/json", result_data);
+               resp.add_part("body", "application/octet-stream", body_data);
+
+               auto serialized = resp.serialize();
+
+               resp.prepare_iov();
+
+               /* Reassemble iov into a single string */
+               std::string reassembled;
+               reassembled.reserve(resp.body_total_len());
+               for (gsize i = 0; i < resp.body_iov_count(); i++) {
+                       const auto *iov = &resp.body_iov()[i];
+                       reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+               }
+
+               CHECK(reassembled.size() == resp.body_total_len());
+               CHECK(reassembled == serialized);
+       }
+
+       TEST_CASE("prepare_iov matches serialize for single part")
+       {
+               rspamd::http::multipart_response resp;
+               std::string data = "{\"action\":\"no action\"}";
+               resp.add_part("result", "application/json", data);
+
+               auto serialized = resp.serialize();
+
+               resp.prepare_iov();
+
+               std::string reassembled;
+               reassembled.reserve(resp.body_total_len());
+               for (gsize i = 0; i < resp.body_iov_count(); i++) {
+                       const auto *iov = &resp.body_iov()[i];
+                       reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+               }
+
+               CHECK(reassembled == serialized);
+       }
+
+       TEST_CASE("prepare_iov roundtrip through parser")
+       {
+               rspamd::http::multipart_response resp;
+               std::string result_data = "{\"score\":42}";
+               std::string body_data = "Hello world body";
+               resp.add_part("result", "application/json", result_data);
+               resp.add_part("body", "application/octet-stream", body_data);
+
+               resp.prepare_iov();
+
+               /* Reassemble and parse */
+               std::string reassembled;
+               for (gsize i = 0; i < resp.body_iov_count(); i++) {
+                       const auto *iov = &resp.body_iov()[i];
+                       reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+               }
+
+               auto boundary = std::string(resp.get_boundary());
+               auto parsed = rspamd::http::parse_multipart_form(reassembled, boundary);
+               REQUIRE(parsed.has_value());
+               CHECK(parsed->parts.size() == 2);
+
+               auto *rp = rspamd::http::find_part(*parsed, "result");
+               REQUIRE(rp != nullptr);
+               CHECK(rp->data == result_data);
+
+               auto *bp = rspamd::http::find_part(*parsed, "body");
+               REQUIRE(bp != nullptr);
+               CHECK(bp->data == body_data);
+       }
 }
 
 #endif// RSPAMD_CXX_UNIT_MULTIPART_HXX