local file_backend = {}
file_backend.__index = file_backend

-- Magic bytes that open every zstd frame: 0xFD2FB528, little-endian on disk
local ZSTD_MAGIC = string.char(0x28, 0xB5, 0x2F, 0xFD)
--- Create a new file cache backend.
-- @param config table full hyperscan cache configuration; recognised keys:
--   rspamd_config (logging context), cache_dir, platform_dirs,
--   compression (top level) and file.compression (sub-section; takes precedence)
-- @return file backend instance
function file_backend.new(config)
  local self = setmetatable({}, file_backend)
  -- Keep the rspamd config handle purely for debug logging context
  self.config = config.rspamd_config or config
  -- Normalise cache_dir: strip trailing slashes so path joins stay clean
  local cache_dir = config.cache_dir or '/var/lib/rspamd/hs_cache'
  self.cache_dir = cache_dir:gsub("/+$", "")
  -- Default to a flat directory structure for backward compatibility;
  -- per-platform subdirectories are strictly opt-in
  self.platform_dirs = config.platform_dirs == true
  -- Compression is on by default (consistent with redis/http backends).
  -- The `file` sub-section setting takes precedence over the top-level one;
  -- explicit nil checks because `false` is a meaningful value here.
  local opts = config.file or config
  local compression = opts.compression
  if compression == nil then
    compression = config.compression
  end
  self.use_compression = compression ~= false
  lua_util.debugm(N, self.config, "file backend config: cache_dir=%s, platform_dirs=%s, compression=%s",
      self.cache_dir, self.platform_dirs and "yes" or "no",
      self.use_compression and "enabled" or "disabled")
  return self
end
--- Return the on-disk file extension matching the compression setting.
function file_backend:_get_extension()
  if self.use_compression then
    return '.hs.zst'
  end
  return '.hs'
end
--- Build the full path of a cache file.
-- @param cache_key string cache key (hash that already includes platform info)
-- @param platform_id string platform identifier (unused in flat mode for backward compat)
-- @param ext string optional extension override (e.g., '.hs' or '.hs.zst')
function file_backend:_get_path(cache_key, platform_id, ext)
  local extension = ext or self:_get_extension()
  if not self.platform_dirs then
    -- Default: flat layout matching the original C code behaviour;
    -- platform info is already embedded in the cache_key hash
    return table.concat({ self.cache_dir, '/', cache_key, extension })
  end
  -- Opt-in: one subdirectory per platform (not the default)
  return table.concat({ self.cache_dir, '/', platform_id, '/', cache_key, extension })
end
--- True when `data` begins with the 4-byte zstd frame magic.
function file_backend:_is_zstd(data)
  if data ~= nil and #data >= 4 then
    return data:sub(1, 4) == ZSTD_MAGIC
  end
  return false
end
--- Locate an existing cache file, probing both compressed and plain extensions.
-- The extension matching the current compression setting is probed first.
-- @return path, is_compressed — or nil, nil when neither candidate exists
function file_backend:_find_existing_path(cache_key, platform_id)
  local probe_order
  if self.use_compression then
    probe_order = { '.hs.zst', '.hs' }
  else
    probe_order = { '.hs', '.hs.zst' }
  end

  for _, ext in ipairs(probe_order) do
    local candidate = self:_get_path(cache_key, platform_id, ext)
    -- rspamd_util.stat returns (err, stat_table): success is no error AND a table
    local err, st = rspamd_util.stat(candidate)
    if not err and st then
      return candidate, ext == '.hs.zst'
    end
  end

  return nil, nil
end
-- Ensure the parent directory for `path` exists.
-- NOTE(review): currently a no-op stub — directory creation is presumably
-- handled elsewhere (or cache_dir is expected to pre-exist); confirm.
function file_backend:_ensure_dir(path)
end
--- Asynchronous existence check; probes both extensions.
-- Invokes callback(err, found, meta) where meta carries size/mtime/compressed.
function file_backend:exists(cache_key, platform_id, callback)
  local found_path, compressed = self:_find_existing_path(cache_key, platform_id)

  if not found_path then
    local probe = self:_get_path(cache_key, platform_id)
    lua_util.debugm(N, self.config, "file exists check: %s not found (checked both extensions)", probe)
    callback(nil, false, nil)
    return
  end

  -- Re-stat: the file may have vanished since _find_existing_path looked
  local err, st = rspamd_util.stat(found_path)
  if err or not st then
    lua_util.debugm(N, self.config, "file exists check: %s disappeared (race)", found_path)
    callback(nil, false, nil)
    return
  end

  lua_util.debugm(N, self.config, "file exists check: %s found, size: %d, compressed: %s",
      found_path, st.size, compressed and "yes" or "no")
  callback(nil, true, { size = st.size, mtime = st.mtime, compressed = compressed })
end
--- Asynchronously load a cache entry, transparently decompressing zstd data.
-- Probes both extensions, verifies the zstd magic (the extension can lie),
-- and registers the file with the hyperscan cleanup tracker.
-- @param callback function(err, data)
function file_backend:load(cache_key, platform_id, callback)
  local path, expected_compressed = self:_find_existing_path(cache_key, platform_id)
  if not path then
    local expected_path = self:_get_path(cache_key, platform_id)
    lua_util.debugm(N, self.config, "file load failed: %s not found (checked both extensions)", expected_path)
    callback("file not found", nil)
    return
  end

  lua_util.debugm(N, self.config, "file load from: %s (expected compressed: %s)", path, expected_compressed and "yes" or "no")

  local f, err = io.open(path, "rb")
  if not f then
    lua_util.debugm(N, self.config, "file load failed from %s: %s", path, err or "open error")
    callback(err or "open error", nil)
    return
  end
  local data = f:read("*a")
  f:close()
  if not data then
    lua_util.debugm(N, self.config, "file read failed from %s", path)
    callback("read error", nil)
    return
  end

  -- Trust the magic bytes, not the extension, to decide on decompression
  local is_zstd = self:_is_zstd(data)
  lua_util.debugm(N, self.config, "file loaded %d bytes from %s, zstd magic: %s",
      #data, path, is_zstd and "yes" or "no")

  -- Notify hyperscan cache that this file is known (for cleanup tracking)
  rspamd_util.hyperscan_notice_known(path)

  if is_zstd then
    local decompress_err, decompressed = rspamd_util.zstd_decompress(data)
    if not decompress_err and decompressed then
      -- Guard against division by zero when the payload decompresses to nothing
      local ratio = 0
      if #decompressed > 0 then
        ratio = (1 - #data / #decompressed) * 100
      end
      lua_util.debugm(N, self.config, "file decompressed %d -> %d bytes from %s (compression ratio: %.1f%%)",
          #data, #decompressed, path, ratio)
      callback(nil, decompressed)
    else
      lua_util.debugm(N, self.config, "file decompression failed for %s: %s", path, decompress_err or "unknown error")
      callback(decompress_err or "decompression failed", nil)
    end
  else
    -- Plain data: hand it back untouched, but note a misleading extension
    if expected_compressed then
      lua_util.debugm(N, self.config, "file %s has .zst extension but no zstd magic - treating as uncompressed", path)
    end
    callback(nil, data)
  end
end
--- Asynchronously store a cache entry (optionally zstd-compressed) via an
-- atomic tmp-file + rename, then remove any stale file with the opposite
-- extension left over from a compression-setting change.
-- @param data string or rspamd_text blob to persist
-- @param _ttl ignored (files have no TTL)
-- @param callback function(err)
function file_backend:store(cache_key, platform_id, data, _ttl, callback)
  local path = self:_get_path(cache_key, platform_id)
  lua_util.debugm(N, self.config, "file store to: %s, original size: %d bytes, compression: %s",
      path, #data, self.use_compression and "enabled" or "disabled")

  self:_ensure_dir(path)
  local store_data = data
  if self.use_compression then
    local compressed, compress_err = rspamd_util.zstd_compress(data)
    if compressed then
      -- Guard the ratio computation against empty input
      local ratio = 0
      if #data > 0 then
        ratio = (1 - #compressed / #data) * 100
      end
      lua_util.debugm(N, self.config, "file compressed %d -> %d bytes (%.1f%% size reduction) for %s",
          #data, #compressed, ratio, path)
      store_data = compressed
    else
      -- Best effort: fall back to an uncompressed store rather than failing
      logger.warnx(N, "compression failed: %s, storing uncompressed to %s", compress_err, path)
    end
  end

  -- Write to temp file first, then rename atomically
  local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8)
  -- store_data can be string or rspamd_text userdata
  local ok, write_err
  if type(store_data) == "userdata" and store_data.save_in_file then
    ok, write_err = store_data:save_in_file(tmp_path)
  else
    local f, err = io.open(tmp_path, "wb")
    if not f then
      callback(err or "open failed")
      return
    end
    ok, write_err = f:write(store_data)
    -- Buffered writes may only fail at close (e.g. disk full) — check it too,
    -- otherwise a truncated tmp file could be renamed over good data
    local closed, close_err = f:close()
    if ok and not closed then
      ok, write_err = nil, close_err or "close failed"
    end
  end
  if not ok then
    os.remove(tmp_path)
    callback(write_err or "write failed")
    return
  end

  local renamed, rename_err = os.rename(tmp_path, path)
  if renamed then
    lua_util.debugm(N, self.config, "stored %d bytes to %s", #store_data, path)
    -- Notify hyperscan cache that this file is known (for cleanup tracking)
    rspamd_util.hyperscan_notice_known(path)
    -- Remove old file with opposite extension if it exists (migration cleanup)
    local old_ext = self.use_compression and '.hs' or '.hs.zst'
    local old_path = self:_get_path(cache_key, platform_id, old_ext)
    local old_err, old_stat = rspamd_util.stat(old_path)
    if not old_err and old_stat then
      if os.remove(old_path) then
        lua_util.debugm(N, self.config, "removed old cache file %s (migrated to %s)", old_path, path)
      end
    end
    callback(nil)
  else
    os.remove(tmp_path)
    callback(rename_err or "rename failed")
  end
end
--- Delete a cache entry, removing both the compressed and plain variants.
-- Succeeds when at least one file was removed; otherwise reports the last
-- removal error, or "file not found" when nothing existed.
function file_backend:delete(cache_key, platform_id, callback)
  local removed_count = 0
  local last_err

  for _, ext in ipairs({ '.hs', '.hs.zst' }) do
    local path = self:_get_path(cache_key, platform_id, ext)
    local stat_err, st = rspamd_util.stat(path)
    if not stat_err and st then
      local ok, err = os.remove(path)
      if ok then
        lua_util.debugm(N, self.config, "deleted %s", path)
        removed_count = removed_count + 1
      else
        last_err = err
      end
    end
  end

  if removed_count > 0 then
    callback(nil)
  else
    callback(last_err or "file not found")
  end
end
--- Synchronous existence check; probes both extensions.
-- @return found boolean, nil (error slot is always nil)
function file_backend:exists_sync(cache_key, platform_id)
  local path, is_compressed = self:_find_existing_path(cache_key, platform_id)
  if not path then
    local expected_path = self:_get_path(cache_key, platform_id)
    lua_util.debugm(N, self.config, "file sync exists check: %s not found (checked both extensions)", expected_path)
    return false, nil
  end
  lua_util.debugm(N, self.config, "file sync exists check: %s found (compressed: %s)",
      path, is_compressed and "yes" or "no")
  return true, nil
end
-- Asynchronous save entry point.
-- NOTE(review): currently a no-op stub — async stores appear to go through
-- store() instead; confirm whether this interface surface is dead.
function file_backend:save_async(cache_key, platform_id, data, callback)
end
--- Synchronously load a cache entry, transparently decompressing zstd data.
-- Mirrors load() but returns (data, err) instead of using a callback.
function file_backend:load_sync(cache_key, platform_id)
  local path, expected_compressed = self:_find_existing_path(cache_key, platform_id)

  if not path then
    local expected_path = self:_get_path(cache_key, platform_id)
    lua_util.debugm(N, self.config, "file sync load failed: %s not found (checked both extensions)", expected_path)
    return nil, "file not found"
  end

  lua_util.debugm(N, self.config, "file sync load from: %s (expected compressed: %s)",
      path, expected_compressed and "yes" or "no")

  local f, err = io.open(path, "rb")
  if not f then
    lua_util.debugm(N, self.config, "file sync load failed from %s: %s", path, err or "open error")
    return nil, err or "open error"
  end
  local data = f:read("*a")
  f:close()
  if not data then
    lua_util.debugm(N, self.config, "file sync read failed from %s", path)
    return nil, "read error"
  end

  -- Trust the magic bytes, not the extension, to decide on decompression
  local is_zstd = self:_is_zstd(data)
  lua_util.debugm(N, self.config, "file sync loaded %d bytes from %s, zstd magic: %s",
      #data, path, is_zstd and "yes" or "no")

  -- Notify hyperscan cache that this file is known (for cleanup tracking)
  rspamd_util.hyperscan_notice_known(path)

  if is_zstd then
    local decompress_err, decompressed = rspamd_util.zstd_decompress(data)
    if not decompress_err and decompressed then
      -- Guard against division by zero when the payload decompresses to nothing
      local ratio = 0
      if #decompressed > 0 then
        ratio = (1 - #data / #decompressed) * 100
      end
      lua_util.debugm(N, self.config, "file sync decompressed %d -> %d bytes from %s (compression ratio: %.1f%%)",
          #data, #decompressed, path, ratio)
      return decompressed, nil
    else
      lua_util.debugm(N, self.config, "file sync decompression failed for %s: %s", path, decompress_err or "unknown error")
      return nil, decompress_err or "decompression failed"
    end
  else
    -- Plain data: return it untouched, but note a misleading extension
    if expected_compressed then
      lua_util.debugm(N, self.config, "file %s has .zst extension but no zstd magic - treating as uncompressed", path)
    end
    return data, nil
  end
end
--- Synchronously store a cache entry (optionally zstd-compressed) via an
-- atomic tmp-file + rename, then clean up the opposite-extension variant.
-- @return ok boolean, err string|nil
function file_backend:save_sync(cache_key, platform_id, data)
  local path = self:_get_path(cache_key, platform_id)
  lua_util.debugm(N, self.config, "file sync save to: %s, original size: %d bytes, compression: %s",
      path, #data, self.use_compression and "enabled" or "disabled")
  self:_ensure_dir(path)
  local store_data = data
  if self.use_compression then
    local compressed, compress_err = rspamd_util.zstd_compress(data)
    if compressed then
      -- Guard the ratio computation against empty input
      local ratio = 0
      if #data > 0 then
        ratio = (1 - #compressed / #data) * 100
      end
      lua_util.debugm(N, self.config, "file sync compressed %d -> %d bytes (%.1f%% size reduction) for %s",
          #data, #compressed, ratio, path)
      store_data = compressed
    else
      -- Best effort: fall back to an uncompressed store rather than failing
      logger.warnx(N, "compression failed: %s, storing uncompressed to %s", compress_err, path)
    end
  end

  local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8)
  -- store_data can be string or rspamd_text userdata
  local ok, write_err
  if type(store_data) == "userdata" and store_data.save_in_file then
    ok, write_err = store_data:save_in_file(tmp_path)
  else
    local f, err = io.open(tmp_path, "wb")
    if not f then
      lua_util.debugm(N, self.config, "file sync open failed for %s: %s", tmp_path, err)
      return false, err
    end
    ok, write_err = f:write(store_data)
    -- Buffered writes may only fail at close (e.g. disk full) — check it too,
    -- otherwise a truncated tmp file could be renamed over good data
    local closed, close_err = f:close()
    if ok and not closed then
      ok, write_err = nil, close_err or "close failed"
    end
  end
  if not ok then
    lua_util.debugm(N, self.config, "file sync write failed to %s: %s", tmp_path, write_err)
    os.remove(tmp_path)
    return false, write_err
  end
  local renamed, rename_err = os.rename(tmp_path, path)
  if not renamed then
    lua_util.debugm(N, self.config, "file sync rename failed %s -> %s: %s", tmp_path, path, rename_err)
    os.remove(tmp_path)
    return false, rename_err
  end
  lua_util.debugm(N, self.config, "file sync stored %d bytes to %s", #store_data, path)

  -- Notify hyperscan cache that this file is known (for cleanup tracking)
  rspamd_util.hyperscan_notice_known(path)

  -- Remove old file with opposite extension if it exists (migration cleanup)
  local old_ext = self.use_compression and '.hs' or '.hs.zst'
  local old_path = self:_get_path(cache_key, platform_id, old_ext)
  local old_err, old_stat = rspamd_util.stat(old_path)
  if not old_err and old_stat then
    if os.remove(old_path) then
      lua_util.debugm(N, self.config, "removed old cache file %s (migrated to %s)", old_path, path)
    end
  end

  return true, nil
end
local default_prefix = self.use_compression and 'rspamd_zhs' or 'rspamd_hs'
self.prefix = opts.prefix or config.prefix or default_prefix
- lua_util.debugm(N, "redis backend config: prefix=%s, ttl=%s, refresh_ttl=%s, compression=%s",
+ lua_util.debugm(N, self.config, "redis backend config: prefix=%s, ttl=%s, refresh_ttl=%s, compression=%s",
self.prefix, self.default_ttl, self.refresh_ttl, self.use_compression)
return self
return
end
- lua_util.debugm(N, "redis EXISTS check for key: %s", key)
+ lua_util.debugm(N, self.config, "redis EXISTS check for key: %s", key)
local attrs = {
ev_base = self.redis_params.ev_base,
config = self.config,
callback = function(err, data)
if err then
- lua_util.debugm(N, "redis EXISTS failed for key %s: %s", key, err)
+ lua_util.debugm(N, self.config, "redis EXISTS failed for key %s: %s", key, err)
callback(err, false, nil)
else
- lua_util.debugm(N, "redis EXISTS result for key %s: %s", key, data == 1 and "found" or "not found")
+ lua_util.debugm(N, self.config, "redis EXISTS result for key %s: %s", key, data == 1 and "found" or "not found")
callback(nil, data == 1, nil)
end
end
-- Use GETEX to refresh TTL on read if enabled
local req
if self.refresh_ttl then
- lua_util.debugm(N, "redis GETEX (with TTL refresh %d) for key: %s", self.default_ttl, key)
+ lua_util.debugm(N, self.config, "redis GETEX (with TTL refresh %d) for key: %s", self.default_ttl, key)
req = {'GETEX', key, 'EX', tostring(self.default_ttl)}
else
- lua_util.debugm(N, "redis GET for key: %s", key)
+ lua_util.debugm(N, self.config, "redis GET for key: %s", key)
req = {'GET', key}
end
config = self.config,
callback = function(err, data)
if err then
- lua_util.debugm(N, "redis GET failed for key %s: %s", key, err)
+ lua_util.debugm(N, self.config, "redis GET failed for key %s: %s", key, err)
callback(err, nil)
elseif not data then
- lua_util.debugm(N, "redis cache miss for key %s", key)
+ lua_util.debugm(N, self.config, "redis cache miss for key %s", key)
callback("not found", nil)
else
-- Decompress if needed
if self.use_compression then
local decompress_err, decompressed = rspamd_util.zstd_decompress(data)
if not decompress_err and decompressed then
- lua_util.debugm(N, "redis loaded and decompressed %d -> %d bytes from key %s (compression ratio: %.1f%%)",
+ lua_util.debugm(N, self.config, "redis loaded and decompressed %d -> %d bytes from key %s (compression ratio: %.1f%%)",
#data, #decompressed, key, (1 - #data / #decompressed) * 100)
callback(nil, decompressed)
else
- lua_util.debugm(N, "redis decompression failed for key %s: %s", key, decompress_err)
+ lua_util.debugm(N, self.config, "redis decompression failed for key %s: %s", key, decompress_err)
callback(decompress_err or "decompression failed", nil)
end
else
- lua_util.debugm(N, "redis loaded %d bytes (uncompressed) from key %s", #data, key)
+ lua_util.debugm(N, self.config, "redis loaded %d bytes (uncompressed) from key %s", #data, key)
callback(nil, data)
end
end
return
end
- lua_util.debugm(N, "redis SETEX for key: %s, original size: %d bytes, TTL: %d, compression: %s",
+ lua_util.debugm(N, self.config, "redis SETEX for key: %s, original size: %d bytes, TTL: %d, compression: %s",
key, #data, actual_ttl, self.use_compression and "enabled" or "disabled")
local store_data = data
if self.use_compression then
local compressed, compress_err = rspamd_util.zstd_compress(data)
if compressed then
- lua_util.debugm(N, "redis compressed %d -> %d bytes (%.1f%% size reduction) for key %s",
+ lua_util.debugm(N, self.config, "redis compressed %d -> %d bytes (%.1f%% size reduction) for key %s",
#data, #compressed, (1 - #compressed / #data) * 100, key)
store_data = compressed
else
config = self.config,
callback = function(err)
if err then
- lua_util.debugm(N, "redis SETEX failed for key %s: %s", key, err)
+ lua_util.debugm(N, self.config, "redis SETEX failed for key %s: %s", key, err)
callback(err)
else
- lua_util.debugm(N, "redis stored %d bytes to key %s with TTL %d",
+ lua_util.debugm(N, self.config, "redis stored %d bytes to key %s with TTL %d",
#store_data, key, actual_ttl)
callback(nil)
end
return
end
- lua_util.debugm(N, "redis DEL for key: %s", key)
+ lua_util.debugm(N, self.config, "redis DEL for key: %s", key)
local attrs = {
ev_base = self.redis_params.ev_base,
config = self.config,
callback = function(err)
if err then
- lua_util.debugm(N, "redis DEL failed for key %s: %s", key, err)
+ lua_util.debugm(N, self.config, "redis DEL failed for key %s: %s", key, err)
callback(err)
else
- lua_util.debugm(N, "redis deleted key %s", key)
+ lua_util.debugm(N, self.config, "redis deleted key %s", key)
callback(nil)
end
end
function http_backend.new(config)
local self = setmetatable({}, http_backend)
+ -- Store config for logging context
+ self.config = config.rspamd_config or config
-- HTTP config can be in 'http' sub-section or at top level
local opts = config.http or config
function http_backend:exists(cache_key, platform_id, callback)
local url = self:_get_url(cache_key, platform_id)
- lua_util.debugm(N, "http HEAD check for url: %s", url)
+ lua_util.debugm(N, self.config, "http HEAD check for url: %s", url)
rspamd_http.request({
url = url,
timeout = self.timeout,
callback = function(err, code, _, headers)
if err then
- lua_util.debugm(N, "http HEAD failed for %s: %s", url, err)
+ lua_util.debugm(N, self.config, "http HEAD failed for %s: %s", url, err)
callback(err, false, nil)
elseif code == 200 then
local size = headers and headers['content-length']
- lua_util.debugm(N, "http HEAD found %s, size: %s", url, size or "unknown")
+ lua_util.debugm(N, self.config, "http HEAD found %s, size: %s", url, size or "unknown")
callback(nil, true, { size = tonumber(size) })
else
- lua_util.debugm(N, "http HEAD not found %s (code: %d)", url, code)
+ lua_util.debugm(N, self.config, "http HEAD not found %s (code: %d)", url, code)
callback(nil, false, nil)
end
end
function http_backend:load(cache_key, platform_id, callback)
local url = self:_get_url(cache_key, platform_id)
- lua_util.debugm(N, "http GET for url: %s", url)
+ lua_util.debugm(N, self.config, "http GET for url: %s", url)
rspamd_http.request({
url = url,
timeout = self.timeout,
callback = function(err, code, body, headers)
if err then
- lua_util.debugm(N, "http GET failed for %s: %s", url, err)
+ lua_util.debugm(N, self.config, "http GET failed for %s: %s", url, err)
callback(err, nil)
elseif code == 200 and body then
-- Check if content is compressed
if content_encoding == 'zstd' or self.use_compression then
local decompress_err, decompressed = rspamd_util.zstd_decompress(body)
if not decompress_err and decompressed then
- lua_util.debugm(N, "http loaded and decompressed %d -> %d bytes from %s",
+ lua_util.debugm(N, self.config, "http loaded and decompressed %d -> %d bytes from %s",
#body, #decompressed, url)
callback(nil, decompressed)
else
- lua_util.debugm(N, "http loaded %d bytes (no decompression) from %s", #body, url)
+ lua_util.debugm(N, self.config, "http loaded %d bytes (no decompression) from %s", #body, url)
callback(nil, body)
end
else
- lua_util.debugm(N, "http loaded %d bytes from %s", #body, url)
+ lua_util.debugm(N, self.config, "http loaded %d bytes from %s", #body, url)
callback(nil, body)
end
elseif code == 404 then
- lua_util.debugm(N, "http cache miss (404) for %s", url)
+ lua_util.debugm(N, self.config, "http cache miss (404) for %s", url)
callback("not found", nil)
else
- lua_util.debugm(N, "http GET failed for %s: HTTP %d", url, code)
+ lua_util.debugm(N, self.config, "http GET failed for %s: HTTP %d", url, code)
callback(string.format("HTTP %d", code), nil)
end
end
local url = self:_get_url(cache_key, platform_id)
local headers = self:_get_headers()
- lua_util.debugm(N, "http PUT for url: %s, original size: %d bytes, compression: %s",
+ lua_util.debugm(N, self.config, "http PUT for url: %s, original size: %d bytes, compression: %s",
url, #data, self.use_compression and "enabled" or "disabled")
local store_data = data
if self.use_compression then
local compressed = rspamd_util.zstd_compress(data)
if compressed then
- lua_util.debugm(N, "http compressed %d -> %d bytes (%.1f%% size reduction) for %s",
+ lua_util.debugm(N, self.config, "http compressed %d -> %d bytes (%.1f%% size reduction) for %s",
#data, #compressed, (1 - #compressed / #data) * 100, url)
store_data = compressed
headers['Content-Encoding'] = 'zstd'
timeout = self.timeout,
callback = function(err, code)
if err then
- lua_util.debugm(N, "http PUT failed for %s: %s", url, err)
+ lua_util.debugm(N, self.config, "http PUT failed for %s: %s", url, err)
callback(err)
elseif code >= 200 and code < 300 then
- lua_util.debugm(N, "http stored %d bytes to %s", #store_data, url)
+ lua_util.debugm(N, self.config, "http stored %d bytes to %s", #store_data, url)
callback(nil)
else
- lua_util.debugm(N, "http PUT failed for %s: HTTP %d", url, code)
+ lua_util.debugm(N, self.config, "http PUT failed for %s: HTTP %d", url, code)
callback(string.format("HTTP %d", code))
end
end
function http_backend:delete(cache_key, platform_id, callback)
local url = self:_get_url(cache_key, platform_id)
- lua_util.debugm(N, "http DELETE for url: %s", url)
+ lua_util.debugm(N, self.config, "http DELETE for url: %s", url)
rspamd_http.request({
url = url,
timeout = self.timeout,
callback = function(err, code)
if err then
- lua_util.debugm(N, "http DELETE failed for %s: %s", url, err)
+ lua_util.debugm(N, self.config, "http DELETE failed for %s: %s", url, err)
callback(err)
elseif code >= 200 and code < 300 or code == 404 then
- lua_util.debugm(N, "http deleted %s", url)
+ lua_util.debugm(N, self.config, "http deleted %s", url)
callback(nil)
else
- lua_util.debugm(N, "http DELETE failed for %s: HTTP %d", url, code)
+ lua_util.debugm(N, self.config, "http DELETE failed for %s: HTTP %d", url, code)
callback(string.format("HTTP %d", code))
end
end
function exports.create_backend(config)
local backend_type = config.backend or config.cache_backend or 'file'
- lua_util.debugm(N, "creating hyperscan cache backend: %s", backend_type)
+ local cfg = config.rspamd_config or config
+ lua_util.debugm(N, cfg, "creating hyperscan cache backend: %s", backend_type)
-- Always pass full config - backends will extract what they need
-- (config contains ev_base, rspamd_config at top level, plus optional
-- redis/http sub-sections for backend-specific settings)
if backend_type == 'file' then
local be = file_backend.new(config)
- lua_util.debugm(N, "file backend created, cache_dir: %s", be.cache_dir or "not set")
+ lua_util.debugm(N, be.config, "file backend created, cache_dir: %s, compression: %s",
+ be.cache_dir or "not set", be.use_compression and "enabled" or "disabled")
return be
elseif backend_type == 'redis' then
local be = redis_backend.new(config)
if be.redis_params then
- lua_util.debugm(N, "redis backend created, prefix: %s, compression: %s",
+ lua_util.debugm(N, be.config, "redis backend created, prefix: %s, compression: %s",
be.prefix, be.use_compression and "enabled" or "disabled")
else
logger.errx(N, "redis backend created but no redis params - operations will fail!")
return be
elseif backend_type == 'http' then
local be = http_backend.new(config)
- lua_util.debugm(N, "http backend created, base_url: %s", be.base_url or "not set")
+ lua_util.debugm(N, be.config, "http backend created, base_url: %s", be.base_url or "not set")
return be
else
logger.errx(N, "unknown hyperscan cache backend: %s, falling back to file", backend_type)
#include "libserver/hs_cache_backend.h"
#include "libutil/util.h"
#include "libutil/regexp.h"
+#include "libutil/heap.h"
#include "lua/lua_common.h"
#include "libstat/stat_api.h"
#include "contrib/uthash/utlist.h"
RSPAMD_RE_CACHE_COMPILE_STATE_SAVING
};
+/* Heap element for priority compilation queue */
+struct rspamd_re_compile_queue_elt {
+ unsigned int pri; /* Priority: lower = compile first */
+ unsigned int idx; /* Heap index (managed by heap) */
+ struct rspamd_re_class *re_class;
+};
+
+RSPAMD_HEAP_DECLARE(re_compile_queue, struct rspamd_re_compile_queue_elt);
+
struct rspamd_re_cache_hs_compile_cbdata {
- GHashTableIter it;
+ re_compile_queue_t compile_queue; /* Priority queue of re_classes to compile */
struct rspamd_re_cache *cache;
const char *cache_dir;
double max_time;
if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
ev_timer_stop(EV_A_ w);
cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
g_free(w);
g_free(cbdata);
re_class = cbdata->current_class;
}
else {
- if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) {
+ /* Pop next item from priority queue */
+ struct rspamd_re_compile_queue_elt *elt =
+ rspamd_heap_pop(re_compile_queue, &cbdata->compile_queue);
+ if (elt == NULL) {
/* All done */
ev_timer_stop(EV_A_ w);
cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
g_free(w);
g_free(cbdata);
return;
}
- re_class = v;
+ re_class = elt->re_class;
cbdata->current_class = re_class;
cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS;
}
if (cbdata->state == RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS) {
- if (rspamd_hs_cache_has_lua_backend()) {
- struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
- ctx->cbdata = cbdata;
- ctx->loop = loop;
- ctx->w = w;
- char entity_name[256];
- if (re_class->type_len > 0) {
- rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1, re_class->type_data);
- }
- else {
- rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
- rspamd_re_cache_type_to_string(re_class->type));
- }
- rspamd_hs_cache_lua_exists_async(re_class->hash, entity_name, rspamd_re_cache_exists_cb, ctx);
- ev_timer_stop(EV_A_ w);
- return;
+ /* Check via Lua backend (handles file, redis, http) */
+ struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
+ ctx->cbdata = cbdata;
+ ctx->loop = loop;
+ ctx->w = w;
+ char entity_name[256];
+ if (re_class->type_len > 0) {
+ rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)",
+ rspamd_re_cache_type_to_string(re_class->type),
+ (int) re_class->type_len - 1, re_class->type_data);
}
-
- /* Check file backend */
- rspamd_snprintf(path, sizeof(path), "%s%c%s.hs", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash);
- if (rspamd_re_cache_is_valid_hyperscan_file(cache, path, TRUE, TRUE, NULL)) {
- /* Read number of regexps for logging */
- fd = open(path, O_RDONLY, 00600);
-
- if (fd != -1) {
- if (lseek(fd, RSPAMD_HS_MAGIC_LEN + sizeof(cache->plt), SEEK_SET) != -1) {
- if (read(fd, &n, sizeof(n)) != sizeof(n)) {
- n = 0;
- }
- }
- close(fd);
- }
-
- if (re_class->type_len > 0) {
- if (!cbdata->silent) {
- msg_info_re_cache(
- "skip already valid class %s(%*s) to cache %6s, %d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1,
- re_class->type_data,
- re_class->hash,
- n,
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
- }
- }
- else {
- if (!cbdata->silent) {
- msg_info_re_cache(
- "skip already valid class %s to cache %6s, %d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- re_class->hash,
- n,
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
- }
- }
-
- cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
- cbdata->current_class = NULL;
- ev_timer_again(EV_A_ w);
- return;
+ else {
+ rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
+ rspamd_re_cache_type_to_string(re_class->type));
}
-
- cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING;
+ rspamd_hs_cache_lua_exists_async(re_class->hash, entity_name, rspamd_re_cache_exists_cb, ctx);
+ /* Don't stop timer here - the callback (rspamd_re_cache_exists_cb) handles
+ * restarting the timer. For file backend the callback runs synchronously
+ * within exists_async, so stopping here would undo the timer restart. */
+ return;
}
- /* Only create temp file if not using Lua backend */
- if (!rspamd_hs_cache_has_lua_backend()) {
- rspamd_snprintf(path, sizeof(path), "%s%c%s%P-XXXXXXXXXX", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash, our_pid);
- fd = g_mkstemp_full(path, O_CREAT | O_TRUNC | O_EXCL | O_WRONLY, 00600);
-
- if (fd == -1) {
- err = g_error_new(rspamd_re_cache_quark(), errno,
- "cannot open file %s: %s", path, strerror(errno));
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
- }
- }
- else {
- fd = -1; /* Not using file */
- }
+ fd = -1; /* Not using direct file I/O, Lua backend handles storage */
g_hash_table_iter_init(&cit, re_class->re);
n = g_hash_table_size(re_class->re);
iov[6].iov_base = hs_serialized;
iov[6].iov_len = serialized_len;
- if (rspamd_hs_cache_has_lua_backend()) {
- /* Build combined buffer for Lua backend */
- gsize total_len = 0;
- for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
- total_len += iov[j].iov_len;
- }
-
- unsigned char *combined = g_malloc(total_len);
- gsize offset = 0;
- for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
- memcpy(combined + offset, iov[j].iov_base, iov[j].iov_len);
- offset += iov[j].iov_len;
- }
+ /* Save via Lua backend (handles file, redis, http with compression) */
+ gsize total_len = 0;
+ for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
+ total_len += iov[j].iov_len;
+ }
- struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
- ctx->cbdata = cbdata;
- ctx->loop = loop;
- ctx->w = w;
- ctx->n = n;
+ unsigned char *combined = g_malloc(total_len);
+ gsize offset = 0;
+ for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
+ memcpy(combined + offset, iov[j].iov_base, iov[j].iov_len);
+ offset += iov[j].iov_len;
+ }
- char entity_name[256];
- if (re_class->type_len > 0) {
- rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1, re_class->type_data);
- }
- else {
- rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
- rspamd_re_cache_type_to_string(re_class->type));
- }
- rspamd_hs_cache_lua_save_async(re_class->hash, entity_name, combined, total_len, rspamd_re_cache_save_cb, ctx);
+ struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
+ ctx->cbdata = cbdata;
+ ctx->loop = loop;
+ ctx->w = w;
+ ctx->n = n;
- g_free(combined);
- CLEANUP_ALLOCATED(false);
- g_free(hs_serialized);
- ev_timer_stop(EV_A_ w);
- return;
+ char entity_name[256];
+ if (re_class->type_len > 0) {
+ rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)",
+ rspamd_re_cache_type_to_string(re_class->type),
+ (int) re_class->type_len - 1, re_class->type_data);
}
else {
- /* Use file backend */
- if (writev(fd, iov, G_N_ELEMENTS(iov)) == -1) {
- err = g_error_new(rspamd_re_cache_quark(),
- errno,
- "cannot serialize tree of regexp to %s: %s",
- path, strerror(errno));
-
- CLEANUP_ALLOCATED(true);
- g_free(hs_serialized);
-
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
- }
-
- CLEANUP_ALLOCATED(false);
-
- /* File backend: rename temporary file to the new .hs file */
- rspamd_snprintf(npath, sizeof(npath), "%s%c%s.hs", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash);
-
- if (rename(path, npath) == -1) {
- err = g_error_new(rspamd_re_cache_quark(),
- errno,
- "cannot rename %s to %s: %s",
- path, npath, strerror(errno));
- unlink(path);
- close(fd);
-
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
- }
-
- close(fd);
-
- if (re_class->type_len > 0) {
- msg_info_re_cache(
- "compiled class %s(%*s) to cache %6s (%s), %d/%d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1,
- re_class->type_data,
- re_class->hash,
- npath,
- n,
- (int) g_hash_table_size(re_class->re),
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
- }
- else {
- msg_info_re_cache(
- "compiled class %s to cache %6s (%s), %d/%d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- re_class->hash,
- npath,
- n,
- (int) g_hash_table_size(re_class->re),
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
- }
-
- cbdata->total += n;
+ rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
+ rspamd_re_cache_type_to_string(re_class->type));
}
+ rspamd_hs_cache_lua_save_async(re_class->hash, entity_name, combined, total_len, rspamd_re_cache_save_cb, ctx);
- cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
- cbdata->current_class = NULL;
+ g_free(combined);
+ CLEANUP_ALLOCATED(false);
+ g_free(hs_serialized);
+ /* Don't stop timer here - the callback (rspamd_re_cache_save_cb) handles
+ * restarting the timer. For file backend the callback runs synchronously
+ * within save_async, so stopping here would undo the timer restart. */
+ return;
}
else {
err = g_error_new(rspamd_re_cache_quark(),
ev_timer *timer;
static const ev_tstamp timer_interval = 0.1;
struct rspamd_re_cache_hs_compile_cbdata *cbdata;
+ GHashTableIter it;
+ gpointer k, v;
cbdata = g_malloc0(sizeof(*cbdata));
- g_hash_table_iter_init(&cbdata->it, cache->re_classes);
+ rspamd_heap_init(re_compile_queue, &cbdata->compile_queue);
+
+ /*
+ * Build priority queue for compilation order.
+ * Priority (lower = compile first):
+ * - Short lists (<100 regexps): 0 + count
+ * - URL type (TLD matching): 1000 + count
+ * - Other types: 10000 + count
+ */
+ g_hash_table_iter_init(&it, cache->re_classes);
+ while (g_hash_table_iter_next(&it, &k, &v)) {
+ struct rspamd_re_class *re_class = v;
+ struct rspamd_re_compile_queue_elt elt;
+ unsigned int count = g_hash_table_size(re_class->re);
+ unsigned int base_pri;
+
+ /* Calculate priority tier */
+ if (count < 100) {
+ /* Short lists get highest priority */
+ base_pri = 0;
+ }
+ else if (re_class->type == RSPAMD_RE_URL) {
+ /* URL type (TLD) gets medium priority */
+ base_pri = 1000;
+ }
+ else {
+ /* All other types */
+ base_pri = 10000;
+ }
+
+ elt.pri = base_pri + count;
+ elt.re_class = re_class;
+ rspamd_heap_push_safe(re_compile_queue, &cbdata->compile_queue, &elt, heap_error);
+ }
+
cbdata->cache = cache;
cbdata->cache_dir = cache_dir;
cbdata->cb = cb;
ev_timer_start(event_loop, timer);
return 0;
+
+heap_error:
+ rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
+ g_free(cbdata);
+ return -1;
#endif
}
return;
}
- if (!rspamd_hs_cache_has_lua_backend()) {
- /* Fallback to synchronous file loading */
- (void) rspamd_re_cache_load_hyperscan_scoped(cache_head, cache_dir, try_load);
- return;
- }
+ /* All file operations go through the Lua backend.
+ * NOTE(review): the synchronous file fallback was removed; the
+ * g_assert below aborts the worker if the Lua backend failed to
+ * initialize. Consider logging an error and returning instead of
+ * asserting in production builds. */
+ g_assert(rspamd_hs_cache_has_lua_backend());
DL_FOREACH(cache_head, cur)
{
sctx->cache = cur;
sctx->event_loop = event_loop;
sctx->try_load = try_load;
- sctx->pending = 0;
- sctx->total = 0;
sctx->loaded = 0;
sctx->all_loaded = TRUE;
+ /* Count items first - for file backend, callbacks run synchronously,
+ * so we must set pending/total before starting any loads to avoid
+ * premature free when pending reaches 0 during the loop.
+ * NOTE(review): this pre-count assumes the loop below issues a load
+ * for every entry in re_classes; if any class is ever skipped,
+ * pending never reaches zero and sctx leaks - verify no skip paths
+ * exist in the elided loop body. */
+ sctx->total = g_hash_table_size(cur->re_classes);
+ sctx->pending = sctx->total;
+
+ if (sctx->pending == 0) {
+ g_free(sctx);
+ continue;
+ }
+
g_hash_table_iter_init(&it, cur->re_classes);
while (g_hash_table_iter_next(&it, &k, &v)) {
struct rspamd_re_class *re_class = (struct rspamd_re_class *) v;
item->cache = cur;
item->re_class = re_class;
item->cache_key = g_strdup(re_class->hash);
- sctx->pending++;
- sctx->total++;
char entity_name[256];
if (re_class->type_len > 0) {
rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s(%*s)",
}
rspamd_hs_cache_lua_load_async(item->cache_key, entity_name, rspamd_re_cache_hs_load_cb, item);
}
-
- if (sctx->pending == 0) {
- g_free(sctx);
- }
}
}
#endif