]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] lua_tcp: phase-specific timeouts and on_error callback 6034/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sun, 10 May 2026 09:25:14 +0000 (10:25 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sun, 10 May 2026 09:25:14 +0000 (10:25 +0100)
Two opt-in additions to rspamd_tcp.new, motivated by issue #6032 (mx_check
probe shapes — connect-vs-read budget independence and connect-phase error
routing without dummy-queueing a read handler).

A. Phase-specific timeouts.

  * New options: connect_timeout, read_timeout, write_timeout. Setting any
    of them switches the request to phased mode: each phase gets its own
    budget, unset phase fields fall back to `timeout`. The watcher is
    re-armed from the appropriate field on every plan_handler_event entry
    (LUA_WANT_READ / LUA_WANT_WRITE / LUA_WANT_CONNECT).

  * Backwards compat: existing callers passing only `timeout` keep the
    current single-deducted-budget contract by construction. A new
    `use_deduction` flag gates both the `elapsed` deduction in
    lua_tcp_handler and the per-phase reset in plan_handler_event. No call
    site changes its observable behaviour unless it actively sets a phase
    field.

  * Rationale (Option 2 from the issue): lua_tcp underpins every AV scanner
    and lualib helper. The HTTP-style "no deduction" alternative would
    silently shift their wall-clock from `<= timeout` to `<= N x timeout`;
    Option 2 avoids that surprise for one extra bool and one extra branch.

B. on_error callback for connect-phase errors.

  * New `on_error(err, conn)` callback fires at most once for failures
    that occur before LUA_TCP_FLAG_CONNECTED is set: DNS resolution, socket
    creation, connect refused/timeout, SSL handshake. Once the connection
    is established, errors continue to flow through the queued read/write
    callback unchanged.

  * Routing is exclusive: when on_error is set and we are pre-CONNECTED,
    the error goes there alone (no queue-walking fanout). One-shot — the
    ref is dropped on first fire so subsequent failures fall through to
    the regular handler path. SSL handshake errors land here because
    LUA_TCP_FLAG_CONNECTED is only set after the handshake completes.

  * Pure-probe support: a request with `read = false`, no `data`, and an
    on_error/on_connect would previously short-circuit (empty handler
    queue -> "no handlers left, finish session" before the dial ever
    completed). The constructor now pushes a LUA_WANT_CONNECT marker in
    that shape so plan_handler_event arms EV_WRITE; lua_tcp_connect_helper
    handles the async case (shift the marker, re-plan, let the empty queue
    drive the FINISHED tear-down) — previously it dereferenced cbd->thread
    unconditionally and was sync-only.

C. Tests (test/functional/lua/tcp.lua + cases/230_tcp.robot).

  * PHASED_TIMEOUT_TEST — phased timeouts on the success path emit
    PHASED_TCP_OK.
  * ON_ERROR_REFUSED_TEST — connect to closed port 1, no read/data; only
    the on_error callback fires (regular callback must not).
  * ON_ERROR_POST_CONNECT_TEST — connect succeeds against dummy_http
    /timeout, read_timeout=0.5 trips post-CONNECTED; the read callback
    receives the timeout, on_error must NOT fire.

src/lua/lua_tcp.c
test/functional/cases/230_tcp.robot
test/functional/lua/tcp.lua

index f660e70292a352a9628f06736d695d89c486aafc..dd6dc7af04ea763ab4b6a2d259d94cb7ee67c422 100644 (file)
@@ -331,6 +331,7 @@ struct lua_tcp_cbdata {
        GQueue *handlers;
        int fd;
        int connect_cb;
+       int error_cbref;
        unsigned int port;
        unsigned int flags;
        char tag[7];
@@ -344,6 +345,10 @@ struct lua_tcp_cbdata {
        struct rspamd_ssl_connection *ssl_conn;
        char *hostname;
        struct upstream *up;
+       double connect_timeout;
+       double read_timeout;
+       double write_timeout;
+       gboolean use_deduction;
        gboolean eof;
 };
 
@@ -448,6 +453,11 @@ lua_tcp_fin(gpointer arg)
                luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
        }
 
+       if (cbd->error_cbref != -1) {
+               luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref);
+               cbd->error_cbref = -1;
+       }
+
        if (cbd->ssl_conn) {
                /* TODO: postpone close in case ssl is used ! */
                rspamd_ssl_connection_free(cbd->ssl_conn);
@@ -556,6 +566,55 @@ lua_tcp_push_error(struct lua_tcp_cbdata *cbd, gboolean is_fatal,
                return;
        }
 
+       /*
+        * Connect-phase error routing. If the caller registered on_error and we
+        * have not yet entered the connected state, route the error there
+        * exclusively (no queue-walking fanout). Once LUA_TCP_FLAG_CONNECTED is
+        * set (after TCP connect, and for SSL after the handshake completes),
+        * errors flow through the queued read/write handler as before.
+        */
+       if (cbd->error_cbref != -1 && !(cbd->flags & LUA_TCP_FLAG_CONNECTED)) {
+               lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs);
+               L = cbs.L;
+
+               top = lua_gettop(L);
+               lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->error_cbref);
+
+               va_start(ap, err);
+               lua_pushvfstring(L, err, ap);
+               va_end(ap);
+
+               pcbd = lua_newuserdata(L, sizeof(*pcbd));
+               *pcbd = cbd;
+               rspamd_lua_setclass(L, rspamd_tcp_classname, -1);
+               TCP_RETAIN(cbd);
+
+               if (cbd->item) {
+                       rspamd_symcache_set_cur_item(cbd->task, cbd->item);
+               }
+
+               if (lua_pcall(L, 2, 0, 0) != 0) {
+                       msg_info("on_error callback call failed: %s", lua_tostring(L, -1));
+               }
+
+               lua_settop(L, top);
+               TCP_RELEASE(cbd);
+
+               /* One-shot: drop the ref so subsequent errors fall through to the
+                * regular handler-queue fanout (and we don't double-call). */
+               luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref);
+               cbd->error_cbref = -1;
+
+               if ((cbd->flags & (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) ==
+                       (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) {
+                       TCP_RELEASE(cbd);
+               }
+
+               lua_thread_pool_restore_callback(&cbs);
+
+               return;
+       }
+
        lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs);
        L = cbs.L;
 
@@ -782,10 +841,26 @@ lua_tcp_plan_read(struct lua_tcp_cbdata *cbd)
 static void
 lua_tcp_connect_helper(struct lua_tcp_cbdata *cbd)
 {
-       /* This is used for sync mode only */
-       lua_State *L = cbd->thread->lua_state;
-
        struct lua_tcp_cbdata **pcbd;
+       lua_State *L;
+
+       if (cbd->thread == NULL) {
+               /*
+                * Async pure-connect probe: the LUA_WANT_CONNECT marker was queued only
+                * to keep plan_handler_event from finishing the session before the
+                * socket completes the dial. The on_connect callback (if any) was
+                * already fired in lua_tcp_handler when LUA_TCP_FLAG_CONNECTED got
+                * set; shift the marker and re-plan so the now-empty queue triggers
+                * the FINISHED tear-down.
+                */
+               msg_debug_tcp("tcp connected (async probe)");
+               lua_tcp_shift_handler(cbd);
+               lua_tcp_plan_handler_event(cbd, FALSE, FALSE);
+               return;
+       }
+
+       /* Sync mode: resume the waiting Lua thread with the tcp_sync userdata. */
+       L = cbd->thread->lua_state;
 
        lua_pushboolean(L, TRUE);
 
@@ -1106,10 +1181,17 @@ lua_tcp_handler(int fd, short what, gpointer ud)
 
        elapsed = rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev);
 
-       /* Adjust timeout, as we have already spent time */
-       if (elapsed > 0 && elapsed < cbd->ev.timeout) {
-               cbd->ev.timeout -= elapsed;
+       /*
+        * Legacy single-budget mode: deduct elapsed wall-clock from the remaining
+        * budget. In phased mode (any of connect/read/write_timeout set by the
+        * caller) ev.timeout is (re)armed per phase in lua_tcp_plan_handler_event.
+        */
+       if (cbd->use_deduction) {
+               if (elapsed > 0 && elapsed < cbd->ev.timeout) {
+                       cbd->ev.timeout -= elapsed;
+               }
        }
+       (void) elapsed;
 
        if (what == EV_READ) {
                if (cbd->ssl_conn) {
@@ -1233,6 +1315,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read,
                                if (can_read) {
                                        /* We need to plan a new event */
                                        msg_debug_tcp("plan new read");
+                                       if (!cbd->use_deduction) {
+                                               cbd->ev.timeout = cbd->read_timeout;
+                                       }
                                        rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
                                                                                                 EV_READ);
                                }
@@ -1257,6 +1342,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read,
                        if (hdl->h.w.pos < hdl->h.w.total_bytes) {
                                msg_debug_tcp("plan new write");
                                if (can_write) {
+                                       if (!cbd->use_deduction) {
+                                               cbd->ev.timeout = cbd->write_timeout;
+                                       }
                                        rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
                                                                                                 EV_WRITE);
                                }
@@ -1277,6 +1365,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read,
                else { /* LUA_WANT_CONNECT */
                        rspamd_session_event_update_label(cbd->async_ev, "tcp connect");
                        msg_debug_tcp("plan new connect");
+                       if (!cbd->use_deduction) {
+                               cbd->ev.timeout = cbd->connect_timeout;
+                       }
                        rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
                                                                                 EV_WRITE);
                }
@@ -1515,8 +1606,12 @@ lua_tcp_arg_toiovec(lua_State *L, int pos, struct lua_tcp_cbdata *cbd,
  * - `port`: remote port to use
  * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
  * - `callback`: continuation function (required)
- * - `on_connect`: callback called on connection success
- * - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
+ * - `on_connect`: callback called on connection success (success-only)
+ * - `on_error`: callback fired for connect-phase errors only (DNS, socket, connect refused/timeout, SSL handshake). Once the connection is established, errors flow through the regular read/write callback.
+ * - `timeout`: floating point value that specifies timeout for IO operations in **seconds** (single deducted budget across all phases — legacy behaviour)
+ * - `connect_timeout`: per-phase timeout for the dial. Setting any of `connect_timeout`/`read_timeout`/`write_timeout` opts the request into phased timeouts; unset phase fields fall back to `timeout`.
+ * - `read_timeout`: per-phase timeout, armed on each read.
+ * - `write_timeout`: per-phase timeout, armed on each write.
  * - `partial`: boolean flag that specifies that callback should be called on any data portion received
  * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
  * - `shutdown`: half-close socket after writing (boolean: default false)
@@ -1531,7 +1626,7 @@ lua_tcp_request(lua_State *L)
        const char *host;
        char *stop_pattern = NULL;
        unsigned int port;
-       int cbref, tp, conn_cbref = -1;
+       int cbref, tp, conn_cbref = -1, error_cbref = -1;
        gsize plen = 0;
        struct ev_loop *event_loop = NULL;
        struct lua_tcp_cbdata *cbd;
@@ -1544,6 +1639,7 @@ lua_tcp_request(lua_State *L)
        unsigned int niov = 0, total_out;
        uint64_t h;
        double timeout = default_tcp_timeout;
+       double connect_timeout = 0.0, read_timeout = 0.0, write_timeout = 0.0;
        gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE,
                         ssl = FALSE, ssl_noverify = FALSE;
 
@@ -1639,6 +1735,27 @@ lua_tcp_request(lua_State *L)
                }
                lua_pop(L, 1);
 
+               lua_pushstring(L, "connect_timeout");
+               lua_gettable(L, -2);
+               if (lua_type(L, -1) == LUA_TNUMBER) {
+                       connect_timeout = lua_tonumber(L, -1);
+               }
+               lua_pop(L, 1);
+
+               lua_pushstring(L, "read_timeout");
+               lua_gettable(L, -2);
+               if (lua_type(L, -1) == LUA_TNUMBER) {
+                       read_timeout = lua_tonumber(L, -1);
+               }
+               lua_pop(L, 1);
+
+               lua_pushstring(L, "write_timeout");
+               lua_gettable(L, -2);
+               if (lua_type(L, -1) == LUA_TNUMBER) {
+                       write_timeout = lua_tonumber(L, -1);
+               }
+               lua_pop(L, 1);
+
                lua_pushstring(L, "stop_pattern");
                lua_gettable(L, -2);
                if (lua_type(L, -1) == LUA_TSTRING) {
@@ -1710,6 +1827,16 @@ lua_tcp_request(lua_State *L)
                        lua_pop(L, 1);
                }
 
+               lua_pushstring(L, "on_error");
+               lua_gettable(L, -2);
+
+               if (lua_type(L, -1) == LUA_TFUNCTION) {
+                       error_cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+               }
+               else {
+                       lua_pop(L, 1);
+               }
+
                lua_pushstring(L, "upstream");
                lua_gettable(L, 1);
 
@@ -1828,7 +1955,27 @@ lua_tcp_request(lua_State *L)
        cbd->event_loop = event_loop;
        cbd->fd = -1;
        cbd->port = port;
-       cbd->ev.timeout = timeout;
+
+       /*
+        * Phased vs legacy timeout selection. If the caller set ANY of the per-
+        * phase fields, switch to phased mode: each phase gets its own budget,
+        * unset phase fields fall back to `timeout`. Otherwise, preserve the
+        * existing single-deducted-budget contract for callers that pass only
+        * `timeout`.
+        */
+       if (connect_timeout > 0 || read_timeout > 0 || write_timeout > 0) {
+               cbd->connect_timeout = connect_timeout > 0 ? connect_timeout : timeout;
+               cbd->read_timeout = read_timeout > 0 ? read_timeout : timeout;
+               cbd->write_timeout = write_timeout > 0 ? write_timeout : timeout;
+               cbd->use_deduction = FALSE;
+       }
+       else {
+               cbd->connect_timeout = timeout;
+               cbd->read_timeout = timeout;
+               cbd->write_timeout = timeout;
+               cbd->use_deduction = TRUE;
+       }
+       cbd->ev.timeout = cbd->connect_timeout;
 
        if (ssl) {
                cbd->flags |= LUA_TCP_FLAG_SSL;
@@ -1865,7 +2012,24 @@ lua_tcp_request(lua_State *L)
                g_queue_push_tail(cbd->handlers, rh);
        }
 
+       /*
+        * Pure-probe shape: caller wants to verify TCP connectivity (and/or be
+        * notified of connect-phase errors via on_error) without queueing any
+        * read or write. Push a LUA_WANT_CONNECT marker so plan_handler_event
+        * arms the EV_WRITE watcher; the marker is shifted in
+        * lua_tcp_connect_helper once the connect resolves. Without this,
+        * plan_handler_event would see an empty queue and tear the session down
+        * before the dial ever completes.
+        */
+       if (g_queue_get_length(cbd->handlers) == 0 &&
+               (conn_cbref != -1 || error_cbref != -1)) {
+               struct lua_tcp_handler *ch = g_malloc0(sizeof(*ch));
+               ch->type = LUA_WANT_CONNECT;
+               g_queue_push_tail(cbd->handlers, ch);
+       }
+
        cbd->connect_cb = conn_cbref;
+       cbd->error_cbref = error_cbref;
        REF_INIT_RETAIN(cbd, lua_tcp_maybe_free);
 
        if (up) {
@@ -2063,6 +2227,11 @@ lua_tcp_connect_sync(lua_State *L)
        cbd->in = g_byte_array_new();
 
        cbd->connect_cb = -1;
+       cbd->error_cbref = -1;
+       cbd->connect_timeout = timeout;
+       cbd->read_timeout = timeout;
+       cbd->write_timeout = timeout;
+       cbd->use_deduction = TRUE;
 
        REF_INIT_RETAIN(cbd, lua_tcp_maybe_free);
 
index d53a7fcceb302f517dec7c99006b96f2de172ab9..227354eb14ceb2984a1878713ba1f28e6ca8ef21 100644 (file)
@@ -48,6 +48,25 @@ Sync API TCP get request
 #  Check url  /request  post  HTTP_SYNC_EOF_post  hello post
 #  Check url  /content-length  post  HTTP_SYNC_CONTENT_post  hello post
 
+Phased timeouts on success path
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [PHASED_TIMEOUT_TEST]}
+  Expect Symbol  PHASED_TCP_OK
+  Do Not Expect Symbol  PHASED_TCP_ERROR
+
+on_error fires on connect refused
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [ON_ERROR_REFUSED_TEST]}
+  Expect Symbol  ON_ERROR_FIRED
+  Do Not Expect Symbol  ON_ERROR_REGULAR_CB_FIRED
+
+on_error not fired post-connect
+  Scan File  ${MESSAGE}
+  ...  Settings={symbols_enabled = [ON_ERROR_POST_CONNECT_TEST]}
+  Expect Symbol  POST_CONNECT_READ_TIMEOUT
+  Do Not Expect Symbol  POST_CONNECT_ON_ERROR_FIRED
+  Do Not Expect Symbol  POST_CONNECT_READ_OK
+
 *** Keywords ***
 Servers Setup
   Run Dummy Http
index d2c3eb447a0cad49d8f8403e4b0aa748ea540118..92d55bb76fa113fc14d9261c934b5b16a3f30f56 100644 (file)
@@ -276,4 +276,101 @@ rspamd_config:register_symbol({
   no_squeeze = true,
   flags = 'coro',
 })
+
+-- [[ Phased timeouts: connect_timeout / read_timeout / write_timeout opt the
+--    request into per-phase budgets. Verify a normal request completes when
+--    these are set instead of `timeout`. ]]
+local function phased_timeout_symbol(task)
+  local function read_cb(err, data, _)
+    if err then
+      task:insert_result('PHASED_TCP_ERROR', 1.0, tostring(err))
+    else
+      task:insert_result('PHASED_TCP_OK', 1.0, tostring(data))
+    end
+  end
+  rspamd_tcp:request({
+    task = task,
+    callback = read_cb,
+    host = '127.0.0.1',
+    data = {'GET /request HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'},
+    read = true,
+    port = 18080,
+    connect_timeout = 2.0,
+    read_timeout = 2.0,
+    write_timeout = 2.0,
+  })
+end
+
+-- [[ on_error: connect to a closed port. The on_error callback must fire and
+--    the regular `callback` must NOT receive the error (proving the queue-walk
+--    fanout is suppressed in connect-phase). ]]
+local function on_error_refused_symbol(task)
+  local function regular_cb(err, _, _)
+    -- This MUST NOT fire when on_error is registered and connect fails.
+    task:insert_result('ON_ERROR_REGULAR_CB_FIRED', 1.0, tostring(err))
+  end
+  local function err_cb(err, _)
+    task:insert_result('ON_ERROR_FIRED', 1.0, tostring(err))
+  end
+  rspamd_tcp:request({
+    task = task,
+    callback = regular_cb,
+    on_error = err_cb,
+    host = '127.0.0.1',
+    port = 1,            -- closed port: connection refused
+    read = false,
+    connect_timeout = 1.0,
+  })
+end
+
+-- [[ on_error post-CONNECTED: connect succeeds (real HTTP server), then the
+--    read times out. on_error must NOT fire; the read callback receives the
+--    error as before. ]]
+local function on_error_post_connect_symbol(task)
+  local function read_cb(err, _, _)
+    if err then
+      task:insert_result('POST_CONNECT_READ_TIMEOUT', 1.0, tostring(err))
+    else
+      task:insert_result('POST_CONNECT_READ_OK', 1.0)
+    end
+  end
+  local function err_cb(err, _)
+    -- This MUST NOT fire because the error happens after CONNECTED is set.
+    task:insert_result('POST_CONNECT_ON_ERROR_FIRED', 1.0, tostring(err))
+  end
+  rspamd_tcp:request({
+    task = task,
+    callback = read_cb,
+    on_error = err_cb,
+    host = '127.0.0.1',
+    port = 18080,
+    -- /timeout sleeps 4s before responding. read_timeout=0.5 forces the read
+    -- side to time out after the connect succeeds.
+    data = {'GET /timeout HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'},
+    read = true,
+    stop_pattern = '\r\n\r\n',
+    connect_timeout = 2.0,
+    read_timeout = 0.5,
+    write_timeout = 2.0,
+  })
+end
+
+rspamd_config:register_symbol({
+  name = 'PHASED_TIMEOUT_TEST',
+  score = 1.0,
+  callback = phased_timeout_symbol,
+  no_squeeze = true,
+})
+rspamd_config:register_symbol({
+  name = 'ON_ERROR_REFUSED_TEST',
+  score = 1.0,
+  callback = on_error_refused_symbol,
+  no_squeeze = true,
+})
+rspamd_config:register_symbol({
+  name = 'ON_ERROR_POST_CONNECT_TEST',
+  score = 1.0,
+  callback = on_error_post_connect_symbol,
+  no_squeeze = true,
+})
 -- ]]