From: Vsevolod Stakhov Date: Sun, 8 Feb 2026 10:13:40 +0000 (+0000) Subject: [Feature] protocol: Add v3 multipart response parsing for proxy and body decompression X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=231426065ff8ea90854249e3cec3a7138e4f9a33;p=thirdparty%2Frspamd.git [Feature] protocol: Add v3 multipart response parsing for proxy and body decompression Proxy forwarding now handles multipart/mixed responses from /checkv3: parse result+body parts, decompress zstd, detect msgpack, and forward rewritten body to milter. Self-scan v3 populates conn->results for milter and Lua comparison scripts. rspamc client decompresses zstd body parts returned by the server. --- diff --git a/src/client/rspamdclient.c b/src/client/rspamdclient.c index e304de36e2..10647a55e6 100644 --- a/src/client/rspamdclient.c +++ b/src/client/rspamdclient.c @@ -600,12 +600,50 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn, } /* Extract optional body part */ + unsigned char *body_decompressed = NULL; const struct rspamd_multipart_entry_c *body_part = rspamd_multipart_form_find(form, "body", sizeof("body") - 1); if (body_part && body_part->data_len > 0) { body = body_part->data; bodylen = body_part->data_len; - /* TODO: decompress body part if needed */ + + /* Decompress body part if needed */ + if (body_part->content_encoding && + body_part->content_encoding_len > 0 && + rspamd_substring_search_caseless(body_part->content_encoding, + body_part->content_encoding_len, + "zstd", 4) != -1) { + ZSTD_DStream *bzstream = ZSTD_createDStream(); + ZSTD_initDStream(bzstream); + ZSTD_inBuffer bzin = {body, bodylen, 0}; + gsize boutlen = ZSTD_getDecompressedSize(body, bodylen); + if (boutlen == 0) boutlen = ZSTD_DStreamOutSize(); + body_decompressed = g_malloc(boutlen); + ZSTD_outBuffer bzout = {body_decompressed, boutlen, 0}; + + while (bzin.pos < bzin.size) { + gsize r = ZSTD_decompressStream(bzstream, &bzout, &bzin); + if (ZSTD_isError(r)) { + g_free(body_decompressed); + body_decompressed = NULL; + ZSTD_freeDStream(bzstream); + /* Non-fatal: pass compressed body as-is */ + body = body_part->data; + bodylen = body_part->data_len; + break; + } + if (bzout.pos == bzout.size) { + bzout.size *= 2; + body_decompressed = g_realloc(bzout.dst, bzout.size); + bzout.dst = body_decompressed; + } + } + if (body_decompressed) { + ZSTD_freeDStream(bzstream); + body = (const char *) bzout.dst; + bodylen = bzout.pos; + } + } } parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); @@ -632,6 +670,7 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn, req->input, req->ud, c->start_time, c->send_time, body, bodylen, err); g_error_free(err); + g_free(body_decompressed); return 0; } @@ -640,6 +679,7 @@ rspamd_client_v3_finish_handler(struct rspamd_http_connection *conn, req->input, req->ud, c->start_time, c->send_time, body, bodylen, NULL); ucl_parser_free(parser); + g_free(body_decompressed); } else { err = g_error_new(RCLIENT_ERROR, 500, diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 5ad794470d..6652970756 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -36,6 +36,8 @@ #include "libserver/milter.h" #include "libserver/milter_internal.h" #include "libmime/lang_detection.h" +#include "libmime/content_type.h" +#include "libserver/multipart_form.h" #include #include @@ -204,6 +206,8 @@ struct rspamd_proxy_backend_connection { int parser_to_ref; struct rspamd_task *task; gsize reserved_tokens; /* Tokens reserved for this request (token bucket) */ + const char *body_data; /* Extracted body from multipart response */ + gsize body_len; }; enum rspamd_proxy_legacy_support { @@ -1240,6 +1244,159 @@ proxy_backend_parse_results(struct rspamd_proxy_session *session, conn->results = ucl_object_lua_import(L, -1); lua_settop(L, 0); } + else if (ct && rspamd_substring_search_caseless(ct->begin, ct->len, + "multipart/mixed", sizeof("multipart/mixed") - 1) != -1) { + /* V3 multipart/mixed response */ + struct rspamd_content_type *parsed_ct = rspamd_content_type_parse( + ct->begin, ct->len, session->pool); + + if (!parsed_ct || parsed_ct->boundary.len == 0) { + msg_err_session("cannot extract boundary from multipart Content-Type"); + return FALSE; + } + + struct rspamd_multipart_form_c *form = rspamd_multipart_form_parse( + in, inlen, parsed_ct->boundary.begin, parsed_ct->boundary.len); + + if (!form) { + msg_err_session("cannot parse multipart/mixed response"); + return FALSE; + } + + const struct rspamd_multipart_entry_c *result_part = + rspamd_multipart_form_find(form, "result", sizeof("result") - 1); + + if (!result_part) { + msg_err_session("no 'result' part in multipart response"); + rspamd_multipart_form_free(form); + return FALSE; + } + + const char *result_data = result_part->data; + gsize result_len = result_part->data_len; + unsigned char *decompressed = NULL; + + /* Check for per-part zstd compression */ + if (result_part->content_encoding && + result_part->content_encoding_len > 0 && + rspamd_substring_search_caseless(result_part->content_encoding, + result_part->content_encoding_len, + "zstd", 4) != -1) { + ZSTD_DStream *zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + ZSTD_inBuffer zin = {result_data, result_len, 0}; + gsize outlen = ZSTD_getDecompressedSize(result_data, result_len); + if (outlen == 0) outlen = ZSTD_DStreamOutSize(); + decompressed = g_malloc(outlen); + ZSTD_outBuffer zout = {decompressed, outlen, 0}; + + while (zin.pos < zin.size) { + gsize r = ZSTD_decompressStream(zstream, &zout, &zin); + if (ZSTD_isError(r)) { + g_free(decompressed); + ZSTD_freeDStream(zstream); + rspamd_multipart_form_free(form); + msg_err_session("result decompression error: %s", + ZSTD_getErrorName(r)); + return FALSE; + } + if (zout.pos == zout.size) { + zout.size *= 2; + decompressed = g_realloc(zout.dst, zout.size); + zout.dst = decompressed; + } + } + ZSTD_freeDStream(zstream); + result_data = (const char *) zout.dst; + result_len = zout.pos; + } + + /* Parse UCL from result part */ + parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); + + if (result_part->content_type && + rspamd_substring_search_caseless(result_part->content_type, + result_part->content_type_len, + "msgpack", 7) != -1) { + ucl_parser_add_chunk_full(parser, (const unsigned char *) result_data, + result_len, + ucl_parser_get_default_priority(parser), + UCL_DUPLICATE_APPEND, UCL_PARSE_MSGPACK); + } + else { + ucl_parser_add_chunk(parser, (const unsigned char *) result_data, result_len); + } + + g_free(decompressed); + + if (ucl_parser_get_error(parser)) { + msg_err_session("cannot parse result UCL: %s", + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + rspamd_multipart_form_free(form); + return FALSE; + } + + conn->results = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + /* Extract optional body part for milter */ + const struct rspamd_multipart_entry_c *body_part_entry = + rspamd_multipart_form_find(form, "body", sizeof("body") - 1); + + if (body_part_entry && body_part_entry->data_len > 0) { + const char *bp_data = body_part_entry->data; + gsize bp_len = body_part_entry->data_len; + + /* Check for per-part zstd compression on body */ + if (body_part_entry->content_encoding && + body_part_entry->content_encoding_len > 0 && + rspamd_substring_search_caseless(body_part_entry->content_encoding, + body_part_entry->content_encoding_len, + "zstd", 4) != -1) { + ZSTD_DStream *zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + ZSTD_inBuffer zin = {bp_data, bp_len, 0}; + gsize outlen = ZSTD_getDecompressedSize(bp_data, bp_len); + if (outlen == 0) outlen = ZSTD_DStreamOutSize(); + unsigned char *bp_decompressed = g_malloc(outlen); + ZSTD_outBuffer zout = {bp_decompressed, outlen, 0}; + gboolean decompress_ok = TRUE; + + while (zin.pos < zin.size) { + gsize r = ZSTD_decompressStream(zstream, &zout, &zin); + if (ZSTD_isError(r)) { + msg_warn_session("body decompression error: %s", + ZSTD_getErrorName(r)); + decompress_ok = FALSE; + break; + } + if (zout.pos == zout.size) { + zout.size *= 2; + bp_decompressed = g_realloc(zout.dst, zout.size); + zout.dst = bp_decompressed; + } + } + ZSTD_freeDStream(zstream); + + if (decompress_ok) { + /* Copy to pool so it persists */ + conn->body_data = rspamd_mempool_alloc(session->pool, zout.pos); + memcpy((void *) conn->body_data, zout.dst, zout.pos); + conn->body_len = zout.pos; + } + g_free(bp_decompressed); + } + else { + /* Uncompressed body — copy to pool */ + conn->body_data = rspamd_mempool_alloc(session->pool, bp_len); + memcpy((void *) conn->body_data, bp_data, bp_len); + conn->body_len = bp_len; + } + } + + rspamd_multipart_form_free(form); + } else { rspamd_ftok_t json_ct; RSPAMD_FTOK_ASSIGN(&json_ct, "application/json"); @@ -2303,7 +2460,14 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, if (session->client_milter_conn) { nsession = proxy_session_refresh(session); - if (body_offset > 0) { + if (bk_conn->body_data && bk_conn->body_len > 0) { + /* Body extracted from multipart response (v3) */ + rspamd_milter_send_task_results(nsession->client_milter_conn, + session->master_conn->results, + bk_conn->body_data, + bk_conn->body_len); + } + else if (body_offset > 0) { rspamd_milter_send_task_results(nsession->client_milter_conn, session->master_conn->results, msg->body_buf.begin + body_offset, @@ -2404,6 +2568,7 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task) break; case CMD_CHECK_V3: rspamd_task_set_finish_time(task); + rep = rspamd_protocol_write_ucl(task, RSPAMD_PROTOCOL_DEFAULT | RSPAMD_PROTOCOL_URLS); ctype = rspamd_protocol_http_reply_v3(msg, task); rspamd_protocol_write_log_pipe(task); break;