log_obj = task_or_ctx.config
end
+ if not upstream then
+ rspamd_logger.errx(log_obj,
+ 'no upstream available for external map %s (all backends dead or pending DNS resolution)',
+ map_config.backend)
+ callback(false, 'no upstream available', 502, task_or_ctx)
+ return
+ end
+
if type(key) == 'string' or type(key) == 'userdata' then
if map_config.method == 'body' then
http_body = key
local sentinels = params.sentinels
local addr = sentinels:get_upstream_round_robin()
+ if not addr then
+ logger.errx(rspamd_config,
+ 'no sentinel upstream available (all dead or pending DNS resolution); skipping sentinel watch tick')
+ return
+ end
+
local host = addr:get_addr()
local masters = {}
local process_masters -- Function that is called to process masters data
end
if not addr then
- logger.errx(task, 'cannot select server to make redis request')
+ logger.errx(task,
+ 'cannot select redis server (all dead or pending DNS resolution)')
+ return false, nil, nil
end
if redis_params['expand_keys'] then
end
if not addr then
- logger.errx(cfg, 'cannot select server to make redis request')
+ logger.errx(cfg,
+ 'cannot select redis server (all dead or pending DNS resolution)')
+ return false, nil, nil
end
local options = {
end
if not addr then
- logger.errx(cfg, 'cannot select server to make redis request')
+ logger.errx(cfg or rspamd_config,
+ 'cannot select redis server (all dead or pending DNS resolution)')
+ return false, nil
end
local options = {
end
if not addr then
- logger.errx(log_obj, 'cannot select server to make redis request')
+ logger.errx(log_obj,
+ 'cannot select redis server (all dead or pending DNS resolution)')
+ return false, nil, nil
end
opts.host = addr:get_addr()
end
if not addr then
- logger.errx(log_obj, 'cannot select server to make redis connect')
+ logger.errx(log_obj,
+ 'cannot select redis server (all dead or pending DNS resolution)')
+ return false, nil, nil
end
opts.host = addr:get_addr()
table.insert(conditions, string.format("Date = '%s'", query_day))
local query = string.format(query_fmt, table.concat(conditions, ' AND '), limit)
local upstream = args.upstream:get_upstream_round_robin()
+ if not upstream then
+ io.stderr:write('No clickhouse upstream available (DNS pending or all dead)\n')
+ os.exit(1)
+ end
local err = lua_clickhouse.select_sync(upstream, args, http_params, query, process_row)
if err ~= nil then
io.stderr:write(string.format('Error querying Clickhouse: %s\n', err))
table.insert(conditions, string.format("Date = '%s'", query_day))
local query = string.format(query_fmt, args.column_name_vector, table.concat(conditions, ' AND '), limit)
local upstream = args.upstream:get_upstream_round_robin()
+ if not upstream then
+ io.stderr:write('No clickhouse upstream available (DNS pending or all dead)\n')
+ os.exit(1)
+ end
local err = lua_clickhouse.select_sync(upstream, args, http_params, query, process_row)
if err ~= nil then
io.stderr:write(string.format('Error querying Clickhouse: %s\n', err))
for _, prefix in ipairs(prefixes) do
stats.checked = stats.checked + 1
local target_up = write_servers:get_upstream_by_hash(prefix)
+ if not target_up then
+ rspamd_logger.errx('no upstream available for prefix %s; aborting redistribute scan',
+ prefix)
+ return false
+ end
local target_name = target_up:get_name()
if target_name == shard.name then
secret_key = settings.s3_secret_key,
method = 'PUT',
}, content)
+ local s3_upstream = settings.upstreams:get_upstream_round_robin()
+ if not s3_upstream then
+ rspamd_logger.warnx(task,
+ 'no S3 upstream available for %s; falling back to URL-only connect',
+ path)
+ end
rspamd_http.request({
url = uri .. path,
task = task,
body = content,
callback = gen_s3_http_callback(path, 'structured message'),
headers = hdrs,
- upstream = settings.upstreams:get_upstream_round_robin(),
+ upstream = s3_upstream,
timeout = settings.s3_timeout,
})
secret_key = settings.s3_secret_key,
method = 'PUT',
}, part_content)
+ local part_upstream = settings.upstreams:get_upstream_round_robin()
+ if not part_upstream then
+ rspamd_logger.warnx(task,
+ 'no S3 upstream available for part %s; falling back to URL-only connect',
+ ref)
+ end
rspamd_http.request({
url = uri .. ref,
task = task,
- upstream = settings.upstreams:get_upstream_round_robin(),
+ upstream = part_upstream,
method = 'PUT',
body = part_content,
callback = gen_s3_http_callback(ref, 'part content'),
local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows, extra_rows)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(log_object,
+ "no clickhouse upstream available (DNS pending or all dead); skipping send (%s)",
+ why)
+ return
+ end
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s",
#gen_rows + #cust_rows, ip_addr, why)
local function do_remove_partition(ev_base, cfg, table_name, partition, method_override)
lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition)
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ "no clickhouse upstream available; cannot remove partition %s.%s",
+ table_name, partition)
+ return
+ end
local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'"
local method = method_override or settings.retention.method
local remove_method = (method == 'drop') and 'DROP' or 'DETACH'
end
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ "no clickhouse upstream available; cannot run retention pass")
+ return false
+ end
local partition_to_remove_sql = "SELECT partition, table " ..
"FROM system.parts WHERE table IN ('${tables}') " ..
"GROUP BY partition, table " ..
es_index = settings['index_template']['name'] .. '-' .. os.date(settings['index_template']['pattern'])
upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(log_object,
+ 'no elastic upstream available (DNS pending or all dead); will retry next tick')
+ return
+ end
host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
local function configure_geoip_pipeline(cfg, ev_base)
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ 'no elastic upstream available; cannot configure geoip pipeline')
+ return
+ end
local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/' .. settings['geoip']['pipeline_name']
local function configure_index_policy(cfg, ev_base)
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ 'no elastic upstream available; cannot configure index policy')
+ return
+ end
local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
local index_policy_path = nil
local function configure_index_template(cfg, ev_base)
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ 'no elastic upstream available; cannot configure index template')
+ return
+ end
local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
local template_url = connect_prefix .. ip_addr .. '/_index_template/' .. settings['index_template']['name']
end
local upstream = settings.upstream:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(rspamd_config,
+ 'no elastic upstream available; will retry distro detection on next tick')
+ return
+ end
local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
local root_url = connect_prefix .. ip_addr .. '/'
body.model = model
upstream = settings.upstreams:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(task,
+ 'no GPT upstream available (DNS pending or all dead); skipping model %s',
+ model)
+ results[idx].checked = true
+ results[idx].error = 'no upstream available'
+ else
local http_params = {
url = settings.url,
method = 'post',
if not rspamd_http.request(http_params) then
results[idx].checked = true
end
+ end
end
end
body.model = model
upstream = settings.upstreams:get_upstream_round_robin()
+ if not upstream then
+ rspamd_logger.errx(task,
+ 'no Ollama upstream available (DNS pending or all dead); skipping model %s',
+ model)
+ results[idx].checked = true
+ results[idx].error = 'no upstream available'
+ else
local http_params = {
url = settings.url,
method = 'post',
if not rspamd_http.request(http_params) then
results[idx].checked = true
end
+ end
end
end