From: Vsevolod Stakhov Date: Sun, 10 May 2026 09:25:14 +0000 (+0100) Subject: [Feature] lua_tcp: phase-specific timeouts and on_error callback X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=25545db649b45a8071e2917a9a2fcb2205c62151;p=thirdparty%2Frspamd.git [Feature] lua_tcp: phase-specific timeouts and on_error callback Two opt-in additions to rspamd_tcp.new, motivated by issue #6032 (mx_check probe shapes — connect-vs-read budget independence and connect-phase error routing without dummy-queueing a read handler). A. Phase-specific timeouts. * New options: connect_timeout, read_timeout, write_timeout. Setting any of them switches the request to phased mode: each phase gets its own budget, unset phase fields fall back to `timeout`. The watcher is re-armed from the appropriate field on every plan_handler_event entry (LUA_WANT_READ / LUA_WANT_WRITE / LUA_WANT_CONNECT). * Backwards compat: existing callers passing only `timeout` keep the current single-deducted-budget contract by construction. A new `use_deduction` flag gates both the `elapsed` deduction in lua_tcp_handler and the per-phase reset in plan_handler_event. No call site changes its observable behaviour unless it actively sets a phase field. * Rationale (Option 2 from the issue): lua_tcp underpins every AV scanner and lualib helper. The HTTP-style "no deduction" alternative would silently shift their wall-clock from `<= timeout` to `<= N x timeout`; Option 2 avoids that surprise for one extra bool and one extra branch. B. on_error callback for connect-phase errors. * New `on_error(err, conn)` callback fires at most once for failures that occur before LUA_TCP_FLAG_CONNECTED is set: DNS resolution, socket creation, connect refused/timeout, SSL handshake. Once the connection is established, errors continue to flow through the queued read/write callback unchanged. * Routing is exclusive: when on_error is set and we are pre-CONNECTED, the error goes there alone (no queue-walking fanout). One-shot — the ref is dropped on first fire so subsequent failures fall through to the regular handler path. SSL handshake errors land here because LUA_TCP_FLAG_CONNECTED is only set after the handshake completes. * Pure-probe support: a request with `read = false`, no `data`, and an on_error/on_connect would previously short-circuit (empty handler queue -> "no handlers left, finish session" before the dial ever completed). The constructor now pushes a LUA_WANT_CONNECT marker in that shape so plan_handler_event arms EV_WRITE; lua_tcp_connect_helper handles the async case (shift the marker, re-plan, let the empty queue drive the FINISHED tear-down) — previously it dereferenced cbd->thread unconditionally and was sync-only. C. Tests (test/functional/lua/tcp.lua + cases/230_tcp.robot). * PHASED_TIMEOUT_TEST — phased timeouts on the success path emit PHASED_TCP_OK. * ON_ERROR_REFUSED_TEST — connect to closed port 1, no read/data; only the on_error callback fires (regular callback must not). * ON_ERROR_POST_CONNECT_TEST — connect succeeds against dummy_http /timeout, read_timeout=0.5 trips post-CONNECTED; the read callback receives the timeout, on_error must NOT fire. --- diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index f660e70292..dd6dc7af04 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -331,6 +331,7 @@ struct lua_tcp_cbdata { GQueue *handlers; int fd; int connect_cb; + int error_cbref; unsigned int port; unsigned int flags; char tag[7]; @@ -344,6 +345,10 @@ struct lua_tcp_cbdata { struct rspamd_ssl_connection *ssl_conn; char *hostname; struct upstream *up; + double connect_timeout; + double read_timeout; + double write_timeout; + gboolean use_deduction; gboolean eof; }; @@ -448,6 +453,11 @@ lua_tcp_fin(gpointer arg) luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb); } + if (cbd->error_cbref != -1) { + luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref); + cbd->error_cbref = -1; + } + if (cbd->ssl_conn) { /* TODO: postpone close in case ssl is used ! */ rspamd_ssl_connection_free(cbd->ssl_conn); @@ -556,6 +566,55 @@ lua_tcp_push_error(struct lua_tcp_cbdata *cbd, gboolean is_fatal, return; } + /* + * Connect-phase error routing. If the caller registered on_error and we + * have not yet entered the connected state, route the error there + * exclusively (no queue-walking fanout). Once LUA_TCP_FLAG_CONNECTED is + * set (after TCP connect, and for SSL after the handshake completes), + * errors flow through the queued read/write handler as before. + */ + if (cbd->error_cbref != -1 && !(cbd->flags & LUA_TCP_FLAG_CONNECTED)) { + lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs); + L = cbs.L; + + top = lua_gettop(L); + lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->error_cbref); + + va_start(ap, err); + lua_pushvfstring(L, err, ap); + va_end(ap); + + pcbd = lua_newuserdata(L, sizeof(*pcbd)); + *pcbd = cbd; + rspamd_lua_setclass(L, rspamd_tcp_classname, -1); + TCP_RETAIN(cbd); + + if (cbd->item) { + rspamd_symcache_set_cur_item(cbd->task, cbd->item); + } + + if (lua_pcall(L, 2, 0, 0) != 0) { + msg_info("on_error callback call failed: %s", lua_tostring(L, -1)); + } + + lua_settop(L, top); + TCP_RELEASE(cbd); + + /* One-shot: drop the ref so subsequent errors fall through to the + * regular handler-queue fanout (and we don't double-call). */ + luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref); + cbd->error_cbref = -1; + + if ((cbd->flags & (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) == + (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) { + TCP_RELEASE(cbd); + } + + lua_thread_pool_restore_callback(&cbs); + + return; + } + lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs); L = cbs.L; @@ -782,10 +841,26 @@ lua_tcp_plan_read(struct lua_tcp_cbdata *cbd) static void lua_tcp_connect_helper(struct lua_tcp_cbdata *cbd) { - /* This is used for sync mode only */ - lua_State *L = cbd->thread->lua_state; - struct lua_tcp_cbdata **pcbd; + lua_State *L; + + if (cbd->thread == NULL) { + /* + * Async pure-connect probe: the LUA_WANT_CONNECT marker was queued only + * to keep plan_handler_event from finishing the session before the + * socket completes the dial. The on_connect callback (if any) was + * already fired in lua_tcp_handler when LUA_TCP_FLAG_CONNECTED got + * set; shift the marker and re-plan so the now-empty queue triggers + * the FINISHED tear-down. + */ + msg_debug_tcp("tcp connected (async probe)"); + lua_tcp_shift_handler(cbd); + lua_tcp_plan_handler_event(cbd, FALSE, FALSE); + return; + } + + /* Sync mode: resume the waiting Lua thread with the tcp_sync userdata. */ + L = cbd->thread->lua_state; lua_pushboolean(L, TRUE); @@ -1106,10 +1181,17 @@ lua_tcp_handler(int fd, short what, gpointer ud) elapsed = rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev); - /* Adjust timeout, as we have already spent time */ - if (elapsed > 0 && elapsed < cbd->ev.timeout) { - cbd->ev.timeout -= elapsed; + /* + * Legacy single-budget mode: deduct elapsed wall-clock from the remaining + * budget. In phased mode (any of connect/read/write_timeout set by the + * caller) ev.timeout is (re)armed per phase in lua_tcp_plan_handler_event. + */ + if (cbd->use_deduction) { + if (elapsed > 0 && elapsed < cbd->ev.timeout) { + cbd->ev.timeout -= elapsed; + } } + (void) elapsed; if (what == EV_READ) { if (cbd->ssl_conn) { @@ -1233,6 +1315,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read, if (can_read) { /* We need to plan a new event */ msg_debug_tcp("plan new read"); + if (!cbd->use_deduction) { + cbd->ev.timeout = cbd->read_timeout; + } rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ); } @@ -1257,6 +1342,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read, if (hdl->h.w.pos < hdl->h.w.total_bytes) { msg_debug_tcp("plan new write"); if (can_write) { + if (!cbd->use_deduction) { + cbd->ev.timeout = cbd->write_timeout; + } rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE); } @@ -1277,6 +1365,9 @@ lua_tcp_plan_handler_event(struct lua_tcp_cbdata *cbd, gboolean can_read, else { /* LUA_WANT_CONNECT */ rspamd_session_event_update_label(cbd->async_ev, "tcp connect"); msg_debug_tcp("plan new connect"); + if (!cbd->use_deduction) { + cbd->ev.timeout = cbd->connect_timeout; + } rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE); } @@ -1515,8 +1606,12 @@ lua_tcp_arg_toiovec(lua_State *L, int pos, struct lua_tcp_cbdata *cbd, * - `port`: remote port to use * - `data`: a table of strings or `rspamd_text` objects that contains data pieces * - `callback`: continuation function (required) - * - `on_connect`: callback called on connection success - * - `timeout`: floating point value that specifies timeout for IO operations in **seconds** + * - `on_connect`: callback called on connection success (success-only) + * - `on_error`: callback fired for connect-phase errors only (DNS, socket, connect refused/timeout, SSL handshake). Once the connection is established, errors flow through the regular read/write callback. + * - `timeout`: floating point value that specifies timeout for IO operations in **seconds** (single deducted budget across all phases — legacy behaviour) + * - `connect_timeout`: per-phase timeout for the dial. Setting any of `connect_timeout`/`read_timeout`/`write_timeout` opts the request into phased timeouts; unset phase fields fall back to `timeout`. + * - `read_timeout`: per-phase timeout, armed on each read. + * - `write_timeout`: per-phase timeout, armed on each write. * - `partial`: boolean flag that specifies that callback should be called on any data portion received * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp) * - `shutdown`: half-close socket after writing (boolean: default false) @@ -1531,7 +1626,7 @@ lua_tcp_request(lua_State *L) const char *host; char *stop_pattern = NULL; unsigned int port; - int cbref, tp, conn_cbref = -1; + int cbref, tp, conn_cbref = -1, error_cbref = -1; gsize plen = 0; struct ev_loop *event_loop = NULL; struct lua_tcp_cbdata *cbd; @@ -1544,6 +1639,7 @@ lua_tcp_request(lua_State *L) unsigned int niov = 0, total_out; uint64_t h; double timeout = default_tcp_timeout; + double connect_timeout = 0.0, read_timeout = 0.0, write_timeout = 0.0; gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE, ssl = FALSE, ssl_noverify = FALSE; @@ -1639,6 +1735,27 @@ lua_tcp_request(lua_State *L) } lua_pop(L, 1); + lua_pushstring(L, "connect_timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + connect_timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "read_timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + read_timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "write_timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + write_timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + lua_pushstring(L, "stop_pattern"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { @@ -1710,6 +1827,16 @@ lua_tcp_request(lua_State *L) lua_pop(L, 1); } + lua_pushstring(L, "on_error"); + lua_gettable(L, -2); + + if (lua_type(L, -1) == LUA_TFUNCTION) { + error_cbref = luaL_ref(L, LUA_REGISTRYINDEX); + } + else { + lua_pop(L, 1); + } + lua_pushstring(L, "upstream"); lua_gettable(L, 1); @@ -1828,7 +1955,27 @@ lua_tcp_request(lua_State *L) cbd->event_loop = event_loop; cbd->fd = -1; cbd->port = port; - cbd->ev.timeout = timeout; + + /* + * Phased vs legacy timeout selection. If the caller set ANY of the per- + * phase fields, switch to phased mode: each phase gets its own budget, + * unset phase fields fall back to `timeout`. Otherwise, preserve the + * existing single-deducted-budget contract for callers that pass only + * `timeout`. + */ + if (connect_timeout > 0 || read_timeout > 0 || write_timeout > 0) { + cbd->connect_timeout = connect_timeout > 0 ? connect_timeout : timeout; + cbd->read_timeout = read_timeout > 0 ? read_timeout : timeout; + cbd->write_timeout = write_timeout > 0 ? write_timeout : timeout; + cbd->use_deduction = FALSE; + } + else { + cbd->connect_timeout = timeout; + cbd->read_timeout = timeout; + cbd->write_timeout = timeout; + cbd->use_deduction = TRUE; + } + cbd->ev.timeout = cbd->connect_timeout; if (ssl) { cbd->flags |= LUA_TCP_FLAG_SSL; @@ -1865,7 +2012,24 @@ lua_tcp_request(lua_State *L) g_queue_push_tail(cbd->handlers, rh); } + /* + * Pure-probe shape: caller wants to verify TCP connectivity (and/or be + * notified of connect-phase errors via on_error) without queueing any + * read or write. Push a LUA_WANT_CONNECT marker so plan_handler_event + * arms the EV_WRITE watcher; the marker is shifted in + * lua_tcp_connect_helper once the connect resolves. Without this, + * plan_handler_event would see an empty queue and tear the session down + * before the dial ever completes. + */ + if (g_queue_get_length(cbd->handlers) == 0 && + (conn_cbref != -1 || error_cbref != -1)) { + struct lua_tcp_handler *ch = g_malloc0(sizeof(*ch)); + ch->type = LUA_WANT_CONNECT; + g_queue_push_tail(cbd->handlers, ch); + } + cbd->connect_cb = conn_cbref; + cbd->error_cbref = error_cbref; REF_INIT_RETAIN(cbd, lua_tcp_maybe_free); if (up) { @@ -2063,6 +2227,11 @@ lua_tcp_connect_sync(lua_State *L) cbd->in = g_byte_array_new(); cbd->connect_cb = -1; + cbd->error_cbref = -1; + cbd->connect_timeout = timeout; + cbd->read_timeout = timeout; + cbd->write_timeout = timeout; + cbd->use_deduction = TRUE; REF_INIT_RETAIN(cbd, lua_tcp_maybe_free); diff --git a/test/functional/cases/230_tcp.robot b/test/functional/cases/230_tcp.robot index d53a7fcceb..227354eb14 100644 --- a/test/functional/cases/230_tcp.robot +++ b/test/functional/cases/230_tcp.robot @@ -48,6 +48,25 @@ Sync API TCP get request # Check url /request post HTTP_SYNC_EOF_post hello post # Check url /content-length post HTTP_SYNC_CONTENT_post hello post +Phased timeouts on success path + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [PHASED_TIMEOUT_TEST]} + Expect Symbol PHASED_TCP_OK + Do Not Expect Symbol PHASED_TCP_ERROR + +on_error fires on connect refused + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [ON_ERROR_REFUSED_TEST]} + Expect Symbol ON_ERROR_FIRED + Do Not Expect Symbol ON_ERROR_REGULAR_CB_FIRED + +on_error not fired post-connect + Scan File ${MESSAGE} + ... Settings={symbols_enabled = [ON_ERROR_POST_CONNECT_TEST]} + Expect Symbol POST_CONNECT_READ_TIMEOUT + Do Not Expect Symbol POST_CONNECT_ON_ERROR_FIRED + Do Not Expect Symbol POST_CONNECT_READ_OK + *** Keywords *** Servers Setup Run Dummy Http diff --git a/test/functional/lua/tcp.lua b/test/functional/lua/tcp.lua index d2c3eb447a..92d55bb76f 100644 --- a/test/functional/lua/tcp.lua +++ b/test/functional/lua/tcp.lua @@ -276,4 +276,101 @@ rspamd_config:register_symbol({ no_squeeze = true, flags = 'coro', }) + +-- [[ Phased timeouts: connect_timeout / read_timeout / write_timeout opt the +-- request into per-phase budgets. Verify a normal request completes when +-- these are set instead of `timeout`. ]] +local function phased_timeout_symbol(task) + local function read_cb(err, data, _) + if err then + task:insert_result('PHASED_TCP_ERROR', 1.0, tostring(err)) + else + task:insert_result('PHASED_TCP_OK', 1.0, tostring(data)) + end + end + rspamd_tcp:request({ + task = task, + callback = read_cb, + host = '127.0.0.1', + data = {'GET /request HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'}, + read = true, + port = 18080, + connect_timeout = 2.0, + read_timeout = 2.0, + write_timeout = 2.0, + }) +end + +-- [[ on_error: connect to a closed port. The on_error callback must fire and +-- the regular `callback` must NOT receive the error (proving the queue-walk +-- fanout is suppressed in connect-phase). ]] +local function on_error_refused_symbol(task) + local function regular_cb(err, _, _) + -- This MUST NOT fire when on_error is registered and connect fails. + task:insert_result('ON_ERROR_REGULAR_CB_FIRED', 1.0, tostring(err)) + end + local function err_cb(err, _) + task:insert_result('ON_ERROR_FIRED', 1.0, tostring(err)) + end + rspamd_tcp:request({ + task = task, + callback = regular_cb, + on_error = err_cb, + host = '127.0.0.1', + port = 1, -- closed port: connection refused + read = false, + connect_timeout = 1.0, + }) +end + +-- [[ on_error post-CONNECTED: connect succeeds (real HTTP server), then the +-- read times out. on_error must NOT fire; the read callback receives the +-- error as before. ]] +local function on_error_post_connect_symbol(task) + local function read_cb(err, _, _) + if err then + task:insert_result('POST_CONNECT_READ_TIMEOUT', 1.0, tostring(err)) + else + task:insert_result('POST_CONNECT_READ_OK', 1.0) + end + end + local function err_cb(err, _) + -- This MUST NOT fire because the error happens after CONNECTED is set. + task:insert_result('POST_CONNECT_ON_ERROR_FIRED', 1.0, tostring(err)) + end + rspamd_tcp:request({ + task = task, + callback = read_cb, + on_error = err_cb, + host = '127.0.0.1', + port = 18080, + -- /timeout sleeps 4s before responding. read_timeout=0.5 forces the read + -- side to time out after the connect succeeds. + data = {'GET /timeout HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'}, + read = true, + stop_pattern = '\r\n\r\n', + connect_timeout = 2.0, + read_timeout = 0.5, + write_timeout = 2.0, + }) +end + +rspamd_config:register_symbol({ + name = 'PHASED_TIMEOUT_TEST', + score = 1.0, + callback = phased_timeout_symbol, + no_squeeze = true, +}) +rspamd_config:register_symbol({ + name = 'ON_ERROR_REFUSED_TEST', + score = 1.0, + callback = on_error_refused_symbol, + no_squeeze = true, +}) +rspamd_config:register_symbol({ + name = 'ON_ERROR_POST_CONNECT_TEST', + score = 1.0, + callback = on_error_post_connect_symbol, + no_squeeze = true, +}) -- ]]