]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] lua: tolerate nil upstream in transport, plugins, rspamadm
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 25 Apr 2026 18:45:11 +0000 (19:45 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 25 Apr 2026 18:45:11 +0000 (19:45 +0100)
Audit of every Lua caller of upstream_list:get_upstream_round_robin /
:get_upstream_master_slave / :get_upstream_by_hash that is not a
scanner. Each one now reacts to a nil result instead of dereferencing
it and crashing the call site:

* lua_redis.lua: all four selection sites already logged "cannot
  select server" but then continued into addr:get_addr() and crashed.
  They now `return false, nil, nil` after the log, so callers see a
  proper failure. The sentinel watcher tick logs and skips this round.
* lua_maps.lua: the external-map HTTP path logs and invokes the
  caller's callback with (false, "no upstream available", 502, ctx)
  so map consumers see a normal lookup failure.
* aws_s3.lua: lifts the upstream selection out of the http.request
  table so it can warn before letting the HTTP layer fall back to
  URL-based connect (the request still goes out).
* clickhouse.lua, elastic.lua, gpt.lua: each get_upstream_round_robin
  site now logs and returns from its enclosing function (send,
  retention, distro detect, geoip pipeline, index policy/template,
  GPT/Ollama model dispatch).
* rspamadm/clickhouse.lua and rspamadm/statistics_dump.lua: print to
  stderr and exit / abort the redistribute scan.

lualib/lua_maps.lua
lualib/lua_redis.lua
lualib/rspamadm/clickhouse.lua
lualib/rspamadm/statistics_dump.lua
src/plugins/lua/aws_s3.lua
src/plugins/lua/clickhouse.lua
src/plugins/lua/elastic.lua
src/plugins/lua/gpt.lua

index 80319df41b3e6615af3bf94b4ee5f7cce7d67d6f..b1f3fab0c005d325b36ca8fa5b7683032d5f1ac5 100644 (file)
@@ -167,6 +167,14 @@ local function query_external_map(map_config, upstreams, key, callback, task_or_
     log_obj = task_or_ctx.config
   end
 
+  if not upstream then
+    rspamd_logger.errx(log_obj,
+        'no upstream available for external map %s (all backends dead or pending DNS resolution)',
+        map_config.backend)
+    callback(false, 'no upstream available', 502, task_or_ctx)
+    return
+  end
+
   if type(key) == 'string' or type(key) == 'userdata' then
     if map_config.method == 'body' then
       http_body = key
index a844c19e10cb206ec24c6232bc2bf1f676a750f4..4e4d192bc417bd974b5e5c8ff6f1294d0f8e49bd 100644 (file)
@@ -270,6 +270,12 @@ local function redis_query_sentinel(ev_base, params, initialised)
   local sentinels = params.sentinels
   local addr = sentinels:get_upstream_round_robin()
 
+  if not addr then
+    logger.errx(rspamd_config,
+        'no sentinel upstream available (all dead or pending DNS resolution); skipping sentinel watch tick')
+    return
+  end
+
   local host = addr:get_addr()
   local masters = {}
   local process_masters -- Function that is called to process masters data
@@ -1200,7 +1206,9 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write,
   end
 
   if not addr then
-    logger.errx(task, 'cannot select server to make redis request')
+    logger.errx(task,
+        'cannot select redis server (all dead or pending DNS resolution)')
+    return false, nil, nil
   end
 
   if redis_params['expand_keys'] then
@@ -1312,7 +1320,9 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
   end
 
   if not addr then
-    logger.errx(cfg, 'cannot select server to make redis request')
+    logger.errx(cfg,
+        'cannot select redis server (all dead or pending DNS resolution)')
+    return false, nil, nil
   end
 
   local options = {
@@ -1816,7 +1826,9 @@ local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base)
   end
 
   if not addr then
-    logger.errx(cfg, 'cannot select server to make redis request')
+    logger.errx(cfg or rspamd_config,
+        'cannot select redis server (all dead or pending DNS resolution)')
+    return false, nil
   end
 
   local options = {
@@ -1955,7 +1967,9 @@ exports.request = function(redis_params, attrs, req)
   end
 
   if not addr then
-    logger.errx(log_obj, 'cannot select server to make redis request')
+    logger.errx(log_obj,
+        'cannot select redis server (all dead or pending DNS resolution)')
+    return false, nil, nil
   end
 
   opts.host = addr:get_addr()
@@ -2077,7 +2091,9 @@ exports.connect = function(redis_params, attrs)
   end
 
   if not addr then
-    logger.errx(log_obj, 'cannot select server to make redis connect')
+    logger.errx(log_obj,
+        'cannot select redis server (all dead or pending DNS resolution)')
+    return false, nil, nil
   end
 
   opts.host = addr:get_addr()
index b22d8007cec411c4064ebc80359c16298ed44cda..98919c1d6fa555909e8ba666250f1e30ccb813d8 100644 (file)
@@ -304,6 +304,10 @@ local function handle_neural_profile(args)
     table.insert(conditions, string.format("Date = '%s'", query_day))
     local query = string.format(query_fmt, table.concat(conditions, ' AND '), limit)
     local upstream = args.upstream:get_upstream_round_robin()
+    if not upstream then
+      io.stderr:write('No clickhouse upstream available (DNS pending or all dead)\n')
+      os.exit(1)
+    end
     local err = lua_clickhouse.select_sync(upstream, args, http_params, query, process_row)
     if err ~= nil then
       io.stderr:write(string.format('Error querying Clickhouse: %s\n', err))
@@ -447,6 +451,10 @@ local function handle_neural_train(args)
       table.insert(conditions, string.format("Date = '%s'", query_day))
       local query = string.format(query_fmt, args.column_name_vector, table.concat(conditions, ' AND '), limit)
       local upstream = args.upstream:get_upstream_round_robin()
+      if not upstream then
+        io.stderr:write('No clickhouse upstream available (DNS pending or all dead)\n')
+        os.exit(1)
+      end
       local err = lua_clickhouse.select_sync(upstream, args, http_params, query, process_row)
       if err ~= nil then
         io.stderr:write(string.format('Error querying Clickhouse: %s\n', err))
index 985da9a9c01568f4ce49ec4323a9d6581c57a34a..2d5123c5c76ebb17fc418b6052e25830738ba5d1 100644 (file)
@@ -1151,6 +1151,11 @@ local function migrate_handler(opts)
           for _, prefix in ipairs(prefixes) do
             stats.checked = stats.checked + 1
             local target_up = write_servers:get_upstream_by_hash(prefix)
+            if not target_up then
+              rspamd_logger.errx('no upstream available for prefix %s; aborting redistribute scan',
+                  prefix)
+              return false
+            end
             local target_name = target_up:get_name()
 
             if target_name == shard.name then
index 5a4290c06adbe39e8e5e3536aa4841d962505a56..31b16dd702715a2bc0d584c7cb7bd4733663eee8 100644 (file)
@@ -196,6 +196,12 @@ local function s3_aws_callback(task)
       secret_key = settings.s3_secret_key,
       method = 'PUT',
     }, content)
+    local s3_upstream = settings.upstreams:get_upstream_round_robin()
+    if not s3_upstream then
+      rspamd_logger.warnx(task,
+          'no S3 upstream available for %s; falling back to URL-only connect',
+          path)
+    end
     rspamd_http.request({
       url = uri .. path,
       task = task,
@@ -203,7 +209,7 @@ local function s3_aws_callback(task)
       body = content,
       callback = gen_s3_http_callback(path, 'structured message'),
       headers = hdrs,
-      upstream = settings.upstreams:get_upstream_round_robin(),
+      upstream = s3_upstream,
       timeout = settings.s3_timeout,
     })
 
@@ -219,10 +225,16 @@ local function s3_aws_callback(task)
         secret_key = settings.s3_secret_key,
         method = 'PUT',
       }, part_content)
+      local part_upstream = settings.upstreams:get_upstream_round_robin()
+      if not part_upstream then
+        rspamd_logger.warnx(task,
+            'no S3 upstream available for part %s; falling back to URL-only connect',
+            ref)
+      end
       rspamd_http.request({
         url = uri .. ref,
         task = task,
-        upstream = settings.upstreams:get_upstream_round_robin(),
+        upstream = part_upstream,
         method = 'PUT',
         body = part_content,
         callback = gen_s3_http_callback(ref, 'part content'),
index a97d68ff75a6cb7703721533f535ac734a1e2203..c5fac01519e1436642dd0fded79f3a55c933ec24 100644 (file)
@@ -477,6 +477,12 @@ end
 local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows, extra_rows)
   local log_object = task or rspamd_config
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(log_object,
+        "no clickhouse upstream available (DNS pending or all dead); skipping send (%s)",
+        why)
+    return
+  end
   local ip_addr = upstream:get_addr():to_string(true)
   rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s",
       #gen_rows + #cust_rows, ip_addr, why)
@@ -1173,6 +1179,12 @@ end
 local function do_remove_partition(ev_base, cfg, table_name, partition, method_override)
   lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition)
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        "no clickhouse upstream available; cannot remove partition %s.%s",
+        table_name, partition)
+    return
+  end
   local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'"
   local method = method_override or settings.retention.method
   local remove_method = (method == 'drop') and 'DROP' or 'DETACH'
@@ -1330,6 +1342,11 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   end
 
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        "no clickhouse upstream available; cannot run retention pass")
+    return false
+  end
   local partition_to_remove_sql = "SELECT partition, table " ..
       "FROM system.parts WHERE table IN ('${tables}') " ..
       "GROUP BY partition, table " ..
index d78f7ad7437b33092dbde5e6dc3a976f7cf99d0e..3683de68ba016927aed094e172e2f8542ed30207 100644 (file)
@@ -349,6 +349,11 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
     es_index = settings['index_template']['name'] .. '-' .. os.date(settings['index_template']['pattern'])
 
     upstream = settings.upstream:get_upstream_round_robin()
+    if not upstream then
+      rspamd_logger.errx(log_object,
+          'no elastic upstream available (DNS pending or all dead); will retry next tick')
+      return
+    end
     host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
     local ip_addr = upstream:get_addr():to_string(true)
     push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
@@ -717,6 +722,11 @@ end
 
 local function configure_geoip_pipeline(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        'no elastic upstream available; cannot configure geoip pipeline')
+    return
+  end
   local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
   local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/' .. settings['geoip']['pipeline_name']
@@ -912,6 +922,11 @@ end
 
 local function configure_index_policy(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        'no elastic upstream available; cannot configure index policy')
+    return
+  end
   local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
   local index_policy_path = nil
@@ -1185,6 +1200,11 @@ end
 
 local function configure_index_template(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        'no elastic upstream available; cannot configure index template')
+    return
+  end
   local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
   local template_url = connect_prefix .. ip_addr .. '/_index_template/' .. settings['index_template']['name']
@@ -1461,6 +1481,11 @@ local function configure_distro(cfg, ev_base)
   end
 
   local upstream = settings.upstream:get_upstream_round_robin()
+  if not upstream then
+    rspamd_logger.errx(rspamd_config,
+        'no elastic upstream available; will retry distro detection on next tick')
+    return
+  end
   local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
   local root_url = connect_prefix .. ip_addr .. '/'
index e6548f865680bf06cb0365bbd6ec778f7572b531..0bc6488ad0117e6f9d83ee91513d9d5fa384a608 100644 (file)
@@ -1032,6 +1032,13 @@ local function openai_check(task, content, sel_part, context_snippet)
     body.model = model
 
     upstream = settings.upstreams:get_upstream_round_robin()
+    if not upstream then
+      rspamd_logger.errx(task,
+          'no GPT upstream available (DNS pending or all dead); skipping model %s',
+          model)
+      results[idx].checked = true
+      results[idx].error = 'no upstream available'
+    else
     local http_params = {
       url = settings.url,
       method = 'post',
@@ -1057,6 +1064,7 @@ local function openai_check(task, content, sel_part, context_snippet)
     if not rspamd_http.request(http_params) then
       results[idx].checked = true
     end
+    end
   end
 end
 
@@ -1169,6 +1177,13 @@ local function ollama_check(task, content, sel_part, context_snippet)
     body.model = model
 
     upstream = settings.upstreams:get_upstream_round_robin()
+    if not upstream then
+      rspamd_logger.errx(task,
+          'no Ollama upstream available (DNS pending or all dead); skipping model %s',
+          model)
+      results[idx].checked = true
+      results[idx].error = 'no upstream available'
+    else
     local http_params = {
       url = settings.url,
       method = 'post',
@@ -1191,6 +1206,7 @@ local function ollama_check(task, content, sel_part, context_snippet)
     if not rspamd_http.request(http_params) then
       results[idx].checked = true
     end
+    end
   end
 end