From: Vsevolod Stakhov Date: Fri, 2 Jan 2026 09:37:44 +0000 (+0000) Subject: [Feature] Add pluggable hyperscan cache storage infrastructure X-Git-Tag: 4.0.0~208^2~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b65cd450df05e21bf57acc662ffd21ff13279187;p=thirdparty%2Frspamd.git [Feature] Add pluggable hyperscan cache storage infrastructure This commit adds infrastructure for pluggable hyperscan cache storage backends and FD-based shared memory distribution: - Add platform ID function (rspamd_hyperscan_get_platform_id) for platform-aware cache keys - Create lua_hs_cache.lua with file, Redis, and HTTP backends - Add FD-based loading APIs (rspamd_hyperscan_from_fd, rspamd_hyperscan_create_shared_unser) - Add fd_size field to control messages for FD passing - Update worker to handle attached FDs in hyperscan notifications - Add cache_backend configuration option to hs_helper --- diff --git a/lualib/lua_hs_cache.lua b/lualib/lua_hs_cache.lua new file mode 100644 index 0000000000..fe986746ef --- /dev/null +++ b/lualib/lua_hs_cache.lua @@ -0,0 +1,470 @@ +--[[ +Copyright (c) 2026, Vsevolod Stakhov + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +--[[ +Pluggable Hyperscan cache storage backends. + +This module provides a unified interface for storing and loading serialized +Hyperscan databases from various backends (files, Redis, HTTP). + +Usage: + local hs_cache = require "lua_hs_cache" + local backend = hs_cache.create_backend(config) + backend:load(cache_key, platform_id, function(err, data) ... end) + backend:store(cache_key, platform_id, data, ttl, function(err) ... end) +]]-- + +local logger = require "rspamd_logger" +local rspamd_util = require "rspamd_util" +local lua_redis = require "lua_redis" +local rspamd_http = require "rspamd_http" + +local exports = {} +local N = "lua_hs_cache" + +--[[ +Backend interface definition (for documentation): + +backend = { + -- Check if cache entry exists + -- callback(err, exists: boolean, metadata: table|nil) + exists = function(self, cache_key, platform_id, callback) end, + + -- Load serialized database + -- callback(err, data: string|nil) + load = function(self, cache_key, platform_id, callback) end, + + -- Store serialized database + -- callback(err) + store = function(self, cache_key, platform_id, data, ttl, callback) end, + + -- Delete cache entry + -- callback(err) + delete = function(self, cache_key, platform_id, callback) end, +} +]]-- + +------------------------------------------------------------------------------- +-- File Backend +------------------------------------------------------------------------------- +local file_backend = {} +file_backend.__index = file_backend + +function file_backend.new(config) + local self = setmetatable({}, file_backend) + self.cache_dir = config.cache_dir or '/var/lib/rspamd/hs_cache' + self.platform_dirs = config.platform_dirs ~= false -- Create platform subdirs by default + return self +end + +function file_backend:_get_path(cache_key, platform_id) + if self.platform_dirs then + return string.format("%s/%s/%s.hs", self.cache_dir, platform_id, cache_key) + else + return string.format("%s/%s_%s.hs", self.cache_dir, platform_id, cache_key) + end +end + +function file_backend:_ensure_dir(path) + local dir = path:match("(.*/)") + if dir then + -- Create directory if it doesn't exist + local ok, err = rspamd_util.mkdir(dir, true) + if not ok and err then + logger.warnx(N, "failed to create directory %s: %s", dir, err) + end + end +end + +function file_backend:exists(cache_key, platform_id, callback) + local path = self:_get_path(cache_key, platform_id) + local stat = rspamd_util.stat(path) + + if stat then + callback(nil, true, { size = stat.size, mtime = stat.mtime }) + else + callback(nil, false, nil) + end +end + +function file_backend:load(cache_key, platform_id, callback) + local path = self:_get_path(cache_key, platform_id) + + local data, err = rspamd_util.read_file(path) + if data then + logger.debugx(N, "loaded %d bytes from %s", #data, path) + callback(nil, data) + else + callback(err or "file not found", nil) + end +end + +function file_backend:store(cache_key, platform_id, data, _ttl, callback) + local path = self:_get_path(cache_key, platform_id) + + self:_ensure_dir(path) + + -- Write to temp file first, then rename atomically + local tmp_path = path .. ".tmp." .. rspamd_util.random_hex(8) + local ok, err = rspamd_util.write_file(tmp_path, data) + + if ok then + local renamed, rename_err = os.rename(tmp_path, path) + if renamed then + logger.debugx(N, "stored %d bytes to %s", #data, path) + callback(nil) + else + os.remove(tmp_path) + callback(rename_err or "rename failed") + end + else + callback(err or "write failed") + end +end + +function file_backend:delete(cache_key, platform_id, callback) + local path = self:_get_path(cache_key, platform_id) + local ok, err = os.remove(path) + + if ok then + logger.debugx(N, "deleted %s", path) + callback(nil) + else + callback(err or "delete failed") + end +end + +------------------------------------------------------------------------------- +-- Redis Backend +------------------------------------------------------------------------------- +local redis_backend = {} +redis_backend.__index = redis_backend + +function redis_backend.new(config) + local self = setmetatable({}, redis_backend) + self.redis_params = lua_redis.parse_redis_server('hyperscan', config) + if not self.redis_params then + self.redis_params = lua_redis.parse_redis_server(nil, config) + end + self.prefix = config.prefix or 'rspamd_hs' + self.default_ttl = config.ttl or (86400 * 30) -- 30 days default + self.refresh_ttl = config.refresh_ttl ~= false -- Refresh TTL on read by default + self.use_compression = config.compression ~= false -- zstd compression by default + return self +end + +function redis_backend:_get_key(cache_key, platform_id) + return string.format("%s:%s:%s", self.prefix, platform_id, cache_key) +end + +function redis_backend:exists(cache_key, platform_id, callback) + local key = self:_get_key(cache_key, platform_id) + + if not self.redis_params then + callback("redis not configured", false, nil) + return + end + + lua_redis.request(self.redis_params, nil, { + cmd = 'EXISTS', + args = { key }, + callback = function(err, data) + if err then + callback(err, false, nil) + else + callback(nil, data == 1, nil) + end + end + }) +end + +function redis_backend:load(cache_key, platform_id, callback) + local key = self:_get_key(cache_key, platform_id) + + if not self.redis_params then + callback("redis not configured", nil) + return + end + + -- Use GETEX to refresh TTL on read if enabled + local cmd, args + if self.refresh_ttl then + cmd = 'GETEX' + args = { key, 'EX', tostring(self.default_ttl) } + else + cmd = 'GET' + args = { key } + end + + lua_redis.request(self.redis_params, nil, { + cmd = cmd, + args = args, + callback = function(err, data) + if err then + callback(err, nil) + elseif not data then + callback("not found", nil) + else + -- Decompress if needed + if self.use_compression then + local decompressed, decompress_err = rspamd_util.zstd_decompress(data) + if decompressed then + logger.debugx(N, "loaded and decompressed %d -> %d bytes from redis key %s", + #data, #decompressed, key) + callback(nil, decompressed) + else + callback(decompress_err or "decompression failed", nil) + end + else + logger.debugx(N, "loaded %d bytes from redis key %s", #data, key) + callback(nil, data) + end + end + end + }) +end + +function redis_backend:store(cache_key, platform_id, data, ttl, callback) + local key = self:_get_key(cache_key, platform_id) + local actual_ttl = ttl or self.default_ttl + + if not self.redis_params then + callback("redis not configured") + return + end + + local store_data = data + -- Compress if enabled + if self.use_compression then + local compressed, compress_err = rspamd_util.zstd_compress(data) + if compressed then + logger.debugx(N, "compressed %d -> %d bytes (%.1f%% reduction)", + #data, #compressed, (1 - #compressed / #data) * 100) + store_data = compressed + else + logger.warnx(N, "compression failed: %s, storing uncompressed", compress_err) + end + end + + lua_redis.request(self.redis_params, nil, { + cmd = 'SETEX', + args = { key, tostring(actual_ttl), store_data }, + callback = function(err) + if err then + callback(err) + else + logger.debugx(N, "stored %d bytes to redis key %s with TTL %d", + #store_data, key, actual_ttl) + callback(nil) + end + end + }) +end + +function redis_backend:delete(cache_key, platform_id, callback) + local key = self:_get_key(cache_key, platform_id) + + if not self.redis_params then + callback("redis not configured") + return + end + + lua_redis.request(self.redis_params, nil, { + cmd = 'DEL', + args = { key }, + callback = function(err) + if err then + callback(err) + else + logger.debugx(N, "deleted redis key %s", key) + callback(nil) + end + end + }) +end + +------------------------------------------------------------------------------- +-- HTTP Backend +------------------------------------------------------------------------------- +local http_backend = {} +http_backend.__index = http_backend + +function http_backend.new(config) + local self = setmetatable({}, http_backend) + self.base_url = config.base_url or config.url + self.timeout = config.timeout or 30 + self.auth_header = config.auth_header + self.auth_value = config.auth_value + self.use_compression = config.compression ~= false + return self +end + +function http_backend:_get_url(cache_key, platform_id) + return string.format("%s/%s/%s", self.base_url, platform_id, cache_key) +end + +function http_backend:_get_headers() + local headers = {} + if self.auth_header and self.auth_value then + headers[self.auth_header] = self.auth_value + end + return headers +end + +function http_backend:exists(cache_key, platform_id, callback) + local url = self:_get_url(cache_key, platform_id) + + rspamd_http.request({ + url = url, + method = 'HEAD', + headers = self:_get_headers(), + timeout = self.timeout, + callback = function(err, code, _, headers) + if err then + callback(err, false, nil) + elseif code == 200 then + local size = headers and headers['content-length'] + callback(nil, true, { size = tonumber(size) }) + else + callback(nil, false, nil) + end + end + }) +end + +function http_backend:load(cache_key, platform_id, callback) + local url = self:_get_url(cache_key, platform_id) + + rspamd_http.request({ + url = url, + method = 'GET', + headers = self:_get_headers(), + timeout = self.timeout, + callback = function(err, code, body, headers) + if err then + callback(err, nil) + elseif code == 200 and body then + -- Check if content is compressed + local content_encoding = headers and headers['content-encoding'] + if content_encoding == 'zstd' or self.use_compression then + local decompressed = rspamd_util.zstd_decompress(body) + if decompressed then + callback(nil, decompressed) + else + -- Maybe it wasn't compressed after all + callback(nil, body) + end + else + callback(nil, body) + end + elseif code == 404 then + callback("not found", nil) + else + callback(string.format("HTTP %d", code), nil) + end + end + }) +end + +function http_backend:store(cache_key, platform_id, data, ttl, callback) + local url = self:_get_url(cache_key, platform_id) + local headers = self:_get_headers() + + local store_data = data + if self.use_compression then + local compressed = rspamd_util.zstd_compress(data) + if compressed then + store_data = compressed + headers['Content-Encoding'] = 'zstd' + end + end + + if ttl then + headers['X-TTL'] = tostring(ttl) + end + + rspamd_http.request({ + url = url, + method = 'PUT', + headers = headers, + body = store_data, + timeout = self.timeout, + callback = function(err, code) + if err then + callback(err) + elseif code >= 200 and code < 300 then + callback(nil) + else + callback(string.format("HTTP %d", code)) + end + end + }) +end + +function http_backend:delete(cache_key, platform_id, callback) + local url = self:_get_url(cache_key, platform_id) + + rspamd_http.request({ + url = url, + method = 'DELETE', + headers = self:_get_headers(), + timeout = self.timeout, + callback = function(err, code) + if err then + callback(err) + elseif code >= 200 and code < 300 or code == 404 then + callback(nil) + else + callback(string.format("HTTP %d", code)) + end + end + }) +end + +------------------------------------------------------------------------------- +-- Backend Factory +------------------------------------------------------------------------------- + +-- Create a backend instance based on configuration +-- @param config table with: +-- - backend: "file"|"redis"|"http" (default: "file") +-- - cache_dir: directory for file backend +-- - redis: redis configuration table +-- - http: http configuration table +-- @return backend instance +function exports.create_backend(config) + local backend_type = config.backend or config.cache_backend or 'file' + + if backend_type == 'file' then + return file_backend.new(config) + elseif backend_type == 'redis' then + local redis_config = config.redis or config + return redis_backend.new(redis_config) + elseif backend_type == 'http' then + local http_config = config.http or config + return http_backend.new(http_config) + else + logger.errx(N, "unknown hyperscan cache backend: %s, falling back to file", backend_type) + return file_backend.new(config) + end +end + +-- Export individual backend constructors for direct use +exports.file_backend = file_backend +exports.redis_backend = redis_backend +exports.http_backend = http_backend + +return exports diff --git a/src/hs_helper.c b/src/hs_helper.c index 5a3d69bcf5..1464b9628e 100644 --- a/src/hs_helper.c +++ b/src/hs_helper.c @@ -19,8 +19,13 @@ #include "libserver/cfg_rcl.h" #include "libserver/worker_util.h" #include "libserver/rspamd_control.h" +#include "lua/lua_common.h" #include "unix-std.h" +#ifdef WITH_HYPERSCAN +#include "libserver/hyperscan_tools.h" +#endif + #ifdef HAVE_GLOB_H #include #endif @@ -41,6 +46,16 @@ static const double default_max_time = 1.0; static const double default_recompile_time = 60.0; static const uint64_t rspamd_hs_helper_magic = 0x22d310157a2288a0ULL; +/* + * Cache backend types + */ +enum hs_cache_backend_type { + HS_CACHE_BACKEND_FILE = 0, + HS_CACHE_BACKEND_REDIS, + HS_CACHE_BACKEND_HTTP, + HS_CACHE_BACKEND_LUA, +}; + /* * Worker's context */ @@ -59,8 +74,32 @@ struct hs_helper_ctx { double max_time; double recompile_time; ev_timer recompile_timer; + /* Cache backend configuration */ + char *cache_backend_str; /* Backend name from config: file, redis, http, lua */ + enum hs_cache_backend_type cache_backend; + ucl_object_t *cache_config; /* Backend-specific configuration */ + int lua_backend_ref; /* Lua reference to backend object */ }; +/* Parse cache_backend string to enum */ +static enum hs_cache_backend_type +rspamd_hs_parse_cache_backend(const char *backend_str) +{ + if (backend_str == NULL || g_ascii_strcasecmp(backend_str, "file") == 0) { + return HS_CACHE_BACKEND_FILE; + } + else if (g_ascii_strcasecmp(backend_str, "redis") == 0) { + return HS_CACHE_BACKEND_REDIS; + } + else if (g_ascii_strcasecmp(backend_str, "http") == 0) { + return HS_CACHE_BACKEND_HTTP; + } + else if (g_ascii_strcasecmp(backend_str, "lua") == 0) { + return HS_CACHE_BACKEND_LUA; + } + return HS_CACHE_BACKEND_FILE; +} + static gpointer init_hs_helper(struct rspamd_config *cfg) { @@ -77,6 +116,10 @@ init_hs_helper(struct rspamd_config *cfg) ctx->workers_ready = FALSE; ctx->max_time = default_max_time; ctx->recompile_time = default_recompile_time; + ctx->cache_backend_str = NULL; + ctx->cache_backend = HS_CACHE_BACKEND_FILE; + ctx->cache_config = NULL; + ctx->lua_backend_ref = LUA_NOREF; rspamd_rcl_register_worker_option(cfg, type, @@ -110,6 +153,14 @@ init_hs_helper(struct rspamd_config *cfg) G_STRUCT_OFFSET(struct hs_helper_ctx, max_time), RSPAMD_CL_FLAG_TIME_FLOAT, "Maximum time to wait for compilation of a single expression"); + rspamd_rcl_register_worker_option(cfg, + type, + "cache_backend", + rspamd_rcl_parse_struct_string, + ctx, + G_STRUCT_OFFSET(struct hs_helper_ctx, cache_backend_str), + 0, + "Cache backend: file, redis, http, or lua"); return ctx; } @@ -616,6 +667,87 @@ rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents) rspamd_rs_compile(ctx, worker, FALSE); } +/** + * Initialize the Lua cache backend + * Loads lua_hs_cache module and creates a backend instance + */ +static gboolean +rspamd_hs_helper_init_lua_backend(struct hs_helper_ctx *ctx, struct rspamd_worker *worker) +{ + lua_State *L = ctx->cfg->lua_state; + const char *backend_name; + + switch (ctx->cache_backend) { + case HS_CACHE_BACKEND_FILE: + backend_name = "file"; + break; + case HS_CACHE_BACKEND_REDIS: + backend_name = "redis"; + break; + case HS_CACHE_BACKEND_HTTP: + backend_name = "http"; + break; + case HS_CACHE_BACKEND_LUA: + backend_name = "lua"; + break; + default: + backend_name = "file"; + break; + } + + /* Load lua_hs_cache module */ + lua_getglobal(L, "require"); + lua_pushstring(L, "lua_hs_cache"); + + if (lua_pcall(L, 1, 1, 0) != 0) { + msg_err("failed to load lua_hs_cache module: %s", lua_tostring(L, -1)); + lua_pop(L, 1); + return FALSE; + } + + /* Get create_backend function */ + lua_getfield(L, -1, "create_backend"); + if (!lua_isfunction(L, -1)) { + msg_err("lua_hs_cache.create_backend is not a function"); + lua_pop(L, 2); + return FALSE; + } + + /* Create configuration table */ + lua_newtable(L); + + lua_pushstring(L, backend_name); + lua_setfield(L, -2, "backend"); + + lua_pushstring(L, ctx->hs_dir); + lua_setfield(L, -2, "cache_dir"); + + /* Add platform_id if available */ +#ifdef WITH_HYPERSCAN + const char *platform_id = rspamd_hyperscan_get_platform_id(); + if (platform_id) { + lua_pushstring(L, platform_id); + lua_setfield(L, -2, "platform_id"); + } +#endif + + /* Call create_backend(config) */ + if (lua_pcall(L, 1, 1, 0) != 0) { + msg_err("failed to create cache backend: %s", lua_tostring(L, -1)); + lua_pop(L, 2); + return FALSE; + } + + /* Store reference to backend object */ + ctx->lua_backend_ref = luaL_ref(L, LUA_REGISTRYINDEX); + + /* Pop the module table */ + lua_pop(L, 1); + + msg_info("initialized %s cache backend", backend_name); + return TRUE; +} + static void start_hs_helper(struct rspamd_worker *worker) { @@ -633,8 +765,24 @@ start_hs_helper(struct rspamd_worker *worker) ctx->hs_dir = RSPAMD_DBDIR "/"; } - msg_info("hs_helper starting: cache_dir=%s, recompile_time=%.1f, workers_ready=%s", - ctx->hs_dir, ctx->recompile_time, ctx->workers_ready ? "yes" : "no"); + /* Parse cache backend from config string */ + ctx->cache_backend = rspamd_hs_parse_cache_backend(ctx->cache_backend_str); + + msg_info("hs_helper starting: cache_dir=%s, cache_backend=%s, recompile_time=%.1f, workers_ready=%s", + ctx->hs_dir, + ctx->cache_backend_str ? ctx->cache_backend_str : "file", + ctx->recompile_time, + ctx->workers_ready ? "yes" : "no"); + + /* Initialize Lua cache backend if not using default file backend */ + if (ctx->cache_backend != HS_CACHE_BACKEND_FILE) { + if (!rspamd_hs_helper_init_lua_backend(ctx, worker)) { + msg_warn("failed to initialize %s cache backend, falling back to file", + ctx->cache_backend == HS_CACHE_BACKEND_REDIS ? "redis" : ctx->cache_backend == HS_CACHE_BACKEND_HTTP ? "http" + : "lua"); + ctx->cache_backend = HS_CACHE_BACKEND_FILE; + } + } ctx->event_loop = rspamd_prepare_worker(worker, "hs_helper", diff --git a/src/libserver/hyperscan_tools.cxx b/src/libserver/hyperscan_tools.cxx index a6f5bfdf88..27094c01d6 100644 --- a/src/libserver/hyperscan_tools.cxx +++ b/src/libserver/hyperscan_tools.cxx @@ -35,6 +35,7 @@ #include /* for std::getenv */ #include "unix-std.h" #include "rspamd_control.h" +#include "cryptobox.h" #define HYPERSCAN_LOG_TAG "hsxxxx" @@ -281,20 +282,61 @@ public: }; +/** + * Simple holder for FD-based mmap (no file path) + */ +struct fd_mmap_holder { + void *map = nullptr; + std::size_t size = 0; + int fd = -1; + + fd_mmap_holder() = default; + fd_mmap_holder(void *m, std::size_t s, int f) + : map(m), size(s), fd(f) + { + } + ~fd_mmap_holder() + { + if (map && map != MAP_FAILED) { + munmap(map, size); + } + if (fd >= 0) { + close(fd); + } + } + fd_mmap_holder(const fd_mmap_holder &) = delete; + fd_mmap_holder &operator=(const fd_mmap_holder &) = delete; + fd_mmap_holder(fd_mmap_holder &&other) noexcept + : map(other.map), size(other.size), fd(other.fd) + { + other.map = nullptr; + other.size = 0; + other.fd = -1; + } + fd_mmap_holder &operator=(fd_mmap_holder &&other) noexcept + { + std::swap(map, other.map); + std::swap(size, other.size); + std::swap(fd, other.fd); + return *this; + } +}; + /** * This is a higher level representation of the cached hyperscan file */ struct hs_shared_database { hs_database_t *db = nullptr; /**< internal database (might be in a shared memory) */ std::optional maybe_map; + std::optional maybe_fd_map; /**< for FD-based loading */ std::string cached_path; ~hs_shared_database() { - if (!maybe_map) { + if (!maybe_map && !maybe_fd_map) { hs_free_database(db); } - // Otherwise, handled by maybe_map dtor + // Otherwise, handled by maybe_map or maybe_fd_map dtor } explicit hs_shared_database(raii_mmaped_file &&map, hs_database_t *db) @@ -313,6 +355,11 @@ struct hs_shared_database { cached_path = ""; } } + explicit hs_shared_database(hs_database_t *db, fd_mmap_holder &&fd_map) + : db(db), maybe_fd_map(std::move(fd_map)) + { + cached_path = ""; + } hs_shared_database(const hs_shared_database &other) = delete; hs_shared_database() = default; hs_shared_database(hs_shared_database &&other) noexcept @@ -323,6 +370,7 @@ struct hs_shared_database { { std::swap(db, other.db); std::swap(maybe_map, other.maybe_map); + std::swap(maybe_fd_map, other.maybe_fd_map); return *this; } }; @@ -396,6 +444,69 @@ hs_is_valid_database(void *raw, std::size_t len, std::string_view fname) -> tl:: return true; } +/** + * Get platform identifier string for hyperscan cache keys + * Format: hs{major}{minor}_{platform}_{features}_{hash8} + * Example: hs54_haswell_avx2_abc12345 + */ +static auto +hs_get_platform_id_impl() -> std::string +{ + hs_platform_info_t plt; + if (hs_populate_platform(&plt) != HS_SUCCESS) { + return "hs_unknown"; + } + + // Parse version string to get major.minor + const char *version_str = hs_version(); + unsigned int major = 0, minor = 0; + sscanf(version_str, "%u.%u", &major, &minor); + + // Determine platform name + const char *platform_name = "generic"; + switch (plt.tune) { + case HS_TUNE_FAMILY_HSW: + platform_name = "haswell"; + break; + case HS_TUNE_FAMILY_SNB: + platform_name = "sandy"; + break; + case HS_TUNE_FAMILY_BDW: + platform_name = "broadwell"; + break; + case HS_TUNE_FAMILY_IVB: + platform_name = "ivy"; + break; + default: + break; + } + + // Build features string + std::string features; + if (plt.cpu_features & HS_CPU_FEATURES_AVX2) { + features = "avx2"; + } + else if (plt.cpu_features & HS_CPU_FEATURES_AVX512) { + features = "avx512"; + } + else { + features = "base"; + } + + // Create hash of platform info for uniqueness + unsigned char hash_out[rspamd_cryptobox_HASHBYTES]; + rspamd_cryptobox_hash_state_t hash_state; + rspamd_cryptobox_hash_init(&hash_state, nullptr, 0); + rspamd_cryptobox_hash_update(&hash_state, reinterpret_cast(version_str), strlen(version_str)); + rspamd_cryptobox_hash_update(&hash_state, reinterpret_cast(&plt), sizeof(plt)); + rspamd_cryptobox_hash_final(&hash_state, hash_out); + + // Format: hs{major}{minor}_{platform}_{features}_{hash8} + return fmt::format("hs{}{}_{}_{}_{:02x}{:02x}{:02x}{:02x}", + major, minor, platform_name, features, + hash_out[0], hash_out[1], hash_out[2], hash_out[3]); +} + static auto hs_shared_from_unserialized(hs_known_files_cache &hs_cache, raii_mmaped_file &&map) -> tl::expected { @@ -701,4 +812,120 @@ void rspamd_hyperscan_notice_loaded(void) rspamd::util::hs_known_files_cache::get().notice_loaded(); } +const char *rspamd_hyperscan_get_platform_id(void) +{ + static std::string cached_platform_id; + + if (cached_platform_id.empty()) { + cached_platform_id = rspamd::util::hs_get_platform_id_impl(); + } + + return cached_platform_id.c_str(); +} + +rspamd_hyperscan_t *rspamd_hyperscan_from_fd(int fd, gsize size) +{ + if (fd < 0 || size == 0) { + msg_err_hyperscan("invalid fd (%d) or size (%z) for hyperscan database", fd, size); + return nullptr; + } + + void *map = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0); + if (map == MAP_FAILED) { + msg_err_hyperscan("cannot mmap fd %d: %s", fd, strerror(errno)); + return nullptr; + } + + auto is_valid = rspamd::util::hs_is_valid_database(map, size, "fd"); + if (!is_valid) { + msg_err_hyperscan("invalid hyperscan database from fd: %s", is_valid.error().c_str()); + munmap(map, size); + return nullptr; + } + + auto *db = reinterpret_cast(map); + // Create fd_mmap_holder to manage the mapping lifetime + // Note: we dup() the fd so the holder owns its own copy + int owned_fd = dup(fd); + if (owned_fd == -1) { + msg_err_hyperscan("cannot dup fd %d: %s", fd, strerror(errno)); + munmap(map, size); + return nullptr; + } + rspamd::util::fd_mmap_holder holder{map, size, owned_fd}; + auto *ndb = new rspamd::util::hs_shared_database{db, std::move(holder)}; + + msg_info_hyperscan("loaded hyperscan database from fd %d, size %z", fd, size); + return C_DB_FROM_CXX(ndb); +} + +gboolean rspamd_hyperscan_create_shared_unser(const char *serialized_data, + gsize serialized_size, + int *out_fd, + gsize *out_size) +{ + if (!serialized_data || serialized_size == 0 || !out_fd || !out_size) { + return FALSE; + } + + std::size_t unserialized_size = 0; + if (hs_serialized_database_size(serialized_data, serialized_size, &unserialized_size) != HS_SUCCESS) { + msg_err_hyperscan("cannot determine unserialized database size"); + return FALSE; + } + + // Create temp file + char tmppath[] = "/tmp/rspamd_hs_XXXXXX"; + int fd = mkstemp(tmppath); + if (fd == -1) { + msg_err_hyperscan("cannot create temp file: %s", strerror(errno)); + return FALSE; + } + + // Unlink immediately - file stays open via FD + if (unlink(tmppath) == -1) { + msg_err_hyperscan("cannot unlink temp file %s: %s", tmppath, strerror(errno)); + close(fd); + return FALSE; + } + + // Extend file to required size + if (ftruncate(fd, unserialized_size) == -1) { + msg_err_hyperscan("cannot ftruncate temp file: %s", strerror(errno)); + close(fd); + return FALSE; + } + + // Map with MAP_SHARED for sharing between processes + void *map = mmap(nullptr, unserialized_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (map == MAP_FAILED) { + msg_err_hyperscan("cannot mmap temp file: %s", strerror(errno)); + close(fd); + return FALSE; + } + + // Deserialize into mapped region + if (hs_deserialize_database_at(serialized_data, serialized_size, reinterpret_cast(map)) != HS_SUCCESS) { + msg_err_hyperscan("cannot deserialize database into shared memory"); + munmap(map, unserialized_size); + close(fd); + return FALSE; + } + + // Change protection to read-only + if (mprotect(map, unserialized_size, PROT_READ) == -1) { + msg_err_hyperscan("cannot mprotect shared memory: %s", strerror(errno)); + } + + munmap(map, unserialized_size); + + *out_fd = fd; + *out_size = unserialized_size; + + msg_info_hyperscan("created shared hyperscan database: serialized %z -> unserialized %z bytes", + serialized_size, unserialized_size); + + return TRUE; +} + #endif// WITH_HYPERSCAN \ No newline at end of file diff --git a/src/libserver/hyperscan_tools.h b/src/libserver/hyperscan_tools.h index 624b7b0693..aac9897f0d 100644 --- a/src/libserver/hyperscan_tools.h +++ b/src/libserver/hyperscan_tools.h @@ -70,6 +70,40 @@ void rspamd_hyperscan_notice_loaded(void); */ void rspamd_hyperscan_cleanup_maybe(void); +/** + * Get a platform identifier string for hyperscan cache keys. + * This includes the hyperscan version, platform tune, and CPU features. + * The returned string is owned by the library and should not be freed. + * @return platform identifier string (e.g., "hs54_haswell_avx2_abc123") + */ +const char *rspamd_hyperscan_get_platform_id(void); + +/** + * Create a hyperscan database wrapper from a file descriptor pointing to + * an unserialized (ready to use) hyperscan database. The FD should be + * suitable for mmap with MAP_SHARED. + * @param fd file descriptor to mmap + * @param size size of the mapped region + * @return database wrapper or NULL on error + */ +rspamd_hyperscan_t *rspamd_hyperscan_from_fd(int fd, gsize size); + +/** + * Create a shared memory region containing an unserialized hyperscan database. + * The returned FD can be passed to other processes via SCM_RIGHTS and used + * with rspamd_hyperscan_from_fd(). The temp file is unlinked immediately + * so it will be cleaned up when all FDs are closed. + * @param serialized_data pointer to serialized hyperscan database + * @param serialized_size size of serialized data + * @param[out] out_fd output file descriptor + * @param[out] out_size output size of unserialized database + * @return TRUE on success + */ +gboolean rspamd_hyperscan_create_shared_unser(const char *serialized_data, + gsize serialized_size, + int *out_fd, + gsize *out_size); + G_END_DECLS #endif diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 247a6b235d..5048035b46 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -78,6 +78,7 @@ struct rspamd_control_command { gboolean forced; char cache_dir[CONTROL_PATHLEN]; char scope[64]; /* Scope name, NULL means all scopes */ + gsize fd_size; /* Size of FD-based db, 0 if not using FD */ } hs_loaded; struct { char tag[32]; @@ -187,6 +188,7 @@ struct rspamd_srv_command { gboolean forced; char cache_dir[CONTROL_PATHLEN]; char scope[64]; /* Scope name, NULL means all scopes */ + gsize fd_size; /* Size of FD-based db, 0 if not using FD */ } hs_loaded; struct { char tag[32]; diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index b72ee17386..e55138bbb7 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -1908,6 +1908,22 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main, memset(&rep, 0, sizeof(rep)); rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; + /* + * Check if we received an FD for shared memory hyperscan database. + * FD-based loading is used when hs_helper sends a pre-deserialized + * database via SCM_RIGHTS. This allows workers to mmap the database + * directly without disk I/O. + */ + if (attached_fd >= 0 && cmd->cmd.hs_loaded.fd_size > 0) { + msg_info("received hyperscan fd %d with size %z (scope: %s) - " + "FD-based loading infrastructure ready, using file-based for now", + attached_fd, cmd->cmd.hs_loaded.fd_size, + cmd->cmd.hs_loaded.scope[0] != '\0' ? cmd->cmd.hs_loaded.scope : "all"); + /* Close the FD since we're not using it yet */ + close(attached_fd); + attached_fd = -1; + } + /* Check if this is a scoped notification */ if (cmd->cmd.hs_loaded.scope[0] != '\0') { /* Scoped hyperscan loading */ @@ -1936,6 +1952,11 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main, strerror(errno)); } + /* Close any remaining FD we didn't use */ + if (attached_fd >= 0) { + close(attached_fd); + } + return TRUE; } #endif /* With Hyperscan */