From: Vsevolod Stakhov Date: Tue, 13 Feb 2018 12:38:25 +0000 (+0000) Subject: [Feature] Add new tooling for stats conversation X-Git-Tag: 1.7.0~198 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=0e668b86594901d34e4b31a6016c2aaabcd46f34;p=thirdparty%2Frspamd.git [Feature] Add new tooling for stats conversation --- diff --git a/lualib/stat_tools.lua b/lualib/stat_tools.lua new file mode 100644 index 0000000000..6caf8a4838 --- /dev/null +++ b/lualib/stat_tools.lua @@ -0,0 +1,269 @@ +--[[ +Copyright (c) 2018, Vsevolod Stakhov + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +local logger = require "rspamd_logger" +local sqlite3 = require "rspamd_sqlite3" +local redis = require "rspamd_redis" +local util = require "rspamd_util" +local lua_redis = require "lua_redis" +local exports = {} + +local N = "stats_tools" + +-- Performs synchronous conversation of redis schema +local function convert_bayes_schema(cfg, redis_params, key) + +end + +-- It now accepts both ham and spam databases +-- parameters: +-- redis_params - how do we connect to a redis server +-- sqlite_db_spam - name for sqlite database with spam tokens +-- sqlite_db_ham - name for sqlite database with ham tokens +-- symbol_ham - name for symbol representing spam, e.g. BAYES_SPAM +-- symbol_spam - name for symbol representing ham, e.g. BAYES_HAM +-- learn_cache_spam - name for sqlite database with spam learn cache +-- learn_cache_ham - name for sqlite database with ham learn cache +-- reset_previous - if true, then the old database is flushed (slow) +local function convert_sqlite_to_redis(redis_params, + sqlite_db_spam, sqlite_db_ham, symbol_spam, symbol_ham, + learn_cache_db, expire, reset_previous) + local num = 0 + local total = 0 + local nusers = 0 + local lim = 1000 -- Update each 1000 tokens + local users_map = {} + + + local db_spam = sqlite3.open(sqlite_db_spam) + if not db_spam then + logger.errx('Cannot open source db: %s', sqlite_db_spam) + return false + end + local db_ham = sqlite3.open(sqlite_db_ham) + if not db_ham then + logger.errx('Cannot open source db: %s', sqlite_db_ham) + return false + end + + + local res,conn,addr = lua_redis.redis_connect_sync(redis_params, true) + + if not res then + logger.errx("cannot connect to redis server") + return false + end + + if reset_previous then + -- Do a more complicated cleanup + -- execute a lua script that cleans up data + local script = [[ +local members = redis.call('SMEMBERS', KEYS[1]) + +for _,prefix in ipairs(members) do + local keys = redis.call('KEYS', prefix..'*') + redis.call('DEL', keys) +end +]] + -- Common keys + for _,sym in ipairs({symbol_spam, symbol_ham}) do + logger.infox('Cleaning up old data for %s', sym) + conn:add_cmd('EVAL', {script, '1', sym}) + conn:exec() + conn:add_cmd('DEL', {sym .. "_version"}) + conn:add_cmd('DEL', {sym .. "_keys"}) + conn:exec() + end + + if learn_cache_db then + -- Cleanup learned_cache + logger.infox('Cleaning up old data learned cache') + conn:add_cmd('DEL', {"learned_ids"}) + conn:exec() + end + end + + local function convert_db(db, is_spam) + -- Map users and languages + local learns = {} + db:sql('BEGIN;') + -- Fill users mapping + for row in db:rows('SELECT * FROM users;') do + if row.id == '0' then + users_map[row.id] = '' + else + users_map[row.id] = row.name + end + learns[row.id] = row.learns + nusers = nusers + 1 + end + + -- Workaround for old databases + for row in db:rows('SELECT * FROM languages') do + if learns['0'] then + learns['0'] = learns['0'] + row.learns + else + learns['0'] = row.learns + end + end + + local function send_batch(tokens, prefix) + -- We use the new schema: RS[user]_token -> H=ham count + -- S=spam count + local hash_key = 'H' + if is_spam then + hash_key = 'S' + end + for _,tok in tokens do + -- tok schema: + -- tok[1] = token_id (uint64 represented as a string) + -- tok[2] = token value (number) + -- tok[3] = user_map[user_id] or '' + local rkey = string.format('%s%s_%s', prefix, tok[3], tok[1]) + conn:add_cmd('HINCRBYFLOAT', {rkey, hash_key, tostring(tok[2])}) + + if expire and expire ~= 0 then + conn:add_cmd('EXPIRE', {rkey, tostring(expire)}) + end + end + + return conn:exec() + end + -- Fill tokens, sending data to redis each `lim` records + + local tokens = {} + for row in db:rows('SELECT token,value,user FROM tokens;') do + local user = '' + if row.user ~= 0 and users_map[row.user] then + user = users_map[row.user] + end + + table.insert(tokens, {row.token, row.value, user}) + + num = num + 1 + total = total + 1 + if num > lim then + -- TODO: we use the default 'RS' prefix, it can be false in case of + -- classifiers with labels + local ret,err_str = send_batch(tokens, 'RS') + if not ret then + logger.errx('Cannot send tokens to the redis server: ' .. err_str) + db:sql('COMMIT;') + return false + end + + num = 0 + tokens = {} + end + end + -- Last batch + if #tokens > 0 then + local ret,err_str = send_batch(tokens, 'RS') + if not ret then + logger.errx('Cannot send tokens to the redis server: ' .. err_str) + db:sql('COMMIT;') + return false + end + end + + -- Close DB + db:sql('COMMIT;') + local symbol = symbol_ham + local learns_elt = "learns_ham" + + if is_spam then + symbol = symbol_spam + learns_elt = "learns_spam" + end + + for id,learned in pairs(learns) do + local user = users_map[id] + if not conn:add_cmd('HSET', {'RS' .. user, learns_elt, learned}) then + logger.errx('Cannot update learns for user: ' .. user) + return false + end + if user ~= '' then + if not conn:add_cmd('SADD', {symbol .. '_keys', 'RS' .. user}) then + logger.errx('Cannot update learns for user: ' .. user) + return false + end + end + end + -- Set version + conn:add_cmd('SET', {symbol..'_version', '2'}) + return conn:exec() + end + + if not convert_db(db_spam, true) then + return false + end + + if not convert_db(db_ham, false) then + return false + end + + if learn_cache_db then + logger.infox('Convert learned ids from %s', learn_cache_db) + local db = sqlite3.open(learn_cache_db) + local ret = true + local err_str + local converted = 0 + + if not db then + logger.errx('Cannot open cache database: ' .. learn_cache_db) + return false + end + + db:sql('BEGIN;') + + for row in db:rows('SELECT * FROM learns;') do + local is_spam + local digest = tostring(util.encode_base32(row.digest)) + + if row.flag == '0' then + is_spam = '-1' + else + is_spam = '1' + end + + if not conn:add_cmd('HSET', {'learned_ids', digest, is_spam}) then + logger.errx('Cannot add hash: ' .. digest) + ret = false + else + converted = converted + 1 + end + end + db:sql('COMMIT;') + + if ret then + ret,err_str = conn:exec() + end + + if ret then + logger.infox('Converted %d cached items from sqlite3 learned cache to redis', + converted) + else + logger.errx('Error occurred during sending data to redis: ' .. err_str) + end + end + + logger.infox('Migrated %d tokens for %d users for symbol %s', + total, nusers, res['symbol']) +end + +exports.convert_sqlite_to_redis = convert_sqlite_to_redis + +return exports \ No newline at end of file