]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
fix: use sub_utf8 to strip headers value to not break utf8 strings
authorDmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Tue, 21 Jan 2025 23:00:28 +0000 (00:00 +0100)
committerGitHub <noreply@github.com>
Tue, 21 Jan 2025 23:00:28 +0000 (00:00 +0100)
src/plugins/lua/elastic.lua

index 8bed9fcf48d297fcf75dadf93f21acf922ddd127..cc36893c0d7e19f709554301f1a7681523686a68 100644 (file)
@@ -19,6 +19,7 @@ local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local lua_util = require "lua_util"
 local rspamd_util = require "rspamd_util"
+local rspamd_text = require "rspamd_text"
 local ucl = require "ucl"
 local upstream_list = require "rspamd_upstream_list"
 
@@ -287,7 +288,7 @@ end
 local function handle_error(action, component, limit)
   if states[component]['errors'] >= limit then
     rspamd_logger.errx(rspamd_config, 'cannot %s elastic %s, failed attempts: %s/%s, stop trying',
-        action, component:gsub('_', ' '), states[component]['errors'], limit)
+      action, component:gsub('_', ' '), states[component]['errors'], limit)
     states[component]['configured'] = true
   else
     states[component]['errors'] = states[component]['errors'] + 1
@@ -315,54 +316,6 @@ local function get_received_delay(received_headers)
   return delay
 end
 
-local function is_empty(str)
-  -- define a pattern that includes invisible unicode characters
-  local str_cleared = str:gsub('[' ..
-      '\xC2\xA0' .. -- U+00A0 non-breaking space
-      '\xE2\x80\x8B' .. -- U+200B zero width space
-      '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space)
-      '\xE2\x80\x8C' .. -- U+200C zero width non-joiner
-      '\xE2\x80\x8D' .. -- U+200D zero width joiner
-      '\xE2\x80\x8E' .. -- U+200E left-to-right mark
-      '\xE2\x80\x8F' .. -- U+200F right-to-left mark
-      '\xE2\x81\xA0' .. -- U+2060 word joiner
-      '\xE2\x80\xAA' .. -- U+202A left-to-right embedding
-      '\xE2\x80\xAB' .. -- U+202B right-to-left embedding
-      '\xE2\x80\xAC' .. -- U+202C pop directional formatting
-      '\xE2\x80\xAD' .. -- U+202D left-to-right override
-      '\xE2\x80\xAE' .. -- U+202E right-to-left override
-      '\xE2\x81\x9F' .. -- U+2061 function application
-      '\xE2\x81\xA1' .. -- U+2061 invisible separator
-      '\xE2\x81\xA2' .. -- U+2062 invisible times
-      '\xE2\x81\xA3' .. -- U+2063 invisible separator
-      '\xE2\x81\xA4' .. -- U+2064 invisible plus
-      ']', '') -- gsub replaces all matched characters with an empty string
-  if str_cleared:match('[%S]') then
-    return false
-  else
-    return true
-  end
-end
-
-local function fill_empty_strings(tbl, empty_value)
-  local filled_tbl = {}
-  for key, value in pairs(tbl) do
-    if value and type(value) == 'table' then
-      local nested_filtered = fill_empty_strings(value, empty_value)
-      if next(nested_filtered) ~= nil then
-        filled_tbl[key] = nested_filtered
-      end
-    elseif type(value) == 'boolean' then
-      filled_tbl[key] = value
-    elseif value and type(value) == 'string' and is_empty(value) then
-      filled_tbl[key] = empty_value
-    elseif value then
-      filled_tbl[key] = value
-    end
-  end
-  return filled_tbl
-end
-
 local function create_bulk_json(es_index, logs_to_send)
   local tbl = {}
   for _, row in pairs(logs_to_send) do
@@ -407,15 +360,17 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
   local function http_callback(err, code, body, _)
     local push_done = false
     if err then
-      rspamd_logger.errx(log_object, 'cannot send logs to elastic (%s): %s; failed attempts: %s/%s',
-          push_url, err, buffer['errors'], settings['limits']['max_fail'])
+      rspamd_logger.errx(log_object,
+        'cannot send logs to elastic (%s): %s; failed attempts: %s/%s',
+        push_url, err, buffer['errors'], settings['limits']['max_fail'])
     elseif code == 200 then
       local parser = ucl.parser()
       local res, ucl_err = parser:parse_string(body)
       if not ucl_err and res then
         local obj = parser:get_object()
         push_done = true
-        lua_util.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send)
+        lua_util.debugm(N, log_object,
+          'successfully sent payload with %s logs', nlogs_to_send)
         if obj['errors'] then
           for _, value in pairs(obj['items']) do
             if value['index'] and value['index']['status'] >= 400 then
@@ -424,15 +379,15 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
               local error_type = safe_get(value, 'index', 'error', 'type') or ''
               local error_reason = safe_get(value, 'index', 'error', 'reason') or ''
               rspamd_logger.warnx(log_object,
-                  'error while pushing logs to elastic, status: %s, index: %s, type: %s, reason: %s',
-                  status, index, error_type, error_reason)
+                'error while pushing logs to elastic, status: %s, index: %s, type: %s, reason: %s',
+                status, index, error_type, error_reason)
             end
           end
         end
       else
         rspamd_logger.errx(log_object,
-            'cannot parse response from elastic (%s): %s; failed attempts: %s/%s',
-            push_url, ucl_err, buffer['errors'], settings['limits']['max_fail'])
+          'cannot parse response from elastic (%s): %s; failed attempts: %s/%s',
+          push_url, ucl_err, buffer['errors'], settings['limits']['max_fail'])
       end
     else
       rspamd_logger.errx(log_object,
@@ -448,8 +403,8 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
       upstream:fail()
       if buffer['errors'] >= settings['limits']['max_fail'] then
         rspamd_logger.errx(log_object,
-            'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
-            nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
+          'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
+          nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
         buffer['logs']:pop_first(nlogs_to_send)
         buffer['errors'] = 0
       else
@@ -565,8 +520,8 @@ local function get_general_metadata(task)
   if task:has_from('smtp') then
     local from = task:get_from({ 'smtp', 'orig' })[1]
     if from and
-        from['user'] and #from['user'] > 0 and
-        from['domain'] and #from['domain'] > 0
+      from['user'] and #from['user'] > 0 and
+      from['domain'] and #from['domain'] > 0
     then
       r.from_user = from['user']
       r.from_domain = from['domain']:lower()
@@ -578,8 +533,8 @@ local function get_general_metadata(task)
   if task:has_from('mime') then
     local mime_from = task:get_from({ 'mime', 'orig' })[1]
     if mime_from and
-        mime_from['user'] and #mime_from['user'] > 0 and
-        mime_from['domain'] and #mime_from['domain'] > 0
+      mime_from['user'] and #mime_from['user'] > 0 and
+      mime_from['domain'] and #mime_from['domain'] > 0
     then
       r.mime_from_user = mime_from['user']
       r.mime_from_domain = mime_from['domain']:lower()
@@ -608,25 +563,34 @@ local function get_general_metadata(task)
 
   local function process_header(name)
     local hdr = task:get_header_full(name)
-    local headers_text_ignore_above = settings['index_template']['headers_text_ignore_above'] - 3
     if hdr and #hdr > 0 then
       local l = {}
       for _, h in ipairs(hdr) do
-        if settings['index_template']['headers_count_ignore_above'] ~= 0 and
-            #l >= settings['index_template']['headers_count_ignore_above']
+        if settings['index_template']['headers_count_ignore_above'] > 0 and
+          #l >= settings['index_template']['headers_count_ignore_above']
         then
           table.insert(l, 'ignored above...')
           break
         end
         local header
-        if settings['index_template']['headers_text_ignore_above'] ~= 0 and
-            h.decoded and #h.decoded >= headers_text_ignore_above
-        then
-          header = h.decoded:sub(1, headers_text_ignore_above) .. '...'
-        elseif h.decoded and #h.decoded > 0 then
-          header = h.decoded
+        local header_len
+        if h.decoded then
+          header = rspamd_text.fromstring(h.decoded)
+          header_len = header:len_utf8()
         else
-          header = empty
+          table.insert(l, empty)
+          break
+        end
+        if not header_len or header_len == 0 then
+          table.insert(l, empty)
+          break
+        end
+        if settings['index_template']['headers_text_ignore_above'] > 0 and
+          header_len >= settings['index_template']['headers_text_ignore_above']
+        then
+          header = header:sub_utf8(1, settings['index_template']['headers_text_ignore_above'])
+          table.insert(l, header .. rspamd_text.fromstring('...'))
+          break
         end
         table.insert(l, header)
       end
@@ -686,7 +650,7 @@ local function get_general_metadata(task)
 
   r.received_delay = get_received_delay(task:get_received_headers())
 
-  return fill_empty_strings(r, empty)
+  return r
 end
 
 local function elastic_collect(task)
@@ -773,8 +737,8 @@ local function configure_geoip_pipeline(cfg, ev_base)
       upstream:ok()
     else
       rspamd_logger.errx(rspamd_config,
-          'cannot configure elastic geoip pipeline (%s), status code: %s, response: %s',
-          geoip_url, code, body)
+        'cannot configure elastic geoip pipeline (%s), status code: %s, response: %s',
+        geoip_url, code, body)
       upstream:fail()
       handle_error('configure', 'geoip_pipeline', settings['limits']['max_fail'])
     end
@@ -810,8 +774,9 @@ local function put_index_policy(cfg, ev_base, upstream, host, policy_url, index_
       states['index_policy']['configured'] = true
       upstream:ok()
     else
-      rspamd_logger.errx(rspamd_config, 'cannot configure elastic index policy (%s), status code: %s, response: %s',
-          policy_url, code, body)
+      rspamd_logger.errx(rspamd_config,
+        'cannot configure elastic index policy (%s), status code: %s, response: %s',
+        policy_url, code, body)
       upstream:fail()
       handle_error('configure', 'index_policy', settings['limits']['max_fail'])
     end
@@ -867,7 +832,7 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_
             if not lua_util.table_cmp(our_policy['policy']['default_state'], current_default_state) then
               update_needed = true
             elseif not lua_util.table_cmp(our_policy['policy']['ism_template'][1]['index_patterns'],
-                current_ism_index_patterns) then
+              current_ism_index_patterns) then
               update_needed = true
             elseif not lua_util.table_cmp(our_policy['policy']['states'], current_states) then
               update_needed = true
@@ -890,8 +855,8 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_
                 put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
               else
                 rspamd_logger.errx(rspamd_config,
-                    'current elastic index policy (%s) not returned correct seq_no/primary_term, policy will not be updated, response: %s',
-                    policy_url, body)
+                  'current elastic index policy (%s) not returned correct seq_no/primary_term, policy will not be updated, response: %s',
+                  policy_url, body)
                 upstream:fail()
                 handle_error('validate current', 'index_policy', settings['limits']['max_fail'])
               end
@@ -909,8 +874,8 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_
       end
     else
       rspamd_logger.errx(rspamd_config,
-          'cannot get current elastic index policy (%s), status code: %s, response: %s',
-          policy_url, code, body)
+        'cannot get current elastic index policy (%s), status code: %s, response: %s',
+        policy_url, code, body)
       handle_error('get current', 'index_policy', settings['limits']['max_fail'])
       upstream:fail()
     end
@@ -1037,7 +1002,7 @@ local function configure_index_policy(cfg, ev_base)
       }
       index_policy['policy']['phases']['delete'] = delete_obj
     end
-    -- opensearch state policy with hot state
+  -- opensearch state policy with hot state
   elseif detected_distro['name'] == 'opensearch' then
     local retry = {
       count = 3,
@@ -1381,7 +1346,7 @@ local function configure_index_template(cfg, ev_base)
       upstream:ok()
     else
       rspamd_logger.errx(rspamd_config, 'cannot configure elastic index template (%s), status code: %s, response: %s',
-          template_url, code, body)
+        template_url, code, body)
       upstream:fail()
       handle_error('configure', 'index_template', settings['limits']['max_fail'])
     end
@@ -1424,8 +1389,9 @@ local function verify_distro(manual)
     local supported_distro_info = supported_distro[detected_distro_name]
     -- check that detected_distro_version is valid
     if not detected_distro_version or type(detected_distro_version) ~= 'string' then
-      rspamd_logger.errx(rspamd_config, 'elastic version should be a string, but we received: %s',
-          type(detected_distro_version))
+      rspamd_logger.errx(rspamd_config,
+        'elastic version should be a string, but we received: %s',
+        type(detected_distro_version))
       valid = false
     elseif detected_distro_version == '' then
       rspamd_logger.errx(rspamd_config, 'unsupported elastic version: empty string')
@@ -1434,21 +1400,22 @@ local function verify_distro(manual)
       -- compare versions using compare_versions
       local cmp_from = compare_versions(detected_distro_version, supported_distro_info['from'])
       if cmp_from == -1 then
-        rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, minimal supported version of %s is %s',
-            detected_distro_version, detected_distro_name, supported_distro_info['from'])
+        rspamd_logger.errx(rspamd_config,
+          'unsupported elastic version: %s, minimal supported version of %s is %s',
+          detected_distro_version, detected_distro_name, supported_distro_info['from'])
         valid = false
       else
         local cmp_till = compare_versions(detected_distro_version, supported_distro_info['till'])
         if (cmp_till >= 0) and not supported_distro_info['till_unknown'] then
           rspamd_logger.errx(rspamd_config,
-              'unsupported elastic version: %s, maximum supported version of %s is less than %s',
-              detected_distro_version, detected_distro_name, supported_distro_info['till'])
+            'unsupported elastic version: %s, maximum supported version of %s is less than %s',
+            detected_distro_version, detected_distro_name, supported_distro_info['till'])
           valid = false
         elseif (cmp_till >= 0) and supported_distro_info['till_unknown'] then
           rspamd_logger.warnx(rspamd_config,
-              'compatibility of elastic version: %s is unknown, maximum known supported version of %s is less than %s,' ..
-                  'use at your own risk',
-              detected_distro_version, detected_distro_name, supported_distro_info['till'])
+            'compatibility of elastic version: %s is unknown, maximum known ' ..
+            'supported version of %s is less than %s, use at your own risk',
+            detected_distro_version, detected_distro_name, supported_distro_info['till'])
           valid_unknown = true
         end
       end
@@ -1460,11 +1427,12 @@ local function verify_distro(manual)
   else
     if valid and manual then
       rspamd_logger.infox(
-          rspamd_config, 'assuming elastic distro: %s, version: %s', detected_distro_name, detected_distro_version)
+        rspamd_config, 'assuming elastic distro: %s, version: %s', detected_distro_name, detected_distro_version)
       detected_distro['supported'] = true
     elseif valid and not manual then
-      rspamd_logger.infox(rspamd_config, 'successfully connected to elastic distro: %s, version: %s',
-          detected_distro_name, detected_distro_version)
+      rspamd_logger.infox(rspamd_config,
+        'successfully connected to elastic distro: %s, version: %s',
+        detected_distro_name, detected_distro_version)
       detected_distro['supported'] = true
     else
       handle_error('configure', 'distro', settings['version']['autodetect_max_fail'])
@@ -1477,7 +1445,7 @@ local function configure_distro(cfg, ev_base)
     detected_distro['name'] = settings['version']['override']['name']
     detected_distro['version'] = settings['version']['override']['version']
     rspamd_logger.infox(rspamd_config,
-        'automatic detection of elastic distro and version is disabled, taking configuration from settings')
+      'automatic detection of elastic distro and version is disabled, taking configuration from settings')
     verify_distro(true)
   end
 
@@ -1490,14 +1458,16 @@ local function configure_distro(cfg, ev_base)
       rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', root_url, err)
       upstream:fail()
     elseif code ~= 200 then
-      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s), status code: %s, response: %s', root_url, code,
-          body)
+      rspamd_logger.errx(rspamd_config,
+        'cannot connect to elastic (%s), status code: %s, response: %s',
+        root_url, code, body)
       upstream:fail()
     else
       local parser = ucl.parser()
       local res, ucl_err = parser:parse_string(body)
       if not res then
-        rspamd_logger.errx(rspamd_config, 'failed to parse reply from elastic (%s): %s', root_url, ucl_err)
+        rspamd_logger.errx(rspamd_config, 'failed to parse reply from elastic (%s): %s',
+          root_url, ucl_err)
         upstream:fail()
       else
         local obj = parser:get_object()