--- /dev/null
+--[[
+Copyright (c) 2026, Vsevolod Stakhov <vsevolod@rspamd.com>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+]]
+
+-- Lua module for fuzzy Redis backend update path (following the Bayes pattern)
+
+local exports = {}
+local lua_redis = require "lua_redis"
+local logger = require "rspamd_logger"
+
+local N = "fuzzy_redis"
+
+local function gen_update_functor(redis_params, update_script_id)
+ -- Returns function(ev_base, prefix, updates, src, expire, callback)
+ -- updates is an array of tables: {op, digest, flag, value, is_weak, shingle_keys}
+ -- callback(success_boolean) is called when all operations complete
+ return function(ev_base, prefix, updates, src, expire, callback)
+ local n_ops = 0
+ local n_completed = 0
+ local has_error = false
+ local count_key = prefix .. "_count"
+
+ -- Count actual operations (skip "dup")
+ for _, upd in ipairs(updates) do
+ if upd.op ~= "dup" then
+ n_ops = n_ops + 1
+ end
+ end
+
+ -- Final step: INCR version key and invoke callback
+ local function do_version_incr()
+ local version_key = prefix .. src
+
+ local function version_cb(err, _)
+ if err then
+ logger.errx(rspamd_config, '%s: version INCR failed for %s: %s',
+ N, version_key, err)
+ end
+ callback(not has_error)
+ end
+
+ if not lua_redis.redis_make_request_taskless(ev_base, rspamd_config,
+ redis_params, version_key, true, version_cb, 'INCR', { version_key }) then
+ logger.errx(rspamd_config, '%s: cannot make version INCR request', N)
+ callback(false)
+ end
+ end
+
+ -- Called when one exec_redis_script completes
+ local function on_op_complete()
+ n_completed = n_completed + 1
+ if n_completed >= n_ops then
+ do_version_incr()
+ end
+ end
+
+ -- If no actual operations, just do version INCR
+ if n_ops == 0 then
+ do_version_incr()
+ return
+ end
+
+ for _, upd in ipairs(updates) do
+ if upd.op ~= "dup" then
+ local hash_key = prefix .. upd.digest
+
+ -- Build KEYS array for the Redis script
+ local keys = {
+ hash_key,
+ upd.op,
+ tostring(upd.flag),
+ tostring(upd.value),
+ tostring(expire),
+ tostring(upd.timestamp),
+ tostring(upd.is_weak),
+ count_key,
+ upd.digest,
+ }
+
+ -- Append shingle keys if present
+ if upd.shingle_keys then
+ for _, sk in ipairs(upd.shingle_keys) do
+ keys[#keys + 1] = sk
+ end
+ end
+
+ local function update_cb(err, _)
+ if err then
+ logger.errx(rspamd_config, '%s: update script failed: %s', N, err)
+ has_error = true
+ end
+ on_op_complete()
+ end
+
+ lua_redis.exec_redis_script(update_script_id,
+ { ev_base = ev_base, is_write = true, key = hash_key },
+ update_cb, keys)
+ end
+ end
+ end
+end
+
+-- Initialize fuzzy Redis update module
+-- @param redis_params table returned by lua_redis.try_load_redis_servers
+-- @return update functor or nil on error
+exports.lua_fuzzy_redis_init = function(redis_params)
+ if not redis_params then
+ logger.errx(rspamd_config, '%s: no redis params provided', N)
+ return nil
+ end
+
+ local update_script_id, err = lua_redis.load_redis_script_from_file(
+ "fuzzy_update.lua", redis_params)
+ if not update_script_id then
+ logger.errx(rspamd_config, '%s: cannot load fuzzy_update.lua: %s', N,
+ err or "unknown error")
+ return nil
+ end
+
+ return gen_update_functor(redis_params, update_script_id)
+end
+
+return exports
--- /dev/null
+-- Copyright 2026 Vsevolod Stakhov
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Fuzzy hash update script (per-hash, atomic)
+-- Handles ADD, DEL, and REFRESH operations including multi-flag merge and shingles
+--
+-- KEYS[1] = hash_key (prefix + digest)
+-- KEYS[2] = operation: "add", "del", "refresh"
+-- KEYS[3] = flag (string number)
+-- KEYS[4] = value (string number)
+-- KEYS[5] = expire (string number, seconds)
+-- KEYS[6] = timestamp (string number, calendar seconds)
+-- KEYS[7] = is_weak ("0" or "1")
+-- KEYS[8] = count_key (prefix .. "_count")
+-- KEYS[9] = digest (raw bytes, used as value for shingle SETEX)
+-- KEYS[10..] = shingle keys (0 or 32 of them)
+
+local key = KEYS[1]
+local op = KEYS[2]
+local new_flag = tonumber(KEYS[3])
+local new_value = tonumber(KEYS[4])
+local expire = tonumber(KEYS[5])
+local timestamp = KEYS[6]
+local is_weak = tonumber(KEYS[7])
+local count_key = KEYS[8]
+local digest = KEYS[9]
+
+if op == "add" then
+ -- Multi-flag merge logic: up to 8 flag slots (primary '' + extra '1'..'7')
+ local data = redis.call('HGETALL', key)
+ local fields = {}
+ for i = 1, #data, 2 do
+ fields[data[i]] = data[i+1]
+ end
+
+ local slots = {}
+ local n_slots = 0
+
+ -- Check primary slot
+ if fields['V'] and fields['F'] then
+ slots[''] = {flag=tonumber(fields['F']), value=tonumber(fields['V'])}
+ n_slots = n_slots + 1
+ end
+ -- Check extra slots 1..7
+ for i = 1, 7 do
+ local si = tostring(i)
+ if fields['V'..si] and fields['F'..si] then
+ slots[si] = {flag=tonumber(fields['F'..si]), value=tonumber(fields['V'..si])}
+ n_slots = n_slots + 1
+ end
+ end
+
+ -- Try to find existing slot with same flag
+ local found_slot = nil
+ for slot, entry in pairs(slots) do
+ if entry.flag == new_flag then
+ found_slot = slot
+ break
+ end
+ end
+
+ if found_slot then
+ -- Increment existing slot value
+ redis.call('HINCRBY', key, 'V'..found_slot, new_value)
+ elseif n_slots == 0 then
+ -- Empty hash: create primary slot
+ if is_weak == 1 then
+ redis.call('HSETNX', key, 'F', new_flag)
+ redis.call('HSETNX', key, 'V', new_value)
+ else
+ redis.call('HSET', key, 'F', new_flag, 'V', new_value)
+ end
+ slots[''] = {flag=new_flag, value=new_value}
+ n_slots = 1
+ elseif n_slots < 8 then
+ -- Find an empty slot and use it
+ local empty_slot = nil
+ if not slots[''] then
+ empty_slot = ''
+ else
+ for i = 1, 7 do
+ if not slots[tostring(i)] then
+ empty_slot = tostring(i)
+ break
+ end
+ end
+ end
+ if empty_slot then
+ redis.call('HSET', key, 'F'..empty_slot, new_flag, 'V'..empty_slot, new_value)
+ slots[empty_slot] = {flag=new_flag, value=new_value}
+ n_slots = n_slots + 1
+ end
+ else
+ -- All 8 slots full: replace the minimum-value slot if new_value is larger
+ if is_weak == 0 then
+ local min_slot = nil
+ local min_val = nil
+ for slot, entry in pairs(slots) do
+ if min_val == nil or entry.value < min_val then
+ min_val = entry.value
+ min_slot = slot
+ end
+ end
+ if min_val ~= nil and new_value > min_val then
+ redis.call('HSET', key, 'F'..min_slot, new_flag, 'V'..min_slot, new_value)
+ slots[min_slot] = {flag=new_flag, value=new_value}
+ end
+ end
+ end
+
+ -- Ensure primary slot has the highest value (swap if needed)
+ if n_slots > 1 then
+ local max_val = nil
+ local max_slot = nil
+ for slot, _ in pairs(slots) do
+ local v = tonumber(redis.call('HGET', key, 'V'..slot) or '0')
+ if max_val == nil or v > max_val then
+ max_val = v
+ max_slot = slot
+ end
+ end
+ if max_slot ~= nil and max_slot ~= '' and slots[''] then
+ local pv = redis.call('HGET', key, 'V')
+ local pf = redis.call('HGET', key, 'F')
+ local bv = redis.call('HGET', key, 'V'..max_slot)
+ local bf = redis.call('HGET', key, 'F'..max_slot)
+ redis.call('HSET', key, 'V', bv, 'F', bf)
+ redis.call('HSET', key, 'V'..max_slot, pv, 'F'..max_slot, pf)
+ end
+ end
+
+ redis.call('HSETNX', key, 'C', timestamp)
+ redis.call('EXPIRE', key, expire)
+ redis.call('INCR', count_key)
+
+ -- Handle shingles: SETEX each shingle key with expire and digest as value
+ for i = 10, #KEYS do
+ redis.call('SETEX', KEYS[i], expire, digest)
+ end
+
+elseif op == "del" then
+ redis.call('DEL', key)
+ redis.call('DECR', count_key)
+
+ for i = 10, #KEYS do
+ redis.call('DEL', KEYS[i])
+ end
+
+elseif op == "refresh" then
+ redis.call('EXPIRE', key, expire)
+
+ for i = 10, #KEYS do
+ redis.call('EXPIRE', KEYS[i], expire)
+ end
+end
+
+return 1
if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
- data = &session->reply;
-
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
- len = sizeof(session->reply);
+ if (session->epoch >= RSPAMD_FUZZY_EPOCH12) {
+ data = &session->reply.v2;
+ len = sizeof(session->reply.v2);
+ }
+ else if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ data = &session->reply.v1;
+ len = sizeof(session->reply.v1);
}
else {
- len = sizeof(session->reply.hdr) + sizeof(session->reply.rep.v1);
+ data = &session->reply.v1;
+ len = sizeof(session->reply.v1.hdr) + sizeof(session->reply.v1.rep.v1);
}
}
else {
- data = &session->reply.rep;
-
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
- len = sizeof(session->reply.rep);
+ if (session->epoch >= RSPAMD_FUZZY_EPOCH12) {
+ data = &session->reply.v2.rep;
+ len = sizeof(session->reply.v2.rep);
+ }
+ else if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ data = &session->reply.v1.rep;
+ len = sizeof(session->reply.v1.rep);
}
else {
- len = sizeof(session->reply.rep.v1);
+ data = &session->reply.v1.rep;
+ len = sizeof(session->reply.v1.rep.v1);
}
}
if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
- data = &session->reply;
-
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
- len = sizeof(session->reply);
+ if (session->epoch >= RSPAMD_FUZZY_EPOCH12) {
+ data = &session->reply.v2;
+ len = sizeof(session->reply.v2);
+ }
+ else if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ data = &session->reply.v1;
+ len = sizeof(session->reply.v1);
}
else {
- len = sizeof(session->reply.hdr) + sizeof(session->reply.rep.v1);
+ data = &session->reply.v1;
+ len = sizeof(session->reply.v1.hdr) + sizeof(session->reply.v1.rep.v1);
}
}
else {
- data = &session->reply.rep;
-
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
- len = sizeof(session->reply.rep);
+ if (session->epoch >= RSPAMD_FUZZY_EPOCH12) {
+ data = &session->reply.v2.rep;
+ len = sizeof(session->reply.v2.rep);
+ }
+ else if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ data = &session->reply.v1.rep;
+ len = sizeof(session->reply.v1.rep);
}
else {
- len = sizeof(session->reply.rep.v1);
+ data = &session->reply.v1.rep;
+ len = sizeof(session->reply.v1.rep.v1);
}
}
static void
rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
struct rspamd_fuzzy_reply *result,
+ struct rspamd_fuzzy_multiflag_result *mf_result,
struct fuzzy_session *session,
int flags)
{
if (cmd) {
result->v1.tag = cmd->tag;
- memcpy(&session->reply.rep, result, sizeof(*result));
- if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
- /* Hash is too fresh, need to delay it */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- }
+ if (session->epoch >= RSPAMD_FUZZY_EPOCH12) {
+ /* Populate v2 reply with multi-flag data */
+ struct rspamd_fuzzy_reply_v2 *rep_v2 = &session->reply.v2.rep;
+ memset(rep_v2, 0, sizeof(*rep_v2));
- bool default_disabled = false;
+ /* Primary flag goes into v1 sub-struct */
+ rep_v2->v1 = result->v1;
+ memcpy(rep_v2->digest, result->digest, sizeof(rep_v2->digest));
+ rep_v2->ts = result->ts;
- {
- khiter_t k;
+ /* Extra flags from multiflag result */
+ if (mf_result) {
+ rep_v2->n_extra_flags = mf_result->n_extra_flags;
+ memcpy(rep_v2->extra_flags, mf_result->extra_flags,
+ sizeof(struct rspamd_fuzzy_flag_entry) * mf_result->n_extra_flags);
+ }
- k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, session->reply.rep.v1.flag);
+ if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
+ rep_v2->ts = 0;
+ rep_v2->v1.prob = 0.0f;
+ rep_v2->v1.value = 0;
+ }
- if (k != kh_end(session->ctx->default_forbidden_ids)) {
- /* Hash is from a forbidden flag by default */
- default_disabled = true;
+ bool default_disabled = false;
+ {
+ khiter_t k;
+ k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, rep_v2->v1.flag);
+ if (k != kh_end(session->ctx->default_forbidden_ids)) {
+ default_disabled = true;
+ }
}
- }
- if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
+ if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
+ if (rep_v2->v1.prob > 0 && session->key && session->key->forbidden_ids) {
+ khiter_t k;
+ k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, rep_v2->v1.flag);
+ if (k != kh_end(session->key->forbidden_ids)) {
+ rep_v2->ts = 0;
+ rep_v2->v1.prob = 0.0f;
+ rep_v2->v1.value = 0;
+ rep_v2->v1.flag = 0;
+ }
+ }
+ else if (default_disabled) {
+ rep_v2->ts = 0;
+ rep_v2->v1.prob = 0.0f;
+ rep_v2->v1.value = 0;
+ rep_v2->v1.flag = 0;
+ }
- if (session->reply.rep.v1.prob > 0 && session->key && session->key->forbidden_ids) {
- khiter_t k;
+ /* Use a temporary v1 reply for stats (stats API expects rspamd_fuzzy_reply) */
+ struct rspamd_fuzzy_reply stats_rep;
+ memset(&stats_rep, 0, sizeof(stats_rep));
+ stats_rep.v1 = rep_v2->v1;
+ memcpy(stats_rep.digest, rep_v2->digest, sizeof(stats_rep.digest));
+ stats_rep.ts = rep_v2->ts;
+
+ ottery_rand_bytes(session->reply.v2.hdr.nonce,
+ sizeof(session->reply.v2.hdr.nonce));
+
+ len = sizeof(session->reply.v2.rep);
+
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->ctx,
+ session->epoch,
+ rep_v2->v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->key,
+ session->ip_stat,
+ cmd->cmd,
+ &stats_rep,
+ session->timestamp);
+ }
- k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, session->reply.rep.v1.flag);
+ rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.v2.rep,
+ len,
+ session->reply.v2.hdr.nonce,
+ session->nm,
+ session->reply.v2.hdr.mac);
+ }
+ else {
+ if (default_disabled) {
+ rep_v2->ts = 0;
+ rep_v2->v1.prob = 0.0f;
+ rep_v2->v1.value = 0;
+ rep_v2->v1.flag = 0;
+ }
- if (k != kh_end(session->key->forbidden_ids)) {
- /* Hash is from a forbidden flag for this key */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
+ struct rspamd_fuzzy_reply stats_rep;
+ memset(&stats_rep, 0, sizeof(stats_rep));
+ stats_rep.v1 = rep_v2->v1;
+ memcpy(stats_rep.digest, rep_v2->digest, sizeof(stats_rep.digest));
+ stats_rep.ts = rep_v2->ts;
+
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->ctx,
+ session->epoch,
+ rep_v2->v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->key,
+ session->ip_stat,
+ cmd->cmd,
+ &stats_rep,
+ session->timestamp);
}
}
- else if (default_disabled) {
- /* Hash is from a forbidden flag by default */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
+ }
+ else {
+ /* EPOCH10/EPOCH11 path — v1 reply */
+ memcpy(&session->reply.v1.rep, result, sizeof(*result));
+
+ if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
+ session->reply.v1.rep.ts = 0;
+ session->reply.v1.rep.v1.prob = 0.0f;
+ session->reply.v1.rep.v1.value = 0;
}
- /* We need also to encrypt reply */
- ottery_rand_bytes(session->reply.hdr.nonce,
- sizeof(session->reply.hdr.nonce));
+ bool default_disabled = false;
+ {
+ khiter_t k;
+ k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, session->reply.v1.rep.v1.flag);
+ if (k != kh_end(session->ctx->default_forbidden_ids)) {
+ default_disabled = true;
+ }
+ }
- /*
- * For old replies we need to encrypt just old part, otherwise
- * decryption would fail due to mac verification mistake
- */
+ if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
+ if (session->reply.v1.rep.v1.prob > 0 && session->key && session->key->forbidden_ids) {
+ khiter_t k;
+ k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, session->reply.v1.rep.v1.flag);
+ if (k != kh_end(session->key->forbidden_ids)) {
+ session->reply.v1.rep.ts = 0;
+ session->reply.v1.rep.v1.prob = 0.0f;
+ session->reply.v1.rep.v1.value = 0;
+ session->reply.v1.rep.v1.flag = 0;
+ }
+ }
+ else if (default_disabled) {
+ session->reply.v1.rep.ts = 0;
+ session->reply.v1.rep.v1.prob = 0.0f;
+ session->reply.v1.rep.v1.value = 0;
+ session->reply.v1.rep.v1.flag = 0;
+ }
+
+ ottery_rand_bytes(session->reply.v1.hdr.nonce,
+ sizeof(session->reply.v1.hdr.nonce));
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
- len = sizeof(session->reply.rep);
+ if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ len = sizeof(session->reply.v1.rep);
+ }
+ else {
+ len = sizeof(session->reply.v1.rep.v1);
+ }
+
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->ctx,
+ session->epoch,
+ session->reply.v1.rep.v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->key,
+ session->ip_stat,
+ cmd->cmd,
+ &session->reply.v1.rep,
+ session->timestamp);
+ }
+
+ rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.v1.rep,
+ len,
+ session->reply.v1.hdr.nonce,
+ session->nm,
+ session->reply.v1.hdr.mac);
}
- else {
- len = sizeof(session->reply.rep.v1);
- }
-
- /* Update stats before encryption */
- if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
- rspamd_fuzzy_update_stats(session->ctx,
- session->epoch,
- session->reply.rep.v1.prob > 0.5f,
- flags & RSPAMD_FUZZY_REPLY_SHINGLE,
- flags & RSPAMD_FUZZY_REPLY_DELAY,
- session->key,
- session->ip_stat,
- cmd->cmd,
- &session->reply.rep,
- session->timestamp);
- }
-
- rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.rep,
- len,
- session->reply.hdr.nonce,
- session->nm,
- session->reply.hdr.mac);
- }
- else if (default_disabled) {
- /* Hash is from a forbidden flag by default, and there is no encryption override */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
- }
- if (!(flags & RSPAMD_FUZZY_REPLY_ENCRYPTED)) {
- if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
- rspamd_fuzzy_update_stats(session->ctx,
- session->epoch,
- session->reply.rep.v1.prob > 0.5f,
- flags & RSPAMD_FUZZY_REPLY_SHINGLE,
- flags & RSPAMD_FUZZY_REPLY_DELAY,
- session->key,
- session->ip_stat,
- cmd->cmd,
- &session->reply.rep,
- session->timestamp);
+ else if (default_disabled) {
+ session->reply.v1.rep.ts = 0;
+ session->reply.v1.rep.v1.prob = 0.0f;
+ session->reply.v1.rep.v1.value = 0;
+ session->reply.v1.rep.v1.flag = 0;
+ }
+ if (!(flags & RSPAMD_FUZZY_REPLY_ENCRYPTED)) {
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->ctx,
+ session->epoch,
+ session->reply.v1.rep.v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->key,
+ session->ip_stat,
+ cmd->cmd,
+ &session->reply.v1.rep,
+ session->timestamp);
+ }
}
}
}
}
static void
-rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
+rspamd_fuzzy_check_callback(struct rspamd_fuzzy_multiflag_result *mf_result, void *ud)
{
struct fuzzy_session *session = ud;
gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
struct rspamd_fuzzy_cmd *cmd = NULL;
const struct rspamd_shingle *shingle = NULL;
struct rspamd_shingle sgl_cpy;
+ struct rspamd_fuzzy_reply *result = &mf_result->rep;
int send_flags = 0;
switch (session->cmd_type) {
}
lua_settop(L, 0);
- rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, result, mf_result, session, send_flags);
REF_RELEASE(session);
return;
}
}
- rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, result, mf_result, session, send_flags);
REF_RELEASE(session);
}
}
lua_settop(L, 0);
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
result.v1.value = 500;
result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
/* Do not accept unencrypted commands */
result.v1.value = 415;
result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
if (!rspamd_fuzzy_check_client(session->ctx, session->addr)) {
result.v1.value = 503;
result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
result.v1.value = 503;
result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
}
if (session->key && !(session->key->flags & FUZZY_KEY_READ)) {
result.v1.value = 503;
result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
return;
}
result.v1.value = 403;
result.v1.prob = 0.0f;
result.v1.flag = 0;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
}
}
else if (cmd->cmd == FUZZY_STAT) {
/* Store high qword in value and low qword in flag */
result.v1.value = (int32_t) ((uint64_t) session->ctx->stat.fuzzy_hashes >> 32);
result.v1.flag = (uint32_t) (session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
}
else if (cmd->cmd == FUZZY_PING) {
result.v1.prob = 1.0f;
result.v1.value = cmd->value;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
}
else {
if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key, cmd->cmd)) {
result.v1.prob = 0.0f;
}
reply:
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_reply(cmd, &result, NULL, session, send_flags);
}
}
enum rspamd_fuzzy_epoch ret = RSPAMD_FUZZY_EPOCH_MAX;
switch (cmd->version & RSPAMD_FUZZY_VERSION_MASK) {
+ case 5:
+ if (cmd->shingles_count > 0) {
+ if (r >= sizeof(struct rspamd_fuzzy_shingle_cmd)) {
+ ret = RSPAMD_FUZZY_EPOCH12;
+ }
+ }
+ else {
+ if (r >= sizeof(*cmd)) {
+ ret = RSPAMD_FUZZY_EPOCH12;
+ }
+ }
+ break;
case 4:
if (cmd->shingles_count > 0) {
if (r >= sizeof(struct rspamd_fuzzy_shingle_cmd)) {
void *subr_ud)
{
struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
- struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_multiflag_result mf_result;
- rep = rspamd_fuzzy_backend_sqlite_check(sq, cmd, bk->expire);
+ memset(&mf_result, 0, sizeof(mf_result));
+ mf_result.rep = rspamd_fuzzy_backend_sqlite_check(sq, cmd, bk->expire);
if (cb) {
- cb(&rep, ud);
+ cb(&mf_result, ud);
}
}
struct rspamd_fuzzy_backend;
struct rspamd_config;
+/*
+ * Multi-flag result from backend check
+ */
+struct rspamd_fuzzy_multiflag_result {
+ struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_flag_entry extra_flags[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+ uint8_t n_extra_flags;
+};
+
/*
* Callbacks for fuzzy methods
*/
-typedef void (*rspamd_fuzzy_check_cb)(struct rspamd_fuzzy_reply *rep, void *ud);
+typedef void (*rspamd_fuzzy_check_cb)(struct rspamd_fuzzy_multiflag_result *result, void *ud);
typedef void (*rspamd_fuzzy_update_cb)(gboolean success,
unsigned int nadded,
rspamd_fuzzy_check_cb cb, void *ud,
void *subr_ud)
{
- struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_multiflag_result mf_result;
if (cb) {
- memset(&rep, 0, sizeof(rep));
- cb(&rep, ud);
+ memset(&mf_result, 0, sizeof(mf_result));
+ cb(&mf_result, ud);
}
return;
/*
- * Copyright 2024 Vsevolod Stakhov
+ * Copyright 2026 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
struct rspamd_redis_pool *pool;
double timeout;
int conf_ref;
+ int cbref_update; /* Lua functor ref for updates */
bool terminated;
ref_entry_t ref;
};
enum rspamd_fuzzy_redis_command {
RSPAMD_FUZZY_REDIS_COMMAND_COUNT,
RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
- RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
RSPAMD_FUZZY_REDIS_COMMAND_CHECK
};
enum rspamd_fuzzy_redis_command command;
unsigned int nargs;
- unsigned int nadded;
- unsigned int ndeleted;
- unsigned int nextended;
- unsigned int nignored;
-
union {
rspamd_fuzzy_check_cb cb_check;
- rspamd_fuzzy_update_cb cb_update;
rspamd_fuzzy_version_cb cb_version;
rspamd_fuzzy_count_cb cb_count;
} callback;
static void
rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend)
{
- if (!backend->terminated && backend->conf_ref != -1) {
- luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
+ if (!backend->terminated) {
+ if (backend->conf_ref != -1) {
+ luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
+ }
+ if (backend->cbref_update != -1) {
+ luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->cbref_update);
+ }
}
if (backend->id) {
backend->timeout = REDIS_DEFAULT_TIMEOUT;
backend->redis_object = REDIS_DEFAULT_OBJECT;
+ backend->cbref_update = -1;
backend->L = L;
ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref);
rspamd_cryptobox_hash_final(&st, id_hash);
backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT);
+ /* Load Lua update functor following the Bayes pattern */
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ int err_idx = lua_gettop(L);
+
+ if (!rspamd_lua_require_function(L, "lua_fuzzy_redis", "lua_fuzzy_redis_init")) {
+ msg_err_config("cannot require lua_fuzzy_redis.lua_fuzzy_redis_init");
+ lua_settop(L, err_idx - 1);
+ /* Non-fatal: CHECK/COUNT/VERSION still work, only UPDATE is disabled */
+ }
+ else {
+ /* Push redis params table (same as conf_ref) */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref);
+
+ if (lua_pcall(L, 1, 1, err_idx) != 0) {
+ msg_err_config("call to lua_fuzzy_redis_init failed: %s",
+ lua_tostring(L, -1));
+ lua_settop(L, err_idx - 1);
+ }
+ else if (lua_isfunction(L, -1)) {
+ backend->cbref_update = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_settop(L, err_idx - 1);
+ }
+ else {
+ msg_err_config("lua_fuzzy_redis_init returned non-function");
+ lua_settop(L, err_idx - 1);
+ }
+ }
+
return backend;
}
{
struct rspamd_fuzzy_redis_session *session = priv;
redisReply *reply = r, *cur;
- struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_multiflag_result mf_result;
GString *key;
struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
unsigned int i, found = 0, max_found = 0, cur_found = 0;
ev_timer_stop(session->event_loop, &session->timeout);
- memset(&rep, 0, sizeof(rep));
+ memset(&mf_result, 0, sizeof(mf_result));
if (c->err == 0 && reply != NULL) {
rspamd_upstream_ok(session->up);
if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE;
- rep.v1.prob = session->prob;
g_assert(sel != NULL);
- /* Prepare new check command */
+ /* Prepare new check command — use HGETALL for multi-flag */
rspamd_fuzzy_redis_session_free_args(session);
- session->nargs = 5;
+ session->nargs = 2;
session->argv = g_malloc(sizeof(char *) * session->nargs);
session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
key = g_string_new(session->backend->redis_object);
g_string_append_len(key, sel->digest, sizeof(sel->digest));
- session->argv[0] = g_strdup("HMGET");
- session->argv_lens[0] = 5;
+ session->argv[0] = g_strdup("HGETALL");
+ session->argv_lens[0] = 7;
session->argv[1] = key->str;
session->argv_lens[1] = key->len;
- session->argv[2] = g_strdup("V");
- session->argv_lens[2] = 1;
- session->argv[3] = g_strdup("F");
- session->argv_lens[3] = 1;
- session->argv[4] = g_strdup("C");
- session->argv_lens[4] = 1;
g_string_free(key, FALSE); /* Do not free underlying array */
memcpy(session->found_digest, sel->digest,
sizeof(session->cmd->digest));
session->argv_lens) != REDIS_OK) {
if (session->callback.cb_check) {
- memset(&rep, 0, sizeof(rep));
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
rspamd_fuzzy_redis_session_dtor(session, TRUE);
}
if (session->callback.cb_check) {
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
}
else {
if (session->callback.cb_check) {
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
if (c->errstr) {
static void
rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session)
{
- struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_multiflag_result mf_result;
const struct rspamd_fuzzy_shingle_cmd *shcmd;
GString *key;
unsigned int i, init_len;
session->ctx->errstr);
if (session->callback.cb_check) {
- memset(&rep, 0, sizeof(rep));
- session->callback.cb_check(&rep, session->cbdata);
+ memset(&mf_result, 0, sizeof(mf_result));
+ session->callback.cb_check(&mf_result, session->cbdata);
}
rspamd_fuzzy_redis_session_dtor(session, TRUE);
gpointer priv)
{
struct rspamd_fuzzy_redis_session *session = priv;
- redisReply *reply = r, *cur;
- struct rspamd_fuzzy_reply rep;
+ redisReply *reply = r, *cur_key, *cur_val;
+ struct rspamd_fuzzy_multiflag_result mf_result;
gulong value;
- unsigned int found_elts = 0;
+ gboolean found_primary = FALSE;
ev_timer_stop(session->event_loop, &session->timeout);
- memset(&rep, 0, sizeof(rep));
+ memset(&mf_result, 0, sizeof(mf_result));
if (c->err == 0 && reply != NULL) {
rspamd_upstream_ok(session->up);
if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
- cur = reply->element[0];
-
- if (cur->type == REDIS_REPLY_STRING) {
- value = strtoul(cur->str, NULL, 10);
- rep.v1.value = value;
- found_elts++;
- }
-
- cur = reply->element[1];
-
- if (cur->type == REDIS_REPLY_STRING) {
- value = strtoul(cur->str, NULL, 10);
- rep.v1.flag = value;
- found_elts++;
- }
+ /*
+ * HGETALL returns key-value pairs: [k1, v1, k2, v2, ...]
+ * Parse V/F/C (primary) and V1/F1, V2/F2, ..., V7/F7 (extra flags)
+ */
+ int32_t primary_value = 0;
+ uint32_t primary_flag = 0;
+ uint32_t ts = 0;
+ gboolean have_v = FALSE, have_f = FALSE;
+
+ /* Temporary storage for extra flag slots */
+ int32_t extra_values[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+ uint32_t extra_flags[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+ gboolean extra_have_v[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+ gboolean extra_have_f[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+
+ memset(extra_have_v, 0, sizeof(extra_have_v));
+ memset(extra_have_f, 0, sizeof(extra_have_f));
+ memset(extra_values, 0, sizeof(extra_values));
+ memset(extra_flags, 0, sizeof(extra_flags));
+
+ for (gsize i = 0; i + 1 < reply->elements; i += 2) {
+ cur_key = reply->element[i];
+ cur_val = reply->element[i + 1];
+
+ if (cur_key->type != REDIS_REPLY_STRING || cur_val->type != REDIS_REPLY_STRING) {
+ continue;
+ }
- if (found_elts >= 2) {
- rep.v1.prob = session->prob;
- memcpy(rep.digest, session->found_digest, sizeof(rep.digest));
+ if (cur_key->len == 1) {
+ switch (cur_key->str[0]) {
+ case 'V':
+ primary_value = strtol(cur_val->str, NULL, 10);
+ have_v = TRUE;
+ break;
+ case 'F':
+ primary_flag = strtoul(cur_val->str, NULL, 10);
+ have_f = TRUE;
+ break;
+ case 'C':
+ ts = strtoul(cur_val->str, NULL, 10);
+ break;
+ default:
+ break;
+ }
+ }
+ else if (cur_key->len == 2 && cur_key->str[0] >= 'A') {
+ /* Extra flag fields: V1..V7, F1..F7 */
+ int slot = cur_key->str[1] - '1';
+ if (slot >= 0 && slot < RSPAMD_FUZZY_MAX_EXTRA_FLAGS) {
+ if (cur_key->str[0] == 'V') {
+ extra_values[slot] = strtol(cur_val->str, NULL, 10);
+ extra_have_v[slot] = TRUE;
+ }
+ else if (cur_key->str[0] == 'F') {
+ extra_flags[slot] = strtoul(cur_val->str, NULL, 10);
+ extra_have_f[slot] = TRUE;
+ }
+ }
+ }
}
- rep.ts = 0;
-
- if (reply->elements > 2) {
- cur = reply->element[2];
-
- if (cur->type == REDIS_REPLY_STRING) {
- rep.ts = strtoul(cur->str, NULL, 10);
+ if (have_v && have_f) {
+ found_primary = TRUE;
+ mf_result.rep.v1.value = primary_value;
+ mf_result.rep.v1.flag = primary_flag;
+ mf_result.rep.v1.prob = session->prob;
+ mf_result.rep.ts = ts;
+ memcpy(mf_result.rep.digest, session->found_digest,
+ sizeof(mf_result.rep.digest));
+
+ /* Collect extra flags */
+ for (int j = 0; j < RSPAMD_FUZZY_MAX_EXTRA_FLAGS; j++) {
+ if (extra_have_v[j] && extra_have_f[j]) {
+ int idx = mf_result.n_extra_flags;
+ mf_result.extra_flags[idx].value = extra_values[j];
+ mf_result.extra_flags[idx].flag = extra_flags[j];
+ mf_result.n_extra_flags++;
+ }
}
}
}
reply->str);
}
- if (found_elts < 2) {
+ if (!found_primary) {
if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
/* We also need to check all shingles here */
rspamd_fuzzy_backend_check_shingles(session);
}
else {
if (session->callback.cb_check) {
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
}
}
else {
if (session->callback.cb_check) {
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
}
}
else {
if (session->callback.cb_check) {
- session->callback.cb_check(&rep, session->cbdata);
+ session->callback.cb_check(&mf_result, session->cbdata);
}
if (c->errstr) {
struct upstream *up;
struct upstream_list *ups;
rspamd_inet_addr_t *addr;
- struct rspamd_fuzzy_reply rep;
+ struct rspamd_fuzzy_multiflag_result mf_result;
GString *key;
g_assert(backend != NULL);
ups = rspamd_redis_get_servers(backend, "read_servers");
if (!ups) {
if (cb) {
- memset(&rep, 0, sizeof(rep));
- cb(&rep, ud);
+ memset(&mf_result, 0, sizeof(mf_result));
+ cb(&mf_result, ud);
}
return;
session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
session->cmd = cmd;
session->prob = 1.0;
- memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest));
- memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest));
+ memcpy(session->found_digest, session->cmd->digest, sizeof(session->found_digest));
session->event_loop = rspamd_fuzzy_backend_event_base(bk);
- /* First of all check digest */
- session->nargs = 5;
+ /* First of all check digest — use HGETALL to get all flags */
+ session->nargs = 2;
session->argv = g_malloc(sizeof(char *) * session->nargs);
session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
key = g_string_new(backend->redis_object);
g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- session->argv[0] = g_strdup("HMGET");
- session->argv_lens[0] = 5;
+ session->argv[0] = g_strdup("HGETALL");
+ session->argv_lens[0] = 7;
session->argv[1] = key->str;
session->argv_lens[1] = key->len;
- session->argv[2] = g_strdup("V");
- session->argv_lens[2] = 1;
- session->argv[3] = g_strdup("F");
- session->argv_lens[3] = 1;
- session->argv[4] = g_strdup("C");
- session->argv_lens[4] = 1;
g_string_free(key, FALSE); /* Do not free underlying array */
up = rspamd_upstream_get(ups,
if (up == NULL) {
rspamd_fuzzy_redis_session_dtor(session, TRUE);
if (cb) {
- memset(&rep, 0, sizeof(rep));
- cb(&rep, ud);
+ memset(&mf_result, 0, sizeof(mf_result));
+ cb(&mf_result, ud);
}
return;
}
rspamd_fuzzy_redis_session_dtor(session, TRUE);
if (cb) {
- memset(&rep, 0, sizeof(rep));
- cb(&rep, ud);
+ memset(&mf_result, 0, sizeof(mf_result));
+ cb(&mf_result, ud);
}
}
else {
rspamd_fuzzy_redis_session_dtor(session, TRUE);
if (cb) {
- memset(&rep, 0, sizeof(rep));
- cb(&rep, ud);
+ memset(&mf_result, 0, sizeof(mf_result));
+ cb(&mf_result, ud);
}
}
else {
g_assert(backend != NULL);
}
-static gboolean
-rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk,
- struct rspamd_fuzzy_redis_session *session,
- struct fuzzy_peer_cmd *io_cmd, unsigned int *shift)
-{
- GString *key, *value;
- unsigned int cur_shift = *shift;
- unsigned int i, klen;
- struct rspamd_fuzzy_cmd *cmd;
-
- if (io_cmd->is_shingle) {
- cmd = &io_cmd->cmd.shingle.basic;
- }
- else {
- cmd = &io_cmd->cmd.normal;
- }
-
- if (cmd->cmd == FUZZY_WRITE) {
- /*
- * For each normal hash addition we do 5 redis commands:
- * HSET <key> F <flag>
- * HSETNX <key> C <time>
- * HINCRBY <key> V <weight>
- * EXPIRE <key> <expire>
- * Where <key> is <prefix> || <digest>
- */
-
- /* HSET */
- klen = strlen(session->backend->redis_object) +
- sizeof(cmd->digest) + 1;
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- value = g_string_sized_new(sizeof("4294967296"));
- rspamd_printf_gstring(value, "%d", cmd->flag);
-
- if (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) {
- session->argv[cur_shift] = g_strdup("HSETNX");
- session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
- }
- else {
- session->argv[cur_shift] = g_strdup("HSET");
- session->argv_lens[cur_shift++] = sizeof("HSET") - 1;
- }
-
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = g_strdup("F");
- session->argv_lens[cur_shift++] = sizeof("F") - 1;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 4,
- (const char **) &session->argv[cur_shift - 4],
- &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
-
- return FALSE;
- }
-
- /* HSETNX */
- klen = strlen(session->backend->redis_object) +
- sizeof(cmd->digest) + 1;
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- value = g_string_sized_new(sizeof("18446744073709551616"));
- rspamd_printf_gstring(value, "%L", (int64_t) rspamd_get_calendar_ticks());
- session->argv[cur_shift] = g_strdup("HSETNX");
- session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = g_strdup("C");
- session->argv_lens[cur_shift++] = sizeof("C") - 1;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 4,
- (const char **) &session->argv[cur_shift - 4],
- &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
-
- return FALSE;
- }
-
- /* HINCRBY */
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- value = g_string_sized_new(sizeof("4294967296"));
- rspamd_printf_gstring(value, "%d", cmd->value);
- session->argv[cur_shift] = g_strdup("HINCRBY");
- session->argv_lens[cur_shift++] = sizeof("HINCRBY") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = g_strdup("V");
- session->argv_lens[cur_shift++] = sizeof("V") - 1;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 4,
- (const char **) &session->argv[cur_shift - 4],
- &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
-
- return FALSE;
- }
-
- /* EXPIRE */
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- value = g_string_sized_new(sizeof("4294967296"));
- rspamd_printf_gstring(value, "%d",
- (int) rspamd_fuzzy_backend_get_expire(bk));
- session->argv[cur_shift] = g_strdup("EXPIRE");
- session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 3,
- (const char **) &session->argv[cur_shift - 3],
- &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
-
- return FALSE;
- }
+/*
+ * Callback data for Lua-based fuzzy update.
+ * Stored as lightuserdata upvalue in the C closure passed to Lua.
+ */
+struct rspamd_fuzzy_lua_update_cbdata {
+ rspamd_fuzzy_update_cb cb;
+ void *ud;
+ struct rspamd_fuzzy_backend_redis *backend;
+ unsigned int nadded;
+ unsigned int ndeleted;
+ unsigned int nextended;
+ unsigned int nignored;
+};
- /* INCR */
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append(key, "_count");
- session->argv[cur_shift] = g_strdup("INCR");
- session->argv_lens[cur_shift++] = sizeof("INCR") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- g_string_free(key, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 2,
- (const char **) &session->argv[cur_shift - 2],
- &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- else if (cmd->cmd == FUZZY_DEL) {
- /* DEL */
- klen = strlen(session->backend->redis_object) +
- sizeof(cmd->digest) + 1;
-
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- session->argv[cur_shift] = g_strdup("DEL");
- session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- g_string_free(key, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 2,
- (const char **) &session->argv[cur_shift - 2],
- &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
-
- return FALSE;
- }
+/*
+ * C closure called from Lua when all fuzzy update operations complete.
+ * Lua calls this as: callback(success_boolean)
+ * Upvalue 1: lightuserdata -> struct rspamd_fuzzy_lua_update_cbdata
+ */
+static int
+rspamd_fuzzy_redis_lua_update_cb(lua_State *L)
+{
+ struct rspamd_fuzzy_lua_update_cbdata *cbdata =
+ (struct rspamd_fuzzy_lua_update_cbdata *) lua_touserdata(L, lua_upvalueindex(1));
- /* DECR */
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append(key, "_count");
- session->argv[cur_shift] = g_strdup("DECR");
- session->argv_lens[cur_shift++] = sizeof("DECR") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- g_string_free(key, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 2,
- (const char **) &session->argv[cur_shift - 2],
- &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- else if (cmd->cmd == FUZZY_REFRESH) {
- /*
- * Issue refresh command by just EXPIRE command
- * EXPIRE <key> <expire>
- * Where <key> is <prefix> || <digest>
- */
-
- klen = strlen(session->backend->redis_object) +
- sizeof(cmd->digest) + 1;
-
- /* EXPIRE */
- key = g_string_sized_new(klen);
- g_string_append(key, session->backend->redis_object);
- g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
- value = g_string_sized_new(sizeof("4294967296"));
- rspamd_printf_gstring(value, "%d",
- (int) rspamd_fuzzy_backend_get_expire(bk));
- session->argv[cur_shift] = g_strdup("EXPIRE");
- session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 3,
- (const char **) &session->argv[cur_shift - 3],
- &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- else if (cmd->cmd == FUZZY_DUP) {
- /* Ignore */
- }
- else {
- g_assert_not_reached();
+ if (cbdata == NULL) {
+ return 0;
}
- if (io_cmd->is_shingle) {
- if (cmd->cmd == FUZZY_WRITE) {
- klen = strlen(session->backend->redis_object) +
- 64 + 1;
-
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
- unsigned char *hval;
- /*
- * For each command with shingles we additionally emit 32 commands:
- * SETEX <prefix>_<number>_<value> <expire> <digest>
- */
-
- /* SETEX */
- key = g_string_sized_new(klen);
- rspamd_printf_gstring(key, "%s_%d_%uL",
- session->backend->redis_object,
- i,
- io_cmd->cmd.shingle.sgl.hashes[i]);
- value = g_string_sized_new(sizeof("4294967296"));
- rspamd_printf_gstring(value, "%d",
- (int) rspamd_fuzzy_backend_get_expire(bk));
- hval = g_malloc(sizeof(io_cmd->cmd.shingle.basic.digest));
- memcpy(hval, io_cmd->cmd.shingle.basic.digest,
- sizeof(io_cmd->cmd.shingle.basic.digest));
- session->argv[cur_shift] = g_strdup("SETEX");
- session->argv_lens[cur_shift++] = sizeof("SETEX") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- session->argv[cur_shift] = hval;
- session->argv_lens[cur_shift++] = sizeof(io_cmd->cmd.shingle.basic.digest);
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 4,
- (const char **) &session->argv[cur_shift - 4],
- &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- }
- else if (cmd->cmd == FUZZY_DEL) {
- klen = strlen(session->backend->redis_object) +
- 64 + 1;
-
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
- key = g_string_sized_new(klen);
- rspamd_printf_gstring(key, "%s_%d_%uL",
- session->backend->redis_object,
- i,
- io_cmd->cmd.shingle.sgl.hashes[i]);
- session->argv[cur_shift] = g_strdup("DEL");
- session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- g_string_free(key, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 2,
- (const char **) &session->argv[cur_shift - 2],
- &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- }
- else if (cmd->cmd == FUZZY_REFRESH) {
- klen = strlen(session->backend->redis_object) +
- 64 + 1;
+ gboolean success = lua_toboolean(L, 1);
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
- /*
- * For each command with shingles we additionally emit 32 commands:
- * EXPIRE <prefix>_<number>_<value> <expire>
- */
-
- /* Expire */
- key = g_string_sized_new(klen);
- rspamd_printf_gstring(key, "%s_%d_%uL",
- session->backend->redis_object,
- i,
- io_cmd->cmd.shingle.sgl.hashes[i]);
- value = g_string_sized_new(sizeof("18446744073709551616"));
- rspamd_printf_gstring(value, "%d",
- (int) rspamd_fuzzy_backend_get_expire(bk));
- session->argv[cur_shift] = g_strdup("EXPIRE");
- session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- session->argv[cur_shift] = value->str;
- session->argv_lens[cur_shift++] = value->len;
- g_string_free(key, FALSE);
- g_string_free(value, FALSE);
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 3,
- (const char **) &session->argv[cur_shift - 3],
- &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
-
- return FALSE;
- }
- }
- }
- else if (cmd->cmd == FUZZY_DUP) {
- /* Ignore */
- }
- else {
- g_assert_not_reached();
- }
+ if (cbdata->cb) {
+ cbdata->cb(success,
+ cbdata->nadded,
+ cbdata->ndeleted,
+ cbdata->nextended,
+ cbdata->nignored,
+ cbdata->ud);
}
- *shift = cur_shift;
+ REF_RELEASE(cbdata->backend);
+ g_free(cbdata);
- return TRUE;
-}
-
-static void
-rspamd_fuzzy_redis_update_callback(redisAsyncContext *c, gpointer r,
- gpointer priv)
-{
- struct rspamd_fuzzy_redis_session *session = priv;
- redisReply *reply = r;
-
- ev_timer_stop(session->event_loop, &session->timeout);
-
- if (c->err == 0 && reply != NULL) {
- rspamd_upstream_ok(session->up);
-
- if (reply->type == REDIS_REPLY_ARRAY) {
- /* TODO: check all replies somehow */
- if (session->callback.cb_update) {
- session->callback.cb_update(TRUE,
- session->nadded,
- session->ndeleted,
- session->nextended,
- session->nignored,
- session->cbdata);
- }
- }
- else {
- if (reply->type == REDIS_REPLY_ERROR) {
- msg_err_redis_session("fuzzy backend redis error: \"%s\"",
- reply->str);
- }
- if (session->callback.cb_update) {
- session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
- }
- }
- }
- else {
- if (session->callback.cb_update) {
- session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
- }
-
- if (c->errstr) {
- msg_err_redis_session("error sending update to redis %s: %s",
- rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
- c->errstr);
- rspamd_upstream_fail(session->up, FALSE, c->errstr);
- }
- }
-
- rspamd_fuzzy_redis_session_dtor(session, FALSE);
+ return 0;
}
void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk,
void *subr_ud)
{
struct rspamd_fuzzy_backend_redis *backend = subr_ud;
- struct rspamd_fuzzy_redis_session *session;
- struct upstream *up;
- struct upstream_list *ups;
- rspamd_inet_addr_t *addr;
+ lua_State *L = backend->L;
unsigned int i;
- GString *key;
struct fuzzy_peer_cmd *io_cmd;
- struct rspamd_fuzzy_cmd *cmd = NULL;
- unsigned int nargs, cur_shift;
+ struct rspamd_fuzzy_cmd *cmd;
+ unsigned int nadded = 0, ndeleted = 0, nextended = 0, nignored = 0;
g_assert(backend != NULL);
- ups = rspamd_redis_get_servers(backend, "write_servers");
- if (!ups) {
+ if (backend->cbref_update == -1) {
+ msg_err("fuzzy redis update functor not initialized");
if (cb) {
cb(FALSE, 0, 0, 0, 0, ud);
}
-
return;
}
- session = g_malloc0(sizeof(*session));
- session->backend = backend;
- REF_RETAIN(session->backend);
-
- /*
- * For each normal hash addition we do 3 redis commands:
- * HSET <key> F <flag> **OR** HSETNX <key> F <flag> when flag is weak
- * HINCRBY <key> V <weight>
- * EXPIRE <key> <expire>
- * INCR <prefix||fuzzy_count>
- *
- * Where <key> is <prefix> || <digest>
- *
- * For each command with shingles we additionally emit 32 commands:
- * SETEX <prefix>_<number>_<value> <expire> <digest>
- *
- * For each delete command we emit:
- * DEL <key>
- *
- * For each delete command with shingles we emit also 32 commands:
- * DEL <prefix>_<number>_<value>
- * DECR <prefix||fuzzy_count>
- */
-
- nargs = 4;
-
+ /* Count stats upfront */
for (i = 0; i < updates->len; i++) {
io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
}
if (cmd->cmd == FUZZY_WRITE) {
- nargs += 17;
- session->nadded++;
-
- if (io_cmd->is_shingle) {
- nargs += RSPAMD_SHINGLE_SIZE * 4;
- }
+ nadded++;
}
else if (cmd->cmd == FUZZY_DEL) {
- nargs += 4;
- session->ndeleted++;
-
- if (io_cmd->is_shingle) {
- nargs += RSPAMD_SHINGLE_SIZE * 2;
- }
+ ndeleted++;
}
else if (cmd->cmd == FUZZY_REFRESH) {
- nargs += 3;
- session->nextended++;
-
- if (io_cmd->is_shingle) {
- nargs += RSPAMD_SHINGLE_SIZE * 3;
- }
+ nextended++;
}
else {
- session->nignored++;
+ nignored++;
}
}
- /* Now we need to create a new request */
- session->callback.cb_update = cb;
- session->cbdata = ud;
- session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
- session->cmd = cmd;
- session->prob = 1.0f;
- session->event_loop = rspamd_fuzzy_backend_event_base(bk);
-
- /* First of all check digest */
- session->nargs = nargs;
- session->argv = g_malloc0(sizeof(char *) * session->nargs);
- session->argv_lens = g_malloc0(sizeof(gsize) * session->nargs);
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ int err_idx = lua_gettop(L);
- up = rspamd_upstream_get(ups,
- RSPAMD_UPSTREAM_MASTER_SLAVE,
- NULL,
- 0);
+ /* Push the update functor */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, backend->cbref_update);
- if (up == NULL) {
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
- }
- return;
- }
+ /* Arg 1: ev_base */
+ struct ev_loop **pev = lua_newuserdata(L, sizeof(struct ev_loop *));
+ *pev = rspamd_fuzzy_backend_event_base(bk);
+ rspamd_lua_setclass(L, rspamd_ev_base_classname, -1);
- session->up = rspamd_upstream_ref(up);
- addr = rspamd_upstream_addr_next(up);
- g_assert(addr != NULL);
- session->ctx = rspamd_redis_pool_connect(backend->pool,
- backend->dbname,
- backend->username, backend->password,
- rspamd_inet_address_to_string(addr),
- rspamd_inet_address_get_port(addr));
+ /* Arg 2: prefix */
+ lua_pushstring(L, backend->redis_object);
- if (session->ctx == NULL) {
- rspamd_upstream_fail(up, TRUE, strerror(errno));
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ /* Arg 3: updates table */
+ lua_createtable(L, updates->len, 0);
+ for (i = 0; i < updates->len; i++) {
+ io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
}
- }
- else {
- /* Start with MULTI command */
- session->argv[0] = g_strdup("MULTI");
- session->argv_lens[0] = 5;
-
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 1,
- (const char **) session->argv,
- session->argv_lens) != REDIS_OK) {
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
- }
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ lua_createtable(L, 0, 6);
- return;
+ /* op */
+ if (cmd->cmd == FUZZY_WRITE) {
+ lua_pushstring(L, "add");
}
+ else if (cmd->cmd == FUZZY_DEL) {
+ lua_pushstring(L, "del");
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ lua_pushstring(L, "refresh");
+ }
+ else {
+ lua_pushstring(L, "dup");
+ }
+ lua_setfield(L, -2, "op");
- /* Now split the rest of commands in packs and emit them command by command */
- cur_shift = 1;
-
- for (i = 0; i < updates->len; i++) {
- io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
+ /* digest (raw bytes) */
+ lua_pushlstring(L, (const char *) cmd->digest, sizeof(cmd->digest));
+ lua_setfield(L, -2, "digest");
- if (!rspamd_fuzzy_update_append_command(bk, session, io_cmd,
- &cur_shift)) {
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
- }
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ /* flag */
+ lua_pushinteger(L, cmd->flag);
+ lua_setfield(L, -2, "flag");
- return;
- }
- }
+ /* value */
+ lua_pushinteger(L, cmd->value);
+ lua_setfield(L, -2, "value");
- /* Now INCR command for the source */
- key = g_string_new(backend->redis_object);
- g_string_append(key, src);
- session->argv[cur_shift] = g_strdup("INCR");
- session->argv_lens[cur_shift++] = 4;
- session->argv[cur_shift] = key->str;
- session->argv_lens[cur_shift++] = key->len;
- g_string_free(key, FALSE);
+ /* is_weak */
+ lua_pushinteger(L, (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) ? 1 : 0);
+ lua_setfield(L, -2, "is_weak");
- if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
- 2,
- (const char **) &session->argv[cur_shift - 2],
- &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+ /* timestamp */
+ lua_pushinteger(L, (lua_Integer) rspamd_get_calendar_ticks());
+ lua_setfield(L, -2, "timestamp");
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
+ /* shingle_keys (array of strings, only for shingle commands) */
+ if (io_cmd->is_shingle) {
+ unsigned int j;
+ lua_createtable(L, RSPAMD_SHINGLE_SIZE, 0);
+ for (j = 0; j < RSPAMD_SHINGLE_SIZE; j++) {
+ GString *sk = g_string_sized_new(64);
+ rspamd_printf_gstring(sk, "%s_%d_%uL",
+ backend->redis_object,
+ j,
+ io_cmd->cmd.shingle.sgl.hashes[j]);
+ lua_pushlstring(L, sk->str, sk->len);
+ lua_rawseti(L, -2, j + 1);
+ g_string_free(sk, TRUE);
}
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
-
- return;
+ lua_setfield(L, -2, "shingle_keys");
}
- /* Finally we call EXEC with a specific callback */
- session->argv[cur_shift] = g_strdup("EXEC");
- session->argv_lens[cur_shift] = 4;
-
- if (redisAsyncCommandArgv(session->ctx,
- rspamd_fuzzy_redis_update_callback, session,
- 1,
- (const char **) &session->argv[cur_shift],
- &session->argv_lens[cur_shift]) != REDIS_OK) {
+ lua_rawseti(L, -2, i + 1);
+ }
- if (cb) {
- cb(FALSE, 0, 0, 0, 0, ud);
- }
- rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ /* Arg 4: src */
+ lua_pushstring(L, src);
+
+ /* Arg 5: expire */
+ lua_pushnumber(L, rspamd_fuzzy_backend_get_expire(bk));
+
+ /* Arg 6: callback (C closure with cbdata as lightuserdata upvalue) */
+ struct rspamd_fuzzy_lua_update_cbdata *cbdata;
+ cbdata = g_malloc(sizeof(*cbdata));
+ cbdata->cb = cb;
+ cbdata->ud = ud;
+ cbdata->backend = backend;
+ cbdata->nadded = nadded;
+ cbdata->ndeleted = ndeleted;
+ cbdata->nextended = nextended;
+ cbdata->nignored = nignored;
+ REF_RETAIN(backend);
+ lua_pushlightuserdata(L, cbdata);
+ lua_pushcclosure(L, &rspamd_fuzzy_redis_lua_update_cb, 1);
+
+ if (lua_pcall(L, 6, 0, err_idx) != 0) {
+ msg_err("call to fuzzy redis update functor failed: %s",
+ lua_tostring(L, -1));
- return;
- }
- else {
- /* Add timeout */
- session->timeout.data = session;
- ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
- ev_timer_init(&session->timeout,
- rspamd_fuzzy_redis_timeout,
- session->backend->timeout, 0.0);
- ev_timer_start(session->event_loop, &session->timeout);
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
}
+
+ REF_RELEASE(backend);
+ g_free(cbdata);
}
+
+ lua_settop(L, err_idx - 1);
}
void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk,
struct rspamd_fuzzy_storage_ctx *ctx;
struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
- struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
+ union {
+ struct rspamd_fuzzy_encrypted_reply v1;
+ struct rspamd_fuzzy_encrypted_reply_v2 v2;
+ } reply;
struct fuzzy_key_stat *ip_stat;
enum rspamd_fuzzy_epoch epoch;
extern "C" {
#endif
-#define RSPAMD_FUZZY_VERSION 4
+#define RSPAMD_FUZZY_VERSION 5
#define RSPAMD_FUZZY_KEYLEN 8
#define RSPAMD_FUZZY_FLAG_WEAK (1u << 7u)
enum rspamd_fuzzy_epoch {
RSPAMD_FUZZY_EPOCH10, /**< 1.0+ encryption */
RSPAMD_FUZZY_EPOCH11, /**< 1.7+ extended reply */
+ RSPAMD_FUZZY_EPOCH12, /**< multi-flag reply */
RSPAMD_FUZZY_EPOCH_MAX
};
struct rspamd_fuzzy_reply rep;
};
+#define RSPAMD_FUZZY_MAX_EXTRA_FLAGS 7
+
+RSPAMD_PACKED(rspamd_fuzzy_flag_entry)
+{
+ int32_t value;
+ uint32_t flag;
+};
+
+RSPAMD_PACKED(rspamd_fuzzy_reply_v2)
+{
+ struct rspamd_fuzzy_reply_v1 v1;
+ char digest[rspamd_cryptobox_HASHBYTES];
+ uint32_t ts;
+ uint8_t n_extra_flags;
+ uint8_t reserved[3];
+ struct rspamd_fuzzy_flag_entry extra_flags[RSPAMD_FUZZY_MAX_EXTRA_FLAGS];
+};
+
+RSPAMD_PACKED(rspamd_fuzzy_encrypted_reply_v2)
+{
+ struct rspamd_fuzzy_encrypted_rep_hdr hdr;
+ struct rspamd_fuzzy_reply_v2 rep;
+};
+
static const unsigned char fuzzy_encrypted_magic[4] = {'r', 's', 'f', 'e'};
enum rspamd_fuzzy_extension_type {
unsigned char *data, gsize len)
{
struct rspamd_fuzzy_encrypted_reply encrep;
+ struct rspamd_fuzzy_encrypted_reply_v2 encrep_v2;
const struct rspamd_fuzzy_reply *rep;
+ const struct rspamd_fuzzy_reply_v2 *rep_v2 = NULL;
+ struct rspamd_fuzzy_reply synthetic_rep;
struct fuzzy_rule *rule = conn->rule;
unsigned int required_size;
struct fuzzy_tcp_pending_command *pending;
uint32_t tag;
+ gboolean is_v2 = FALSE;
- /* Check if we have encryption */
+ /* Check minimum size — accept v1 as minimum */
if (conn->encrypted) {
required_size = sizeof(encrep);
}
return;
}
+ /* Detect v2 reply by size */
+ if (conn->encrypted && len >= sizeof(encrep_v2)) {
+ is_v2 = TRUE;
+ }
+ else if (!conn->encrypted && len >= sizeof(struct rspamd_fuzzy_reply_v2)) {
+ is_v2 = TRUE;
+ }
+
/* Decrypt if needed - use keys from connection */
if (conn->encrypted) {
- memcpy(&encrep, data, sizeof(encrep));
-
/* Process keys through cache */
rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
conn->local_key, conn->peer_key);
- /* Decrypt with connection keys */
- if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep.rep,
- sizeof(encrep.rep),
- encrep.hdr.nonce,
- rspamd_pubkey_get_nm(conn->peer_key, conn->local_key),
- encrep.hdr.mac)) {
- msg_warn("fuzzy_tcp: cannot decrypt reply from %s",
- rspamd_upstream_name(conn->server));
- return;
+ if (is_v2) {
+ memcpy(&encrep_v2, data, sizeof(encrep_v2));
+
+ if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep_v2.rep,
+ sizeof(encrep_v2.rep),
+ encrep_v2.hdr.nonce,
+ rspamd_pubkey_get_nm(conn->peer_key, conn->local_key),
+ encrep_v2.hdr.mac)) {
+ msg_warn("fuzzy_tcp: cannot decrypt v2 reply from %s",
+ rspamd_upstream_name(conn->server));
+ return;
+ }
+
+ rep_v2 = &encrep_v2.rep;
+ /* Build a v1-compatible rep from v2 for primary processing */
+ memset(&synthetic_rep, 0, sizeof(synthetic_rep));
+ synthetic_rep.v1 = rep_v2->v1;
+ memcpy(synthetic_rep.digest, rep_v2->digest, sizeof(synthetic_rep.digest));
+ synthetic_rep.ts = rep_v2->ts;
+ rep = &synthetic_rep;
}
+ else {
+ memcpy(&encrep, data, sizeof(encrep));
+
+ if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep.rep,
+ sizeof(encrep.rep),
+ encrep.hdr.nonce,
+ rspamd_pubkey_get_nm(conn->peer_key, conn->local_key),
+ encrep.hdr.mac)) {
+ msg_warn("fuzzy_tcp: cannot decrypt reply from %s",
+ rspamd_upstream_name(conn->server));
+ return;
+ }
- rep = &encrep.rep;
+ rep = &encrep.rep;
+ }
}
else {
- rep = (const struct rspamd_fuzzy_reply *) data;
+ if (is_v2) {
+ rep_v2 = (const struct rspamd_fuzzy_reply_v2 *) data;
+ memset(&synthetic_rep, 0, sizeof(synthetic_rep));
+ synthetic_rep.v1 = rep_v2->v1;
+ memcpy(synthetic_rep.digest, rep_v2->digest, sizeof(synthetic_rep.digest));
+ synthetic_rep.ts = rep_v2->ts;
+ rep = &synthetic_rep;
+ }
+ else {
+ rep = (const struct rspamd_fuzzy_reply *) data;
+ }
}
/* Extract tag and lookup pending command */
/* Process the reply - similar to UDP code in fuzzy_check_try_read */
if (rep->v1.prob > 0.5) {
if (pending->io->cmd.cmd == FUZZY_CHECK) {
+ /* Insert result for primary flag */
fuzzy_insert_result(pending->session, rep, &pending->io->cmd,
pending->io, rep->v1.flag);
+
+ /* Insert results for extra flags from v2 reply */
+ if (rep_v2 && rep_v2->n_extra_flags > 0) {
+ for (uint8_t ei = 0; ei < rep_v2->n_extra_flags && ei < RSPAMD_FUZZY_MAX_EXTRA_FLAGS; ei++) {
+ struct rspamd_fuzzy_reply extra_rep;
+ memset(&extra_rep, 0, sizeof(extra_rep));
+ extra_rep.v1.value = rep_v2->extra_flags[ei].value;
+ extra_rep.v1.flag = rep_v2->extra_flags[ei].flag;
+ extra_rep.v1.tag = rep->v1.tag;
+ extra_rep.v1.prob = rep->v1.prob;
+ memcpy(extra_rep.digest, rep_v2->digest, sizeof(extra_rep.digest));
+ extra_rep.ts = rep_v2->ts;
+
+ fuzzy_insert_result(pending->session, &extra_rep, &pending->io->cmd,
+ pending->io, extra_rep.v1.flag);
+ }
+ }
}
else if (pending->io->cmd.cmd == FUZZY_STAT) {
/*
static const struct rspamd_fuzzy_reply *
fuzzy_process_reply(unsigned char **pos, int *r, GPtrArray *req,
struct fuzzy_rule *rule, struct rspamd_fuzzy_cmd **pcmd,
- struct fuzzy_cmd_io **pio)
+ struct fuzzy_cmd_io **pio,
+ const struct rspamd_fuzzy_reply_v2 **p_rep_v2)
{
unsigned char *p = *pos;
int remain = *r;
struct fuzzy_cmd_io *io;
const struct rspamd_fuzzy_reply *rep;
struct rspamd_fuzzy_encrypted_reply encrep;
+ struct rspamd_fuzzy_encrypted_reply_v2 encrep_v2;
+ static struct rspamd_fuzzy_reply synthetic_rep;
gboolean found = FALSE;
+ gboolean is_v2 = FALSE;
+ if (p_rep_v2) {
+ *p_rep_v2 = NULL;
+ }
+
+ /* Use v1 size as minimum */
if (fuzzy_rule_has_encryption(rule)) {
required_size = sizeof(encrep);
}
else {
- required_size = sizeof(*rep);
+ required_size = sizeof(struct rspamd_fuzzy_reply);
}
if (remain <= 0 || (unsigned int) remain < required_size) {
return NULL;
}
+ /* Detect v2 by available size */
+ if (fuzzy_rule_has_encryption(rule) && (unsigned int) remain >= sizeof(encrep_v2)) {
+ is_v2 = TRUE;
+ }
+ else if (!fuzzy_rule_has_encryption(rule) && (unsigned int) remain >= sizeof(struct rspamd_fuzzy_reply_v2)) {
+ is_v2 = TRUE;
+ }
+
if (fuzzy_rule_has_encryption(rule)) {
- memcpy(&encrep, p, sizeof(encrep));
- *pos += required_size;
- *r -= required_size;
+ gsize actual_size = is_v2 ? sizeof(encrep_v2) : sizeof(encrep);
+ *pos += actual_size;
+ *r -= actual_size;
/* Try to decrypt with available keys, starting with read keys (more common) */
struct rspamd_cryptobox_keypair *local_keys[3];
}
}
- /* Try decryption with each key pair */
- for (i = 0; i < nkeys && !decrypted; i++) {
- struct rspamd_fuzzy_encrypted_reply encrep_copy;
- memcpy(&encrep_copy, &encrep, sizeof(encrep_copy));
+ if (is_v2) {
+ memcpy(&encrep_v2, p, sizeof(encrep_v2));
- rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
- local_keys[i], peer_keys[i]);
+ for (i = 0; i < (unsigned int) nkeys && !decrypted; i++) {
+ struct rspamd_fuzzy_encrypted_reply_v2 encrep_v2_copy;
+ memcpy(&encrep_v2_copy, &encrep_v2, sizeof(encrep_v2_copy));
- if (rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep_copy.rep,
- sizeof(encrep_copy.rep),
- encrep_copy.hdr.nonce,
- rspamd_pubkey_get_nm(peer_keys[i], local_keys[i]),
- encrep_copy.hdr.mac)) {
- /* Successfully decrypted */
- memcpy(&encrep, &encrep_copy, sizeof(encrep));
- decrypted = TRUE;
- break;
+ rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
+ local_keys[i], peer_keys[i]);
+
+ if (rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep_v2_copy.rep,
+ sizeof(encrep_v2_copy.rep),
+ encrep_v2_copy.hdr.nonce,
+ rspamd_pubkey_get_nm(peer_keys[i], local_keys[i]),
+ encrep_v2_copy.hdr.mac)) {
+ memcpy(&encrep_v2, &encrep_v2_copy, sizeof(encrep_v2));
+ decrypted = TRUE;
+ break;
+ }
}
- }
- if (!decrypted) {
- msg_info("cannot decrypt reply with any available keys");
- return NULL;
+ if (!decrypted) {
+ /* Fallback: try as v1 */
+ is_v2 = FALSE;
+ memcpy(&encrep, p, sizeof(encrep));
+
+ for (i = 0; i < (unsigned int) nkeys && !decrypted; i++) {
+ struct rspamd_fuzzy_encrypted_reply encrep_copy;
+ memcpy(&encrep_copy, &encrep, sizeof(encrep_copy));
+
+ rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
+ local_keys[i], peer_keys[i]);
+
+ if (rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep_copy.rep,
+ sizeof(encrep_copy.rep),
+ encrep_copy.hdr.nonce,
+ rspamd_pubkey_get_nm(peer_keys[i], local_keys[i]),
+ encrep_copy.hdr.mac)) {
+ memcpy(&encrep, &encrep_copy, sizeof(encrep));
+ decrypted = TRUE;
+ break;
+ }
+ }
+
+ if (!decrypted) {
+ msg_info("cannot decrypt reply with any available keys");
+ return NULL;
+ }
+
+ memcpy(p, &encrep.rep, sizeof(encrep.rep));
+ }
+ else {
+ /* Copy decrypted v2 back and build synthetic v1 rep */
+ memcpy(p, &encrep_v2.rep, sizeof(encrep_v2.rep));
+ }
}
+ else {
+ memcpy(&encrep, p, sizeof(encrep));
+
+ for (i = 0; i < (unsigned int) nkeys && !decrypted; i++) {
+ struct rspamd_fuzzy_encrypted_reply encrep_copy;
+ memcpy(&encrep_copy, &encrep, sizeof(encrep_copy));
+
+ rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
+ local_keys[i], peer_keys[i]);
+
+ if (rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep_copy.rep,
+ sizeof(encrep_copy.rep),
+ encrep_copy.hdr.nonce,
+ rspamd_pubkey_get_nm(peer_keys[i], local_keys[i]),
+ encrep_copy.hdr.mac)) {
+ memcpy(&encrep, &encrep_copy, sizeof(encrep));
+ decrypted = TRUE;
+ break;
+ }
+ }
+
+ if (!decrypted) {
+ msg_info("cannot decrypt reply with any available keys");
+ return NULL;
+ }
- /* Copy decrypted over the input wire */
- memcpy(p, &encrep.rep, sizeof(encrep.rep));
+ memcpy(p, &encrep.rep, sizeof(encrep.rep));
+ }
}
else {
-
- *pos += required_size;
- *r -= required_size;
+ gsize actual_size = is_v2 ? sizeof(struct rspamd_fuzzy_reply_v2)
+ : sizeof(struct rspamd_fuzzy_reply);
+ *pos += actual_size;
+ *r -= actual_size;
}
- rep = (const struct rspamd_fuzzy_reply *) p;
+ if (is_v2) {
+ const struct rspamd_fuzzy_reply_v2 *rv2 = (const struct rspamd_fuzzy_reply_v2 *) p;
+ memset(&synthetic_rep, 0, sizeof(synthetic_rep));
+ synthetic_rep.v1 = rv2->v1;
+ memcpy(synthetic_rep.digest, rv2->digest, sizeof(synthetic_rep.digest));
+ synthetic_rep.ts = rv2->ts;
+ rep = &synthetic_rep;
+ if (p_rep_v2) {
+ *p_rep_v2 = rv2;
+ }
+ }
+ else {
+ rep = (const struct rspamd_fuzzy_reply *) p;
+ }
/*
* Search for tag
*/
ret = 0;
+ const struct rspamd_fuzzy_reply_v2 *rep_v2 = NULL;
+
while ((rep = fuzzy_process_reply(&p, &r,
- session->commands, session->rule, &cmd, &io)) != NULL) {
+ session->commands, session->rule, &cmd, &io, &rep_v2)) != NULL) {
if (rep->v1.prob > 0.5) {
if (cmd->cmd == FUZZY_CHECK) {
fuzzy_insert_result(session, rep, cmd, io, rep->v1.flag);
+
+ /* Insert extra flag results from v2 reply */
+ if (rep_v2 && rep_v2->n_extra_flags > 0) {
+ for (uint8_t ei = 0; ei < rep_v2->n_extra_flags && ei < RSPAMD_FUZZY_MAX_EXTRA_FLAGS; ei++) {
+ struct rspamd_fuzzy_reply extra_rep;
+ memset(&extra_rep, 0, sizeof(extra_rep));
+ extra_rep.v1.value = rep_v2->extra_flags[ei].value;
+ extra_rep.v1.flag = rep_v2->extra_flags[ei].flag;
+ extra_rep.v1.tag = rep->v1.tag;
+ extra_rep.v1.prob = rep->v1.prob;
+ memcpy(extra_rep.digest, rep_v2->digest, sizeof(extra_rep.digest));
+ extra_rep.ts = rep_v2->ts;
+
+ fuzzy_insert_result(session, &extra_rep, cmd, io, extra_rep.v1.flag);
+ }
+ }
}
else if (cmd->cmd == FUZZY_STAT) {
/*
ret = return_want_more;
while ((rep = fuzzy_process_reply(&p, &r,
- session->commands, session->rule, &cmd, &io)) != NULL) {
+ session->commands, session->rule, &cmd, &io, NULL)) != NULL) {
if ((map =
g_hash_table_lookup(session->rule->mappings,
GINT_TO_POINTER(rep->v1.flag))) == NULL) {
ret = 0;
while ((rep = fuzzy_process_reply(&p, &r,
- session->commands, session->rule, &cmd, &io)) != NULL) {
+ session->commands, session->rule, &cmd, &io, NULL)) != NULL) {
if (rep->v1.prob > 0.5) {
if (cmd->cmd == FUZZY_PING) {