From: Vsevolod Stakhov Date: Mon, 8 Dec 2025 17:21:03 +0000 (+0000) Subject: [Fix] Handle HTTP early server responses during request write X-Git-Tag: 3.14.2~4^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=01df1d4497f79960d3b17c4bab678e46767fd832;p=thirdparty%2Frspamd.git [Fix] Handle HTTP early server responses during request write Fix HTTP client to properly handle early server responses (e.g., 413 Too Large) that arrive before the client has finished sending the request body. This is allowed by HTTP/1.1 (RFC 7230 Section 6.5). - Use bitwise AND for event flag checks to handle combined EV_READ|EV_WRITE - Watch for both READ and WRITE events during write phase - Check for early response on write errors (EPIPE, ECONNRESET) - Add RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE flag to track state --- diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c index 70158e294b..6e7bea1025 100644 --- a/src/libserver/http/http_connection.c +++ b/src/libserver/http/http_connection.c @@ -53,11 +53,18 @@ enum rspamd_http_priv_flags { RSPAMD_HTTP_CONN_FLAG_PROXY = 1u << 5u, RSPAMD_HTTP_CONN_FLAG_PROXY_REQUEST = 1u << 6u, RSPAMD_HTTP_CONN_OWN_SOCKET = 1u << 7u, + RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE = 1u << 8u, /* Server sent early response during write */ }; #define IS_CONN_ENCRYPTED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_ENCRYPTED) #define IS_CONN_RESETED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_RESETED) +static gssize rspamd_http_try_read(int fd, + struct rspamd_http_connection *conn, + struct rspamd_http_connection_private *priv, + struct _rspamd_http_privbuf *pbuf, + const char **buf_ptr); + struct rspamd_http_connection_private { struct rspamd_http_context *ctx; struct rspamd_ssl_connection *ssl; @@ -826,8 +833,43 @@ rspamd_http_write_helper(struct rspamd_http_connection *conn) } if (r == -1) { + int saved_errno = errno; + if (!priv->ssl) { - err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(errno)); + /* Check if server sent an early response (e.g., 413) */ + if (saved_errno == EPIPE || saved_errno == ECONNRESET || + saved_errno == ENOTCONN) { + struct _rspamd_http_privbuf *pbuf = priv->buf; + const char *d; + gssize read_res; + + REF_RETAIN(pbuf); + read_res = rspamd_http_try_read(conn->fd, conn, priv, pbuf, &d); + + if (read_res > 0) { + msg_debug("got early server response (%z bytes) during write, processing", + read_res); + + if (http_parser_execute(&priv->parser, &priv->parser_cb, + d, read_res) != (size_t) read_res || + priv->parser.http_errno != 0) { + err = g_error_new(HTTP_ERROR, 400, + "HTTP parser error on early response: %s", + http_errno_description(priv->parser.http_errno)); + rspamd_http_connection_ref(conn); + conn->error_handler(conn, err); + rspamd_http_connection_unref(conn); + g_error_free(err); + } + + REF_RELEASE(pbuf); + return; + } + + REF_RELEASE(pbuf); + } + + err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(saved_errno)); rspamd_http_connection_ref(conn); conn->error_handler(conn, err); rspamd_http_connection_unref(conn); @@ -866,10 +908,15 @@ call_finish_handler: rspamd_http_connection_unref(conn); } else { - /* Plan read message */ - /* Switch to read stage timeout */ - priv->first_write_done = true; - rspamd_http_simple_client_helper(conn); + if (priv->flags & RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE) { + /* Early response already being processed */ + rspamd_ev_watcher_reschedule(priv->ctx->event_loop, &priv->ev, EV_READ); + } + else { + /* Plan read message */ + priv->first_write_done = true; + rspamd_http_simple_client_helper(conn); + } } } @@ -955,7 +1002,16 @@ rspamd_http_event_handler(int fd, short what, gpointer ud) REF_RETAIN(pbuf); rspamd_http_connection_ref(conn); - if (what == EV_READ) { + if (what & EV_READ) { + /* Check for early response while still sending request */ + if (priv->wr_pos < priv->wr_total && priv->wr_total > 0) { + msg_debug("received early server response while still writing request " + "(sent %uz of %uz bytes)", + priv->wr_pos, priv->wr_total); + priv->wr_pos = priv->wr_total; + priv->flags |= RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE; + } + r = rspamd_http_try_read(fd, conn, priv, pbuf, &d); if (r > 0) { @@ -1039,7 +1095,7 @@ rspamd_http_event_handler(int fd, short what, gpointer ud) return; } } - else if (what == EV_TIMEOUT) { + else if (what & EV_TIMEOUT) { if (!priv->ssl) { /* Let's try to read from the socket first */ r = rspamd_http_try_read(fd, conn, priv, pbuf, &d); @@ -1087,7 +1143,7 @@ rspamd_http_event_handler(int fd, short what, gpointer ud) return; } } - else if (what == EV_WRITE) { + else if (what & EV_WRITE) { rspamd_http_write_helper(conn); } @@ -2440,12 +2496,13 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) { rspamd_http_event_handler, rspamd_http_ssl_err_handler, conn, - EV_WRITE); + EV_WRITE | EV_READ); } } } else { - rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE, + /* Watch for both READ and WRITE to detect early server responses */ + rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE | EV_READ, rspamd_http_event_handler, conn); /* Use connect_timeout on initial EV_WRITE stage if provided */ ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout); diff --git a/test/functional/cases/221_http_early_response.robot b/test/functional/cases/221_http_early_response.robot new file mode 100644 index 0000000000..06f0a7a426 --- /dev/null +++ b/test/functional/cases/221_http_early_response.robot @@ -0,0 +1,151 @@ +*** Settings *** +Test Setup Http Early Response Setup +Test Teardown Http Early Response Teardown +Library Process +Library ${RSPAMD_TESTDIR}/lib/rspamd.py +Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot +Variables ${RSPAMD_TESTDIR}/lib/vars.py + +*** Variables *** +${CONFIG} ${RSPAMD_TESTDIR}/configs/lua_test.conf +${MESSAGE} ${RSPAMD_TESTDIR}/messages/spam_message.eml +${RSPAMD_LUA_SCRIPT} ${RSPAMD_TESTDIR}/lua/http_early_response.lua +${RSPAMD_SCOPE} Suite +${RSPAMD_URL_TLD} ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat + +*** Test Cases *** +HTTP Early Reply + [Documentation] Test server sending response before reading request body + Scan File ${MESSAGE} test-type=early-reply + ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]} + # We expect either success (200) or an error - both indicate the client handled + # the early response scenario (success = client received response, error = connection issue) + ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_REPLY_200 + IF not ${result} + Expect Symbol HTTP_EARLY_REPLY_ERROR + END + +HTTP Early 413 Error + [Documentation] Test server sending 413 error before reading request body + Scan File ${MESSAGE} test-type=early-413 + ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_413_413 + IF not ${result} + Expect Symbol HTTP_EARLY_413_ERROR + END + +HTTP Keepalive Early Response + [Documentation] Test keepalive with server sending early response + Scan File ${MESSAGE} test-type=keepalive-early + ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_KEEPALIVE_EARLY_200 + IF not ${result} + Expect Symbol HTTP_KEEPALIVE_EARLY_ERROR + END + +HTTP Early Reply Coroutine + [Documentation] Test early response with coroutine-based HTTP request + Scan File ${MESSAGE} test-type=early-coro + ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_CORO_200 + IF not ${result} + Expect Symbol HTTP_EARLY_CORO_ERROR + END + +HTTP Normal Request Baseline + [Documentation] Baseline test with normal request handling + Scan File ${MESSAGE} test-type=normal + ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]} + Expect Symbol HTTP_NORMAL_200 + +HTTP Keepalive Sequential + [Documentation] Test sequential keepalive requests + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_KEEPALIVE_SEQUENTIAL_TEST]} + # Should have at least some successful requests + Expect Symbol HTTP_KEEPALIVE_SEQ_SUCCESS + +HTTP Early Keepalive Stress + [Documentation] Stress test mixing early responses with keepalive + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_EARLY_KEEPALIVE_STRESS_TEST]} + Expect Symbol HTTP_EARLY_KEEPALIVE_STRESS + +HTTP Immediate Close Large Body + [Documentation] Test server immediately closing connection during large body send + [Tags] aggressive + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_IMMEDIATE_CLOSE_TEST]} + # Either we get the 413 response OR an error - both are valid outcomes + ${result} = Run Keyword And Return Status Expect Symbol HTTP_IMMEDIATE_CLOSE_413 + IF not ${result} + Expect Symbol HTTP_IMMEDIATE_CLOSE_ERROR + END + +HTTP Slow Response Large Body + [Documentation] Test server responding slowly while client sends large body + [Tags] aggressive + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_SLOW_RESPONSE_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_SLOW_RESPONSE_200 + IF not ${result} + Expect Symbol HTTP_SLOW_RESPONSE_ERROR + END + +HTTP Rapid Close Requests + [Documentation] Rapid sequential requests to server that closes immediately + [Tags] aggressive + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_RAPID_CLOSE_TEST]} + Expect Symbol HTTP_RAPID_CLOSE_RESULTS + +HTTP Block And Reply + [Documentation] TRUE early response test - 512KB body with server not reading + [Tags] early-response critical + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_BLOCK_REPLY_TEST]} + # Ideal: HTTP_BLOCK_REPLY_413 (got early response) + # Acceptable: HTTP_BLOCK_REPLY_ERROR (connection error, but no crash) + ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_REPLY_413 + IF not ${result} + Expect Symbol HTTP_BLOCK_REPLY_ERROR + END + +HTTP Block And Reply Coroutine + [Documentation] Coroutine version of block-and-reply test + [Tags] early-response critical + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_BLOCK_REPLY_CORO_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_REPLY_CORO_413 + IF not ${result} + Expect Symbol HTTP_BLOCK_REPLY_CORO_ERROR + END + +HTTP Block Slow Response + [Documentation] Server waits 1s then responds - 1MB body + [Tags] early-response slow + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_BLOCK_SLOW_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_SLOW_503 + IF not ${result} + Expect Symbol HTTP_BLOCK_SLOW_ERROR + END + +HTTP Instant Reply + [Documentation] Server responds BEFORE reading headers - most aggressive early response + [Tags] early-response critical + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [HTTP_INSTANT_REPLY_TEST]} + ${result} = Run Keyword And Return Status Expect Symbol HTTP_INSTANT_REPLY_413 + IF not ${result} + Expect Symbol HTTP_INSTANT_REPLY_ERROR + END + +*** Keywords *** +Http Early Response Setup + Run Dummy Http Early Response + Rspamd Setup + +Http Early Response Teardown + Rspamd Teardown + Dummy Http Early Teardown diff --git a/test/functional/lib/rspamd.robot b/test/functional/lib/rspamd.robot index 4fa6995202..aa6a2f02cd 100644 --- a/test/functional/lib/rspamd.robot +++ b/test/functional/lib/rspamd.robot @@ -551,3 +551,22 @@ Dummy Http Teardown Dummy Https Teardown Terminate Process ${DUMMY_HTTPS_PROC} Wait For Process ${DUMMY_HTTPS_PROC} + +Run Dummy Http Early Response + ${result} = Start Process ${RSPAMD_TESTDIR}/util/dummy_http_early_response.py -pf /tmp/dummy_http_early.pid -p 18083 + ... stderr=/tmp/dummy_http_early.log stdout=/tmp/dummy_http_early.log + ${status} ${error} = Run Keyword And Ignore Error Wait Until Created /tmp/dummy_http_early.pid timeout=2 second + IF '${status}' == 'FAIL' + ${logstatus} ${log} = Run Keyword And Ignore Error Get File /tmp/dummy_http_early.log + IF '${logstatus}' == 'PASS' + Log dummy_http_early_response.py failed to start. Log output:\n${log} level=ERROR + ELSE + Log dummy_http_early_response.py failed to start. No log file found at /tmp/dummy_http_early.log level=ERROR + END + Fail dummy_http_early_response.py did not create PID file in 2 seconds + END + Export Scoped Variables ${RSPAMD_SCOPE} DUMMY_HTTP_EARLY_PROC=${result} + +Dummy Http Early Teardown + Terminate Process ${DUMMY_HTTP_EARLY_PROC} + Wait For Process ${DUMMY_HTTP_EARLY_PROC} diff --git a/test/functional/lua/http_early_response.lua b/test/functional/lua/http_early_response.lua new file mode 100644 index 0000000000..fba8d3053a --- /dev/null +++ b/test/functional/lua/http_early_response.lua @@ -0,0 +1,596 @@ +--[[ +Test HTTP client behavior when server sends early responses. + +This tests edge cases where a server responds before the client has finished +sending the request body (which is allowed by HTTP/1.1 spec). + +The test server (dummy_http_early_response.py) runs on port 18083. +]] + +local rspamd_http = require "rspamd_http" +local rspamd_logger = require "rspamd_logger" + +-- Register all possible result symbols upfront +-- These are the symbols that will be inserted based on HTTP response codes +local result_symbols = { + -- Test 1: Early reply + 'HTTP_EARLY_REPLY_ERROR', + 'HTTP_EARLY_REPLY_200', + 'HTTP_EARLY_REPLY_413', + 'HTTP_EARLY_REPLY_500', + -- Test 2: Early 413 + 'HTTP_EARLY_413_ERROR', + 'HTTP_EARLY_413_413', + 'HTTP_EARLY_413_200', + -- Test 3: Keepalive early + 'HTTP_KEEPALIVE_EARLY_ERROR', + 'HTTP_KEEPALIVE_EARLY_200', + -- Test 4: Coroutine early + 'HTTP_EARLY_CORO_ERROR', + 'HTTP_EARLY_CORO_200', + -- Test 5: Normal + 'HTTP_NORMAL_ERROR', + 'HTTP_NORMAL_200', + -- Test 6: Keepalive sequential + 'HTTP_KEEPALIVE_SEQ_ERRORS', + 'HTTP_KEEPALIVE_SEQ_SUCCESS', + -- Test 7: Keepalive stress + 'HTTP_EARLY_KEEPALIVE_STRESS', + -- Test 8: Immediate close + 'HTTP_IMMEDIATE_CLOSE_ERROR', + 'HTTP_IMMEDIATE_CLOSE_413', + 'HTTP_IMMEDIATE_CLOSE_200', + -- Test 9: Slow response + 'HTTP_SLOW_RESPONSE_ERROR', + 'HTTP_SLOW_RESPONSE_200', + -- Test 10: Rapid close + 'HTTP_RAPID_CLOSE_RESULTS', + -- Test 11: Block and reply + 'HTTP_BLOCK_REPLY_ERROR', + 'HTTP_BLOCK_REPLY_413', + 'HTTP_BLOCK_REPLY_200', + -- Test 12: Block and reply coro + 'HTTP_BLOCK_REPLY_CORO_ERROR', + 'HTTP_BLOCK_REPLY_CORO_413', + 'HTTP_BLOCK_REPLY_CORO_200', + -- Test 13: Block slow + 'HTTP_BLOCK_SLOW_ERROR', + 'HTTP_BLOCK_SLOW_503', + 'HTTP_BLOCK_SLOW_200', + -- Test 14: Instant reply (before headers even read) + 'HTTP_INSTANT_REPLY_ERROR', + 'HTTP_INSTANT_REPLY_413', +} + +-- Register all result symbols as virtual symbols +for _, sym_name in ipairs(result_symbols) do + rspamd_config:register_symbol({ + name = sym_name, + score = 0.0, + type = 'virtual', + parent = -1, -- Will be set properly by parent registration + }) +end + +-- Test 1: Early reply - server responds before reading body +local function test_early_reply(task) + rspamd_logger.errx(task, 'test_early_reply: starting') + + -- Create a large body to increase chance of race condition + local body_parts = {} + for i = 1, 1000 do + body_parts[i] = string.format("line %d: some test data that we are sending\n", i) + end + local large_body = table.concat(body_parts) + + local function callback(err, code, body) + if err then + rspamd_logger.errx(task, 'test_early_reply callback error: %s', err) + task:insert_result('HTTP_EARLY_REPLY_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_early_reply callback success: code=%s body=%s', code, body) + task:insert_result('HTTP_EARLY_REPLY_' .. tostring(code), 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/early-reply', + task = task, + method = 'post', + body = large_body, + callback = callback, + timeout = 5, + }) +end + +-- Test 2: Early 413 error - server sends error before reading body +local function test_early_error_413(task) + rspamd_logger.errx(task, 'test_early_error_413: starting') + + -- Create a large body + local body_parts = {} + for i = 1, 1000 do + body_parts[i] = string.format("line %d: large body data\n", i) + end + local large_body = table.concat(body_parts) + + local function callback(err, code, body) + if err then + rspamd_logger.errx(task, 'test_early_error_413 callback error: %s', err) + task:insert_result('HTTP_EARLY_413_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_early_error_413 callback success: code=%s body=%s', code, body) + task:insert_result('HTTP_EARLY_413_' .. tostring(code), 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/early-error-413', + task = task, + method = 'post', + body = large_body, + callback = callback, + timeout = 5, + }) +end + +-- Test 3: Keep-alive with early response +local function test_keepalive_early(task) + rspamd_logger.errx(task, 'test_keepalive_early: starting') + + local body = "test body for keepalive" + + local function callback(err, code, body_response) + if err then + rspamd_logger.errx(task, 'test_keepalive_early callback error: %s', err) + task:insert_result('HTTP_KEEPALIVE_EARLY_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_keepalive_early callback success: code=%s', code) + task:insert_result('HTTP_KEEPALIVE_EARLY_' .. tostring(code), 1.0, body_response) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/keepalive-early', + task = task, + method = 'post', + body = body, + callback = callback, + timeout = 5, + keepalive = true, + }) +end + +-- Test 4: Coroutine-based early reply test +local function test_early_reply_coro(task) + rspamd_logger.errx(task, 'test_early_reply_coro: starting') + + local body_parts = {} + for i = 1, 500 do + body_parts[i] = string.format("coro line %d: test data\n", i) + end + local body = table.concat(body_parts) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/early-reply', + task = task, + method = 'post', + body = body, + timeout = 5, + }) + + if err then + rspamd_logger.errx(task, 'test_early_reply_coro error: %s', err) + task:insert_result('HTTP_EARLY_CORO_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_early_reply_coro success: code=%s', response.code) + task:insert_result('HTTP_EARLY_CORO_' .. tostring(response.code), 1.0, response.content) + end +end + +-- Test 5: Multiple requests to normal endpoint (baseline) +local function test_normal_request(task) + rspamd_logger.errx(task, 'test_normal_request: starting') + + local function callback(err, code, body) + if err then + rspamd_logger.errx(task, 'test_normal_request callback error: %s', err) + task:insert_result('HTTP_NORMAL_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_normal_request callback success: code=%s', code) + task:insert_result('HTTP_NORMAL_' .. tostring(code), 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/request', + task = task, + method = 'post', + body = 'normal test body', + callback = callback, + timeout = 5, + }) +end + +-- Main test symbol that runs all tests +local function http_early_response_tests(task) + local test_type = tostring(task:get_request_header('test-type') or 'all') + + rspamd_logger.errx(task, 'http_early_response_tests: test_type=%s', test_type) + + if test_type == 'early-reply' or test_type == 'all' then + test_early_reply(task) + end + + if test_type == 'early-413' or test_type == 'all' then + test_early_error_413(task) + end + + if test_type == 'keepalive-early' or test_type == 'all' then + test_keepalive_early(task) + end + + if test_type == 'early-coro' or test_type == 'all' then + test_early_reply_coro(task) + end + + if test_type == 'normal' or test_type == 'all' then + test_normal_request(task) + end +end + +rspamd_config:register_symbol({ + name = 'HTTP_EARLY_RESPONSE_TEST', + score = 1.0, + callback = http_early_response_tests, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 6: Sequential keepalive requests (stress test for keepalive pool) +local function test_keepalive_sequential(task) + rspamd_logger.errx(task, 'test_keepalive_sequential: starting') + + local success_count = 0 + local error_count = 0 + local errors = {} + + -- Make 3 sequential requests using keepalive + for i = 1, 3 do + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/keepalive-normal', + task = task, + method = 'post', + body = string.format('request %d body', i), + timeout = 5, + keepalive = true, + }) + + if err then + error_count = error_count + 1 + errors[#errors + 1] = string.format('req%d: %s', i, err) + rspamd_logger.errx(task, 'keepalive request %d error: %s', i, err) + else + success_count = success_count + 1 + rspamd_logger.errx(task, 'keepalive request %d success: code=%s', i, response.code) + end + end + + if error_count > 0 then + task:insert_result('HTTP_KEEPALIVE_SEQ_ERRORS', 1.0, table.concat(errors, '; ')) + end + task:insert_result('HTTP_KEEPALIVE_SEQ_SUCCESS', 1.0, tostring(success_count)) +end + +rspamd_config:register_symbol({ + name = 'HTTP_KEEPALIVE_SEQUENTIAL_TEST', + score = 1.0, + callback = test_keepalive_sequential, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 7: Stress test with early responses + keepalive +local function test_early_keepalive_stress(task) + rspamd_logger.errx(task, 'test_early_keepalive_stress: starting') + + local results = {} + + -- Mix of normal and early-response requests + local endpoints = { + '/request', + '/early-reply', + '/request', + '/keepalive-early', + '/request', + } + + for i, endpoint in ipairs(endpoints) do + local body_parts = {} + for j = 1, 100 do + body_parts[j] = string.format("stress test line %d-%d\n", i, j) + end + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083' .. endpoint, + task = task, + method = 'post', + body = table.concat(body_parts), + timeout = 5, + keepalive = true, + }) + + if err then + results[#results + 1] = string.format('%s:err:%s', endpoint, err) + else + results[#results + 1] = string.format('%s:ok:%d', endpoint, response.code) + end + end + + task:insert_result('HTTP_EARLY_KEEPALIVE_STRESS', 1.0, table.concat(results, '|')) +end + +rspamd_config:register_symbol({ + name = 'HTTP_EARLY_KEEPALIVE_STRESS_TEST', + score = 1.0, + callback = test_early_keepalive_stress, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 8: Aggressive immediate close with large body +-- This should trigger actual failures - server closes socket while client writes +local function test_immediate_close_large(task) + rspamd_logger.errx(task, 'test_immediate_close_large: starting') + + -- Create a VERY large body that won't fit in socket buffer + -- This forces the client to block on write, at which point server closes + local body_parts = {} + for i = 1, 10000 do -- ~500KB body + body_parts[i] = string.format("line %05d: this is padding data to make the body very large and exceed socket buffers\n", i) + end + local large_body = table.concat(body_parts) + rspamd_logger.errx(task, 'test_immediate_close_large: body size = %d bytes', #large_body) + + local function callback(err, code, body) + if err then + rspamd_logger.errx(task, 'test_immediate_close_large callback error: %s', err) + -- Error is EXPECTED here - we want to see how client handles it + task:insert_result('HTTP_IMMEDIATE_CLOSE_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_immediate_close_large callback success: code=%s', code) + -- Success means client received the 413 response despite server closing + task:insert_result('HTTP_IMMEDIATE_CLOSE_' .. tostring(code), 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/immediate-close-413', + task = task, + method = 'post', + body = large_body, + callback = callback, + timeout = 10, + }) +end + +rspamd_config:register_symbol({ + name = 'HTTP_IMMEDIATE_CLOSE_TEST', + score = 1.0, + callback = test_immediate_close_large, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 9: Slow response with large body - tests race condition +local function test_slow_response_large(task) + rspamd_logger.errx(task, 'test_slow_response_large: starting') + + local body_parts = {} + for i = 1, 5000 do -- ~250KB body + body_parts[i] = string.format("slow test line %05d: padding data for the test\n", i) + end + local body = table.concat(body_parts) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/slow-response-no-drain', + task = task, + method = 'post', + body = body, + timeout = 10, + }) + + if err then + rspamd_logger.errx(task, 'test_slow_response_large error: %s', err) + task:insert_result('HTTP_SLOW_RESPONSE_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_slow_response_large success: code=%s', response.code) + task:insert_result('HTTP_SLOW_RESPONSE_' .. tostring(response.code), 1.0, response.content) + end +end + +rspamd_config:register_symbol({ + name = 'HTTP_SLOW_RESPONSE_TEST', + score = 1.0, + callback = test_slow_response_large, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 10: Multiple rapid requests to immediate-close endpoint +-- This stresses the connection handling and cleanup +local function test_rapid_close_requests(task) + rspamd_logger.errx(task, 'test_rapid_close_requests: starting') + + local results = {} + local body = string.rep("x", 10000) -- 10KB body + + for i = 1, 5 do + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/immediate-close-413', + task = task, + method = 'post', + body = body, + timeout = 5, + }) + + if err then + results[#results + 1] = string.format('req%d:err:%s', i, err) + else + results[#results + 1] = string.format('req%d:ok:%d', i, response.code) + end + end + + task:insert_result('HTTP_RAPID_CLOSE_RESULTS', 1.0, table.concat(results, '|')) +end + +rspamd_config:register_symbol({ + name = 'HTTP_RAPID_CLOSE_TEST', + score = 1.0, + callback = test_rapid_close_requests, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 11: TRUE early response test with buffer-exceeding body +-- This test sends a body larger than socket buffers (~256KB+) to ensure +-- the client actually blocks on write while server sends response +local function test_block_and_reply(task) + rspamd_logger.errx(task, 'test_block_and_reply: starting') + + -- Create body larger than socket buffers (256KB+ for localhost) + -- macOS has 128KB send + 128KB receive buffers + local body_size = 512 * 1024 -- 512KB should definitely exceed buffers + local chunk = string.rep("X", 1024) -- 1KB chunk + local body_parts = {} + for i = 1, body_size / 1024 do + body_parts[i] = chunk + end + local huge_body = table.concat(body_parts) + rspamd_logger.errx(task, 'test_block_and_reply: body size = %d bytes', #huge_body) + + local function callback(err, code, body) + if err then + rspamd_logger.errx(task, 'test_block_and_reply error: %s', err) + task:insert_result('HTTP_BLOCK_REPLY_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_block_and_reply success: code=%s body=%s', code, body) + -- This is the IDEAL outcome - we got the early 413 response! + task:insert_result('HTTP_BLOCK_REPLY_' .. tostring(code), 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18083/block-and-reply', + task = task, + method = 'post', + body = huge_body, + callback = callback, + timeout = 10, + }) +end + +rspamd_config:register_symbol({ + name = 'HTTP_BLOCK_REPLY_TEST', + score = 1.0, + callback = test_block_and_reply, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 12: Coroutine version of block-and-reply +local function test_block_and_reply_coro(task) + rspamd_logger.errx(task, 'test_block_and_reply_coro: starting') + + -- 512KB body + local huge_body = string.rep("Y", 512 * 1024) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/block-and-reply', + task = task, + method = 'post', + body = huge_body, + timeout = 10, + }) + + if err then + rspamd_logger.errx(task, 'test_block_and_reply_coro error: %s', err) + task:insert_result('HTTP_BLOCK_REPLY_CORO_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_block_and_reply_coro success: code=%s', response.code) + task:insert_result('HTTP_BLOCK_REPLY_CORO_' .. tostring(response.code), 1.0, response.content) + end +end + +rspamd_config:register_symbol({ + name = 'HTTP_BLOCK_REPLY_CORO_TEST', + score = 1.0, + callback = test_block_and_reply_coro, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 13: Slow block test - server waits even longer +local function test_block_slow(task) + rspamd_logger.errx(task, 'test_block_slow: starting') + + -- 1MB body to really fill things up + local huge_body = string.rep("Z", 1024 * 1024) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/block-and-reply-slow', + task = task, + method = 'post', + body = huge_body, + timeout = 15, -- Longer timeout since server waits 1 second + }) + + if err then + rspamd_logger.errx(task, 'test_block_slow error: %s', err) + task:insert_result('HTTP_BLOCK_SLOW_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_block_slow success: code=%s', response.code) + task:insert_result('HTTP_BLOCK_SLOW_' .. tostring(response.code), 1.0, response.content) + end +end + +rspamd_config:register_symbol({ + name = 'HTTP_BLOCK_SLOW_TEST', + score = 1.0, + callback = test_block_slow, + no_squeeze = true, + flags = 'coro' +}) + +-- Test 14: Instant reply - server responds BEFORE reading headers +-- This is the most aggressive early response test +local function test_instant_reply(task) + rspamd_logger.errx(task, 'test_instant_reply: starting') + + -- 512KB body - server will respond before even reading our headers + local huge_body = string.rep("Z", 512 * 1024) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18083/instant-reply', + task = task, + method = 'post', + body = huge_body, + timeout = 10, + }) + + if err then + rspamd_logger.errx(task, 'test_instant_reply error: %s', err) + task:insert_result('HTTP_INSTANT_REPLY_ERROR', 1.0, err) + else + rspamd_logger.errx(task, 'test_instant_reply success: code=%s', response.code) + task:insert_result('HTTP_INSTANT_REPLY_' .. tostring(response.code), 1.0, response.content) + end +end + +rspamd_config:register_symbol({ + name = 'HTTP_INSTANT_REPLY_TEST', + score = 1.0, + callback = test_instant_reply, + no_squeeze = true, + flags = 'coro' +}) diff --git a/test/functional/util/dummy_http_early_response.py b/test/functional/util/dummy_http_early_response.py new file mode 100755 index 0000000000..2fbb2942cc --- /dev/null +++ b/test/functional/util/dummy_http_early_response.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +""" +A deliberately "buggy" HTTP server that sends early responses before +reading the full client request. This is used to test rspamd's HTTP +client handling of edge cases that are allowed by HTTP/1.1 spec. + +Scenarios implemented: +1. /early-reply - Send response immediately after reading headers, before body +2. /early-error-413 - Send 413 error and close after reading just the request line +3. /early-error-close - Send error and immediately close connection (no keep-alive) +4. /keepalive-early - Send response early but keep connection alive +5. /slow-read-fast-reply - Read request very slowly but reply quickly +""" + +import asyncio +import argparse +import sys +import os + +# Add parent directory to path for dummy_killer +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +import dummy_killer + + +class EarlyResponseServer: + def __init__(self, host: str, port: int): + self.host = host + self.port = port + self.server = None + + async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + """Handle a single client connection.""" + addr = writer.get_extra_info('peername') + print(f"Connection from {addr}", file=sys.stderr) + + try: + # Read just the request line first + request_line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not request_line: + return + + request_line = request_line.decode('utf-8', errors='replace').strip() + print(f"Request line: {request_line}", file=sys.stderr) + + parts = request_line.split(' ') + if len(parts) < 2: + return + + method = parts[0] + path = parts[1] + + # For /instant-reply, send response BEFORE even reading headers + # This is the most aggressive early response - client is still sending headers+body + if path == '/instant-reply': + print(f"instant-reply: sending 413 BEFORE reading headers!", file=sys.stderr) + await self._send_response(writer, 413, "Request Entity Too Large", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Instant 413 - rejected at request line") + await writer.drain() + print(f"instant-reply: response sent, closing", file=sys.stderr) + return # Close without reading anything else + + # Read headers (but we might not read body depending on path) + headers = {} + content_length = 0 + while True: + line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not line or line == b'\r\n' or line == b'\n': + break + line = line.decode('utf-8', errors='replace').strip() + if ':' in line: + key, value = line.split(':', 1) + headers[key.strip().lower()] = value.strip() + if key.strip().lower() == 'content-length': + content_length = int(value.strip()) + + print(f"Headers received, Content-Length: {content_length}", file=sys.stderr) + + # Handle different test scenarios based on path + if path == '/early-reply': + # Send response immediately WITHOUT reading body + # This tests client handling when server replies before body is fully sent + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"early reply - body not read") + # Now try to drain remaining data + if content_length > 0: + try: + remaining = await asyncio.wait_for(reader.read(content_length), timeout=1.0) + print(f"Drained {len(remaining)} bytes of body", file=sys.stderr) + except asyncio.TimeoutError: + print("Timeout draining body", file=sys.stderr) + + elif path == '/early-error-413': + # Send 413 immediately and close - simulates "request too large" + # Don't even try to read the body + await self._send_response(writer, 413, "Request Entity Too Large", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Request body too large") + # Close immediately without reading body + + elif path == '/early-error-close': + # Send error and close connection abruptly + await self._send_response(writer, 500, "Internal Server Error", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Server error - closing") + + elif path == '/keepalive-early': + # Send response early but indicate keep-alive + # This tests if client can handle early response + continue with keepalive + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", + "Connection": "keep-alive", + "Keep-Alive": "timeout=30"}, + b"early keepalive response") + # Read the body to properly maintain connection state + if content_length > 0: + body = await asyncio.wait_for(reader.read(content_length), timeout=5.0) + print(f"Read body: {len(body)} bytes", file=sys.stderr) + + # Wait for potential next request on keep-alive connection + try: + next_line = await asyncio.wait_for(reader.readline(), timeout=5.0) + if next_line: + print(f"Keep-alive follow-up: {next_line}", file=sys.stderr) + # Handle second request simply + while True: + line = await asyncio.wait_for(reader.readline(), timeout=5.0) + if not line or line == b'\r\n': + break + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"second response on keepalive") + except asyncio.TimeoutError: + print("No follow-up request on keepalive", file=sys.stderr) + + elif path == '/slow-read-fast-reply': + # Start reading body very slowly, but send reply quickly + # This creates a race condition + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"fast reply while slowly reading") + # Now slowly read + if content_length > 0: + read_so_far = 0 + while read_so_far < content_length: + chunk = await reader.read(min(100, content_length - read_so_far)) + if not chunk: + break + read_so_far += len(chunk) + await asyncio.sleep(0.1) # Slow down + + elif path == '/partial-read-then-reply': + # Read only part of the body, then send response + if content_length > 0: + # Read only first 100 bytes + partial = await reader.read(min(100, content_length)) + print(f"Read partial body: {len(partial)} bytes", file=sys.stderr) + + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"response after partial body read") + + elif path == '/immediate-close-413': + # Send 413 and close socket IMMEDIATELY without reading anything else + # This is the most aggressive case - RST may be sent if client is still writing + await self._send_response(writer, 413, "Request Entity Too Large", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Too large - closing immediately") + # Force close without draining - this should cause client write to fail + writer.transport.abort() # Forceful close + return # Skip normal close + + elif path == '/block-and-reply': + # TRUE early response test: + # Send response IMMEDIATELY after headers, before body starts. + # The client should receive this while still preparing/sending body. + print(f"block-and-reply: sending 413 IMMEDIATELY (content-length={content_length})", file=sys.stderr) + # Send response right away - no waiting + await self._send_response(writer, 413, "Request Entity Too Large", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Early 413 - body was never read") + # Flush immediately + await writer.drain() + print(f"block-and-reply: response sent, closing without reading body", file=sys.stderr) + # Don't read body, just close + + elif path == '/block-and-reply-slow': + # Even more aggressive: wait longer to let more data queue up + print(f"block-and-reply-slow: waiting for client to fill buffers", file=sys.stderr) + await asyncio.sleep(1.0) # Wait for client to really fill up buffers + await self._send_response(writer, 503, "Service Unavailable", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Server busy - your data was ignored") + await asyncio.sleep(0.1) + + elif path == '/slow-response-no-drain': + # Wait a bit (let client send more data), then respond without reading + await asyncio.sleep(0.5) # Let client start/continue sending + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"slow response - body not read") + # Don't drain - close with data potentially still in flight + + elif path == '/request': + # Normal request handling for comparison + if content_length > 0: + body = await asyncio.wait_for(reader.read(content_length), timeout=10.0) + print(f"Read full body: {len(body)} bytes", file=sys.stderr) + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", "Connection": "close"}, + b"normal response") + + elif path == '/keepalive-normal': + # Normal keep-alive handling for comparison + if content_length > 0: + body = await asyncio.wait_for(reader.read(content_length), timeout=10.0) + await self._send_response(writer, 200, "OK", + {"Content-Type": "text/plain", + "Connection": "keep-alive", + "Keep-Alive": "timeout=30"}, + b"normal keepalive response") + + else: + await self._send_response(writer, 404, "Not Found", + {"Content-Type": "text/plain", "Connection": "close"}, + b"Not found") + + except asyncio.TimeoutError: + print(f"Timeout handling {addr}", file=sys.stderr) + except ConnectionResetError: + print(f"Connection reset by {addr}", file=sys.stderr) + except Exception as e: + print(f"Error handling {addr}: {type(e).__name__}: {e}", file=sys.stderr) + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + print(f"Connection closed for {addr}", file=sys.stderr) + + async def _send_response(self, writer: asyncio.StreamWriter, status: int, status_text: str, + headers: dict, body: bytes): + """Send an HTTP response.""" + response_lines = [f"HTTP/1.1 {status} {status_text}"] + headers["Content-Length"] = str(len(body)) + for key, value in headers.items(): + response_lines.append(f"{key}: {value}") + response_lines.append("") + response_lines.append("") + + response_header = "\r\n".join(response_lines).encode('utf-8') + writer.write(response_header) + writer.write(body) + await writer.drain() + print(f"Sent response: {status} {status_text}, {len(body)} bytes body", file=sys.stderr) + + async def start(self): + """Start the server.""" + self.server = await asyncio.start_server( + self.handle_client, self.host, self.port + ) + print(f"Early response server listening on {self.host}:{self.port}", file=sys.stderr) + + async def serve_forever(self): + """Serve until cancelled.""" + async with self.server: + await self.server.serve_forever() + + +async def main(): + parser = argparse.ArgumentParser(description="HTTP server for testing early response scenarios") + parser.add_argument("--bind", "-b", default="127.0.0.1", help="bind address") + parser.add_argument("--port", "-p", type=int, default=18083, help="bind port") + parser.add_argument("--pidfile", "-pf", help="path to the PID file") + args = parser.parse_args() + + print(f"dummy_http_early_response.py: Starting on {args.bind}:{args.port}", file=sys.stderr) + + server = EarlyResponseServer(args.host if hasattr(args, 'host') else args.bind, args.port) + await server.start() + + if args.pidfile: + dummy_killer.write_pid(args.pidfile) + print(f"PID file written to {args.pidfile}", file=sys.stderr) + + await server.serve_forever() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Shutting down...", file=sys.stderr) + except Exception as e: + print(f"FATAL ERROR: {type(e).__name__}: {e}", file=sys.stderr) + import traceback + traceback.print_exc(file=sys.stderr) + sys.exit(1)