git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] protocol: Add v3 multipart response parsing for proxy and body decompression
author Vsevolod Stakhov <vsevolod@rspamd.com>
Sun, 8 Feb 2026 10:13:40 +0000 (10:13 +0000)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Sun, 8 Feb 2026 10:13:40 +0000 (10:13 +0000)
Proxy forwarding now handles multipart/mixed responses from /checkv3:
parse result+body parts, decompress zstd, detect msgpack, and forward
rewritten body to milter. Self-scan v3 populates conn->results for
milter and Lua comparison scripts. rspamc client decompresses zstd
body parts returned by the server.

src/client/rspamdclient.c
src/rspamd_proxy.c

index e304de36e2ee303b826bdc5e13d5b196b16581f4..10647a55e622e67f7298727f6fe2e4ce9a960883 100644 (file)
@@ -600,12 +600,50 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn,
                                        }
 
                                        /* Extract optional body part */
+                                       unsigned char *body_decompressed = NULL;
                                        const struct rspamd_multipart_entry_c *body_part =
                                                rspamd_multipart_form_find(form, "body", sizeof("body") - 1);
                                        if (body_part && body_part->data_len > 0) {
                                                body = body_part->data;
                                                bodylen = body_part->data_len;
-                                               /* TODO: decompress body part if needed */
+
+                                               /* Decompress body part if needed */
+                                               if (body_part->content_encoding &&
+                                                       body_part->content_encoding_len > 0 &&
+                                                       rspamd_substring_search_caseless(body_part->content_encoding,
+                                                                                                                        body_part->content_encoding_len,
+                                                                                                                        "zstd", 4) != -1) {
+                                                       ZSTD_DStream *bzstream = ZSTD_createDStream();
+                                                       ZSTD_initDStream(bzstream);
+                                                       ZSTD_inBuffer bzin = {body, bodylen, 0};
+                                                       gsize boutlen = ZSTD_getDecompressedSize(body, bodylen);
+                                                       if (boutlen == 0) boutlen = ZSTD_DStreamOutSize();
+                                                       body_decompressed = g_malloc(boutlen);
+                                                       ZSTD_outBuffer bzout = {body_decompressed, boutlen, 0};
+
+                                                       while (bzin.pos < bzin.size) {
+                                                               gsize r = ZSTD_decompressStream(bzstream, &bzout, &bzin);
+                                                               if (ZSTD_isError(r)) {
+                                                                       g_free(body_decompressed);
+                                                                       body_decompressed = NULL;
+                                                                       ZSTD_freeDStream(bzstream);
+                                                                       /* Non-fatal: pass compressed body as-is */
+                                                                       body = body_part->data;
+                                                                       bodylen = body_part->data_len;
+                                                                       break;
+                                                               }
+                                                               if (bzout.pos == bzout.size) {
+                                                                       bzout.size *= 2;
+                                                                       body_decompressed = g_realloc(bzout.dst, bzout.size);
+                                                                       bzout.dst = body_decompressed;
+                                                               }
+                                                       }
+                                                       if (body_decompressed) {
+                                                               ZSTD_freeDStream(bzstream);
+                                                               body = (const char *) bzout.dst;
+                                                               bodylen = bzout.pos;
+                                                       }
+                                               }
                                        }
 
                                        parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
@@ -632,6 +670,7 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn,
                                                                req->input, req->ud, c->start_time,
                                                                c->send_time, body, bodylen, err);
                                                g_error_free(err);
+                                               g_free(body_decompressed);
                                                return 0;
                                        }
 
@@ -640,6 +679,7 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn,
                                                        req->input, req->ud,
                                                        c->start_time, c->send_time, body, bodylen, NULL);
                                        ucl_parser_free(parser);
+                                       g_free(body_decompressed);
                                }
                                else {
                                        err = g_error_new(RCLIENT_ERROR, 500,
index 5ad794470de332e11e0a75e84a57381ab20b2d68..66529707569b8aa46564bd30f3962c613e5107dd 100644 (file)
@@ -36,6 +36,8 @@
 #include "libserver/milter.h"
 #include "libserver/milter_internal.h"
 #include "libmime/lang_detection.h"
+#include "libmime/content_type.h"
+#include "libserver/multipart_form.h"
 
 #include <math.h>
 #include <string.h>
@@ -204,6 +206,8 @@ struct rspamd_proxy_backend_connection {
        int parser_to_ref;
        struct rspamd_task *task;
        gsize reserved_tokens; /* Tokens reserved for this request (token bucket) */
+       const char *body_data; /* Extracted body from multipart response */
+       gsize body_len;
 };
 
 enum rspamd_proxy_legacy_support {
@@ -1240,6 +1244,159 @@ proxy_backend_parse_results(struct rspamd_proxy_session *session,
                conn->results = ucl_object_lua_import(L, -1);
                lua_settop(L, 0);
        }
+       else if (ct && rspamd_substring_search_caseless(ct->begin, ct->len,
+                                                                                                       "multipart/mixed", sizeof("multipart/mixed") - 1) != -1) {
+               /* V3 multipart/mixed response */
+               struct rspamd_content_type *parsed_ct = rspamd_content_type_parse(
+                       ct->begin, ct->len, session->pool);
+
+               if (!parsed_ct || parsed_ct->boundary.len == 0) {
+                       msg_err_session("cannot extract boundary from multipart Content-Type");
+                       return FALSE;
+               }
+
+               struct rspamd_multipart_form_c *form = rspamd_multipart_form_parse(
+                       in, inlen, parsed_ct->boundary.begin, parsed_ct->boundary.len);
+
+               if (!form) {
+                       msg_err_session("cannot parse multipart/mixed response");
+                       return FALSE;
+               }
+
+               const struct rspamd_multipart_entry_c *result_part =
+                       rspamd_multipart_form_find(form, "result", sizeof("result") - 1);
+
+               if (!result_part) {
+                       msg_err_session("no 'result' part in multipart response");
+                       rspamd_multipart_form_free(form);
+                       return FALSE;
+               }
+
+               const char *result_data = result_part->data;
+               gsize result_len = result_part->data_len;
+               unsigned char *decompressed = NULL;
+
+               /* Check for per-part zstd compression */
+               if (result_part->content_encoding &&
+                       result_part->content_encoding_len > 0 &&
+                       rspamd_substring_search_caseless(result_part->content_encoding,
+                                                                                        result_part->content_encoding_len,
+                                                                                        "zstd", 4) != -1) {
+                       ZSTD_DStream *zstream = ZSTD_createDStream();
+                       ZSTD_initDStream(zstream);
+                       ZSTD_inBuffer zin = {result_data, result_len, 0};
+                       gsize outlen = ZSTD_getDecompressedSize(result_data, result_len);
+                       if (outlen == 0) outlen = ZSTD_DStreamOutSize();
+                       decompressed = g_malloc(outlen);
+                       ZSTD_outBuffer zout = {decompressed, outlen, 0};
+
+                       while (zin.pos < zin.size) {
+                               gsize r = ZSTD_decompressStream(zstream, &zout, &zin);
+                               if (ZSTD_isError(r)) {
+                                       g_free(decompressed);
+                                       ZSTD_freeDStream(zstream);
+                                       rspamd_multipart_form_free(form);
+                                       msg_err_session("result decompression error: %s",
+                                                                       ZSTD_getErrorName(r));
+                                       return FALSE;
+                               }
+                               if (zout.pos == zout.size) {
+                                       zout.size *= 2;
+                                       decompressed = g_realloc(zout.dst, zout.size);
+                                       zout.dst = decompressed;
+                               }
+                       }
+                       ZSTD_freeDStream(zstream);
+                       result_data = (const char *) zout.dst;
+                       result_len = zout.pos;
+               }
+
+               /* Parse UCL from result part */
+               parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+
+               if (result_part->content_type &&
+                       rspamd_substring_search_caseless(result_part->content_type,
+                                                                                        result_part->content_type_len,
+                                                                                        "msgpack", 7) != -1) {
+                       ucl_parser_add_chunk_full(parser, (const unsigned char *) result_data,
+                                                                         result_len,
+                                                                         ucl_parser_get_default_priority(parser),
+                                                                         UCL_DUPLICATE_APPEND, UCL_PARSE_MSGPACK);
+               }
+               else {
+                       ucl_parser_add_chunk(parser, (const unsigned char *) result_data, result_len);
+               }
+
+               g_free(decompressed);
+
+               if (ucl_parser_get_error(parser)) {
+                       msg_err_session("cannot parse result UCL: %s",
+                                                       ucl_parser_get_error(parser));
+                       ucl_parser_free(parser);
+                       rspamd_multipart_form_free(form);
+                       return FALSE;
+               }
+
+               conn->results = ucl_parser_get_object(parser);
+               ucl_parser_free(parser);
+
+               /* Extract optional body part for milter */
+               const struct rspamd_multipart_entry_c *body_part_entry =
+                       rspamd_multipart_form_find(form, "body", sizeof("body") - 1);
+
+               if (body_part_entry && body_part_entry->data_len > 0) {
+                       const char *bp_data = body_part_entry->data;
+                       gsize bp_len = body_part_entry->data_len;
+
+                       /* Check for per-part zstd compression on body */
+                       if (body_part_entry->content_encoding &&
+                               body_part_entry->content_encoding_len > 0 &&
+                               rspamd_substring_search_caseless(body_part_entry->content_encoding,
+                                                                                                body_part_entry->content_encoding_len,
+                                                                                                "zstd", 4) != -1) {
+                               ZSTD_DStream *zstream = ZSTD_createDStream();
+                               ZSTD_initDStream(zstream);
+                               ZSTD_inBuffer zin = {bp_data, bp_len, 0};
+                               gsize outlen = ZSTD_getDecompressedSize(bp_data, bp_len);
+                               if (outlen == 0) outlen = ZSTD_DStreamOutSize();
+                               unsigned char *bp_decompressed = g_malloc(outlen);
+                               ZSTD_outBuffer zout = {bp_decompressed, outlen, 0};
+                               gboolean decompress_ok = TRUE;
+
+                               while (zin.pos < zin.size) {
+                                       gsize r = ZSTD_decompressStream(zstream, &zout, &zin);
+                                       if (ZSTD_isError(r)) {
+                                               msg_warn_session("body decompression error: %s",
+                                                                                ZSTD_getErrorName(r));
+                                               decompress_ok = FALSE;
+                                               break;
+                                       }
+                                       if (zout.pos == zout.size) {
+                                               zout.size *= 2;
+                                               bp_decompressed = g_realloc(zout.dst, zout.size);
+                                               zout.dst = bp_decompressed;
+                                       }
+                               }
+                               ZSTD_freeDStream(zstream);
+
+                               if (decompress_ok) {
+                                       /* Copy to pool so it persists */
+                                       conn->body_data = rspamd_mempool_alloc(session->pool, zout.pos);
+                                       memcpy((void *) conn->body_data, zout.dst, zout.pos);
+                                       conn->body_len = zout.pos;
+                               }
+                               g_free(bp_decompressed);
+                       }
+                       else {
+                               /* Uncompressed body — copy to pool */
+                               conn->body_data = rspamd_mempool_alloc(session->pool, bp_len);
+                               memcpy((void *) conn->body_data, bp_data, bp_len);
+                               conn->body_len = bp_len;
+                       }
+               }
+
+               rspamd_multipart_form_free(form);
+       }
        else {
                rspamd_ftok_t json_ct;
                RSPAMD_FTOK_ASSIGN(&json_ct, "application/json");
@@ -2303,7 +2460,14 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
        if (session->client_milter_conn) {
                nsession = proxy_session_refresh(session);
 
-               if (body_offset > 0) {
+               if (bk_conn->body_data && bk_conn->body_len > 0) {
+                       /* Body extracted from multipart response (v3) */
+                       rspamd_milter_send_task_results(nsession->client_milter_conn,
+                                                                                       session->master_conn->results,
+                                                                                       bk_conn->body_data,
+                                                                                       bk_conn->body_len);
+               }
+               else if (body_offset > 0) {
                        rspamd_milter_send_task_results(nsession->client_milter_conn,
                                                                                        session->master_conn->results,
                                                                                        msg->body_buf.begin + body_offset,
@@ -2404,6 +2568,7 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task)
                break;
        case CMD_CHECK_V3:
                rspamd_task_set_finish_time(task);
+               rep = rspamd_protocol_write_ucl(task, RSPAMD_PROTOCOL_DEFAULT | RSPAMD_PROTOCOL_URLS);
                ctype = rspamd_protocol_http_reply_v3(msg, task);
                rspamd_protocol_write_log_pipe(task);
                break;