]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Add Redis logic to the clustering module
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 26 Sep 2018 12:54:41 +0000 (13:54 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 26 Sep 2018 17:21:52 +0000 (18:21 +0100)
src/plugins/lua/clustering.lua

index cf74141e77113649a0b37d4ad08b95053765de0b..6a7fd2c8de00acaa2505767bdd7036ff14433017 100644 (file)
@@ -40,9 +40,9 @@ local default_rule = {
   expire_overflow = 36000, -- Expire for a bucket when limit is reached
   spam_mult = 1.0, -- Increase on spam hit
   junk_mult = 0.5, -- Increase on junk
-  ham_mult = 0.1, -- Increase on ham
+  ham_mult = -0.1, -- Increase on ham
   size_mult = 0.01, -- Reaches 1.0 on `max_elts`
-  rate_mult = 0.1,
+  score_mult = 0.1,
 }
 
 local rule_schema = ts.shape{
@@ -53,13 +53,87 @@ local rule_schema = ts.shape{
   junk_mult = ts.number,
   ham_mult = ts.number,
   size_mult = ts.number,
-  rate_mult = ts.number,
+  score_mult = ts.number,
   source_selector = ts.string,
   cluster_selector = ts.string,
   symbol = ts.string:is_optional(),
   prefix = ts.string:is_optional(),
 }
 
+-- Redis scripts
+
+-- Queries for a cluster's data
+-- Arguments:
+-- 1. Source selector (string)
+-- 2. Cluster selector (string)
+-- Returns: {cur_elts, total_score, element_score}
+local query_cluster_script = [[
+local sz = redis.call('HLEN', KEYS[1])
+
+if not sz or not tonumber(sz) then
+  -- New bucket, will update on idempotent phase
+  return {0, '0', '0'}
+end
+
+local total_score = redis.call('HGET', KEYS[1], '__s')
+total_score = tonumber(total_score) or 0
+local score = redis.call('HGET', KEYS[1], KEYS[2])
+if not score or not tonumber(score) then
+  return {sz, tostring(total_score), '0'}
+end
+return {sz, tostring(total_score), tostring(score)}
+]]
+local query_cluster_id
+
+-- Updates cluster's data
+-- Arguments:
+-- 1. Source selector (string)
+-- 2. Cluster selector (string)
+-- 3. Score (number)
+-- 4. Max buckets (number)
+-- 5. Expire (number)
+-- 6. Expire overflow (number)
+-- Returns: nothing
+local update_cluster_script = [[
+local sz = redis.call('HLEN', KEYS[1])
+
+if not sz or not tonumber(sz) then
+  -- Create bucket
+  redis.call('HSET', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+  redis.call('HSET', KEYS[1], '__s', KEYS[3])
+  redis.call('EXPIRE', KEYS[1], KEYS[5])
+
+  return
+end
+
+sz = tonumber(sz)
+local lim = tonumber(KEYS[4])
+
+if sz > lim then
+
+  if k then
+    -- Existing key
+    redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+  end
+else
+  redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+  redis.call('EXPIRE', KEYS[1], KEYS[6])
+end
+
+redis.call('HINCRBYFLOAT', KEYS[1], '__s', KEYS[3])
+redis.call('EXPIRE', KEYS[1], KEYS[5])
+]]
+local update_cluster_id
+
+-- Callbacks and logic
+
+local function clusterting_filter_cb(task, rule)
+
+end
+
+local function clusterting_idempotent_cb(task, rule)
+
+end
 -- Init part
 redis_params = lua_redis.parse_redis_server('clustering')
 local opts = rspamd_config:get_all_opt("clustering")
@@ -100,6 +174,9 @@ if opts['rules'] then
   end
 
   if #rules > 0 then
+
+    query_cluster_id = lua_redis.add_redis_script(query_cluster_script, redis_params)
+    update_cluster_id = lua_redis.add_redis_script(update_cluster_script, redis_params)
     local function callback_gen(f, rule)
       return function(task) return f(task, rule) end
     end