]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Handle HTTP early server responses during request write
authorVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 8 Dec 2025 17:21:03 +0000 (17:21 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 8 Dec 2025 17:21:03 +0000 (17:21 +0000)
Fix HTTP client to properly handle early server responses (e.g., 413
Too Large) that arrive before the client has finished sending the
request body. This is allowed by HTTP/1.1 (RFC 7230 Section 6.5).

- Use bitwise AND for event flag checks to handle combined EV_READ|EV_WRITE
- Watch for both READ and WRITE events during write phase
- Check for early response on write errors (EPIPE, ECONNRESET)
- Add RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE flag to track state

src/libserver/http/http_connection.c
test/functional/cases/221_http_early_response.robot [new file with mode: 0644]
test/functional/lib/rspamd.robot
test/functional/lua/http_early_response.lua [new file with mode: 0644]
test/functional/util/dummy_http_early_response.py [new file with mode: 0755]

index 70158e294b3e0fa1ca80d0609565de45c776170d..6e7bea1025b49fecd43ab2b8011263704d740d56 100644 (file)
@@ -53,11 +53,18 @@ enum rspamd_http_priv_flags {
        RSPAMD_HTTP_CONN_FLAG_PROXY = 1u << 5u,
        RSPAMD_HTTP_CONN_FLAG_PROXY_REQUEST = 1u << 6u,
        RSPAMD_HTTP_CONN_OWN_SOCKET = 1u << 7u,
+       RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE = 1u << 8u, /* Server sent early response during write */
 };
 
 #define IS_CONN_ENCRYPTED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_ENCRYPTED)
 #define IS_CONN_RESETED(c) ((c)->flags & RSPAMD_HTTP_CONN_FLAG_RESETED)
 
+static gssize rspamd_http_try_read(int fd,
+                                                                  struct rspamd_http_connection *conn,
+                                                                  struct rspamd_http_connection_private *priv,
+                                                                  struct _rspamd_http_privbuf *pbuf,
+                                                                  const char **buf_ptr);
+
 struct rspamd_http_connection_private {
        struct rspamd_http_context *ctx;
        struct rspamd_ssl_connection *ssl;
@@ -826,8 +833,43 @@ rspamd_http_write_helper(struct rspamd_http_connection *conn)
        }
 
        if (r == -1) {
+               int saved_errno = errno;
+
                if (!priv->ssl) {
-                       err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(errno));
+                       /* Check if server sent an early response (e.g., 413) */
+                       if (saved_errno == EPIPE || saved_errno == ECONNRESET ||
+                               saved_errno == ENOTCONN) {
+                               struct _rspamd_http_privbuf *pbuf = priv->buf;
+                               const char *d;
+                               gssize read_res;
+
+                               REF_RETAIN(pbuf);
+                               read_res = rspamd_http_try_read(conn->fd, conn, priv, pbuf, &d);
+
+                               if (read_res > 0) {
+                                       msg_debug("got early server response (%z bytes) during write, processing",
+                                                         read_res);
+
+                                       if (http_parser_execute(&priv->parser, &priv->parser_cb,
+                                                                                       d, read_res) != (size_t) read_res ||
+                                               priv->parser.http_errno != 0) {
+                                               err = g_error_new(HTTP_ERROR, 400,
+                                                                                 "HTTP parser error on early response: %s",
+                                                                                 http_errno_description(priv->parser.http_errno));
+                                               rspamd_http_connection_ref(conn);
+                                               conn->error_handler(conn, err);
+                                               rspamd_http_connection_unref(conn);
+                                               g_error_free(err);
+                                       }
+
+                                       REF_RELEASE(pbuf);
+                                       return;
+                               }
+
+                               REF_RELEASE(pbuf);
+                       }
+
+                       err = g_error_new(HTTP_ERROR, 500, "IO write error: %s", strerror(saved_errno));
                        rspamd_http_connection_ref(conn);
                        conn->error_handler(conn, err);
                        rspamd_http_connection_unref(conn);
@@ -866,10 +908,15 @@ call_finish_handler:
                rspamd_http_connection_unref(conn);
        }
        else {
-               /* Plan read message */
-               /* Switch to read stage timeout */
-               priv->first_write_done = true;
-               rspamd_http_simple_client_helper(conn);
+               if (priv->flags & RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE) {
+                       /* Early response already being processed */
+                       rspamd_ev_watcher_reschedule(priv->ctx->event_loop, &priv->ev, EV_READ);
+               }
+               else {
+                       /* Plan read message */
+                       priv->first_write_done = true;
+                       rspamd_http_simple_client_helper(conn);
+               }
        }
 }
 
@@ -955,7 +1002,16 @@ rspamd_http_event_handler(int fd, short what, gpointer ud)
        REF_RETAIN(pbuf);
        rspamd_http_connection_ref(conn);
 
-       if (what == EV_READ) {
+       if (what & EV_READ) {
+               /* Check for early response while still sending request */
+               if (priv->wr_pos < priv->wr_total && priv->wr_total > 0) {
+                       msg_debug("received early server response while still writing request "
+                                         "(sent %uz of %uz bytes)",
+                                         priv->wr_pos, priv->wr_total);
+                       priv->wr_pos = priv->wr_total;
+                       priv->flags |= RSPAMD_HTTP_CONN_FLAG_EARLY_RESPONSE;
+               }
+
                r = rspamd_http_try_read(fd, conn, priv, pbuf, &d);
 
                if (r > 0) {
@@ -1039,7 +1095,7 @@ rspamd_http_event_handler(int fd, short what, gpointer ud)
                        return;
                }
        }
-       else if (what == EV_TIMEOUT) {
+       else if (what & EV_TIMEOUT) {
                if (!priv->ssl) {
                        /* Let's try to read from the socket first */
                        r = rspamd_http_try_read(fd, conn, priv, pbuf, &d);
@@ -1087,7 +1143,7 @@ rspamd_http_event_handler(int fd, short what, gpointer ud)
                        return;
                }
        }
-       else if (what == EV_WRITE) {
+       else if (what & EV_WRITE) {
                rspamd_http_write_helper(conn);
        }
 
@@ -2440,12 +2496,13 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) {
                                                                                                   rspamd_http_event_handler,
                                                                                                   rspamd_http_ssl_err_handler,
                                                                                                   conn,
-                                                                                                  EV_WRITE);
+                                                                                                  EV_WRITE | EV_READ);
                }
        }
 }
 else {
-       rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE,
+       /* Watch for both READ and WRITE to detect early server responses */
+       rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE | EV_READ,
                                                   rspamd_http_event_handler, conn);
        /* Use connect_timeout on initial EV_WRITE stage if provided */
        ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout);
diff --git a/test/functional/cases/221_http_early_response.robot b/test/functional/cases/221_http_early_response.robot
new file mode 100644 (file)
index 0000000..06f0a7a
--- /dev/null
@@ -0,0 +1,151 @@
+*** Settings ***
+Test Setup      Http Early Response Setup
+Test Teardown   Http Early Response Teardown
+Library         Process
+Library         ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource        ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables       ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG}              ${RSPAMD_TESTDIR}/configs/lua_test.conf
+${MESSAGE}             ${RSPAMD_TESTDIR}/messages/spam_message.eml
+${RSPAMD_LUA_SCRIPT}   ${RSPAMD_TESTDIR}/lua/http_early_response.lua
+${RSPAMD_SCOPE}        Suite
+${RSPAMD_URL_TLD}      ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+
+*** Test Cases ***
+HTTP Early Reply
+  [Documentation]  Test server sending response before reading request body
+  Scan File  ${MESSAGE}  test-type=early-reply
+  ...  Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+  # We expect either success (200) or an error - both indicate the client handled
+  # the early response scenario (success = client received response, error = connection issue)
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_EARLY_REPLY_200
+  IF  not ${result}
+    Expect Symbol  HTTP_EARLY_REPLY_ERROR
+  END
+
+HTTP Early 413 Error
+  [Documentation]  Test server sending 413 error before reading request body
+  Scan File  ${MESSAGE}  test-type=early-413
+  ...  Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_EARLY_413_413
+  IF  not ${result}
+    Expect Symbol  HTTP_EARLY_413_ERROR
+  END
+
+HTTP Keepalive Early Response
+  [Documentation]  Test keepalive with server sending early response
+  Scan File  ${MESSAGE}  test-type=keepalive-early
+  ...  Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_KEEPALIVE_EARLY_200
+  IF  not ${result}
+    Expect Symbol  HTTP_KEEPALIVE_EARLY_ERROR
+  END
+
+HTTP Early Reply Coroutine
+  [Documentation]  Test early response with coroutine-based HTTP request
+  Scan File  ${MESSAGE}  test-type=early-coro
+  ...  Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_EARLY_CORO_200
+  IF  not ${result}
+    Expect Symbol  HTTP_EARLY_CORO_ERROR
+  END
+
+HTTP Normal Request Baseline
+  [Documentation]  Baseline test with normal request handling
+  Scan File  ${MESSAGE}  test-type=normal
+  ...  Settings={symbols_enabled = [HTTP_EARLY_RESPONSE_TEST]}
+  Expect Symbol  HTTP_NORMAL_200
+
+HTTP Keepalive Sequential
+  [Documentation]  Test sequential keepalive requests
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_KEEPALIVE_SEQUENTIAL_TEST]}
+  # Should have at least some successful requests
+  Expect Symbol  HTTP_KEEPALIVE_SEQ_SUCCESS
+
+HTTP Early Keepalive Stress
+  [Documentation]  Stress test mixing early responses with keepalive
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_EARLY_KEEPALIVE_STRESS_TEST]}
+  Expect Symbol  HTTP_EARLY_KEEPALIVE_STRESS
+
+HTTP Immediate Close Large Body
+  [Documentation]  Test server immediately closing connection during large body send
+  [Tags]  aggressive
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_IMMEDIATE_CLOSE_TEST]}
+  # Either we get the 413 response OR an error - both are valid outcomes
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_IMMEDIATE_CLOSE_413
+  IF  not ${result}
+    Expect Symbol  HTTP_IMMEDIATE_CLOSE_ERROR
+  END
+
+HTTP Slow Response Large Body
+  [Documentation]  Test server responding slowly while client sends large body
+  [Tags]  aggressive
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_SLOW_RESPONSE_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_SLOW_RESPONSE_200
+  IF  not ${result}
+    Expect Symbol  HTTP_SLOW_RESPONSE_ERROR
+  END
+
+HTTP Rapid Close Requests
+  [Documentation]  Rapid sequential requests to server that closes immediately
+  [Tags]  aggressive
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_RAPID_CLOSE_TEST]}
+  Expect Symbol  HTTP_RAPID_CLOSE_RESULTS
+
+HTTP Block And Reply
+  [Documentation]  TRUE early response test - 512KB body with server not reading
+  [Tags]  early-response  critical
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_BLOCK_REPLY_TEST]}
+  # Ideal: HTTP_BLOCK_REPLY_413 (got early response)
+  # Acceptable: HTTP_BLOCK_REPLY_ERROR (connection error, but no crash)
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_BLOCK_REPLY_413
+  IF  not ${result}
+    Expect Symbol  HTTP_BLOCK_REPLY_ERROR
+  END
+
+HTTP Block And Reply Coroutine
+  [Documentation]  Coroutine version of block-and-reply test
+  [Tags]  early-response  critical
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_BLOCK_REPLY_CORO_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_BLOCK_REPLY_CORO_413
+  IF  not ${result}
+    Expect Symbol  HTTP_BLOCK_REPLY_CORO_ERROR
+  END
+
+HTTP Block Slow Response
+  [Documentation]  Server waits 1s then responds - 1MB body
+  [Tags]  early-response  slow
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_BLOCK_SLOW_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_BLOCK_SLOW_503
+  IF  not ${result}
+    Expect Symbol  HTTP_BLOCK_SLOW_ERROR
+  END
+
+HTTP Instant Reply
+  [Documentation]  Server responds BEFORE reading headers - most aggressive early response
+  [Tags]  early-response  critical
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [HTTP_INSTANT_REPLY_TEST]}
+  ${result} =  Run Keyword And Return Status  Expect Symbol  HTTP_INSTANT_REPLY_413
+  IF  not ${result}
+    Expect Symbol  HTTP_INSTANT_REPLY_ERROR
+  END
+
+*** Keywords ***
+Http Early Response Setup
+  Run Dummy Http Early Response
+  Rspamd Setup
+
+Http Early Response Teardown
+  Rspamd Teardown
+  Dummy Http Early Teardown
index 4fa699520295b08dafd0b022d3d46937356a9fac..aa6a2f02cdb78eea62cb6be10cd4189e6bb8efb2 100644 (file)
@@ -551,3 +551,22 @@ Dummy Http Teardown
 Dummy Https Teardown
   Terminate Process  ${DUMMY_HTTPS_PROC}
   Wait For Process  ${DUMMY_HTTPS_PROC}
+
+Run Dummy Http Early Response
+  ${result} =  Start Process  ${RSPAMD_TESTDIR}/util/dummy_http_early_response.py  -pf  /tmp/dummy_http_early.pid  -p  18083
+  ...  stderr=/tmp/dummy_http_early.log  stdout=/tmp/dummy_http_early.log
+  ${status}  ${error} =  Run Keyword And Ignore Error  Wait Until Created  /tmp/dummy_http_early.pid  timeout=2 second
+  IF  '${status}' == 'FAIL'
+    ${logstatus}  ${log} =  Run Keyword And Ignore Error  Get File  /tmp/dummy_http_early.log
+    IF  '${logstatus}' == 'PASS'
+      Log  dummy_http_early_response.py failed to start. Log output:\n${log}  level=ERROR
+    ELSE
+      Log  dummy_http_early_response.py failed to start. No log file found at /tmp/dummy_http_early.log  level=ERROR
+    END
+    Fail  dummy_http_early_response.py did not create PID file in 2 seconds
+  END
+  Export Scoped Variables  ${RSPAMD_SCOPE}  DUMMY_HTTP_EARLY_PROC=${result}
+
+Dummy Http Early Teardown
+  Terminate Process  ${DUMMY_HTTP_EARLY_PROC}
+  Wait For Process  ${DUMMY_HTTP_EARLY_PROC}
diff --git a/test/functional/lua/http_early_response.lua b/test/functional/lua/http_early_response.lua
new file mode 100644 (file)
index 0000000..fba8d30
--- /dev/null
@@ -0,0 +1,596 @@
+--[[
+Test HTTP client behavior when server sends early responses.
+
+This tests edge cases where a server responds before the client has finished
+sending the request body (which is allowed by HTTP/1.1 spec).
+
+The test server (dummy_http_early_response.py) runs on port 18083.
+]]
+
+local rspamd_http = require "rspamd_http"
+local rspamd_logger = require "rspamd_logger"
+
+-- Register all possible result symbols upfront
+-- These are the symbols that will be inserted based on HTTP response codes
+local result_symbols = {
+  -- Test 1: Early reply
+  'HTTP_EARLY_REPLY_ERROR',
+  'HTTP_EARLY_REPLY_200',
+  'HTTP_EARLY_REPLY_413',
+  'HTTP_EARLY_REPLY_500',
+  -- Test 2: Early 413
+  'HTTP_EARLY_413_ERROR',
+  'HTTP_EARLY_413_413',
+  'HTTP_EARLY_413_200',
+  -- Test 3: Keepalive early
+  'HTTP_KEEPALIVE_EARLY_ERROR',
+  'HTTP_KEEPALIVE_EARLY_200',
+  -- Test 4: Coroutine early
+  'HTTP_EARLY_CORO_ERROR',
+  'HTTP_EARLY_CORO_200',
+  -- Test 5: Normal
+  'HTTP_NORMAL_ERROR',
+  'HTTP_NORMAL_200',
+  -- Test 6: Keepalive sequential
+  'HTTP_KEEPALIVE_SEQ_ERRORS',
+  'HTTP_KEEPALIVE_SEQ_SUCCESS',
+  -- Test 7: Keepalive stress
+  'HTTP_EARLY_KEEPALIVE_STRESS',
+  -- Test 8: Immediate close
+  'HTTP_IMMEDIATE_CLOSE_ERROR',
+  'HTTP_IMMEDIATE_CLOSE_413',
+  'HTTP_IMMEDIATE_CLOSE_200',
+  -- Test 9: Slow response
+  'HTTP_SLOW_RESPONSE_ERROR',
+  'HTTP_SLOW_RESPONSE_200',
+  -- Test 10: Rapid close
+  'HTTP_RAPID_CLOSE_RESULTS',
+  -- Test 11: Block and reply
+  'HTTP_BLOCK_REPLY_ERROR',
+  'HTTP_BLOCK_REPLY_413',
+  'HTTP_BLOCK_REPLY_200',
+  -- Test 12: Block and reply coro
+  'HTTP_BLOCK_REPLY_CORO_ERROR',
+  'HTTP_BLOCK_REPLY_CORO_413',
+  'HTTP_BLOCK_REPLY_CORO_200',
+  -- Test 13: Block slow
+  'HTTP_BLOCK_SLOW_ERROR',
+  'HTTP_BLOCK_SLOW_503',
+  'HTTP_BLOCK_SLOW_200',
+  -- Test 14: Instant reply (before headers even read)
+  'HTTP_INSTANT_REPLY_ERROR',
+  'HTTP_INSTANT_REPLY_413',
+}
+
+-- Register all result symbols as virtual symbols
+for _, sym_name in ipairs(result_symbols) do
+  rspamd_config:register_symbol({
+    name = sym_name,
+    score = 0.0,
+    type = 'virtual',
+    parent = -1,  -- Will be set properly by parent registration
+  })
+end
+
+-- Test 1: Early reply - server responds before reading body
+local function test_early_reply(task)
+  rspamd_logger.errx(task, 'test_early_reply: starting')
+
+  -- Create a large body to increase chance of race condition
+  local body_parts = {}
+  for i = 1, 1000 do
+    body_parts[i] = string.format("line %d: some test data that we are sending\n", i)
+  end
+  local large_body = table.concat(body_parts)
+
+  local function callback(err, code, body)
+    if err then
+      rspamd_logger.errx(task, 'test_early_reply callback error: %s', err)
+      task:insert_result('HTTP_EARLY_REPLY_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_early_reply callback success: code=%s body=%s', code, body)
+      task:insert_result('HTTP_EARLY_REPLY_' .. tostring(code), 1.0, body)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/early-reply',
+    task = task,
+    method = 'post',
+    body = large_body,
+    callback = callback,
+    timeout = 5,
+  })
+end
+
+-- Test 2: Early 413 error - server sends error before reading body
+local function test_early_error_413(task)
+  rspamd_logger.errx(task, 'test_early_error_413: starting')
+
+  -- Create a large body
+  local body_parts = {}
+  for i = 1, 1000 do
+    body_parts[i] = string.format("line %d: large body data\n", i)
+  end
+  local large_body = table.concat(body_parts)
+
+  local function callback(err, code, body)
+    if err then
+      rspamd_logger.errx(task, 'test_early_error_413 callback error: %s', err)
+      task:insert_result('HTTP_EARLY_413_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_early_error_413 callback success: code=%s body=%s', code, body)
+      task:insert_result('HTTP_EARLY_413_' .. tostring(code), 1.0, body)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/early-error-413',
+    task = task,
+    method = 'post',
+    body = large_body,
+    callback = callback,
+    timeout = 5,
+  })
+end
+
+-- Test 3: Keep-alive with early response
+local function test_keepalive_early(task)
+  rspamd_logger.errx(task, 'test_keepalive_early: starting')
+
+  local body = "test body for keepalive"
+
+  local function callback(err, code, body_response)
+    if err then
+      rspamd_logger.errx(task, 'test_keepalive_early callback error: %s', err)
+      task:insert_result('HTTP_KEEPALIVE_EARLY_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_keepalive_early callback success: code=%s', code)
+      task:insert_result('HTTP_KEEPALIVE_EARLY_' .. tostring(code), 1.0, body_response)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/keepalive-early',
+    task = task,
+    method = 'post',
+    body = body,
+    callback = callback,
+    timeout = 5,
+    keepalive = true,
+  })
+end
+
+-- Test 4: Coroutine-based early reply test
+local function test_early_reply_coro(task)
+  rspamd_logger.errx(task, 'test_early_reply_coro: starting')
+
+  local body_parts = {}
+  for i = 1, 500 do
+    body_parts[i] = string.format("coro line %d: test data\n", i)
+  end
+  local body = table.concat(body_parts)
+
+  local err, response = rspamd_http.request({
+    url = 'http://127.0.0.1:18083/early-reply',
+    task = task,
+    method = 'post',
+    body = body,
+    timeout = 5,
+  })
+
+  if err then
+    rspamd_logger.errx(task, 'test_early_reply_coro error: %s', err)
+    task:insert_result('HTTP_EARLY_CORO_ERROR', 1.0, err)
+  else
+    rspamd_logger.errx(task, 'test_early_reply_coro success: code=%s', response.code)
+    task:insert_result('HTTP_EARLY_CORO_' .. tostring(response.code), 1.0, response.content)
+  end
+end
+
+-- Test 5: Multiple requests to normal endpoint (baseline)
+local function test_normal_request(task)
+  rspamd_logger.errx(task, 'test_normal_request: starting')
+
+  local function callback(err, code, body)
+    if err then
+      rspamd_logger.errx(task, 'test_normal_request callback error: %s', err)
+      task:insert_result('HTTP_NORMAL_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_normal_request callback success: code=%s', code)
+      task:insert_result('HTTP_NORMAL_' .. tostring(code), 1.0, body)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/request',
+    task = task,
+    method = 'post',
+    body = 'normal test body',
+    callback = callback,
+    timeout = 5,
+  })
+end
+
+-- Main test symbol that runs all tests
+local function http_early_response_tests(task)
+  local test_type = tostring(task:get_request_header('test-type') or 'all')
+
+  rspamd_logger.errx(task, 'http_early_response_tests: test_type=%s', test_type)
+
+  if test_type == 'early-reply' or test_type == 'all' then
+    test_early_reply(task)
+  end
+
+  if test_type == 'early-413' or test_type == 'all' then
+    test_early_error_413(task)
+  end
+
+  if test_type == 'keepalive-early' or test_type == 'all' then
+    test_keepalive_early(task)
+  end
+
+  if test_type == 'early-coro' or test_type == 'all' then
+    test_early_reply_coro(task)
+  end
+
+  if test_type == 'normal' or test_type == 'all' then
+    test_normal_request(task)
+  end
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_EARLY_RESPONSE_TEST',
+  score = 1.0,
+  callback = http_early_response_tests,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 6: Sequential keepalive requests (stress test for keepalive pool)
+local function test_keepalive_sequential(task)
+  rspamd_logger.errx(task, 'test_keepalive_sequential: starting')
+
+  local success_count = 0
+  local error_count = 0
+  local errors = {}
+
+  -- Make 3 sequential requests using keepalive
+  for i = 1, 3 do
+    local err, response = rspamd_http.request({
+      url = 'http://127.0.0.1:18083/keepalive-normal',
+      task = task,
+      method = 'post',
+      body = string.format('request %d body', i),
+      timeout = 5,
+      keepalive = true,
+    })
+
+    if err then
+      error_count = error_count + 1
+      errors[#errors + 1] = string.format('req%d: %s', i, err)
+      rspamd_logger.errx(task, 'keepalive request %d error: %s', i, err)
+    else
+      success_count = success_count + 1
+      rspamd_logger.errx(task, 'keepalive request %d success: code=%s', i, response.code)
+    end
+  end
+
+  if error_count > 0 then
+    task:insert_result('HTTP_KEEPALIVE_SEQ_ERRORS', 1.0, table.concat(errors, '; '))
+  end
+  task:insert_result('HTTP_KEEPALIVE_SEQ_SUCCESS', 1.0, tostring(success_count))
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_KEEPALIVE_SEQUENTIAL_TEST',
+  score = 1.0,
+  callback = test_keepalive_sequential,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 7: Stress test with early responses + keepalive
+local function test_early_keepalive_stress(task)
+  rspamd_logger.errx(task, 'test_early_keepalive_stress: starting')
+
+  local results = {}
+
+  -- Mix of normal and early-response requests
+  local endpoints = {
+    '/request',
+    '/early-reply',
+    '/request',
+    '/keepalive-early',
+    '/request',
+  }
+
+  for i, endpoint in ipairs(endpoints) do
+    local body_parts = {}
+    for j = 1, 100 do
+      body_parts[j] = string.format("stress test line %d-%d\n", i, j)
+    end
+
+    local err, response = rspamd_http.request({
+      url = 'http://127.0.0.1:18083' .. endpoint,
+      task = task,
+      method = 'post',
+      body = table.concat(body_parts),
+      timeout = 5,
+      keepalive = true,
+    })
+
+    if err then
+      results[#results + 1] = string.format('%s:err:%s', endpoint, err)
+    else
+      results[#results + 1] = string.format('%s:ok:%d', endpoint, response.code)
+    end
+  end
+
+  task:insert_result('HTTP_EARLY_KEEPALIVE_STRESS', 1.0, table.concat(results, '|'))
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_EARLY_KEEPALIVE_STRESS_TEST',
+  score = 1.0,
+  callback = test_early_keepalive_stress,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 8: Aggressive immediate close with large body
+-- This should trigger actual failures - server closes socket while client writes
+local function test_immediate_close_large(task)
+  rspamd_logger.errx(task, 'test_immediate_close_large: starting')
+
+  -- Create a VERY large body that won't fit in socket buffer
+  -- This forces the client to block on write, at which point server closes
+  local body_parts = {}
+  for i = 1, 10000 do  -- ~500KB body
+    body_parts[i] = string.format("line %05d: this is padding data to make the body very large and exceed socket buffers\n", i)
+  end
+  local large_body = table.concat(body_parts)
+  rspamd_logger.errx(task, 'test_immediate_close_large: body size = %d bytes', #large_body)
+
+  local function callback(err, code, body)
+    if err then
+      rspamd_logger.errx(task, 'test_immediate_close_large callback error: %s', err)
+      -- Error is EXPECTED here - we want to see how client handles it
+      task:insert_result('HTTP_IMMEDIATE_CLOSE_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_immediate_close_large callback success: code=%s', code)
+      -- Success means client received the 413 response despite server closing
+      task:insert_result('HTTP_IMMEDIATE_CLOSE_' .. tostring(code), 1.0, body)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/immediate-close-413',
+    task = task,
+    method = 'post',
+    body = large_body,
+    callback = callback,
+    timeout = 10,
+  })
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_IMMEDIATE_CLOSE_TEST',
+  score = 1.0,
+  callback = test_immediate_close_large,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 9: Slow response with large body - tests race condition
+local function test_slow_response_large(task)
+  rspamd_logger.errx(task, 'test_slow_response_large: starting')
+
+  local body_parts = {}
+  for i = 1, 5000 do  -- ~250KB body
+    body_parts[i] = string.format("slow test line %05d: padding data for the test\n", i)
+  end
+  local body = table.concat(body_parts)
+
+  local err, response = rspamd_http.request({
+    url = 'http://127.0.0.1:18083/slow-response-no-drain',
+    task = task,
+    method = 'post',
+    body = body,
+    timeout = 10,
+  })
+
+  if err then
+    rspamd_logger.errx(task, 'test_slow_response_large error: %s', err)
+    task:insert_result('HTTP_SLOW_RESPONSE_ERROR', 1.0, err)
+  else
+    rspamd_logger.errx(task, 'test_slow_response_large success: code=%s', response.code)
+    task:insert_result('HTTP_SLOW_RESPONSE_' .. tostring(response.code), 1.0, response.content)
+  end
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_SLOW_RESPONSE_TEST',
+  score = 1.0,
+  callback = test_slow_response_large,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 10: Multiple rapid requests to immediate-close endpoint
+-- This stresses the connection handling and cleanup
+local function test_rapid_close_requests(task)
+  rspamd_logger.errx(task, 'test_rapid_close_requests: starting')
+
+  local results = {}
+  local body = string.rep("x", 10000)  -- 10KB body
+
+  for i = 1, 5 do
+    local err, response = rspamd_http.request({
+      url = 'http://127.0.0.1:18083/immediate-close-413',
+      task = task,
+      method = 'post',
+      body = body,
+      timeout = 5,
+    })
+
+    if err then
+      results[#results + 1] = string.format('req%d:err:%s', i, err)
+    else
+      results[#results + 1] = string.format('req%d:ok:%d', i, response.code)
+    end
+  end
+
+  task:insert_result('HTTP_RAPID_CLOSE_RESULTS', 1.0, table.concat(results, '|'))
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_RAPID_CLOSE_TEST',
+  score = 1.0,
+  callback = test_rapid_close_requests,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 11: TRUE early response test with buffer-exceeding body
+-- This test sends a body larger than socket buffers (~256KB+) to ensure
+-- the client actually blocks on write while server sends response
+local function test_block_and_reply(task)
+  rspamd_logger.errx(task, 'test_block_and_reply: starting')
+
+  -- Create body larger than socket buffers (256KB+ for localhost)
+  -- macOS has 128KB send + 128KB receive buffers
+  local body_size = 512 * 1024  -- 512KB should definitely exceed buffers
+  local chunk = string.rep("X", 1024)  -- 1KB chunk
+  local body_parts = {}
+  for i = 1, body_size / 1024 do
+    body_parts[i] = chunk
+  end
+  local huge_body = table.concat(body_parts)
+  rspamd_logger.errx(task, 'test_block_and_reply: body size = %d bytes', #huge_body)
+
+  local function callback(err, code, body)
+    if err then
+      rspamd_logger.errx(task, 'test_block_and_reply error: %s', err)
+      task:insert_result('HTTP_BLOCK_REPLY_ERROR', 1.0, err)
+    else
+      rspamd_logger.errx(task, 'test_block_and_reply success: code=%s body=%s', code, body)
+      -- This is the IDEAL outcome - we got the early 413 response!
+      task:insert_result('HTTP_BLOCK_REPLY_' .. tostring(code), 1.0, body)
+    end
+  end
+
+  rspamd_http.request({
+    url = 'http://127.0.0.1:18083/block-and-reply',
+    task = task,
+    method = 'post',
+    body = huge_body,
+    callback = callback,
+    timeout = 10,
+  })
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_BLOCK_REPLY_TEST',
+  score = 1.0,
+  callback = test_block_and_reply,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 12: Coroutine version of block-and-reply
+local function test_block_and_reply_coro(task)
+  rspamd_logger.errx(task, 'test_block_and_reply_coro: starting')
+
+  -- 512KB body
+  local huge_body = string.rep("Y", 512 * 1024)
+
+  local err, response = rspamd_http.request({
+    url = 'http://127.0.0.1:18083/block-and-reply',
+    task = task,
+    method = 'post',
+    body = huge_body,
+    timeout = 10,
+  })
+
+  if err then
+    rspamd_logger.errx(task, 'test_block_and_reply_coro error: %s', err)
+    task:insert_result('HTTP_BLOCK_REPLY_CORO_ERROR', 1.0, err)
+  else
+    rspamd_logger.errx(task, 'test_block_and_reply_coro success: code=%s', response.code)
+    task:insert_result('HTTP_BLOCK_REPLY_CORO_' .. tostring(response.code), 1.0, response.content)
+  end
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_BLOCK_REPLY_CORO_TEST',
+  score = 1.0,
+  callback = test_block_and_reply_coro,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 13: Slow block test - server waits even longer
+local function test_block_slow(task)
+  rspamd_logger.errx(task, 'test_block_slow: starting')
+
+  -- 1MB body to really fill things up
+  local huge_body = string.rep("Z", 1024 * 1024)
+
+  local err, response = rspamd_http.request({
+    url = 'http://127.0.0.1:18083/block-and-reply-slow',
+    task = task,
+    method = 'post',
+    body = huge_body,
+    timeout = 15,  -- Longer timeout since server waits 1 second
+  })
+
+  if err then
+    rspamd_logger.errx(task, 'test_block_slow error: %s', err)
+    task:insert_result('HTTP_BLOCK_SLOW_ERROR', 1.0, err)
+  else
+    rspamd_logger.errx(task, 'test_block_slow success: code=%s', response.code)
+    task:insert_result('HTTP_BLOCK_SLOW_' .. tostring(response.code), 1.0, response.content)
+  end
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_BLOCK_SLOW_TEST',
+  score = 1.0,
+  callback = test_block_slow,
+  no_squeeze = true,
+  flags = 'coro'
+})
+
+-- Test 14: Instant reply - server responds BEFORE reading headers
+-- This is the most aggressive early response test
+local function test_instant_reply(task)
+  rspamd_logger.errx(task, 'test_instant_reply: starting')
+
+  -- 512KB body - server will respond before even reading our headers
+  local huge_body = string.rep("Z", 512 * 1024)
+
+  local err, response = rspamd_http.request({
+    url = 'http://127.0.0.1:18083/instant-reply',
+    task = task,
+    method = 'post',
+    body = huge_body,
+    timeout = 10,
+  })
+
+  if err then
+    rspamd_logger.errx(task, 'test_instant_reply error: %s', err)
+    task:insert_result('HTTP_INSTANT_REPLY_ERROR', 1.0, err)
+  else
+    rspamd_logger.errx(task, 'test_instant_reply success: code=%s', response.code)
+    task:insert_result('HTTP_INSTANT_REPLY_' .. tostring(response.code), 1.0, response.content)
+  end
+end
+
+rspamd_config:register_symbol({
+  name = 'HTTP_INSTANT_REPLY_TEST',
+  score = 1.0,
+  callback = test_instant_reply,
+  no_squeeze = true,
+  flags = 'coro'
+})
diff --git a/test/functional/util/dummy_http_early_response.py b/test/functional/util/dummy_http_early_response.py
new file mode 100755 (executable)
index 0000000..2fbb294
--- /dev/null
@@ -0,0 +1,300 @@
+#!/usr/bin/env python3
+"""
+A deliberately "buggy" HTTP server that sends early responses before
+reading the full client request. This is used to test rspamd's HTTP
+client handling of edge cases that are allowed by HTTP/1.1 spec.
+
+Scenarios implemented:
+1. /early-reply - Send response immediately after reading headers, before body
+2. /early-error-413 - Send 413 error and close after reading just the request line
+3. /early-error-close - Send error and immediately close connection (no keep-alive)
+4. /keepalive-early - Send response early but keep connection alive
+5. /slow-read-fast-reply - Read request very slowly but reply quickly
+"""
+
+import asyncio
+import argparse
+import sys
+import os
+
+# Add parent directory to path for dummy_killer
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+import dummy_killer
+
+
+class EarlyResponseServer:
+    def __init__(self, host: str, port: int):
+        self.host = host
+        self.port = port
+        self.server = None
+
+    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
+        """Handle a single client connection."""
+        addr = writer.get_extra_info('peername')
+        print(f"Connection from {addr}", file=sys.stderr)
+
+        try:
+            # Read just the request line first
+            request_line = await asyncio.wait_for(reader.readline(), timeout=10.0)
+            if not request_line:
+                return
+
+            request_line = request_line.decode('utf-8', errors='replace').strip()
+            print(f"Request line: {request_line}", file=sys.stderr)
+
+            parts = request_line.split(' ')
+            if len(parts) < 2:
+                return
+
+            method = parts[0]
+            path = parts[1]
+
+            # For /instant-reply, send response BEFORE even reading headers
+            # This is the most aggressive early response - client is still sending headers+body
+            if path == '/instant-reply':
+                print(f"instant-reply: sending 413 BEFORE reading headers!", file=sys.stderr)
+                await self._send_response(writer, 413, "Request Entity Too Large",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Instant 413 - rejected at request line")
+                await writer.drain()
+                print(f"instant-reply: response sent, closing", file=sys.stderr)
+                return  # Close without reading anything else
+
+            # Read headers (but we might not read body depending on path)
+            headers = {}
+            content_length = 0
+            while True:
+                line = await asyncio.wait_for(reader.readline(), timeout=10.0)
+                if not line or line == b'\r\n' or line == b'\n':
+                    break
+                line = line.decode('utf-8', errors='replace').strip()
+                if ':' in line:
+                    key, value = line.split(':', 1)
+                    headers[key.strip().lower()] = value.strip()
+                    if key.strip().lower() == 'content-length':
+                        content_length = int(value.strip())
+
+            print(f"Headers received, Content-Length: {content_length}", file=sys.stderr)
+
+            # Handle different test scenarios based on path
+            if path == '/early-reply':
+                # Send response immediately WITHOUT reading body
+                # This tests client handling when server replies before body is fully sent
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"early reply - body not read")
+                # Now try to drain remaining data
+                if content_length > 0:
+                    try:
+                        remaining = await asyncio.wait_for(reader.read(content_length), timeout=1.0)
+                        print(f"Drained {len(remaining)} bytes of body", file=sys.stderr)
+                    except asyncio.TimeoutError:
+                        print("Timeout draining body", file=sys.stderr)
+
+            elif path == '/early-error-413':
+                # Send 413 immediately and close - simulates "request too large"
+                # Don't even try to read the body
+                await self._send_response(writer, 413, "Request Entity Too Large",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Request body too large")
+                # Close immediately without reading body
+
+            elif path == '/early-error-close':
+                # Send error and close connection abruptly
+                await self._send_response(writer, 500, "Internal Server Error",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Server error - closing")
+
+            elif path == '/keepalive-early':
+                # Send response early but indicate keep-alive
+                # This tests if client can handle early response + continue with keepalive
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain",
+                                           "Connection": "keep-alive",
+                                           "Keep-Alive": "timeout=30"},
+                                          b"early keepalive response")
+                # Read the body to properly maintain connection state
+                if content_length > 0:
+                    body = await asyncio.wait_for(reader.read(content_length), timeout=5.0)
+                    print(f"Read body: {len(body)} bytes", file=sys.stderr)
+
+                # Wait for potential next request on keep-alive connection
+                try:
+                    next_line = await asyncio.wait_for(reader.readline(), timeout=5.0)
+                    if next_line:
+                        print(f"Keep-alive follow-up: {next_line}", file=sys.stderr)
+                        # Handle second request simply
+                        while True:
+                            line = await asyncio.wait_for(reader.readline(), timeout=5.0)
+                            if not line or line == b'\r\n':
+                                break
+                        await self._send_response(writer, 200, "OK",
+                                                  {"Content-Type": "text/plain", "Connection": "close"},
+                                                  b"second response on keepalive")
+                except asyncio.TimeoutError:
+                    print("No follow-up request on keepalive", file=sys.stderr)
+
+            elif path == '/slow-read-fast-reply':
+                # Start reading body very slowly, but send reply quickly
+                # This creates a race condition
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"fast reply while slowly reading")
+                # Now slowly read
+                if content_length > 0:
+                    read_so_far = 0
+                    while read_so_far < content_length:
+                        chunk = await reader.read(min(100, content_length - read_so_far))
+                        if not chunk:
+                            break
+                        read_so_far += len(chunk)
+                        await asyncio.sleep(0.1)  # Slow down
+
+            elif path == '/partial-read-then-reply':
+                # Read only part of the body, then send response
+                if content_length > 0:
+                    # Read only first 100 bytes
+                    partial = await reader.read(min(100, content_length))
+                    print(f"Read partial body: {len(partial)} bytes", file=sys.stderr)
+
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"response after partial body read")
+
+            elif path == '/immediate-close-413':
+                # Send 413 and close socket IMMEDIATELY without reading anything else
+                # This is the most aggressive case - RST may be sent if client is still writing
+                await self._send_response(writer, 413, "Request Entity Too Large",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Too large - closing immediately")
+                # Force close without draining - this should cause client write to fail
+                writer.transport.abort()  # Forceful close
+                return  # Skip normal close
+
+            elif path == '/block-and-reply':
+                # TRUE early response test:
+                # Send response IMMEDIATELY after headers, before body starts.
+                # The client should receive this while still preparing/sending body.
+                print(f"block-and-reply: sending 413 IMMEDIATELY (content-length={content_length})", file=sys.stderr)
+                # Send response right away - no waiting
+                await self._send_response(writer, 413, "Request Entity Too Large",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Early 413 - body was never read")
+                # Flush immediately
+                await writer.drain()
+                print(f"block-and-reply: response sent, closing without reading body", file=sys.stderr)
+                # Don't read body, just close
+
+            elif path == '/block-and-reply-slow':
+                # Even more aggressive: wait longer to let more data queue up
+                print(f"block-and-reply-slow: waiting for client to fill buffers", file=sys.stderr)
+                await asyncio.sleep(1.0)  # Wait for client to really fill up buffers
+                await self._send_response(writer, 503, "Service Unavailable",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Server busy - your data was ignored")
+                await asyncio.sleep(0.1)
+
+            elif path == '/slow-response-no-drain':
+                # Wait a bit (let client send more data), then respond without reading
+                await asyncio.sleep(0.5)  # Let client start/continue sending
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"slow response - body not read")
+                # Don't drain - close with data potentially still in flight
+
+            elif path == '/request':
+                # Normal request handling for comparison
+                if content_length > 0:
+                    body = await asyncio.wait_for(reader.read(content_length), timeout=10.0)
+                    print(f"Read full body: {len(body)} bytes", file=sys.stderr)
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"normal response")
+
+            elif path == '/keepalive-normal':
+                # Normal keep-alive handling for comparison
+                if content_length > 0:
+                    body = await asyncio.wait_for(reader.read(content_length), timeout=10.0)
+                await self._send_response(writer, 200, "OK",
+                                          {"Content-Type": "text/plain",
+                                           "Connection": "keep-alive",
+                                           "Keep-Alive": "timeout=30"},
+                                          b"normal keepalive response")
+
+            else:
+                await self._send_response(writer, 404, "Not Found",
+                                          {"Content-Type": "text/plain", "Connection": "close"},
+                                          b"Not found")
+
+        except asyncio.TimeoutError:
+            print(f"Timeout handling {addr}", file=sys.stderr)
+        except ConnectionResetError:
+            print(f"Connection reset by {addr}", file=sys.stderr)
+        except Exception as e:
+            print(f"Error handling {addr}: {type(e).__name__}: {e}", file=sys.stderr)
+        finally:
+            try:
+                writer.close()
+                await writer.wait_closed()
+            except Exception:
+                pass
+            print(f"Connection closed for {addr}", file=sys.stderr)
+
+    async def _send_response(self, writer: asyncio.StreamWriter, status: int, status_text: str,
+                             headers: dict, body: bytes):
+        """Send an HTTP response."""
+        response_lines = [f"HTTP/1.1 {status} {status_text}"]
+        headers["Content-Length"] = str(len(body))
+        for key, value in headers.items():
+            response_lines.append(f"{key}: {value}")
+        response_lines.append("")
+        response_lines.append("")
+
+        response_header = "\r\n".join(response_lines).encode('utf-8')
+        writer.write(response_header)
+        writer.write(body)
+        await writer.drain()
+        print(f"Sent response: {status} {status_text}, {len(body)} bytes body", file=sys.stderr)
+
+    async def start(self):
+        """Start the server."""
+        self.server = await asyncio.start_server(
+            self.handle_client, self.host, self.port
+        )
+        print(f"Early response server listening on {self.host}:{self.port}", file=sys.stderr)
+
+    async def serve_forever(self):
+        """Serve until cancelled."""
+        async with self.server:
+            await self.server.serve_forever()
+
+
+async def main():
+    parser = argparse.ArgumentParser(description="HTTP server for testing early response scenarios")
+    parser.add_argument("--bind", "-b", default="127.0.0.1", help="bind address")
+    parser.add_argument("--port", "-p", type=int, default=18083, help="bind port")
+    parser.add_argument("--pidfile", "-pf", help="path to the PID file")
+    args = parser.parse_args()
+
+    print(f"dummy_http_early_response.py: Starting on {args.bind}:{args.port}", file=sys.stderr)
+
+    server = EarlyResponseServer(args.host if hasattr(args, 'host') else args.bind, args.port)
+    await server.start()
+
+    if args.pidfile:
+        dummy_killer.write_pid(args.pidfile)
+        print(f"PID file written to {args.pidfile}", file=sys.stderr)
+
+    await server.serve_forever()
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        print("Shutting down...", file=sys.stderr)
+    except Exception as e:
+        print(f"FATAL ERROR: {type(e).__name__}: {e}", file=sys.stderr)
+        import traceback
+        traceback.print_exc(file=sys.stderr)
+        sys.exit(1)