outlen = priv->out[0].iov_len + priv->out[1].iov_len;
/*
* Create segments from the following:
- * Method, [URL], CRLF, nheaders, CRLF, body
+ * Method, [URL], CRLF, nheaders, CRLF, body segment(s)
*/
- segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 5);
+ gsize body_seg_count = (msg->body_iov_count > 0) ? msg->body_iov_count : (pbody ? 1 : 0);
+ segments = g_new(struct rspamd_cryptobox_segment, hdrcount + 4 + body_seg_count);
segments[0].data = pmethod;
segments[0].len = methodlen;
segments[i].data = crlfp;
segments[i++].len = 2;
-if (pbody) {
+if (msg->body_iov_count > 0) {
+ for (gsize j = 0; j < msg->body_iov_count; j++) {
+ segments[i].data = msg->body_iov[j].iov_base;
+ segments[i++].len = msg->body_iov[j].iov_len;
+ }
+}
+else if (pbody) {
segments[i].data = pbody;
segments[i++].len = bodylen;
}
}
if (encrypted) {
- if (msg->body_buf.len == 0) {
+ if (msg->body_iov_count > 0) {
+ pbody = NULL;
+ bodylen = msg->body_buf.len;
+ msg->method = HTTP_POST;
+ }
+ else if (msg->body_buf.len == 0) {
pbody = NULL;
bodylen = 0;
msg->method = HTTP_GET;
* iov[6] = encrypted crlf
* iov[7..n] = encrypted headers
* iov[n + 1] = encrypted crlf
- * [iov[n + 2] = encrypted body]
+ * [iov[n + 2..] = encrypted body segment(s)]
*/
priv->outlen = 7;
enclen = crypto_box_noncebytes() +
* iov[7] = encrypted prelude
* iov[8..n] = encrypted headers
* iov[n + 1] = encrypted crlf
- * [iov[n + 2] = encrypted body]
+ * [iov[n + 2..] = encrypted body segment(s)]
*/
priv->outlen = 8;
}
if (bodylen > 0) {
- priv->outlen++;
+ if (msg->body_iov_count > 0) {
+ priv->outlen += msg->body_iov_count;
+ }
+ else {
+ priv->outlen++;
+ }
}
}
else {
if (msg->method < HTTP_SYMBOLS) {
- if (msg->body_buf.len == 0 || allow_shared) {
+ if (msg->body_iov_count > 0) {
+ pbody = NULL;
+ bodylen = msg->body_buf.len;
+ priv->outlen = 2 + msg->body_iov_count;
+
+ if (msg->method == HTTP_INVALID) {
+ msg->method = HTTP_POST;
+ }
+ }
+ else if (msg->body_buf.len == 0 || allow_shared) {
pbody = NULL;
bodylen = 0;
priv->outlen = 2;
priv->wr_total -= 2;
}
-if (pbody != NULL) {
+if (msg->body_iov_count > 0) {
+ for (gsize j = 0; j < msg->body_iov_count; j++) {
+ priv->out[i].iov_base = msg->body_iov[j].iov_base;
+ priv->out[i++].iov_len = msg->body_iov[j].iov_len;
+ }
+}
+else if (pbody != NULL) {
priv->out[i].iov_base = pbody;
priv->out[i++].iov_len = bodylen;
}
return TRUE;
}
+void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg,
+ struct iovec *iov, gsize count,
+ gsize total_len)
+{
+ rspamd_http_message_storage_cleanup(msg);
+
+ msg->body_iov = iov;
+ msg->body_iov_count = count;
+ msg->body_buf.len = total_len;
+ msg->body_buf.begin = NULL;
+ msg->body_buf.str = NULL;
+ msg->body_buf.allocated_len = 0;
+ msg->flags |= RSPAMD_HTTP_FLAG_HAS_BODY;
+}
+
void rspamd_http_message_storage_cleanup(struct rspamd_http_message *msg)
{
union _rspamd_storage_u *storage;
struct stat st;
+ /* Free piecewise body iov if present */
+ if (msg->body_iov) {
+ g_free(msg->body_iov);
+ msg->body_iov = NULL;
+ msg->body_iov_count = 0;
+ }
+
if (msg->flags & RSPAMD_HTTP_FLAG_SHMEM) {
storage = &msg->body_buf.c;
#include "keypairs_cache.h"
#include "fstring.h"
#include "ref.h"
+#include <sys/uio.h>
#ifdef __cplusplus
gboolean rspamd_http_message_append_body(struct rspamd_http_message *msg,
const char *data, gsize len);
+/**
+ * Set message body from an array of iovec segments (piecewise/scatter body).
+ * The message takes ownership of the iov array (will g_free it).
+ * The data pointers inside iovecs are NOT owned — caller must keep data alive.
+ * @param msg message
+ * @param iov array of iovec segments (ownership transferred)
+ * @param count number of segments
+ * @param total_len sum of all iov_len values
+ */
+void rspamd_http_message_set_body_iov(struct rspamd_http_message *msg,
+ struct iovec *iov, gsize count,
+ gsize total_len);
+
/**
* Append a header to http message
* @param rep
} c;
} body_buf;
+ /* Piecewise body: multiple iovec segments instead of single buffer.
+ * When body_iov_count > 0, the write path uses these instead of body_buf.begin.
+ * body_buf.len still holds total length (for Content-Length). */
+ struct iovec *body_iov;
+ gsize body_iov_count;
+
struct rspamd_cryptobox_pubkey *peer_key;
time_t date;
time_t last_modified;
return out;
}
+void multipart_response::prepare_iov(void *zstream)
+{
+ body_iov_.clear();
+ owned_headers_.clear();
+ owned_compressed_.clear();
+ body_total_len_ = 0;
+
+ for (size_t idx = 0; idx < parts_.size(); idx++) {
+ const auto &part = parts_[idx];
+
+ /* Build part header string */
+ std::string hdr;
+ if (idx == 0) {
+ hdr.append("--");
+ }
+ else {
+ hdr.append("\r\n--");
+ }
+ hdr.append(boundary_);
+ hdr.append("\r\n");
+ hdr.append("Content-Disposition: form-data; name=\"");
+ hdr.append(part.name);
+ hdr.append("\"\r\n");
+ if (!part.content_type.empty()) {
+ hdr.append("Content-Type: ");
+ hdr.append(part.content_type);
+ hdr.append("\r\n");
+ }
+
+ /* Handle compression */
+ bool compressed = false;
+ if (part.compress && zstream && !part.data.empty()) {
+ auto *cstream = static_cast<ZSTD_CStream *>(zstream);
+ size_t bound = ZSTD_compressBound(part.data.size());
+ std::string compressed_buf(bound, '\0');
+
+ ZSTD_inBuffer zin{};
+ zin.src = part.data.data();
+ zin.size = part.data.size();
+ zin.pos = 0;
+
+ ZSTD_outBuffer zout{};
+ zout.dst = compressed_buf.data();
+ zout.size = compressed_buf.size();
+ zout.pos = 0;
+
+ ZSTD_CCtx_reset(cstream, ZSTD_reset_session_only);
+
+ bool ok = true;
+ while (zin.pos < zin.size) {
+ size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_continue);
+ if (ZSTD_isError(r)) {
+ ok = false;
+ break;
+ }
+ }
+
+ if (ok) {
+ size_t r = ZSTD_compressStream2(cstream, &zout, &zin, ZSTD_e_end);
+ if (ZSTD_isError(r)) {
+ ok = false;
+ }
+ }
+
+ if (ok) {
+ compressed_buf.resize(zout.pos);
+ compressed = true;
+ hdr.append("Content-Encoding: zstd\r\n");
+ hdr.append("\r\n");
+ owned_compressed_.push_back(std::move(compressed_buf));
+ }
+ }
+ if (!compressed) {
+ hdr.append("\r\n");
+ }
+
+ /* Store owned header, add iov for it */
+ owned_headers_.push_back(std::move(hdr));
+ body_iov_.push_back({
+ const_cast<char *>(owned_headers_.back().data()),
+ owned_headers_.back().size(),
+ });
+ body_total_len_ += owned_headers_.back().size();
+
+ /* Add iov for data (compressed or original — zero-copy for original) */
+ if (compressed) {
+ const auto &comp = owned_compressed_.back();
+ body_iov_.push_back({
+ const_cast<char *>(comp.data()),
+ comp.size(),
+ });
+ body_total_len_ += comp.size();
+ }
+ else if (!part.data.empty()) {
+ body_iov_.push_back({
+ const_cast<char *>(part.data.data()),
+ part.data.size(),
+ });
+ body_total_len_ += part.data.size();
+ }
+ }
+
+ /* Closing boundary: "\r\n--boundary--\r\n" */
+ std::string trailer = "\r\n--" + boundary_ + "--\r\n";
+ owned_headers_.push_back(std::move(trailer));
+ body_iov_.push_back({
+ const_cast<char *>(owned_headers_.back().data()),
+ owned_headers_.back().size(),
+ });
+ body_total_len_ += owned_headers_.back().size();
+}
+
auto multipart_response::content_type() const -> std::string
{
return "multipart/mixed; boundary=\"" + boundary_ + "\"";
return rspamd_fstring_new_init(serialized.data(), serialized.size());
}
+void rspamd_multipart_response_prepare_iov(
+ struct rspamd_multipart_response_c *resp,
+ void *zstream)
+{
+ if (!resp) {
+ return;
+ }
+ resp->resp.prepare_iov(zstream);
+}
+
+const struct iovec *
+rspamd_multipart_response_body_iov(
+ struct rspamd_multipart_response_c *resp,
+ gsize *count,
+ gsize *total_len)
+{
+ if (!resp) {
+ if (count) *count = 0;
+ if (total_len) *total_len = 0;
+ return NULL;
+ }
+ if (count) *count = resp->resp.body_iov_count();
+ if (total_len) *total_len = resp->resp.body_total_len();
+ return resp->resp.body_iov();
+}
+
const char *
rspamd_multipart_response_content_type(
struct rspamd_multipart_response_c *resp)
#include "config.h"
#include "libutil/fstring.h"
+#include <sys/uio.h>
#ifdef __cplusplus
extern "C" {
struct rspamd_multipart_response_c *resp,
void *zstream);
+/**
+ * Prepare piecewise iov segments for zero-copy writev.
+ * @param resp response handle
+ * @param zstream ZSTD compression stream (may be NULL)
+ */
+void rspamd_multipart_response_prepare_iov(
+ struct rspamd_multipart_response_c *resp,
+ void *zstream);
+
+/**
+ * Get the prepared body iov segments.
+ * Returned pointer is valid until resp is freed.
+ * @param resp response handle
+ * @param count [out] number of iov segments
+ * @param total_len [out] total byte length across all segments
+ * @return pointer to iov array
+ */
+const struct iovec *rspamd_multipart_response_body_iov(
+ struct rspamd_multipart_response_c *resp,
+ gsize *count,
+ gsize *total_len);
+
/**
* Get the Content-Type header value (includes boundary).
* The returned string is valid until resp is freed.
#include <string>
#include <string_view>
#include <vector>
+#include <sys/uio.h>
namespace rspamd::http {
*/
auto serialize(void *zstream = nullptr) const -> std::string;
+ /**
+ * Prepare body as iovec segments for zero-copy writev.
+ * Boundary/header strings and compressed buffers are owned internally.
+ * Data pointers for uncompressed parts reference the original part.data.
+ * @param zstream ZSTD compression stream (may be null)
+ */
+ void prepare_iov(void *zstream = nullptr);
+
+ const struct iovec *body_iov() const
+ {
+ return body_iov_.data();
+ }
+ std::size_t body_iov_count() const
+ {
+ return body_iov_.size();
+ }
+ std::size_t body_total_len() const
+ {
+ return body_total_len_;
+ }
+
/**
* Get the Content-Type header value including boundary.
*/
private:
std::string boundary_;
std::vector<response_part> parts_;
+
+ /* Iov support (populated by prepare_iov) */
+ std::vector<struct iovec> body_iov_;
+ std::vector<std::string> owned_headers_; /* boundary+headers per part */
+ std::vector<std::string> owned_compressed_; /* compressed data buffers */
+ std::size_t body_total_len_ = 0;
};
}// namespace rspamd::http
zstream = task->cfg->libs_ctx->out_zstream;
}
- rspamd_fstring_t *reply = rspamd_multipart_response_serialize(resp, zstream);
- const char *resp_ctype = rspamd_multipart_response_content_type(resp);
+ rspamd_multipart_response_prepare_iov(resp, zstream);
+
+ gsize niov, total_len;
+ const struct iovec *body_segments =
+ rspamd_multipart_response_body_iov(resp, &niov, &total_len);
+
+ /* Copy iov array — message takes ownership of the copy */
+ struct iovec *iov_copy = g_new(struct iovec, niov);
+ memcpy(iov_copy, body_segments, sizeof(struct iovec) * niov);
+ rspamd_http_message_set_body_iov(msg, iov_copy, niov, total_len);
+ const char *resp_ctype = rspamd_multipart_response_content_type(resp);
/* Copy Content-Type to task pool so it survives after response is freed */
const char *pool_ctype = rspamd_mempool_strdup(task->task_pool, resp_ctype);
- rspamd_http_message_set_body_from_fstring_steal(msg, reply);
- rspamd_fstring_free(result_data);
- rspamd_multipart_response_free(resp);
+ /* Keep data alive until after HTTP write:
+ * - resp owns boundary/header strings and compressed buffers
+ * - result_data fstring owns the UCL result data
+ * Both freed when task_pool is destroyed (after write completes) */
+ rspamd_mempool_add_destructor(task->task_pool,
+ (rspamd_mempool_destruct_t) rspamd_multipart_response_free, resp);
+ rspamd_mempool_add_destructor(task->task_pool,
+ (rspamd_mempool_destruct_t) rspamd_fstring_free, result_data);
rspamd_protocol_update_stats(task);
REQUIRE(result_part != nullptr);
CHECK(result_part->data == data);
}
+
+ TEST_CASE("prepare_iov matches serialize for two parts")
+ {
+ rspamd::http::multipart_response resp;
+ std::string result_data = "{\"action\":\"reject\",\"score\":15.0}";
+ std::string body_data = "Subject: test\r\n\r\nRewritten body content here";
+ resp.add_part("result", "application/json", result_data);
+ resp.add_part("body", "application/octet-stream", body_data);
+
+ auto serialized = resp.serialize();
+
+ resp.prepare_iov();
+
+ /* Reassemble iov into a single string */
+ std::string reassembled;
+ reassembled.reserve(resp.body_total_len());
+ for (gsize i = 0; i < resp.body_iov_count(); i++) {
+ const auto *iov = &resp.body_iov()[i];
+ reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+ }
+
+ CHECK(reassembled.size() == resp.body_total_len());
+ CHECK(reassembled == serialized);
+ }
+
+ TEST_CASE("prepare_iov matches serialize for single part")
+ {
+ rspamd::http::multipart_response resp;
+ std::string data = "{\"action\":\"no action\"}";
+ resp.add_part("result", "application/json", data);
+
+ auto serialized = resp.serialize();
+
+ resp.prepare_iov();
+
+ std::string reassembled;
+ reassembled.reserve(resp.body_total_len());
+ for (gsize i = 0; i < resp.body_iov_count(); i++) {
+ const auto *iov = &resp.body_iov()[i];
+ reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+ }
+
+ CHECK(reassembled == serialized);
+ }
+
+ TEST_CASE("prepare_iov roundtrip through parser")
+ {
+ rspamd::http::multipart_response resp;
+ std::string result_data = "{\"score\":42}";
+ std::string body_data = "Hello world body";
+ resp.add_part("result", "application/json", result_data);
+ resp.add_part("body", "application/octet-stream", body_data);
+
+ resp.prepare_iov();
+
+ /* Reassemble and parse */
+ std::string reassembled;
+ for (gsize i = 0; i < resp.body_iov_count(); i++) {
+ const auto *iov = &resp.body_iov()[i];
+ reassembled.append(static_cast<const char *>(iov->iov_base), iov->iov_len);
+ }
+
+ auto boundary = std::string(resp.get_boundary());
+ auto parsed = rspamd::http::parse_multipart_form(reassembled, boundary);
+ REQUIRE(parsed.has_value());
+ CHECK(parsed->parts.size() == 2);
+
+ auto *rp = rspamd::http::find_part(*parsed, "result");
+ REQUIRE(rp != nullptr);
+ CHECK(rp->data == result_data);
+
+ auto *bp = rspamd::http::find_part(*parsed, "body");
+ REQUIRE(bp != nullptr);
+ CHECK(bp->data == body_data);
+ }
}
#endif// RSPAMD_CXX_UNIT_MULTIPART_HXX