for LLM-based classification.
Main function:
- - fetch_and_format(task, opts, callback, debug_module): Fetch search context and format for LLM
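+ - fetch_and_format(task, redis_params, opts, callback, debug_module): Fetch search context for the task's domains and format it for the LLM; redis_params may be nil (results are then queried without caching), debug_module is optional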
Options (all optional with safe defaults):
enabled: boolean (default: false)
timeout: number (default: 5) - HTTP request timeout in seconds
cache_ttl: number (default: 3600) - cache TTL in seconds
cache_key_prefix: string (default: "gpt_search")
- retry_count: number (default: 3) - number of retry attempts
- retry_delay: number (default: 1) - initial retry delay in seconds
as_system: boolean (default: true) - inject as system message vs user message
enable_expression: table - optional gating expression
disable_expression: table - optional negative gating expression
]]
+local N = 'llm_search_context'
+
local M = {}
local rspamd_http = require "rspamd_http"
timeout = 5,
cache_ttl = 3600, -- 1 hour
cache_key_prefix = "gpt_search",
- retry_count = 3,
- retry_delay = 1, -- seconds
as_system = true,
enable_expression = nil,
disable_expression = nil,
local domains = {}
local seen = {}
- -- Get URLs from the task
- local urls = task:get_urls() or {}
+ -- Get URLs from the task using extract_specific_urls
+ local urls = lua_util.extract_specific_urls({
+ task = task,
+ limit = max_domains * 3, -- Get more to filter
+ esld_limit = max_domains,
+ }) or {}
for _, url in ipairs(urls) do
if #domains >= max_domains then
end
-- Query search API for a single domain
-local function query_search_api(domain, opts, callback, debug_module)
- local debug_m = debug_module or 'llm_search_context'
+local function query_search_api(task, domain, opts, callback)
local url = opts.search_url or DEFAULTS.search_url
local timeout = opts.timeout or DEFAULTS.timeout
local max_results = opts.max_results_per_query or DEFAULTS.max_results_per_query
local full_url = url .. "?" .. query_string
- rspamd_logger.debugm(debug_m, nil, "querying search API: %s", full_url)
+ lua_util.debugm(N, task, "querying search API: %s", full_url)
local function http_callback(err, code, body, _)
if err then
- rspamd_logger.errx(debug_m, "search API error for %s: %s", domain, err)
+ lua_util.debugm(N, task, "search API error for %s: %s", domain, err)
callback(nil, domain, err)
return
end
if code ~= 200 then
- rspamd_logger.warnx(debug_m, "search API returned code %s for %s", code, domain)
+ lua_util.debugm(N, task, "search API returned code %s for %s", code, domain)
callback(nil, domain, string.format("HTTP %s", code))
return
end
local parser = ucl.parser()
local ok, parse_err = parser:parse_string(body)
if not ok then
- rspamd_logger.errx(debug_m, "failed to parse search API response for %s: %s", domain, parse_err)
+ rspamd_logger.errx(task, "%s: failed to parse search API response for %s: %s",
+ N, domain, parse_err)
callback(nil, domain, parse_err)
return
end
url = full_url,
timeout = timeout,
callback = http_callback,
+ task = task,
+ log_obj = task,
})
end
--- Query with retry logic
-local function query_with_retry(domain, opts, callback, debug_module, attempt)
- local debug_m = debug_module or 'llm_search_context'
- attempt = attempt or 1
- local max_attempts = opts.retry_count or DEFAULTS.retry_count
-
- if attempt > max_attempts then
- rspamd_logger.warnx(debug_m, "max retries exceeded for domain %s", domain)
- callback(nil, domain, "max retries exceeded")
- return
- end
-
- query_search_api(domain, opts, function(results, dom, err)
- if err and attempt < max_attempts then
- -- Calculate exponential backoff delay
- local delay = (opts.retry_delay or DEFAULTS.retry_delay) * (2 ^ (attempt - 1))
- rspamd_logger.debugm(debug_m, nil, "retrying search for %s after %ss (attempt %s/%s)",
- domain, delay, attempt + 1, max_attempts)
-
- -- Schedule retry
- rspamd_config:add_delayed_callback(delay, function()
- query_with_retry(domain, opts, callback, debug_module, attempt + 1)
- end)
- else
- callback(results, dom, err)
- end
- end, debug_module)
-end
-
-- Format search results as context
local function format_search_results(all_results, opts)
if not all_results or #all_results == 0 then
end
-- Check Redis cache for domain search results
-local function check_cache(redis_params, domain, opts, callback, debug_module)
- local debug_m = debug_module or 'llm_search_context'
+local function check_cache(task, redis_params, domain, opts, callback)
local cache_key = get_cache_key(domain, opts)
local function redis_callback(err, data)
if err then
- rspamd_logger.debugm(debug_m, nil, "Redis error for cache key %s: %s", cache_key, err)
+ lua_util.debugm(N, task, "Redis error for cache key %s: %s", cache_key, err)
callback(nil, domain)
return
end
local parser = ucl.parser()
local ok, parse_err = parser:parse_string(data)
if ok then
- rspamd_logger.debugm(debug_m, nil, "cache hit for domain %s", domain)
+ lua_util.debugm(N, task, "cache hit for domain %s", domain)
callback(parser:get_object(), domain)
else
- rspamd_logger.warnx(debug_m, "failed to parse cached data for %s: %s", domain, parse_err)
+ rspamd_logger.warnx(task, "%s: failed to parse cached data for %s: %s",
+ N, domain, parse_err)
callback(nil, domain)
end
else
- rspamd_logger.debugm(debug_m, nil, "cache miss for domain %s", domain)
+ lua_util.debugm(N, task, "cache miss for domain %s", domain)
callback(nil, domain)
end
end
- lua_redis.redis_make_request(nil, redis_params, cache_key, false,
+ lua_redis.redis_make_request(task, redis_params, cache_key, false,
redis_callback, 'GET', { cache_key })
end
-- Store search results in Redis cache
-local function store_cache(redis_params, domain, results, opts, debug_module)
- local debug_m = debug_module or 'llm_search_context'
+local function store_cache(task, redis_params, domain, results, opts)
local cache_key = get_cache_key(domain, opts)
local ttl = opts.cache_ttl or DEFAULTS.cache_ttl
local function redis_callback(err, _)
if err then
- rspamd_logger.warnx(debug_m, "failed to cache results for %s: %s", domain, err)
+ rspamd_logger.warnx(task, "%s: failed to cache results for %s: %s",
+ N, domain, err)
else
- rspamd_logger.debugm(debug_m, nil, "cached results for domain %s (TTL: %ss)", domain, ttl)
+ lua_util.debugm(N, task, "cached results for domain %s (TTL: %ss)", domain, ttl)
end
end
- lua_redis.redis_make_request(nil, redis_params, cache_key, true,
+ lua_redis.redis_make_request(task, redis_params, cache_key, true,
redis_callback, 'SETEX', { cache_key, tostring(ttl), data })
end
-- Main function to fetch and format search context
function M.fetch_and_format(task, redis_params, opts, callback, debug_module)
- local debug_m = debug_module or 'llm_search_context'
+ local Np = debug_module or N
-- Apply defaults
opts = lua_util.override_defaults(DEFAULTS, opts or {})
if not opts.enabled then
- rspamd_logger.debugm(debug_m, task, "search context disabled")
+ lua_util.debugm(Np, task, "search context disabled")
callback(task, false, nil)
return
end
local domains = extract_domains(task, opts.max_domains)
if #domains == 0 then
- rspamd_logger.debugm(debug_m, task, "no domains to search")
+ lua_util.debugm(Np, task, "no domains to search")
callback(task, false, nil)
return
end
- rspamd_logger.debugm(debug_m, task, "extracted %s domain(s) for search: %s",
+ lua_util.debugm(Np, task, "extracted %s domain(s) for search: %s",
#domains, table.concat(domains, ", "))
local pending_queries = #domains
results = results
})
elseif err then
- rspamd_logger.debugm(debug_m, task, "search failed for domain %s: %s", domain, err)
+ lua_util.debugm(Np, task, "search failed for domain %s: %s", domain, err)
end
if pending_queries == 0 then
-- All queries complete
if #all_results == 0 then
- rspamd_logger.debugm(debug_m, task, "no search results obtained")
+ lua_util.debugm(Np, task, "no search results obtained")
callback(task, false, nil)
else
local context_snippet = format_search_results(all_results, opts)
- rspamd_logger.debugm(debug_m, task, "search context formatted (%s bytes)",
+ lua_util.debugm(Np, task, "search context formatted (%s bytes)",
context_snippet and #context_snippet or 0)
callback(task, true, context_snippet)
end
for _, domain in ipairs(domains) do
if redis_params then
-- Check cache first
- check_cache(redis_params, domain, opts, function(cached_results, dom)
+ check_cache(task, redis_params, domain, opts, function(cached_results, dom)
if cached_results then
-- Use cached results
domain_callback(cached_results, dom, nil)
else
- -- Query API and cache results
- query_with_retry(dom, opts, function(api_results, d, api_err)
+ -- Query API and cache results (no retry, fail gracefully)
+ query_search_api(task, dom, opts, function(api_results, d, api_err)
if api_results and redis_params then
- store_cache(redis_params, d, api_results, opts, debug_module)
+ store_cache(task, redis_params, d, api_results, opts)
end
domain_callback(api_results, d, api_err)
- end, debug_module)
+ end)
end
- end, debug_module)
+ end)
else
- -- No Redis, query directly
- query_with_retry(domain, opts, domain_callback, debug_module)
+ -- No Redis, query directly (no retry, fail gracefully)
+ query_search_api(task, domain, opts, domain_callback)
end
end
end
-return M
\ No newline at end of file
+return M