struct http_parser parser;
struct http_parser_settings parser_cb;
struct rspamd_io_ev ev;
- ev_tstamp timeout;
+ ev_tstamp timeout; /* legacy/global timeout (fallback) */
+ /* Staged timeouts (seconds); 0 means use ctx/defaults */
+ ev_tstamp connect_timeout;
+ ev_tstamp ssl_timeout;
+ ev_tstamp write_timeout;
+ ev_tstamp read_timeout;
struct rspamd_http_message *msg;
struct iovec *out;
unsigned int outlen;
enum rspamd_http_priv_flags flags;
gsize wr_pos;
gsize wr_total;
+ /* Keepalive tuning and telemetry */
+ double created_ts; /* when connection object was created */
+ double last_used_ts; /* last time returned to pool */
+ unsigned int reuse_count; /* number of reuses from keepalive */
+ double ka_ttl_override; /* per-request TTL */
+ double ka_idle_override; /* per-request idle timeout */
+ unsigned int ka_max_reuse_override; /* per-request max reuse */
+ /* Internal state */
+ bool first_write_done;
};
static const rspamd_ftok_t key_header = {
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
rspamd_http_connection_read_message_shared(conn, conn->ud,
- conn->priv->timeout);
+ (priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout));
}
else {
rspamd_http_connection_read_message(conn, conn->ud,
- conn->priv->timeout);
+ (priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout));
}
if (priv->msg) {
}
else {
/* Plan read message */
+ /* Switch to read stage timeout */
+ priv->first_write_done = true;
rspamd_http_simple_client_helper(conn);
}
}
priv = g_malloc0(sizeof(struct rspamd_http_connection_private));
conn->priv = priv;
priv->ctx = ctx;
+ /* Initialize staged timeouts and keepalive telemetry */
+ priv->connect_timeout = ctx->config.connect_timeout;
+ priv->ssl_timeout = ctx->config.ssl_timeout;
+ priv->write_timeout = ctx->config.write_timeout;
+ priv->read_timeout = ctx->config.read_timeout;
+ priv->created_ts = rspamd_get_ticks(FALSE);
+ priv->last_used_ts = 0;
+ priv->reuse_count = 0;
+ priv->ka_ttl_override = 0;
+ priv->ka_idle_override = 0;
+ priv->ka_max_reuse_override = 0;
+ priv->first_write_done = false;
priv->flags = priv_flags;
if (type == RSPAMD_HTTP_SERVER) {
priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
- priv->timeout = timeout;
+ /* Use read-stage timeout override if set; else fallback */
+ priv->timeout = (priv->read_timeout > 0 ? priv->read_timeout : timeout);
priv->header = NULL;
priv->buf = g_malloc0(sizeof(*priv->buf));
REF_INIT_RETAIN(priv->buf, rspamd_http_privbuf_dtor);
conn->ud = ud;
priv->msg = msg;
- priv->timeout = timeout;
+ /* Use write-stage timeout override if set */
+ priv->timeout = (priv->write_timeout > 0 ? priv->write_timeout : timeout);
priv->header = NULL;
priv->buf = g_malloc0(sizeof(*priv->buf));
conn->log_tag);
g_assert(priv->ssl != NULL);
+ /* Use ssl_timeout for handshake if provided */
+ ev_tstamp ssl_to = (priv->ssl_timeout > 0 ? priv->ssl_timeout : (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout));
if (!rspamd_ssl_connect_fd(priv->ssl, conn->fd, host, &priv->ev,
- priv->timeout, rspamd_http_event_handler,
+ ssl_to, rspamd_http_event_handler,
rspamd_http_ssl_err_handler, conn)) {
err = g_error_new(HTTP_ERROR, 400,
else {
rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE,
rspamd_http_event_handler, conn);
- rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, priv->timeout);
+ /* Use connect_timeout on initial EV_WRITE stage if provided */
+ ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout);
+ rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, start_to);
}
return TRUE;
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
}
+
+void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn,
+ ev_tstamp connect_timeout,
+ ev_tstamp ssl_timeout,
+ ev_tstamp write_timeout,
+ ev_tstamp read_timeout)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+
+ if (connect_timeout > 0) {
+ priv->connect_timeout = connect_timeout;
+ }
+ if (ssl_timeout > 0) {
+ priv->ssl_timeout = ssl_timeout;
+ }
+ if (write_timeout > 0) {
+ priv->write_timeout = write_timeout;
+ }
+ if (read_timeout > 0) {
+ priv->read_timeout = read_timeout;
+ }
+}
+
+void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn,
+ double connection_ttl,
+ double idle_timeout,
+ unsigned int max_reuse)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+
+ if (connection_ttl > 0) {
+ priv->ka_ttl_override = connection_ttl;
+ }
+ if (idle_timeout > 0) {
+ priv->ka_idle_override = idle_timeout;
+ }
+ if (max_reuse > 0) {
+ priv->ka_max_reuse_override = max_reuse;
+ }
+}
+
+void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn,
+ double now_ts)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+ priv->last_used_ts = now_ts;
+}
+
+void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+ priv->reuse_count++;
+}
+
+gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn,
+ double now_ts,
+ double default_ttl,
+ unsigned int default_max_reuse)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+ double ttl = (priv->ka_ttl_override > 0 ? priv->ka_ttl_override : default_ttl);
+ unsigned int max_reuse = (priv->ka_max_reuse_override > 0 ? priv->ka_max_reuse_override : default_max_reuse);
+
+ if (ttl > 0 && rspamd_get_ticks(FALSE) - priv->created_ts > ttl) {
+ return FALSE;
+ }
+ if (max_reuse > 0 && priv->reuse_count >= max_reuse) {
+ return FALSE;
+ }
+ return TRUE;
+}
+
+double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn,
+ double default_idle)
+{
+ struct rspamd_http_connection_private *priv = conn->priv;
+ return (priv->ka_idle_override > 0 ? priv->ka_idle_override : default_idle);
+}
unsigned opts,
rspamd_inet_addr_t *addr);
+/**
+ * Set per-request staged timeouts. Pass 0 to keep defaults.
+ */
+void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn,
+ ev_tstamp connect_timeout,
+ ev_tstamp ssl_timeout,
+ ev_tstamp write_timeout,
+ ev_tstamp read_timeout);
+
+/**
+ * Set per-request keepalive tuning. Pass 0 to keep defaults/disable limit.
+ */
+void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn,
+ double connection_ttl,
+ double idle_timeout,
+ unsigned int max_reuse);
+
+/* Helpers used by keepalive context */
+void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn,
+ double now_ts);
+void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn);
+gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn,
+ double now_ts,
+ double default_ttl,
+ unsigned int default_max_reuse);
+double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn,
+ double default_idle);
+
/**
* Creates an ordinary client connection using ready file descriptor (ignores proxy)
* @param ctx
ctx->config.user_agent = default_user_agent;
ctx->config.keepalive_interval = default_keepalive_interval;
ctx->config.server_hdr = default_server_hdr;
+ /* New defaults (disabled -> 0 to preserve legacy single-timeout behavior) */
+ ctx->config.connect_timeout = 0.0;
+ ctx->config.ssl_timeout = 0.0;
+ ctx->config.write_timeout = 0.0;
+ ctx->config.read_timeout = 0.0;
+ ctx->config.keepalive_pool_size = 0;
+ ctx->config.keepalive_connection_ttl = 0.0;
+ ctx->config.keepalive_idle_timeout = 0.0; /* fall back to keepalive_interval */
+ ctx->config.keepalive_max_reuse = 0; /* unlimited */
+ ctx->config.keepalive_eviction_policy = 1; /* LRU */
ctx->ups_ctx = ups_ctx;
if (cfg) {
if (http_proxy) {
ctx->config.http_proxy = ucl_object_tostring(http_proxy);
}
+
+ /* New staged timeouts */
+ const ucl_object_t *connect_timeout = ucl_object_lookup(client_obj, "connect_timeout");
+ if (connect_timeout) {
+ ctx->config.connect_timeout = ucl_object_todouble(connect_timeout);
+ }
+ const ucl_object_t *ssl_timeout = ucl_object_lookup(client_obj, "ssl_timeout");
+ if (ssl_timeout) {
+ ctx->config.ssl_timeout = ucl_object_todouble(ssl_timeout);
+ }
+ const ucl_object_t *write_timeout = ucl_object_lookup(client_obj, "write_timeout");
+ if (write_timeout) {
+ ctx->config.write_timeout = ucl_object_todouble(write_timeout);
+ }
+ const ucl_object_t *read_timeout = ucl_object_lookup(client_obj, "read_timeout");
+ if (read_timeout) {
+ ctx->config.read_timeout = ucl_object_todouble(read_timeout);
+ }
+
+ /* Keepalive/pooling */
+ const ucl_object_t *ka_pool_size = ucl_object_lookup(client_obj, "pool_size");
+ if (ka_pool_size) {
+ ctx->config.keepalive_pool_size = ucl_object_toint(ka_pool_size);
+ }
+ const ucl_object_t *ka_ttl = ucl_object_lookup(client_obj, "connection_ttl");
+ if (ka_ttl) {
+ ctx->config.keepalive_connection_ttl = ucl_object_todouble(ka_ttl);
+ }
+ const ucl_object_t *ka_idle = ucl_object_lookup(client_obj, "idle_timeout");
+ if (ka_idle) {
+ ctx->config.keepalive_idle_timeout = ucl_object_todouble(ka_idle);
+ }
+ const ucl_object_t *ka_reuse = ucl_object_lookup(client_obj, "max_reuse");
+ if (ka_reuse) {
+ ctx->config.keepalive_max_reuse = ucl_object_toint(ka_reuse);
+ }
+ const ucl_object_t *ka_evict = ucl_object_lookup(client_obj, "eviction_policy");
+ if (ka_evict) {
+ /* map string to int policy if string provided */
+ if (ucl_object_type(ka_evict) == UCL_STRING) {
+ const char *pol = ucl_object_tostring(ka_evict);
+ if (g_ascii_strcasecmp(pol, "lifo") == 0) {
+ ctx->config.keepalive_eviction_policy = 0;
+ }
+ else {
+ ctx->config.keepalive_eviction_policy = 1;
+ }
+ }
+ else {
+ ctx->config.keepalive_eviction_policy = ucl_object_toint(ka_evict);
+ }
+ }
}
server_obj = ucl_object_lookup(http_obj, "server");
int err;
socklen_t len = sizeof(int);
- cbd = g_queue_pop_head(conns);
+ if (ctx->config.keepalive_eviction_policy == 1) {
+ /* LRU: reuse the tail (oldest) */
+ GList *tail = g_queue_peek_tail_link(conns);
+ cbd = (struct rspamd_http_keepalive_cbdata *) tail->data;
+ g_queue_delete_link(conns, tail);
+ }
+ else {
+ /* LIFO: reuse the head (most recent) */
+ cbd = g_queue_pop_head(conns);
+ }
rspamd_ev_watcher_stop(ctx->event_loop, &cbd->ev);
conn = cbd->conn;
g_free(cbd);
return NULL;
}
+ /* Enforce ttl/reuse limits */
+ double now_ts = ev_now(ctx->event_loop);
+ if (!rspamd_http_connection_keepalive_is_valid(conn, now_ts,
+ ctx->config.keepalive_connection_ttl,
+ ctx->config.keepalive_max_reuse)) {
+ msg_debug_http_context("evict expired keepalive element %s (%s, ssl=%d)",
+ rspamd_inet_address_to_string_pretty(phk->addr),
+ phk->host,
+ (int) phk->is_ssl);
+ rspamd_http_connection_unref(conn);
+ return NULL;
+ }
+
+ /* Track reuse */
+ rspamd_http_connection_keepalive_note_reuse(conn);
+
msg_debug_http_context("reused keepalive element %s (%s, ssl=%d), %d connections queued",
rspamd_inet_address_to_string_pretty(phk->addr),
phk->host,
cbdata->ctx = ctx;
conn->finished = FALSE;
+ /* Enforce pool size (evict tail if exceeded) */
+ if (ctx->config.keepalive_pool_size > 0) {
+ while ((unsigned) cbdata->queue->length > ctx->config.keepalive_pool_size) {
+ GList *last = g_queue_peek_tail_link(cbdata->queue);
+ if (last) {
+ struct rspamd_http_keepalive_cbdata *to_evict = (struct rspamd_http_keepalive_cbdata *) last->data;
+ g_queue_delete_link(cbdata->queue, last);
+ rspamd_ev_watcher_stop(cbdata->ctx->event_loop, &to_evict->ev);
+ rspamd_http_connection_unref(to_evict->conn);
+ g_free(to_evict);
+ }
+ else {
+ break;
+ }
+ }
+ }
+
+ /* Note time of putting into pool */
+ rspamd_http_connection_keepalive_note_put(conn, ev_now(event_loop));
+
rspamd_ev_watcher_init(&cbdata->ev, conn->fd, EV_READ,
rspamd_http_keepalive_handler,
cbdata);
- rspamd_ev_watcher_start(event_loop, &cbdata->ev, timeout);
+ /* Idle timeout override if provided */
+ double idle_to = rspamd_http_connection_keepalive_idle_timeout(conn, ctx->config.keepalive_idle_timeout > 0 ? ctx->config.keepalive_idle_timeout : timeout);
+ rspamd_ev_watcher_start(event_loop, &cbdata->ev, idle_to);
msg_debug_http_context("push keepalive element %s (%s), %d connections queued, %.1f timeout",
rspamd_inet_address_to_string_pretty(cbdata->conn->keepalive_hash_key->addr),
cbdata->conn->keepalive_hash_key->host,
cbdata->queue->length,
- timeout);
-}
\ No newline at end of file
+ idle_to);
+}
const char *user_agent;
const char *http_proxy;
const char *server_hdr;
+ /* Client-side staged timeouts (seconds) */
+ double connect_timeout; /* TCP connect */
+ double ssl_timeout; /* SSL handshake */
+ double write_timeout; /* Request write */
+ double read_timeout; /* Response read */
+ /* Keep-alive/pool tuning */
+ unsigned int keepalive_pool_size; /* max conns per key */
+ double keepalive_connection_ttl; /* absolute TTL */
+ double keepalive_idle_timeout; /* idle timeout */
+ unsigned int keepalive_max_reuse; /* reuse limit */
+ int keepalive_eviction_policy; /* 0=LIFO,1=LRU */
};
/**
}
static gboolean
-lua_http_make_connection(struct lua_http_cbdata *cbd)
+lua_http_make_connection(lua_State *L, struct lua_http_cbdata *cbd)
{
rspamd_inet_address_set_port(cbd->addr, cbd->msg->port);
unsigned http_opts = RSPAMD_HTTP_CLIENT_SIMPLE;
cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED;
}
+ /* Optional per-request tuning from table (if present) */
+ if (lua_type(L, 1) == LUA_TTABLE) {
+ double connect_timeout = 0, ssl_timeout = 0, write_timeout = 0, read_timeout = 0;
+ double connection_ttl = 0, idle_timeout = 0;
+ unsigned int max_reuse = 0;
+ lua_pushstring(L, "connect_timeout");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) connect_timeout = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_pushstring(L, "ssl_timeout");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) ssl_timeout = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_pushstring(L, "write_timeout");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) write_timeout = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_pushstring(L, "read_timeout");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) read_timeout = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ rspamd_http_connection_set_timeouts(cbd->conn, connect_timeout, ssl_timeout, write_timeout, read_timeout);
+ lua_pushstring(L, "connection_ttl");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) connection_ttl = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_pushstring(L, "idle_timeout");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) idle_timeout = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_pushstring(L, "max_reuse");
+ lua_gettable(L, 1);
+ if (lua_type(L, -1) == LUA_TNUMBER) max_reuse = lua_tointeger(L, -1);
+ lua_pop(L, 1);
+ rspamd_http_connection_set_keepalive_tuning(cbd->conn, connection_ttl, idle_timeout, max_reuse);
+ }
+
if (cbd->task) {
cbd->conn->log_tag = cbd->task->task_pool->tag.uid;
}
else {
REF_RETAIN(cbd);
- if (!lua_http_make_connection(cbd)) {
+ if (!lua_http_make_connection(NULL, cbd)) {
lua_http_push_error(cbd, "unable to make connection to the host");
if (cbd->ref.refcount > 1) {
* @param {boolean} keepalive enable keep-alive pool
* @param {string} user for HTTP authentication
* @param {string} password for HTTP authentication, only if "user" present
+ * @param {number} connect_timeout optional TCP connect timeout (seconds)
+ * @param {number} ssl_timeout optional SSL handshake timeout (seconds)
+ * @param {number} write_timeout optional request write timeout (seconds)
+ * @param {number} read_timeout optional response read timeout (seconds)
+ * @param {number} connection_ttl optional absolute keep-alive connection TTL (seconds)
+ * @param {number} idle_timeout optional keep-alive idle timeout override (seconds)
+ * @param {number} max_reuse optional keep-alive max reuse count per connection
* @return {boolean} `true`, in **async** mode, if a request has been successfully scheduled. If this value is `false` then some error occurred, the callback thus will not be called.
* @return In **sync** mode `string|nil, nil|table` In sync mode error message if any and response as table: `int` _code_, `string` _content_ and `table` _headers_ (header -> value)
*/
gboolean ret;
REF_RETAIN(cbd);
- ret = lua_http_make_connection(cbd);
+ ret = lua_http_make_connection(L, cbd);
if (!ret) {
if (cbd->up) {