]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Add support of granular timeouts to plugins and maps
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 11 Sep 2025 14:25:43 +0000 (15:25 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 11 Sep 2025 14:25:43 +0000 (15:25 +0100)
lualib/lua_clickhouse.lua
lualib/plugins/neural/providers/llm.lua
src/libserver/maps/map.c
src/libserver/maps/map_private.h
src/plugins/lua/bimi.lua
src/plugins/lua/contextal.lua
src/plugins/lua/gpt.lua
src/plugins/lua/metadata_exporter.lua

index 28366d28a90cfd918a650b09fedab0953ce362ba..891331a88e27c5106db506bf0f4c3e1fc99ed5b3 100644 (file)
@@ -13,7 +13,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-]]--
+]] --
 
 --[[[
 -- @module lua_clickhouse
@@ -75,7 +75,6 @@ end
 
 -- Converts a row into TSV, taking extra care about arrays
 local function row_to_tsv(row)
-
   for i, elt in ipairs(row) do
     local t = type(elt)
     if t == 'table' then
@@ -185,8 +184,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
         fail_cb(params, err_message, data)
       else
         rspamd_logger.errx(params.log_obj,
-            "request failed on clickhouse server %s: %s",
-            ip_addr, err_message)
+          "request failed on clickhouse server %s: %s",
+          ip_addr, err_message)
       end
       upstream:fail()
     else
@@ -198,8 +197,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
           ok_cb(params, rows)
         else
           lua_util.debugm(N, params.log_obj,
-              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
-              data:gsub('[\n%s]+', ' '), _)
+            "http_select_cb ok: %s, %s, %s, %s", err_message, code,
+            data:gsub('[\n%s]+', ' '), _)
         end
       else
         if fail_cb then
@@ -207,8 +206,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
         else
           local ip_addr = upstream:get_addr():to_string(true)
           rspamd_logger.errx(params.log_obj,
-              "request failed on clickhouse server %s: %s",
-              ip_addr, 'failed to parse reply')
+            "request failed on clickhouse server %s: %s",
+            ip_addr, 'failed to parse reply')
         end
       end
     end
@@ -230,8 +229,8 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
         fail_cb(params, err_message, data)
       else
         rspamd_logger.errx(params.log_obj,
-            "request failed on clickhouse server %s: %s",
-            ip_addr, err_message)
+          "request failed on clickhouse server %s: %s",
+          ip_addr, err_message)
       end
       upstream:fail()
     else
@@ -245,11 +244,10 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
         else
           ok_cb(params, parsed)
         end
-
       else
         lua_util.debugm(N, params.log_obj,
-            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
-            data:gsub('[\n%s]+', ' '), _)
+          "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
+          data:gsub('[\n%s]+', ' '), _)
       end
     end
   end
@@ -294,6 +292,11 @@ exports.select = function(upstream, settings, params, query, ok_cb, fail_cb, row
   http_params.body = query
   http_params.log_obj = params.task or params.config
   http_params.opaque_body = true
+  -- staged timeouts
+  http_params.connect_timeout = settings.connect_timeout
+  http_params.ssl_timeout = settings.ssl_timeout
+  http_params.write_timeout = settings.write_timeout
+  http_params.read_timeout = settings.read_timeout
 
   lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
 
@@ -305,7 +308,7 @@ exports.select = function(upstream, settings, params, query, ok_cb, fail_cb, row
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
     http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
-        connect_prefix, ip_addr, escape_spaces(database))
+      connect_prefix, ip_addr, escape_spaces(database))
   end
 
   return rspamd_http.request(http_params)
@@ -349,6 +352,11 @@ exports.select_sync = function(upstream, settings, params, query, row_cb)
   http_params.body = query
   http_params.log_obj = params.task or params.config
   http_params.opaque_body = true
+  -- staged timeouts
+  http_params.connect_timeout = settings.connect_timeout
+  http_params.ssl_timeout = settings.ssl_timeout
+  http_params.write_timeout = settings.write_timeout
+  http_params.read_timeout = settings.read_timeout
 
   lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
 
@@ -360,7 +368,7 @@ exports.select_sync = function(upstream, settings, params, query, row_cb)
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
     http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
-        connect_prefix, ip_addr, escape_spaces(database))
+      connect_prefix, ip_addr, escape_spaces(database))
   end
 
   local err, response = rspamd_http.request(http_params)
@@ -414,6 +422,11 @@ exports.insert = function(upstream, settings, params, query, rows,
   http_params.method = 'POST'
   http_params.body = { rspamd_text.fromtable(rows, '\n'), '\n' }
   http_params.log_obj = params.task or params.config
+  -- staged timeouts
+  http_params.connect_timeout = settings.connect_timeout
+  http_params.ssl_timeout = settings.ssl_timeout
+  http_params.write_timeout = settings.write_timeout
+  http_params.read_timeout = settings.read_timeout
 
   if not http_params.url then
     local connect_prefix = "http://"
@@ -423,10 +436,10 @@ exports.insert = function(upstream, settings, params, query, rows,
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
     http_params.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
-        connect_prefix,
-        ip_addr,
-        escape_spaces(database),
-        escape_spaces(query))
+      connect_prefix,
+      ip_addr,
+      escape_spaces(database),
+      escape_spaces(query))
   end
 
   return rspamd_http.request(http_params)
@@ -468,6 +481,11 @@ exports.generic = function(upstream, settings, params, query,
   http_params.password = settings.password
   http_params.log_obj = params.task or params.config
   http_params.body = query
+  -- staged timeouts
+  http_params.connect_timeout = settings.connect_timeout
+  http_params.ssl_timeout = settings.ssl_timeout
+  http_params.write_timeout = settings.write_timeout
+  http_params.read_timeout = settings.read_timeout
 
   if not http_params.url then
     local connect_prefix = "http://"
@@ -477,7 +495,7 @@ exports.generic = function(upstream, settings, params, query,
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
     http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
-        connect_prefix, ip_addr, escape_spaces(database))
+      connect_prefix, ip_addr, escape_spaces(database))
   end
 
   return rspamd_http.request(http_params)
@@ -515,6 +533,11 @@ exports.generic_sync = function(upstream, settings, params, query)
   http_params.password = settings.password
   http_params.log_obj = params.task or params.config
   http_params.body = query
+  -- staged timeouts
+  http_params.connect_timeout = settings.connect_timeout
+  http_params.ssl_timeout = settings.ssl_timeout
+  http_params.write_timeout = settings.write_timeout
+  http_params.read_timeout = settings.read_timeout
 
   if not http_params.url then
     local connect_prefix = "http://"
@@ -524,7 +547,7 @@ exports.generic_sync = function(upstream, settings, params, query)
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
     http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
-        connect_prefix, ip_addr, escape_spaces(database))
+      connect_prefix, ip_addr, escape_spaces(database))
   end
 
   local err, response = rspamd_http.request(http_params)
index 1bc1063aae8c4868f22ab89d05edbc1755a9f911..17fc0c9f3e4869b019497f8e09dabf503a11d28b 100644 (file)
@@ -45,6 +45,11 @@ local function compose_llm_settings(pcfg)
     cache_prefix = pcfg.cache_prefix or 'neural_llm',
     cache_hash_len = pcfg.cache_hash_len or 32,
     cache_use_hashing = (pcfg.cache_use_hashing ~= false),
+    -- Optional staged timeouts (inherit from global gpt if present)
+    connect_timeout = pcfg.connect_timeout or gpt_settings.connect_timeout,
+    ssl_timeout = pcfg.ssl_timeout or gpt_settings.ssl_timeout,
+    write_timeout = pcfg.write_timeout or gpt_settings.write_timeout,
+    read_timeout = pcfg.read_timeout or gpt_settings.read_timeout,
   }
 end
 
@@ -182,6 +187,11 @@ neural_common.register_provider('llm', {
         use_gzip = true,
         keepalive = true,
         callback = http_cb,
+        -- staged timeouts
+        connect_timeout = llm.connect_timeout,
+        ssl_timeout = llm.ssl_timeout,
+        write_timeout = llm.write_timeout,
+        read_timeout = llm.read_timeout,
       }
 
       rspamd_http.request(http_params)
index 6de694eb3ccf90835f8211b361cc48ab786c2dcf..1910bd61400fd2271ab1a99ffc7a0c65f10f50d6 100644 (file)
@@ -1425,6 +1425,21 @@ rspamd_map_dns_callback(struct rdns_reply *reply, void *arg)
                                                                                                          cbd->addr);
 
                if (cbd->conn != NULL) {
+                       /* Apply optional staged timeouts and keepalive tuning */
+                       if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
+                               cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
+                               rspamd_http_connection_set_timeouts(cbd->conn,
+                                                                                                       cbd->data->connect_timeout,
+                                                                                                       cbd->data->ssl_timeout,
+                                                                                                       cbd->data->write_timeout,
+                                                                                                       cbd->data->read_timeout);
+                       }
+                       if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
+                               rspamd_http_connection_set_keepalive_tuning(cbd->conn,
+                                                                                                                       cbd->data->connection_ttl,
+                                                                                                                       cbd->data->idle_timeout,
+                                                                                                                       cbd->data->max_reuse);
+                       }
                        write_http_request(cbd);
                }
                else {
@@ -1982,7 +1997,21 @@ check:
                        addr);
 
                if (cbd->conn != NULL) {
-                       cbd->stage = http_map_http_conn;
+                       /* Apply optional staged timeouts and keepalive tuning */
+                       if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
+                               cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
+                               rspamd_http_connection_set_timeouts(cbd->conn,
+                                                                                                       cbd->data->connect_timeout,
+                                                                                                       cbd->data->ssl_timeout,
+                                                                                                       cbd->data->write_timeout,
+                                                                                                       cbd->data->read_timeout);
+                       }
+                       if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
+                               rspamd_http_connection_set_keepalive_tuning(cbd->conn,
+                                                                                                                       cbd->data->connection_ttl,
+                                                                                                                       cbd->data->idle_timeout,
+                                                                                                                       cbd->data->max_reuse);
+                       }
                        write_http_request(cbd);
                        cbd->addr = addr;
                        MAP_RELEASE(cbd, "http_callback_data");
@@ -2881,6 +2910,39 @@ rspamd_map_parse_backend(struct rspamd_config *cfg, const char *map_line)
                        }
                }
 
+               /* Parse optional HTTP timeouts and keepalive tuning from global options -> maps.* block */
+               {
+                       const ucl_object_t *maps_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "maps");
+                       const ucl_object_t *opt = NULL;
+                       if (maps_obj && ucl_object_type(maps_obj) == UCL_OBJECT) {
+                               /* Per-URL overrides: allow stanza keyed by exact URL */
+                               const ucl_object_t *url_obj = ucl_object_lookup(maps_obj, bk->uri);
+                               const ucl_object_t *src = url_obj ? url_obj : maps_obj;
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "connect_timeout", "connect-timeout", NULL);
+                               if (opt) hdata->connect_timeout = ucl_object_todouble(opt);
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "ssl_timeout", "ssl-timeout", NULL);
+                               if (opt) hdata->ssl_timeout = ucl_object_todouble(opt);
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "write_timeout", "write-timeout", NULL);
+                               if (opt) hdata->write_timeout = ucl_object_todouble(opt);
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "read_timeout", "read-timeout", NULL);
+                               if (opt) hdata->read_timeout = ucl_object_todouble(opt);
+                               /* Keepalive tuning */
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "connection_ttl", "connection-ttl", "keepalive_ttl", NULL);
+                               if (opt) hdata->connection_ttl = ucl_object_todouble(opt);
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "idle_timeout", "idle-timeout", "keepalive_idle", NULL);
+                               if (opt) hdata->idle_timeout = ucl_object_todouble(opt);
+                               opt = ucl_object_lookup_any(src,
+                                                                                       "max_reuse", "max-reuse", "keepalive_max_reuse", NULL);
+                               if (opt) hdata->max_reuse = (unsigned int) ucl_object_toint(opt);
+                       }
+               }
+
                hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool,
                                                                                                        sizeof(*hdata->cache));
 
index 65df8d7f5a413fa036d9cc687de1b3bfafdc01a4..fba882160193b5e5dfe4ab9d28b74bc4f299d79e 100644 (file)
@@ -118,6 +118,15 @@ struct http_map_data {
        gboolean request_sent;
        uint64_t gen;
        uint16_t port;
+       /* Optional per-map HTTP staged timeouts */
+       ev_tstamp connect_timeout;
+       ev_tstamp ssl_timeout;
+       ev_tstamp write_timeout;
+       ev_tstamp read_timeout;
+       /* Optional keepalive tuning */
+       double connection_ttl;
+       double idle_timeout;
+       unsigned int max_reuse;
 };
 
 struct static_map_data {
index bb848b1fe39d4672fb65ea15b6f6cc446497499b..8f375c4da4700ea11cd05c09b7fd40524b942f1a 100644 (file)
@@ -42,6 +42,11 @@ local settings_schema = lua_redis.enrich_schema({
   redis_min_expiry = ts.number + ts.string / lua_util.parse_time_interval,
   redis_prefix = ts.string,
   enabled = ts.boolean:is_optional(),
+  -- New optional staged timeouts for HTTP helper
+  helper_connect_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+  helper_ssl_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+  helper_write_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+  helper_read_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
 })
 
 local function check_dmarc_policy(task)
@@ -189,11 +194,11 @@ local function make_helper_request(task, domain, record, redis_server)
         end
 
         ret, _, upstream = lua_redis.redis_make_request(task,
-          redis_params,   -- connect params
-          redis_key,      -- hash key
-          true,           -- is write
-          redis_set_cb,   --callback
-          'PSETEX',       -- command
+          redis_params, -- connect params
+          redis_key,    -- hash key
+          true,         -- is write
+          redis_set_cb, --callback
+          'PSETEX',     -- command
           { redis_key, tostring(settings.redis_min_expiry * 1000.0),
             ucl.to_format(d, "json-compact") })
 
@@ -235,6 +240,11 @@ local function make_helper_request(task, domain, record, redis_server)
     url = helper_url,
     callback = http_helper_callback,
     keepalive = true,
+    -- staged timeouts if configured
+    connect_timeout = settings.helper_connect_timeout,
+    ssl_timeout = settings.helper_ssl_timeout,
+    write_timeout = settings.helper_write_timeout,
+    read_timeout = settings.helper_read_timeout,
   })
 end
 
@@ -282,11 +292,11 @@ local function check_bimi_vmc(task, domain, record)
 
   -- We first check Redis and then try to use helper
   ret, _, upstream = lua_redis.redis_make_request(task,
-    redis_params,      -- connect params
-    redis_key,         -- hash key
-    false,             -- is write
-    redis_cached_cb,   --callback
-    'GET',             -- command
+    redis_params,    -- connect params
+    redis_key,       -- hash key
+    false,           -- is write
+    redis_cached_cb, --callback
+    'GET',           -- command
     { redis_key })
 
   if not ret then
index e29c21645ac298c09690a6ab9e0e2fe1c427ef39..b613733890e486fb33432a2525309773228f90a1 100644 (file)
@@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-]]--
+]] --
 
 local E = {}
 local N = 'contextal'
@@ -58,6 +58,11 @@ local config_schema = lua_redis.enrich_schema {
   http_timeout = ts.number:is_optional(),
   request_ttl = ts.number:is_optional(),
   submission_symbol = ts.string:is_optional(),
+  -- staged timeouts
+  connect_timeout = ts.number:is_optional(),
+  ssl_timeout = ts.number:is_optional(),
+  write_timeout = ts.number:is_optional(),
+  read_timeout = ts.number:is_optional(),
 }
 
 local settings = {
@@ -104,11 +109,11 @@ local function process_actions(task, obj, is_cached)
 
   local cache_obj
   if (obj[1] or E).actions then
-    cache_obj = {[1] = {["actions"] = obj[1].actions}}
+    cache_obj = { [1] = { ["actions"] = obj[1].actions } }
   else
     local work_id = task:get_mempool():get_variable('contextal_work_id', 'string')
     if work_id then
-      cache_obj = {[1] = {["work_id"] = work_id}}
+      cache_obj = { [1] = { ["work_id"] = work_id } }
     else
       rspamd_logger.err(task, 'no work id found in mempool')
       return
@@ -116,9 +121,9 @@ local function process_actions(task, obj, is_cached)
   end
 
   redis_cache.cache_set(task,
-      task:get_digest(),
-      cache_obj,
-      cache_context)
+    task:get_digest(),
+    cache_obj,
+    cache_context)
 
   maybe_defer(task, obj)
 end
@@ -167,18 +172,22 @@ local function action_cb(task)
   end
 
   rspamd_http.request({
-      task = task,
-      url = settings.actions_url .. work_id,
-      callback = http_callback,
-      timeout = settings.http_timeout,
-      gzip = settings.gzip,
-      keepalive = settings.keepalive,
-      no_ssl_verify = settings.no_ssl_verify,
+    task = task,
+    url = settings.actions_url .. work_id,
+    callback = http_callback,
+    timeout = settings.http_timeout,
+    gzip = settings.gzip,
+    keepalive = settings.keepalive,
+    no_ssl_verify = settings.no_ssl_verify,
+    -- staged timeouts
+    connect_timeout = settings.connect_timeout,
+    ssl_timeout = settings.ssl_timeout,
+    write_timeout = settings.write_timeout,
+    read_timeout = settings.read_timeout,
   })
 end
 
 local function submit(task)
-
   local function http_callback(err, code, body, hdrs)
     if err then
       rspamd_logger.err(task, 'http error: %s', err)
@@ -203,33 +212,38 @@ local function submit(task)
       task:get_mempool():set_variable('contextal_work_id', work_id)
     end
     task:insert_result(settings.submission_symbol, 1.0,
-        string.format('work_id=%s', work_id or 'nil'))
+      string.format('work_id=%s', work_id or 'nil'))
     if wait_request_ttl then
       task:add_timer(settings.request_ttl, action_cb)
     end
   end
 
   local req = {
-    object_data = {['data'] = task:get_content()},
+    object_data = { ['data'] = task:get_content() },
   }
   if settings.request_ttl then
-    req.ttl = {['data'] = tostring(settings.request_ttl)}
+    req.ttl = { ['data'] = tostring(settings.request_ttl) }
   end
   if settings.max_recursion then
-    req.maxrec = {['data'] = tostring(settings.max_recursion)}
+    req.maxrec = { ['data'] = tostring(settings.max_recursion) }
   end
   rspamd_http.request({
-      task = task,
-      url = settings.submit_url,
-      body = lua_util.table_to_multipart_body(req, static_boundary),
-      callback = http_callback,
-      headers = {
-        ['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
-      },
-      timeout = settings.http_timeout,
-      gzip = settings.gzip,
-      keepalive = settings.keepalive,
-      no_ssl_verify = settings.no_ssl_verify,
+    task = task,
+    url = settings.submit_url,
+    body = lua_util.table_to_multipart_body(req, static_boundary),
+    callback = http_callback,
+    headers = {
+      ['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
+    },
+    timeout = settings.http_timeout,
+    gzip = settings.gzip,
+    keepalive = settings.keepalive,
+    no_ssl_verify = settings.no_ssl_verify,
+    -- staged timeouts
+    connect_timeout = settings.connect_timeout,
+    ssl_timeout = settings.ssl_timeout,
+    write_timeout = settings.write_timeout,
+    read_timeout = settings.read_timeout,
   })
 end
 
@@ -244,11 +258,11 @@ end
 local function submit_cb(task)
   if cache_context then
     redis_cache.cache_get(task,
-        task:get_digest(),
-        cache_context,
-        settings.cache_timeout,
-        submit,
-        cache_hit
+      task:get_digest(),
+      cache_context,
+      settings.cache_timeout,
+      submit,
+      cache_hit
     )
   else
     submit(task)
@@ -293,10 +307,10 @@ end
 redis_params = lua_redis.parse_redis_server(N)
 if redis_params then
   cache_context = redis_cache.create_cache_context(redis_params, {
-      cache_prefix = settings.cache_prefix,
-      cache_ttl = settings.cache_ttl,
-      cache_format = 'json',
-      cache_use_hashing = false
+    cache_prefix = settings.cache_prefix,
+    cache_ttl = settings.cache_ttl,
+    cache_format = 'json',
+    cache_use_hashing = false
   })
 end
 
index b9d8f0d53aabe49f4258ffeb75e05d45d73e0d2b..3418478674b4770f06e68e5bd0a988978f78668b 100644 (file)
@@ -131,6 +131,11 @@ local settings = {
     }
   },
   timeout = 10,
+  -- Optional staged timeouts
+  connect_timeout = nil,
+  ssl_timeout = nil,
+  write_timeout = nil,
+  read_timeout = nil,
   prompt = nil,
   condition = nil,
   autolearn = false,
@@ -744,6 +749,11 @@ local function openai_check(task, content, sel_part)
       task = task,
       upstream = upstream,
       use_gzip = true,
+      -- staged timeouts
+      connect_timeout = settings.connect_timeout,
+      ssl_timeout = settings.ssl_timeout,
+      write_timeout = settings.write_timeout,
+      read_timeout = settings.read_timeout,
     }
 
     if not rspamd_http.request(http_params) then
@@ -846,6 +856,11 @@ local function ollama_check(task, content, sel_part)
       task = task,
       upstream = upstream,
       use_gzip = true,
+      -- staged timeouts
+      connect_timeout = settings.connect_timeout,
+      ssl_timeout = settings.ssl_timeout,
+      write_timeout = settings.write_timeout,
+      read_timeout = settings.read_timeout,
     }
 
     rspamd_http.request(http_params)
index 70f9540d1d5a4ac2fecd633288c9df4f21d3f789..5599cbf4b214a692bfb06cade10bc04e15f238fc 100644 (file)
@@ -13,7 +13,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-]]--
+]] --
 
 if confighelp then
   return
@@ -164,8 +164,8 @@ local function get_general_metadata(task, flatten, no_content)
   scan_real = math.floor(scan_real * 1000)
   if scan_real < 0 then
     rspamd_logger.messagex(task,
-        'clock skew detected for message: %s ms real sca time (reset to 0)',
-        scan_real)
+      'clock skew detected for message: %s ms real sca time (reset to 0)',
+      scan_real)
     scan_real = 0
   end
 
@@ -286,18 +286,18 @@ local pushers = {
     local function redis_pub_cb(err)
       if err then
         rspamd_logger.errx(task, 'got error %s when publishing on server %s',
-            err, upstream:get_addr())
+          err, upstream:get_addr())
         return maybe_defer(task, rule)
       end
       return true
     end
     ret, _, upstream = lua_redis.redis_make_request(task,
-        redis_params, -- connect params
-        nil, -- hash key
-        true, -- is write
-        redis_pub_cb, --callback
-        'PUBLISH', -- command
-        { rule.channel, formatted } -- arguments
+      redis_params,                 -- connect params
+      nil,                          -- hash key
+      true,                         -- is write
+      redis_pub_cb,                 --callback
+      'PUBLISH',                    -- command
+      { rule.channel, formatted }   -- arguments
     )
     if not ret then
       rspamd_logger.errx(task, 'error connecting to redis')
@@ -346,6 +346,11 @@ local pushers = {
       gzip = rule.gzip or settings.gzip,
       keepalive = rule.keepalive or settings.keepalive,
       no_ssl_verify = rule.no_ssl_verify or settings.no_ssl_verify,
+      -- staged timeouts
+      connect_timeout = rule.connect_timeout or settings.connect_timeout,
+      ssl_timeout = rule.ssl_timeout or settings.ssl_timeout,
+      write_timeout = rule.write_timeout or settings.write_timeout,
+      read_timeout = rule.read_timeout or settings.read_timeout,
     })
   end,
   send_mail = function(task, formatted, rule, extra)