WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-]]--
+]] --
--[[[
-- @module lua_clickhouse
-- Converts a row into TSV, taking extra care about arrays
local function row_to_tsv(row)
-
for i, elt in ipairs(row) do
local t = type(elt)
if t == 'table' then
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
ok_cb(params, rows)
else
lua_util.debugm(N, params.log_obj,
- "http_select_cb ok: %s, %s, %s, %s", err_message, code,
- data:gsub('[\n%s]+', ' '), _)
+ "http_select_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
else
if fail_cb then
else
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, 'failed to parse reply')
+ "request failed on clickhouse server %s: %s",
+ ip_addr, 'failed to parse reply')
end
end
end
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
else
ok_cb(params, parsed)
end
-
else
lua_util.debugm(N, params.log_obj,
- "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
- data:gsub('[\n%s]+', ' '), _)
+ "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
end
end
http_params.body = query
http_params.log_obj = params.task or params.config
http_params.opaque_body = true
+ -- staged timeouts
+ http_params.connect_timeout = settings.connect_timeout
+ http_params.ssl_timeout = settings.ssl_timeout
+ http_params.write_timeout = settings.write_timeout
+ http_params.read_timeout = settings.read_timeout
lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
- connect_prefix, ip_addr, escape_spaces(database))
+ connect_prefix, ip_addr, escape_spaces(database))
end
return rspamd_http.request(http_params)
http_params.body = query
http_params.log_obj = params.task or params.config
http_params.opaque_body = true
+ -- staged timeouts
+ http_params.connect_timeout = settings.connect_timeout
+ http_params.ssl_timeout = settings.ssl_timeout
+ http_params.write_timeout = settings.write_timeout
+ http_params.read_timeout = settings.read_timeout
lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
- connect_prefix, ip_addr, escape_spaces(database))
+ connect_prefix, ip_addr, escape_spaces(database))
end
local err, response = rspamd_http.request(http_params)
http_params.method = 'POST'
http_params.body = { rspamd_text.fromtable(rows, '\n'), '\n' }
http_params.log_obj = params.task or params.config
+ -- staged timeouts
+ http_params.connect_timeout = settings.connect_timeout
+ http_params.ssl_timeout = settings.ssl_timeout
+ http_params.write_timeout = settings.write_timeout
+ http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
- connect_prefix,
- ip_addr,
- escape_spaces(database),
- escape_spaces(query))
+ connect_prefix,
+ ip_addr,
+ escape_spaces(database),
+ escape_spaces(query))
end
return rspamd_http.request(http_params)
http_params.password = settings.password
http_params.log_obj = params.task or params.config
http_params.body = query
+ -- staged timeouts
+ http_params.connect_timeout = settings.connect_timeout
+ http_params.ssl_timeout = settings.ssl_timeout
+ http_params.write_timeout = settings.write_timeout
+ http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
- connect_prefix, ip_addr, escape_spaces(database))
+ connect_prefix, ip_addr, escape_spaces(database))
end
return rspamd_http.request(http_params)
http_params.password = settings.password
http_params.log_obj = params.task or params.config
http_params.body = query
+ -- staged timeouts
+ http_params.connect_timeout = settings.connect_timeout
+ http_params.ssl_timeout = settings.ssl_timeout
+ http_params.write_timeout = settings.write_timeout
+ http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
- connect_prefix, ip_addr, escape_spaces(database))
+ connect_prefix, ip_addr, escape_spaces(database))
end
local err, response = rspamd_http.request(http_params)
cache_prefix = pcfg.cache_prefix or 'neural_llm',
cache_hash_len = pcfg.cache_hash_len or 32,
cache_use_hashing = (pcfg.cache_use_hashing ~= false),
+ -- Optional staged timeouts (inherit from global gpt if present)
+ connect_timeout = pcfg.connect_timeout or gpt_settings.connect_timeout,
+ ssl_timeout = pcfg.ssl_timeout or gpt_settings.ssl_timeout,
+ write_timeout = pcfg.write_timeout or gpt_settings.write_timeout,
+ read_timeout = pcfg.read_timeout or gpt_settings.read_timeout,
}
end
use_gzip = true,
keepalive = true,
callback = http_cb,
+ -- staged timeouts
+ connect_timeout = llm.connect_timeout,
+ ssl_timeout = llm.ssl_timeout,
+ write_timeout = llm.write_timeout,
+ read_timeout = llm.read_timeout,
}
rspamd_http.request(http_params)
cbd->addr);
if (cbd->conn != NULL) {
+ /* Apply optional staged timeouts and keepalive tuning */
+ if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
+ cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
+ rspamd_http_connection_set_timeouts(cbd->conn,
+ cbd->data->connect_timeout,
+ cbd->data->ssl_timeout,
+ cbd->data->write_timeout,
+ cbd->data->read_timeout);
+ }
+ if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
+ rspamd_http_connection_set_keepalive_tuning(cbd->conn,
+ cbd->data->connection_ttl,
+ cbd->data->idle_timeout,
+ cbd->data->max_reuse);
+ }
write_http_request(cbd);
}
else {
addr);
if (cbd->conn != NULL) {
- cbd->stage = http_map_http_conn;
+ /* Apply optional staged timeouts and keepalive tuning */
+ if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
+ cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
+ rspamd_http_connection_set_timeouts(cbd->conn,
+ cbd->data->connect_timeout,
+ cbd->data->ssl_timeout,
+ cbd->data->write_timeout,
+ cbd->data->read_timeout);
+ }
+ if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
+ rspamd_http_connection_set_keepalive_tuning(cbd->conn,
+ cbd->data->connection_ttl,
+ cbd->data->idle_timeout,
+ cbd->data->max_reuse);
+ }
write_http_request(cbd);
cbd->addr = addr;
MAP_RELEASE(cbd, "http_callback_data");
}
}
+ /* Parse optional HTTP timeouts and keepalive tuning from global options -> maps.* block */
+ {
+ const ucl_object_t *maps_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "maps");
+ const ucl_object_t *opt = NULL;
+ if (maps_obj && ucl_object_type(maps_obj) == UCL_OBJECT) {
+ /* Per-URL overrides: allow stanza keyed by exact URL */
+ const ucl_object_t *url_obj = ucl_object_lookup(maps_obj, bk->uri);
+ const ucl_object_t *src = url_obj ? url_obj : maps_obj;
+ opt = ucl_object_lookup_any(src,
+ "connect_timeout", "connect-timeout", NULL);
+ if (opt) hdata->connect_timeout = ucl_object_todouble(opt);
+ opt = ucl_object_lookup_any(src,
+ "ssl_timeout", "ssl-timeout", NULL);
+ if (opt) hdata->ssl_timeout = ucl_object_todouble(opt);
+ opt = ucl_object_lookup_any(src,
+ "write_timeout", "write-timeout", NULL);
+ if (opt) hdata->write_timeout = ucl_object_todouble(opt);
+ opt = ucl_object_lookup_any(src,
+ "read_timeout", "read-timeout", NULL);
+ if (opt) hdata->read_timeout = ucl_object_todouble(opt);
+ /* Keepalive tuning */
+ opt = ucl_object_lookup_any(src,
+ "connection_ttl", "connection-ttl", "keepalive_ttl", NULL);
+ if (opt) hdata->connection_ttl = ucl_object_todouble(opt);
+ opt = ucl_object_lookup_any(src,
+ "idle_timeout", "idle-timeout", "keepalive_idle", NULL);
+ if (opt) hdata->idle_timeout = ucl_object_todouble(opt);
+ opt = ucl_object_lookup_any(src,
+ "max_reuse", "max-reuse", "keepalive_max_reuse", NULL);
+ if (opt) hdata->max_reuse = (unsigned int) ucl_object_toint(opt);
+ }
+ }
+
hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool,
sizeof(*hdata->cache));
gboolean request_sent;
uint64_t gen;
uint16_t port;
+ /* Optional per-map HTTP staged timeouts */
+ ev_tstamp connect_timeout;
+ ev_tstamp ssl_timeout;
+ ev_tstamp write_timeout;
+ ev_tstamp read_timeout;
+ /* Optional keepalive tuning */
+ double connection_ttl;
+ double idle_timeout;
+ unsigned int max_reuse;
};
struct static_map_data {
redis_min_expiry = ts.number + ts.string / lua_util.parse_time_interval,
redis_prefix = ts.string,
enabled = ts.boolean:is_optional(),
+ -- New optional staged timeouts for HTTP helper
+ helper_connect_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+ helper_ssl_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+ helper_write_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
+ helper_read_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
})
local function check_dmarc_policy(task)
end
ret, _, upstream = lua_redis.redis_make_request(task,
- redis_params, -- connect params
- redis_key, -- hash key
- true, -- is write
- redis_set_cb, --callback
- 'PSETEX', -- command
+ redis_params, -- connect params
+ redis_key, -- hash key
+ true, -- is write
+ redis_set_cb, --callback
+ 'PSETEX', -- command
{ redis_key, tostring(settings.redis_min_expiry * 1000.0),
ucl.to_format(d, "json-compact") })
url = helper_url,
callback = http_helper_callback,
keepalive = true,
+ -- staged timeouts if configured
+ connect_timeout = settings.helper_connect_timeout,
+ ssl_timeout = settings.helper_ssl_timeout,
+ write_timeout = settings.helper_write_timeout,
+ read_timeout = settings.helper_read_timeout,
})
end
-- We first check Redis and then try to use helper
ret, _, upstream = lua_redis.redis_make_request(task,
- redis_params, -- connect params
- redis_key, -- hash key
- false, -- is write
- redis_cached_cb, --callback
- 'GET', -- command
+ redis_params, -- connect params
+ redis_key, -- hash key
+ false, -- is write
+ redis_cached_cb, --callback
+ 'GET', -- command
{ redis_key })
if not ret then
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-]]--
+]] --
local E = {}
local N = 'contextal'
http_timeout = ts.number:is_optional(),
request_ttl = ts.number:is_optional(),
submission_symbol = ts.string:is_optional(),
+ -- staged timeouts
+ connect_timeout = ts.number:is_optional(),
+ ssl_timeout = ts.number:is_optional(),
+ write_timeout = ts.number:is_optional(),
+ read_timeout = ts.number:is_optional(),
}
local settings = {
local cache_obj
if (obj[1] or E).actions then
- cache_obj = {[1] = {["actions"] = obj[1].actions}}
+ cache_obj = { [1] = { ["actions"] = obj[1].actions } }
else
local work_id = task:get_mempool():get_variable('contextal_work_id', 'string')
if work_id then
- cache_obj = {[1] = {["work_id"] = work_id}}
+ cache_obj = { [1] = { ["work_id"] = work_id } }
else
rspamd_logger.err(task, 'no work id found in mempool')
return
end
redis_cache.cache_set(task,
- task:get_digest(),
- cache_obj,
- cache_context)
+ task:get_digest(),
+ cache_obj,
+ cache_context)
maybe_defer(task, obj)
end
end
rspamd_http.request({
- task = task,
- url = settings.actions_url .. work_id,
- callback = http_callback,
- timeout = settings.http_timeout,
- gzip = settings.gzip,
- keepalive = settings.keepalive,
- no_ssl_verify = settings.no_ssl_verify,
+ task = task,
+ url = settings.actions_url .. work_id,
+ callback = http_callback,
+ timeout = settings.http_timeout,
+ gzip = settings.gzip,
+ keepalive = settings.keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ -- staged timeouts
+ connect_timeout = settings.connect_timeout,
+ ssl_timeout = settings.ssl_timeout,
+ write_timeout = settings.write_timeout,
+ read_timeout = settings.read_timeout,
})
end
local function submit(task)
-
local function http_callback(err, code, body, hdrs)
if err then
rspamd_logger.err(task, 'http error: %s', err)
task:get_mempool():set_variable('contextal_work_id', work_id)
end
task:insert_result(settings.submission_symbol, 1.0,
- string.format('work_id=%s', work_id or 'nil'))
+ string.format('work_id=%s', work_id or 'nil'))
if wait_request_ttl then
task:add_timer(settings.request_ttl, action_cb)
end
end
local req = {
- object_data = {['data'] = task:get_content()},
+ object_data = { ['data'] = task:get_content() },
}
if settings.request_ttl then
- req.ttl = {['data'] = tostring(settings.request_ttl)}
+ req.ttl = { ['data'] = tostring(settings.request_ttl) }
end
if settings.max_recursion then
- req.maxrec = {['data'] = tostring(settings.max_recursion)}
+ req.maxrec = { ['data'] = tostring(settings.max_recursion) }
end
rspamd_http.request({
- task = task,
- url = settings.submit_url,
- body = lua_util.table_to_multipart_body(req, static_boundary),
- callback = http_callback,
- headers = {
- ['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
- },
- timeout = settings.http_timeout,
- gzip = settings.gzip,
- keepalive = settings.keepalive,
- no_ssl_verify = settings.no_ssl_verify,
+ task = task,
+ url = settings.submit_url,
+ body = lua_util.table_to_multipart_body(req, static_boundary),
+ callback = http_callback,
+ headers = {
+ ['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
+ },
+ timeout = settings.http_timeout,
+ gzip = settings.gzip,
+ keepalive = settings.keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ -- staged timeouts
+ connect_timeout = settings.connect_timeout,
+ ssl_timeout = settings.ssl_timeout,
+ write_timeout = settings.write_timeout,
+ read_timeout = settings.read_timeout,
})
end
local function submit_cb(task)
if cache_context then
redis_cache.cache_get(task,
- task:get_digest(),
- cache_context,
- settings.cache_timeout,
- submit,
- cache_hit
+ task:get_digest(),
+ cache_context,
+ settings.cache_timeout,
+ submit,
+ cache_hit
)
else
submit(task)
redis_params = lua_redis.parse_redis_server(N)
if redis_params then
cache_context = redis_cache.create_cache_context(redis_params, {
- cache_prefix = settings.cache_prefix,
- cache_ttl = settings.cache_ttl,
- cache_format = 'json',
- cache_use_hashing = false
+ cache_prefix = settings.cache_prefix,
+ cache_ttl = settings.cache_ttl,
+ cache_format = 'json',
+ cache_use_hashing = false
})
end
}
},
timeout = 10,
+ -- Optional staged timeouts
+ connect_timeout = nil,
+ ssl_timeout = nil,
+ write_timeout = nil,
+ read_timeout = nil,
prompt = nil,
condition = nil,
autolearn = false,
task = task,
upstream = upstream,
use_gzip = true,
+ -- staged timeouts
+ connect_timeout = settings.connect_timeout,
+ ssl_timeout = settings.ssl_timeout,
+ write_timeout = settings.write_timeout,
+ read_timeout = settings.read_timeout,
}
if not rspamd_http.request(http_params) then
task = task,
upstream = upstream,
use_gzip = true,
+ -- staged timeouts
+ connect_timeout = settings.connect_timeout,
+ ssl_timeout = settings.ssl_timeout,
+ write_timeout = settings.write_timeout,
+ read_timeout = settings.read_timeout,
}
rspamd_http.request(http_params)
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-]]--
+]] --
if confighelp then
return
scan_real = math.floor(scan_real * 1000)
if scan_real < 0 then
rspamd_logger.messagex(task,
- 'clock skew detected for message: %s ms real sca time (reset to 0)',
- scan_real)
+ 'clock skew detected for message: %s ms real sca time (reset to 0)',
+ scan_real)
scan_real = 0
end
local function redis_pub_cb(err)
if err then
rspamd_logger.errx(task, 'got error %s when publishing on server %s',
- err, upstream:get_addr())
+ err, upstream:get_addr())
return maybe_defer(task, rule)
end
return true
end
ret, _, upstream = lua_redis.redis_make_request(task,
- redis_params, -- connect params
- nil, -- hash key
- true, -- is write
- redis_pub_cb, --callback
- 'PUBLISH', -- command
- { rule.channel, formatted } -- arguments
+ redis_params, -- connect params
+ nil, -- hash key
+ true, -- is write
+ redis_pub_cb, --callback
+ 'PUBLISH', -- command
+ { rule.channel, formatted } -- arguments
)
if not ret then
rspamd_logger.errx(task, 'error connecting to redis')
gzip = rule.gzip or settings.gzip,
keepalive = rule.keepalive or settings.keepalive,
no_ssl_verify = rule.no_ssl_verify or settings.no_ssl_verify,
+ -- staged timeouts
+ connect_timeout = rule.connect_timeout or settings.connect_timeout,
+ ssl_timeout = rule.ssl_timeout or settings.ssl_timeout,
+ write_timeout = rule.write_timeout or settings.write_timeout,
+ read_timeout = rule.read_timeout or settings.read_timeout,
})
end,
send_mail = function(task, formatted, rule, extra)