From c8e7b9ca07b598c1928e688ae5901f4d52370583 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 14 Jan 2026 14:29:32 +0000 Subject: [PATCH] [Feature] Route all hyperscan cache operations through Lua backend - Route file backend through Lua for consistency with redis/http - Add zstd compression support with magic byte detection for backward compatibility (reads both .hs and .hs.zst files) - Fix rspamd_util.stat() return value handling (returns err, stat tuple) - Fix timer management for synchronous Lua callbacks to prevent early termination of re_cache compilation - Fix use-after-free in load path by pre-counting pending items - Add priority queue for re_cache compilation (short lists first) - Add ev_run() flush before blocking hyperscan compilations to ensure busy notifications are sent - Add hyperscan_notice_known() and hyperscan_get_platform_id() Lua APIs --- lualib/lua_hs_cache.lua | 432 ++++++++++++++++++++++++------- src/hs_helper.c | 8 + src/libserver/hs_cache_backend.c | 56 ++-- src/libserver/maps/map_helpers.c | 26 +- src/libserver/re_cache.c | 322 +++++++++-------------- src/libserver/worker_util.c | 92 ++----- src/libutil/multipattern.c | 47 ++-- src/lua/lua_util.c | 53 ++++ 8 files changed, 605 insertions(+), 431 deletions(-) diff --git a/lualib/lua_hs_cache.lua b/lualib/lua_hs_cache.lua index 89ef5e1ab3..5f3435a98d 100644 --- a/lualib/lua_hs_cache.lua +++ b/lualib/lua_hs_cache.lua @@ -28,19 +28,77 @@ local N = "hyperscan" local file_backend = {} file_backend.__index = file_backend +-- Zstd magic number: 0xFD2FB528 (little-endian bytes: 28 B5 2F FD) +local ZSTD_MAGIC = string.char(0x28, 0xB5, 0x2F, 0xFD) + function file_backend.new(config) local self = setmetatable({}, file_backend) - self.cache_dir = config.cache_dir or '/var/lib/rspamd/hs_cache' - self.platform_dirs = config.platform_dirs ~= false + -- Store config for logging context + self.config = config.rspamd_config or config + -- Remove trailing slashes from cache_dir + local cache_dir = config.cache_dir or '/var/lib/rspamd/hs_cache' + self.cache_dir = cache_dir:gsub("/+$", "") + -- Default to flat directory structure for backward compatibility + self.platform_dirs = config.platform_dirs == true + -- Enable compression by default (consistent with redis/http backends) + local opts = config.file or config + self.use_compression = (opts.compression ~= false) and (config.compression ~= false) + lua_util.debugm(N, self.config, "file backend config: cache_dir=%s, platform_dirs=%s, compression=%s", + self.cache_dir, self.platform_dirs and "yes" or "no", + self.use_compression and "enabled" or "disabled") return self end -function file_backend:_get_path(cache_key, platform_id) +-- Get file extension based on compression setting +function file_backend:_get_extension() + return self.use_compression and '.hs.zst' or '.hs' +end + +-- Get the path for a cache file +-- @param cache_key string cache key (hash that already includes platform info) +-- @param platform_id string platform identifier (unused in flat mode for backward compat) +-- @param ext string optional extension override (e.g., '.hs' or '.hs.zst') +function file_backend:_get_path(cache_key, platform_id, ext) + local extension = ext or self:_get_extension() if self.platform_dirs then - return string.format("%s/%s/%s.hs", self.cache_dir, platform_id, cache_key) + -- Optional: use platform subdirectories (not default) + return string.format("%s/%s/%s%s", self.cache_dir, platform_id, cache_key, extension) else - return string.format("%s/%s_%s.hs", self.cache_dir, platform_id, cache_key) + -- Default: flat structure matching original C code behavior + -- Platform info is already embedded in cache_key hash + return string.format("%s/%s%s", self.cache_dir, cache_key, extension) + end +end + +-- Check if data starts with zstd magic bytes +function file_backend:_is_zstd(data) + if not data or #data < 4 then + return false end + return data:sub(1, 4) == ZSTD_MAGIC +end + +-- Find existing cache file, trying both compressed and uncompressed extensions +-- Returns: path, is_compressed (or nil if not found) +function file_backend:_find_existing_path(cache_key, platform_id) + -- Try compressed first if compression is enabled, otherwise try uncompressed first + local primary_ext = self:_get_extension() + local secondary_ext = self.use_compression and '.hs' or '.hs.zst' + + local primary_path = self:_get_path(cache_key, platform_id, primary_ext) + -- rspamd_util.stat returns (err, stat_table) - check for no error AND valid stat + local err, stat = rspamd_util.stat(primary_path) + if not err and stat then + return primary_path, primary_ext == '.hs.zst' + end + + local secondary_path = self:_get_path(cache_key, platform_id, secondary_ext) + err, stat = rspamd_util.stat(secondary_path) + if not err and stat then + return secondary_path, secondary_ext == '.hs.zst' + end + + return nil, nil end function file_backend:_ensure_dir(path) @@ -54,73 +112,183 @@ function file_backend:_ensure_dir(path) end function file_backend:exists(cache_key, platform_id, callback) - local path = self:_get_path(cache_key, platform_id) - local stat = rspamd_util.stat(path) - - if stat then - lua_util.debugm(N, "file exists check: %s found, size: %d", path, stat.size) - callback(nil, true, { size = stat.size, mtime = stat.mtime }) + local path, is_compressed = self:_find_existing_path(cache_key, platform_id) + + if path then + local err, stat = rspamd_util.stat(path) + if not err and stat then + lua_util.debugm(N, self.config, "file exists check: %s found, size: %d, compressed: %s", + path, stat.size, is_compressed and "yes" or "no") + callback(nil, true, { size = stat.size, mtime = stat.mtime, compressed = is_compressed }) + else + -- Race condition: file disappeared between _find_existing_path and stat + lua_util.debugm(N, self.config, "file exists check: %s disappeared (race)", path) + callback(nil, false, nil) + end else - lua_util.debugm(N, "file exists check: %s not found", path) + local expected_path = self:_get_path(cache_key, platform_id) + lua_util.debugm(N, self.config, "file exists check: %s not found (checked both extensions)", expected_path) callback(nil, false, nil) end end function file_backend:load(cache_key, platform_id, callback) - local path = self:_get_path(cache_key, platform_id) + local path, expected_compressed = self:_find_existing_path(cache_key, platform_id) - lua_util.debugm(N, "file load from: %s", path) + if not path then + local expected_path = self:_get_path(cache_key, platform_id) + lua_util.debugm(N, self.config, "file load failed: %s not found (checked both extensions)", expected_path) + callback("file not found", nil) + return + end - local data, err = rspamd_util.read_file(path) - if data then - lua_util.debugm(N, "file loaded %d bytes from %s", #data, path) - callback(nil, data) + lua_util.debugm(N, self.config, "file load from: %s (expected compressed: %s)", path, expected_compressed and "yes" or "no") + + local f, err = io.open(path, "rb") + if not f then + lua_util.debugm(N, self.config, "file load failed from %s: %s", path, err or "open error") + callback(err or "open error", nil) + return + end + local data = f:read("*a") + f:close() + if not data then + lua_util.debugm(N, self.config, "file read failed from %s", path) + callback("read error", nil) + return + end + + -- Check if data is actually zstd compressed (magic byte verification) + local is_zstd = self:_is_zstd(data) + lua_util.debugm(N, self.config, "file loaded %d bytes from %s, zstd magic: %s", + #data, path, is_zstd and "yes" or "no") + + -- Notify hyperscan cache that this file is known (for cleanup tracking) + rspamd_util.hyperscan_notice_known(path) + + if is_zstd then + -- Decompress the data + local decompress_err, decompressed = rspamd_util.zstd_decompress(data) + if not decompress_err and decompressed then + lua_util.debugm(N, self.config, "file decompressed %d -> %d bytes from %s (compression ratio: %.1f%%)", + #data, #decompressed, path, (1 - #data / #decompressed) * 100) + callback(nil, decompressed) + else + lua_util.debugm(N, self.config, "file decompression failed for %s: %s", path, decompress_err or "unknown error") + callback(decompress_err or "decompression failed", nil) + end else - lua_util.debugm(N, "file load failed from %s: %s", path, err or "file not found") - callback(err or "file not found", nil) + -- Data is not compressed, return as-is + if expected_compressed then + lua_util.debugm(N, self.config, "file %s has .zst extension but no zstd magic - treating as uncompressed", path) + end + callback(nil, data) end end function file_backend:store(cache_key, platform_id, data, _ttl, callback) local path = self:_get_path(cache_key, platform_id) + lua_util.debugm(N, self.config, "file store to: %s, original size: %d bytes, compression: %s", + path, #data, self.use_compression and "enabled" or "disabled") + self:_ensure_dir(path) + local store_data = data + -- Compress if enabled + if self.use_compression then + local compressed, compress_err = rspamd_util.zstd_compress(data) + if compressed then + lua_util.debugm(N, self.config, "file compressed %d -> %d bytes (%.1f%% size reduction) for %s", + #data, #compressed, (1 - #compressed / #data) * 100, path) + store_data = compressed + else + logger.warnx(N, "compression failed: %s, storing uncompressed to %s", compress_err, path) + end + end + -- Write to temp file first, then rename atomically local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8) - local ok, err = rspamd_util.write_file(tmp_path, data) + -- store_data can be string or rspamd_text userdata + local ok, write_err + if type(store_data) == "userdata" and store_data.save_in_file then + ok, write_err = store_data:save_in_file(tmp_path) + else + local f, err = io.open(tmp_path, "wb") + if not f then + callback(err or "open failed") + return + end + ok, write_err = f:write(store_data) + f:close() + end + if not ok then + os.remove(tmp_path) + callback(write_err or "write failed") + return + end - if ok then + do local renamed, rename_err = os.rename(tmp_path, path) if renamed then - lua_util.debugm(N, "stored %d bytes to %s", #data, path) + lua_util.debugm(N, self.config, "stored %d bytes to %s", #store_data, path) + -- Notify hyperscan cache that this file is known (for cleanup tracking) + rspamd_util.hyperscan_notice_known(path) + -- Remove old file with opposite extension if it exists (migration cleanup) + local old_ext = self.use_compression and '.hs' or '.hs.zst' + local old_path = self:_get_path(cache_key, platform_id, old_ext) + local old_err, old_stat = rspamd_util.stat(old_path) + if not old_err and old_stat then + local removed = os.remove(old_path) + if removed then + lua_util.debugm(N, self.config, "removed old cache file %s (migrated to %s)", old_path, path) + end + end callback(nil) else os.remove(tmp_path) callback(rename_err or "rename failed") end - else - callback(err or "write failed") end end function file_backend:delete(cache_key, platform_id, callback) - local path = self:_get_path(cache_key, platform_id) - local ok, err = os.remove(path) + -- Try to delete both compressed and uncompressed versions + local deleted_any = false + local last_err = nil + + for _, ext in ipairs({'.hs', '.hs.zst'}) do + local path = self:_get_path(cache_key, platform_id, ext) + local stat_err, stat = rspamd_util.stat(path) + if not stat_err and stat then + local ok, err = os.remove(path) + if ok then + lua_util.debugm(N, self.config, "deleted %s", path) + deleted_any = true + else + last_err = err + end + end + end - if ok then - lua_util.debugm(N, "deleted %s", path) + if deleted_any then callback(nil) else - callback(err or "delete failed") + callback(last_err or "file not found") end end function file_backend:exists_sync(cache_key, platform_id) - local path = self:_get_path(cache_key, platform_id) - local exists = rspamd_util.stat(path) ~= nil - lua_util.debugm(N, "file sync exists check: %s %s", path, exists and "found" or "not found") - return exists, nil + local path, is_compressed = self:_find_existing_path(cache_key, platform_id) + if path then + lua_util.debugm(N, self.config, "file sync exists check: %s found (compressed: %s)", + path, is_compressed and "yes" or "no") + return true, nil + else + local expected_path = self:_get_path(cache_key, platform_id) + lua_util.debugm(N, self.config, "file sync exists check: %s not found (checked both extensions)", expected_path) + return false, nil + end end function file_backend:save_async(cache_key, platform_id, data, callback) @@ -137,37 +305,119 @@ function file_backend:exists_async(cache_key, platform_id, callback) end function file_backend:load_sync(cache_key, platform_id) - local path = self:_get_path(cache_key, platform_id) - lua_util.debugm(N, "file sync load from: %s", path) - local data, err = rspamd_util.read_file(path) - if data then - lua_util.debugm(N, "file sync loaded %d bytes from %s", #data, path) + local path, expected_compressed = self:_find_existing_path(cache_key, platform_id) + + if not path then + local expected_path = self:_get_path(cache_key, platform_id) + lua_util.debugm(N, self.config, "file sync load failed: %s not found (checked both extensions)", expected_path) + return nil, "file not found" + end + + lua_util.debugm(N, self.config, "file sync load from: %s (expected compressed: %s)", + path, expected_compressed and "yes" or "no") + + local f, err = io.open(path, "rb") + if not f then + lua_util.debugm(N, self.config, "file sync load failed from %s: %s", path, err or "open error") + return nil, err or "open error" + end + local data = f:read("*a") + f:close() + if not data then + lua_util.debugm(N, self.config, "file sync read failed from %s", path) + return nil, "read error" + end + + -- Check if data is actually zstd compressed (magic byte verification) + local is_zstd = self:_is_zstd(data) + lua_util.debugm(N, self.config, "file sync loaded %d bytes from %s, zstd magic: %s", + #data, path, is_zstd and "yes" or "no") + + -- Notify hyperscan cache that this file is known (for cleanup tracking) + rspamd_util.hyperscan_notice_known(path) + + if is_zstd then + -- Decompress the data + local decompress_err, decompressed = rspamd_util.zstd_decompress(data) + if not decompress_err and decompressed then + lua_util.debugm(N, self.config, "file sync decompressed %d -> %d bytes from %s (compression ratio: %.1f%%)", + #data, #decompressed, path, (1 - #data / #decompressed) * 100) + return decompressed, nil + else + lua_util.debugm(N, self.config, "file sync decompression failed for %s: %s", path, decompress_err or "unknown error") + return nil, decompress_err or "decompression failed" + end else - lua_util.debugm(N, "file sync load failed from %s: %s", path, err or "file not found") + -- Data is not compressed, return as-is + if expected_compressed then + lua_util.debugm(N, self.config, "file %s has .zst extension but no zstd magic - treating as uncompressed", path) + end + return data, nil end - return data, err end function file_backend:save_sync(cache_key, platform_id, data) local path = self:_get_path(cache_key, platform_id) - lua_util.debugm(N, "file sync save to: %s, size: %d bytes", path, #data) + lua_util.debugm(N, self.config, "file sync save to: %s, original size: %d bytes, compression: %s", + path, #data, self.use_compression and "enabled" or "disabled") self:_ensure_dir(path) + local store_data = data + -- Compress if enabled + if self.use_compression then + local compressed, compress_err = rspamd_util.zstd_compress(data) + if compressed then + lua_util.debugm(N, self.config, "file sync compressed %d -> %d bytes (%.1f%% size reduction) for %s", + #data, #compressed, (1 - #compressed / #data) * 100, path) + store_data = compressed + else + logger.warnx(N, "compression failed: %s, storing uncompressed to %s", compress_err, path) + end + end + local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8) - local ok, err = rspamd_util.write_file(tmp_path, data) + -- store_data can be string or rspamd_text userdata + local ok, write_err + if type(store_data) == "userdata" and store_data.save_in_file then + ok, write_err = store_data:save_in_file(tmp_path) + else + local f, err = io.open(tmp_path, "wb") + if not f then + lua_util.debugm(N, self.config, "file sync open failed for %s: %s", tmp_path, err) + return false, err + end + ok, write_err = f:write(store_data) + f:close() + end if not ok then - lua_util.debugm(N, "file sync write failed to %s: %s", tmp_path, err) - return false, err + lua_util.debugm(N, self.config, "file sync write failed to %s: %s", tmp_path, write_err) + os.remove(tmp_path) + return false, write_err end local renamed, rename_err = os.rename(tmp_path, path) if not renamed then - lua_util.debugm(N, "file sync rename failed %s -> %s: %s", tmp_path, path, rename_err) + lua_util.debugm(N, self.config, "file sync rename failed %s -> %s: %s", tmp_path, path, rename_err) os.remove(tmp_path) return false, rename_err end - lua_util.debugm(N, "file sync stored %d bytes to %s", #data, path) + lua_util.debugm(N, self.config, "file sync stored %d bytes to %s", #store_data, path) + + -- Notify hyperscan cache that this file is known (for cleanup tracking) + rspamd_util.hyperscan_notice_known(path) + + -- Remove old file with opposite extension if it exists (migration cleanup) + local old_ext = self.use_compression and '.hs' or '.hs.zst' + local old_path = self:_get_path(cache_key, platform_id, old_ext) + local old_err, old_stat = rspamd_util.stat(old_path) + if not old_err and old_stat then + local removed = os.remove(old_path) + if removed then + lua_util.debugm(N, self.config, "removed old cache file %s (migrated to %s)", old_path, path) + end + end + return true, nil end @@ -212,7 +462,7 @@ function redis_backend.new(config) local default_prefix = self.use_compression and 'rspamd_zhs' or 'rspamd_hs' self.prefix = opts.prefix or config.prefix or default_prefix - lua_util.debugm(N, "redis backend config: prefix=%s, ttl=%s, refresh_ttl=%s, compression=%s", + lua_util.debugm(N, self.config, "redis backend config: prefix=%s, ttl=%s, refresh_ttl=%s, compression=%s", self.prefix, self.default_ttl, self.refresh_ttl, self.use_compression) return self @@ -230,17 +480,17 @@ function redis_backend:exists(cache_key, platform_id, callback) return end - lua_util.debugm(N, "redis EXISTS check for key: %s", key) + lua_util.debugm(N, self.config, "redis EXISTS check for key: %s", key) local attrs = { ev_base = self.redis_params.ev_base, config = self.config, callback = function(err, data) if err then - lua_util.debugm(N, "redis EXISTS failed for key %s: %s", key, err) + lua_util.debugm(N, self.config, "redis EXISTS failed for key %s: %s", key, err) callback(err, false, nil) else - lua_util.debugm(N, "redis EXISTS result for key %s: %s", key, data == 1 and "found" or "not found") + lua_util.debugm(N, self.config, "redis EXISTS result for key %s: %s", key, data == 1 and "found" or "not found") callback(nil, data == 1, nil) end end @@ -261,10 +511,10 @@ function redis_backend:load(cache_key, platform_id, callback) -- Use GETEX to refresh TTL on read if enabled local req if self.refresh_ttl then - lua_util.debugm(N, "redis GETEX (with TTL refresh %d) for key: %s", self.default_ttl, key) + lua_util.debugm(N, self.config, "redis GETEX (with TTL refresh %d) for key: %s", self.default_ttl, key) req = {'GETEX', key, 'EX', tostring(self.default_ttl)} else - lua_util.debugm(N, "redis GET for key: %s", key) + lua_util.debugm(N, self.config, "redis GET for key: %s", key) req = {'GET', key} end @@ -273,25 +523,25 @@ function redis_backend:load(cache_key, platform_id, callback) config = self.config, callback = function(err, data) if err then - lua_util.debugm(N, "redis GET failed for key %s: %s", key, err) + lua_util.debugm(N, self.config, "redis GET failed for key %s: %s", key, err) callback(err, nil) elseif not data then - lua_util.debugm(N, "redis cache miss for key %s", key) + lua_util.debugm(N, self.config, "redis cache miss for key %s", key) callback("not found", nil) else -- Decompress if needed if self.use_compression then local decompress_err, decompressed = rspamd_util.zstd_decompress(data) if not decompress_err and decompressed then - lua_util.debugm(N, "redis loaded and decompressed %d -> %d bytes from key %s (compression ratio: %.1f%%)", + lua_util.debugm(N, self.config, "redis loaded and decompressed %d -> %d bytes from key %s (compression ratio: %.1f%%)", #data, #decompressed, key, (1 - #data / #decompressed) * 100) callback(nil, decompressed) else - lua_util.debugm(N, "redis decompression failed for key %s: %s", key, decompress_err) + lua_util.debugm(N, self.config, "redis decompression failed for key %s: %s", key, decompress_err) callback(decompress_err or "decompression failed", nil) end else - lua_util.debugm(N, "redis loaded %d bytes (uncompressed) from key %s", #data, key) + lua_util.debugm(N, self.config, "redis loaded %d bytes (uncompressed) from key %s", #data, key) callback(nil, data) end end @@ -310,7 +560,7 @@ function redis_backend:store(cache_key, platform_id, data, ttl, callback) return end - lua_util.debugm(N, "redis SETEX for key: %s, original size: %d bytes, TTL: %d, compression: %s", + lua_util.debugm(N, self.config, "redis SETEX for key: %s, original size: %d bytes, TTL: %d, compression: %s", key, #data, actual_ttl, self.use_compression and "enabled" or "disabled") local store_data = data @@ -318,7 +568,7 @@ function redis_backend:store(cache_key, platform_id, data, ttl, callback) if self.use_compression then local compressed, compress_err = rspamd_util.zstd_compress(data) if compressed then - lua_util.debugm(N, "redis compressed %d -> %d bytes (%.1f%% size reduction) for key %s", + lua_util.debugm(N, self.config, "redis compressed %d -> %d bytes (%.1f%% size reduction) for key %s", #data, #compressed, (1 - #compressed / #data) * 100, key) store_data = compressed else @@ -331,10 +581,10 @@ function redis_backend:store(cache_key, platform_id, data, ttl, callback) config = self.config, callback = function(err) if err then - lua_util.debugm(N, "redis SETEX failed for key %s: %s", key, err) + lua_util.debugm(N, self.config, "redis SETEX failed for key %s: %s", key, err) callback(err) else - lua_util.debugm(N, "redis stored %d bytes to key %s with TTL %d", + lua_util.debugm(N, self.config, "redis stored %d bytes to key %s with TTL %d", #store_data, key, actual_ttl) callback(nil) end @@ -353,17 +603,17 @@ function redis_backend:delete(cache_key, platform_id, callback) return end - lua_util.debugm(N, "redis DEL for key: %s", key) + lua_util.debugm(N, self.config, "redis DEL for key: %s", key) local attrs = { ev_base = self.redis_params.ev_base, config = self.config, callback = function(err) if err then - lua_util.debugm(N, "redis DEL failed for key %s: %s", key, err) + lua_util.debugm(N, self.config, "redis DEL failed for key %s: %s", key, err) callback(err) else - lua_util.debugm(N, "redis deleted key %s", key) + lua_util.debugm(N, self.config, "redis deleted key %s", key) callback(nil) end end @@ -391,6 +641,8 @@ http_backend.__index = http_backend function http_backend.new(config) local self = setmetatable({}, http_backend) + -- Store config for logging context + self.config = config.rspamd_config or config -- HTTP config can be in 'http' sub-section or at top level local opts = config.http or config @@ -417,7 +669,7 @@ end function http_backend:exists(cache_key, platform_id, callback) local url = self:_get_url(cache_key, platform_id) - lua_util.debugm(N, "http HEAD check for url: %s", url) + lua_util.debugm(N, self.config, "http HEAD check for url: %s", url) rspamd_http.request({ url = url, @@ -426,14 +678,14 @@ function http_backend:exists(cache_key, platform_id, callback) timeout = self.timeout, callback = function(err, code, _, headers) if err then - lua_util.debugm(N, "http HEAD failed for %s: %s", url, err) + lua_util.debugm(N, self.config, "http HEAD failed for %s: %s", url, err) callback(err, false, nil) elseif code == 200 then local size = headers and headers['content-length'] - lua_util.debugm(N, "http HEAD found %s, size: %s", url, size or "unknown") + lua_util.debugm(N, self.config, "http HEAD found %s, size: %s", url, size or "unknown") callback(nil, true, { size = tonumber(size) }) else - lua_util.debugm(N, "http HEAD not found %s (code: %d)", url, code) + lua_util.debugm(N, self.config, "http HEAD not found %s (code: %d)", url, code) callback(nil, false, nil) end end @@ -443,7 +695,7 @@ end function http_backend:load(cache_key, platform_id, callback) local url = self:_get_url(cache_key, platform_id) - lua_util.debugm(N, "http GET for url: %s", url) + lua_util.debugm(N, self.config, "http GET for url: %s", url) rspamd_http.request({ url = url, @@ -452,7 +704,7 @@ function http_backend:load(cache_key, platform_id, callback) timeout = self.timeout, callback = function(err, code, body, headers) if err then - lua_util.debugm(N, "http GET failed for %s: %s", url, err) + lua_util.debugm(N, self.config, "http GET failed for %s: %s", url, err) callback(err, nil) elseif code == 200 and body then -- Check if content is compressed @@ -460,22 +712,22 @@ function http_backend:load(cache_key, platform_id, callback) if content_encoding == 'zstd' or self.use_compression then local decompress_err, decompressed = rspamd_util.zstd_decompress(body) if not decompress_err and decompressed then - lua_util.debugm(N, "http loaded and decompressed %d -> %d bytes from %s", + lua_util.debugm(N, self.config, "http loaded and decompressed %d -> %d bytes from %s", #body, #decompressed, url) callback(nil, decompressed) else - lua_util.debugm(N, "http loaded %d bytes (no decompression) from %s", #body, url) + lua_util.debugm(N, self.config, "http loaded %d bytes (no decompression) from %s", #body, url) callback(nil, body) end else - lua_util.debugm(N, "http loaded %d bytes from %s", #body, url) + lua_util.debugm(N, self.config, "http loaded %d bytes from %s", #body, url) callback(nil, body) end elseif code == 404 then - lua_util.debugm(N, "http cache miss (404) for %s", url) + lua_util.debugm(N, self.config, "http cache miss (404) for %s", url) callback("not found", nil) else - lua_util.debugm(N, "http GET failed for %s: HTTP %d", url, code) + lua_util.debugm(N, self.config, "http GET failed for %s: HTTP %d", url, code) callback(string.format("HTTP %d", code), nil) end end @@ -486,14 +738,14 @@ function http_backend:store(cache_key, platform_id, data, ttl, callback) local url = self:_get_url(cache_key, platform_id) local headers = self:_get_headers() - lua_util.debugm(N, "http PUT for url: %s, original size: %d bytes, compression: %s", + lua_util.debugm(N, self.config, "http PUT for url: %s, original size: %d bytes, compression: %s", url, #data, self.use_compression and "enabled" or "disabled") local store_data = data if self.use_compression then local compressed = rspamd_util.zstd_compress(data) if compressed then - lua_util.debugm(N, "http compressed %d -> %d bytes (%.1f%% size reduction) for %s", + lua_util.debugm(N, self.config, "http compressed %d -> %d bytes (%.1f%% size reduction) for %s", #data, #compressed, (1 - #compressed / #data) * 100, url) store_data = compressed headers['Content-Encoding'] = 'zstd' @@ -512,13 +764,13 @@ function http_backend:store(cache_key, platform_id, data, ttl, callback) timeout = self.timeout, callback = function(err, code) if err then - lua_util.debugm(N, "http PUT failed for %s: %s", url, err) + lua_util.debugm(N, self.config, "http PUT failed for %s: %s", url, err) callback(err) elseif code >= 200 and code < 300 then - lua_util.debugm(N, "http stored %d bytes to %s", #store_data, url) + lua_util.debugm(N, self.config, "http stored %d bytes to %s", #store_data, url) callback(nil) else - lua_util.debugm(N, "http PUT failed for %s: HTTP %d", url, code) + lua_util.debugm(N, self.config, "http PUT failed for %s: HTTP %d", url, code) callback(string.format("HTTP %d", code)) end end @@ -528,7 +780,7 @@ end function http_backend:delete(cache_key, platform_id, callback) local url = self:_get_url(cache_key, platform_id) - lua_util.debugm(N, "http DELETE for url: %s", url) + lua_util.debugm(N, self.config, "http DELETE for url: %s", url) rspamd_http.request({ url = url, @@ -537,13 +789,13 @@ function http_backend:delete(cache_key, platform_id, callback) timeout = self.timeout, callback = function(err, code) if err then - lua_util.debugm(N, "http DELETE failed for %s: %s", url, err) + lua_util.debugm(N, self.config, "http DELETE failed for %s: %s", url, err) callback(err) elseif code >= 200 and code < 300 or code == 404 then - lua_util.debugm(N, "http deleted %s", url) + lua_util.debugm(N, self.config, "http deleted %s", url) callback(nil) else - lua_util.debugm(N, "http DELETE failed for %s: HTTP %d", url, code) + lua_util.debugm(N, self.config, "http DELETE failed for %s: HTTP %d", url, code) callback(string.format("HTTP %d", code)) end end @@ -562,19 +814,21 @@ end function exports.create_backend(config) local backend_type = config.backend or config.cache_backend or 'file' - lua_util.debugm(N, "creating hyperscan cache backend: %s", backend_type) + local cfg = config.rspamd_config or config + lua_util.debugm(N, cfg, "creating hyperscan cache backend: %s", backend_type) -- Always pass full config - backends will extract what they need -- (config contains ev_base, rspamd_config at top level, plus optional -- redis/http sub-sections for backend-specific settings) if backend_type == 'file' then local be = file_backend.new(config) - lua_util.debugm(N, "file backend created, cache_dir: %s", be.cache_dir or "not set") + lua_util.debugm(N, be.config, "file backend created, cache_dir: %s, compression: %s", + be.cache_dir or "not set", be.use_compression and "enabled" or "disabled") return be elseif backend_type == 'redis' then local be = redis_backend.new(config) if be.redis_params then - lua_util.debugm(N, "redis backend created, prefix: %s, compression: %s", + lua_util.debugm(N, be.config, "redis backend created, prefix: %s, compression: %s", be.prefix, be.use_compression and "enabled" or "disabled") else logger.errx(N, "redis backend created but no redis params - operations will fail!") @@ -582,7 +836,7 @@ function exports.create_backend(config) return be elseif backend_type == 'http' then local be = http_backend.new(config) - lua_util.debugm(N, "http backend created, base_url: %s", be.base_url or "not set") + lua_util.debugm(N, be.config, "http backend created, base_url: %s", be.base_url or "not set") return be else logger.errx(N, "unknown hyperscan cache backend: %s, falling back to file", backend_type) diff --git a/src/hs_helper.c b/src/hs_helper.c index f0517c7536..90f17c9eb6 100644 --- a/src/hs_helper.c +++ b/src/hs_helper.c @@ -879,6 +879,8 @@ rspamd_hs_helper_mp_exists_cb(gboolean success, /* Need to compile+store */ rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, "compile multipattern"); + /* Flush the busy notification before blocking on compilation */ + ev_run(mpctx->ctx->event_loop, EVRUN_NOWAIT); rspamd_multipattern_compile_hs_to_cache_async(entry->mp, mpctx->ctx->hs_dir, mpctx->ctx->event_loop, rspamd_hs_helper_mp_compiled_cb, mpctx); @@ -921,6 +923,8 @@ rspamd_hs_helper_compile_pending_multipatterns_next(struct rspamd_hs_helper_mp_a } else { rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, "compile multipattern"); + /* Flush the busy notification before blocking on compilation */ + ev_run(mpctx->ctx->event_loop, EVRUN_NOWAIT); if (!rspamd_multipattern_compile_hs_to_cache(entry->mp, mpctx->ctx->hs_dir, &err)) { msg_err("failed to compile multipattern '%s': %e", entry->name, err); if (err) g_error_free(err); @@ -1044,6 +1048,8 @@ rspamd_hs_helper_remap_exists_cb(gboolean success, /* Need to compile+store */ rspamd_worker_set_busy(rmctx->worker, rmctx->ctx->event_loop, "compile regexp map"); + /* Flush the busy notification before blocking on compilation */ + ev_run(rmctx->ctx->event_loop, EVRUN_NOWAIT); rspamd_regexp_map_compile_hs_to_cache_async(entry->re_map, rmctx->ctx->hs_dir, rmctx->ctx->event_loop, rspamd_hs_helper_remap_compiled_cb, rmctx); @@ -1085,6 +1091,8 @@ rspamd_hs_helper_compile_pending_regexp_maps_next(struct rspamd_hs_helper_remap_ } else { rspamd_worker_set_busy(rmctx->worker, rmctx->ctx->event_loop, "compile regexp map"); + /* Flush the busy notification before blocking on compilation */ + ev_run(rmctx->ctx->event_loop, EVRUN_NOWAIT); if (!rspamd_regexp_map_compile_hs_to_cache(entry->re_map, rmctx->ctx->hs_dir, &err)) { msg_err("failed to compile regexp map '%s': %e", entry->name, err); if (err) g_error_free(err); diff --git a/src/libserver/hs_cache_backend.c b/src/libserver/hs_cache_backend.c index 576d3da3f7..1923245822 100644 --- a/src/libserver/hs_cache_backend.c +++ b/src/libserver/hs_cache_backend.c @@ -47,14 +47,11 @@ rspamd_hs_cache_try_init_lua_backend_with_opts(struct rspamd_config *cfg, lua_State *L; int err_idx; - if (!cfg || !cfg->lua_state || !ev_base || !opts || !backend_name) { - return FALSE; - } - - if (strcmp(backend_name, "file") == 0) { + if (!cfg || !cfg->lua_state || !ev_base || !backend_name) { return FALSE; } + /* All backends (file, redis, http) now go through Lua for consistency */ L = (lua_State *) cfg->lua_state; /* Ensure redis pool is bound to this process event loop (required for lua_redis async requests) */ @@ -81,8 +78,13 @@ rspamd_hs_cache_try_init_lua_backend_with_opts(struct rspamd_config *cfg, return FALSE; } - /* Push options as config table */ - ucl_object_push_lua(L, opts, true); + /* Push options as config table (or empty table if no opts) */ + if (opts) { + ucl_object_push_lua(L, opts, true); + } + else { + lua_newtable(L); + } /* Set event loop for lua_redis */ { @@ -198,39 +200,43 @@ rspamd_hs_cache_try_init_lua_backend(struct rspamd_config *cfg, return TRUE; } - if (!cfg || !cfg->workers) { + if (!cfg) { return FALSE; } - hs_quark = g_quark_try_string("hs_helper"); - for (cur = cfg->workers; cur != NULL; cur = g_list_next(cur)) { - cf = (const struct rspamd_worker_conf *) cur->data; - if (cf && (hs_quark != 0 ? (cf->type == hs_quark) : (strcmp(g_quark_to_string(cf->type), "hs_helper") == 0))) { - opts = cf->options; - break; + /* Look for hs_helper worker config if available */ + if (cfg->workers) { + hs_quark = g_quark_try_string("hs_helper"); + for (cur = cfg->workers; cur != NULL; cur = g_list_next(cur)) { + cf = (const struct rspamd_worker_conf *) cur->data; + if (cf && (hs_quark != 0 ? (cf->type == hs_quark) : (strcmp(g_quark_to_string(cf->type), "hs_helper") == 0))) { + opts = cf->options; + break; + } } } - if (!opts) { - return FALSE; + /* Extract backend config from hs_helper options if available */ + if (opts) { + const ucl_object_t *b = ucl_object_lookup(opts, "cache_backend"); + if (b && ucl_object_type(b) == UCL_STRING) { + backend_name = ucl_object_tostring(b); + } + const ucl_object_t *d = ucl_object_lookup(opts, "cache_dir"); + if (d && ucl_object_type(d) == UCL_STRING) { + cache_dir = ucl_object_tostring(d); + } } - const ucl_object_t *b = ucl_object_lookup(opts, "cache_backend"); - if (b && ucl_object_type(b) == UCL_STRING) { - backend_name = ucl_object_tostring(b); - } + /* Default to file backend */ if (!backend_name) { backend_name = "file"; } - - const ucl_object_t *d = ucl_object_lookup(opts, "cache_dir"); - if (d && ucl_object_type(d) == UCL_STRING) { - cache_dir = ucl_object_tostring(d); - } if (!cache_dir) { cache_dir = cfg->hs_cache_dir; } + /* Always initialize Lua backend (file, redis, or http) */ return rspamd_hs_cache_try_init_lua_backend_with_opts(cfg, ev_base, opts, backend_name, cache_dir); } diff --git a/src/libserver/maps/map_helpers.c b/src/libserver/maps/map_helpers.c index 2dc3f88e33..a1ee7fab11 100644 --- a/src/libserver/maps/map_helpers.c +++ b/src/libserver/maps/map_helpers.c @@ -2124,19 +2124,8 @@ void rspamd_regexp_map_compile_hs_to_cache_async(struct rspamd_regexp_map_helper map = re_map->map; - if (!rspamd_hs_cache_has_lua_backend()) { - /* Synchronous file backend */ - gboolean success = rspamd_regexp_map_compile_hs_to_cache(re_map, cache_dir, &err); - if (cb) { - cb(re_map, success, err, ud); - } - if (err) { - g_error_free(err); - } - return; - } - - /* Async Lua backend path */ + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); hs_platform_info_t plt; hs_compile_error_t *hs_errors = NULL; hs_database_t *db = NULL; @@ -2353,16 +2342,9 @@ void rspamd_regexp_map_load_from_cache_async(struct rspamd_regexp_map_helper *re g_assert(re_map != NULL); g_assert(cache_dir != NULL); - if (!rspamd_hs_cache_has_lua_backend()) { - /* Synchronous file backend */ - gboolean success = rspamd_regexp_map_load_from_cache(re_map, cache_dir); - if (cb) { - cb(success, ud); - } - return; - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); - /* Async Lua backend path */ char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1]; rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs", (int) rspamd_cryptobox_HASHBYTES / 2, re_map->re_digest); diff --git a/src/libserver/re_cache.c b/src/libserver/re_cache.c index 8bf44ba3c0..8ec16d638d 100644 --- a/src/libserver/re_cache.c +++ b/src/libserver/re_cache.c @@ -23,6 +23,7 @@ #include "libserver/hs_cache_backend.h" #include "libutil/util.h" #include "libutil/regexp.h" +#include "libutil/heap.h" #include "lua/lua_common.h" #include "libstat/stat_api.h" #include "contrib/uthash/utlist.h" @@ -2142,8 +2143,17 @@ enum rspamd_re_cache_compile_state { RSPAMD_RE_CACHE_COMPILE_STATE_SAVING }; +/* Heap element for priority compilation queue */ +struct rspamd_re_compile_queue_elt { + unsigned int pri; /* Priority: lower = compile first */ + unsigned int idx; /* Heap index (managed by heap) */ + struct rspamd_re_class *re_class; +}; + +RSPAMD_HEAP_DECLARE(re_compile_queue, struct rspamd_re_compile_queue_elt); + struct rspamd_re_cache_hs_compile_cbdata { - GHashTableIter it; + re_compile_queue_t compile_queue; /* Priority queue of re_classes to compile */ struct rspamd_re_cache *cache; const char *cache_dir; double max_time; @@ -2336,6 +2346,7 @@ rspamd_re_cache_compile_timer_cb(EV_P_ ev_timer *w, int revents) if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) { ev_timer_stop(EV_A_ w); cbdata->cb(cbdata->total, NULL, cbdata->cbd); + rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue); g_free(w); g_free(cbdata); @@ -2346,110 +2357,49 @@ rspamd_re_cache_compile_timer_cb(EV_P_ ev_timer *w, int revents) re_class = cbdata->current_class; } else { - if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) { + /* Pop next item from priority queue */ + struct rspamd_re_compile_queue_elt *elt = + rspamd_heap_pop(re_compile_queue, &cbdata->compile_queue); + if (elt == NULL) { /* All done */ ev_timer_stop(EV_A_ w); cbdata->cb(cbdata->total, NULL, cbdata->cbd); + rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue); g_free(w); g_free(cbdata); return; } - re_class = v; + re_class = elt->re_class; cbdata->current_class = re_class; cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS; } if (cbdata->state == RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS) { - if (rspamd_hs_cache_has_lua_backend()) { - struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx)); - ctx->cbdata = cbdata; - ctx->loop = loop; - ctx->w = w; - char entity_name[256]; - if (re_class->type_len > 0) { - rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)", - rspamd_re_cache_type_to_string(re_class->type), - (int) re_class->type_len - 1, re_class->type_data); - } - else { - rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s", - rspamd_re_cache_type_to_string(re_class->type)); - } - rspamd_hs_cache_lua_exists_async(re_class->hash, entity_name, rspamd_re_cache_exists_cb, ctx); - ev_timer_stop(EV_A_ w); - return; + /* Check via Lua backend (handles file, redis, http) */ + struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx)); + ctx->cbdata = cbdata; + ctx->loop = loop; + ctx->w = w; + char entity_name[256]; + if (re_class->type_len > 0) { + rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)", + rspamd_re_cache_type_to_string(re_class->type), + (int) re_class->type_len - 1, re_class->type_data); } - - /* Check file backend */ - rspamd_snprintf(path, sizeof(path), "%s%c%s.hs", cbdata->cache_dir, - G_DIR_SEPARATOR, re_class->hash); - if (rspamd_re_cache_is_valid_hyperscan_file(cache, path, TRUE, TRUE, NULL)) { - /* Read number of regexps for logging */ - fd = open(path, O_RDONLY, 00600); - - if (fd != -1) { - if (lseek(fd, RSPAMD_HS_MAGIC_LEN + sizeof(cache->plt), SEEK_SET) != -1) { - if (read(fd, &n, sizeof(n)) != sizeof(n)) { - n = 0; - } - } - close(fd); - } - - if (re_class->type_len > 0) { - if (!cbdata->silent) { - msg_info_re_cache( - "skip already valid class %s(%*s) to cache %6s, %d regexps%s%s%s", - rspamd_re_cache_type_to_string(re_class->type), - (int) re_class->type_len - 1, - re_class->type_data, - re_class->hash, - n, - cache->scope ? " for scope '" : "", - cache->scope ? cache->scope : "", - cache->scope ? "'" : ""); - } - } - else { - if (!cbdata->silent) { - msg_info_re_cache( - "skip already valid class %s to cache %6s, %d regexps%s%s%s", - rspamd_re_cache_type_to_string(re_class->type), - re_class->hash, - n, - cache->scope ? " for scope '" : "", - cache->scope ? cache->scope : "", - cache->scope ? "'" : ""); - } - } - - cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT; - cbdata->current_class = NULL; - ev_timer_again(EV_A_ w); - return; + else { + rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s", + rspamd_re_cache_type_to_string(re_class->type)); } - - cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING; + rspamd_hs_cache_lua_exists_async(re_class->hash, entity_name, rspamd_re_cache_exists_cb, ctx); + /* Don't stop timer here - the callback (rspamd_re_cache_exists_cb) handles + * restarting the timer. For file backend the callback runs synchronously + * within exists_async, so stopping here would undo the timer restart. */ + return; } - /* Only create temp file if not using Lua backend */ - if (!rspamd_hs_cache_has_lua_backend()) { - rspamd_snprintf(path, sizeof(path), "%s%c%s%P-XXXXXXXXXX", cbdata->cache_dir, - G_DIR_SEPARATOR, re_class->hash, our_pid); - fd = g_mkstemp_full(path, O_CREAT | O_TRUNC | O_EXCL | O_WRONLY, 00600); - - if (fd == -1) { - err = g_error_new(rspamd_re_cache_quark(), errno, - "cannot open file %s: %s", path, strerror(errno)); - rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false); - return; - } - } - else { - fd = -1; /* Not using file */ - } + fd = -1; /* Not using direct file I/O, Lua backend handles storage */ g_hash_table_iter_init(&cit, re_class->re); n = g_hash_table_size(re_class->re); @@ -2658,111 +2608,44 @@ rspamd_re_cache_compile_timer_cb(EV_P_ ev_timer *w, int revents) iov[6].iov_base = hs_serialized; iov[6].iov_len = serialized_len; - if (rspamd_hs_cache_has_lua_backend()) { - /* Build combined buffer for Lua backend */ - gsize total_len = 0; - for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) { - total_len += iov[j].iov_len; - } - - unsigned char *combined = g_malloc(total_len); - gsize offset = 0; - for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) { - memcpy(combined + offset, iov[j].iov_base, iov[j].iov_len); - offset += iov[j].iov_len; - } + /* Save via Lua backend (handles file, redis, http with compression) */ + gsize total_len = 0; + for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) { + total_len += iov[j].iov_len; + } - struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx)); - ctx->cbdata = cbdata; - ctx->loop = loop; - ctx->w = w; - ctx->n = n; + unsigned char *combined = g_malloc(total_len); + gsize offset = 0; + for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) { + memcpy(combined + offset, iov[j].iov_base, iov[j].iov_len); + offset += iov[j].iov_len; + } - char entity_name[256]; - if (re_class->type_len > 0) { - rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)", - rspamd_re_cache_type_to_string(re_class->type), - (int) re_class->type_len - 1, re_class->type_data); - } - else { - rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s", - rspamd_re_cache_type_to_string(re_class->type)); - } - rspamd_hs_cache_lua_save_async(re_class->hash, entity_name, combined, total_len, rspamd_re_cache_save_cb, ctx); + struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx)); + ctx->cbdata = cbdata; + ctx->loop = loop; + ctx->w = w; + ctx->n = n; - g_free(combined); - CLEANUP_ALLOCATED(false); - g_free(hs_serialized); - ev_timer_stop(EV_A_ w); - return; + char entity_name[256]; + if (re_class->type_len > 0) { + rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)", + rspamd_re_cache_type_to_string(re_class->type), + (int) re_class->type_len - 1, re_class->type_data); } else { - /* Use file backend */ - if (writev(fd, iov, G_N_ELEMENTS(iov)) == -1) { - err = g_error_new(rspamd_re_cache_quark(), - errno, - "cannot serialize tree of regexp to %s: %s", - path, strerror(errno)); - - CLEANUP_ALLOCATED(true); - g_free(hs_serialized); - - rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false); - return; - } - - CLEANUP_ALLOCATED(false); - - /* File backend: rename temporary file to the new .hs file */ - rspamd_snprintf(npath, sizeof(npath), "%s%c%s.hs", cbdata->cache_dir, - G_DIR_SEPARATOR, re_class->hash); - - if (rename(path, npath) == -1) { - err = g_error_new(rspamd_re_cache_quark(), - errno, - "cannot rename %s to %s: %s", - path, npath, strerror(errno)); - unlink(path); - close(fd); - - rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false); - return; - } - - close(fd); - - if (re_class->type_len > 0) { - msg_info_re_cache( - "compiled class %s(%*s) to cache %6s (%s), %d/%d regexps%s%s%s", - rspamd_re_cache_type_to_string(re_class->type), - (int) re_class->type_len - 1, - re_class->type_data, - re_class->hash, - npath, - n, - (int) g_hash_table_size(re_class->re), - cache->scope ? " for scope '" : "", - cache->scope ? cache->scope : "", - cache->scope ? "'" : ""); - } - else { - msg_info_re_cache( - "compiled class %s to cache %6s (%s), %d/%d regexps%s%s%s", - rspamd_re_cache_type_to_string(re_class->type), - re_class->hash, - npath, - n, - (int) g_hash_table_size(re_class->re), - cache->scope ? " for scope '" : "", - cache->scope ? cache->scope : "", - cache->scope ? "'" : ""); - } - - cbdata->total += n; + rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s", + rspamd_re_cache_type_to_string(re_class->type)); } + rspamd_hs_cache_lua_save_async(re_class->hash, entity_name, combined, total_len, rspamd_re_cache_save_cb, ctx); - cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT; - cbdata->current_class = NULL; + g_free(combined); + CLEANUP_ALLOCATED(false); + g_free(hs_serialized); + /* Don't stop timer here - the callback (rspamd_re_cache_save_cb) handles + * restarting the timer. For file backend the callback runs synchronously + * within save_async, so stopping here would undo the timer restart. */ + return; } else { err = g_error_new(rspamd_re_cache_quark(), @@ -2805,9 +2688,45 @@ int rspamd_re_cache_compile_hyperscan(struct rspamd_re_cache *cache, ev_timer *timer; static const ev_tstamp timer_interval = 0.1; struct rspamd_re_cache_hs_compile_cbdata *cbdata; + GHashTableIter it; + gpointer k, v; cbdata = g_malloc0(sizeof(*cbdata)); - g_hash_table_iter_init(&cbdata->it, cache->re_classes); + rspamd_heap_init(re_compile_queue, &cbdata->compile_queue); + + /* + * Build priority queue for compilation order. + * Priority (lower = compile first): + * - Short lists (<100 regexps): 0 + count + * - URL type (TLD matching): 1000 + count + * - Other types: 10000 + count + */ + g_hash_table_iter_init(&it, cache->re_classes); + while (g_hash_table_iter_next(&it, &k, &v)) { + struct rspamd_re_class *re_class = v; + struct rspamd_re_compile_queue_elt elt; + unsigned int count = g_hash_table_size(re_class->re); + unsigned int base_pri; + + /* Calculate priority tier */ + if (count < 100) { + /* Short lists get highest priority */ + base_pri = 0; + } + else if (re_class->type == RSPAMD_RE_URL) { + /* URL type (TLD) gets medium priority */ + base_pri = 1000; + } + else { + /* All other types */ + base_pri = 10000; + } + + elt.pri = base_pri + count; + elt.re_class = re_class; + rspamd_heap_push_safe(re_compile_queue, &cbdata->compile_queue, &elt, heap_error); + } + cbdata->cache = cache; cbdata->cache_dir = cache_dir; cbdata->cb = cb; @@ -2824,6 +2743,11 @@ int rspamd_re_cache_compile_hyperscan(struct rspamd_re_cache *cache, ev_timer_start(event_loop, timer); return 0; + +heap_error: + rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue); + g_free(cbdata); + return -1; #endif } @@ -3608,11 +3532,8 @@ void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_h return; } - if (!rspamd_hs_cache_has_lua_backend()) { - /* Fallback to synchronous file loading */ - (void) rspamd_re_cache_load_hyperscan_scoped(cache_head, cache_dir, try_load); - return; - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); DL_FOREACH(cache_head, cur) { @@ -3623,11 +3544,20 @@ void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_h sctx->cache = cur; sctx->event_loop = event_loop; sctx->try_load = try_load; - sctx->pending = 0; - sctx->total = 0; sctx->loaded = 0; sctx->all_loaded = TRUE; + /* Count items first - for file backend, callbacks run synchronously, + * so we must set pending/total before starting any loads to avoid + * premature free when pending reaches 0 during the loop */ + sctx->total = g_hash_table_size(cur->re_classes); + sctx->pending = sctx->total; + + if (sctx->pending == 0) { + g_free(sctx); + continue; + } + g_hash_table_iter_init(&it, cur->re_classes); while (g_hash_table_iter_next(&it, &k, &v)) { struct rspamd_re_class *re_class = (struct rspamd_re_class *) v; @@ -3636,8 +3566,6 @@ void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_h item->cache = cur; item->re_class = re_class; item->cache_key = g_strdup(re_class->hash); - sctx->pending++; - sctx->total++; char entity_name[256]; if (re_class->type_len > 0) { rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)", @@ -3650,10 +3578,6 @@ void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_h } rspamd_hs_cache_lua_load_async(item->cache_key, entity_name, rspamd_re_cache_hs_load_cb, item); } - - if (sctx->pending == 0) { - g_free(sctx); - } } } #endif diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 80bd6500a8..cbc9107af6 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -2064,30 +2064,12 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main, msg_debug_hyperscan("received hyperscan loaded notification, forced=%d", cmd->cmd.hs_loaded.forced); - if (rspamd_hs_cache_has_lua_backend()) { - msg_debug_hyperscan("using async backend-based hyperscan loading"); - rspamd_re_cache_load_hyperscan_scoped_async(cache, worker->srv->event_loop, - cache_dir, false); - rep.reply.hs_loaded.status = 0; - } - else { - /* File-based loading (legacy, synchronous) */ - if (cmd->cmd.hs_loaded.scope[0] != '\0') { - const char *scope = cmd->cmd.hs_loaded.scope; - msg_debug_hyperscan("loading hyperscan expressions for scope '%s' after receiving compilation notice", scope); - rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan_scoped( - cache, cache_dir, false); - } - else { - if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL || - cmd->cmd.hs_loaded.forced) { - msg_debug_hyperscan("loading hyperscan expressions after receiving compilation notice: %s", - (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update"); - rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan( - worker->srv->cfg->re_cache, cache_dir, false); - } - } - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); + msg_debug_hyperscan("using async backend-based hyperscan loading"); + rspamd_re_cache_load_hyperscan_scoped_async(cache, worker->srv->event_loop, + cache_dir, false); + rep.reply.hs_loaded.status = 0; if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { msg_err("cannot write reply to the control socket: %s", @@ -2122,27 +2104,16 @@ rspamd_worker_multipattern_ready(struct rspamd_main *rspamd_main, mp = rspamd_multipattern_find_pending(name); if (mp != NULL) { - if (rspamd_hs_cache_has_lua_backend()) { - msg_debug_hyperscan("using async backend-based multipattern loading for '%s'", name); - struct rspamd_worker_mp_async_cbdata *cbd = g_malloc0(sizeof(*cbd)); - cbd->name = g_strdup(name); - cbd->cache_dir = g_strdup(cache_dir); - cbd->mp = mp; - rspamd_multipattern_load_from_cache_async(mp, cache_dir, worker->srv->event_loop, - rspamd_worker_multipattern_async_loaded, cbd); - rep.reply.hs_loaded.status = 0; - } - else { - if (rspamd_multipattern_load_from_cache(mp, cache_dir)) { - msg_debug_hyperscan("multipattern '%s' hot-swapped to hyperscan", name); - rep.reply.hs_loaded.status = 0; - } - else { - msg_warn("failed to load multipattern '%s' from cache, continuing with ACISM fallback", - name); - rep.reply.hs_loaded.status = ENOENT; - } - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); + msg_debug_hyperscan("using async backend-based multipattern loading for '%s'", name); + struct rspamd_worker_mp_async_cbdata *cbd = g_malloc0(sizeof(*cbd)); + cbd->name = g_strdup(name); + cbd->cache_dir = g_strdup(cache_dir); + cbd->mp = mp; + rspamd_multipattern_load_from_cache_async(mp, cache_dir, worker->srv->event_loop, + rspamd_worker_multipattern_async_loaded, cbd); + rep.reply.hs_loaded.status = 0; } else { msg_warn("received multipattern notification for unknown '%s'", name); @@ -2206,27 +2177,16 @@ rspamd_worker_regexp_map_ready(struct rspamd_main *rspamd_main, re_map = rspamd_regexp_map_find_pending(name); if (re_map != NULL) { - if (rspamd_hs_cache_has_lua_backend()) { - msg_debug_hyperscan("using async backend-based regexp map loading for '%s'", name); - struct rspamd_worker_remap_async_cbdata *cbd = g_malloc0(sizeof(*cbd)); - cbd->name = g_strdup(name); - cbd->cache_dir = g_strdup(cache_dir); - cbd->re_map = re_map; - rspamd_regexp_map_load_from_cache_async(re_map, cache_dir, worker->srv->event_loop, - rspamd_worker_regexp_map_async_loaded, cbd); - rep.reply.hs_loaded.status = 0; - } - else { - if (rspamd_regexp_map_load_from_cache(re_map, cache_dir)) { - msg_debug_hyperscan("regexp map '%s' hot-swapped to hyperscan", name); - rep.reply.hs_loaded.status = 0; - } - else { - msg_warn("failed to load regexp map '%s' from cache, continuing with PCRE fallback", - name); - rep.reply.hs_loaded.status = ENOENT; - } - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); + msg_debug_hyperscan("using async backend-based regexp map loading for '%s'", name); + struct rspamd_worker_remap_async_cbdata *cbd = g_malloc0(sizeof(*cbd)); + cbd->name = g_strdup(name); + cbd->cache_dir = g_strdup(cache_dir); + cbd->re_map = re_map; + rspamd_regexp_map_load_from_cache_async(re_map, cache_dir, worker->srv->event_loop, + rspamd_worker_regexp_map_async_loaded, cbd); + rep.reply.hs_loaded.status = 0; } else { msg_warn("received regexp map notification for unknown '%s'", name); diff --git a/src/libutil/multipattern.c b/src/libutil/multipattern.c index ef6926366a..cb0e9bb9ad 100644 --- a/src/libutil/multipattern.c +++ b/src/libutil/multipattern.c @@ -1512,7 +1512,11 @@ rspamd_multipattern_hs_cache_save_cb(gboolean success, (void) data; (void) len; - if (!success) { + if (success) { + msg_info("saved hyperscan multipattern cache %s to Lua backend", + ctx->cache_key ? ctx->cache_key : "(null)"); + } + else { g_set_error(&err, rspamd_multipattern_quark(), EIO, "cannot save multipattern cache %s: %s", ctx->cache_key ? ctx->cache_key : "(null)", @@ -1540,17 +1544,8 @@ void rspamd_multipattern_compile_hs_to_cache_async(struct rspamd_multipattern *m (void) event_loop; - if (!rspamd_hs_cache_has_lua_backend()) { - /* Legacy file-only path */ - gboolean ok = rspamd_multipattern_compile_hs_to_cache(mp, cache_dir, &err); - if (cb) { - cb(mp, ok, err, ud); - } - if (err) { - g_error_free(err); - } - return; - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); #ifdef WITH_HYPERSCAN hs_platform_info_t plt; @@ -1629,8 +1624,6 @@ void rspamd_multipattern_compile_hs_to_cache_async(struct rspamd_multipattern *m rspamd_multipattern_hs_cache_save_cb, ctx); g_free(bytes); - - msg_info("saved hyperscan multipattern database to Lua backend (%z bytes)", len); #else if (cb) { g_set_error(&err, rspamd_multipattern_quark(), ENOTSUP, @@ -1795,22 +1788,16 @@ void rspamd_multipattern_load_from_cache_async(struct rspamd_multipattern *mp, rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs", (int) rspamd_cryptobox_HASHBYTES / 2, hash); - if (rspamd_hs_cache_has_lua_backend()) { - struct rspamd_multipattern_load_ctx *ctx = g_malloc0(sizeof(*ctx)); - ctx->mp = mp; - ctx->cache_dir = g_strdup(cache_dir); - ctx->cache_key = g_strdup(cache_key); - ctx->cb = cb; - ctx->ud = ud; - (void) event_loop; - rspamd_hs_cache_lua_load_async(ctx->cache_key, "multipattern", rspamd_multipattern_load_from_cache_cb, ctx); - return; - } - - /* File backend fallback (synchronous) */ - if (cb) { - cb(rspamd_multipattern_load_from_cache(mp, cache_dir), ud); - } + /* All file operations go through Lua backend */ + g_assert(rspamd_hs_cache_has_lua_backend()); + struct rspamd_multipattern_load_ctx *ctx = g_malloc0(sizeof(*ctx)); + ctx->mp = mp; + ctx->cache_dir = g_strdup(cache_dir); + ctx->cache_key = g_strdup(cache_key); + ctx->cb = cb; + ctx->ud = ud; + (void) event_loop; + rspamd_hs_cache_lua_load_async(ctx->cache_key, "multipattern", rspamd_multipattern_load_from_cache_cb, ctx); #else (void) mp; (void) cache_dir; diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index 6bfdd32162..0a0debe5a1 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -22,6 +22,7 @@ #include "libutil/hash.h" #include "libutil/str_util.h" #include "libserver/html/html.h" +#include "libserver/hyperscan_tools.h" #include "lua_parsers.h" @@ -418,6 +419,25 @@ LUA_FUNCTION_DEF(util, create_file); */ LUA_FUNCTION_DEF(util, close_file); +/*** + * @function util.hyperscan_notice_known(fname) + * Notifies the hyperscan cache system that a file is known and should not be + * deleted during cleanup. This should be called when loading cached hyperscan + * databases from files. + * + * @param {string} fname path to the hyperscan cache file + */ +LUA_FUNCTION_DEF(util, hyperscan_notice_known); + +/*** + * @function util.hyperscan_get_platform_id() + * Returns the platform identifier string for hyperscan cache keys. + * This includes the hyperscan version, platform tune, and CPU features. + * + * @return {string} platform identifier (e.g., "hs54_haswell_avx2_abc123") + */ +LUA_FUNCTION_DEF(util, hyperscan_get_platform_id); + /*** * @function util.random_hex(size) * Returns random hex string of the specified size @@ -803,6 +823,8 @@ static const struct luaL_reg utillib_f[] = { LUA_INTERFACE_DEF(util, unlock_file), LUA_INTERFACE_DEF(util, create_file), LUA_INTERFACE_DEF(util, close_file), + LUA_INTERFACE_DEF(util, hyperscan_notice_known), + LUA_INTERFACE_DEF(util, hyperscan_get_platform_id), LUA_INTERFACE_DEF(util, random_hex), LUA_INTERFACE_DEF(util, zstd_compress), LUA_INTERFACE_DEF(util, zstd_decompress), @@ -2249,6 +2271,37 @@ lua_util_close_file(lua_State *L) return 1; } +static int +lua_util_hyperscan_notice_known(lua_State *L) +{ + LUA_TRACE_POINT; +#ifdef WITH_HYPERSCAN + const char *fname = luaL_checkstring(L, 1); + + if (fname) { + rspamd_hyperscan_notice_known(fname); + } +#else + (void) L; +#endif + + return 0; +} + +static int +lua_util_hyperscan_get_platform_id(lua_State *L) +{ + LUA_TRACE_POINT; +#ifdef WITH_HYPERSCAN + const char *platform_id = rspamd_hyperscan_get_platform_id(); + lua_pushstring(L, platform_id); +#else + lua_pushstring(L, "no_hyperscan"); +#endif + + return 1; +} + static int lua_util_random_hex(lua_State *L) { -- 2.47.3