}
/* Extract optional body part */
+ unsigned char *body_decompressed = NULL;
const struct rspamd_multipart_entry_c *body_part =
rspamd_multipart_form_find(form, "body", sizeof("body") - 1);
if (body_part && body_part->data_len > 0) {
body = body_part->data;
bodylen = body_part->data_len;
- /* TODO: decompress body part if needed */
+
+ /* Decompress body part if needed */
+ if (body_part->content_encoding &&
+ body_part->content_encoding_len > 0 &&
+ rspamd_substring_search_caseless(body_part->content_encoding,
+ body_part->content_encoding_len,
+ "zstd", 4) != -1) {
+ ZSTD_DStream *bzstream = ZSTD_createDStream();
+ ZSTD_initDStream(bzstream);
+ ZSTD_inBuffer bzin = {body, bodylen, 0};
+ gsize boutlen = ZSTD_getDecompressedSize(body, bodylen);
+ if (boutlen == 0) boutlen = ZSTD_DStreamOutSize();
+ body_decompressed = g_malloc(boutlen);
+ ZSTD_outBuffer bzout = {body_decompressed, boutlen, 0};
+
+ while (bzin.pos < bzin.size) {
+ gsize r = ZSTD_decompressStream(bzstream, &bzout, &bzin);
+ if (ZSTD_isError(r)) {
+ g_free(body_decompressed);
+ body_decompressed = NULL;
+ ZSTD_freeDStream(bzstream);
+ /* Non-fatal: pass compressed body as-is */
+ body = body_part->data;
+ bodylen = body_part->data_len;
+ break;
+ }
+ if (bzout.pos == bzout.size) {
+ bzout.size *= 2;
+ body_decompressed = g_realloc(bzout.dst, bzout.size);
+ bzout.dst = body_decompressed;
+ }
+ }
+ if (body_decompressed) {
+ ZSTD_freeDStream(bzstream);
+ body = (const char *) bzout.dst;
+ bodylen = bzout.pos;
+ }
+ }
}
parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
req->input, req->ud, c->start_time,
c->send_time, body, bodylen, err);
g_error_free(err);
+ g_free(body_decompressed);
return 0;
}
req->input, req->ud,
c->start_time, c->send_time, body, bodylen, NULL);
ucl_parser_free(parser);
+ g_free(body_decompressed);
}
else {
err = g_error_new(RCLIENT_ERROR, 500,
#include "libserver/milter.h"
#include "libserver/milter_internal.h"
#include "libmime/lang_detection.h"
+#include "libmime/content_type.h"
+#include "libserver/multipart_form.h"
#include <math.h>
#include <string.h>
int parser_to_ref;
struct rspamd_task *task;
gsize reserved_tokens; /* Tokens reserved for this request (token bucket) */
+ const char *body_data; /* Extracted body from multipart response */
+ gsize body_len;
};
enum rspamd_proxy_legacy_support {
conn->results = ucl_object_lua_import(L, -1);
lua_settop(L, 0);
}
+ else if (ct && rspamd_substring_search_caseless(ct->begin, ct->len,
+ "multipart/mixed", sizeof("multipart/mixed") - 1) != -1) {
+ /* V3 multipart/mixed response */
+ struct rspamd_content_type *parsed_ct = rspamd_content_type_parse(
+ ct->begin, ct->len, session->pool);
+
+ if (!parsed_ct || parsed_ct->boundary.len == 0) {
+ msg_err_session("cannot extract boundary from multipart Content-Type");
+ return FALSE;
+ }
+
+ struct rspamd_multipart_form_c *form = rspamd_multipart_form_parse(
+ in, inlen, parsed_ct->boundary.begin, parsed_ct->boundary.len);
+
+ if (!form) {
+ msg_err_session("cannot parse multipart/mixed response");
+ return FALSE;
+ }
+
+ const struct rspamd_multipart_entry_c *result_part =
+ rspamd_multipart_form_find(form, "result", sizeof("result") - 1);
+
+ if (!result_part) {
+ msg_err_session("no 'result' part in multipart response");
+ rspamd_multipart_form_free(form);
+ return FALSE;
+ }
+
+ const char *result_data = result_part->data;
+ gsize result_len = result_part->data_len;
+ unsigned char *decompressed = NULL;
+
+ /* Check for per-part zstd compression */
+ if (result_part->content_encoding &&
+ result_part->content_encoding_len > 0 &&
+ rspamd_substring_search_caseless(result_part->content_encoding,
+ result_part->content_encoding_len,
+ "zstd", 4) != -1) {
+ ZSTD_DStream *zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+ ZSTD_inBuffer zin = {result_data, result_len, 0};
+ gsize outlen = ZSTD_getDecompressedSize(result_data, result_len);
+ if (outlen == 0) outlen = ZSTD_DStreamOutSize();
+ decompressed = g_malloc(outlen);
+ ZSTD_outBuffer zout = {decompressed, outlen, 0};
+
+ while (zin.pos < zin.size) {
+ gsize r = ZSTD_decompressStream(zstream, &zout, &zin);
+ if (ZSTD_isError(r)) {
+ g_free(decompressed);
+ ZSTD_freeDStream(zstream);
+ rspamd_multipart_form_free(form);
+ msg_err_session("result decompression error: %s",
+ ZSTD_getErrorName(r));
+ return FALSE;
+ }
+ if (zout.pos == zout.size) {
+ zout.size *= 2;
+ decompressed = g_realloc(zout.dst, zout.size);
+ zout.dst = decompressed;
+ }
+ }
+ ZSTD_freeDStream(zstream);
+ result_data = (const char *) zout.dst;
+ result_len = zout.pos;
+ }
+
+ /* Parse UCL from result part */
+ parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+
+ if (result_part->content_type &&
+ rspamd_substring_search_caseless(result_part->content_type,
+ result_part->content_type_len,
+ "msgpack", 7) != -1) {
+ ucl_parser_add_chunk_full(parser, (const unsigned char *) result_data,
+ result_len,
+ ucl_parser_get_default_priority(parser),
+ UCL_DUPLICATE_APPEND, UCL_PARSE_MSGPACK);
+ }
+ else {
+ ucl_parser_add_chunk(parser, (const unsigned char *) result_data, result_len);
+ }
+
+ g_free(decompressed);
+
+ if (ucl_parser_get_error(parser)) {
+ msg_err_session("cannot parse result UCL: %s",
+ ucl_parser_get_error(parser));
+ ucl_parser_free(parser);
+ rspamd_multipart_form_free(form);
+ return FALSE;
+ }
+
+ conn->results = ucl_parser_get_object(parser);
+ ucl_parser_free(parser);
+
+ /* Extract optional body part for milter */
+ const struct rspamd_multipart_entry_c *body_part_entry =
+ rspamd_multipart_form_find(form, "body", sizeof("body") - 1);
+
+ if (body_part_entry && body_part_entry->data_len > 0) {
+ const char *bp_data = body_part_entry->data;
+ gsize bp_len = body_part_entry->data_len;
+
+ /* Check for per-part zstd compression on body */
+ if (body_part_entry->content_encoding &&
+ body_part_entry->content_encoding_len > 0 &&
+ rspamd_substring_search_caseless(body_part_entry->content_encoding,
+ body_part_entry->content_encoding_len,
+ "zstd", 4) != -1) {
+ ZSTD_DStream *zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+ ZSTD_inBuffer zin = {bp_data, bp_len, 0};
+ gsize outlen = ZSTD_getDecompressedSize(bp_data, bp_len);
+ if (outlen == 0) outlen = ZSTD_DStreamOutSize();
+ unsigned char *bp_decompressed = g_malloc(outlen);
+ ZSTD_outBuffer zout = {bp_decompressed, outlen, 0};
+ gboolean decompress_ok = TRUE;
+
+ while (zin.pos < zin.size) {
+ gsize r = ZSTD_decompressStream(zstream, &zout, &zin);
+ if (ZSTD_isError(r)) {
+ msg_warn_session("body decompression error: %s",
+ ZSTD_getErrorName(r));
+ decompress_ok = FALSE;
+ break;
+ }
+ if (zout.pos == zout.size) {
+ zout.size *= 2;
+ bp_decompressed = g_realloc(zout.dst, zout.size);
+ zout.dst = bp_decompressed;
+ }
+ }
+ ZSTD_freeDStream(zstream);
+
+ if (decompress_ok) {
+ /* Copy to pool so it persists */
+ conn->body_data = rspamd_mempool_alloc(session->pool, zout.pos);
+ memcpy((void *) conn->body_data, zout.dst, zout.pos);
+ conn->body_len = zout.pos;
+ }
+ g_free(bp_decompressed);
+ }
+ else {
+ /* Uncompressed body — copy to pool */
+ conn->body_data = rspamd_mempool_alloc(session->pool, bp_len);
+ memcpy((void *) conn->body_data, bp_data, bp_len);
+ conn->body_len = bp_len;
+ }
+ }
+
+ rspamd_multipart_form_free(form);
+ }
else {
rspamd_ftok_t json_ct;
RSPAMD_FTOK_ASSIGN(&json_ct, "application/json");
if (session->client_milter_conn) {
nsession = proxy_session_refresh(session);
- if (body_offset > 0) {
+ if (bk_conn->body_data && bk_conn->body_len > 0) {
+ /* Body extracted from multipart response (v3) */
+ rspamd_milter_send_task_results(nsession->client_milter_conn,
+ session->master_conn->results,
+ bk_conn->body_data,
+ bk_conn->body_len);
+ }
+ else if (body_offset > 0) {
rspamd_milter_send_task_results(nsession->client_milter_conn,
session->master_conn->results,
msg->body_buf.begin + body_offset,
break;
case CMD_CHECK_V3:
rspamd_task_set_finish_time(task);
+ rep = rspamd_protocol_write_ucl(task, RSPAMD_PROTOCOL_DEFAULT | RSPAMD_PROTOCOL_URLS);
ctype = rspamd_protocol_http_reply_v3(msg, task);
rspamd_protocol_write_log_pipe(task);
break;