command: ['luacheck', 'lualib', 'src/plugins/lua']
ClangFormat:
enabled: true
- command: ['sh', '-c', 'git clang-format --diff --quiet || (echo "Running clang-format to fix issues..." && git clang-format && git add -u && echo "Files formatted and staged.")']
+ command: ['sh', '-c', 'git clang-format --extensions c,cc,cpp,cxx,h,hh,hpp,hxx --diff --quiet || (echo "Running clang-format to fix issues..." && git clang-format --extensions c,cc,cpp,cxx,h,hh,hpp,hxx && git add -u && echo "Files formatted and staged.")']
on_warn: fail
#PostCheckout:
# ALL: # Special hook name that customizes all hooks of this type
#
# See https://rspamd.com/doc/faq.html#what-are-the-locald-and-overrided-directories
# for details
-#
-# This worker compiles hyperscan databases in the background, allowing
-# the main rspamd process to start quickly without blocking on compilation.
-
# Directory to store compiled hyperscan databases
# cache_dir = "${DBDIR}";
limitations under the License.
]]--
---[[
-Pluggable Hyperscan cache storage backends.
-
-This module provides a unified interface for storing and loading serialized
-Hyperscan databases from various backends (files, Redis, HTTP).
-
-Usage:
- local hs_cache = require "lua_hs_cache"
- local backend = hs_cache.create_backend(config)
- backend:load(cache_key, platform_id, function(err, data) ... end)
- backend:store(cache_key, platform_id, data, ttl, function(err) ... end)
-]]--
-
local logger = require "rspamd_logger"
local rspamd_util = require "rspamd_util"
local lua_redis = require "lua_redis"
local exports = {}
local N = "lua_hs_cache"
---[[
-Backend interface definition (for documentation):
-
-backend = {
- -- Check if cache entry exists
- -- callback(err, exists: boolean, metadata: table|nil)
- exists = function(self, cache_key, platform_id, callback) end,
-
- -- Load serialized database
- -- callback(err, data: string|nil)
- load = function(self, cache_key, platform_id, callback) end,
-
- -- Store serialized database
- -- callback(err)
- store = function(self, cache_key, platform_id, data, ttl, callback) end,
-
- -- Delete cache entry
- -- callback(err)
- delete = function(self, cache_key, platform_id, callback) end,
-}
-]]--
-
--------------------------------------------------------------------------------
--- File Backend
--------------------------------------------------------------------------------
+-- File backend
local file_backend = {}
file_backend.__index = file_backend
+-- Build a file backend object from `config`: cache_dir falls back to
+-- '/var/lib/rspamd/hs_cache', and platform_dirs stays enabled unless
+-- config.platform_dirs is explicitly false.
function file_backend.new(config)
local self = setmetatable({}, file_backend)
self.cache_dir = config.cache_dir or '/var/lib/rspamd/hs_cache'
- self.platform_dirs = config.platform_dirs ~= false -- Create platform subdirs by default
+ self.platform_dirs = config.platform_dirs ~= false
return self
end
function file_backend:_ensure_dir(path)
local dir = path:match("(.*/)")
if dir then
- -- Create directory if it doesn't exist
local ok, err = rspamd_util.mkdir(dir, true)
if not ok and err then
logger.warnx(N, "failed to create directory %s: %s", dir, err)
end
end
--------------------------------------------------------------------------------
--- Redis Backend
--------------------------------------------------------------------------------
+--- Synchronously check whether a cache file exists for the key/platform pair.
+-- rspamd_util.stat() follows the "error first" convention: it returns an error
+-- string as its first value on failure and the stat table as the second value
+-- on success, so the error slot has to be inspected explicitly.
+-- @param cache_key string cache key
+-- @param platform_id string platform identifier
+-- @return boolean exists, nil (this backend never reports an error here)
+function file_backend:exists_sync(cache_key, platform_id)
+  local path = self:_get_path(cache_key, platform_id)
+  local err, st = rspamd_util.stat(path)
+  return err == nil and st ~= nil, nil
+end
+
+-- Callback-style save: forwards to store() with a nil TTL and passes the
+-- caller's callback through unchanged.
+function file_backend:save_async(cache_key, platform_id, data, callback)
+ self:store(cache_key, platform_id, data, nil, callback)
+end
+
+-- Callback-style load: a thin alias that forwards all arguments to load().
+function file_backend:load_async(cache_key, platform_id, callback)
+ self:load(cache_key, platform_id, callback)
+end
+
+-- Callback-style existence check built on exists_sync(); the callback is
+-- invoked as callback(err, exists) before this function returns.
+function file_backend:exists_async(cache_key, platform_id, callback)
+ local exists, err = self:exists_sync(cache_key, platform_id)
+ callback(err, exists)
+end
+
+-- Synchronously read the cache file for the key/platform pair and return
+-- whatever rspamd_util.read_file() yields for that path.
+function file_backend:load_sync(cache_key, platform_id)
+ local path = self:_get_path(cache_key, platform_id)
+ return rspamd_util.read_file(path)
+end
+
+--- Synchronously write a serialized database into the cache directory.
+-- The data goes to a uniquely named temporary file next to the target and is
+-- then renamed over it, so the final path is only ever replaced by a file
+-- that was written completely.
+-- @param cache_key string cache key
+-- @param platform_id string platform identifier
+-- @param data serialized database to write
+-- @return true, nil on success; false, err on failure
+function file_backend:save_sync(cache_key, platform_id, data)
+  local path = self:_get_path(cache_key, platform_id)
+  self:_ensure_dir(path)
+
+  local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8)
+  local ok, err = rspamd_util.write_file(tmp_path, data)
+  if not ok then
+    -- A failed write may still have created (and partially filled) the
+    -- temporary file; drop it so stale *.tmp.* files do not accumulate.
+    -- os.remove() on a missing file just returns nil, which is ignored.
+    os.remove(tmp_path)
+    return false, err
+  end
+
+  local renamed, rename_err = os.rename(tmp_path, path)
+  if not renamed then
+    os.remove(tmp_path)
+    return false, rename_err
+  end
+
+  return true, nil
+end
+
+-- Redis backend
local redis_backend = {}
redis_backend.__index = redis_backend
if not self.redis_params then
self.redis_params = lua_redis.parse_redis_server(nil, config)
end
+
+ if config.ev_base and self.redis_params then
+ self.redis_params.ev_base = config.ev_base
+ end
+
+ if config.rspamd_config then
+ self.config = config.rspamd_config
+ else
+ self.config = config
+ end
+
self.prefix = config.prefix or 'rspamd_hs'
self.default_ttl = config.ttl or (86400 * 30) -- 30 days default
self.refresh_ttl = config.refresh_ttl ~= false -- Refresh TTL on read by default
- self.use_compression = config.compression ~= false -- zstd compression by default
+ self.use_compression = config.compression ~= false
return self
end
return
end
- lua_redis.request(self.redis_params, nil, {
- cmd = 'EXISTS',
- args = { key },
+ local attrs = {
+ ev_base = self.redis_params.ev_base,
+ config = self.config,
callback = function(err, data)
if err then
callback(err, false, nil)
callback(nil, data == 1, nil)
end
end
- })
+ }
+
+ local req = {'EXISTS', key}
+ lua_redis.request(self.redis_params, attrs, req)
end
function redis_backend:load(cache_key, platform_id, callback)
end
-- Use GETEX to refresh TTL on read if enabled
- local cmd, args
+ local req
if self.refresh_ttl then
- cmd = 'GETEX'
- args = { key, 'EX', tostring(self.default_ttl) }
+ req = {'GETEX', key, 'EX', tostring(self.default_ttl)}
else
- cmd = 'GET'
- args = { key }
+ req = {'GET', key}
end
- lua_redis.request(self.redis_params, nil, {
- cmd = cmd,
- args = args,
+ local attrs = {
+ ev_base = self.redis_params.ev_base,
+ config = self.config,
callback = function(err, data)
if err then
callback(err, nil)
elseif not data then
callback("not found", nil)
- else
- -- Decompress if needed
- if self.use_compression then
- local decompressed, decompress_err = rspamd_util.zstd_decompress(data)
- if decompressed then
- logger.debugx(N, "loaded and decompressed %d -> %d bytes from redis key %s",
- #data, #decompressed, key)
- callback(nil, decompressed)
- else
- callback(decompress_err or "decompression failed", nil)
- end
- else
- logger.debugx(N, "loaded %d bytes from redis key %s", #data, key)
- callback(nil, data)
- end
- end
+ else
+ -- Decompress if needed
+ if self.use_compression then
+ local decompress_err, decompressed = rspamd_util.zstd_decompress(data)
+ if not decompress_err and decompressed then
+ logger.debugx(N, "loaded and decompressed %d -> %d bytes from redis key %s",
+ #data, #decompressed, key)
+ callback(nil, decompressed)
+ else
+ callback(decompress_err or "decompression failed", nil)
+ end
+ else
+ logger.debugx(N, "loaded %d bytes from redis key %s", #data, key)
+ callback(nil, data)
+ end
+ end
end
- })
+ }
+
+ lua_redis.request(self.redis_params, attrs, req)
end
function redis_backend:store(cache_key, platform_id, data, ttl, callback)
end
end
- lua_redis.request(self.redis_params, nil, {
- cmd = 'SETEX',
- args = { key, tostring(actual_ttl), store_data },
+ local attrs = {
+ ev_base = self.redis_params.ev_base,
+ config = self.config,
callback = function(err)
if err then
callback(err)
callback(nil)
end
end
- })
+ }
+
+ local req = {'SETEX', key, tostring(actual_ttl), store_data}
+ lua_redis.request(self.redis_params, attrs, req)
end
function redis_backend:delete(cache_key, platform_id, callback)
return
end
- lua_redis.request(self.redis_params, nil, {
- cmd = 'DEL',
- args = { key },
+ local attrs = {
+ ev_base = self.redis_params.ev_base,
+ config = self.config,
callback = function(err)
if err then
callback(err)
callback(nil)
end
end
- })
+ }
+
+ local req = {'DEL', key}
+ lua_redis.request(self.redis_params, attrs, req)
+end
+
+-- Synchronous methods for C backend interface
+--- Synchronously check whether the cache key exists in Redis.
+-- @param cache_key string cache key
+-- @param platform_id string platform identifier
+-- @return boolean exists, string|nil error
+function redis_backend:exists_sync(cache_key, platform_id)
+  local key = self:_get_key(cache_key, platform_id)
+
+  if not self.redis_params then
+    return false, "redis not configured"
+  end
+
+  local ret, conn = lua_redis.redis_connect_sync(self.redis_params, false, key,
+      self.config or rspamd_config, self.redis_params.ev_base)
+  if not ret then
+    return false, "cannot connect to redis"
+  end
+
+  conn:add_cmd('EXISTS', { key })
+  local ok, result = conn:exec()
+  if not ok then
+    -- Keep the reason reported by exec(), the same way save_sync does for
+    -- SETEX, so that failures can be diagnosed from the returned message.
+    return false, "redis EXISTS failed: " .. tostring(result)
+  end
+
+  return result == 1, nil
+end
+
+--- Synchronously load a serialized database from Redis.
+-- A missing key is reported as `nil, nil` (cache miss, not an error).
+-- @param cache_key string cache key
+-- @param platform_id string platform identifier
+-- @return data|nil, string|nil error
+function redis_backend:load_sync(cache_key, platform_id)
+  local key = self:_get_key(cache_key, platform_id)
+
+  if not self.redis_params then
+    return nil, "redis not configured"
+  end
+
+  local ret, conn = lua_redis.redis_connect_sync(self.redis_params, false, key,
+      self.config or rspamd_config, self.redis_params.ev_base)
+  if not ret then
+    return nil, "cannot connect to redis"
+  end
+
+  -- Use GETEX to refresh TTL on read if enabled
+  if self.refresh_ttl then
+    conn:add_cmd('GETEX', { key, 'EX', tostring(self.default_ttl) })
+  else
+    conn:add_cmd('GET', { key })
+  end
+
+  local ok, data = conn:exec()
+  if not ok then
+    -- On failure the second value carries the reason; keep it in the message
+    -- the same way save_sync does for SETEX.
+    return nil, "redis GET failed: " .. tostring(data)
+  end
+
+  if not data then
+    return nil, nil -- Cache miss, not an error
+  end
+
+  if not self.use_compression then
+    logger.debugx(N, "loaded %d bytes from redis key %s", #data, key)
+    return data, nil
+  end
+
+  -- zstd_decompress reports the error first and the payload second
+  local decompress_err, decompressed = rspamd_util.zstd_decompress(data)
+  if not decompress_err and decompressed then
+    logger.debugx(N, "loaded and decompressed %d -> %d bytes from redis key %s",
+        #data, #decompressed, key)
+    return decompressed, nil
+  end
+
+  return nil, decompress_err or "decompression failed"
+end
+
+-- Callback-style save: forwards to store() with a nil TTL and passes the
+-- caller's callback through unchanged.
+function redis_backend:save_async(cache_key, platform_id, data, callback)
+ self:store(cache_key, platform_id, data, nil, callback)
+end
+
+-- Callback-style load: a thin alias that forwards all arguments to load().
+function redis_backend:load_async(cache_key, platform_id, callback)
+ self:load(cache_key, platform_id, callback)
+end
+
+-- Callback-style existence check: a thin alias that forwards all arguments
+-- to exists().
+function redis_backend:exists_async(cache_key, platform_id, callback)
+ self:exists(cache_key, platform_id, callback)
+end
+
+-- Synchronously store a serialized database in Redis under the key derived
+-- from cache_key/platform_id, using SETEX with self.default_ttl.
+-- Returns true, nil on success or false, err on failure.
+function redis_backend:save_sync(cache_key, platform_id, data)
+ local key = self:_get_key(cache_key, platform_id)
+
+ if not self.redis_params then
+ return false, "redis not configured"
+ end
+
+ -- NOTE(review): the third argument is `true` here and `false` in the read
+ -- paths; presumably it selects a write connection — confirm against lua_redis.
+ local ret, conn = lua_redis.redis_connect_sync(self.redis_params, true, key,
+ self.config or rspamd_config, self.redis_params.ev_base)
+ if not ret then
+ return false, "cannot connect to redis"
+ end
+
+ local store_data = data
+ -- Compress if enabled
+ -- NOTE(review): on compression failure the raw data is stored, while
+ -- load_sync always decompresses when use_compression is set — confirm that
+ -- such entries are still readable.
+ if self.use_compression then
+ local compressed, compress_err = rspamd_util.zstd_compress(data)
+ if compressed then
+ logger.debugx(N, "compressed %d -> %d bytes (%.1f%% reduction)",
+ #data, #compressed, (1 - #compressed / #data) * 100)
+ store_data = compressed
+ else
+ logger.warnx(N, "compression failed: %s, storing uncompressed", compress_err)
+ end
+ end
+
+ conn:add_cmd('SETEX', { key, tostring(self.default_ttl), store_data })
+ local ok, result = conn:exec()
+ if not ok then
+ return false, "redis SETEX failed: " .. tostring(result)
+ end
+
+ logger.debugx(N, "stored %d bytes to redis key %s with TTL %d",
+ #store_data, key, self.default_ttl)
+ return true, nil
end
--------------------------------------------------------------------------------
--- HTTP Backend
--------------------------------------------------------------------------------
+-- HTTP backend
local http_backend = {}
http_backend.__index = http_backend
method = 'GET',
headers = self:_get_headers(),
timeout = self.timeout,
- callback = function(err, code, body, headers)
+ callback = function(err, code, body, headers)
if err then
callback(err, nil)
elseif code == 200 and body then
-- Check if content is compressed
local content_encoding = headers and headers['content-encoding']
if content_encoding == 'zstd' or self.use_compression then
- local decompressed = rspamd_util.zstd_decompress(body)
- if decompressed then
- callback(nil, decompressed)
+ local decompress_err, decompressed = rspamd_util.zstd_decompress(body)
+ if not decompress_err and decompressed then
+ callback(nil, decompressed)
else
- -- Maybe it wasn't compressed after all
- callback(nil, body)
+ callback(nil, body)
end
else
- callback(nil, body)
+ callback(nil, body)
end
elseif code == 404 then
callback("not found", nil)
})
end
--------------------------------------------------------------------------------
--- Backend Factory
--------------------------------------------------------------------------------
+-- Backend factory
-- Create a backend instance based on configuration
-- @param config table with:
+++ /dev/null
---[[
-Copyright (c) 2026, Vsevolod Stakhov <vsevolod@rspamd.com>
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-]]--
-
---[[
-Unified Hyperscan compilation service.
-
-This module provides a single interface for compiling Hyperscan databases
-with pluggable cache backends. It unifies the compilation paths used by
-multipattern and re_cache.
-
-Usage:
- local hs_compile = require "lua_hs_compile"
-
- -- Compile with caching (async)
- hs_compile.compile({
- patterns = {"pat1", "pat2"},
- flags = {0, 0},
- ids = {1, 2},
- cache_key = "my_patterns_hash",
- backend = backend_instance, -- from lua_hs_cache
- callback = function(err, db) ... end
- })
-
- -- Compile with caching (sync, for initialization)
- local db, err = hs_compile.compile_sync({
- patterns = {"pat1", "pat2"},
- cache_key = "my_patterns_hash",
- backend = backend_instance,
- })
-]]--
-
-local rspamd_hyperscan = require "rspamd_hyperscan"
-local rspamd_cryptobox_hash = require "rspamd_cryptobox_hash"
-local logger = require "rspamd_logger"
-
-local exports = {}
-local N = "lua_hs_compile"
-
--- Check if hyperscan is available
-exports.has_hyperscan = rspamd_hyperscan.has_hyperscan
-
--- Get platform identifier
-exports.platform_id = rspamd_hyperscan.platform_id
-
--- Hyperscan flags (re-exported for convenience)
-exports.flags = rspamd_hyperscan.flags or {}
-
---[[
-Generate a cache key from patterns and flags.
-@param patterns table of pattern strings
-@param flags table of flag values (optional)
-@return string cache key (hex hash)
-]]--
-function exports.generate_cache_key(patterns, flags)
- local h = rspamd_cryptobox_hash.create()
-
- for i, pat in ipairs(patterns) do
- h:update(pat)
- if flags and flags[i] then
- h:update(tostring(flags[i]))
- end
- end
-
- return h:hex():sub(1, 16)
-end
-
---[[
-Compile patterns into a hyperscan database with optional caching.
-This is the async version suitable for use in workers with event loops.
-
-@param opts table with:
- - patterns: table of pattern strings (required)
- - flags: table of HS_FLAG_* values (optional, default 0 for each)
- - ids: table of pattern IDs (optional, defaults to 1..n)
- - cache_key: string cache key (optional, auto-generated if not provided)
- - backend: cache backend instance from lua_hs_cache (optional)
- - ttl: cache TTL in seconds (optional)
- - callback: function(err, db) called on completion (required)
-]]--
-function exports.compile(opts)
- local callback = opts.callback
- if not callback then
- error("callback is required for async compile")
- end
-
- local patterns = opts.patterns
- if not patterns or #patterns == 0 then
- callback("no patterns provided", nil)
- return
- end
-
- if not rspamd_hyperscan.has_hyperscan() then
- callback("hyperscan not available", nil)
- return
- end
-
- local flags = opts.flags or {}
- local ids = opts.ids or {}
- local cache_key = opts.cache_key or exports.generate_cache_key(patterns, flags)
- local backend = opts.backend
- local ttl = opts.ttl
- local platform_id = rspamd_hyperscan.platform_id()
-
- -- Fill in default IDs if not provided
- if #ids == 0 then
- for i = 1, #patterns do
- ids[i] = i
- end
- end
-
- -- If no backend, compile directly
- if not backend then
- local db, err = rspamd_hyperscan.compile(patterns, flags, ids)
- if db then
- callback(nil, db)
- else
- callback(err or "compile failed", nil)
- end
- return
- end
-
- -- Try to load from cache first
- backend:load(cache_key, platform_id, function(load_err, data)
- if data then
- -- Validate the cached data
- local valid, valid_err = rspamd_hyperscan.validate(data)
- if valid then
- -- Deserialize
- local db, deser_err = rspamd_hyperscan.deserialize(data)
- if db then
- logger.debugx(N, "loaded cached hyperscan db for key %s", cache_key)
- callback(nil, db)
- return
- else
- logger.warnx(N, "failed to deserialize cached db for key %s: %s",
- cache_key, deser_err)
- end
- else
- logger.debugx(N, "cached db for key %s is invalid: %s", cache_key, valid_err)
- end
- end
-
- -- Cache miss or invalid - compile
- local db, compile_err = rspamd_hyperscan.compile(patterns, flags, ids)
- if not db then
- callback(compile_err or "compile failed", nil)
- return
- end
-
- -- Serialize and store
- local blob = rspamd_hyperscan.serialize(db, ids, flags)
- if blob then
- backend:store(cache_key, platform_id, blob, ttl, function(store_err)
- if store_err then
- logger.warnx(N, "failed to store compiled db for key %s: %s",
- cache_key, store_err)
- else
- logger.debugx(N, "stored compiled db for key %s (%d bytes)",
- cache_key, #blob)
- end
- end)
- end
-
- callback(nil, db)
- end)
-end
-
---[[
-Compile patterns synchronously with optional caching.
-This is suitable for use during initialization before event loops start.
-
-@param opts table with same options as compile() except callback
-@return db, err - database object or nil and error message
-]]--
-function exports.compile_sync(opts)
- local patterns = opts.patterns
- if not patterns or #patterns == 0 then
- return nil, "no patterns provided"
- end
-
- if not rspamd_hyperscan.has_hyperscan() then
- return nil, "hyperscan not available"
- end
-
- local flags = opts.flags or {}
- local ids = opts.ids or {}
- local cache_key = opts.cache_key or exports.generate_cache_key(patterns, flags)
- local backend = opts.backend
- local ttl = opts.ttl
- local platform_id = rspamd_hyperscan.platform_id()
-
- -- Fill in default IDs if not provided
- if #ids == 0 then
- for i = 1, #patterns do
- ids[i] = i
- end
- end
-
- -- If no backend, compile directly
- if not backend then
- return rspamd_hyperscan.compile(patterns, flags, ids)
- end
-
- -- For sync mode with backend, check if backend supports sync operations
- if backend.load_sync then
- local data = backend:load_sync(cache_key, platform_id)
- if data then
- local valid = rspamd_hyperscan.validate(data)
- if valid then
- local db = rspamd_hyperscan.deserialize(data)
- if db then
- logger.debugx(N, "loaded cached hyperscan db for key %s (sync)", cache_key)
- return db, nil
- end
- end
- end
- end
-
- -- Compile
- local db, compile_err = rspamd_hyperscan.compile(patterns, flags, ids)
- if not db then
- return nil, compile_err or "compile failed"
- end
-
- -- Try to store (best effort for sync mode)
- if backend.store_sync then
- local blob = rspamd_hyperscan.serialize(db, ids, flags)
- if blob then
- local ok = backend:store_sync(cache_key, platform_id, blob, ttl)
- if ok then
- logger.debugx(N, "stored compiled db for key %s (sync)", cache_key)
- end
- end
- end
-
- return db, nil
-end
-
---[[
-Validate a serialized hyperscan blob.
-@param blob string or text containing serialized database
-@return boolean, error_message
-]]--
-exports.validate = rspamd_hyperscan.validate
-
---[[
-Deserialize a hyperscan database from blob.
-@param blob string or text containing serialized database
-@return db, error_message
-]]--
-exports.deserialize = rspamd_hyperscan.deserialize
-
---[[
-Serialize a hyperscan database to blob.
-@param db database object
-@param ids optional table of pattern IDs
-@param flags optional table of pattern flags
-@return blob as rspamd_text or nil
-]]--
-exports.serialize = rspamd_hyperscan.serialize
-
---[[
-Direct compilation without caching.
-@param patterns table of pattern strings
-@param flags table of flag values (optional)
-@param ids table of pattern IDs (optional)
-@return db, error_message
-]]--
-exports.compile_direct = rspamd_hyperscan.compile
-
-return exports
#include "libserver/cfg_rcl.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"
+#include "libserver/hs_cache_backend.h"
#include "lua/lua_common.h"
+#include "lua/lua_classnames.h"
#include "unix-std.h"
#ifdef WITH_HYPERSCAN
/*
* Compile pending multipatterns that were queued during pre-fork initialization
*/
+
+/*
+ * State carried through the one-entry-at-a-time processing of pending
+ * multipattern compilations; allocated by the entry point and freed when
+ * processing finishes.
+ */
+struct rspamd_hs_helper_mp_async_ctx {
+ struct hs_helper_ctx *ctx; /* helper context (hs_dir, event_loop are read from it) */
+ struct rspamd_worker *worker; /* worker whose state/busy flag is checked and updated */
+ struct rspamd_multipattern_pending *pending; /* array of pending entries */
+ unsigned int count; /* number of entries in `pending` */
+ unsigned int idx; /* index of the entry currently being processed */
+};
+
+static void rspamd_hs_helper_compile_pending_multipatterns_next(struct rspamd_hs_helper_mp_async_ctx *mpctx);
+
+/*
+ * Send an RSPAMD_SRV_MULTIPATTERN_LOADED command for multipattern `name`,
+ * carrying ctx->hs_dir as the cache directory, and log the fact.
+ */
static void
-rspamd_hs_helper_compile_pending_multipatterns(struct hs_helper_ctx *ctx,
- struct rspamd_worker *worker)
+rspamd_hs_helper_mp_send_notification(struct hs_helper_ctx *ctx,
+ struct rspamd_worker *worker,
+ const char *name)
{
- struct rspamd_multipattern_pending *pending;
- unsigned int count = 0;
+ struct rspamd_srv_command srv_cmd;
- pending = rspamd_multipattern_get_pending(&count);
- if (pending == NULL || count == 0) {
- msg_debug("no pending multipattern compilations");
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_MULTIPATTERN_LOADED;
+ rspamd_strlcpy(srv_cmd.cmd.mp_loaded.name, name,
+ sizeof(srv_cmd.cmd.mp_loaded.name));
+ rspamd_strlcpy(srv_cmd.cmd.mp_loaded.cache_dir, ctx->hs_dir,
+ sizeof(srv_cmd.cmd.mp_loaded.cache_dir));
+
+ rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+ msg_info("sent multipattern loaded notification for '%s'", name);
+}
+
+/*
+ * Completion callback for the asynchronous compile of the current pending
+ * multipattern: clears the busy state, notifies on success (or logs the
+ * error), then moves on to the next pending entry.
+ */
+static void
+rspamd_hs_helper_mp_compiled_cb(struct rspamd_multipattern *mp,
+								gboolean success,
+								GError *err,
+								void *ud)
+{
+	struct rspamd_hs_helper_mp_async_ctx *state = ud;
+	struct rspamd_multipattern_pending *cur = &state->pending[state->idx];
+
+	/* The multipattern itself is reachable through the pending entry */
+	(void) mp;
+
+	/* Compilation is over either way, so drop the busy marker first */
+	rspamd_worker_set_busy(state->worker, state->ctx->event_loop, NULL);
+
+	if (success) {
+		rspamd_hs_helper_mp_send_notification(state->ctx, state->worker, cur->name);
+	}
+	else {
+		msg_err("failed to compile multipattern '%s': %e", cur->name, err);
+	}
+
+	/* Advance to the following entry and continue the chain */
+	state->idx++;
+	rspamd_hs_helper_compile_pending_multipatterns_next(state);
+}
+
+/*
+ * Callback for the Lua-backend existence check of the current pending
+ * multipattern. If the cache entry exists, the notification is sent and
+ * processing moves to the next entry; otherwise an asynchronous compile is
+ * started and rspamd_hs_helper_mp_compiled_cb continues the chain.
+ */
+static void
+rspamd_hs_helper_mp_exists_cb(gboolean success,
+ const unsigned char *data,
+ gsize len,
+ const char *error,
+ void *ud)
+{
+ struct rspamd_hs_helper_mp_async_ctx *mpctx = ud;
+ struct rspamd_multipattern_pending *entry = &mpctx->pending[mpctx->idx];
+ /* NOTE(review): "exists" is signalled as success with no data and len == 1;
+ * presumably a convention of the exists callback - confirm in hs_cache_backend */
+ bool exists = (success && data == NULL && len == 1);
+
+ (void) error;
+
+ if (exists) {
+ msg_info("multipattern cache already exists for '%s', skipping compilation", entry->name);
+ rspamd_hs_helper_mp_send_notification(mpctx->ctx, mpctx->worker, entry->name);
+ mpctx->idx++;
+ rspamd_hs_helper_compile_pending_multipatterns_next(mpctx);
return;
}
- msg_info("processing %ud pending multipattern compilations", count);
+ /* Need to compile+store */
+ rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, "compile multipattern");
+ rspamd_multipattern_compile_hs_to_cache_async(entry->mp, mpctx->ctx->hs_dir,
+ mpctx->ctx->event_loop,
+ rspamd_hs_helper_mp_compiled_cb, mpctx);
+}
- for (unsigned int i = 0; i < count; i++) {
- struct rspamd_multipattern_pending *entry = &pending[i];
- struct rspamd_multipattern *mp = entry->mp;
- unsigned int npatterns;
- char fp[PATH_MAX];
- GError *err = NULL;
+/*
+ * Process the pending multipattern at mpctx->idx and advance to the next one.
+ * With a Lua cache backend the step continues from the exists/compiled
+ * callbacks; with the file backend it runs synchronously and calls itself for
+ * the following entry. Once every entry is handled, or the worker is no
+ * longer running, the pending list is cleared and mpctx is freed.
+ */
+static void
+rspamd_hs_helper_compile_pending_multipatterns_next(struct rspamd_hs_helper_mp_async_ctx *mpctx)
+{
+ if (mpctx->worker->state != rspamd_worker_state_running) {
+ msg_info("worker terminating, stopping multipattern compilation");
+ goto done;
+ }
- if (worker->state != rspamd_worker_state_running) {
- msg_info("worker terminating, stopping multipattern compilation");
- break;
- }
+ if (mpctx->idx >= mpctx->count) {
+ goto done;
+ }
- npatterns = rspamd_multipattern_get_npatterns(mp);
- msg_info("compiling multipattern '%s' with %ud patterns", entry->name, npatterns);
+ struct rspamd_multipattern_pending *entry = &mpctx->pending[mpctx->idx];
+ unsigned int npatterns = rspamd_multipattern_get_npatterns(entry->mp);
+ msg_info("processing multipattern '%s' with %ud patterns", entry->name, npatterns);
- /* Build cache file path */
- rspamd_snprintf(fp, sizeof(fp), "%s/%*xs.hs", ctx->hs_dir,
+ if (rspamd_hs_cache_has_lua_backend()) {
+ /* NOTE(review): cache_key lives on this stack frame; assumes the async
+ * call copies it before returning - confirm in hs_cache_backend */
+ char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1];
+ rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs",
(int) sizeof(entry->hash) / 2, entry->hash);
+ rspamd_hs_cache_lua_exists_async(cache_key, rspamd_hs_helper_mp_exists_cb, mpctx);
+ return;
+ }
- /* Check if cache file already exists (race with another process) */
+ /* File backend path: keep existing synchronous behaviour */
+ {
+ char fp[PATH_MAX];
+ GError *err = NULL;
+ /* Cleared on compile failure so that no "loaded" notification is sent */
+ gboolean notify = TRUE;
+ rspamd_snprintf(fp, sizeof(fp), "%s/%*xs.hs", mpctx->ctx->hs_dir,
+ (int) sizeof(entry->hash) / 2, entry->hash);
if (access(fp, R_OK) == 0) {
msg_info("cache file %s already exists for multipattern '%s', skipping compilation",
fp, entry->name);
}
else {
- rspamd_worker_set_busy(worker, ctx->event_loop, "compile multipattern");
-
- if (worker->state != rspamd_worker_state_running) {
- rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
- break;
- }
-
- if (!rspamd_multipattern_compile_hs_to_cache(mp, ctx->hs_dir, &err)) {
- rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
+ rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, "compile multipattern");
+ if (!rspamd_multipattern_compile_hs_to_cache(entry->mp, mpctx->ctx->hs_dir, &err)) {
msg_err("failed to compile multipattern '%s': %e", entry->name, err);
- if (err) {
- g_error_free(err);
- }
- continue;
- }
-
- rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
-
- if (worker->state != rspamd_worker_state_running) {
- break;
+ if (err) g_error_free(err);
+ notify = FALSE;
}
+ rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, NULL);
}
- if (worker->state != rspamd_worker_state_running) {
- break;
- }
+ /* As in the async path, a failed compile must not be announced as loaded */
+ if (notify) {
+ rspamd_hs_helper_mp_send_notification(mpctx->ctx, mpctx->worker, entry->name);
+ }
+ mpctx->idx++;
+ rspamd_hs_helper_compile_pending_multipatterns_next(mpctx);
+ return;
+ }
- struct rspamd_srv_command srv_cmd;
- memset(&srv_cmd, 0, sizeof(srv_cmd));
- srv_cmd.type = RSPAMD_SRV_MULTIPATTERN_LOADED;
- rspamd_strlcpy(srv_cmd.cmd.mp_loaded.name, entry->name,
- sizeof(srv_cmd.cmd.mp_loaded.name));
- rspamd_strlcpy(srv_cmd.cmd.mp_loaded.cache_dir, ctx->hs_dir,
- sizeof(srv_cmd.cmd.mp_loaded.cache_dir));
+done:
+ /* Clearing the pending list also releases the entry names */
+ rspamd_multipattern_clear_pending();
+ g_free(mpctx);
+}
+
+/*
+ * Entry point: fetch the pending multipattern list and, if it is not empty,
+ * allocate the iteration state and start processing from the first entry.
+ * The state is released by the *_next() function when processing ends.
+ */
+static void
+rspamd_hs_helper_compile_pending_multipatterns(struct hs_helper_ctx *ctx,
+ struct rspamd_worker *worker)
+{
+ struct rspamd_multipattern_pending *pending;
+ unsigned int count = 0;
- rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- msg_info("sent multipattern loaded notification for '%s'", entry->name);
+ pending = rspamd_multipattern_get_pending(&count);
+ if (pending == NULL || count == 0) {
+ msg_debug("no pending multipattern compilations");
+ return;
}
- rspamd_multipattern_clear_pending();
+ msg_info("processing %ud pending multipattern compilations", count);
+
+ /* Zero-initialised state; idx starts at the first pending entry */
+ struct rspamd_hs_helper_mp_async_ctx *mpctx = g_malloc0(sizeof(*mpctx));
+ mpctx->ctx = ctx;
+ mpctx->worker = worker;
+ mpctx->pending = pending;
+ mpctx->count = count;
+ mpctx->idx = 0;
+
+ rspamd_hs_helper_compile_pending_multipatterns_next(mpctx);
}
#endif
strerror(errno));
}
+ /* If we are shutting down, do not start any long-running work */
+ if (worker->state != rspamd_worker_state_running) {
+ if (attached_fd != -1) {
+ close(attached_fd);
+ }
+ return TRUE;
+ }
+
/* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */
if (ctx->loaded && worker->state == rspamd_worker_state_running) {
static struct rspamd_srv_command srv_cmd;
rspamd_rs_compile(ctx, worker, FALSE);
}
-/**
- * Initialize the Lua cache backend
- * Loads lua_hs_cache module and creates a backend instance
- */
-static gboolean
-rspamd_hs_helper_init_lua_backend(struct hs_helper_ctx *ctx, struct rspamd_worker *worker)
-{
- lua_State *L = ctx->cfg->lua_state;
- const char *backend_name;
-
- switch (ctx->cache_backend) {
- case HS_CACHE_BACKEND_FILE:
- backend_name = "file";
- break;
- case HS_CACHE_BACKEND_REDIS:
- backend_name = "redis";
- break;
- case HS_CACHE_BACKEND_HTTP:
- backend_name = "http";
- break;
- case HS_CACHE_BACKEND_LUA:
- backend_name = "lua";
- break;
- default:
- backend_name = "file";
- break;
- }
-
- /* Load lua_hs_cache module */
- lua_getglobal(L, "require");
- lua_pushstring(L, "lua_hs_cache");
-
- if (lua_pcall(L, 1, 1, 0) != 0) {
- msg_err("failed to load lua_hs_cache module: %s", lua_tostring(L, -1));
- lua_pop(L, 1);
- return FALSE;
- }
-
- /* Get create_backend function */
- lua_getfield(L, -1, "create_backend");
- if (!lua_isfunction(L, -1)) {
- msg_err("lua_hs_cache.create_backend is not a function");
- lua_pop(L, 2);
- return FALSE;
- }
-
- /* Create configuration table */
- lua_newtable(L);
-
- lua_pushstring(L, backend_name);
- lua_setfield(L, -2, "backend");
-
- lua_pushstring(L, ctx->hs_dir);
- lua_setfield(L, -2, "cache_dir");
-
- /* Add platform_id if available */
-#ifdef WITH_HYPERSCAN
- const char *platform_id = rspamd_hyperscan_get_platform_id();
- if (platform_id) {
- lua_pushstring(L, platform_id);
- lua_setfield(L, -2, "platform_id");
- }
-#endif
-
- /* Call create_backend(config) */
- if (lua_pcall(L, 1, 1, 0) != 0) {
- msg_err("failed to create cache backend: %s", lua_tostring(L, -1));
- lua_pop(L, 2);
- return FALSE;
- }
-
- /* Store reference to backend object */
- ctx->lua_backend_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- /* Pop the module table */
- lua_pop(L, 1);
-
- msg_info("initialized %s cache backend", backend_name);
- return TRUE;
-}
-
static void
start_hs_helper(struct rspamd_worker *worker)
{
ctx->recompile_time,
ctx->workers_ready ? "yes" : "no");
- /* Initialize Lua cache backend if not using default file backend */
- if (ctx->cache_backend != HS_CACHE_BACKEND_FILE) {
- if (!rspamd_hs_helper_init_lua_backend(ctx, worker)) {
- msg_warn("failed to initialize %s cache backend, falling back to file",
- ctx->cache_backend == HS_CACHE_BACKEND_REDIS ? "redis" : ctx->cache_backend == HS_CACHE_BACKEND_HTTP ? "http"
- : "lua");
- ctx->cache_backend = HS_CACHE_BACKEND_FILE;
- }
- }
-
ctx->event_loop = rspamd_prepare_worker(worker,
"hs_helper",
NULL);
+ /* HS cache Lua backend (if configured) is initialized for all workers in rspamd_prepare_worker() */
+
rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE,
rspamd_hs_helper_reload, ctx);
rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED,
ev_loop(ctx->event_loop, 0);
rspamd_worker_block_signals();
+#ifdef WITH_HYPERSCAN
+ /* Prevent any further Lua backend calls during shutdown */
+ rspamd_hs_cache_free_backend();
+#endif
+
CFG_REF_RELEASE(ctx->cfg);
CFG_REF_RELEASE(ctx->cfg);
rspamd_log_close(worker->srv->logger);
${CMAKE_CURRENT_SOURCE_DIR}/html/html_url_rewrite_c.cxx
${CMAKE_CURRENT_SOURCE_DIR}/html/html_tests.cxx
${CMAKE_CURRENT_SOURCE_DIR}/hyperscan_tools.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/hs_cache_backend.c
${CMAKE_CURRENT_SOURCE_DIR}/backtrace.cxx
${LIBCSSSRC})
--- /dev/null
+/*
+ * Copyright 2025 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hs_cache_backend.h"
+#include "lua/lua_common.h"
+#include "lua/lua_classnames.h"
+#include "libutil/util.h"
+#include "libserver/worker_util.h"
+#include "libserver/cfg_file.h"
+#include "libserver/redis_pool.h"
+#ifdef WITH_HYPERSCAN
+#include "libserver/hyperscan_tools.h"
+#endif
+
+static struct rspamd_hs_cache_backend *global_hs_cache_backend = NULL;
+
+/* Lua backend state - set via rspamd_hs_cache_set_lua_backend() once a non-file backend has been created */
+static lua_State *lua_backend_L = NULL;
+static int lua_backend_ref = LUA_NOREF;
+static const char *lua_backend_platform_id = NULL;
+
+/**
+ * Create the Lua cache backend object via lua_hs_cache.create_backend() and
+ * register it as the Lua backend of the current process.
+ *
+ * The config table handed to Lua is `opts` converted to a Lua table, extended
+ * with `ev_base`, `rspamd_config`, `backend`, `cache_dir` (when non-NULL) and,
+ * when built with hyperscan, `platform_id`.
+ *
+ * @param cfg rspamd configuration (supplies the Lua state and the redis pool)
+ * @param ev_base event loop of the current process
+ * @param opts hs_helper worker options used as the base of the config table
+ * @param backend_name backend to create; FALSE is returned for "file"
+ * @param cache_dir cache directory or NULL
+ * @return TRUE if a backend table was created and registered, FALSE otherwise
+ */
+static gboolean
+rspamd_hs_cache_try_init_lua_backend_with_opts(struct rspamd_config *cfg,
+											   struct ev_loop *ev_base,
+											   const ucl_object_t *opts,
+											   const char *backend_name,
+											   const char *cache_dir)
+{
+	lua_State *L;
+	const char *platform_id = NULL;
+	int err_idx;
+	int ref;
+
+	if (!cfg || !cfg->lua_state || !ev_base || !opts || !backend_name) {
+		return FALSE;
+	}
+
+	if (strcmp(backend_name, "file") == 0) {
+		return FALSE;
+	}
+
+	L = (lua_State *) cfg->lua_state;
+
+	/* Ensure redis pool is bound to this process event loop (required for lua_redis async requests) */
+	if (cfg->redis_pool) {
+		rspamd_redis_pool_config(cfg->redis_pool, cfg, ev_base);
+	}
+
+	/* Everything above err_idx - 1 is dropped by lua_settop() on every exit path */
+	lua_pushcfunction(L, rspamd_lua_traceback);
+	err_idx = lua_gettop(L);
+
+	/* Load lua_hs_cache module */
+	lua_getglobal(L, "require");
+	lua_pushstring(L, "lua_hs_cache");
+
+	if (lua_pcall(L, 1, 1, err_idx) != 0) {
+		msg_err_config("failed to load lua_hs_cache module: %s", lua_tostring(L, -1));
+		lua_settop(L, err_idx - 1);
+		return FALSE;
+	}
+
+	/* Get create_backend function */
+	lua_getfield(L, -1, "create_backend");
+	if (!lua_isfunction(L, -1)) {
+		msg_err_config("lua_hs_cache.create_backend is not a function");
+		lua_settop(L, err_idx - 1);
+		return FALSE;
+	}
+
+	/* Push options as config table */
+	ucl_object_push_lua(L, opts, true);
+
+	/* Set event loop for lua_redis */
+	{
+		struct ev_loop **pev_base = (struct ev_loop **) lua_newuserdata(L, sizeof(struct ev_loop *));
+		*pev_base = ev_base;
+		rspamd_lua_setclass(L, rspamd_ev_base_classname, -1);
+		lua_setfield(L, -2, "ev_base");
+	}
+
+	/* Set rspamd_config for lua_redis */
+	{
+		struct rspamd_config **pcfg = (struct rspamd_config **) lua_newuserdata(L, sizeof(struct rspamd_config *));
+		*pcfg = cfg;
+		rspamd_lua_setclass(L, rspamd_config_classname, -1);
+		lua_setfield(L, -2, "rspamd_config");
+	}
+
+	/* Force backend/cache_dir */
+	lua_pushstring(L, backend_name);
+	lua_setfield(L, -2, "backend");
+	if (cache_dir) {
+		lua_pushstring(L, cache_dir);
+		lua_setfield(L, -2, "cache_dir");
+	}
+
+#ifdef WITH_HYPERSCAN
+	platform_id = rspamd_hyperscan_get_platform_id();
+	if (platform_id) {
+		lua_pushstring(L, platform_id);
+		lua_setfield(L, -2, "platform_id");
+	}
+#endif
+
+	/* Call create_backend(config) */
+	if (lua_pcall(L, 1, 1, err_idx) != 0) {
+		msg_err_config("failed to create %s cache backend: %s", backend_name,
+					   lua_tostring(L, -1));
+		lua_settop(L, err_idx - 1);
+		return FALSE;
+	}
+
+	/*
+	 * Referencing nil yields LUA_REFNIL, which differs from LUA_NOREF and
+	 * would be reported as a usable backend; accept tables only.
+	 */
+	if (!lua_istable(L, -1)) {
+		msg_err_config("lua_hs_cache.create_backend returned %s instead of a table",
+					   lua_typename(L, lua_type(L, -1)));
+		lua_settop(L, err_idx - 1);
+		return FALSE;
+	}
+
+	/* luaL_ref pops the backend object; module table and traceback go below */
+	ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	rspamd_hs_cache_set_lua_backend(L, ref, platform_id);
+	lua_settop(L, err_idx - 1);
+
+	return TRUE;
+}
+
+/**
+ * Install `backend` as the process-wide cache backend, taking ownership of it.
+ * A previously installed backend structure is released with g_free().
+ * Passing the pointer that is already installed is a no-op, so the global
+ * never ends up pointing at freed memory.
+ *
+ * @param backend new backend structure, or NULL to clear the current one
+ */
+void rspamd_hs_cache_set_backend(struct rspamd_hs_cache_backend *backend)
+{
+	if (global_hs_cache_backend == backend) {
+		return;
+	}
+
+	/* g_free() accepts NULL, so no separate check is needed */
+	g_free(global_hs_cache_backend);
+	global_hs_cache_backend = backend;
+}
+
+/**
+ * Return the backend structure installed with rspamd_hs_cache_set_backend(),
+ * or NULL if none is installed.
+ */
+struct rspamd_hs_cache_backend *
+rspamd_hs_cache_get_backend(void)
+{
+	struct rspamd_hs_cache_backend *installed = global_hs_cache_backend;
+
+	return installed;
+}
+
+/**
+ * Tell whether a backend structure is currently installed.
+ * @return TRUE if the global backend pointer is non-NULL
+ */
+gboolean
+rspamd_hs_cache_has_custom_backend(void)
+{
+	return global_hs_cache_backend ? TRUE : FALSE;
+}
+
+/**
+ * Release the installed backend structure and forget the Lua backend state.
+ * The Lua registry reference is only forgotten, not unreferenced.
+ */
+void rspamd_hs_cache_free_backend(void)
+{
+	/* Forget the Lua side first; the fields are independent of the struct */
+	lua_backend_platform_id = NULL;
+	lua_backend_ref = LUA_NOREF;
+	lua_backend_L = NULL;
+
+	/* g_free() is a no-op for NULL */
+	g_free(global_hs_cache_backend);
+	global_hs_cache_backend = NULL;
+}
+
+/**
+ * Remember the Lua state, the registry reference of the backend object and the
+ * platform id used for all later Lua backend calls.
+ * The platform id pointer is stored as is, it is not copied.
+ */
+void rspamd_hs_cache_set_lua_backend(lua_State *L, int ref, const char *platform_id)
+{
+	lua_backend_platform_id = platform_id;
+	lua_backend_ref = ref;
+	lua_backend_L = L;
+}
+
+/**
+ * Tell whether a Lua backend has been registered.
+ * @return TRUE if both a Lua state and a registry reference are set
+ */
+gboolean
+rspamd_hs_cache_has_lua_backend(void)
+{
+	if (lua_backend_L == NULL || lua_backend_ref == LUA_NOREF) {
+		return FALSE;
+	}
+
+	return TRUE;
+}
+
+/**
+ * Initialize the Lua cache backend for the current process from the options
+ * of the first `hs_helper` worker definition found in `cfg->workers`.
+ *
+ * `cache_backend` defaults to "file" and `cache_dir` defaults to
+ * `cfg->hs_cache_dir` when the options do not provide string values.
+ *
+ * @param cfg rspamd configuration
+ * @param ev_base event loop of the current process
+ * @return TRUE if a Lua backend is already registered or was created now;
+ * FALSE if there is no usable hs_helper configuration or creation failed
+ */
+gboolean
+rspamd_hs_cache_try_init_lua_backend(struct rspamd_config *cfg,
+									 struct ev_loop *ev_base)
+{
+	GList *cur;
+	const struct rspamd_worker_conf *cf = NULL;
+	const ucl_object_t *opts = NULL;
+	const char *backend_name = NULL;
+	const char *cache_dir = NULL;
+	GQuark hs_quark;
+
+	if (rspamd_hs_cache_has_lua_backend()) {
+		return TRUE;
+	}
+
+	if (!cfg || !cfg->workers) {
+		return FALSE;
+	}
+
+	/*
+	 * Quarks map one-to-one to strings: if "hs_helper" has no quark, no worker
+	 * type can refer to it. Comparing g_quark_to_string() results instead
+	 * could never match and would pass NULL to strcmp() for a zero type.
+	 */
+	hs_quark = g_quark_try_string("hs_helper");
+	if (hs_quark == 0) {
+		return FALSE;
+	}
+
+	for (cur = cfg->workers; cur != NULL; cur = g_list_next(cur)) {
+		cf = (const struct rspamd_worker_conf *) cur->data;
+		if (cf && cf->type == hs_quark) {
+			opts = cf->options;
+			break;
+		}
+	}
+
+	if (!opts) {
+		return FALSE;
+	}
+
+	const ucl_object_t *b = ucl_object_lookup(opts, "cache_backend");
+	if (b && ucl_object_type(b) == UCL_STRING) {
+		backend_name = ucl_object_tostring(b);
+	}
+	if (!backend_name) {
+		backend_name = "file";
+	}
+
+	const ucl_object_t *d = ucl_object_lookup(opts, "cache_dir");
+	if (d && ucl_object_type(d) == UCL_STRING) {
+		cache_dir = ucl_object_tostring(d);
+	}
+	if (!cache_dir) {
+		cache_dir = cfg->hs_cache_dir;
+	}
+
+	return rspamd_hs_cache_try_init_lua_backend_with_opts(cfg, ev_base, opts, backend_name, cache_dir);
+}
+
+/**
+ * Store a serialized database synchronously by calling
+ * backend:save_sync(cache_key, platform_id, data) on the registered Lua
+ * backend. The method is expected to return `success, error_message`.
+ *
+ * @param cache_key cache key passed to Lua
+ * @param data bytes to store
+ * @param len number of bytes in data
+ * @param err set on any failure
+ * @return TRUE if save_sync returned a true value
+ */
+gboolean
+rspamd_hs_cache_lua_save(const char *cache_key,
+						 const unsigned char *data,
+						 gsize len,
+						 GError **err)
+{
+	lua_State *L = lua_backend_L;
+	const char *platform;
+	const char *lua_err;
+	gboolean success;
+	int top;
+
+	if (rspamd_current_worker && rspamd_current_worker->state != rspamd_worker_state_running) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), ECANCELED,
+					"worker is terminating");
+		return FALSE;
+	}
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend not initialized");
+		return FALSE;
+	}
+
+	/* Remember the stack height so every exit restores it with one call */
+	top = lua_gettop(L);
+
+	/* Backend object */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, lua_backend_ref);
+	if (!lua_istable(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Invalid Lua backend reference");
+		return FALSE;
+	}
+
+	/* Method to call */
+	lua_getfield(L, -1, "save_sync");
+	if (!lua_isfunction(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend has no save_sync method");
+		return FALSE;
+	}
+
+	/* Arguments: self, cache_key, platform_id, data */
+	platform = lua_backend_platform_id ? lua_backend_platform_id : "default";
+	lua_pushvalue(L, -2);
+	lua_pushstring(L, cache_key);
+	lua_pushstring(L, platform);
+	lua_pushlstring(L, (const char *) data, len);
+
+	if (lua_pcall(L, 4, 2, 0) != 0) {
+		/* The message lives on the Lua stack: copy it into err before unwinding */
+		lua_err = lua_tostring(L, -1);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua save_sync failed: %s", lua_err ? lua_err : "unknown error");
+		lua_settop(L, top);
+		return FALSE;
+	}
+
+	/* Results: success at -2, error message at -1 */
+	success = lua_toboolean(L, -2);
+	if (!success) {
+		lua_err = lua_tostring(L, -1);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend save failed: %s", lua_err ? lua_err : "unknown error");
+	}
+
+	lua_settop(L, top);
+	return success;
+}
+
+/**
+ * Load a serialized database synchronously by calling
+ * backend:load_sync(cache_key, platform_id) on the registered Lua backend.
+ * The method is expected to return `data_or_nil, error_message`.
+ *
+ * @param cache_key cache key passed to Lua
+ * @param data receives a g_malloc'ed copy of the data, or NULL on a miss or
+ * when the backend returned an empty string
+ * @param len receives the data length, or 0
+ * @param err set on failure, and also on a miss that carries an error message
+ * @return FALSE if the call could not be made or raised; TRUE otherwise
+ */
+gboolean
+rspamd_hs_cache_lua_load(const char *cache_key,
+						 unsigned char **data,
+						 gsize *len,
+						 GError **err)
+{
+	lua_State *L = lua_backend_L;
+	const char *platform;
+	const char *lua_err;
+	const char *payload;
+	size_t payload_len;
+	int top;
+
+	if (rspamd_current_worker && rspamd_current_worker->state != rspamd_worker_state_running) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), ECANCELED,
+					"worker is terminating");
+		return FALSE;
+	}
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend not initialized");
+		return FALSE;
+	}
+
+	/* Remember the stack height so every exit restores it with one call */
+	top = lua_gettop(L);
+
+	/* Backend object */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, lua_backend_ref);
+	if (!lua_istable(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Invalid Lua backend reference");
+		return FALSE;
+	}
+
+	/* Method to call */
+	lua_getfield(L, -1, "load_sync");
+	if (!lua_isfunction(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend has no load_sync method");
+		return FALSE;
+	}
+
+	/* Arguments: self, cache_key, platform_id */
+	platform = lua_backend_platform_id ? lua_backend_platform_id : "default";
+	lua_pushvalue(L, -2);
+	lua_pushstring(L, cache_key);
+	lua_pushstring(L, platform);
+
+	if (lua_pcall(L, 3, 2, 0) != 0) {
+		/* The message lives on the Lua stack: copy it into err before unwinding */
+		lua_err = lua_tostring(L, -1);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua load_sync failed: %s", lua_err ? lua_err : "unknown error");
+		lua_settop(L, top);
+		return FALSE;
+	}
+
+	/* Results: data_or_nil at -2, error message at -1 */
+	if (lua_isnil(L, -2)) {
+		/*
+		 * A nil result is treated as a cache miss and reported as TRUE.
+		 * NOTE(review): err is still filled when Lua supplies a message, so
+		 * callers get TRUE together with a set GError - confirm they free it.
+		 */
+		lua_err = lua_tostring(L, -1);
+		if (lua_err) {
+			g_set_error(err, g_quark_from_static_string("hs_cache"), ENOENT,
+						"Lua backend load failed: %s", lua_err);
+		}
+
+		lua_settop(L, top);
+		*data = NULL;
+		*len = 0;
+		return TRUE;
+	}
+
+	/* Copy the payload out of Lua-owned memory before the stack is reset */
+	payload = lua_tolstring(L, -2, &payload_len);
+	if (payload == NULL || payload_len == 0) {
+		*data = NULL;
+		*len = 0;
+	}
+	else {
+		*data = g_malloc(payload_len);
+		memcpy(*data, payload, payload_len);
+		*len = payload_len;
+	}
+
+	lua_settop(L, top);
+	return TRUE;
+}
+
+/**
+ * Check synchronously whether a cache entry exists by calling
+ * backend:exists_sync(cache_key, platform_id) on the registered Lua backend.
+ *
+ * @param cache_key cache key passed to Lua
+ * @param err set when the call cannot be made or raises; not set when the
+ * backend simply reports that the entry is missing
+ * @return TRUE if the first value returned by exists_sync is true
+ */
+gboolean
+rspamd_hs_cache_lua_exists(const char *cache_key, GError **err)
+{
+	lua_State *L = lua_backend_L;
+	const char *platform;
+	gboolean exists;
+	int top;
+
+	if (rspamd_current_worker && rspamd_current_worker->state != rspamd_worker_state_running) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), ECANCELED,
+					"worker is terminating");
+		return FALSE;
+	}
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend not initialized");
+		return FALSE;
+	}
+
+	/* Remember the stack height so every exit restores it with one call */
+	top = lua_gettop(L);
+
+	/* Backend object */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, lua_backend_ref);
+	if (!lua_istable(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Invalid Lua backend reference");
+		return FALSE;
+	}
+
+	/* Method to call */
+	lua_getfield(L, -1, "exists_sync");
+	if (!lua_isfunction(L, -1)) {
+		lua_settop(L, top);
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua backend has no exists_sync method");
+		return FALSE;
+	}
+
+	/* Arguments: self, cache_key, platform_id */
+	platform = lua_backend_platform_id ? lua_backend_platform_id : "default";
+	lua_pushvalue(L, -2);
+	lua_pushstring(L, cache_key);
+	lua_pushstring(L, platform);
+
+	if (lua_pcall(L, 3, 2, 0) != 0) {
+		/* The message lives on the Lua stack: copy it into err before unwinding */
+		const char *lua_err = lua_tostring(L, -1);
+
+		g_set_error(err, g_quark_from_static_string("hs_cache"), EINVAL,
+					"Lua exists_sync failed: %s", lua_err ? lua_err : "unknown error");
+		lua_settop(L, top);
+		return FALSE;
+	}
+
+	/* Results: exists flag at -2, error/nil at -1 (ignored) */
+	exists = lua_toboolean(L, -2);
+	lua_settop(L, top);
+
+	return exists;
+}
+
+/**
+ * C closure handed to the Lua backend as the completion callback of the
+ * asynchronous operations. Upvalue 1 carries the C callback, upvalue 2 its
+ * userdata. Lua calls it as callback(err, result):
+ *  - err: error string, or nil on success
+ *  - result: boolean (reported through `len` as 1/0 with data NULL), or a
+ *    string / rspamd{text} whose bytes are passed through without copying
+ */
+static int
+lua_hs_cache_async_callback(lua_State *L)
+{
+	rspamd_hs_cache_async_cb cb = (rspamd_hs_cache_async_cb) lua_touserdata(L, lua_upvalueindex(1));
+	void *ud = lua_touserdata(L, lua_upvalueindex(2));
+	const char *err = lua_tostring(L, 1);
+	const unsigned char *payload = NULL;
+	size_t payload_len = 0;
+
+	/* lua_type() yields LUA_TNONE when fewer than two arguments were passed */
+	switch (lua_type(L, 2)) {
+	case LUA_TNONE:
+	case LUA_TNIL:
+		break;
+	case LUA_TBOOLEAN:
+		/* exists_async: pass boolean as len (1/0), keep data NULL */
+		payload_len = lua_toboolean(L, 2) ? 1 : 0;
+		break;
+	default: {
+		/* Prefer rspamd{text} or Lua strings without forcing conversion */
+		struct rspamd_lua_text *t = lua_check_text_or_string(L, 2);
+
+		if (t && t->start) {
+			payload = (const unsigned char *) t->start;
+			payload_len = t->len;
+		}
+		break;
+	}
+	}
+
+	if (cb) {
+		cb(err == NULL, payload, payload_len, err, ud);
+	}
+
+	return 0;
+}
+
+/**
+ * Common driver for the asynchronous Lua backend calls.
+ *
+ * Calls backend:<method>(cache_key, platform_id[, data], callback) with
+ * rspamd_lua_traceback as the message handler. `cb` is invoked immediately
+ * with success=FALSE when the worker is not running, no backend is registered,
+ * the method is missing or the call raises. Otherwise completion is reported
+ * by the Lua side through lua_hs_cache_async_callback.
+ *
+ * @param method name of the backend method to call
+ * @param no_method_err message reported when the backend lacks that method
+ * @param cache_key cache key passed to Lua
+ * @param with_data TRUE to pass `data`/`len` as an extra string argument
+ * @param data payload (only used when with_data is TRUE)
+ * @param len payload length
+ * @param cb completion callback, may be NULL
+ * @param ud userdata for cb
+ */
+static void
+rspamd_hs_cache_lua_call_async(const char *method,
+							   const char *no_method_err,
+							   const char *cache_key,
+							   gboolean with_data,
+							   const unsigned char *data,
+							   gsize len,
+							   rspamd_hs_cache_async_cb cb,
+							   void *ud)
+{
+	lua_State *L = lua_backend_L;
+	int err_idx;
+	int nargs = 4; /* self, cache_key, platform_id, callback */
+
+	if (rspamd_current_worker && rspamd_current_worker->state != rspamd_worker_state_running) {
+		if (cb) cb(FALSE, NULL, 0, "worker is terminating", ud);
+		return;
+	}
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		if (cb) cb(FALSE, NULL, 0, "Lua backend not initialized", ud);
+		return;
+	}
+
+	/* Everything above err_idx - 1 is dropped by lua_settop() on every exit path */
+	lua_pushcfunction(L, rspamd_lua_traceback);
+	err_idx = lua_gettop(L);
+
+	/* Get backend object */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, lua_backend_ref);
+	if (!lua_istable(L, -1)) {
+		lua_settop(L, err_idx - 1);
+		if (cb) cb(FALSE, NULL, 0, "Invalid Lua backend reference", ud);
+		return;
+	}
+
+	/* Get the requested method */
+	lua_getfield(L, -1, method);
+	if (!lua_isfunction(L, -1)) {
+		lua_settop(L, err_idx - 1);
+		if (cb) cb(FALSE, NULL, 0, no_method_err, ud);
+		return;
+	}
+
+	/* Push self (backend object) */
+	lua_pushvalue(L, -2);
+	/* Push cache_key */
+	lua_pushstring(L, cache_key);
+	/* Push platform_id */
+	lua_pushstring(L, lua_backend_platform_id ? lua_backend_platform_id : "default");
+
+	if (with_data) {
+		lua_pushlstring(L, (const char *) data, len);
+		nargs++;
+	}
+
+	/* Push callback wrapper carrying cb and ud as upvalues */
+	lua_pushlightuserdata(L, (void *) cb);
+	lua_pushlightuserdata(L, ud);
+	lua_pushcclosure(L, lua_hs_cache_async_callback, 2);
+
+	if (lua_pcall(L, nargs, 0, err_idx) != 0) {
+		/* The message lives on the Lua stack: report it before unwinding */
+		const char *lua_err = lua_tostring(L, -1);
+
+		if (cb) cb(FALSE, NULL, 0, lua_err ? lua_err : "Lua call failed", ud);
+	}
+
+	lua_settop(L, err_idx - 1);
+}
+
+/**
+ * Store data asynchronously via backend:save_async(cache_key, platform_id,
+ * data, callback).
+ */
+void rspamd_hs_cache_lua_save_async(const char *cache_key,
+									const unsigned char *data,
+									gsize len,
+									rspamd_hs_cache_async_cb cb,
+									void *ud)
+{
+	rspamd_hs_cache_lua_call_async("save_async",
+								   "Lua backend has no save_async method",
+								   cache_key, TRUE, data, len, cb, ud);
+}
+
+/**
+ * Load data asynchronously via backend:load_async(cache_key, platform_id,
+ * callback).
+ */
+void rspamd_hs_cache_lua_load_async(const char *cache_key,
+									rspamd_hs_cache_async_cb cb,
+									void *ud)
+{
+	rspamd_hs_cache_lua_call_async("load_async",
+								   "Lua backend has no load_async method",
+								   cache_key, FALSE, NULL, 0, cb, ud);
+}
+
+/**
+ * Check existence asynchronously via backend:exists_async(cache_key,
+ * platform_id, callback); the result reaches `cb` as len 1 or 0.
+ */
+void rspamd_hs_cache_lua_exists_async(const char *cache_key,
+									  rspamd_hs_cache_async_cb cb,
+									  void *ud)
+{
+	rspamd_hs_cache_lua_call_async("exists_async",
+								   "Lua backend has no exists_async method",
+								   cache_key, FALSE, NULL, 0, cb, ud);
+}
--- /dev/null
+/*
+ * Copyright 2025 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef RSPAMD_HS_CACHE_BACKEND_H
+#define RSPAMD_HS_CACHE_BACKEND_H
+
+#include "config.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rspamd_config;
+struct ev_loop;
+
+/**
+ * Callback for async cache operations
+ * @param err error message or NULL on success
+ * @param data loaded data (for load operations) or NULL
+ * @param len data length
+ * @param ud userdata
+ */
+typedef void (*rspamd_hs_cache_cb_t)(const char *err,
+ const unsigned char *data,
+ gsize len,
+ void *ud);
+
+/**
+ * Cache backend operations structure
+ */
+struct rspamd_hs_cache_backend {
+ /**
+ * Save data to cache
+ * @param cache_key unique key for this cache entry
+ * @param platform_id platform identifier
+ * @param data serialized hyperscan database
+ * @param len data length
+ * @param callback completion callback
+ * @param ud userdata for callback
+ */
+ void (*save)(const char *cache_key,
+ const char *platform_id,
+ const unsigned char *data,
+ gsize len,
+ rspamd_hs_cache_cb_t callback,
+ void *ud);
+
+ /**
+ * Load data from cache
+ * @param cache_key unique key for this cache entry
+ * @param platform_id platform identifier
+ * @param callback completion callback with data
+ * @param ud userdata for callback
+ */
+ void (*load)(const char *cache_key,
+ const char *platform_id,
+ rspamd_hs_cache_cb_t callback,
+ void *ud);
+
+ /**
+ * Check if cache entry exists
+ * @param cache_key unique key
+ * @param platform_id platform identifier
+ * @param callback completion callback (data will be NULL, check err)
+ * @param ud userdata
+ */
+ void (*exists)(const char *cache_key,
+ const char *platform_id,
+ rspamd_hs_cache_cb_t callback,
+ void *ud);
+
+ /* Opaque backend context */
+ void *ctx;
+};
+
+/**
+ * Set the global hyperscan cache backend.
+ * Called by hs_helper after initializing the Lua backend.
+ * A previously set backend structure is released with g_free().
+ * @param backend backend operations structure (takes ownership)
+ */
+void rspamd_hs_cache_set_backend(struct rspamd_hs_cache_backend *backend);
+
+/**
+ * Get the current hyperscan cache backend.
+ * @return backend or NULL if using default file backend
+ */
+struct rspamd_hs_cache_backend *rspamd_hs_cache_get_backend(void);
+
+/**
+ * Check if a custom (non-file) backend is configured.
+ * @return TRUE if custom backend is set
+ */
+gboolean rspamd_hs_cache_has_custom_backend(void);
+
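+/**
+ * Free the cache backend structure and reset the Lua backend state, so that
+ * rspamd_hs_cache_has_lua_backend() returns FALSE afterwards.
+ * The Lua registry reference is forgotten, not released.
+ */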
+void rspamd_hs_cache_free_backend(void);
+
+typedef struct lua_State lua_State;
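+/**
+ * Set the Lua backend state (done by rspamd_hs_cache_try_init_lua_backend()
+ * once the backend object has been created)
+ * @param L Lua state
+ * @param ref registry reference to the backend object
+ * @param platform_id hyperscan platform identifier; the pointer is stored as
+ * is (not copied), so it must stay valid while the backend is in use
+ */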
+void rspamd_hs_cache_set_lua_backend(lua_State *L, int ref, const char *platform_id);
+
+/**
+ * Check if Lua backend is available
+ * @return TRUE if Lua backend is set
+ */
+gboolean rspamd_hs_cache_has_lua_backend(void);
+
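+/**
+ * Initialize Lua HS cache backend in the current process using hs_helper worker
+ * configuration (if configured and Lua is available).
+ *
+ * This is meant to be called from worker initialization after ev_base is ready.
+ *
+ * @return TRUE if a Lua backend is already set or has been created; FALSE if
+ * no hs_helper worker options are found, the configured backend is "file",
+ * or the backend could not be created
+ */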
+gboolean rspamd_hs_cache_try_init_lua_backend(struct rspamd_config *cfg,
+ struct ev_loop *ev_base);
+
+/**
+ * Save data to cache via Lua backend (synchronous)
+ * @param cache_key unique cache key (hash)
+ * @param data serialized hyperscan data
+ * @param len data length
+ * @param err error output
+ * @return TRUE on success
+ */
+gboolean rspamd_hs_cache_lua_save(const char *cache_key,
+ const unsigned char *data,
+ gsize len,
+ GError **err);
+
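+/**
+ * Load data from cache via Lua backend (synchronous)
+ * @param cache_key unique cache key (hash)
+ * @param data output data (caller must g_free); NULL on a cache miss or when
+ * the backend returns an empty string
+ * @param len output data length
+ * @param err error output; it is also set on a cache miss if the backend
+ * returns an error message together with nil
+ * @return TRUE on success (including cache miss with data=NULL)
+ */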
+gboolean rspamd_hs_cache_lua_load(const char *cache_key,
+ unsigned char **data,
+ gsize *len,
+ GError **err);
+
+/**
+ * Check if cache entry exists via Lua backend (synchronous)
+ * @param cache_key unique cache key (hash)
+ * @param err error output; set only when the check itself fails
+ * @return TRUE if exists; FALSE if the entry is missing or the check failed
+ */
+gboolean rspamd_hs_cache_lua_exists(const char *cache_key, GError **err);
+
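+/**
+ * Async callback type
+ * @param success TRUE if operation succeeded (no error message was reported)
+ * @param data loaded data (for load) or NULL; it is passed without copying, so
+ * copy it if it is needed after the callback returns
+ * @param len data length; for exists checks 1 if the entry exists, 0 otherwise
+ * @param error error message or NULL
+ * @param ud userdata
+ */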
+typedef void (*rspamd_hs_cache_async_cb)(gboolean success,
+ const unsigned char *data,
+ gsize len,
+ const char *error,
+ void *ud);
+
+/**
+ * Save data to cache via Lua backend (asynchronous)
+ * @param cache_key unique cache key (hash)
+ * @param data serialized hyperscan data
+ * @param len data length
+ * @param cb completion callback
+ * @param ud userdata
+ */
+void rspamd_hs_cache_lua_save_async(const char *cache_key,
+ const unsigned char *data,
+ gsize len,
+ rspamd_hs_cache_async_cb cb,
+ void *ud);
+
+/**
+ * Load data from cache via Lua backend (asynchronous)
+ * @param cache_key unique cache key (hash)
+ * @param cb completion callback
+ * @param ud userdata
+ */
+void rspamd_hs_cache_lua_load_async(const char *cache_key,
+ rspamd_hs_cache_async_cb cb,
+ void *ud);
+
+/**
+ * Check if cache entry exists via Lua backend (asynchronous)
+ * @param cache_key unique cache key (hash)
+ * @param cb completion callback (len will be 1 if exists, 0 otherwise)
+ * @param ud userdata
+ */
+void rspamd_hs_cache_lua_exists_async(const char *cache_key,
+ rspamd_hs_cache_async_cb cb,
+ void *ud);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* RSPAMD_HS_CACHE_BACKEND_H */
#define HYPERSCAN_LOG_TAG "hsxxxx"
-// Hyperscan does not provide any API to check validity of it's databases
-// However, it is required for us to perform migrations properly without
-// failing at `hs_alloc_scratch` phase or even `hs_scan` which is **way too late**
-// Hence, we have to check hyperscan internal guts to prevent that situation...
-
#ifdef HS_MAJOR
#ifndef HS_VERSION_32BIT
#define HS_VERSION_32BIT ((HS_MAJOR << 24) | (HS_MINOR << 16) | (HS_PATCH << 8) | 0)
namespace rspamd::util {
-/*
- * A singleton class that is responsible for deletion of the outdated hyperscan files
- * One issue is that it must know about HS files in all workers, which is a problem
- * TODO: we need to export hyperscan caches from all workers to a single place where
- * we can clean them up (probably, to the main process)
- */
class hs_known_files_cache {
private:
// These fields are filled when we add new known cache files
virtual ~hs_known_files_cache()
{
- // Cleanup cache dir
cleanup_maybe();
}
return FALSE;
}
- // Map with MAP_SHARED for sharing between processes
void *map = mmap(nullptr, unserialized_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (map == MAP_FAILED) {
msg_err_hyperscan("cannot mmap temp file: %s", strerror(errno));
return FALSE;
}
- // Deserialize into mapped region
if (hs_deserialize_database_at(serialized_data, serialized_size, reinterpret_cast<hs_database_t *>(map)) != HS_SUCCESS) {
msg_err_hyperscan("cannot deserialize database into shared memory");
munmap(map, unserialized_size);
return FALSE;
}
- // Change protection to read-only
if (mprotect(map, unserialized_size, PROT_READ) == -1) {
msg_err_hyperscan("cannot mprotect shared memory: %s", strerror(errno));
}
return TRUE;
}
-
-/* Unified hyperscan format magic */
static const unsigned char rspamd_hs_magic[] = {'r', 's', 'h', 's', 'r', 'e', '1', '1'};
#define RSPAMD_HS_MAGIC_LEN (sizeof(rspamd_hs_magic))
#include "libserver/url.h"
#include "libserver/task.h"
#include "libserver/cfg_file.h"
+#include "libserver/hs_cache_backend.h"
#include "libutil/util.h"
#include "libutil/regexp.h"
#include "lua/lua_common.h"
#endif
#ifdef WITH_HYPERSCAN
+/* States of the asynchronous per-class hyperscan compilation in the re cache */
+enum rspamd_re_cache_compile_state {
+ /* No class in progress; also restored when a class is skipped as already cached */
+ RSPAMD_RE_CACHE_COMPILE_STATE_INIT,
+ /* NOTE(review): presumably set while the cache existence check is pending - confirm */
+ RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS,
+ /* Entered when the existence check reports a miss or fails */
+ RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING,
+ /* NOTE(review): presumably set while the compiled class is being stored - confirm */
+ RSPAMD_RE_CACHE_COMPILE_STATE_SAVING
+};
+
struct rspamd_re_cache_hs_compile_cbdata {
GHashTableIter it;
struct rspamd_re_cache *cache;
void (*cb)(unsigned int ncompiled, GError *err, void *cbd);
void *cbd;
+
+ /* Async state */
+ struct rspamd_re_class *current_class;
+ enum rspamd_re_cache_compile_state state;
+};
+
+/*
+ * Context passed as userdata to the async cache callbacks.
+ * rspamd_re_cache_exists_cb() releases it with g_free() once it is done.
+ */
+struct rspamd_re_cache_async_ctx {
+ /* Compilation state shared across callbacks */
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata;
+ /* Event loop and timer re-armed with ev_timer_again() to continue processing */
+ struct ev_loop *loop;
+ ev_timer *w;
+ /* NOTE(review): not read by the exists callback; presumably a regexp count - confirm */
+ int n;
};
static void
rspamd_re_cache_compile_err(EV_P_ ev_timer *w, GError *err,
- struct rspamd_re_cache_hs_compile_cbdata *cbdata, bool is_fatal)
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata, bool is_fatal);
+
+/*
+ * Completion callback for the backend "exists" check of the current class.
+ *
+ * A hit (success with non-zero length) means the class is already cached: it is
+ * logged (unless silent) and skipped. Anything else moves the class to the
+ * compiling state. In both cases the compile timer is restarted and the
+ * context passed in `ud` is released.
+ */
+static void
+rspamd_re_cache_exists_cb(gboolean success, const unsigned char *data, gsize len, const char *err, void *ud)
+{
+	struct rspamd_re_cache_async_ctx *actx = ud;
+	struct rspamd_re_cache_hs_compile_cbdata *cbdata = actx->cbdata;
+	const gboolean lua_backend = rspamd_hs_cache_has_lua_backend();
+	char path[PATH_MAX];
+
+	if (!success || len == 0) {
+		/* Miss or lookup failure: the class must be compiled */
+		if (err) {
+			msg_warn("cache check failed: %s", err);
+		}
+
+		cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING;
+	}
+	else {
+		/* Already cached: report it and move on to the next class */
+		struct rspamd_re_class *re_class = cbdata->current_class;
+		struct rspamd_re_cache *cache = cbdata->cache;
+		const char *location = "Lua backend";
+
+		if (!lua_backend) {
+			rspamd_snprintf(path, sizeof(path), "%s%c%s.hs", cbdata->cache_dir,
+							G_DIR_SEPARATOR, re_class->hash);
+			location = path;
+		}
+
+		if (!cbdata->silent) {
+			int n = g_hash_table_size(re_class->re);
+			const char *scope_prefix = cache->scope ? " for scope '" : "";
+			const char *scope_name = cache->scope ? cache->scope : "";
+			const char *scope_suffix = cache->scope ? "'" : "";
+
+			if (re_class->type_len > 0) {
+				msg_info_re_cache(
+					"skip already valid class %s(%*s) to cache %6s (%s), %d regexps%s%s%s",
+					rspamd_re_cache_type_to_string(re_class->type),
+					(int) re_class->type_len - 1,
+					re_class->type_data,
+					re_class->hash,
+					location,
+					n,
+					scope_prefix,
+					scope_name,
+					scope_suffix);
+			}
+			else {
+				msg_info_re_cache(
+					"skip already valid class %s to cache %6s (%s), %d regexps%s%s%s",
+					rspamd_re_cache_type_to_string(re_class->type),
+					re_class->hash,
+					location,
+					n,
+					scope_prefix,
+					scope_name,
+					scope_suffix);
+			}
+		}
+
+		/* Skip compilation */
+		cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+		cbdata->current_class = NULL;
+	}
+
+	ev_timer_again(actx->loop, actx->w);
+	g_free(actx);
+}
+
+/*
+ * Completion callback for an asynchronous save of a compiled class to the
+ * cache backend.
+ *
+ * On failure a non-fatal compile error is reported; on success the class is
+ * logged and ctx->n is added to the total of compiled regexps. In both cases
+ * the per-class state is reset, the compile timer is restarted and the
+ * context passed in `ud` is released.
+ */
+static void
+rspamd_re_cache_save_cb(gboolean success, const unsigned char *data, gsize len, const char *err, void *ud)
{
- cbdata->cb(cbdata->total, err, cbdata->cbd);
+ struct rspamd_re_cache_async_ctx *ctx = ud;
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata = ctx->cbdata;
+
+ if (!success) {
+ /* gerr is handed over to rspamd_re_cache_compile_err(), which frees it */
+ GError *gerr = g_error_new(rspamd_re_cache_quark(), EINVAL,
+ "backend save failed: %s", err ? err : "unknown error");
+ /* Non-fatal: this already resets the state and restarts the timer; the common tail below repeats that */
+ rspamd_re_cache_compile_err(ctx->loop, ctx->w, gerr, cbdata, false);
+ }
+ else {
+ struct rspamd_re_class *re_class = cbdata->current_class;
+ struct rspamd_re_cache *cache = cbdata->cache;
+ /* Number of regexps compiled for this class, recorded when the save was requested */
+ int n = ctx->n;
+ if (re_class->type_len > 0) {
+ msg_info_re_cache(
+ "compiled class %s(%*s) to cache %6s (Lua backend), %d/%d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ (int) re_class->type_len - 1,
+ re_class->type_data,
+ re_class->hash,
+ n,
+ (int) g_hash_table_size(re_class->re),
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
+ else {
+ msg_info_re_cache(
+ "compiled class %s to cache %6s (Lua backend), %d/%d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ re_class->hash,
+ n,
+ (int) g_hash_table_size(re_class->re),
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
+ cbdata->total += n;
+ }
+
+ /* Class is done either way: let the next timer tick pick a new one */
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+ cbdata->current_class = NULL;
+
+ ev_timer_again(ctx->loop, ctx->w);
+ g_free(ctx);
+}
+
+static void
+rspamd_re_cache_compile_err(EV_P_ ev_timer *w, GError *err,
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata, bool is_fatal)
+{
if (is_fatal) {
+ cbdata->cb(cbdata->total, err, cbdata->cbd);
ev_timer_stop(EV_A_ w);
g_free(w);
g_free(cbdata);
}
else {
+ msg_err("hyperscan compilation error: %s", err->message);
/* Continue compilation */
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+ cbdata->current_class = NULL;
ev_timer_again(EV_A_ w);
}
g_error_free(err);
return;
}
- if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) {
- /* All done */
- ev_timer_stop(EV_A_ w);
- cbdata->cb(cbdata->total, NULL, cbdata->cbd);
- g_free(w);
- g_free(cbdata);
-
- return;
+ if (cbdata->current_class) {
+ re_class = cbdata->current_class;
}
+ else {
+ if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) {
+ /* All done */
+ ev_timer_stop(EV_A_ w);
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
- re_class = v;
- rspamd_snprintf(path, sizeof(path), "%s%c%s.hs", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash);
+ return;
+ }
- if (rspamd_re_cache_is_valid_hyperscan_file(cache, path, TRUE, TRUE, NULL)) {
- fd = open(path, O_RDONLY, 00600);
+ re_class = v;
+ cbdata->current_class = re_class;
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS;
+ }
+
+ if (cbdata->state == RSPAMD_RE_CACHE_COMPILE_STATE_CHECK_EXISTS) {
+ if (rspamd_hs_cache_has_lua_backend()) {
+ struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
+ ctx->cbdata = cbdata;
+ ctx->loop = loop;
+ ctx->w = w;
+ rspamd_hs_cache_lua_exists_async(re_class->hash, rspamd_re_cache_exists_cb, ctx);
+ ev_timer_stop(EV_A_ w);
+ return;
+ }
- /* Read number of regexps */
- g_assert(fd != -1);
- g_assert(lseek(fd, RSPAMD_HS_MAGIC_LEN + sizeof(cache->plt), SEEK_SET) != -1);
- g_assert(read(fd, &n, sizeof(n)) == sizeof(n));
- close(fd);
+ /* Check file backend */
+ rspamd_snprintf(path, sizeof(path), "%s%c%s.hs", cbdata->cache_dir,
+ G_DIR_SEPARATOR, re_class->hash);
+ if (rspamd_re_cache_is_valid_hyperscan_file(cache, path, TRUE, TRUE, NULL)) {
+ /* Read number of regexps for logging */
+ fd = open(path, O_RDONLY, 00600);
+
+ if (fd != -1) {
+ if (lseek(fd, RSPAMD_HS_MAGIC_LEN + sizeof(cache->plt), SEEK_SET) != -1) {
+ if (read(fd, &n, sizeof(n)) != sizeof(n)) {
+ n = 0;
+ }
+ }
+ close(fd);
+ }
- if (re_class->type_len > 0) {
- if (!cbdata->silent) {
- msg_info_re_cache(
- "skip already valid class %s(%*s) to cache %6s, %d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1,
- re_class->type_data,
- re_class->hash,
- n,
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
+ if (re_class->type_len > 0) {
+ if (!cbdata->silent) {
+ msg_info_re_cache(
+ "skip already valid class %s(%*s) to cache %6s, %d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ (int) re_class->type_len - 1,
+ re_class->type_data,
+ re_class->hash,
+ n,
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
}
- }
- else {
- if (!cbdata->silent) {
- msg_info_re_cache(
- "skip already valid class %s to cache %6s, %d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- re_class->hash,
- n,
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
+ else {
+ if (!cbdata->silent) {
+ msg_info_re_cache(
+ "skip already valid class %s to cache %6s, %d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ re_class->hash,
+ n,
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
}
+
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+ cbdata->current_class = NULL;
+ ev_timer_again(EV_A_ w);
+ return;
}
- ev_timer_again(EV_A_ w);
- return;
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING;
}
- rspamd_snprintf(path, sizeof(path), "%s%c%s%P-XXXXXXXXXX", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash, our_pid);
- fd = g_mkstemp_full(path, O_CREAT | O_TRUNC | O_EXCL | O_WRONLY, 00600);
+ /* Only create temp file if not using Lua backend */
+ if (!rspamd_hs_cache_has_lua_backend()) {
+ rspamd_snprintf(path, sizeof(path), "%s%c%s%P-XXXXXXXXXX", cbdata->cache_dir,
+ G_DIR_SEPARATOR, re_class->hash, our_pid);
+ fd = g_mkstemp_full(path, O_CREAT | O_TRUNC | O_EXCL | O_WRONLY, 00600);
- if (fd == -1) {
- err = g_error_new(rspamd_re_cache_quark(), errno,
- "cannot open file %s: %s", path, strerror(errno));
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
+ if (fd == -1) {
+ err = g_error_new(rspamd_re_cache_quark(), errno,
+ "cannot open file %s: %s", path, strerror(errno));
+ rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
+ return;
+ }
+ }
+ else {
+ fd = -1; /* Not using file */
}
g_hash_table_iter_init(&cit, re_class->re);
if (n > 0) {
hs_errors = NULL;
+ if (cbdata->worker &&
+ cbdata->worker->state != rspamd_worker_state_running) {
+ CLEANUP_ALLOCATED(false);
+ ev_timer_stop(EV_A_ w);
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
+ return;
+ }
+
if (cbdata->worker) {
rspamd_worker_set_busy(cbdata->worker, EV_A, "compile hyperscan");
-
- if (cbdata->worker->state != rspamd_worker_state_running) {
- rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
- CLEANUP_ALLOCATED(false);
- ev_timer_stop(EV_A_ w);
- cbdata->cb(cbdata->total, NULL, cbdata->cbd);
- g_free(w);
- g_free(cbdata);
- return;
- }
}
hs_error_t compile_result = hs_compile_ext_multi((const char **) hs_pats,
if (cbdata->worker) {
rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
+ }
- if (cbdata->worker->state != rspamd_worker_state_running) {
- if (test_db) {
- hs_free_database(test_db);
- }
- CLEANUP_ALLOCATED(false);
- ev_timer_stop(EV_A_ w);
- cbdata->cb(cbdata->total, NULL, cbdata->cbd);
- g_free(w);
- g_free(cbdata);
- return;
+ if (cbdata->worker &&
+ cbdata->worker->state != rspamd_worker_state_running) {
+ if (test_db) {
+ hs_free_database(test_db);
}
+ CLEANUP_ALLOCATED(false);
+ ev_timer_stop(EV_A_ w);
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
+ return;
}
if (compile_result != HS_SUCCESS) {
iov[6].iov_base = hs_serialized;
iov[6].iov_len = serialized_len;
- if (writev(fd, iov, G_N_ELEMENTS(iov)) == -1) {
- err = g_error_new(rspamd_re_cache_quark(),
- errno,
- "cannot serialize tree of regexp to %s: %s",
- path, strerror(errno));
+ if (rspamd_hs_cache_has_lua_backend()) {
+ /* Build combined buffer for Lua backend */
+ gsize total_len = 0;
+ for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
+ total_len += iov[j].iov_len;
+ }
- CLEANUP_ALLOCATED(true);
- g_free(hs_serialized);
+ unsigned char *combined = g_malloc(total_len);
+ gsize offset = 0;
+ for (unsigned int j = 0; j < G_N_ELEMENTS(iov); j++) {
+ memcpy(combined + offset, iov[j].iov_base, iov[j].iov_len);
+ offset += iov[j].iov_len;
+ }
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
- }
+ struct rspamd_re_cache_async_ctx *ctx = g_malloc(sizeof(*ctx));
+ ctx->cbdata = cbdata;
+ ctx->loop = loop;
+ ctx->w = w;
+ ctx->n = n;
- if (re_class->type_len > 0) {
- msg_info_re_cache(
- "compiled class %s(%*s) to cache %6s, %d/%d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- (int) re_class->type_len - 1,
- re_class->type_data,
- re_class->hash,
- n,
- (int) g_hash_table_size(re_class->re),
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
+ rspamd_hs_cache_lua_save_async(re_class->hash, combined, total_len, rspamd_re_cache_save_cb, ctx);
+
+ g_free(combined);
+ CLEANUP_ALLOCATED(false);
+ g_free(hs_serialized);
+ ev_timer_stop(EV_A_ w);
+ return;
}
else {
- msg_info_re_cache(
- "compiled class %s to cache %6s, %d/%d regexps%s%s%s",
- rspamd_re_cache_type_to_string(re_class->type),
- re_class->hash,
- n,
- (int) g_hash_table_size(re_class->re),
- cache->scope ? " for scope '" : "",
- cache->scope ? cache->scope : "",
- cache->scope ? "'" : "");
- }
+ /* Use file backend */
+ if (writev(fd, iov, G_N_ELEMENTS(iov)) == -1) {
+ err = g_error_new(rspamd_re_cache_quark(),
+ errno,
+ "cannot serialize tree of regexp to %s: %s",
+ path, strerror(errno));
- cbdata->total += n;
- CLEANUP_ALLOCATED(false);
+ CLEANUP_ALLOCATED(true);
+ g_free(hs_serialized);
- /* Now rename temporary file to the new .hs file */
- rspamd_snprintf(npath, sizeof(npath), "%s%c%s.hs", cbdata->cache_dir,
- G_DIR_SEPARATOR, re_class->hash);
+ rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
+ return;
+ }
+
+ CLEANUP_ALLOCATED(false);
+
+ /* File backend: rename temporary file to the new .hs file */
+ rspamd_snprintf(npath, sizeof(npath), "%s%c%s.hs", cbdata->cache_dir,
+ G_DIR_SEPARATOR, re_class->hash);
+
+ if (rename(path, npath) == -1) {
+ err = g_error_new(rspamd_re_cache_quark(),
+ errno,
+ "cannot rename %s to %s: %s",
+ path, npath, strerror(errno));
+ unlink(path);
+ close(fd);
+
+ rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
+ return;
+ }
- if (rename(path, npath) == -1) {
- err = g_error_new(rspamd_re_cache_quark(),
- errno,
- "cannot rename %s to %s: %s",
- path, npath, strerror(errno));
- unlink(path);
close(fd);
- rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
- return;
+ if (re_class->type_len > 0) {
+ msg_info_re_cache(
+ "compiled class %s(%*s) to cache %6s (%s), %d/%d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ (int) re_class->type_len - 1,
+ re_class->type_data,
+ re_class->hash,
+ npath,
+ n,
+ (int) g_hash_table_size(re_class->re),
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
+ else {
+ msg_info_re_cache(
+ "compiled class %s to cache %6s (%s), %d/%d regexps%s%s%s",
+ rspamd_re_cache_type_to_string(re_class->type),
+ re_class->hash,
+ npath,
+ n,
+ (int) g_hash_table_size(re_class->re),
+ cache->scope ? " for scope '" : "",
+ cache->scope ? cache->scope : "",
+ cache->scope ? "'" : "");
+ }
+
+ cbdata->total += n;
}
- close(fd);
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+ cbdata->current_class = NULL;
}
else {
err = g_error_new(rspamd_re_cache_quark(),
CLEANUP_ALLOCATED(true);
rspamd_re_cache_compile_err(EV_A_ w, err, cbdata, false);
+ cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
+ cbdata->current_class = NULL;
return;
}
#ifndef WITH_HYPERSCAN
return -1;
#else
- static ev_timer *timer;
+ ev_timer *timer;
static const ev_tstamp timer_interval = 0.1;
struct rspamd_re_cache_hs_compile_cbdata *cbdata;
cbdata->total = 0;
cbdata->worker = worker;
timer = g_malloc0(sizeof(*timer));
- timer->data = (void *) cbdata; /* static */
+ timer->data = (void *) cbdata;
ev_timer_init(timer, rspamd_re_cache_compile_timer_cb,
timer_interval, timer_interval);
#endif
}
+#ifdef WITH_HYPERSCAN
+/* One pending asynchronous load request for a single regexp class */
+struct rspamd_re_cache_hs_load_item {
+ /* Per-scope bookkeeping shared by all items of the same cache */
+ struct rspamd_re_cache_hs_load_scope *scope_ctx;
+ /* Cache (scope) and class the loaded blob is applied to */
+ struct rspamd_re_cache *cache;
+ struct rspamd_re_class *re_class;
+ /* Heap copy of the class hash used as the backend key; freed together with the item */
+ char *cache_key;
+};
+
+/* Bookkeeping for all asynchronous load requests issued for one cache (scope) */
+struct rspamd_re_cache_hs_load_scope {
+ /* Cache whose hyperscan_loaded status is set once every request has completed */
+ struct rspamd_re_cache *cache;
+ /* Stored by the scheduler; not read elsewhere in the code visible here */
+ struct ev_loop *event_loop;
+ /* Passed to the blob loader; when true, load failures are logged at debug level */
+ bool try_load;
+ /* Requests that have not completed yet */
+ unsigned int pending;
+ /* Requests issued in total */
+ unsigned int total;
+ /* Classes whose blob was applied successfully */
+ unsigned int loaded;
+ /* Cleared as soon as any class misses the cache or fails to load */
+ gboolean all_loaded;
+};
+
+/*
+ * Load a serialized hyperscan blob and hot-swap it into a regexp class.
+ *
+ * The blob starts with the unified header: magic, hs_platform_info_t, the
+ * number of regexps `n`, then `n` regexp ids followed by `n` hyperscan flags;
+ * the size check additionally accounts for one 64-bit field.
+ *
+ * Everything that can fail (database load, header validation, id range check,
+ * scratch allocation) is done before the class is touched, so on failure the
+ * class keeps its previous database, scratch, ids and match types.
+ *
+ * @param cache cache owning the regexps referenced by the ids
+ * @param re_class class to update
+ * @param data serialized blob
+ * @param len blob length in bytes
+ * @param try_load when true, load failures are logged at debug level only
+ * @return TRUE if the class now uses the new database, FALSE otherwise
+ */
+static gboolean
+rspamd_re_cache_apply_hyperscan_blob(struct rspamd_re_cache *cache,
+									 struct rspamd_re_class *re_class,
+									 const unsigned char *data,
+									 gsize len,
+									 bool try_load)
+{
+	const gsize hdr_len = RSPAMD_HS_MAGIC_LEN + sizeof(hs_platform_info_t);
+	GError *err = NULL;
+	rspamd_hyperscan_t *hs_db;
+	hs_scratch_t *new_scratch = NULL;
+	const unsigned char *ids_p, *flags_p;
+	int *new_ids;
+	unsigned int n;
+	int ret;
+
+	hs_db = rspamd_hyperscan_load_from_header((const char *) data, len, &err);
+	if (!hs_db) {
+		if (!try_load) {
+			msg_err_re_cache("cannot load hyperscan class %s: %s",
+							 re_class->hash,
+							 err ? err->message : "unknown error");
+		}
+		else {
+			msg_debug_re_cache("cannot load hyperscan class %s: %s",
+							   re_class->hash,
+							   err ? err->message : "unknown error");
+		}
+		g_clear_error(&err);
+		return FALSE;
+	}
+
+	/* Parse ids/flags from the unified header */
+	if (len < hdr_len + sizeof(unsigned int) + sizeof(uint64_t)) {
+		rspamd_hyperscan_free(hs_db, true);
+		return FALSE;
+	}
+
+	memcpy(&n, data + hdr_len, sizeof(n));
+	ids_p = data + hdr_len + sizeof(n);
+
+	/*
+	 * Same bound as `offset + n * 2 * sizeof(unsigned int) + sizeof(uint64_t) > len`,
+	 * written as a division so that a huge `n` cannot overflow gsize.
+	 */
+	if (n > (len - hdr_len - sizeof(n) - sizeof(uint64_t)) / (2 * sizeof(unsigned int))) {
+		rspamd_hyperscan_free(hs_db, true);
+		return FALSE;
+	}
+
+	flags_p = ids_p + (gsize) n * sizeof(unsigned int);
+
+	/*
+	 * Copy and validate ids first. Values are read with memcpy as the blob is
+	 * not guaranteed to be aligned. An id outside of the cache means that the
+	 * blob does not match this configuration, so it is rejected as a whole
+	 * instead of storing an index that would later be used out of bounds.
+	 */
+	new_ids = g_malloc(sizeof(int) * n);
+
+	for (unsigned int i = 0; i < n; i++) {
+		unsigned int id;
+
+		memcpy(&id, ids_p + (gsize) i * sizeof(id), sizeof(id));
+
+		if ((int) id < 0 || id >= (unsigned int) cache->re->len) {
+			if (!try_load) {
+				msg_err_re_cache("cannot load hyperscan class %s: regexp id is out of range",
+								 re_class->hash);
+			}
+			else {
+				msg_debug_re_cache("cannot load hyperscan class %s: regexp id is out of range",
+								   re_class->hash);
+			}
+			g_free(new_ids);
+			rspamd_hyperscan_free(hs_db, true);
+			return FALSE;
+		}
+
+		new_ids[i] = (int) id;
+	}
+
+	/* Allocate a fresh scratch for the new database before dropping the old one */
+	ret = hs_alloc_scratch(rspamd_hyperscan_get_database(hs_db), &new_scratch);
+	if (ret != HS_SUCCESS) {
+		if (!try_load) {
+			msg_err_re_cache("cannot allocate scratch for hs class %s: %d",
+							 re_class->hash, ret);
+		}
+		g_free(new_ids);
+		rspamd_hyperscan_free(hs_db, true);
+		return FALSE;
+	}
+
+	/* Nothing can fail past this point: release the old state */
+	if (re_class->hs_scratch) {
+		hs_free_scratch(re_class->hs_scratch);
+	}
+	if (re_class->hs_db) {
+		rspamd_hyperscan_free(re_class->hs_db, false);
+	}
+	g_free(re_class->hs_ids);
+
+	/* Apply match types */
+	for (unsigned int i = 0; i < n; i++) {
+		struct rspamd_re_cache_elt *elt = g_ptr_array_index(cache->re, new_ids[i]);
+		unsigned int flag;
+
+		memcpy(&flag, flags_p + (gsize) i * sizeof(flag), sizeof(flag));
+
+		if (flag & HS_FLAG_PREFILTER) {
+			elt->match_type = RSPAMD_RE_CACHE_HYPERSCAN_PRE;
+		}
+		else {
+			elt->match_type = RSPAMD_RE_CACHE_HYPERSCAN;
+		}
+	}
+
+	/* Install the new state */
+	re_class->hs_ids = new_ids;
+	re_class->nhs = (int) n;
+	re_class->hs_db = hs_db;
+	re_class->hs_scratch = new_scratch;
+
+	return TRUE;
+}
+
+/* Release a load item together with its copy of the cache key; NULL is accepted */
+static void
+rspamd_re_cache_hs_load_item_free(struct rspamd_re_cache_hs_load_item *it)
+{
+	if (it != NULL) {
+		g_free(it->cache_key);
+		g_free(it);
+	}
+}
+
+/*
+ * Completion callback for one asynchronous class load.
+ *
+ * Applies the blob on success, updates the per-scope counters and, once the
+ * last pending request of the scope has completed, publishes the overall load
+ * status on the cache and frees the scope context. The item passed in `ud` is
+ * always released.
+ */
+static void
+rspamd_re_cache_hs_load_cb(gboolean success, const unsigned char *data, gsize len,
+						   const char *err, void *ud)
+{
+	struct rspamd_re_cache_hs_load_item *item = (struct rspamd_re_cache_hs_load_item *) ud;
+	struct rspamd_re_cache_hs_load_scope *scope = item->scope_ctx;
+	gboolean applied = FALSE;
+
+	/* The error text is not reported: a miss and a failure are treated alike */
+	(void) err;
+
+	if (success && data != NULL && len > 0) {
+		applied = rspamd_re_cache_apply_hyperscan_blob(item->cache, item->re_class,
+													   data, len, scope->try_load);
+	}
+
+	if (applied) {
+		scope->loaded++;
+	}
+	else {
+		/* cache miss or error */
+		scope->all_loaded = FALSE;
+	}
+
+	if (scope->pending > 0) {
+		scope->pending--;
+	}
+
+	if (scope->pending == 0) {
+		/* Last request of this scope: publish the result */
+		if (scope->loaded == 0) {
+			scope->cache->hyperscan_loaded = RSPAMD_HYPERSCAN_LOAD_ERROR;
+		}
+		else if (scope->all_loaded) {
+			scope->cache->hyperscan_loaded = RSPAMD_HYPERSCAN_LOADED_FULL;
+		}
+		else {
+			scope->cache->hyperscan_loaded = RSPAMD_HYPERSCAN_LOADED_PARTIAL;
+		}
+
+		g_free(scope);
+	}
+
+	rspamd_re_cache_hs_load_item_free(item);
+}
+
+/*
+ * Load hyperscan databases for every scope in the list.
+ *
+ * Without a Lua cache backend this falls back to the synchronous file loader.
+ * Otherwise one asynchronous load is requested per regexp class; results are
+ * applied by rspamd_re_cache_hs_load_cb as they arrive. Does nothing if
+ * cache_head or event_loop is NULL.
+ *
+ * The per-scope context holds one extra "guard" reference in `pending` while
+ * requests are being issued. Without it, a backend that completes a request
+ * synchronously would drop `pending` to zero inside the loop and free the
+ * context while it is still in use here.
+ */
+void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_head,
+												 struct ev_loop *event_loop,
+												 const char *cache_dir,
+												 bool try_load)
+{
+	struct rspamd_re_cache *cur;
+
+	if (!cache_head || !event_loop) {
+		return;
+	}
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		/* Fallback to synchronous file loading */
+		(void) rspamd_re_cache_load_hyperscan_scoped(cache_head, cache_dir, try_load);
+		return;
+	}
+
+	DL_FOREACH(cache_head, cur)
+	{
+		struct rspamd_re_cache_hs_load_scope *sctx = g_malloc0(sizeof(*sctx));
+		GHashTableIter it;
+		gpointer k, v;
+
+		sctx->cache = cur;
+		sctx->event_loop = event_loop;
+		sctx->try_load = try_load;
+		sctx->pending = 1; /* guard reference, dropped after the loop */
+		sctx->total = 0;
+		sctx->loaded = 0;
+		sctx->all_loaded = TRUE;
+
+		g_hash_table_iter_init(&it, cur->re_classes);
+		while (g_hash_table_iter_next(&it, &k, &v)) {
+			struct rspamd_re_class *re_class = (struct rspamd_re_class *) v;
+			struct rspamd_re_cache_hs_load_item *item = g_malloc0(sizeof(*item));
+
+			item->scope_ctx = sctx;
+			item->cache = cur;
+			item->re_class = re_class;
+			item->cache_key = g_strdup(re_class->hash);
+			sctx->pending++;
+			sctx->total++;
+			rspamd_hs_cache_lua_load_async(item->cache_key, rspamd_re_cache_hs_load_cb, item);
+		}
+
+		/* Drop the guard; if nothing is in flight any more, finish the scope here */
+		sctx->pending--;
+
+		if (sctx->pending == 0) {
+			if (sctx->total > 0) {
+				/* Every request has already completed: publish the result */
+				if (sctx->loaded > 0) {
+					cur->hyperscan_loaded = sctx->all_loaded ? RSPAMD_HYPERSCAN_LOADED_FULL : RSPAMD_HYPERSCAN_LOADED_PARTIAL;
+				}
+				else {
+					cur->hyperscan_loaded = RSPAMD_HYPERSCAN_LOAD_ERROR;
+				}
+			}
+			/* A scope without classes leaves the load status untouched */
+			g_free(sctx);
+		}
+	}
+}
+#endif
+
void rspamd_re_cache_add_selector(struct rspamd_re_cache *cache,
const char *sname,
int ref)
struct rspamd_re_cache_hs_compile_scoped_cbdata *scoped_cbd =
(struct rspamd_re_cache_hs_compile_scoped_cbdata *) cbd;
- /* Remove lock */
- rspamd_re_cache_remove_scope_lock(scoped_cbd->cache_dir, scoped_cbd->scope,
- scoped_cbd->lock_fd);
-
/* Call original callback */
if (scoped_cbd->cb) {
scoped_cbd->cb(scoped_cbd->scope, ncompiled, err, scoped_cbd->cbd);
}
- g_free(scoped_cbd);
+ /*
+ * Only free when compilation is complete (err==NULL means done).
+ * When err!=NULL, it's a per-class error and compilation continues,
+ * so we must not free yet - we'll be called again.
+ */
+ if (err == NULL) {
+ /* Remove lock only when done */
+ rspamd_re_cache_remove_scope_lock(scoped_cbd->cache_dir, scoped_cbd->scope,
+ scoped_cbd->lock_fd);
+ g_free(scoped_cbd);
+ }
}
int rspamd_re_cache_compile_hyperscan_scoped_single(struct rspamd_re_cache *cache,
struct rspamd_re_cache *cache_head,
const char *cache_dir, bool try_load);
+/**
+ * Load hyperscan cache for all scopes using the configured cache backend.
+ *
+ * With a Lua backend, one asynchronous load is scheduled per regexp class and
+ * each blob is applied (hot-swapped) when it arrives, so the call does not wait
+ * for the data. Without a Lua backend, it falls back to the synchronous
+ * filesystem loader rspamd_re_cache_load_hyperscan_scoped() and returns only
+ * after that loader has finished.
+ *
+ * Does nothing if cache_head or event_loop is NULL.
+ *
+ * @param cache_head head of the list of scoped caches
+ * @param event_loop event loop, must not be NULL
+ * @param cache_dir cache directory passed to the filesystem fallback
+ * @param try_load passed to the loaders; with the Lua backend, load failures
+ *                 are logged at debug level instead of error level when true
+ */
+void rspamd_re_cache_load_hyperscan_scoped_async(struct rspamd_re_cache *cache_head,
+ struct ev_loop *event_loop,
+ const char *cache_dir,
+ bool try_load);
+
/**
* Compile expressions to the hyperscan tree for a single scope with locking
*/
break;
case RSPAMD_SRV_HYPERSCAN_LOADED:
#ifdef WITH_HYPERSCAN
- /* Load RE cache to provide it for new forks */
+ /* Main process just broadcasts cache update events to workers */
if (cmd.cmd.hs_loaded.scope[0] != '\0') {
/* Scoped loading */
const char *scope = cmd.cmd.hs_loaded.scope;
msg_info_main("received scoped hyperscan cache loaded from %s for scope: %s",
cmd.cmd.hs_loaded.cache_dir, scope);
- /* Load specific scope */
- rspamd_re_cache_load_hyperscan_scoped(
- rspamd_main->cfg->re_cache,
- cmd.cmd.hs_loaded.cache_dir,
- false);
-
/* Broadcast scoped command to all workers */
memset(&wcmd, 0, sizeof(wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
rspamd_control_ignore_io_handler, NULL, worker->pid);
}
else {
- /* Legacy full cache loading */
- if (rspamd_re_cache_is_hs_loaded(rspamd_main->cfg->re_cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
- cmd.cmd.hs_loaded.forced) {
- rspamd_re_cache_load_hyperscan(
- rspamd_main->cfg->re_cache,
- cmd.cmd.hs_loaded.cache_dir,
- false);
- }
+ /* Legacy full cache update */
/* After getting this notice, we can clean up old hyperscan files */
rspamd_hyperscan_notice_loaded();
if (cmd.cmd.busy.is_busy) {
rspamd_strlcpy(worker->hb.busy_reason, cmd.cmd.busy.reason,
sizeof(worker->hb.busy_reason));
- msg_info_main("worker type %s with pid %P marked as busy: %s",
- g_quark_to_string(worker->type), worker->pid,
- worker->hb.busy_reason);
+ rspamd_default_log_function(G_LOG_LEVEL_DEBUG,
+ rspamd_main->server_pool->tag.tagname,
+ rspamd_main->server_pool->tag.uid,
+ RSPAMD_LOG_FUNC,
+ "worker type %s with pid %P marked as busy: %s",
+ g_quark_to_string(worker->type), worker->pid,
+ worker->hb.busy_reason);
}
else {
- msg_info_main("worker type %s with pid %P finished: %s",
- g_quark_to_string(worker->type), worker->pid,
- worker->hb.busy_reason);
+ rspamd_default_log_function(G_LOG_LEVEL_DEBUG,
+ rspamd_main->server_pool->tag.tagname,
+ rspamd_main->server_pool->tag.uid,
+ RSPAMD_LOG_FUNC,
+ "worker type %s with pid %P finished: %s",
+ g_quark_to_string(worker->type), worker->pid,
+ worker->hb.busy_reason);
worker->hb.busy_reason[0] = '\0';
}
break;
case RSPAMD_SRV_BUSY:
reply = "busy";
break;
+ default:
+ break;
}
return reply;
}
-struct rspamd_busy_cb_data {
- gboolean completed;
-};
-
-static void
-rspamd_worker_busy_reply_handler(struct rspamd_worker *worker,
- struct rspamd_srv_reply *rep,
- int rep_fd,
- gpointer ud)
-{
- struct rspamd_busy_cb_data *cbd = (struct rspamd_busy_cb_data *) ud;
- cbd->completed = TRUE;
-}
-
+/*
+ * Tell the main process that this worker starts (reason != NULL) or finishes
+ * (reason == NULL) a long-running operation by sending RSPAMD_SRV_BUSY.
+ * Nothing is sent unless the worker is in the running state. The reason is
+ * copied into the fixed-size command buffer, and the command is sent without
+ * a reply handler.
+ */
void rspamd_worker_set_busy(struct rspamd_worker *worker,
struct ev_loop *event_loop,
const char *reason)
{
struct rspamd_srv_command srv_cmd;
- struct rspamd_busy_cb_data cbd;
- int max_iterations = 100; /* Safety limit */
- /* Don't send if worker is terminating */
if (worker->state != rspamd_worker_state_running) {
return;
}
memset(&srv_cmd, 0, sizeof(srv_cmd));
srv_cmd.type = RSPAMD_SRV_BUSY;
srv_cmd.cmd.busy.is_busy = (reason != NULL);
- if (reason) {
+
+ if (reason != NULL) {
rspamd_strlcpy(srv_cmd.cmd.busy.reason, reason,
sizeof(srv_cmd.cmd.busy.reason));
}
-
- cbd.completed = FALSE;
- rspamd_srv_send_command(worker, event_loop, &srv_cmd, -1,
- rspamd_worker_busy_reply_handler, &cbd);
-
- /* Run the event loop until the notification is acknowledged
- * Also stop if worker starts terminating (signal received during wait) */
- while (!cbd.completed && max_iterations-- > 0 &&
- worker->state == rspamd_worker_state_running) {
- ev_run(event_loop, EVRUN_ONCE);
+ else {
+ srv_cmd.cmd.busy.reason[0] = '\0';
}
- /* If worker is terminating, propagate the break to the outer event loop */
- if (worker->state != rspamd_worker_state_running) {
- ev_break(event_loop, EVBREAK_ALL);
- }
- else if (!cbd.completed) {
- msg_warn("busy notification may not have reached main process");
- }
+ rspamd_srv_send_command(worker, event_loop, &srv_cmd, -1, NULL, NULL);
}
RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
RSPAMD_SRV_WORKERS_SPAWNED, /* Used to notify workers that all workers have been spawned */
RSPAMD_SRV_MULTIPATTERN_LOADED, /* Multipattern HS compiled and ready */
- RSPAMD_SRV_BUSY, /* Worker is busy with long-running operation, suspend heartbeat */
+ RSPAMD_SRV_BUSY, /* Worker is busy with long-running operation */
};
enum rspamd_log_pipe_type {
gboolean forced;
char cache_dir[CONTROL_PATHLEN];
char scope[64]; /* Scope name, NULL means all scopes */
- gsize fd_size; /* Size of FD-based db, 0 if not using FD */
} hs_loaded;
struct {
char name[64];
gboolean forced;
char cache_dir[CONTROL_PATHLEN];
char scope[64]; /* Scope name, NULL means all scopes */
- gsize fd_size; /* Size of FD-based db, 0 if not using FD */
} hs_loaded;
struct {
char tag[32];
char name[64];
char cache_dir[CONTROL_PATHLEN];
} mp_loaded;
- /* Sent when worker starts/finishes long-running operation */
struct {
gboolean is_busy;
- char reason[32]; /* Short reason like "compile hyperscan" */
+ char reason[32];
} busy;
} cmd;
};
*/
void rspamd_pending_control_free(gpointer p);
-/**
- * Notify main process that worker is busy with long-running operation
- * Main process will skip heartbeat checks while worker is busy
- * @param worker worker instance
- * @param event_loop worker event loop
- * @param reason short reason string (e.g., "compile hyperscan"), NULL to clear
- */
void rspamd_worker_set_busy(struct rspamd_worker *worker,
struct ev_loop *event_loop,
const char *reason);
#include "utlist.h"
#include "ottery.h"
#include "rspamd_control.h"
+#include "hs_cache_backend.h"
#include "libserver/maps/map.h"
#include "libserver/maps/map_private.h"
#include "libserver/http/http_private.h"
rspamd_redis_pool_config(worker->srv->cfg->redis_pool,
worker->srv->cfg, event_loop);
+#ifdef WITH_HYPERSCAN
+ /* Ensure HS cache Lua backend is configured in this worker if hs_helper uses it */
+ rspamd_hs_cache_try_init_lua_backend(worker->srv->cfg, event_loop);
+#endif
+
/* Accept all sockets */
if (hdl) {
cur = worker->cf->listen_socks;
rspamd_main = wrk->srv;
if (wrk->hb.is_busy || rspamd_main->wanna_die) {
- /* Worker is doing long-running operation or we're shutting down,
- * skip heartbeat check */
return;
}
}
#ifdef WITH_HYPERSCAN
+/* State kept while a multipattern is being loaded asynchronously from the cache backend */
+struct rspamd_worker_mp_async_cbdata {
+ /* Heap copies of the multipattern name and cache directory; freed by the completion callback */
+ char *name;
+ char *cache_dir;
+ /* Pending multipattern to hot-swap; the completion callback does not free it */
+ struct rspamd_multipattern *mp;
+};
+
+/*
+ * Completion callback for an asynchronous multipattern load.
+ *
+ * When the backend load failed, a load from the file cache is attempted before
+ * giving up. The callback data passed in `ud` is always released.
+ */
+static void
+rspamd_worker_multipattern_async_loaded(gboolean success, void *ud)
+{
+	struct rspamd_worker_mp_async_cbdata *state = (struct rspamd_worker_mp_async_cbdata *) ud;
+
+	if (success) {
+		msg_info("multipattern '%s' hot-swapped to hyperscan (backend)", state->name);
+	}
+	else if (state->mp != NULL && state->cache_dir != NULL &&
+			 rspamd_multipattern_load_from_cache(state->mp, state->cache_dir)) {
+		/* Try file fallback if available */
+		msg_info("multipattern '%s' hot-swapped to hyperscan (file fallback)", state->name);
+	}
+	else {
+		msg_warn("failed to hot-swap multipattern '%s' to hyperscan, continuing with ACISM fallback",
+				 state->name);
+	}
+
+	g_free(state->name);
+	g_free(state->cache_dir);
+	g_free(state);
+}
+
gboolean
rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, int fd,
memset(&rep, 0, sizeof(rep));
rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- /* FD-based loading infrastructure - close unused FD for now */
- if (attached_fd >= 0 && cmd->cmd.hs_loaded.fd_size > 0) {
- close(attached_fd);
- attached_fd = -1;
- }
-
- /* Check if this is a scoped notification */
- if (cmd->cmd.hs_loaded.scope[0] != '\0') {
- /* Scoped hyperscan loading */
- const char *scope = cmd->cmd.hs_loaded.scope;
-
- msg_info("loading hyperscan expressions for scope '%s' after receiving compilation notice", scope);
-
- rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan_scoped(
- cache, cmd->cmd.hs_loaded.cache_dir, false);
+ if (rspamd_hs_cache_has_lua_backend()) {
+ /*
+ * Backend-based hot-swap: schedule async loads to avoid blocking control pipe.
+ * Reply immediately.
+ */
+ rspamd_re_cache_load_hyperscan_scoped_async(cache, worker->srv->event_loop,
+ cmd->cmd.hs_loaded.cache_dir, false);
+ rep.reply.hs_loaded.status = 0;
}
else {
- /* Legacy/full cache loading */
- if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
- cmd->cmd.hs_loaded.forced) {
-
- msg_info("loading hyperscan expressions after receiving compilation "
- "notice: %s",
- (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update");
- rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan(
- worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false);
+ /* File-based loading (legacy, synchronous) */
+ if (cmd->cmd.hs_loaded.scope[0] != '\0') {
+ const char *scope = cmd->cmd.hs_loaded.scope;
+ msg_info("loading hyperscan expressions for scope '%s' after receiving compilation notice", scope);
+ rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan_scoped(
+ cache, cmd->cmd.hs_loaded.cache_dir, false);
+ }
+ else {
+ if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
+ cmd->cmd.hs_loaded.forced) {
+ msg_info("loading hyperscan expressions after receiving compilation notice: %s",
+ (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update");
+ rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan(
+ worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false);
+ }
}
}
mp = rspamd_multipattern_find_pending(name);
if (mp != NULL) {
- if (rspamd_multipattern_load_from_cache(mp, cache_dir)) {
- msg_info("multipattern '%s' hot-swapped to hyperscan", name);
+ if (rspamd_hs_cache_has_lua_backend()) {
+ struct rspamd_worker_mp_async_cbdata *cbd = g_malloc0(sizeof(*cbd));
+ cbd->name = g_strdup(name);
+ cbd->cache_dir = g_strdup(cache_dir);
+ cbd->mp = mp;
+ rspamd_multipattern_load_from_cache_async(mp, cache_dir, worker->srv->event_loop,
+ rspamd_worker_multipattern_async_loaded, cbd);
rep.reply.hs_loaded.status = 0;
}
else {
- msg_warn("failed to load multipattern '%s' from cache, "
- "continuing with ACISM fallback",
- name);
- rep.reply.hs_loaded.status = ENOENT;
+ if (rspamd_multipattern_load_from_cache(mp, cache_dir)) {
+ msg_info("multipattern '%s' hot-swapped to hyperscan", name);
+ rep.reply.hs_loaded.status = 0;
+ }
+ else {
+ msg_warn("failed to load multipattern '%s' from cache, continuing with ACISM fallback",
+ name);
+ rep.reply.hs_loaded.status = ENOENT;
+ }
}
}
else {
#include "libutil/str_util.h"
#include "libcryptobox/cryptobox.h"
#include "logger.h"
+#include "libserver/hs_cache_backend.h"
#ifdef WITH_HYPERSCAN
#include "unix-std.h"
hs_free_database(db);
- /* Write to temp file and rename */
+ /* Generate cache key from hash */
+ char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1];
+ rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs",
+ (int) rspamd_cryptobox_HASHBYTES / 2, hash);
+
+ /*
+ * Multipattern cache is consumed by rspamd_multipattern_load_from_cache(),
+ * which currently loads from filesystem. Hence, always save to file cache.
+ */
rspamd_snprintf(fp, sizeof(fp), "%s%chs-mp-XXXXXXXXXXXXX",
cache_dir, G_DIR_SEPARATOR);
close(fd);
/* Rename to final path */
- rspamd_snprintf(np, sizeof(np), "%s/%*xs.hs", cache_dir,
- (int) rspamd_cryptobox_HASHBYTES / 2, hash);
+ rspamd_snprintf(np, sizeof(np), "%s/%s.hs", cache_dir, cache_key);
if (rename(fp, np) == -1) {
g_set_error(err, rspamd_multipattern_quark(), errno,
}
rspamd_hyperscan_notice_known(np);
- msg_info("saved hyperscan database to %s (%z bytes)", np, len);
+ msg_info("saved hyperscan multipattern database to %s (%z bytes)", np, len);
return TRUE;
#else
#endif
}
+/*
+ * State carried through an asynchronous save of a compiled multipattern
+ * hyperscan database to the Lua cache backend.
+ */
+struct rspamd_multipattern_hs_cache_async_ctx {
+ struct rspamd_multipattern *mp; /**< multipattern being saved; not released together with the context */
+ char *cache_key; /**< heap copy of the cache key, released together with the context */
+ rspamd_multipattern_hs_cache_cb_t cb; /**< completion callback, may be NULL */
+ void *ud; /**< opaque pointer handed back to cb */
+};
+
+/* Release an async save context together with the cache key it owns; NULL is a no-op */
+static void
+rspamd_multipattern_hs_cache_async_ctx_free(struct rspamd_multipattern_hs_cache_async_ctx *ctx)
+{
+	if (ctx != NULL) {
+		g_free(ctx->cache_key);
+		g_free(ctx);
+	}
+}
+
+/*
+ * Completion handler for an asynchronous save to the Lua cache backend.
+ * Translates a failed save into a GError, reports the outcome to the user
+ * callback (if any) and then releases the save context.
+ */
+static void
+rspamd_multipattern_hs_cache_save_cb(gboolean success,
+									 const unsigned char *data,
+									 gsize len,
+									 const char *error,
+									 void *ud)
+{
+	struct rspamd_multipattern_hs_cache_async_ctx *save_ctx = ud;
+	GError *save_err = NULL;
+
+	/* A save operation carries no payload back */
+	(void) data;
+	(void) len;
+
+	if (!success) {
+		const char *key = save_ctx->cache_key ? save_ctx->cache_key : "(null)";
+		const char *reason = error ? error : "unknown error";
+
+		g_set_error(&save_err, rspamd_multipattern_quark(), EIO,
+					"cannot save multipattern cache %s: %s",
+					key, reason);
+	}
+
+	if (save_ctx->cb != NULL) {
+		save_ctx->cb(save_ctx->mp, success, save_err, save_ctx->ud);
+	}
+
+	/* The error is only valid for the duration of the callback */
+	g_clear_error(&save_err);
+	rspamd_multipattern_hs_cache_async_ctx_free(save_ctx);
+}
+
+/*
+ * Compile the hyperscan database of a multipattern and store it in the
+ * hyperscan cache.
+ *
+ * Without a Lua cache backend this delegates to the synchronous file-based
+ * rspamd_multipattern_compile_hs_to_cache() and invokes cb before returning.
+ * With a Lua backend the database is compiled and serialized here, then the
+ * blob is handed to rspamd_hs_cache_lua_save_async(); cb is invoked from the
+ * save completion handler. Validation, compilation and serialization
+ * failures are reported through cb before this function returns.
+ *
+ * The GError passed to cb (if any) is freed right after cb returns, so the
+ * callback must not retain it.
+ *
+ * @param mp multipattern to compile
+ * @param cache_dir cache directory (used by the file-based path, must be non-NULL)
+ * @param event_loop currently unused
+ * @param cb completion callback, may be NULL
+ * @param ud opaque pointer passed to cb
+ */
+void rspamd_multipattern_compile_hs_to_cache_async(struct rspamd_multipattern *mp,
+												   const char *cache_dir,
+												   struct ev_loop *event_loop,
+												   rspamd_multipattern_hs_cache_cb_t cb,
+												   void *ud)
+{
+	GError *err = NULL;
+
+	(void) event_loop;
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		/* Legacy file-only path */
+		gboolean ok = rspamd_multipattern_compile_hs_to_cache(mp, cache_dir, &err);
+		if (cb) {
+			cb(mp, ok, err, ud);
+		}
+		if (err) {
+			g_error_free(err);
+		}
+		return;
+	}
+
+#ifdef WITH_HYPERSCAN
+	hs_platform_info_t plt;
+	hs_compile_error_t *hs_errors = NULL;
+	hs_database_t *db = NULL;
+	unsigned char hash[rspamd_cryptobox_HASHBYTES];
+	char *bytes = NULL;
+	gsize len = 0;
+	char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1];
+
+	if (!mp || !cache_dir) {
+		if (cb) {
+			g_set_error(&err, rspamd_multipattern_quark(), EINVAL, "invalid arguments");
+			cb(mp, FALSE, err, ud);
+			g_error_free(err);
+		}
+		return;
+	}
+
+	if (mp->state != RSPAMD_MP_STATE_COMPILING || mp->hs_pats == NULL || mp->cnt == 0) {
+		if (cb) {
+			g_set_error(&err, rspamd_multipattern_quark(), EINVAL, "multipattern is not ready for compilation");
+			cb(mp, FALSE, err, ud);
+			g_error_free(err);
+		}
+		return;
+	}
+
+	/*
+	 * Do not wrap this call in g_assert(): with G_DISABLE_ASSERT the whole
+	 * expression is compiled out and plt would stay uninitialised.
+	 */
+	if (hs_populate_platform(&plt) != HS_SUCCESS) {
+		if (cb) {
+			g_set_error(&err, rspamd_multipattern_quark(), EINVAL,
+						"cannot populate hyperscan platform info");
+			cb(mp, FALSE, err, ud);
+			g_error_free(err);
+		}
+		return;
+	}
+
+	rspamd_multipattern_get_hash(mp, hash);
+	rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs",
+					(int) rspamd_cryptobox_HASHBYTES / 2, hash);
+
+	msg_info("compiling hyperscan database for %ud patterns", mp->cnt);
+
+	if (hs_compile_multi((const char *const *) mp->hs_pats->data,
+						 (const unsigned int *) mp->hs_flags->data,
+						 (const unsigned int *) mp->hs_ids->data,
+						 mp->cnt,
+						 HS_MODE_BLOCK,
+						 &plt,
+						 &db,
+						 &hs_errors) != HS_SUCCESS) {
+		/* Be defensive: never dereference a missing compile error object */
+		g_set_error(&err, rspamd_multipattern_quark(), EINVAL,
+					"cannot compile hyperscan: %s (pattern %d)",
+					hs_errors ? hs_errors->message : "unknown error",
+					hs_errors ? hs_errors->expression : -1);
+		if (hs_errors) {
+			hs_free_compile_error(hs_errors);
+		}
+		if (cb) {
+			cb(mp, FALSE, err, ud);
+		}
+		g_error_free(err);
+		return;
+	}
+
+	if (!rspamd_hyperscan_serialize_with_header(db, NULL, NULL, 0, &bytes, &len)) {
+		hs_free_database(db);
+		g_set_error(&err, rspamd_multipattern_quark(), EINVAL,
+					"cannot serialize hyperscan database");
+		if (cb) {
+			cb(mp, FALSE, err, ud);
+		}
+		g_error_free(err);
+		return;
+	}
+
+	/* Only the serialized blob is needed from here on */
+	hs_free_database(db);
+
+	struct rspamd_multipattern_hs_cache_async_ctx *ctx = g_malloc0(sizeof(*ctx));
+	ctx->mp = mp;
+	ctx->cache_key = g_strdup(cache_key);
+	ctx->cb = cb;
+	ctx->ud = ud;
+
+	/*
+	 * The save is asynchronous: its real outcome is reported through
+	 * rspamd_multipattern_hs_cache_save_cb, so only announce the attempt here.
+	 */
+	msg_info("storing hyperscan multipattern database %s in Lua backend (%z bytes)",
+			 cache_key, len);
+
+	/* save_async copies bytes into Lua string (lua_pushlstring), safe to free immediately */
+	rspamd_hs_cache_lua_save_async(cache_key, (const unsigned char *) bytes, len,
+								   rspamd_multipattern_hs_cache_save_cb, ctx);
+
+	g_free(bytes);
+#else
+	if (cb) {
+		g_set_error(&err, rspamd_multipattern_quark(), ENOTSUP,
+					"hyperscan not available");
+		cb(mp, FALSE, err, ud);
+		g_error_free(err);
+	}
+#endif
+}
+
gboolean
rspamd_multipattern_load_from_cache(struct rspamd_multipattern *mp,
const char *cache_dir)
return FALSE;
#endif
}
+
+#ifdef WITH_HYPERSCAN
+/*
+ * State carried through an asynchronous load of a multipattern hyperscan
+ * database from the Lua cache backend.
+ */
+struct rspamd_multipattern_load_ctx {
+ struct rspamd_multipattern *mp; /**< multipattern to hot-swap; not released together with the context */
+ char *cache_dir; /**< heap copy of the cache directory, released together with the context */
+ char *cache_key; /**< heap copy of the cache key, released together with the context */
+ void (*cb)(gboolean success, void *ud); /**< completion callback, may be NULL */
+ void *ud; /**< opaque pointer handed back to cb */
+};
+
+/* Release an async load context and the strings it owns; NULL is a no-op */
+static void
+rspamd_multipattern_load_ctx_free(struct rspamd_multipattern_load_ctx *ctx)
+{
+	if (ctx != NULL) {
+		g_free(ctx->cache_dir);
+		g_free(ctx->cache_key);
+		g_free(ctx);
+	}
+}
+
+/*
+ * Completion handler for an asynchronous load from the Lua cache backend.
+ *
+ * On a successful, non-empty load and while the multipattern is still in the
+ * RSPAMD_MP_STATE_COMPILING state, the blob is turned into a hyperscan
+ * database, scratch space is allocated and the multipattern is switched to
+ * RSPAMD_MP_STATE_COMPILED. Every other outcome leaves the multipattern
+ * untouched and is logged at debug level so that a failed hot-swap can be
+ * diagnosed. The user callback (if any) receives the final outcome, then the
+ * load context is released.
+ */
+static void
+rspamd_multipattern_load_from_cache_cb(gboolean success,
+									   const unsigned char *data,
+									   gsize len,
+									   const char *err,
+									   void *ud)
+{
+	struct rspamd_multipattern_load_ctx *ctx = (struct rspamd_multipattern_load_ctx *) ud;
+	struct rspamd_multipattern *mp = ctx->mp;
+	const char *key = ctx->cache_key ? ctx->cache_key : "(null)";
+	GError *gerr = NULL;
+	gboolean ok = FALSE;
+
+	if (!success) {
+		/* Report the backend error instead of silently dropping it */
+		msg_debug("multipattern hs load failed for %s: %s",
+				  key, err ? err : "unknown error");
+	}
+	else if (!data || len == 0) {
+		msg_debug("multipattern hs load returned no data for %s", key);
+	}
+	else if (mp->state != RSPAMD_MP_STATE_COMPILING) {
+		msg_debug("multipattern %s is not awaiting compilation, loaded database ignored", key);
+	}
+	else {
+		mp->hs_db = rspamd_hyperscan_load_from_header((const char *) data, len, &gerr);
+		if (mp->hs_db) {
+			if (rspamd_multipattern_alloc_scratch(mp, &gerr)) {
+				mp->state = RSPAMD_MP_STATE_COMPILED;
+				ok = TRUE;
+			}
+			else {
+				/* Scratch allocation failed: drop the database again */
+				rspamd_hyperscan_free(mp->hs_db, true);
+				mp->hs_db = NULL;
+			}
+		}
+	}
+
+	if (!ok && gerr) {
+		msg_debug("multipattern hs load failed: %s", gerr->message);
+	}
+	g_clear_error(&gerr);
+
+	if (ctx->cb) {
+		ctx->cb(ok, ctx->ud);
+	}
+
+	rspamd_multipattern_load_ctx_free(ctx);
+}
+#endif
+
+/*
+ * Load the hyperscan database of a multipattern from the cache.
+ *
+ * Invalid arguments or a multipattern that is not in the
+ * RSPAMD_MP_STATE_COMPILING state are reported as failure straight away.
+ * With a Lua backend the load is started asynchronously and cb is invoked
+ * from the load completion handler; otherwise the synchronous file loader
+ * is used and cb is invoked before returning. Without hyperscan support the
+ * callback always receives FALSE.
+ */
+void rspamd_multipattern_load_from_cache_async(struct rspamd_multipattern *mp,
+											   const char *cache_dir,
+											   struct ev_loop *event_loop,
+											   void (*cb)(gboolean success, void *ud),
+											   void *ud)
+{
+#ifdef WITH_HYPERSCAN
+	unsigned char digest[rspamd_cryptobox_HASHBYTES];
+	char key_buf[rspamd_cryptobox_HASHBYTES * 2 + 1];
+	struct rspamd_multipattern_load_ctx *load_ctx;
+
+	(void) event_loop;
+
+	if (mp == NULL || cache_dir == NULL ||
+		mp->state != RSPAMD_MP_STATE_COMPILING) {
+		if (cb != NULL) {
+			cb(FALSE, ud);
+		}
+		return;
+	}
+
+	/* Calculate hash for cache key */
+	rspamd_multipattern_get_hash(mp, digest);
+	rspamd_snprintf(key_buf, sizeof(key_buf), "%*xs",
+					(int) rspamd_cryptobox_HASHBYTES / 2, digest);
+
+	if (!rspamd_hs_cache_has_lua_backend()) {
+		/* File backend fallback (synchronous) */
+		if (cb != NULL) {
+			gboolean loaded = rspamd_multipattern_load_from_cache(mp, cache_dir);
+
+			cb(loaded, ud);
+		}
+		return;
+	}
+
+	load_ctx = g_malloc0(sizeof(*load_ctx));
+	load_ctx->mp = mp;
+	load_ctx->cache_dir = g_strdup(cache_dir);
+	load_ctx->cache_key = g_strdup(key_buf);
+	load_ctx->cb = cb;
+	load_ctx->ud = ud;
+	rspamd_hs_cache_lua_load_async(load_ctx->cache_key,
+								   rspamd_multipattern_load_from_cache_cb, load_ctx);
+#else
+	(void) mp;
+	(void) cache_dir;
+	(void) event_loop;
+	if (cb != NULL) {
+		cb(FALSE, ud);
+	}
+#endif
+}
extern "C" {
#endif
+struct ev_loop;
+
enum rspamd_multipattern_flags {
RSPAMD_MULTIPATTERN_DEFAULT = 0,
RSPAMD_MULTIPATTERN_ICASE = (1 << 0),
const char *cache_dir,
GError **err);
+/**
+ * Completion callback for rspamd_multipattern_compile_hs_to_cache_async().
+ * @param mp multipattern the operation was started for
+ * @param success TRUE if the database was compiled and stored
+ * @param err error description on failure (may be NULL); it is freed by the
+ * caller right after the callback returns, so it must not be retained
+ * @param ud opaque pointer supplied when the operation was started
+ */
+typedef void (*rspamd_multipattern_hs_cache_cb_t)(struct rspamd_multipattern *mp,
+ gboolean success,
+ GError *err,
+ void *ud);
+
+/**
+ * Compile multipattern HS database and store it in the configured HS cache backend.
+ * If Lua backend is enabled, store is done asynchronously and callback is invoked on completion.
+ * For file backend, compilation+store is synchronous and callback is invoked immediately.
+ * Argument validation, compilation and serialization failures are also reported
+ * through the callback before this function returns.
+ * @param mp multipattern to compile
+ * @param cache_dir cache directory, must not be NULL
+ * @param event_loop event loop; NOTE(review): the implementation currently ignores it
+ * @param cb completion callback, may be NULL
+ * @param ud opaque pointer passed to cb
+ */
+void rspamd_multipattern_compile_hs_to_cache_async(struct rspamd_multipattern *mp,
+ const char *cache_dir,
+ struct ev_loop *event_loop,
+ rspamd_multipattern_hs_cache_cb_t cb,
+ void *ud);
+
/**
* Load hyperscan database from cache file.
* This is called by workers when they receive notification that
gboolean rspamd_multipattern_load_from_cache(struct rspamd_multipattern *mp,
const char *cache_dir);
+/**
+ * Asynchronously load hyperscan database for a multipattern from the configured
+ * HS cache backend (Lua backend if present, otherwise filesystem).
+ *
+ * The callback is invoked when hot-swap has been attempted. It may run before
+ * this function returns: on invalid arguments, when the multipattern is not
+ * awaiting compilation, when the filesystem fallback is used, or when
+ * hyperscan support is not compiled in.
+ * @param mp multipattern to load the database for
+ * @param cache_dir cache directory, must not be NULL
+ * @param event_loop event loop; NOTE(review): the implementation currently ignores it
+ * @param cb completion callback receiving the outcome, may be NULL
+ * @param ud opaque pointer passed to cb
+ */
+void rspamd_multipattern_load_from_cache_async(struct rspamd_multipattern *mp,
+ const char *cache_dir,
+ struct ev_loop *event_loop,
+ void (*cb)(gboolean success, void *ud),
+ void *ud);
+
#ifdef __cplusplus
}
#endif
${CMAKE_CURRENT_SOURCE_DIR}/lua_compress.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_archive.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_classnames.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_shingles.cxx
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_hyperscan.cxx)
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_shingles.cxx)
SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE)
luaopen_compress(L);
luaopen_libarchive(L);
luaopen_shingle(L);
- luaopen_hyperscan(L);
#ifndef WITH_LUAJIT
rspamd_lua_add_preload(L, "bit", luaopen_bit);
lua_settop(L, 0);
/* libarchive-based archive module */
void luaopen_libarchive(lua_State *L);
-/* Hyperscan module */
-void luaopen_hyperscan(lua_State *L);
-
void rspamd_lua_dostring(const char *line);
double rspamd_lua_normalize(struct rspamd_config *cfg,
+++ /dev/null
-/*
- * Copyright 2026 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "lua_common.h"
-
-#ifdef WITH_HYPERSCAN
-#include "hs.h"
-#include "libserver/hyperscan_tools.h"
-#include "cryptobox.h"
-#include <vector>
-#include <cstring>
-
-/***
- * @module rspamd_hyperscan
- * Rspamd hyperscan module provides Lua bindings for Hyperscan pattern matching.
- * This module exposes compilation, serialization, and validation functions
- * for hyperscan databases.
- *
- * @example
-local rspamd_hyperscan = require "rspamd_hyperscan"
-
--- Check if hyperscan is available
-if rspamd_hyperscan.has_hyperscan() then
- -- Get platform identifier
- local platform_id = rspamd_hyperscan.platform_id()
-
- -- Compile patterns
- local patterns = {"pattern1", "pattern2"}
- local flags = {0, 0} -- HS_FLAG_* values
- local ids = {1, 2}
- local db, err = rspamd_hyperscan.compile(patterns, flags, ids)
-
- if db then
- -- Serialize to binary blob
- local blob = rspamd_hyperscan.serialize(db)
-
- -- Validate blob
- local valid, err = rspamd_hyperscan.validate(blob)
-
- -- Deserialize back
- local db2 = rspamd_hyperscan.deserialize(blob)
- end
-end
- */
-
-/* Database magic for unified format */
-static const unsigned char rspamd_hs_magic[] = {'r', 's', 'h', 's', 'r', 'e', '1', '1'};
-#define RSPAMD_HS_MAGIC_LEN (sizeof(rspamd_hs_magic))
-
-/* Userdata wrapper for hs_database_t */
-struct lua_hs_db {
- hs_database_t *db;
- hs_scratch_t *scratch;
-};
-
-#define LUA_HS_DB "rspamd{hyperscan_db}"
-
-static struct lua_hs_db *
-lua_check_hs_db(lua_State *L, int idx)
-{
- void *ud = rspamd_lua_check_udata(L, idx, LUA_HS_DB);
- luaL_argcheck(L, ud != NULL, idx, "'hyperscan_db' expected");
- return (struct lua_hs_db *) ud;
-}
-
-/***
- * @function rspamd_hyperscan.has_hyperscan()
- * Check if hyperscan support is available
- * @return {boolean} true if hyperscan is available
- */
-static int
-lua_hyperscan_has_hyperscan(lua_State *L)
-{
- lua_pushboolean(L, true);
- return 1;
-}
-
-/***
- * @function rspamd_hyperscan.platform_id()
- * Get platform identifier string for cache keys
- * @return {string} platform identifier including HS version, CPU features, etc.
- */
-static int
-lua_hyperscan_platform_id(lua_State *L)
-{
- const char *pid = rspamd_hyperscan_get_platform_id();
- lua_pushstring(L, pid);
- return 1;
-}
-
-/***
- * @function rspamd_hyperscan.compile(patterns, flags, ids)
- * Compile patterns into a hyperscan database
- * @param {table} patterns array of pattern strings
- * @param {table} flags array of HS_FLAG_* values (one per pattern)
- * @param {table} ids array of pattern IDs (one per pattern)
- * @return {hyperscan_db,nil} database object or nil on error
- * @return {nil,string} nil and error message on failure
- */
-static int
-lua_hyperscan_compile(lua_State *L)
-{
- if (!lua_istable(L, 1)) {
- return luaL_error(L, "patterns must be a table");
- }
-
- size_t npat = rspamd_lua_table_size(L, 1);
- if (npat == 0) {
- lua_pushnil(L);
- lua_pushstring(L, "no patterns provided");
- return 2;
- }
-
- std::vector<const char *> patterns(npat);
- std::vector<std::string> pattern_storage(npat);
- std::vector<unsigned int> flags(npat, 0);
- std::vector<unsigned int> ids(npat);
-
- /* Extract patterns */
- for (size_t i = 0; i < npat; i++) {
- lua_rawgeti(L, 1, i + 1);
- if (lua_isstring(L, -1)) {
- size_t len;
- const char *pat = lua_tolstring(L, -1, &len);
- pattern_storage[i] = std::string(pat, len);
- patterns[i] = pattern_storage[i].c_str();
- }
- else {
- lua_pop(L, 1);
- lua_pushnil(L);
- lua_pushfstring(L, "pattern %d is not a string", (int) (i + 1));
- return 2;
- }
- lua_pop(L, 1);
- ids[i] = i;
- }
-
- /* Extract flags if provided */
- if (lua_istable(L, 2)) {
- for (size_t i = 0; i < npat; i++) {
- lua_rawgeti(L, 2, i + 1);
- if (lua_isnumber(L, -1)) {
- flags[i] = lua_tointeger(L, -1);
- }
- lua_pop(L, 1);
- }
- }
-
- /* Extract IDs if provided */
- if (lua_istable(L, 3)) {
- for (size_t i = 0; i < npat; i++) {
- lua_rawgeti(L, 3, i + 1);
- if (lua_isnumber(L, -1)) {
- ids[i] = lua_tointeger(L, -1);
- }
- lua_pop(L, 1);
- }
- }
-
- hs_database_t *db = nullptr;
- hs_compile_error_t *compile_err = nullptr;
-
- hs_error_t err = hs_compile_multi(
- patterns.data(),
- flags.data(),
- ids.data(),
- npat,
- HS_MODE_BLOCK,
- nullptr,
- &db,
- &compile_err);
-
- if (err != HS_SUCCESS) {
- const char *err_msg = compile_err ? compile_err->message : "unknown error";
- lua_pushnil(L);
- if (compile_err && compile_err->expression >= 0) {
- lua_pushfstring(L, "compile error at pattern %d: %s",
- compile_err->expression, err_msg);
- }
- else {
- lua_pushfstring(L, "compile error: %s", err_msg);
- }
- if (compile_err) {
- hs_free_compile_error(compile_err);
- }
- return 2;
- }
-
- /* Allocate scratch for matching */
- hs_scratch_t *scratch = nullptr;
- err = hs_alloc_scratch(db, &scratch);
- if (err != HS_SUCCESS) {
- hs_free_database(db);
- lua_pushnil(L);
- lua_pushstring(L, "failed to allocate scratch space");
- return 2;
- }
-
- /* Create userdata */
- auto *ud = (struct lua_hs_db *) lua_newuserdata(L, sizeof(struct lua_hs_db));
- ud->db = db;
- ud->scratch = scratch;
- rspamd_lua_setclass(L, LUA_HS_DB, -1);
-
- return 1;
-}
-
-/***
- * @function rspamd_hyperscan.serialize(db, [ids, flags])
- * Serialize a hyperscan database to binary blob with unified header
- * @param {hyperscan_db} db database to serialize
- * @param {table} ids optional array of pattern IDs to include in header
- * @param {table} flags optional array of pattern flags to include in header
- * @return {text} serialized database as rspamd_text or nil on error
- */
-static int
-lua_hyperscan_serialize(lua_State *L)
-{
- struct lua_hs_db *db = lua_check_hs_db(L, 1);
- if (!db || !db->db) {
- lua_pushnil(L);
- lua_pushstring(L, "invalid database");
- return 2;
- }
-
- /* Serialize database first - hyperscan allocates the buffer */
- char *ser_bytes = nullptr;
- size_t ser_size = 0;
- hs_error_t err = hs_serialize_database(db->db, &ser_bytes, &ser_size);
- if (err != HS_SUCCESS) {
- lua_pushnil(L);
- lua_pushstring(L, "failed to serialize database");
- return 2;
- }
-
- /* Get platform info */
- hs_platform_info_t plt;
- err = hs_populate_platform(&plt);
- if (err != HS_SUCCESS) {
- free(ser_bytes);
- lua_pushnil(L);
- lua_pushstring(L, "failed to get platform info");
- return 2;
- }
-
- /* Extract IDs and flags if provided */
- std::vector<unsigned int> ids;
- std::vector<unsigned int> hs_flags;
-
- if (lua_istable(L, 2)) {
- size_t n = rspamd_lua_table_size(L, 2);
- ids.resize(n);
- for (size_t i = 0; i < n; i++) {
- lua_rawgeti(L, 2, i + 1);
- ids[i] = lua_isnumber(L, -1) ? lua_tointeger(L, -1) : i;
- lua_pop(L, 1);
- }
- }
-
- if (lua_istable(L, 3)) {
- size_t n = rspamd_lua_table_size(L, 3);
- hs_flags.resize(n);
- for (size_t i = 0; i < n; i++) {
- lua_rawgeti(L, 3, i + 1);
- hs_flags[i] = lua_isnumber(L, -1) ? lua_tointeger(L, -1) : 0;
- lua_pop(L, 1);
- }
- }
-
- int n = (int) ids.size();
-
- /* Calculate total size */
- size_t header_size = RSPAMD_HS_MAGIC_LEN +
- sizeof(plt) +
- sizeof(n) +
- (n > 0 ? sizeof(unsigned int) * n * 2 : 0) +
- sizeof(uint64_t); /* CRC */
-
- size_t total_size = header_size + ser_size;
-
- /* Allocate buffer */
- auto *text = static_cast<struct rspamd_lua_text *>(
- lua_newuserdata(L, sizeof(struct rspamd_lua_text)));
- rspamd_lua_setclass(L, rspamd_text_classname, -1);
-
- char *buf = static_cast<char *>(g_malloc(total_size));
- text->start = buf;
- text->len = total_size;
- text->flags = RSPAMD_TEXT_FLAG_OWN;
-
- /* Write header */
- char *p = buf;
-
- /* Magic */
- memcpy(p, rspamd_hs_magic, RSPAMD_HS_MAGIC_LEN);
- p += RSPAMD_HS_MAGIC_LEN;
-
- /* Platform */
- memcpy(p, &plt, sizeof(plt));
- p += sizeof(plt);
-
- /* Count */
- memcpy(p, &n, sizeof(n));
- p += sizeof(n);
-
- /* IDs - remember position for CRC */
- char *ids_start = p;
- if (n > 0) {
- memcpy(p, ids.data(), sizeof(unsigned int) * n);
- p += sizeof(unsigned int) * n;
-
- /* Flags */
- if (hs_flags.size() == ids.size()) {
- memcpy(p, hs_flags.data(), sizeof(unsigned int) * n);
- }
- else {
- memset(p, 0, sizeof(unsigned int) * n);
- }
- p += sizeof(unsigned int) * n;
- }
-
- /* Calculate CRC over IDs + flags + HS blob (compatible with re_cache.c) */
- rspamd_cryptobox_fast_hash_state_t crc_st;
- rspamd_cryptobox_fast_hash_init(&crc_st, 0xdeadbabe);
- if (n > 0) {
- rspamd_cryptobox_fast_hash_update(&crc_st, ids_start, sizeof(unsigned int) * n);
- rspamd_cryptobox_fast_hash_update(&crc_st, ids_start + sizeof(unsigned int) * n,
- sizeof(unsigned int) * n);
- }
- rspamd_cryptobox_fast_hash_update(&crc_st, ser_bytes, ser_size);
- uint64_t crc = rspamd_cryptobox_fast_hash_final(&crc_st);
-
- memcpy(p, &crc, sizeof(crc));
- p += sizeof(crc);
-
- /* Copy serialized database */
- memcpy(p, ser_bytes, ser_size);
-
- /* Free hyperscan-allocated buffer (use free(), not g_free()) */
- free(ser_bytes);
-
- return 1;
-}
-
-/***
- * @function rspamd_hyperscan.validate(blob)
- * Validate a serialized hyperscan database blob
- * @param {text|string} blob serialized database
- * @return {boolean} true if valid
- * @return {string} error message if invalid
- */
-static int
-lua_hyperscan_validate(lua_State *L)
-{
- const char *data = nullptr;
- size_t len = 0;
- struct rspamd_lua_text *t;
-
- if (lua_isstring(L, 1)) {
- data = lua_tolstring(L, 1, &len);
- }
- else if ((t = (struct rspamd_lua_text *) rspamd_lua_check_udata_maybe(L, 1, rspamd_text_classname))) {
- data = t->start;
- len = t->len;
- }
- else {
- return luaL_error(L, "blob must be a string or text");
- }
-
- if (len < RSPAMD_HS_MAGIC_LEN) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "blob too small");
- return 2;
- }
-
- /* Check magic */
- if (memcmp(data, rspamd_hs_magic, RSPAMD_HS_MAGIC_LEN) != 0) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "invalid magic");
- return 2;
- }
-
- const char *p = data + RSPAMD_HS_MAGIC_LEN;
- const char *end = data + len;
-
- /* Check platform */
- if ((size_t) (end - p) < sizeof(hs_platform_info_t)) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "truncated platform info");
- return 2;
- }
-
- hs_platform_info_t stored_plt;
- memcpy(&stored_plt, p, sizeof(stored_plt));
- p += sizeof(stored_plt);
-
- hs_platform_info_t cur_plt;
- if (hs_populate_platform(&cur_plt) != HS_SUCCESS) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "failed to get current platform");
- return 2;
- }
-
- /* Compare platform - tune is the most important */
- if (stored_plt.tune != cur_plt.tune) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "platform mismatch");
- return 2;
- }
-
- /* Read count */
- if ((size_t) (end - p) < sizeof(int)) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "truncated count");
- return 2;
- }
-
- int n;
- memcpy(&n, p, sizeof(n));
- p += sizeof(n);
-
- if (n < 0) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "invalid pattern count");
- return 2;
- }
-
- /* Remember start of IDs for CRC calculation */
- const char *ids_start = p;
- size_t arrays_size = (n > 0) ? sizeof(unsigned int) * n * 2 : 0;
- if ((size_t) (end - p) < arrays_size + sizeof(uint64_t)) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "truncated arrays or CRC");
- return 2;
- }
-
- p += arrays_size;
-
- /* Verify CRC (over IDs + flags + HS blob, compatible with re_cache.c) */
- uint64_t stored_crc;
- memcpy(&stored_crc, p, sizeof(stored_crc));
- p += sizeof(stored_crc);
-
- const char *hs_blob = p;
- size_t hs_len = end - p;
-
- rspamd_cryptobox_fast_hash_state_t crc_st;
- rspamd_cryptobox_fast_hash_init(&crc_st, 0xdeadbabe);
- if (n > 0) {
- rspamd_cryptobox_fast_hash_update(&crc_st, ids_start, sizeof(unsigned int) * n);
- rspamd_cryptobox_fast_hash_update(&crc_st, ids_start + sizeof(unsigned int) * n,
- sizeof(unsigned int) * n);
- }
- rspamd_cryptobox_fast_hash_update(&crc_st, hs_blob, hs_len);
- uint64_t calc_crc = rspamd_cryptobox_fast_hash_final(&crc_st);
-
- if (stored_crc != calc_crc) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "CRC mismatch");
- return 2;
- }
-
- /* Validate hyperscan portion */
- if (hs_len == 0) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "empty hyperscan database");
- return 2;
- }
-
- hs_database_t *test_db = nullptr;
- hs_error_t err = hs_deserialize_database(p, hs_len, &test_db);
- if (err != HS_SUCCESS) {
- lua_pushboolean(L, false);
- lua_pushfstring(L, "hyperscan deserialize failed: %d", err);
- return 2;
- }
-
- hs_free_database(test_db);
- lua_pushboolean(L, true);
- return 1;
-}
-
-/***
- * @function rspamd_hyperscan.deserialize(blob)
- * Deserialize a hyperscan database from blob
- * @param {text|string} blob serialized database
- * @return {hyperscan_db} database object or nil
- * @return {string} error message on failure
- */
-static int
-lua_hyperscan_deserialize(lua_State *L)
-{
- const char *data = nullptr;
- size_t len = 0;
- struct rspamd_lua_text *t;
-
- if (lua_isstring(L, 1)) {
- data = lua_tolstring(L, 1, &len);
- }
- else if ((t = (struct rspamd_lua_text *) rspamd_lua_check_udata_maybe(L, 1, rspamd_text_classname))) {
- data = t->start;
- len = t->len;
- }
- else {
- return luaL_error(L, "blob must be a string or text");
- }
-
- /* Validate first */
- if (len < RSPAMD_HS_MAGIC_LEN) {
- lua_pushnil(L);
- lua_pushstring(L, "blob too small");
- return 2;
- }
-
- if (memcmp(data, rspamd_hs_magic, RSPAMD_HS_MAGIC_LEN) != 0) {
- lua_pushnil(L);
- lua_pushstring(L, "invalid magic");
- return 2;
- }
-
- const char *p = data + RSPAMD_HS_MAGIC_LEN;
- const char *end = data + len;
-
- /* Skip platform */
- p += sizeof(hs_platform_info_t);
-
- /* Read count */
- int n;
- memcpy(&n, p, sizeof(n));
- p += sizeof(n);
-
- /* Skip IDs and flags */
- if (n > 0) {
- p += sizeof(unsigned int) * n * 2;
- }
-
- /* Skip CRC */
- p += sizeof(uint64_t);
-
- /* Deserialize hyperscan database */
- size_t hs_len = end - p;
- hs_database_t *db = nullptr;
-
- hs_error_t err = hs_deserialize_database(p, hs_len, &db);
- if (err != HS_SUCCESS) {
- lua_pushnil(L);
- lua_pushfstring(L, "deserialize failed: %d", err);
- return 2;
- }
-
- /* Allocate scratch */
- hs_scratch_t *scratch = nullptr;
- err = hs_alloc_scratch(db, &scratch);
- if (err != HS_SUCCESS) {
- hs_free_database(db);
- lua_pushnil(L);
- lua_pushstring(L, "failed to allocate scratch");
- return 2;
- }
-
- /* Create userdata */
- auto *ud = (struct lua_hs_db *) lua_newuserdata(L, sizeof(struct lua_hs_db));
- ud->db = db;
- ud->scratch = scratch;
- rspamd_lua_setclass(L, LUA_HS_DB, -1);
-
- return 1;
-}
-
-/* Database methods */
-static int
-lua_hs_db_gc(lua_State *L)
-{
- struct lua_hs_db *db = lua_check_hs_db(L, 1);
- if (db) {
- if (db->scratch) {
- hs_free_scratch(db->scratch);
- }
- if (db->db) {
- hs_free_database(db->db);
- }
- }
- return 0;
-}
-
-/***
- * @method hyperscan_db:match(text)
- * Match text against the database
- * @param {text|string} text to match
- * @return {table} array of {id, from, to} matches or empty table
- */
-struct match_context {
- lua_State *L;
- int match_count;
-};
-
-static int
-match_callback(unsigned int id, unsigned long long from,
- unsigned long long to, unsigned int flags, void *context)
-{
- auto *ctx = (struct match_context *) context;
- lua_State *L = ctx->L;
-
- ctx->match_count++;
-
- /* Push match table: {id=id, from=from, to=to} */
- lua_createtable(L, 0, 3);
-
- lua_pushinteger(L, id);
- lua_setfield(L, -2, "id");
-
- lua_pushinteger(L, from);
- lua_setfield(L, -2, "from");
-
- lua_pushinteger(L, to);
- lua_setfield(L, -2, "to");
-
- /* Add to result array */
- lua_rawseti(L, -2, ctx->match_count);
-
- return 0;
-}
-
-static int
-lua_hs_db_match(lua_State *L)
-{
- struct lua_hs_db *db = lua_check_hs_db(L, 1);
- if (!db || !db->db || !db->scratch) {
- lua_newtable(L);
- return 1;
- }
-
- const char *text = nullptr;
- size_t len = 0;
- struct rspamd_lua_text *t;
-
- if (lua_isstring(L, 2)) {
- text = lua_tolstring(L, 2, &len);
- }
- else if ((t = (struct rspamd_lua_text *) rspamd_lua_check_udata_maybe(L, 2, rspamd_text_classname))) {
- text = t->start;
- len = t->len;
- }
- else {
- lua_newtable(L);
- return 1;
- }
-
- /* Create result table */
- lua_newtable(L);
-
- struct match_context ctx;
- ctx.L = L;
- ctx.match_count = 0;
-
- hs_scan(db->db, text, len, 0, db->scratch, match_callback, &ctx);
-
- return 1;
-}
-
-static const struct luaL_reg hyperscanlib_f[] = {
- LUA_INTERFACE_DEF(hyperscan, has_hyperscan),
- LUA_INTERFACE_DEF(hyperscan, platform_id),
- LUA_INTERFACE_DEF(hyperscan, compile),
- LUA_INTERFACE_DEF(hyperscan, serialize),
- LUA_INTERFACE_DEF(hyperscan, validate),
- LUA_INTERFACE_DEF(hyperscan, deserialize),
- {NULL, NULL}};
-
-static const struct luaL_reg hs_db_m[] = {
- LUA_INTERFACE_DEF(hs_db, match),
- {"__gc", lua_hs_db_gc},
- {"__tostring", rspamd_lua_class_tostring},
- {NULL, NULL}};
-
-static int
-lua_load_hyperscan(lua_State *L)
-{
- lua_newtable(L);
-
- /* Hyperscan flags */
- lua_pushstring(L, "flags");
- lua_newtable(L);
- lua_pushinteger(L, HS_FLAG_CASELESS);
- lua_setfield(L, -2, "caseless");
- lua_pushinteger(L, HS_FLAG_DOTALL);
- lua_setfield(L, -2, "dotall");
- lua_pushinteger(L, HS_FLAG_MULTILINE);
- lua_setfield(L, -2, "multiline");
- lua_pushinteger(L, HS_FLAG_SINGLEMATCH);
- lua_setfield(L, -2, "singlematch");
- lua_pushinteger(L, HS_FLAG_UTF8);
- lua_setfield(L, -2, "utf8");
- lua_pushinteger(L, HS_FLAG_UCP);
- lua_setfield(L, -2, "ucp");
- lua_pushinteger(L, HS_FLAG_SOM_LEFTMOST);
- lua_setfield(L, -2, "som_leftmost");
- lua_settable(L, -3);
-
- luaL_register(L, NULL, hyperscanlib_f);
-
- return 1;
-}
-
-#else /* !WITH_HYPERSCAN */
-
-static int
-lua_hyperscan_has_hyperscan(lua_State *L)
-{
- lua_pushboolean(L, false);
- return 1;
-}
-
-static int
-lua_hyperscan_not_available(lua_State *L)
-{
- return luaL_error(L, "hyperscan support is not available");
-}
-
-static const struct luaL_reg hyperscanlib_f[] = {
- LUA_INTERFACE_DEF(hyperscan, has_hyperscan),
- {"platform_id", lua_hyperscan_not_available},
- {"compile", lua_hyperscan_not_available},
- {"serialize", lua_hyperscan_not_available},
- {"validate", lua_hyperscan_not_available},
- {"deserialize", lua_hyperscan_not_available},
- {NULL, NULL}};
-
-static int
-lua_load_hyperscan(lua_State *L)
-{
- lua_newtable(L);
- luaL_register(L, NULL, hyperscanlib_f);
- return 1;
-}
-
-#endif /* WITH_HYPERSCAN */
-
-extern "C" void luaopen_hyperscan(lua_State *L)
-{
-#ifdef WITH_HYPERSCAN
- rspamd_lua_new_class(L, LUA_HS_DB, hs_db_m);
- lua_pop(L, 1);
-#endif
- rspamd_lua_add_preload(L, "rspamd_hyperscan", lua_load_hyperscan);
-}
}
}
else {
- if (w->hb.is_busy && w->hb.busy_reason[0]) {
- msg_warn_main("terminate worker %s(%P) with SIGKILL; "
- "worker was busy: %s",
- g_quark_to_string(w->type), w->pid,
- w->hb.busy_reason);
- }
- else {
- msg_warn_main("terminate worker %s(%P) with SIGKILL",
- g_quark_to_string(w->type), w->pid);
- }
+ msg_warn_main("terminate worker %s(%P) with SIGKILL",
+ g_quark_to_string(w->type), w->pid);
}
}
else {
return;
}
else {
- if (w->hb.is_busy && w->hb.busy_reason[0]) {
- msg_err_main("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL; "
- "worker was busy: %s",
- g_quark_to_string(w->type), w->pid,
- w->hb.busy_reason);
- }
- else {
- msg_err_main("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL",
- g_quark_to_string(w->type), w->pid);
- }
+ msg_err_main("data corruption warning: terminating "
+ "special worker %s(%P) with SIGKILL",
+ g_quark_to_string(w->type), w->pid);
}
}
}
rspamd_attach_worker(rspamd_main, cur);
}
-static void
-rspamd_log_pending_worker(gpointer key, gpointer value, gpointer ud)
-{
- struct rspamd_worker *w = (struct rspamd_worker *) value;
- struct rspamd_main *rspamd_main = w->srv;
-
- if (w->hb.is_busy && w->hb.busy_reason[0]) {
- msg_info_main(" - %s(%P): busy with %s",
- g_quark_to_string(w->type), w->pid,
- w->hb.busy_reason);
- }
- else {
- msg_info_main(" - %s(%P): shutting down",
- g_quark_to_string(w->type), w->pid);
- }
-}
-
-/* Soft monitoring timer - logs shutdown status periodically */
-static void
-rspamd_shutdown_monitor_handler(EV_P_ ev_timer *w, int revents)
-{
- struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
- unsigned int nworkers = g_hash_table_size(rspamd_main->workers);
-
- if (nworkers > 0) {
- msg_info_main("shutdown: waiting for %d worker(s):", nworkers);
- g_hash_table_foreach(rspamd_main->workers, rspamd_log_pending_worker, NULL);
- }
- else {
- ev_timer_stop(EV_A_ w);
- }
-}
-
static void
rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
{
term_attempts--;
- /* Log pending workers when we're about to force kill them */
- if (term_attempts == 0 && g_hash_table_size(rspamd_main->workers) > 0) {
- msg_warn_main("shutdown timeout reached, %d worker(s) still running - sending SIGKILL",
- (int) g_hash_table_size(rspamd_main->workers));
- }
-
g_hash_table_foreach(rspamd_main->workers, hash_worker_wait_callback,
NULL);
}
/* Signal handlers */
-#define SHUTDOWN_MONITOR_INITIAL 1.0
-#define SHUTDOWN_MONITOR_REPEAT 10.0
-
static void
rspamd_term_handler(struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
static ev_timer ev_finale;
- static ev_timer ev_monitor;
ev_tstamp shutdown_ts;
if (!rspamd_main->wanna_die) {
ev_timer_init(&ev_finale, rspamd_final_timer_handler,
TERMINATION_INTERVAL, TERMINATION_INTERVAL);
ev_timer_start(rspamd_main->event_loop, &ev_finale);
-
- ev_monitor.data = rspamd_main;
- ev_timer_init(&ev_monitor, rspamd_shutdown_monitor_handler,
- SHUTDOWN_MONITOR_INITIAL, SHUTDOWN_MONITOR_REPEAT);
- ev_timer_start(rspamd_main->event_loop, &ev_monitor);
}
}
ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
ev_tstamp last_event; /**< last heartbeat received timestamp */
int64_t nbeats; /**< positive for beats received, negative for beats missed */
- gboolean is_busy; /**< worker is doing long-running operation, skip heartbeat checks */
- char busy_reason[32]; /**< reason for being busy (for logging) */
+ gboolean is_busy;
+ char busy_reason[32];
};
enum rspamd_worker_state {