GQueue *handlers;
int fd;
int connect_cb;
+ int error_cbref;
unsigned int port;
unsigned int flags;
char tag[7];
struct rspamd_ssl_connection *ssl_conn;
char *hostname;
struct upstream *up;
+ double connect_timeout;
+ double read_timeout;
+ double write_timeout;
+ gboolean use_deduction;
gboolean eof;
};
luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
}
+ if (cbd->error_cbref != -1) {
+ luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref);
+ cbd->error_cbref = -1;
+ }
+
if (cbd->ssl_conn) {
/* TODO: postpone close in case ssl is used ! */
rspamd_ssl_connection_free(cbd->ssl_conn);
return;
}
+ /*
+ * Connect-phase error routing. If the caller registered on_error and we
+ * have not yet entered the connected state, route the error there
+ * exclusively (no queue-walking fanout). Once LUA_TCP_FLAG_CONNECTED is
+ * set (after TCP connect, and for SSL after the handshake completes),
+ * errors flow through the queued read/write handler as before.
+ */
+ if (cbd->error_cbref != -1 && !(cbd->flags & LUA_TCP_FLAG_CONNECTED)) {
+ lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
+ top = lua_gettop(L);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->error_cbref);
+
+ va_start(ap, err);
+ lua_pushvfstring(L, err, ap);
+ va_end(ap);
+
+ pcbd = lua_newuserdata(L, sizeof(*pcbd));
+ *pcbd = cbd;
+ rspamd_lua_setclass(L, rspamd_tcp_classname, -1);
+ TCP_RETAIN(cbd);
+
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item(cbd->task, cbd->item);
+ }
+
+ if (lua_pcall(L, 2, 0, 0) != 0) {
+ msg_info("on_error callback call failed: %s", lua_tostring(L, -1));
+ }
+
+ lua_settop(L, top);
+ TCP_RELEASE(cbd);
+
+ /* One-shot: drop the ref so subsequent errors fall through to the
+ * regular handler-queue fanout (and we don't double-call). */
+ luaL_unref(cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->error_cbref);
+ cbd->error_cbref = -1;
+
+ if ((cbd->flags & (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) ==
+ (LUA_TCP_FLAG_FINISHED | LUA_TCP_FLAG_CONNECTED)) {
+ TCP_RELEASE(cbd);
+ }
+
+ lua_thread_pool_restore_callback(&cbs);
+
+ return;
+ }
+
lua_thread_pool_prepare_callback(cbd->cfg->lua_thread_pool, &cbs);
L = cbs.L;
static void
lua_tcp_connect_helper(struct lua_tcp_cbdata *cbd)
{
- /* This is used for sync mode only */
- lua_State *L = cbd->thread->lua_state;
-
struct lua_tcp_cbdata **pcbd;
+ lua_State *L;
+
+ if (cbd->thread == NULL) {
+ /*
+ * Async pure-connect probe: the LUA_WANT_CONNECT marker was queued only
+ * to keep plan_handler_event from finishing the session before the
+ * socket completes the dial. The on_connect callback (if any) was
+ * already fired in lua_tcp_handler when LUA_TCP_FLAG_CONNECTED got
+ * set; shift the marker and re-plan so the now-empty queue triggers
+ * the FINISHED tear-down.
+ */
+ msg_debug_tcp("tcp connected (async probe)");
+ lua_tcp_shift_handler(cbd);
+ lua_tcp_plan_handler_event(cbd, FALSE, FALSE);
+ return;
+ }
+
+ /* Sync mode: resume the waiting Lua thread with the tcp_sync userdata. */
+ L = cbd->thread->lua_state;
lua_pushboolean(L, TRUE);
elapsed = rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev);
- /* Adjust timeout, as we have already spent time */
- if (elapsed > 0 && elapsed < cbd->ev.timeout) {
- cbd->ev.timeout -= elapsed;
+ /*
+ * Legacy single-budget mode: deduct elapsed wall-clock from the remaining
+ * budget. In phased mode (any of connect/read/write_timeout set by the
+ * caller) ev.timeout is (re)armed per phase in lua_tcp_plan_handler_event.
+ */
+ if (cbd->use_deduction) {
+ if (elapsed > 0 && elapsed < cbd->ev.timeout) {
+ cbd->ev.timeout -= elapsed;
+ }
}
+ (void) elapsed;
if (what == EV_READ) {
if (cbd->ssl_conn) {
if (can_read) {
/* We need to plan a new event */
msg_debug_tcp("plan new read");
+ if (!cbd->use_deduction) {
+ cbd->ev.timeout = cbd->read_timeout;
+ }
rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
EV_READ);
}
if (hdl->h.w.pos < hdl->h.w.total_bytes) {
msg_debug_tcp("plan new write");
if (can_write) {
+ if (!cbd->use_deduction) {
+ cbd->ev.timeout = cbd->write_timeout;
+ }
rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
EV_WRITE);
}
else { /* LUA_WANT_CONNECT */
rspamd_session_event_update_label(cbd->async_ev, "tcp connect");
msg_debug_tcp("plan new connect");
+ if (!cbd->use_deduction) {
+ cbd->ev.timeout = cbd->connect_timeout;
+ }
rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev,
EV_WRITE);
}
* - `port`: remote port to use
* - `data`: a table of strings or `rspamd_text` objects that contains data pieces
* - `callback`: continuation function (required)
- * - `on_connect`: callback called on connection success
- * - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
+ * - `on_connect`: callback called on connection success (success-only)
+ * - `on_error`: callback fired for connect-phase errors only (DNS, socket, connect refused/timeout, SSL handshake). Once the connection is established, errors flow through the regular read/write callback.
+ * - `timeout`: floating point value that specifies timeout for IO operations in **seconds** (single deducted budget across all phases — legacy behaviour)
+ * - `connect_timeout`: per-phase timeout for establishing the connection, in **seconds**. Setting any of `connect_timeout`/`read_timeout`/`write_timeout` to a positive number opts the request into phased timeouts; unset phase fields fall back to `timeout`.
+ * - `read_timeout`: per-phase timeout in **seconds**, re-armed each time a read is planned.
+ * - `write_timeout`: per-phase timeout in **seconds**, re-armed each time a write is planned.
* - `partial`: boolean flag that specifies that callback should be called on any data portion received
* - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
* - `shutdown`: half-close socket after writing (boolean: default false)
const char *host;
char *stop_pattern = NULL;
unsigned int port;
- int cbref, tp, conn_cbref = -1;
+ int cbref, tp, conn_cbref = -1, error_cbref = -1;
gsize plen = 0;
struct ev_loop *event_loop = NULL;
struct lua_tcp_cbdata *cbd;
unsigned int niov = 0, total_out;
uint64_t h;
double timeout = default_tcp_timeout;
+ double connect_timeout = 0.0, read_timeout = 0.0, write_timeout = 0.0;
gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE,
ssl = FALSE, ssl_noverify = FALSE;
}
lua_pop(L, 1);
+ lua_pushstring(L, "connect_timeout");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ connect_timeout = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "read_timeout");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ read_timeout = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "write_timeout");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ write_timeout = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
lua_pushstring(L, "stop_pattern");
lua_gettable(L, -2);
if (lua_type(L, -1) == LUA_TSTRING) {
lua_pop(L, 1);
}
+ lua_pushstring(L, "on_error");
+ lua_gettable(L, -2);
+
+ if (lua_type(L, -1) == LUA_TFUNCTION) {
+ error_cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+ }
+ else {
+ lua_pop(L, 1);
+ }
+
lua_pushstring(L, "upstream");
lua_gettable(L, 1);
cbd->event_loop = event_loop;
cbd->fd = -1;
cbd->port = port;
- cbd->ev.timeout = timeout;
+
+ /*
+ * Phased vs legacy timeout selection. If the caller set ANY of the per-
+ * phase fields, switch to phased mode: each phase gets its own budget,
+ * unset phase fields fall back to `timeout`. Otherwise, preserve the
+ * existing single-deducted-budget contract for callers that pass only
+ * `timeout`.
+ */
+ if (connect_timeout > 0 || read_timeout > 0 || write_timeout > 0) {
+ cbd->connect_timeout = connect_timeout > 0 ? connect_timeout : timeout;
+ cbd->read_timeout = read_timeout > 0 ? read_timeout : timeout;
+ cbd->write_timeout = write_timeout > 0 ? write_timeout : timeout;
+ cbd->use_deduction = FALSE;
+ }
+ else {
+ cbd->connect_timeout = timeout;
+ cbd->read_timeout = timeout;
+ cbd->write_timeout = timeout;
+ cbd->use_deduction = TRUE;
+ }
+ cbd->ev.timeout = cbd->connect_timeout;
if (ssl) {
cbd->flags |= LUA_TCP_FLAG_SSL;
g_queue_push_tail(cbd->handlers, rh);
}
+ /*
+ * Pure-probe shape: caller wants to verify TCP connectivity (and/or be
+ * notified of connect-phase errors via on_error) without queueing any
+ * read or write. Push a LUA_WANT_CONNECT marker so plan_handler_event
+ * arms the EV_WRITE watcher; the marker is shifted in
+ * lua_tcp_connect_helper once the connect resolves. Without this,
+ * plan_handler_event would see an empty queue and tear the session down
+ * before the dial ever completes.
+ */
+ if (g_queue_get_length(cbd->handlers) == 0 &&
+ (conn_cbref != -1 || error_cbref != -1)) {
+ struct lua_tcp_handler *ch = g_malloc0(sizeof(*ch));
+ ch->type = LUA_WANT_CONNECT;
+ g_queue_push_tail(cbd->handlers, ch);
+ }
+
cbd->connect_cb = conn_cbref;
+ cbd->error_cbref = error_cbref;
REF_INIT_RETAIN(cbd, lua_tcp_maybe_free);
if (up) {
cbd->in = g_byte_array_new();
cbd->connect_cb = -1;
+ cbd->error_cbref = -1;
+ cbd->connect_timeout = timeout;
+ cbd->read_timeout = timeout;
+ cbd->write_timeout = timeout;
+ cbd->use_deduction = TRUE;
REF_INIT_RETAIN(cbd, lua_tcp_maybe_free);
no_squeeze = true,
flags = 'coro',
})
+
+-- [[ Phased timeouts: connect_timeout / read_timeout / write_timeout opt the
+-- request into per-phase budgets. Verify a normal request completes when
+-- these are set instead of `timeout`. ]]
+local function phased_timeout_symbol(task)
+  -- Sends one HTTP GET to 127.0.0.1:18080 using only the per-phase timeout
+  -- options (no plain `timeout`) and records the outcome on the task.
+  local http_get = 'GET /request HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'
+
+  -- Inserts PHASED_TCP_OK with the received data, or PHASED_TCP_ERROR with
+  -- the error text when the request failed.
+  local function on_reply(err, data, _)
+    if not err then
+      task:insert_result('PHASED_TCP_OK', 1.0, tostring(data))
+      return
+    end
+    task:insert_result('PHASED_TCP_ERROR', 1.0, tostring(err))
+  end
+
+  local request_opts = {
+    task = task,
+    host = '127.0.0.1',
+    port = 18080,
+    data = { http_get },
+    read = true,
+    callback = on_reply,
+    connect_timeout = 2.0,
+    read_timeout = 2.0,
+    write_timeout = 2.0,
+  }
+  rspamd_tcp:request(request_opts)
+end
+
+-- [[ on_error: connect to a closed port. The on_error callback must fire and
+-- the regular `callback` must NOT receive the error (proving the queue-walk
+-- fanout is suppressed in connect-phase). ]]
+local function on_error_refused_symbol(task)
+  -- Builds a callback that records `symbol` on the task with the stringified
+  -- error as its option; any further callback arguments are ignored.
+  local function reporter(symbol)
+    return function(err)
+      task:insert_result(symbol, 1.0, tostring(err))
+    end
+  end
+
+  local request_opts = {
+    task = task,
+    host = '127.0.0.1',
+    port = 1, -- expected to be closed, so the connect is refused
+    read = false,
+    connect_timeout = 1.0,
+    -- The regular callback MUST NOT fire when on_error is registered and the
+    -- connect fails; its symbol appearing in the results means a failure.
+    callback = reporter('ON_ERROR_REGULAR_CB_FIRED'),
+    -- Expected path: the connect-phase error is delivered here.
+    on_error = reporter('ON_ERROR_FIRED'),
+  }
+  rspamd_tcp:request(request_opts)
+end
+
+-- [[ on_error post-CONNECTED: connect succeeds (real HTTP server), then the
+-- read times out. on_error must NOT fire; the read callback receives the
+-- error as before. ]]
+local function on_error_post_connect_symbol(task)
+  -- Request line for the /timeout endpoint, which (per the test setup) sleeps
+  -- 4s before responding.
+  local http_get = 'GET /timeout HTTP/1.1\r\nHost: 127.0.0.1:18080\r\nConnection: close\r\n\r\n'
+
+  -- Regular read callback: records the read error, or a plain OK marker when
+  -- no error was reported.
+  local function on_read(err, _, _)
+    if not err then
+      task:insert_result('POST_CONNECT_READ_OK', 1.0)
+      return
+    end
+    task:insert_result('POST_CONNECT_READ_TIMEOUT', 1.0, tostring(err))
+  end
+
+  -- MUST NOT fire: the error in this scenario happens after the connection
+  -- has been established, so it belongs to the read callback.
+  local function on_connect_error(err, _)
+    task:insert_result('POST_CONNECT_ON_ERROR_FIRED', 1.0, tostring(err))
+  end
+
+  local request_opts = {
+    task = task,
+    host = '127.0.0.1',
+    port = 18080,
+    data = { http_get },
+    read = true,
+    stop_pattern = '\r\n\r\n',
+    callback = on_read,
+    on_error = on_connect_error,
+    connect_timeout = 2.0,
+    -- Far shorter than the server-side sleep, so the read side times out
+    -- after the connect has succeeded.
+    read_timeout = 0.5,
+    write_timeout = 2.0,
+  }
+  rspamd_tcp:request(request_opts)
+end
+
+-- Register the three TCP test symbols, in order, with identical score and
+-- no_squeeze settings; only the name and callback differ.
+local tcp_test_symbols = {
+  { 'PHASED_TIMEOUT_TEST', phased_timeout_symbol },
+  { 'ON_ERROR_REFUSED_TEST', on_error_refused_symbol },
+  { 'ON_ERROR_POST_CONNECT_TEST', on_error_post_connect_symbol },
+}
+for _, entry in ipairs(tcp_test_symbols) do
+  rspamd_config:register_symbol({
+    name = entry[1],
+    score = 1.0,
+    callback = entry[2],
+    no_squeeze = true,
+  })
+end
-- ]]