From 642a8addb3ac1bc7448afa4cd09c9e2ea6e448ef Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 11 Sep 2025 14:43:15 +0100 Subject: [PATCH] [Project] Implement more flexible http timeouts Issue: #5611 --- src/libserver/http/http_connection.c | 126 +++++++++++++++++++++++++-- src/libserver/http/http_connection.h | 28 ++++++ src/libserver/http/http_context.c | 117 ++++++++++++++++++++++++- src/libserver/http/http_context.h | 11 +++ src/lua/lua_http.c | 50 ++++++++++- 5 files changed, 318 insertions(+), 14 deletions(-) diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c index b5d70fc1c8..b25b535108 100644 --- a/src/libserver/http/http_connection.c +++ b/src/libserver/http/http_connection.c @@ -69,13 +69,27 @@ struct rspamd_http_connection_private { struct http_parser parser; struct http_parser_settings parser_cb; struct rspamd_io_ev ev; - ev_tstamp timeout; + ev_tstamp timeout; /* legacy/global timeout (fallback) */ + /* Staged timeouts (seconds); 0 means use ctx/defaults */ + ev_tstamp connect_timeout; + ev_tstamp ssl_timeout; + ev_tstamp write_timeout; + ev_tstamp read_timeout; struct rspamd_http_message *msg; struct iovec *out; unsigned int outlen; enum rspamd_http_priv_flags flags; gsize wr_pos; gsize wr_total; + /* Keepalive tuning and telemetry */ + double created_ts; /* when connection object was created */ + double last_used_ts; /* last time returned to pool */ + unsigned int reuse_count; /* number of reuses from keepalive */ + double ka_ttl_override; /* per-request TTL */ + double ka_idle_override; /* per-request idle timeout */ + unsigned int ka_max_reuse_override; /* per-request max reuse */ + /* Internal state */ + bool first_write_done; }; static const rspamd_ftok_t key_header = { @@ -731,11 +745,11 @@ rspamd_http_simple_client_helper(struct rspamd_http_connection *conn) if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) { rspamd_http_connection_read_message_shared(conn, conn->ud, - conn->priv->timeout); + (priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout)); } else { rspamd_http_connection_read_message(conn, conn->ud, - conn->priv->timeout); + (priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout)); } if (priv->msg) { @@ -853,6 +867,8 @@ call_finish_handler: } else { /* Plan read message */ + /* Switch to read stage timeout */ + priv->first_write_done = true; rspamd_http_simple_client_helper(conn); } } @@ -1130,6 +1146,18 @@ rspamd_http_connection_new_common(struct rspamd_http_context *ctx, priv = g_malloc0(sizeof(struct rspamd_http_connection_private)); conn->priv = priv; priv->ctx = ctx; + /* Initialize staged timeouts and keepalive telemetry */ + priv->connect_timeout = ctx->config.connect_timeout; + priv->ssl_timeout = ctx->config.ssl_timeout; + priv->write_timeout = ctx->config.write_timeout; + priv->read_timeout = ctx->config.read_timeout; + priv->created_ts = rspamd_get_ticks(FALSE); + priv->last_used_ts = 0; + priv->reuse_count = 0; + priv->ka_ttl_override = 0; + priv->ka_idle_override = 0; + priv->ka_max_reuse_override = 0; + priv->first_write_done = false; priv->flags = priv_flags; if (type == RSPAMD_HTTP_SERVER) { @@ -1513,7 +1541,8 @@ rspamd_http_connection_read_message_common(struct rspamd_http_connection *conn, priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED; } - priv->timeout = timeout; + /* Use read-stage timeout override if set; else fallback */ + priv->timeout = (priv->read_timeout > 0 ? priv->read_timeout : timeout); priv->header = NULL; priv->buf = g_malloc0(sizeof(*priv->buf)); REF_INIT_RETAIN(priv->buf, rspamd_http_privbuf_dtor); @@ -2045,7 +2074,8 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn, conn->ud = ud; priv->msg = msg; - priv->timeout = timeout; + /* Use write-stage timeout override if set */ + priv->timeout = (priv->write_timeout > 0 ? priv->write_timeout : timeout); priv->header = NULL; priv->buf = g_malloc0(sizeof(*priv->buf)); @@ -2387,8 +2417,10 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) { conn->log_tag); g_assert(priv->ssl != NULL); + /* Use ssl_timeout for handshake if provided */ + ev_tstamp ssl_to = (priv->ssl_timeout > 0 ? priv->ssl_timeout : (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout)); if (!rspamd_ssl_connect_fd(priv->ssl, conn->fd, host, &priv->ev, - priv->timeout, rspamd_http_event_handler, + ssl_to, rspamd_http_event_handler, rspamd_http_ssl_err_handler, conn)) { err = g_error_new(HTTP_ERROR, 400, @@ -2415,7 +2447,9 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) { else { rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE, rspamd_http_event_handler, conn); - rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, priv->timeout); + /* Use connect_timeout on initial EV_WRITE stage if provided */ + ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout); + rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, start_to); } return TRUE; @@ -2654,3 +2688,81 @@ void rspamd_http_connection_disable_encryption(struct rspamd_http_connection *co priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED; } } + +void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn, + ev_tstamp connect_timeout, + ev_tstamp ssl_timeout, + ev_tstamp write_timeout, + ev_tstamp read_timeout) +{ + struct rspamd_http_connection_private *priv = conn->priv; + + if (connect_timeout > 0) { + priv->connect_timeout = connect_timeout; + } + if (ssl_timeout > 0) { + priv->ssl_timeout = ssl_timeout; + } + if (write_timeout > 0) { + priv->write_timeout = write_timeout; + } + if (read_timeout > 0) { + priv->read_timeout = read_timeout; + } +} + +void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn, + double connection_ttl, + double idle_timeout, + unsigned int max_reuse) +{ + struct rspamd_http_connection_private *priv = conn->priv; + + if (connection_ttl > 0) { + priv->ka_ttl_override = connection_ttl; + } + if (idle_timeout > 0) { + priv->ka_idle_override = idle_timeout; + } + if (max_reuse > 0) { + priv->ka_max_reuse_override = max_reuse; + } +} + +void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn, + double now_ts) +{ + struct rspamd_http_connection_private *priv = conn->priv; + priv->last_used_ts = now_ts; +} + +void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn) +{ + struct rspamd_http_connection_private *priv = conn->priv; + priv->reuse_count++; +} + +gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn, + double now_ts, + double default_ttl, + unsigned int default_max_reuse) +{ + struct rspamd_http_connection_private *priv = conn->priv; + double ttl = (priv->ka_ttl_override > 0 ? priv->ka_ttl_override : default_ttl); + unsigned int max_reuse = (priv->ka_max_reuse_override > 0 ? priv->ka_max_reuse_override : default_max_reuse); + + if (ttl > 0 && rspamd_get_ticks(FALSE) - priv->created_ts > ttl) { + return FALSE; + } + if (max_reuse > 0 && priv->reuse_count >= max_reuse) { + return FALSE; + } + return TRUE; +} + +double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn, + double default_idle) +{ + struct rspamd_http_connection_private *priv = conn->priv; + return (priv->ka_idle_override > 0 ? priv->ka_idle_override : default_idle); +} diff --git a/src/libserver/http/http_connection.h b/src/libserver/http/http_connection.h index 466a3edd97..879417b9f8 100644 --- a/src/libserver/http/http_connection.h +++ b/src/libserver/http/http_connection.h @@ -186,6 +186,34 @@ struct rspamd_http_connection *rspamd_http_connection_new_client( unsigned opts, rspamd_inet_addr_t *addr); +/** + * Set per-request staged timeouts. Pass 0 to keep defaults. + */ +void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn, + ev_tstamp connect_timeout, + ev_tstamp ssl_timeout, + ev_tstamp write_timeout, + ev_tstamp read_timeout); + +/** + * Set per-request keepalive tuning. Pass 0 to keep defaults/disable limit. + */ +void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn, + double connection_ttl, + double idle_timeout, + unsigned int max_reuse); + +/* Helpers used by keepalive context */ +void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn, + double now_ts); +void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn); +gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn, + double now_ts, + double default_ttl, + unsigned int default_max_reuse); +double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn, + double default_idle); + /** * Creates an ordinary client connection using ready file descriptor (ignores proxy) * @param ctx diff --git a/src/libserver/http/http_context.c b/src/libserver/http/http_context.c index df32a22584..0a8c4192a8 100644 --- a/src/libserver/http/http_context.c +++ b/src/libserver/http/http_context.c @@ -101,6 +101,16 @@ rspamd_http_context_new_default(struct rspamd_config *cfg, ctx->config.user_agent = default_user_agent; ctx->config.keepalive_interval = default_keepalive_interval; ctx->config.server_hdr = default_server_hdr; + /* New defaults (disabled -> 0 to preserve legacy single-timeout behavior) */ + ctx->config.connect_timeout = 0.0; + ctx->config.ssl_timeout = 0.0; + ctx->config.write_timeout = 0.0; + ctx->config.read_timeout = 0.0; + ctx->config.keepalive_pool_size = 0; + ctx->config.keepalive_connection_ttl = 0.0; + ctx->config.keepalive_idle_timeout = 0.0; /* fall back to keepalive_interval */ + ctx->config.keepalive_max_reuse = 0; /* unlimited */ + ctx->config.keepalive_eviction_policy = 1; /* LRU */ ctx->ups_ctx = ups_ctx; if (cfg) { @@ -270,6 +280,58 @@ rspamd_http_context_create(struct rspamd_config *cfg, if (http_proxy) { ctx->config.http_proxy = ucl_object_tostring(http_proxy); } + + /* New staged timeouts */ + const ucl_object_t *connect_timeout = ucl_object_lookup(client_obj, "connect_timeout"); + if (connect_timeout) { + ctx->config.connect_timeout = ucl_object_todouble(connect_timeout); + } + const ucl_object_t *ssl_timeout = ucl_object_lookup(client_obj, "ssl_timeout"); + if (ssl_timeout) { + ctx->config.ssl_timeout = ucl_object_todouble(ssl_timeout); + } + const ucl_object_t *write_timeout = ucl_object_lookup(client_obj, "write_timeout"); + if (write_timeout) { + ctx->config.write_timeout = ucl_object_todouble(write_timeout); + } + const ucl_object_t *read_timeout = ucl_object_lookup(client_obj, "read_timeout"); + if (read_timeout) { + ctx->config.read_timeout = ucl_object_todouble(read_timeout); + } + + /* Keepalive/pooling */ + const ucl_object_t *ka_pool_size = ucl_object_lookup(client_obj, "pool_size"); + if (ka_pool_size) { + ctx->config.keepalive_pool_size = ucl_object_toint(ka_pool_size); + } + const ucl_object_t *ka_ttl = ucl_object_lookup(client_obj, "connection_ttl"); + if (ka_ttl) { + ctx->config.keepalive_connection_ttl = ucl_object_todouble(ka_ttl); + } + const ucl_object_t *ka_idle = ucl_object_lookup(client_obj, "idle_timeout"); + if (ka_idle) { + ctx->config.keepalive_idle_timeout = ucl_object_todouble(ka_idle); + } + const ucl_object_t *ka_reuse = ucl_object_lookup(client_obj, "max_reuse"); + if (ka_reuse) { + ctx->config.keepalive_max_reuse = ucl_object_toint(ka_reuse); + } + const ucl_object_t *ka_evict = ucl_object_lookup(client_obj, "eviction_policy"); + if (ka_evict) { + /* map string to int policy if string provided */ + if (ucl_object_type(ka_evict) == UCL_STRING) { + const char *pol = ucl_object_tostring(ka_evict); + if (g_ascii_strcasecmp(pol, "lifo") == 0) { + ctx->config.keepalive_eviction_policy = 0; + } + else { + ctx->config.keepalive_eviction_policy = 1; + } + } + else { + ctx->config.keepalive_eviction_policy = ucl_object_toint(ka_evict); + } + } } server_obj = ucl_object_lookup(http_obj, "server"); @@ -429,7 +491,16 @@ rspamd_http_context_check_keepalive(struct rspamd_http_context *ctx, int err; socklen_t len = sizeof(int); - cbd = g_queue_pop_head(conns); + if (ctx->config.keepalive_eviction_policy == 1) { + /* LRU: reuse the tail (oldest) */ + GList *tail = g_queue_peek_tail_link(conns); + cbd = (struct rspamd_http_keepalive_cbdata *) tail->data; + g_queue_delete_link(conns, tail); + } + else { + /* LIFO: reuse the head (most recent) */ + cbd = g_queue_pop_head(conns); + } rspamd_ev_watcher_stop(ctx->event_loop, &cbd->ev); conn = cbd->conn; g_free(cbd); @@ -453,6 +524,22 @@ rspamd_http_context_check_keepalive(struct rspamd_http_context *ctx, return NULL; } + /* Enforce ttl/reuse limits */ + double now_ts = ev_now(ctx->event_loop); + if (!rspamd_http_connection_keepalive_is_valid(conn, now_ts, + ctx->config.keepalive_connection_ttl, + ctx->config.keepalive_max_reuse)) { + msg_debug_http_context("evict expired keepalive element %s (%s, ssl=%d)", + rspamd_inet_address_to_string_pretty(phk->addr), + phk->host, + (int) phk->is_ssl); + rspamd_http_connection_unref(conn); + return NULL; + } + + /* Track reuse */ + rspamd_http_connection_keepalive_note_reuse(conn); + msg_debug_http_context("reused keepalive element %s (%s, ssl=%d), %d connections queued", rspamd_inet_address_to_string_pretty(phk->addr), phk->host, @@ -656,14 +743,36 @@ void rspamd_http_context_push_keepalive(struct rspamd_http_context *ctx, cbdata->ctx = ctx; conn->finished = FALSE; + /* Enforce pool size (evict tail if exceeded) */ + if (ctx->config.keepalive_pool_size > 0) { + while ((unsigned) cbdata->queue->length > ctx->config.keepalive_pool_size) { + GList *last = g_queue_peek_tail_link(cbdata->queue); + if (last) { + struct rspamd_http_keepalive_cbdata *to_evict = (struct rspamd_http_keepalive_cbdata *) last->data; + g_queue_delete_link(cbdata->queue, last); + rspamd_ev_watcher_stop(cbdata->ctx->event_loop, &to_evict->ev); + rspamd_http_connection_unref(to_evict->conn); + g_free(to_evict); + } + else { + break; + } + } + } + + /* Note time of putting into pool */ + rspamd_http_connection_keepalive_note_put(conn, ev_now(event_loop)); + rspamd_ev_watcher_init(&cbdata->ev, conn->fd, EV_READ, rspamd_http_keepalive_handler, cbdata); - rspamd_ev_watcher_start(event_loop, &cbdata->ev, timeout); + /* Idle timeout override if provided */ + double idle_to = rspamd_http_connection_keepalive_idle_timeout(conn, ctx->config.keepalive_idle_timeout > 0 ? ctx->config.keepalive_idle_timeout : timeout); + rspamd_ev_watcher_start(event_loop, &cbdata->ev, idle_to); msg_debug_http_context("push keepalive element %s (%s), %d connections queued, %.1f timeout", rspamd_inet_address_to_string_pretty(cbdata->conn->keepalive_hash_key->addr), cbdata->conn->keepalive_hash_key->host, cbdata->queue->length, - timeout); -} \ No newline at end of file + idle_to); +} diff --git a/src/libserver/http/http_context.h b/src/libserver/http/http_context.h index 1b15f40679..b393f93b7c 100644 --- a/src/libserver/http/http_context.h +++ b/src/libserver/http/http_context.h @@ -41,6 +41,17 @@ struct rspamd_http_context_cfg { const char *user_agent; const char *http_proxy; const char *server_hdr; + /* Client-side staged timeouts (seconds) */ + double connect_timeout; /* TCP connect */ + double ssl_timeout; /* SSL handshake */ + double write_timeout; /* Request write */ + double read_timeout; /* Response read */ + /* Keep-alive/pool tuning */ + unsigned int keepalive_pool_size; /* max conns per key */ + double keepalive_connection_ttl; /* absolute TTL */ + double keepalive_idle_timeout; /* idle timeout */ + unsigned int keepalive_max_reuse; /* reuse limit */ + int keepalive_eviction_policy; /* 0=LIFO,1=LRU */ }; /** diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 731b8b0571..0ab5944fed 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -506,7 +506,7 @@ lua_http_resume_handler(struct rspamd_http_connection *conn, } static gboolean -lua_http_make_connection(struct lua_http_cbdata *cbd) +lua_http_make_connection(lua_State *L, struct lua_http_cbdata *cbd) { rspamd_inet_address_set_port(cbd->addr, cbd->msg->port); unsigned http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; @@ -574,6 +574,43 @@ lua_http_make_connection(struct lua_http_cbdata *cbd) cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED; } + /* Optional per-request tuning from table (if present) */ + if (lua_type(L, 1) == LUA_TTABLE) { + double connect_timeout = 0, ssl_timeout = 0, write_timeout = 0, read_timeout = 0; + double connection_ttl = 0, idle_timeout = 0; + unsigned int max_reuse = 0; + lua_pushstring(L, "connect_timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) connect_timeout = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushstring(L, "ssl_timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) ssl_timeout = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushstring(L, "write_timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) write_timeout = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushstring(L, "read_timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) read_timeout = lua_tonumber(L, -1); + lua_pop(L, 1); + rspamd_http_connection_set_timeouts(cbd->conn, connect_timeout, ssl_timeout, write_timeout, read_timeout); + lua_pushstring(L, "connection_ttl"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) connection_ttl = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushstring(L, "idle_timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) idle_timeout = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushstring(L, "max_reuse"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) max_reuse = lua_tointeger(L, -1); + lua_pop(L, 1); + rspamd_http_connection_set_keepalive_tuning(cbd->conn, connection_ttl, idle_timeout, max_reuse); + } + if (cbd->task) { cbd->conn->log_tag = cbd->task->task_pool->tag.uid; @@ -635,7 +672,7 @@ lua_http_dns_handler(struct rdns_reply *reply, gpointer ud) } else { REF_RETAIN(cbd); - if (!lua_http_make_connection(cbd)) { + if (!lua_http_make_connection(NULL, cbd)) { lua_http_push_error(cbd, "unable to make connection to the host"); if (cbd->ref.refcount > 1) { @@ -713,6 +750,13 @@ lua_http_push_headers(lua_State *L, struct rspamd_http_message *msg) * @param {boolean} keepalive enable keep-alive pool * @param {string} user for HTTP authentication * @param {string} password for HTTP authentication, only if "user" present + * @param {number} connect_timeout optional TCP connect timeout (seconds) + * @param {number} ssl_timeout optional SSL handshake timeout (seconds) + * @param {number} write_timeout optional request write timeout (seconds) + * @param {number} read_timeout optional response read timeout (seconds) + * @param {number} connection_ttl optional absolute keep-alive connection TTL (seconds) + * @param {number} idle_timeout optional keep-alive idle timeout override (seconds) + * @param {number} max_reuse optional keep-alive max reuse count per connection * @return {boolean} `true`, in **async** mode, if a request has been successfully scheduled. If this value is `false` then some error occurred, the callback thus will not be called. * @return In **sync** mode `string|nil, nil|table` In sync mode error message if any and response as table: `int` _code_, `string` _content_ and `table` _headers_ (header -> value) */ @@ -1276,7 +1320,7 @@ lua_http_request(lua_State *L) gboolean ret; REF_RETAIN(cbd); - ret = lua_http_make_connection(cbd); + ret = lua_http_make_connection(L, cbd); if (!ret) { if (cbd->up) { -- 2.47.3