RSPAMD_HTTP_CONN_FLAG_PROXY = 1u << 5u,
RSPAMD_HTTP_CONN_FLAG_PROXY_REQUEST = 1u << 6u,
RSPAMD_HTTP_CONN_OWN_SOCKET = 1u << 7u,
+ RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE = 1u << 8u, /* Server sent early response during write */
};
#define IS_CONN_ENCRYPTED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_ENCRYPTED)
#define IS_CONN_RESETED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)
+static gssize rspamd_http_try_read(int fd,
+ struct rspamd_http_connection *conn,
+ struct rspamd_http_connection_private *priv,
+ struct _rspamd_http_privbuf *pbuf,
+ const char **buf_ptr);
+
struct rspamd_http_connection_private {
struct rspamd_http_context *ctx;
struct rspamd_ssl_connection *ssl;
}
if (r == -1) {
+ int saved_errno = errno;
+
if (!priv->ssl) {
- err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(errno));
+ /* Check if server sent an early response (e.g., 413) */
+ if (saved_errno == EPIPE || saved_errno == ECONNRESET ||
+ saved_errno == ENOTCONN) {
+ struct _rspamd_http_privbuf *pbuf = priv->buf;
+ const char *d;
+ gssize read_res;
+
+ REF_RETAIN(pbuf);
+ read_res = rspamd_http_try_read(conn->fd, conn, priv, pbuf, &d);
+
+ if (read_res > 0) {
+ msg_debug("got early server response (%z bytes) during write, processing",
+ read_res);
+
+ if (http_parser_execute(&priv->parser, &priv->parser_cb,
+ d, read_res) != (size_t) read_res ||
+ priv->parser.http_errno != 0) {
+ err = g_error_new(HTTP_ERROR, 400,
+ "HTTP parser error on early response: %s",
+ http_errno_description(priv->parser.http_errno));
+ rspamd_http_connection_ref(conn);
+ conn->error_handler(conn, err);
+ rspamd_http_connection_unref(conn);
+ g_error_free(err);
+ }
+
+ REF_RELEASE(pbuf);
+ return;
+ }
+
+ REF_RELEASE(pbuf);
+ }
+
+ err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(saved_errno));
rspamd_http_connection_ref(conn);
conn->error_handler(conn, err);
rspamd_http_connection_unref(conn);
rspamd_http_connection_unref(conn);
}
else {
- /* Plan read message */
- /* Switch to read stage timeout */
- priv->first_write_done = true;
- rspamd_http_simple_client_helper(conn);
+ if (priv->flags & RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE) {
+ /* Early response already being processed */
+ rspamd_ev_watcher_reschedule(priv->ctx->event_loop, &priv->ev, EV_READ);
+ }
+ else {
+ /* Plan read message */
+ priv->first_write_done = true;
+ rspamd_http_simple_client_helper(conn);
+ }
}
}
REF_RETAIN(pbuf);
rspamd_http_connection_ref(conn);
- if (what == EV_READ) {
+ if (what & EV_READ) {
+ /* Check for early response while still sending request */
+ if (priv->wr_pos < priv->wr_total && priv->wr_total > 0) {
+ msg_debug("received early server response while still writing request "
+ "(sent %uz of %uz bytes)",
+ priv->wr_pos, priv->wr_total);
+ priv->wr_pos = priv->wr_total;
+ priv->flags |= RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE;
+ }
+
r = rspamd_http_try_read(fd, conn, priv, pbuf, &d);
if (r > 0) {
return;
}
}
- else if (what == EV_TIMEOUT) {
+ else if (what & EV_TIMEOUT) {
if (!priv->ssl) {
/* Let's try to read from the socket first */
r = rspamd_http_try_read(fd, conn, priv, pbuf, &d);
return;
}
}
- else if (what == EV_WRITE) {
+ else if (what & EV_WRITE) {
rspamd_http_write_helper(conn);
}
rspamd_http_event_handler,
rspamd_http_ssl_err_handler,
conn,
- EV_WRITE);
+ EV_WRITE | EV_READ);
}
}
}
else {
- rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE,
+ /* Watch for both READ and WRITE to detect early server responses */
+ rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE | EV_READ,
rspamd_http_event_handler, conn);
/* Use connect_timeout on initial EV_WRITE stage if provided */
ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout);
--- /dev/null
+*** Settings ***
+Test Setup Http Early Response Setup
+Test Teardown Http Early Response Teardown
+Library Process
+Library ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG} ${RSPAMD_TESTDIR}/configs/lua_test.conf
+${MESSAGE} ${RSPAMD_TESTDIR}/messages/spam_message.eml
+${RSPAMD_LUA_SCRIPT} ${RSPAMD_TESTDIR}/lua/http_early_response.lua
+${RSPAMD_SCOPE} Suite
+${RSPAMD_URL_TLD} ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+
+*** Test Cases ***
+HTTP Early Reply
+ [Documentation] Test server sending response before reading request body
+ Scan File ${MESSAGE} test-type=early-reply
+ ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+ # We expect either success (200) or an error - both indicate the client handled
+ # the early response scenario (success = client received response, error = connection issue)
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_REPLY_200
+ IF not ${result}
+ Expect Symbol HTTP_EARLY_REPLY_ERROR
+ END
+
+HTTP Early 413 Error
+ [Documentation] Test server sending 413 error before reading request body
+ Scan File ${MESSAGE} test-type=early-413
+ ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_413_413
+ IF not ${result}
+ Expect Symbol HTTP_EARLY_413_ERROR
+ END
+
+HTTP Keepalive Early Response
+ [Documentation] Test keepalive with server sending early response
+ Scan File ${MESSAGE} test-type=keepalive-early
+ ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_KEEPALIVE_EARLY_200
+ IF not ${result}
+ Expect Symbol HTTP_KEEPALIVE_EARLY_ERROR
+ END
+
+HTTP Early Reply Coroutine
+ [Documentation] Test early response with coroutine-based HTTP request
+ Scan File ${MESSAGE} test-type=early-coro
+ ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_EARLY_CORO_200
+ IF not ${result}
+ Expect Symbol HTTP_EARLY_CORO_ERROR
+ END
+
+HTTP Normal Request Baseline
+ [Documentation] Baseline test with normal request handling
+ Scan File ${MESSAGE} test-type=normal
+ ... Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+ Expect Symbol HTTP_NORMAL_200
+
+HTTP Keepalive Sequential
+ [Documentation] Test sequential keepalive requests
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_KEEPALIVE_SEQUENTIAL_TEST]}
+ # Should have at least some successful requests
+ Expect Symbol HTTP_KEEPALIVE_SEQ_SUCCESS
+
+HTTP Early Keepalive Stress
+ [Documentation] Stress test mixing early responses with keepalive
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_EARLY_KEEPALIVE_STRESS_TEST]}
+ Expect Symbol HTTP_EARLY_KEEPALIVE_STRESS
+
+HTTP Immediate Close Large Body
+ [Documentation] Test server immediately closing connection during large body send
+ [Tags] aggressive
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_IMMEDIATE_CLOSE_TEST]}
+ # Either we get the 413 response OR an error - both are valid outcomes
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_IMMEDIATE_CLOSE_413
+ IF not ${result}
+ Expect Symbol HTTP_IMMEDIATE_CLOSE_ERROR
+ END
+
+HTTP Slow Response Large Body
+ [Documentation] Test server responding slowly while client sends large body
+ [Tags] aggressive
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_SLOW_RESPONSE_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_SLOW_RESPONSE_200
+ IF not ${result}
+ Expect Symbol HTTP_SLOW_RESPONSE_ERROR
+ END
+
+HTTP Rapid Close Requests
+ [Documentation] Rapid sequential requests to server that closes immediately
+ [Tags] aggressive
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_RAPID_CLOSE_TEST]}
+ Expect Symbol HTTP_RAPID_CLOSE_RESULTS
+
+HTTP Block And Reply
+ [Documentation] TRUE early response test - 512KB body with server not reading
+ [Tags] early-response critical
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_BLOCK_REPLY_TEST]}
+ # Ideal: HTTP_BLOCK_REPLY_413 (got early response)
+ # Acceptable: HTTP_BLOCK_REPLY_ERROR (connection error, but no crash)
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_REPLY_413
+ IF not ${result}
+ Expect Symbol HTTP_BLOCK_REPLY_ERROR
+ END
+
+HTTP Block And Reply Coroutine
+ [Documentation] Coroutine version of block-and-reply test
+ [Tags] early-response critical
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_BLOCK_REPLY_CORO_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_REPLY_CORO_413
+ IF not ${result}
+ Expect Symbol HTTP_BLOCK_REPLY_CORO_ERROR
+ END
+
+HTTP Block Slow Response
+ [Documentation] Server waits 1s then responds - 1MB body
+ [Tags] early-response slow
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_BLOCK_SLOW_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_BLOCK_SLOW_503
+ IF not ${result}
+ Expect Symbol HTTP_BLOCK_SLOW_ERROR
+ END
+
+HTTP Instant Reply
+ [Documentation] Server responds BEFORE reading headers - most aggressive early response
+ [Tags] early-response critical
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = [HTTP_INSTANT_REPLY_TEST]}
+ ${result} = Run Keyword And Return Status Expect Symbol HTTP_INSTANT_REPLY_413
+ IF not ${result}
+ Expect Symbol HTTP_INSTANT_REPLY_ERROR
+ END
+
+*** Keywords ***
+Http Early Response Setup
+ Run Dummy Http Early Response
+ Rspamd Setup
+
+Http Early Response Teardown
+ Rspamd Teardown
+ Dummy Http Early Teardown
Dummy Https Teardown
Terminate Process ${DUMMY_HTTPS_PROC}
Wait For Process ${DUMMY_HTTPS_PROC}
+
+Run Dummy Http Early Response
+ ${result} = Start Process ${RSPAMD_TESTDIR}/util/dummy_http_early_response.py -pf /tmp/dummy_http_early.pid -p 18083
+ ... stderr=/tmp/dummy_http_early.log stdout=/tmp/dummy_http_early.log
+ ${status} ${error} = Run Keyword And Ignore Error Wait Until Created /tmp/dummy_http_early.pid timeout=2 second
+ IF '${status}' == 'FAIL'
+ ${logstatus} ${log} = Run Keyword And Ignore Error Get File /tmp/dummy_http_early.log
+ IF '${logstatus}' == 'PASS'
+ Log dummy_http_early_response.py failed to start. Log output:\n${log} level=ERROR
+ ELSE
+ Log dummy_http_early_response.py failed to start. No log file found at /tmp/dummy_http_early.log level=ERROR
+ END
+ Fail dummy_http_early_response.py did not create PID file in 2 seconds
+ END
+ Export Scoped Variables ${RSPAMD_SCOPE} DUMMY_HTTP_EARLY_PROC=${result}
+
+Dummy Http Early Teardown
+ Terminate Process ${DUMMY_HTTP_EARLY_PROC}
+ Wait For Process ${DUMMY_HTTP_EARLY_PROC}
--- /dev/null
+--[[
+Test HTTP client behavior when server sends early responses.
+
+This tests edge cases where a server responds before the client has finished
+sending the request body (which is allowed by HTTP/1.1 spec).
+
+The test server (dummy_http_early_response.py) runs on port 18083.
+]]
+
+local rspamd_http = require "rspamd_http"
+local rspamd_logger = require "rspamd_logger"
+
+-- Register all possible result symbols upfront
+-- These are the symbols that will be inserted based on HTTP response codes
+local result_symbols = {
+ -- Test 1: Early reply
+ 'HTTP_EARLY_REPLY_ERROR',
+ 'HTTP_EARLY_REPLY_200',
+ 'HTTP_EARLY_REPLY_413',
+ 'HTTP_EARLY_REPLY_500',
+ -- Test 2: Early 413
+ 'HTTP_EARLY_413_ERROR',
+ 'HTTP_EARLY_413_413',
+ 'HTTP_EARLY_413_200',
+ -- Test 3: Keepalive early
+ 'HTTP_KEEPALIVE_EARLY_ERROR',
+ 'HTTP_KEEPALIVE_EARLY_200',
+ -- Test 4: Coroutine early
+ 'HTTP_EARLY_CORO_ERROR',
+ 'HTTP_EARLY_CORO_200',
+ -- Test 5: Normal
+ 'HTTP_NORMAL_ERROR',
+ 'HTTP_NORMAL_200',
+ -- Test 6: Keepalive sequential
+ 'HTTP_KEEPALIVE_SEQ_ERRORS',
+ 'HTTP_KEEPALIVE_SEQ_SUCCESS',
+ -- Test 7: Keepalive stress
+ 'HTTP_EARLY_KEEPALIVE_STRESS',
+ -- Test 8: Immediate close
+ 'HTTP_IMMEDIATE_CLOSE_ERROR',
+ 'HTTP_IMMEDIATE_CLOSE_413',
+ 'HTTP_IMMEDIATE_CLOSE_200',
+ -- Test 9: Slow response
+ 'HTTP_SLOW_RESPONSE_ERROR',
+ 'HTTP_SLOW_RESPONSE_200',
+ -- Test 10: Rapid close
+ 'HTTP_RAPID_CLOSE_RESULTS',
+ -- Test 11: Block and reply
+ 'HTTP_BLOCK_REPLY_ERROR',
+ 'HTTP_BLOCK_REPLY_413',
+ 'HTTP_BLOCK_REPLY_200',
+ -- Test 12: Block and reply coro
+ 'HTTP_BLOCK_REPLY_CORO_ERROR',
+ 'HTTP_BLOCK_REPLY_CORO_413',
+ 'HTTP_BLOCK_REPLY_CORO_200',
+ -- Test 13: Block slow
+ 'HTTP_BLOCK_SLOW_ERROR',
+ 'HTTP_BLOCK_SLOW_503',
+ 'HTTP_BLOCK_SLOW_200',
+ -- Test 14: Instant reply (before headers even read)
+ 'HTTP_INSTANT_REPLY_ERROR',
+ 'HTTP_INSTANT_REPLY_413',
+}
+
+-- Register all result symbols as virtual symbols
+for _, sym_name in ipairs(result_symbols) do
+ rspamd_config:register_symbol({
+ name = sym_name,
+ score = 0.0,
+ type = 'virtual',
+ parent = -1, -- Will be set properly by parent registration
+ })
+end
+
+-- Test 1: Early reply - server responds before reading body
+local function test_early_reply(task)
+ rspamd_logger.errx(task, 'test_early_reply: starting')
+
+ -- Create a large body to increase chance of race condition
+ local body_parts = {}
+ for i = 1, 1000 do
+ body_parts[i] = string.format("line %d: some test data that we are sending\n", i)
+ end
+ local large_body = table.concat(body_parts)
+
+ local function callback(err, code, body)
+ if err then
+ rspamd_logger.errx(task, 'test_early_reply callback error: %s', err)
+ task:insert_result('HTTP_EARLY_REPLY_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_early_reply callback success: code=%s body=%s', code, body)
+ task:insert_result('HTTP_EARLY_REPLY_' .. tostring(code), 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/early-reply',
+ task = task,
+ method = 'post',
+ body = large_body,
+ callback = callback,
+ timeout = 5,
+ })
+end
+
+-- Test 2: Early 413 error - server sends error before reading body
+local function test_early_error_413(task)
+ rspamd_logger.errx(task, 'test_early_error_413: starting')
+
+ -- Create a large body
+ local body_parts = {}
+ for i = 1, 1000 do
+ body_parts[i] = string.format("line %d: large body data\n", i)
+ end
+ local large_body = table.concat(body_parts)
+
+ local function callback(err, code, body)
+ if err then
+ rspamd_logger.errx(task, 'test_early_error_413 callback error: %s', err)
+ task:insert_result('HTTP_EARLY_413_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_early_error_413 callback success: code=%s body=%s', code, body)
+ task:insert_result('HTTP_EARLY_413_' .. tostring(code), 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/early-error-413',
+ task = task,
+ method = 'post',
+ body = large_body,
+ callback = callback,
+ timeout = 5,
+ })
+end
+
+-- Test 3: Keep-alive with early response
+local function test_keepalive_early(task)
+ rspamd_logger.errx(task, 'test_keepalive_early: starting')
+
+ local body = "test body for keepalive"
+
+ local function callback(err, code, body_response)
+ if err then
+ rspamd_logger.errx(task, 'test_keepalive_early callback error: %s', err)
+ task:insert_result('HTTP_KEEPALIVE_EARLY_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_keepalive_early callback success: code=%s', code)
+ task:insert_result('HTTP_KEEPALIVE_EARLY_' .. tostring(code), 1.0, body_response)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/keepalive-early',
+ task = task,
+ method = 'post',
+ body = body,
+ callback = callback,
+ timeout = 5,
+ keepalive = true,
+ })
+end
+
+-- Test 4: Coroutine-based early reply test
+local function test_early_reply_coro(task)
+ rspamd_logger.errx(task, 'test_early_reply_coro: starting')
+
+ local body_parts = {}
+ for i = 1, 500 do
+ body_parts[i] = string.format("coro line %d: test data\n", i)
+ end
+ local body = table.concat(body_parts)
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/early-reply',
+ task = task,
+ method = 'post',
+ body = body,
+ timeout = 5,
+ })
+
+ if err then
+ rspamd_logger.errx(task, 'test_early_reply_coro error: %s', err)
+ task:insert_result('HTTP_EARLY_CORO_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_early_reply_coro success: code=%s', response.code)
+ task:insert_result('HTTP_EARLY_CORO_' .. tostring(response.code), 1.0, response.content)
+ end
+end
+
+-- Test 5: Multiple requests to normal endpoint (baseline)
+local function test_normal_request(task)
+ rspamd_logger.errx(task, 'test_normal_request: starting')
+
+ local function callback(err, code, body)
+ if err then
+ rspamd_logger.errx(task, 'test_normal_request callback error: %s', err)
+ task:insert_result('HTTP_NORMAL_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_normal_request callback success: code=%s', code)
+ task:insert_result('HTTP_NORMAL_' .. tostring(code), 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/request',
+ task = task,
+ method = 'post',
+ body = 'normal test body',
+ callback = callback,
+ timeout = 5,
+ })
+end
+
+-- Main test symbol that runs all tests
+local function http_early_response_tests(task)
+ local test_type = tostring(task:get_request_header('test-type') or 'all')
+
+ rspamd_logger.errx(task, 'http_early_response_tests: test_type=%s', test_type)
+
+ if test_type == 'early-reply' or test_type == 'all' then
+ test_early_reply(task)
+ end
+
+ if test_type == 'early-413' or test_type == 'all' then
+ test_early_error_413(task)
+ end
+
+ if test_type == 'keepalive-early' or test_type == 'all' then
+ test_keepalive_early(task)
+ end
+
+ if test_type == 'early-coro' or test_type == 'all' then
+ test_early_reply_coro(task)
+ end
+
+ if test_type == 'normal' or test_type == 'all' then
+ test_normal_request(task)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_EARLY_RESPONSE_TEST',
+ score = 1.0,
+ callback = http_early_response_tests,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 6: Sequential keepalive requests (stress test for keepalive pool)
+local function test_keepalive_sequential(task)
+ rspamd_logger.errx(task, 'test_keepalive_sequential: starting')
+
+ local success_count = 0
+ local error_count = 0
+ local errors = {}
+
+ -- Make 3 sequential requests using keepalive
+ for i = 1, 3 do
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/keepalive-normal',
+ task = task,
+ method = 'post',
+ body = string.format('request %d body', i),
+ timeout = 5,
+ keepalive = true,
+ })
+
+ if err then
+ error_count = error_count + 1
+ errors[#errors + 1] = string.format('req%d: %s', i, err)
+ rspamd_logger.errx(task, 'keepalive request %d error: %s', i, err)
+ else
+ success_count = success_count + 1
+ rspamd_logger.errx(task, 'keepalive request %d success: code=%s', i, response.code)
+ end
+ end
+
+ if error_count > 0 then
+ task:insert_result('HTTP_KEEPALIVE_SEQ_ERRORS', 1.0, table.concat(errors, '; '))
+ end
+ task:insert_result('HTTP_KEEPALIVE_SEQ_SUCCESS', 1.0, tostring(success_count))
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_KEEPALIVE_SEQUENTIAL_TEST',
+ score = 1.0,
+ callback = test_keepalive_sequential,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 7: Stress test with early responses + keepalive
+local function test_early_keepalive_stress(task)
+ rspamd_logger.errx(task, 'test_early_keepalive_stress: starting')
+
+ local results = {}
+
+ -- Mix of normal and early-response requests
+ local endpoints = {
+ '/request',
+ '/early-reply',
+ '/request',
+ '/keepalive-early',
+ '/request',
+ }
+
+ for i, endpoint in ipairs(endpoints) do
+ local body_parts = {}
+ for j = 1, 100 do
+ body_parts[j] = string.format("stress test line %d-%d\n", i, j)
+ end
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083' .. endpoint,
+ task = task,
+ method = 'post',
+ body = table.concat(body_parts),
+ timeout = 5,
+ keepalive = true,
+ })
+
+ if err then
+ results[#results + 1] = string.format('%s:err:%s', endpoint, err)
+ else
+ results[#results + 1] = string.format('%s:ok:%d', endpoint, response.code)
+ end
+ end
+
+ task:insert_result('HTTP_EARLY_KEEPALIVE_STRESS', 1.0, table.concat(results, '|'))
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_EARLY_KEEPALIVE_STRESS_TEST',
+ score = 1.0,
+ callback = test_early_keepalive_stress,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 8: Aggressive immediate close with large body
+-- This should trigger actual failures - server closes socket while client writes
+local function test_immediate_close_large(task)
+ rspamd_logger.errx(task, 'test_immediate_close_large: starting')
+
+ -- Create a VERY large body that won't fit in socket buffer
+ -- This forces the client to block on write, at which point server closes
+ local body_parts = {}
+ for i = 1, 10000 do -- ~500KB body
+ body_parts[i] = string.format("line %05d: this is padding data to make the body very large and exceed socket buffers\n", i)
+ end
+ local large_body = table.concat(body_parts)
+ rspamd_logger.errx(task, 'test_immediate_close_large: body size = %d bytes', #large_body)
+
+ local function callback(err, code, body)
+ if err then
+ rspamd_logger.errx(task, 'test_immediate_close_large callback error: %s', err)
+ -- Error is EXPECTED here - we want to see how client handles it
+ task:insert_result('HTTP_IMMEDIATE_CLOSE_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_immediate_close_large callback success: code=%s', code)
+ -- Success means client received the 413 response despite server closing
+ task:insert_result('HTTP_IMMEDIATE_CLOSE_' .. tostring(code), 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/immediate-close-413',
+ task = task,
+ method = 'post',
+ body = large_body,
+ callback = callback,
+ timeout = 10,
+ })
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_IMMEDIATE_CLOSE_TEST',
+ score = 1.0,
+ callback = test_immediate_close_large,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 9: Slow response with large body - tests race condition
+local function test_slow_response_large(task)
+ rspamd_logger.errx(task, 'test_slow_response_large: starting')
+
+ local body_parts = {}
+ for i = 1, 5000 do -- ~250KB body
+ body_parts[i] = string.format("slow test line %05d: padding data for the test\n", i)
+ end
+ local body = table.concat(body_parts)
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/slow-response-no-drain',
+ task = task,
+ method = 'post',
+ body = body,
+ timeout = 10,
+ })
+
+ if err then
+ rspamd_logger.errx(task, 'test_slow_response_large error: %s', err)
+ task:insert_result('HTTP_SLOW_RESPONSE_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_slow_response_large success: code=%s', response.code)
+ task:insert_result('HTTP_SLOW_RESPONSE_' .. tostring(response.code), 1.0, response.content)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_SLOW_RESPONSE_TEST',
+ score = 1.0,
+ callback = test_slow_response_large,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 10: Multiple rapid requests to immediate-close endpoint
+-- This stresses the connection handling and cleanup
+local function test_rapid_close_requests(task)
+ rspamd_logger.errx(task, 'test_rapid_close_requests: starting')
+
+ local results = {}
+ local body = string.rep("x", 10000) -- 10KB body
+
+ for i = 1, 5 do
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/immediate-close-413',
+ task = task,
+ method = 'post',
+ body = body,
+ timeout = 5,
+ })
+
+ if err then
+ results[#results + 1] = string.format('req%d:err:%s', i, err)
+ else
+ results[#results + 1] = string.format('req%d:ok:%d', i, response.code)
+ end
+ end
+
+ task:insert_result('HTTP_RAPID_CLOSE_RESULTS', 1.0, table.concat(results, '|'))
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_RAPID_CLOSE_TEST',
+ score = 1.0,
+ callback = test_rapid_close_requests,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 11: TRUE early response test with buffer-exceeding body
+-- This test sends a body larger than socket buffers (~256KB+) to ensure
+-- the client actually blocks on write while server sends response
+local function test_block_and_reply(task)
+ rspamd_logger.errx(task, 'test_block_and_reply: starting')
+
+ -- Create body larger than socket buffers (256KB+ for localhost)
+ -- macOS has 128KB send + 128KB receive buffers
+ local body_size = 512 * 1024 -- 512KB should definitely exceed buffers
+ local chunk = string.rep("X", 1024) -- 1KB chunk
+ local body_parts = {}
+ for i = 1, body_size / 1024 do
+ body_parts[i] = chunk
+ end
+ local huge_body = table.concat(body_parts)
+ rspamd_logger.errx(task, 'test_block_and_reply: body size = %d bytes', #huge_body)
+
+ local function callback(err, code, body)
+ if err then
+ rspamd_logger.errx(task, 'test_block_and_reply error: %s', err)
+ task:insert_result('HTTP_BLOCK_REPLY_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_block_and_reply success: code=%s body=%s', code, body)
+ -- This is the IDEAL outcome - we got the early 413 response!
+ task:insert_result('HTTP_BLOCK_REPLY_' .. tostring(code), 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18083/block-and-reply',
+ task = task,
+ method = 'post',
+ body = huge_body,
+ callback = callback,
+ timeout = 10,
+ })
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_BLOCK_REPLY_TEST',
+ score = 1.0,
+ callback = test_block_and_reply,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 12: Coroutine version of block-and-reply
+local function test_block_and_reply_coro(task)
+ rspamd_logger.errx(task, 'test_block_and_reply_coro: starting')
+
+ -- 512KB body
+ local huge_body = string.rep("Y", 512 * 1024)
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/block-and-reply',
+ task = task,
+ method = 'post',
+ body = huge_body,
+ timeout = 10,
+ })
+
+ if err then
+ rspamd_logger.errx(task, 'test_block_and_reply_coro error: %s', err)
+ task:insert_result('HTTP_BLOCK_REPLY_CORO_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_block_and_reply_coro success: code=%s', response.code)
+ task:insert_result('HTTP_BLOCK_REPLY_CORO_' .. tostring(response.code), 1.0, response.content)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_BLOCK_REPLY_CORO_TEST',
+ score = 1.0,
+ callback = test_block_and_reply_coro,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 13: Slow block test - server waits even longer
+local function test_block_slow(task)
+ rspamd_logger.errx(task, 'test_block_slow: starting')
+
+ -- 1MB body to really fill things up
+ local huge_body = string.rep("Z", 1024 * 1024)
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/block-and-reply-slow',
+ task = task,
+ method = 'post',
+ body = huge_body,
+ timeout = 15, -- Longer timeout since server waits 1 second
+ })
+
+ if err then
+ rspamd_logger.errx(task, 'test_block_slow error: %s', err)
+ task:insert_result('HTTP_BLOCK_SLOW_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_block_slow success: code=%s', response.code)
+ task:insert_result('HTTP_BLOCK_SLOW_' .. tostring(response.code), 1.0, response.content)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_BLOCK_SLOW_TEST',
+ score = 1.0,
+ callback = test_block_slow,
+ no_squeeze = true,
+ flags = 'coro'
+})
+
+-- Test 14: Instant reply - server responds BEFORE reading headers
+-- This is the most aggressive early response test
+local function test_instant_reply(task)
+ rspamd_logger.errx(task, 'test_instant_reply: starting')
+
+ -- 512KB body - server will respond before even reading our headers
+ local huge_body = string.rep("Z", 512 * 1024)
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18083/instant-reply',
+ task = task,
+ method = 'post',
+ body = huge_body,
+ timeout = 10,
+ })
+
+ if err then
+ rspamd_logger.errx(task, 'test_instant_reply error: %s', err)
+ task:insert_result('HTTP_INSTANT_REPLY_ERROR', 1.0, err)
+ else
+ rspamd_logger.errx(task, 'test_instant_reply success: code=%s', response.code)
+ task:insert_result('HTTP_INSTANT_REPLY_' .. tostring(response.code), 1.0, response.content)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'HTTP_INSTANT_REPLY_TEST',
+ score = 1.0,
+ callback = test_instant_reply,
+ no_squeeze = true,
+ flags = 'coro'
+})
--- /dev/null
+#!/usr/bin/env python3
+"""
+A deliberately "buggy" HTTP server that sends early responses before
+reading the full client request. This is used to test rspamd's HTTP
+client handling of edge cases that are allowed by HTTP/1.1 spec.
+
+Scenarios implemented:
+1. /early-reply - Send response immediately after reading headers, before body
+2. /early-error-413 - Send 413 error and close after reading just the request line
+3. /early-error-close - Send error and immediately close connection (no keep-alive)
+4. /keepalive-early - Send response early but keep connection alive
+5. /slow-read-fast-reply - Read request very slowly but reply quickly
+"""
+
+import asyncio
+import argparse
+import sys
+import os
+
+# Add parent directory to path for dummy_killer
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+import dummy_killer
+
+
+class EarlyResponseServer:
+ def __init__(self, host: str, port: int):
+ self.host = host
+ self.port = port
+ self.server = None
+
+ async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
+ """Handle a single client connection."""
+ addr = writer.get_extra_info('peername')
+ print(f"Connection from {addr}", file=sys.stderr)
+
+ try:
+ # Read just the request line first
+ request_line = await asyncio.wait_for(reader.readline(), timeout=10.0)
+ if not request_line:
+ return
+
+ request_line = request_line.decode('utf-8', errors='replace').strip()
+ print(f"Request line: {request_line}", file=sys.stderr)
+
+ parts = request_line.split(' ')
+ if len(parts) < 2:
+ return
+
+ method = parts[0]
+ path = parts[1]
+
+ # For /instant-reply, send response BEFORE even reading headers
+ # This is the most aggressive early response - client is still sending headers+body
+ if path == '/instant-reply':
+ print(f"instant-reply: sending 413 BEFORE reading headers!", file=sys.stderr)
+ await self._send_response(writer, 413, "Request Entity Too Large",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Instant 413 - rejected at request line")
+ await writer.drain()
+ print(f"instant-reply: response sent, closing", file=sys.stderr)
+ return # Close without reading anything else
+
+ # Read headers (but we might not read body depending on path)
+ headers = {}
+ content_length = 0
+ while True:
+ line = await asyncio.wait_for(reader.readline(), timeout=10.0)
+ if not line or line == b'\r\n' or line == b'\n':
+ break
+ line = line.decode('utf-8', errors='replace').strip()
+ if ':' in line:
+ key, value = line.split(':', 1)
+ headers[key.strip().lower()] = value.strip()
+ if key.strip().lower() == 'content-length':
+ content_length = int(value.strip())
+
+ print(f"Headers received, Content-Length: {content_length}", file=sys.stderr)
+
+ # Handle different test scenarios based on path
+ if path == '/early-reply':
+ # Send response immediately WITHOUT reading body
+ # This tests client handling when server replies before body is fully sent
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"early reply - body not read")
+ # Now try to drain remaining data
+ if content_length > 0:
+ try:
+ remaining = await asyncio.wait_for(reader.read(content_length), timeout=1.0)
+ print(f"Drained {len(remaining)} bytes of body", file=sys.stderr)
+ except asyncio.TimeoutError:
+ print("Timeout draining body", file=sys.stderr)
+
+ elif path == '/early-error-413':
+ # Send 413 immediately and close - simulates "request too large"
+ # Don't even try to read the body
+ await self._send_response(writer, 413, "Request Entity Too Large",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Request body too large")
+ # Close immediately without reading body
+
+ elif path == '/early-error-close':
+ # Send error and close connection abruptly
+ await self._send_response(writer, 500, "Internal Server Error",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Server error - closing")
+
+ elif path == '/keepalive-early':
+ # Send response early but indicate keep-alive
+ # This tests if client can handle early response + continue with keepalive
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain",
+ "Connection": "keep-alive",
+ "Keep-Alive": "timeout=30"},
+ b"early keepalive response")
+ # Read the body to properly maintain connection state
+ if content_length > 0:
+ body = await asyncio.wait_for(reader.read(content_length), timeout=5.0)
+ print(f"Read body: {len(body)} bytes", file=sys.stderr)
+
+ # Wait for potential next request on keep-alive connection
+ try:
+ next_line = await asyncio.wait_for(reader.readline(), timeout=5.0)
+ if next_line:
+ print(f"Keep-alive follow-up: {next_line}", file=sys.stderr)
+ # Handle second request simply
+ while True:
+ line = await asyncio.wait_for(reader.readline(), timeout=5.0)
+ if not line or line == b'\r\n':
+ break
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"second response on keepalive")
+ except asyncio.TimeoutError:
+ print("No follow-up request on keepalive", file=sys.stderr)
+
+ elif path == '/slow-read-fast-reply':
+ # Start reading body very slowly, but send reply quickly
+ # This creates a race condition
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"fast reply while slowly reading")
+ # Now slowly read
+ if content_length > 0:
+ read_so_far = 0
+ while read_so_far < content_length:
+ chunk = await reader.read(min(100, content_length - read_so_far))
+ if not chunk:
+ break
+ read_so_far += len(chunk)
+ await asyncio.sleep(0.1) # Slow down
+
+ elif path == '/partial-read-then-reply':
+ # Read only part of the body, then send response
+ if content_length > 0:
+ # Read only first 100 bytes
+ partial = await reader.read(min(100, content_length))
+ print(f"Read partial body: {len(partial)} bytes", file=sys.stderr)
+
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"response after partial body read")
+
+ elif path == '/immediate-close-413':
+ # Send 413 and close socket IMMEDIATELY without reading anything else
+ # This is the most aggressive case - RST may be sent if client is still writing
+ await self._send_response(writer, 413, "Request Entity Too Large",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Too large - closing immediately")
+ # Force close without draining - this should cause client write to fail
+ writer.transport.abort() # Forceful close
+ return # Skip normal close
+
+ elif path == '/block-and-reply':
+ # TRUE early response test:
+ # Send response IMMEDIATELY after headers, before body starts.
+ # The client should receive this while still preparing/sending body.
+ print(f"block-and-reply: sending 413 IMMEDIATELY (content-length={content_length})", file=sys.stderr)
+ # Send response right away - no waiting
+ await self._send_response(writer, 413, "Request Entity Too Large",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Early 413 - body was never read")
+ # Flush immediately
+ await writer.drain()
+ print(f"block-and-reply: response sent, closing without reading body", file=sys.stderr)
+ # Don't read body, just close
+
+ elif path == '/block-and-reply-slow':
+ # Even more aggressive: wait longer to let more data queue up
+ print(f"block-and-reply-slow: waiting for client to fill buffers", file=sys.stderr)
+ await asyncio.sleep(1.0) # Wait for client to really fill up buffers
+ await self._send_response(writer, 503, "Service Unavailable",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Server busy - your data was ignored")
+ await asyncio.sleep(0.1)
+
+ elif path == '/slow-response-no-drain':
+ # Wait a bit (let client send more data), then respond without reading
+ await asyncio.sleep(0.5) # Let client start/continue sending
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"slow response - body not read")
+ # Don't drain - close with data potentially still in flight
+
+ elif path == '/request':
+ # Normal request handling for comparison
+ if content_length > 0:
+ body = await asyncio.wait_for(reader.read(content_length), timeout=10.0)
+ print(f"Read full body: {len(body)} bytes", file=sys.stderr)
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"normal response")
+
+ elif path == '/keepalive-normal':
+ # Normal keep-alive handling for comparison
+ if content_length > 0:
+ body = await asyncio.wait_for(reader.read(content_length), timeout=10.0)
+ await self._send_response(writer, 200, "OK",
+ {"Content-Type": "text/plain",
+ "Connection": "keep-alive",
+ "Keep-Alive": "timeout=30"},
+ b"normal keepalive response")
+
+ else:
+ await self._send_response(writer, 404, "Not Found",
+ {"Content-Type": "text/plain", "Connection": "close"},
+ b"Not found")
+
+ except asyncio.TimeoutError:
+ print(f"Timeout handling {addr}", file=sys.stderr)
+ except ConnectionResetError:
+ print(f"Connection reset by {addr}", file=sys.stderr)
+ except Exception as e:
+ print(f"Error handling {addr}: {type(e).__name__}: {e}", file=sys.stderr)
+ finally:
+ try:
+ writer.close()
+ await writer.wait_closed()
+ except Exception:
+ pass
+ print(f"Connection closed for {addr}", file=sys.stderr)
+
+ async def _send_response(self, writer: asyncio.StreamWriter, status: int, status_text: str,
+ headers: dict, body: bytes):
+ """Send an HTTP response."""
+ response_lines = [f"HTTP/1.1 {status} {status_text}"]
+ headers["Content-Length"] = str(len(body))
+ for key, value in headers.items():
+ response_lines.append(f"{key}: {value}")
+ response_lines.append("")
+ response_lines.append("")
+
+ response_header = "\r\n".join(response_lines).encode('utf-8')
+ writer.write(response_header)
+ writer.write(body)
+ await writer.drain()
+ print(f"Sent response: {status} {status_text}, {len(body)} bytes body", file=sys.stderr)
+
+ async def start(self):
+ """Start the server."""
+ self.server = await asyncio.start_server(
+ self.handle_client, self.host, self.port
+ )
+ print(f"Early response server listening on {self.host}:{self.port}", file=sys.stderr)
+
+ async def serve_forever(self):
+ """Serve until cancelled."""
+ async with self.server:
+ await self.server.serve_forever()
+
+
+async def main():
+ parser = argparse.ArgumentParser(description="HTTP server for testing early response scenarios")
+ parser.add_argument("--bind", "-b", default="127.0.0.1", help="bind address")
+ parser.add_argument("--port", "-p", type=int, default=18083, help="bind port")
+ parser.add_argument("--pidfile", "-pf", help="path to the PID file")
+ args = parser.parse_args()
+
+ print(f"dummy_http_early_response.py: Starting on {args.bind}:{args.port}", file=sys.stderr)
+
+ server = EarlyResponseServer(args.host if hasattr(args, 'host') else args.bind, args.port)
+ await server.start()
+
+ if args.pidfile:
+ dummy_killer.write_pid(args.pidfile)
+ print(f"PID file written to {args.pidfile}", file=sys.stderr)
+
+ await server.serve_forever()
+
+
+if __name__ == "__main__":
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ print("Shutting down...", file=sys.stderr)
+ except Exception as e:
+ print(f"FATAL ERROR: {type(e).__name__}: {e}", file=sys.stderr)
+ import traceback
+ traceback.print_exc(file=sys.stderr)
+ sys.exit(1)