git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Rework] Reorganize fuzzy backend structure
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 1 Sep 2016 13:38:20 +0000 (14:38 +0100)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 1 Sep 2016 13:38:20 +0000 (14:38 +0100)
src/fuzzy_storage.c
src/libserver/CMakeLists.txt
src/libserver/fuzzy_backend.c
src/libserver/fuzzy_backend.h
src/libserver/fuzzy_backend_sqlite.c [new file with mode: 0644]
src/libserver/fuzzy_backend_sqlite.h [new file with mode: 0644]
src/libserver/fuzzy_wire.h [moved from src/fuzzy_storage.h with 100% similarity]
src/plugins/fuzzy_check.c

index e321aa0b6cf28bb8bc9c204c72e45be2b752f5bc..12d51b362cca1b0508beb6349e239f7ade745f78 100644 (file)
@@ -22,7 +22,7 @@
 #include "util.h"
 #include "rspamd.h"
 #include "map.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
 #include "fuzzy_backend.h"
 #include "ottery.h"
 #include "libserver/worker_util.h"
index 295ad59c882073b0591cdfab7a2c2a92c3f0d071..4f3b9a260f405055839e376c06258f07dace90e9 100644 (file)
@@ -9,6 +9,7 @@ SET(LIBRSPAMDSERVERSRC
                                ${CMAKE_CURRENT_SOURCE_DIR}/dynamic_cfg.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/events.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend.c
+                               ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend_sqlite.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/html.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
index 463fdd1f4cfbf2ee9d83eb512255138ab5bb7664..5c36834666f47df84dad5904266d1d022b6da177 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #include "config.h"
-#include "rspamd.h"
 #include "fuzzy_backend.h"
-#include "unix-std.h"
-
-#include <sqlite3.h>
-#include "libutil/sqlite_utils.h"
-
-struct rspamd_fuzzy_backend {
-       sqlite3 *db;
-       char *path;
-       gchar id[MEMPOOL_UID_LEN];
-       gsize count;
-       gsize expired;
-       rspamd_mempool_t *pool;
-};
-
-static const gdouble sql_sleep_time = 0.1;
-static const guint max_retries = 10;
-
-#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
-        backend->pool->tag.tagname, backend->pool->tag.uid, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_warn_fuzzy_backend(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
-        backend->pool->tag.tagname, backend->pool->tag.uid, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_info_fuzzy_backend(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
-        backend->pool->tag.tagname, backend->pool->tag.uid, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_debug_fuzzy_backend(...)  rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
-        backend->pool->tag.tagname, backend->pool->tag.uid, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-
-static const char *create_tables_sql =
-               "BEGIN;"
-               "CREATE TABLE IF NOT EXISTS digests("
-               "       id INTEGER PRIMARY KEY,"
-               "       flag INTEGER NOT NULL,"
-               "       digest TEXT NOT NULL,"
-               "       value INTEGER,"
-               "       time INTEGER);"
-               "CREATE TABLE IF NOT EXISTS shingles("
-               "       value INTEGER NOT NULL,"
-               "       number INTEGER NOT NULL,"
-               "       digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
-               "       ON UPDATE CASCADE);"
-               "CREATE TABLE IF NOT EXISTS sources("
-               "       name TEXT UNIQUE,"
-               "       version INTEGER,"
-               "       last INTEGER);"
-               "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
-               "CREATE INDEX IF NOT EXISTS t ON digests(time);"
-               "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
-               "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
-               "COMMIT;";
-#if 0
-static const char *create_index_sql =
-               "BEGIN;"
-               "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
-               "CREATE INDEX IF NOT EXISTS t ON digests(time);"
-               "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
-               "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
-               "COMMIT;";
-#endif
-enum rspamd_fuzzy_statement_idx {
-       RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
-       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
-       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
-       RSPAMD_FUZZY_BACKEND_INSERT,
-       RSPAMD_FUZZY_BACKEND_UPDATE,
-       RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
-       RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
-       RSPAMD_FUZZY_BACKEND_CHECK,
-       RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
-       RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
-       RSPAMD_FUZZY_BACKEND_DELETE,
-       RSPAMD_FUZZY_BACKEND_COUNT,
-       RSPAMD_FUZZY_BACKEND_EXPIRE,
-       RSPAMD_FUZZY_BACKEND_VACUUM,
-       RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
-       RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
-       RSPAMD_FUZZY_BACKEND_VERSION,
-       RSPAMD_FUZZY_BACKEND_SET_VERSION,
-       RSPAMD_FUZZY_BACKEND_MAX
-};
-static struct rspamd_fuzzy_stmts {
-       enum rspamd_fuzzy_statement_idx idx;
-       const gchar *sql;
-       const gchar *args;
-       sqlite3_stmt *stmt;
-       gint result;
-} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
-{
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
-               .sql = "BEGIN TRANSACTION;",
-               .args = "",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
-               .sql = "COMMIT;",
-               .args = "",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
-               .sql = "ROLLBACK;",
-               .args = "",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_INSERT,
-               .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
-                               "(?1, ?2, ?3, strftime('%s','now'));",
-               .args = "SDI",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_UPDATE,
-               .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
-                               "digest==?2;",
-               .args = "ID",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
-               .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
-                               "digest==?3;",
-               .args = "IID",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
-               .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
-                               "VALUES (?1, ?2, ?3);",
-               .args = "III",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_CHECK,
-               .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
-               .args = "D",
-               .stmt = NULL,
-               .result = SQLITE_ROW
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
-               .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
-               .args = "IS",
-               .stmt = NULL,
-               .result = SQLITE_ROW
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
-               .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
-               .args = "I",
-               .stmt = NULL,
-               .result = SQLITE_ROW
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_DELETE,
-               .sql = "DELETE FROM digests WHERE digest==?1;",
-               .args = "D",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_COUNT,
-               .sql = "SELECT COUNT(*) FROM digests;",
-               .args = "",
-               .stmt = NULL,
-               .result = SQLITE_ROW
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
-               .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
-               .args = "II",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_VACUUM,
-               .sql = "VACUUM;",
-               .args = "",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
-               .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
-               .args = "II",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
-               .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
-               .args = "TII",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_VERSION,
-               .sql = "SELECT version FROM sources WHERE name=?1;",
-               .args = "T",
-               .stmt = NULL,
-               .result = SQLITE_ROW
-       },
-       {
-               .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
-               .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
-               .args = "IIT",
-               .stmt = NULL,
-               .result = SQLITE_DONE
-       },
-};
-
-static GQuark
-rspamd_fuzzy_backend_quark(void)
-{
-       return g_quark_from_static_string ("fuzzy-storage-backend");
-}
-
-static gboolean
-rspamd_fuzzy_backend_prepare_stmts (struct rspamd_fuzzy_backend *bk, GError **err)
-{
-       int i;
-
-       for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) {
-               if (prepared_stmts[i].stmt != NULL) {
-                       /* Skip already prepared statements */
-                       continue;
-               }
-               if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1,
-                               &prepared_stmts[i].stmt, NULL) != SQLITE_OK) {
-                       g_set_error (err, rspamd_fuzzy_backend_quark (),
-                               -1, "Cannot initialize prepared sql `%s`: %s",
-                               prepared_stmts[i].sql, sqlite3_errmsg (bk->db));
-
-                       return FALSE;
-               }
-       }
-
-       return TRUE;
-}
-
-static int
-rspamd_fuzzy_backend_cleanup_stmt (struct rspamd_fuzzy_backend *backend,
-               int idx)
-{
-       sqlite3_stmt *stmt;
-
-       if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
-
-               return -1;
-       }
-
-       msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql);
-       stmt = prepared_stmts[idx].stmt;
-       sqlite3_clear_bindings (stmt);
-       sqlite3_reset (stmt);
-
-       return SQLITE_OK;
-}
-
-static int
-rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *backend,
-               gboolean auto_cleanup,
-               int idx, ...)
-{
-       int retcode;
-       va_list ap;
-       sqlite3_stmt *stmt;
-       int i;
-       const char *argtypes;
-       guint retries = 0;
-       struct timespec ts;
-
-       if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
-
-               return -1;
-       }
-
-       stmt = prepared_stmts[idx].stmt;
-       g_assert ((int)prepared_stmts[idx].idx == idx);
-
-       if (stmt == NULL) {
-               if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1,
-                               &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) {
-                       msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s",
-                                       prepared_stmts[idx].sql, sqlite3_errmsg (backend->db));
-
-                       return retcode;
-               }
-               stmt = prepared_stmts[idx].stmt;
-       }
-
-       msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup",
-                       prepared_stmts[idx].sql, auto_cleanup ? "with" : "without");
-       argtypes = prepared_stmts[idx].args;
-       sqlite3_clear_bindings (stmt);
-       sqlite3_reset (stmt);
-       va_start (ap, idx);
-
-       for (i = 0; argtypes[i] != '\0'; i++) {
-               switch (argtypes[i]) {
-               case 'T':
-                       sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1,
-                                       SQLITE_STATIC);
-                       break;
-               case 'I':
-                       sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64));
-                       break;
-               case 'S':
-                       sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint));
-                       break;
-               case 'D':
-                       /* Special case for digests variable */
-                       sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64,
-                                       SQLITE_STATIC);
-                       break;
-               }
-       }
-
-       va_end (ap);
-
-retry:
-       retcode = sqlite3_step (stmt);
-
-       if (retcode == prepared_stmts[idx].result) {
-               retcode = SQLITE_OK;
-       }
-       else {
-               if ((retcode == SQLITE_BUSY ||
-                               retcode == SQLITE_LOCKED) && retries++ < max_retries) {
-                       double_to_ts (sql_sleep_time, &ts);
-                       nanosleep (&ts, NULL);
-                       goto retry;
-               }
-
-               msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql,
-                               retcode, sqlite3_errmsg (backend->db));
-       }
-
-       if (auto_cleanup) {
-               sqlite3_clear_bindings (stmt);
-               sqlite3_reset (stmt);
-       }
-
-       return retcode;
-}
-
-static void
-rspamd_fuzzy_backend_close_stmts (struct rspamd_fuzzy_backend *bk)
-{
-       int i;
-
-       for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
-               if (prepared_stmts[i].stmt != NULL) {
-                       sqlite3_finalize (prepared_stmts[i].stmt);
-                       prepared_stmts[i].stmt = NULL;
-               }
-       }
-
-       return;
-}
-
-static gboolean
-rspamd_fuzzy_backend_run_sql (const gchar *sql, struct rspamd_fuzzy_backend *bk,
-               GError **err)
-{
-       guint retries = 0;
-       struct timespec ts;
-       gint ret;
-
-       do {
-               ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL);
-               double_to_ts (sql_sleep_time, &ts);
-       } while (ret == SQLITE_BUSY && retries++ < max_retries &&
-                       nanosleep (&ts, NULL) == 0);
-
-       if (ret != SQLITE_OK) {
-               g_set_error (err, rspamd_fuzzy_backend_quark (),
-                               -1, "Cannot execute raw sql `%s`: %s",
-                               sql, sqlite3_errmsg (bk->db));
-               return FALSE;
-       }
-
-       return TRUE;
-}
-
-static struct rspamd_fuzzy_backend *
-rspamd_fuzzy_backend_open_db (const gchar *path, GError **err)
-{
-       struct rspamd_fuzzy_backend *bk;
-       rspamd_cryptobox_hash_state_t st;
-       guchar hash_out[rspamd_cryptobox_HASHBYTES];
-
-       g_assert (path != NULL);
-
-       bk = g_slice_alloc (sizeof (*bk));
-       bk->path = g_strdup (path);
-       bk->expired = 0;
-       bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend");
-       bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
-                       create_tables_sql, 1, err);
-
-       if (bk->db == NULL) {
-               rspamd_fuzzy_backend_close (bk);
-
-               return NULL;
-       }
-
-       if (!rspamd_fuzzy_backend_prepare_stmts (bk, err)) {
-               rspamd_fuzzy_backend_close (bk);
-
-               return NULL;
-       }
-
-       /* Set id for the backend */
-       rspamd_cryptobox_hash_init (&st, NULL, 0);
-       rspamd_cryptobox_hash_update (&st, path, strlen (path));
-       rspamd_cryptobox_hash_final (&st, hash_out);
-       rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out);
-       memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid));
-
-       return bk;
-}
-
-struct rspamd_fuzzy_backend *
-rspamd_fuzzy_backend_open (const gchar *path,
-               gboolean vacuum,
-               GError **err)
-{
-       struct rspamd_fuzzy_backend *backend;
-
-       if (path == NULL) {
-               g_set_error (err, rspamd_fuzzy_backend_quark (),
-                               ENOENT, "Path has not been specified");
-               return NULL;
-       }
-
-       /* Open database */
-       if ((backend = rspamd_fuzzy_backend_open_db (path, err)) == NULL) {
-               return NULL;
-       }
-
-       if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT)
-                       == SQLITE_OK) {
-               backend->count = sqlite3_column_int64 (
-                               prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
-       }
-
-       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
-
-       return backend;
-}
-
-static gint
-rspamd_fuzzy_backend_int64_cmp (const void *a, const void *b)
-{
-       gint64 ia = *(gint64 *)a, ib = *(gint64 *)b;
-
-       return (ia - ib);
-}
-
-struct rspamd_fuzzy_reply
-rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
-{
-       struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0};
-       const struct rspamd_fuzzy_shingle_cmd *shcmd;
-       int rc;
-       gint64 timestamp;
-       gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
-               cur_cnt, max_cnt;
-
-       if (backend == NULL) {
-               return rep;
-       }
-
-       /* Try direct match first of all */
-       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                       RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                       RSPAMD_FUZZY_BACKEND_CHECK,
-                       cmd->digest);
-
-       if (rc == SQLITE_OK) {
-               timestamp = sqlite3_column_int64 (
-                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
-               if (time (NULL) - timestamp > expire) {
-                       /* Expire element */
-                       msg_debug_fuzzy_backend ("requested hash has been expired");
-               }
-               else {
-                       rep.value = sqlite3_column_int64 (
-                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
-                       rep.prob = 1.0;
-                       rep.flag = sqlite3_column_int (
-                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
-               }
-       }
-       else if (cmd->shingles_count > 0) {
-               /* Fuzzy match */
-
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-               shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd;
-
-               for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
-                       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                                       RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
-                                       shcmd->sgl.hashes[i], i);
-                       if (rc == SQLITE_OK) {
-                               shingle_values[i] = sqlite3_column_int64 (
-                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
-                                               0);
-                       }
-                       else {
-                               shingle_values[i] = -1;
-                       }
-                       msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i,
-                                       shcmd->sgl.hashes[i], rc);
-               }
-
-               rspamd_fuzzy_backend_cleanup_stmt (backend,
-                               RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
-
-               qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64),
-                               rspamd_fuzzy_backend_int64_cmp);
-               sel_id = -1;
-               cur_id = -1;
-               cur_cnt = 0;
-               max_cnt = 0;
-
-               for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
-                       if (shingle_values[i] == -1) {
-                               continue;
-                       }
-
-                       /* We have some value here, so we need to check it */
-                       if (shingle_values[i] == cur_id) {
-                               cur_cnt ++;
-                       }
-                       else {
-                               cur_id = shingle_values[i];
-                               if (cur_cnt >= max_cnt) {
-                                       max_cnt = cur_cnt;
-                                       sel_id = cur_id;
-                               }
-                               cur_cnt = 0;
-                       }
-               }
-
-               if (cur_cnt > max_cnt) {
-                       max_cnt = cur_cnt;
-               }
-
-               if (sel_id != -1) {
-                       /* We have some id selected here */
-                       rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
-
-                       if (rep.prob > 0.5) {
-                               msg_debug_fuzzy_backend (
-                                               "found fuzzy hash with probability %.2f",
-                                               rep.prob);
-                               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                                               RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
-                               if (rc == SQLITE_OK) {
-                                       timestamp = sqlite3_column_int64 (
-                                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
-                                                       2);
-                                       if (time (NULL) - timestamp > expire) {
-                                               /* Expire element */
-                                               msg_debug_fuzzy_backend (
-                                                               "requested hash has been expired");
-                                               rep.prob = 0.0;
-                                       }
-                                       else {
-                                               rep.value = sqlite3_column_int64 (
-                                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
-                                                               1);
-                                               rep.flag = sqlite3_column_int (
-                                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
-                                                               3);
-                                       }
-                               }
-                       }
-                       else {
-                               /* Otherwise we assume that as error */
-                               rep.value = 0;
-                       }
-
-                       rspamd_fuzzy_backend_cleanup_stmt (backend,
-                                       RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
-               }
-       }
-
-       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
-       return rep;
-}
-
-gboolean
-rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
-               const gchar *source)
-{
-       gint rc;
-
-       if (backend == NULL) {
-               return FALSE;
-       }
-
-       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                       RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-
-       if (rc != SQLITE_OK) {
-               msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
-                               sqlite3_errmsg (backend->db));
-               return FALSE;
-       }
-
-       return TRUE;
-}
-
-gboolean
-rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd)
-{
-       int rc, i;
-       gint64 id, flag;
-       const struct rspamd_fuzzy_shingle_cmd *shcmd;
-
-       if (backend == NULL) {
-               return FALSE;
-       }
-
-       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                       RSPAMD_FUZZY_BACKEND_CHECK,
-                       cmd->digest);
-
-       if (rc == SQLITE_OK) {
-               /* Check flag */
-               flag = sqlite3_column_int64 (
-                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
-                               2);
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-
-               if (flag == cmd->flag) {
-                       /* We need to increase weight */
-                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                       RSPAMD_FUZZY_BACKEND_UPDATE,
-                                       (gint64) cmd->value,
-                                       cmd->digest);
-                       if (rc != SQLITE_OK) {
-                               msg_warn_fuzzy_backend ("cannot update hash to %d -> "
-                                               "%*xs: %s", (gint) cmd->flag,
-                                               (gint) sizeof (cmd->digest), cmd->digest,
-                                               sqlite3_errmsg (backend->db));
-                       }
-               }
-               else {
-                       /* We need to relearn actually */
-
-                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                       RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
-                                       (gint64) cmd->value,
-                                       (gint64) cmd->flag,
-                                       cmd->digest);
-
-                       if (rc != SQLITE_OK) {
-                               msg_warn_fuzzy_backend ("cannot update hash to %d -> "
-                                               "%*xs: %s", (gint) cmd->flag,
-                                               (gint) sizeof (cmd->digest), cmd->digest,
-                                               sqlite3_errmsg (backend->db));
-                       }
-               }
-       }
-       else {
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                               RSPAMD_FUZZY_BACKEND_INSERT,
-                               (gint) cmd->flag,
-                               cmd->digest,
-                               (gint64) cmd->value);
-
-               if (rc == SQLITE_OK) {
-                       if (cmd->shingles_count > 0) {
-                               id = sqlite3_last_insert_rowid (backend->db);
-                               shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
-
-                               for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
-                                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                                       RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
-                                                       shcmd->sgl.hashes[i], (gint64)i, id);
-                                       msg_debug_fuzzy_backend ("add shingle %d -> %L: %L",
-                                                       i,
-                                                       shcmd->sgl.hashes[i],
-                                                       id);
-
-                                       if (rc != SQLITE_OK) {
-                                               msg_warn_fuzzy_backend ("cannot add shingle %d -> "
-                                                               "%L: %L: %s", i,
-                                                               shcmd->sgl.hashes[i],
-                                                               id, sqlite3_errmsg (backend->db));
-                                       }
-                               }
-                       }
-               }
-               else {
-                       msg_warn_fuzzy_backend ("cannot add hash to %d -> "
-                                       "%*xs: %s", (gint)cmd->flag,
-                                       (gint)sizeof (cmd->digest), cmd->digest,
-                                       sqlite3_errmsg (backend->db));
-               }
-
-               rspamd_fuzzy_backend_cleanup_stmt (backend,
-                               RSPAMD_FUZZY_BACKEND_INSERT);
-       }
-
-       return (rc == SQLITE_OK);
-}
-
-gboolean
-rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
-               const gchar *source, gboolean version_bump)
-{
-       gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
-
-       /* Get and update version */
-       if (version_bump) {
-               ver = rspamd_fuzzy_backend_version (backend, source);
-               ++ver;
-
-               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_SET_VERSION,
-                               (gint64)ver, (gint64)time (NULL), source);
-       }
-
-       if (rc == SQLITE_OK) {
-               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
-               if (rc != SQLITE_OK) {
-                       msg_warn_fuzzy_backend ("cannot commit updates: %s",
-                                       sqlite3_errmsg (backend->db));
-                       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
-                       return FALSE;
-               }
-               else {
-                       if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
-                               msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
-                                               sqlite3_errmsg (backend->db));
-                       }
-                       else if (wal_checkpointed > 0) {
-                               msg_info_fuzzy_backend ("total number of frames in the wal file: "
-                                               "%d, checkpointed: %d", wal_frames, wal_checkpointed);
-                       }
-               }
-       }
-       else {
-               msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
-                               sqlite3_errmsg (backend->db));
-               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
-               return FALSE;
-       }
-
-       return TRUE;
-}
-
-gboolean
-rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd)
-{
-       int rc = -1;
-
-       if (backend == NULL) {
-               return FALSE;
-       }
-
-       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                       RSPAMD_FUZZY_BACKEND_CHECK,
-                       cmd->digest);
-
-       if (rc == SQLITE_OK) {
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-
-               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_DELETE,
-                               cmd->digest);
-               if (rc != SQLITE_OK) {
-                       msg_warn_fuzzy_backend ("cannot update hash to %d -> "
-                                       "%*xs: %s", (gint) cmd->flag,
-                                       (gint) sizeof (cmd->digest), cmd->digest,
-                                       sqlite3_errmsg (backend->db));
-               }
-       }
-       else {
-               /* Hash is missing */
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-       }
-
-       return (rc == SQLITE_OK);
-}
-
-gboolean
-rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
-               gint64 expire,
-               gboolean clean_orphaned)
-{
-       struct orphaned_shingle_elt {
-               gint64 value;
-               gint64 number;
-       };
-
-       /* Do not do more than 5k ops per step */
-       const guint64 max_changes = 5000;
-       gboolean ret = FALSE;
-       gint64 expire_lim, expired;
-       gint rc, i, orphaned_cnt = 0;
-       GError *err = NULL;
-       static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
-                       "FROM shingles "
-                       "LEFT JOIN digests ON "
-                       "shingles.digest_id=digests.id WHERE "
-                       "digests.id IS NULL;";
-       sqlite3_stmt *stmt;
-       GArray *orphaned;
-       struct orphaned_shingle_elt orphaned_elt, *pelt;
-
-
-       if (backend == NULL) {
-               return FALSE;
-       }
-
-       /* Perform expire */
-       if (expire > 0) {
-               expire_lim = time (NULL) - expire;
-
-               if (expire_lim > 0) {
-                       ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-
-                       if (ret == SQLITE_OK) {
-
-                               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                                               RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
-
-                               if (rc == SQLITE_OK) {
-                                       expired = sqlite3_changes (backend->db);
-
-                                       if (expired > 0) {
-                                               backend->expired += expired;
-                                               msg_info_fuzzy_backend ("expired %L hashes", expired);
-                                       }
-                               }
-                               else {
-                                       msg_warn_fuzzy_backend (
-                                                       "cannot execute expired statement: %s",
-                                                       sqlite3_errmsg (backend->db));
-                               }
-
-                               rspamd_fuzzy_backend_cleanup_stmt (backend,
-                                               RSPAMD_FUZZY_BACKEND_EXPIRE);
-
-                               ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                               RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
-                               if (ret != SQLITE_OK) {
-                                       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
-                               }
-                       }
-                       if (ret != SQLITE_OK) {
-                               msg_warn_fuzzy_backend ("cannot expire db: %s",
-                                               sqlite3_errmsg (backend->db));
-                       }
-               }
-       }
-
-       /* Cleanup database */
-       if (clean_orphaned) {
-               ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-
-               if (ret == SQLITE_OK) {
-                       if ((rc = sqlite3_prepare_v2 (backend->db,
-                                       orphaned_shingles,
-                                       -1,
-                                       &stmt,
-                                       NULL)) != SQLITE_OK) {
-                               msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
-                                               sqlite3_errmsg (backend->db));
-                       }
-                       else {
-                               orphaned = g_array_new (FALSE,
-                                               FALSE,
-                                               sizeof (struct orphaned_shingle_elt));
-
-                               while (sqlite3_step (stmt) == SQLITE_ROW) {
-                                       orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
-                                       orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
-                                       g_array_append_val (orphaned, orphaned_elt);
-
-                                       if (orphaned->len > max_changes) {
-                                               break;
-                                       }
-                               }
-
-                               sqlite3_finalize (stmt);
-                               orphaned_cnt = orphaned->len;
-
-                               if (orphaned_cnt > 0) {
-                                       msg_info_fuzzy_backend (
-                                                       "going to delete %ud orphaned shingles",
-                                                       orphaned_cnt);
-                                       /* Need to delete orphaned elements */
-                                       for (i = 0; i < (gint) orphaned_cnt; i++) {
-                                               pelt = &g_array_index (orphaned,
-                                                               struct orphaned_shingle_elt,
-                                                               i);
-                                               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                                               RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
-                                                               pelt->value, pelt->number);
-                                       }
-                               }
-
-
-                               g_array_free (orphaned, TRUE);
-                       }
-
-                       ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
-                       if (ret == SQLITE_OK) {
-                               msg_info_fuzzy_backend (
-                                               "deleted %ud orphaned shingles",
-                                               orphaned_cnt);
-                       }
-                       else {
-                               msg_warn_fuzzy_backend (
-                                               "cannot synchronize fuzzy backend: %e",
-                                               err);
-                               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
-                       }
-               }
-       }
-
-       return ret;
-}
-
-
-void
-rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend)
-{
-       if (backend != NULL) {
-               if (backend->db != NULL) {
-                       rspamd_fuzzy_backend_close_stmts (backend);
-                       sqlite3_close (backend->db);
-               }
-
-               if (backend->path != NULL) {
-                       g_free (backend->path);
-               }
-
-               if (backend->pool) {
-                       rspamd_mempool_delete (backend->pool);
-               }
-
-               g_slice_free1 (sizeof (*backend), backend);
-       }
-}
-
-
-gsize
-rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
-{
-       if (backend) {
-               if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                               RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
-                       backend->count = sqlite3_column_int64 (
-                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
-               }
-
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
-
-               return backend->count;
-       }
-
-       return 0;
-}
-
-gint
-rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
-               const gchar *source)
-{
-       gint ret = -1;
-
-       if (backend) {
-               if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                               RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
-                       ret = sqlite3_column_int64 (
-                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
-               }
-
-               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION);
-       }
-
-       return ret;
-}
-
-gsize
-rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend)
-{
-       return backend != NULL ? backend->expired : 0;
-}
-
-const gchar *
-rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend)
-{
-       return backend != NULL ? backend->id : 0;
-}
+#include "fuzzy_backend_sqlite.h"
index a1736b676b8b6b1792cb0e125bdb88d8dc26e77a..a075178f728066fe5b680de18f3083871234deaf 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef FUZZY_BACKEND_H_
-#define FUZZY_BACKEND_H_
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_H_
 
-#include "config.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
 
-
-struct rspamd_fuzzy_backend;
-
-/**
- * Open fuzzy backend
- * @param path file to open (legacy file will be converted automatically)
- * @param err error pointer
- * @return backend structure or NULL
- */
-struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_open (const gchar *path,
-               gboolean vacuum,
-               GError **err);
-
-/**
- * Check specified fuzzy in the backend
- * @param backend
- * @param cmd
- * @return reply with probability and weight
- */
-struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
-               struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd,
-               gint64 expire);
-
-/**
- * Prepare storage for updates (by starting transaction)
- */
-gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
-               const gchar *source);
-
-/**
- * Add digest to the database
- * @param backend
- * @param cmd
- * @return
- */
-gboolean rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd);
-
-/**
- * Delete digest from the database
- * @param backend
- * @param cmd
- * @return
- */
-gboolean rspamd_fuzzy_backend_del (
-               struct rspamd_fuzzy_backend *backend,
-               const struct rspamd_fuzzy_cmd *cmd);
-
-/**
- * Commit updates to storage
- */
-gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
-               const gchar *source, gboolean version_bump);
-
-/**
- * Sync storage
- * @param backend
- * @return
- */
-gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
-               gint64 expire,
-               gboolean clean_orphaned);
-
-/**
- * Close storage
- * @param backend
- */
-void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
-
-gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
-gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
-gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);
-
-const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
-
-#endif /* FUZZY_BACKEND_H_ */
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */
diff --git a/src/libserver/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend_sqlite.c
new file mode 100644 (file)
index 0000000..463fdd1
--- /dev/null
@@ -0,0 +1,1054 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamd.h"
+#include "fuzzy_backend.h"
+#include "unix-std.h"
+
+#include <sqlite3.h>
+#include "libutil/sqlite_utils.h"
+
+struct rspamd_fuzzy_backend { /* state of one opened sqlite fuzzy storage */
+       sqlite3 *db; /* handle returned by rspamd_sqlite3_open_or_create */
+       char *path; /* g_strdup'ed copy of the database path */
+       gchar id[MEMPOOL_UID_LEN]; /* hex string built from a hash of the path; also copied into pool->tag.uid */
+       gsize count; /* last result of the COUNT(*) query on digests */
+       gsize expired; /* set to 0 when the backend is opened */
+       rspamd_mempool_t *pool; /* pool whose tag is read by the msg_*_fuzzy_backend macros */
+};
+
+static const gdouble sql_sleep_time = 0.1; /* pause between retries of a busy/locked statement; converted with double_to_ts, presumably seconds */
+static const guint max_retries = 10; /* upper bound on such retries for a single statement */
+
+#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+        backend->pool->tag.tagname, backend->pool->tag.uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* critical level; like the three macros below, the expansion
+ * reads backend->pool->tag, so a variable named `backend` must be in scope
+ * wherever the macro is used */
+#define msg_warn_fuzzy_backend(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+        backend->pool->tag.tagname, backend->pool->tag.uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* warning level */
+#define msg_info_fuzzy_backend(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+        backend->pool->tag.tagname, backend->pool->tag.uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* info level */
+#define msg_debug_fuzzy_backend(...)  rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
+        backend->pool->tag.tagname, backend->pool->tag.uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* debug level */
+
+static const char *create_tables_sql = /* schema script handed to rspamd_sqlite3_open_or_create: creates the digests, shingles and sources tables and their indexes inside one BEGIN/COMMIT pair */
+               "BEGIN;"
+               "CREATE TABLE IF NOT EXISTS digests("
+               "       id INTEGER PRIMARY KEY,"
+               "       flag INTEGER NOT NULL,"
+               "       digest TEXT NOT NULL,"
+               "       value INTEGER,"
+               "       time INTEGER);"
+               "CREATE TABLE IF NOT EXISTS shingles("
+               "       value INTEGER NOT NULL,"
+               "       number INTEGER NOT NULL,"
+               "       digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+               "       ON UPDATE CASCADE);"
+               "CREATE TABLE IF NOT EXISTS sources("
+               "       name TEXT UNIQUE,"
+               "       version INTEGER,"
+               "       last INTEGER);"
+               "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+               "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+               "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+               "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+               "COMMIT;";
+#if 0 /* compiled out: the same four CREATE INDEX statements are already part of create_tables_sql */
+static const char *create_index_sql =
+               "BEGIN;"
+               "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+               "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+               "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+               "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+               "COMMIT;";
+#endif
+enum rspamd_fuzzy_statement_idx { /* indexes into prepared_stmts[]; each entry's .idx must equal its array position (asserted in rspamd_fuzzy_backend_run_stmt) */
+       RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
+       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+       RSPAMD_FUZZY_BACKEND_INSERT,
+       RSPAMD_FUZZY_BACKEND_UPDATE,
+       RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+       RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+       RSPAMD_FUZZY_BACKEND_CHECK,
+       RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+       RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+       RSPAMD_FUZZY_BACKEND_DELETE,
+       RSPAMD_FUZZY_BACKEND_COUNT,
+       RSPAMD_FUZZY_BACKEND_EXPIRE,
+       RSPAMD_FUZZY_BACKEND_VACUUM,
+       RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+       RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+       RSPAMD_FUZZY_BACKEND_VERSION,
+       RSPAMD_FUZZY_BACKEND_SET_VERSION,
+       RSPAMD_FUZZY_BACKEND_MAX /* number of statements; size of prepared_stmts[] */
+};
+static struct rspamd_fuzzy_stmts { /* one slot per enum rspamd_fuzzy_statement_idx value; the table is file-static, so compiled statements are shared by every backend opened in this process */
+       enum rspamd_fuzzy_statement_idx idx; /* must equal the slot's position in the array */
+       const gchar *sql; /* statement text given to sqlite3_prepare_v2 */
+       const gchar *args; /* one character per bound parameter, in ?N order: 'T' NUL-terminated text, 'I' gint64, 'S' gint, 'D' 64-byte digest bound as text */
+       sqlite3_stmt *stmt; /* compiled statement, NULL until prepared and again after close_stmts */
+       gint result; /* sqlite3_step return code that run_stmt maps to SQLITE_OK */
+} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
+{
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
+               .sql = "BEGIN TRANSACTION;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+               .sql = "COMMIT;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+               .sql = "ROLLBACK;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_INSERT,
+               .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
+                               "(?1, ?2, ?3, strftime('%s','now'));",
+               .args = "SDI",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_UPDATE,
+               .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
+                               "digest==?2;",
+               .args = "ID",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+               .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
+                               "digest==?3;",
+               .args = "IID",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+               .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
+                               "VALUES (?1, ?2, ?3);",
+               .args = "III",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_CHECK,
+               .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
+               .args = "D",
+               .stmt = NULL,
+               .result = SQLITE_ROW
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+               .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
+               .args = "IS",
+               .stmt = NULL,
+               .result = SQLITE_ROW
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+               .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
+               .args = "I",
+               .stmt = NULL,
+               .result = SQLITE_ROW
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_DELETE,
+               .sql = "DELETE FROM digests WHERE digest==?1;",
+               .args = "D",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_COUNT,
+               .sql = "SELECT COUNT(*) FROM digests;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_ROW
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
+               .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
+               .args = "II",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_VACUUM,
+               .sql = "VACUUM;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+               .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
+               .args = "II",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+               .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+               .args = "TII",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_VERSION,
+               .sql = "SELECT version FROM sources WHERE name=?1;",
+               .args = "T",
+               .stmt = NULL,
+               .result = SQLITE_ROW
+       },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+               .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
+               .args = "IIT",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
+};
+
+/**
+ * Error domain attached to every GError raised by this backend
+ * @return quark for the static string "fuzzy-storage-backend"
+ */
+static GQuark
+rspamd_fuzzy_backend_quark(void)
+{
+       GQuark domain = g_quark_from_static_string ("fuzzy-storage-backend");
+
+       return domain;
+}
+
+/**
+ * Compile every statement of prepared_stmts[] that has not been compiled yet
+ * @param bk backend whose sqlite handle is used for compilation
+ * @param err filled with the failing SQL text and the sqlite error message
+ * @return TRUE if all statements are ready, FALSE on the first failure
+ */
+static gboolean
+rspamd_fuzzy_backend_prepare_stmts (struct rspamd_fuzzy_backend *bk, GError **err)
+{
+       struct rspamd_fuzzy_stmts *cur;
+       struct rspamd_fuzzy_stmts *const end =
+                       prepared_stmts + RSPAMD_FUZZY_BACKEND_MAX;
+
+       for (cur = prepared_stmts; cur != end; cur++) {
+               /* Slots that already hold a compiled statement are left alone */
+               if (cur->stmt == NULL &&
+                               sqlite3_prepare_v2 (bk->db, cur->sql, -1,
+                                               &cur->stmt, NULL) != SQLITE_OK) {
+                       g_set_error (err, rspamd_fuzzy_backend_quark (),
+                               -1, "Cannot initialize prepared sql `%s`: %s",
+                               cur->sql, sqlite3_errmsg (bk->db));
+
+                       return FALSE;
+               }
+       }
+
+       return TRUE;
+}
+
+/**
+ * Clear the bindings of a prepared statement and reset it
+ * @param backend backend used for debug logging
+ * @param idx statement index (enum rspamd_fuzzy_statement_idx)
+ * @return SQLITE_OK, or -1 if idx is out of range
+ */
+static int
+rspamd_fuzzy_backend_cleanup_stmt (struct rspamd_fuzzy_backend *backend,
+               int idx)
+{
+       if (idx >= 0 && idx < RSPAMD_FUZZY_BACKEND_MAX) {
+               sqlite3_stmt *target;
+
+               msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql);
+               target = prepared_stmts[idx].stmt;
+               sqlite3_clear_bindings (target);
+               sqlite3_reset (target);
+
+               return SQLITE_OK;
+       }
+
+       return -1;
+}
+
+/**
+ * Bind arguments to a prepared statement and step it, retrying while sqlite
+ * reports SQLITE_BUSY or SQLITE_LOCKED
+ * @param backend backend owning the sqlite handle (also used for logging)
+ * @param auto_cleanup if TRUE, bindings are cleared and the statement is
+ * reset before returning; otherwise the caller reads columns and then calls
+ * rspamd_fuzzy_backend_cleanup_stmt itself
+ * @param idx statement index (enum rspamd_fuzzy_statement_idx)
+ * @param ... one value per character of the statement's `args` string:
+ * 'T' const char *, 'I' gint64, 'S' gint, 'D' const char * to a digest
+ * @return SQLITE_OK if sqlite3_step returned the statement's expected code,
+ * -1 for an out-of-range index, otherwise the sqlite code of the failed
+ * prepare or step
+ */
+static int
+rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *backend,
+               gboolean auto_cleanup,
+               int idx, ...)
+{
+       struct rspamd_fuzzy_stmts *slot;
+       sqlite3_stmt *stmt;
+       const char *spec;
+       va_list ap;
+       struct timespec delay;
+       guint attempts = 0;
+       int rc, pos;
+
+       if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+               return -1;
+       }
+
+       slot = &prepared_stmts[idx];
+       g_assert ((int)slot->idx == idx);
+
+       if (slot->stmt == NULL) {
+               /* The slot was never compiled (or was finalized): compile it now */
+               rc = sqlite3_prepare_v2 (backend->db, slot->sql, -1,
+                               &slot->stmt, NULL);
+
+               if (rc != SQLITE_OK) {
+                       msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s",
+                                       slot->sql, sqlite3_errmsg (backend->db));
+
+                       return rc;
+               }
+       }
+
+       stmt = slot->stmt;
+
+       msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup",
+                       slot->sql, auto_cleanup ? "with" : "without");
+       sqlite3_clear_bindings (stmt);
+       sqlite3_reset (stmt);
+
+       /* Bind the variadic values; sqlite parameters are numbered from 1 */
+       va_start (ap, idx);
+
+       for (spec = slot->args, pos = 1; *spec != '\0'; spec++, pos++) {
+               switch (*spec) {
+               case 'T':
+                       sqlite3_bind_text (stmt, pos, va_arg (ap, const char*), -1,
+                                       SQLITE_STATIC);
+                       break;
+               case 'I':
+                       sqlite3_bind_int64 (stmt, pos, va_arg (ap, gint64));
+                       break;
+               case 'S':
+                       sqlite3_bind_int (stmt, pos, va_arg (ap, gint));
+                       break;
+               case 'D':
+                       /* Digests are bound with a fixed length of 64 bytes */
+                       sqlite3_bind_text (stmt, pos, va_arg (ap, const char*), 64,
+                                       SQLITE_STATIC);
+                       break;
+               }
+       }
+
+       va_end (ap);
+
+       for (;;) {
+               rc = sqlite3_step (stmt);
+
+               if (rc == slot->result) {
+                       /* The expected step result is reported as success */
+                       rc = SQLITE_OK;
+                       break;
+               }
+
+               if ((rc == SQLITE_BUSY || rc == SQLITE_LOCKED) &&
+                               attempts++ < max_retries) {
+                       double_to_ts (sql_sleep_time, &delay);
+                       nanosleep (&delay, NULL);
+                       continue;
+               }
+
+               msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", slot->sql,
+                               rc, sqlite3_errmsg (backend->db));
+               break;
+       }
+
+       if (auto_cleanup) {
+               sqlite3_clear_bindings (stmt);
+               sqlite3_reset (stmt);
+       }
+
+       return rc;
+}
+
+/**
+ * Finalize every compiled statement in prepared_stmts[] and mark the slots
+ * as not prepared
+ * @param bk backend being closed (not used by the body)
+ */
+static void
+rspamd_fuzzy_backend_close_stmts (struct rspamd_fuzzy_backend *bk)
+{
+       struct rspamd_fuzzy_stmts *cur = prepared_stmts;
+       struct rspamd_fuzzy_stmts *const end =
+                       prepared_stmts + RSPAMD_FUZZY_BACKEND_MAX;
+
+       while (cur != end) {
+               if (cur->stmt != NULL) {
+                       sqlite3_finalize (cur->stmt);
+                       cur->stmt = NULL;
+               }
+
+               cur++;
+       }
+}
+
+/**
+ * Execute raw SQL text, retrying while the database is busy
+ * @param sql statement(s) to pass to sqlite3_exec
+ * @param bk backend owning the sqlite handle
+ * @param err filled with the SQL text and the sqlite error message on failure
+ * @return TRUE if sqlite3_exec finally returned SQLITE_OK
+ */
+static gboolean
+rspamd_fuzzy_backend_run_sql (const gchar *sql, struct rspamd_fuzzy_backend *bk,
+               GError **err)
+{
+       struct timespec delay;
+       guint attempts = 0;
+       gint rc;
+
+       for (;;) {
+               rc = sqlite3_exec (bk->db, sql, NULL, NULL, NULL);
+               double_to_ts (sql_sleep_time, &delay);
+
+               if (rc != SQLITE_BUSY) {
+                       break;
+               }
+
+               if (attempts++ >= max_retries) {
+                       break;
+               }
+
+               /* Give up as well if the sleep itself fails or is interrupted */
+               if (nanosleep (&delay, NULL) != 0) {
+                       break;
+               }
+       }
+
+       if (rc == SQLITE_OK) {
+               return TRUE;
+       }
+
+       g_set_error (err, rspamd_fuzzy_backend_quark (),
+                       -1, "Cannot execute raw sql `%s`: %s",
+                       sql, sqlite3_errmsg (bk->db));
+
+       return FALSE;
+}
+
+/**
+ * Allocate a backend, open (or create) the sqlite database and compile the
+ * prepared statements
+ * @param path database file, must not be NULL
+ * @param err error pointer filled on failure
+ * @return new backend or NULL; on failure the partially built backend is
+ * released with rspamd_fuzzy_backend_close
+ */
+static struct rspamd_fuzzy_backend *
+rspamd_fuzzy_backend_open_db (const gchar *path, GError **err)
+{
+       struct rspamd_fuzzy_backend *bk;
+       rspamd_cryptobox_hash_state_t st;
+       guchar hash_out[rspamd_cryptobox_HASHBYTES];
+
+       g_assert (path != NULL);
+
+       /*
+        * Zeroed allocation: `count` is only assigned by the caller when the
+        * COUNT statement succeeds, so it must not start with an indeterminate
+        * value; `id` also starts as an empty string.
+        */
+       bk = g_slice_alloc0 (sizeof (*bk));
+       bk->path = g_strdup (path);
+       bk->expired = 0;
+       bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend");
+       bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
+                       create_tables_sql, 1, err);
+
+       if (bk->db == NULL) {
+               rspamd_fuzzy_backend_close (bk);
+
+               return NULL;
+       }
+
+       if (!rspamd_fuzzy_backend_prepare_stmts (bk, err)) {
+               rspamd_fuzzy_backend_close (bk);
+
+               return NULL;
+       }
+
+       /* Set id for the backend: hex form of a hash of the path */
+       rspamd_cryptobox_hash_init (&st, NULL, 0);
+       rspamd_cryptobox_hash_update (&st, path, strlen (path));
+       rspamd_cryptobox_hash_final (&st, hash_out);
+       rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out);
+       /* Reuse the id as the pool tag so log lines identify this database */
+       memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid));
+
+       return bk;
+}
+
+/**
+ * Open the sqlite fuzzy storage and cache the current number of digests
+ * @param path database file
+ * @param vacuum not used by this function body
+ * @param err error pointer filled on failure
+ * @return backend structure or NULL
+ */
+struct rspamd_fuzzy_backend *
+rspamd_fuzzy_backend_open (const gchar *path,
+               gboolean vacuum,
+               GError **err)
+{
+       struct rspamd_fuzzy_backend *backend;
+       gint rc;
+
+       if (path == NULL) {
+               g_set_error (err, rspamd_fuzzy_backend_quark (),
+                               ENOENT, "Path has not been specified");
+               return NULL;
+       }
+
+       /* Open database */
+       backend = rspamd_fuzzy_backend_open_db (path, err);
+
+       if (backend == NULL) {
+               return NULL;
+       }
+
+       /* Run COUNT without auto cleanup so that the result row stays readable */
+       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_COUNT);
+
+       if (rc == SQLITE_OK) {
+               sqlite3_stmt *count_stmt =
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt;
+
+               backend->count = sqlite3_column_int64 (count_stmt, 0);
+       }
+
+       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+       return backend;
+}
+
+/**
+ * qsort comparator for gint64 values, ascending order
+ * @param a pointer to the first gint64
+ * @param b pointer to the second gint64
+ * @return -1, 0 or 1 if *a is less than, equal to or greater than *b
+ */
+static gint
+rspamd_fuzzy_backend_int64_cmp (const void *a, const void *b)
+{
+       const gint64 ia = *(const gint64 *)a, ib = *(const gint64 *)b;
+
+       /*
+        * Do not return (ia - ib): the 64-bit difference may overflow and is
+        * truncated to gint, which can yield a wrong sign or a spurious zero.
+        */
+       return (ia > ib) - (ia < ib);
+}
+
+/**
+ * Pick the digest id that occurs most often among shingle lookup results.
+ *
+ * @param sorted_ids RSPAMD_SHINGLE_SIZE lookup results ordered so that equal
+ * ids are adjacent; -1 marks a shingle that was not found
+ * @param pcount output: number of occurrences of the returned id (0 if none)
+ * @return the most frequent id or -1 when every entry is -1
+ */
+static gint64
+rspamd_fuzzy_backend_most_frequent_id (const gint64 *sorted_ids, gint64 *pcount)
+{
+       gint64 best_id = -1, best_cnt = 0, run_id = -1, run_cnt = 0;
+       guint i;
+
+       for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+               if (sorted_ids[i] == -1) {
+                       continue;
+               }
+
+               if (sorted_ids[i] == run_id) {
+                       run_cnt ++;
+               }
+               else {
+                       /* A new run of equal ids starts here */
+                       run_id = sorted_ids[i];
+                       run_cnt = 1;
+               }
+
+               /* Credit the count to the id that actually produced it */
+               if (run_cnt > best_cnt) {
+                       best_cnt = run_cnt;
+                       best_id = run_id;
+               }
+       }
+
+       *pcount = best_cnt;
+
+       return best_id;
+}
+
+/**
+ * Check a hash against the database.
+ *
+ * An exact lookup by cmd->digest is tried first. If it does not succeed and
+ * the command carries shingles (cmd->shingles_count > 0), every shingle is
+ * looked up and the digest id matched by most shingles is selected. The share
+ * of RSPAMD_SHINGLE_SIZE shingles that matched this id becomes the reply
+ * probability; the digest data is loaded only when it exceeds 0.5.
+ * All lookups run between the TRANSACTION_START and TRANSACTION_COMMIT
+ * statements.
+ *
+ * @param backend backend to query; NULL yields a zeroed reply
+ * @param cmd command to check; it is treated as
+ * struct rspamd_fuzzy_shingle_cmd when shingles_count > 0
+ * @param expire maximum age: a record is reported as expired when
+ * time (NULL) - timestamp > expire
+ * @return reply with value, flag and prob filled in on a match
+ */
+struct rspamd_fuzzy_reply
+rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
+{
+       struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0};
+       const struct rspamd_fuzzy_shingle_cmd *shcmd;
+       int rc;
+       gint64 timestamp;
+       gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, max_cnt;
+
+       if (backend == NULL) {
+               return rep;
+       }
+
+       /* Try direct match first of all */
+       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                       RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_CHECK,
+                       cmd->digest);
+
+       if (rc == SQLITE_OK) {
+               timestamp = sqlite3_column_int64 (
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
+               if (time (NULL) - timestamp > expire) {
+                       /* Expire element */
+                       msg_debug_fuzzy_backend ("requested hash has been expired");
+               }
+               else {
+                       rep.value = sqlite3_column_int64 (
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
+                       rep.prob = 1.0;
+                       rep.flag = sqlite3_column_int (
+                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
+               }
+       }
+       else if (cmd->shingles_count > 0) {
+               /* Fuzzy match */
+
+               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+               shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd;
+
+               /* Resolve each shingle to the digest id it belongs to (-1 if none) */
+               for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+                       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                                       RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+                                       shcmd->sgl.hashes[i], i);
+                       if (rc == SQLITE_OK) {
+                               shingle_values[i] = sqlite3_column_int64 (
+                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
+                                               0);
+                       }
+                       else {
+                               shingle_values[i] = -1;
+                       }
+                       msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i,
+                                       shcmd->sgl.hashes[i], rc);
+               }
+
+               rspamd_fuzzy_backend_cleanup_stmt (backend,
+                               RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
+
+               /* Sorting makes equal ids adjacent so that runs can be counted */
+               qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64),
+                               rspamd_fuzzy_backend_int64_cmp);
+               sel_id = rspamd_fuzzy_backend_most_frequent_id (shingle_values,
+                               &max_cnt);
+
+               if (sel_id != -1) {
+                       /* We have some id selected here */
+                       rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
+
+                       if (rep.prob > 0.5) {
+                               msg_debug_fuzzy_backend (
+                                               "found fuzzy hash with probability %.2f",
+                                               rep.prob);
+                               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                                               RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
+                               /*
+                                * NOTE(review): when the digest row cannot be loaded,
+                                * rep.prob keeps the computed value while value and flag
+                                * stay zero - confirm that callers expect this
+                                */
+                               if (rc == SQLITE_OK) {
+                                       timestamp = sqlite3_column_int64 (
+                                                       prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+                                                       2);
+                                       if (time (NULL) - timestamp > expire) {
+                                               /* Expire element */
+                                               msg_debug_fuzzy_backend (
+                                                               "requested hash has been expired");
+                                               rep.prob = 0.0;
+                                       }
+                                       else {
+                                               rep.value = sqlite3_column_int64 (
+                                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+                                                               1);
+                                               rep.flag = sqlite3_column_int (
+                                                               prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+                                                               3);
+                                       }
+                               }
+                       }
+                       else {
+                               /* Otherwise we assume that as error */
+                               rep.value = 0;
+                       }
+
+                       rspamd_fuzzy_backend_cleanup_stmt (backend,
+                                       RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
+               }
+       }
+
+       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+       return rep;
+}
+
+/**
+ * Start the transaction that a batch of updates runs in.
+ *
+ * @param backend backend to prepare; NULL is rejected
+ * @param source name of the update source (not used by this function)
+ * @return TRUE if the transaction has been started, FALSE otherwise
+ */
+gboolean
+rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source)
+{
+       gboolean started = FALSE;
+
+       if (backend != NULL) {
+               if (rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_START) == SQLITE_OK) {
+                       started = TRUE;
+               }
+               else {
+                       msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
+                                       sqlite3_errmsg (backend->db));
+               }
+       }
+
+       return started;
+}
+
+/**
+ * Add a hash to the database or update an existing one.
+ *
+ * If the digest is already stored and its flag equals cmd->flag, the UPDATE
+ * statement is run with cmd->value; if the flag differs, the UPDATE_FLAG
+ * statement is run with the new value and flag. Otherwise the digest is
+ * inserted and, when cmd->shingles_count > 0, all RSPAMD_SHINGLE_SIZE
+ * shingles are inserted and linked to the new row id.
+ *
+ * @param backend backend to modify; NULL is rejected
+ * @param cmd command to store; it is treated as
+ * struct rspamd_fuzzy_shingle_cmd when shingles_count > 0
+ * @return TRUE if every executed modification succeeded, FALSE otherwise
+ */
+gboolean
+rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd)
+{
+       int rc, i;
+       gint64 id, flag;
+       const struct rspamd_fuzzy_shingle_cmd *shcmd;
+       /* Remembers a failed shingle insert even if later inserts succeed */
+       gboolean shingles_ok = TRUE;
+
+       if (backend == NULL) {
+               return FALSE;
+       }
+
+       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_CHECK,
+                       cmd->digest);
+
+       if (rc == SQLITE_OK) {
+               /* Check flag */
+               flag = sqlite3_column_int64 (
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
+                               2);
+               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+               if (flag == cmd->flag) {
+                       /* We need to increase weight */
+                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_UPDATE,
+                                       (gint64) cmd->value,
+                                       cmd->digest);
+                       if (rc != SQLITE_OK) {
+                               msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+                                               "%*xs: %s", (gint) cmd->flag,
+                                               (gint) sizeof (cmd->digest), cmd->digest,
+                                               sqlite3_errmsg (backend->db));
+                       }
+               }
+               else {
+                       /* We need to relearn actually */
+
+                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+                                       (gint64) cmd->value,
+                                       (gint64) cmd->flag,
+                                       cmd->digest);
+
+                       if (rc != SQLITE_OK) {
+                               msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+                                               "%*xs: %s", (gint) cmd->flag,
+                                               (gint) sizeof (cmd->digest), cmd->digest,
+                                               sqlite3_errmsg (backend->db));
+                       }
+               }
+       }
+       else {
+               /* Digest is not known yet: insert it */
+               rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                               RSPAMD_FUZZY_BACKEND_INSERT,
+                               (gint) cmd->flag,
+                               cmd->digest,
+                               (gint64) cmd->value);
+
+               if (rc == SQLITE_OK) {
+                       if (cmd->shingles_count > 0) {
+                               /* Shingles refer to the digest row that was just inserted */
+                               id = sqlite3_last_insert_rowid (backend->db);
+                               shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
+
+                               for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+                                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                                       RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+                                                       shcmd->sgl.hashes[i], (gint64)i, id);
+                                       msg_debug_fuzzy_backend ("add shingle %d -> %L: %L",
+                                                       i,
+                                                       shcmd->sgl.hashes[i],
+                                                       id);
+
+                                       if (rc != SQLITE_OK) {
+                                               shingles_ok = FALSE;
+                                               msg_warn_fuzzy_backend ("cannot add shingle %d -> "
+                                                               "%L: %L: %s", i,
+                                                               shcmd->sgl.hashes[i],
+                                                               id, sqlite3_errmsg (backend->db));
+                                       }
+                               }
+                       }
+               }
+               else {
+                       msg_warn_fuzzy_backend ("cannot add hash to %d -> "
+                                       "%*xs: %s", (gint)cmd->flag,
+                                       (gint)sizeof (cmd->digest), cmd->digest,
+                                       sqlite3_errmsg (backend->db));
+               }
+
+               rspamd_fuzzy_backend_cleanup_stmt (backend,
+                               RSPAMD_FUZZY_BACKEND_INSERT);
+       }
+
+       return (rc == SQLITE_OK && shingles_ok);
+}
+
+/**
+ * Finish a batch of updates: optionally bump the stored version of `source`,
+ * commit the transaction and try to checkpoint the database.
+ *
+ * The transaction is rolled back if either the version update or the commit
+ * fails. A failed checkpoint is only logged and does not change the result.
+ *
+ * @param backend backend to commit; NULL is rejected
+ * @param source name of the update source whose version is bumped
+ * @param version_bump if TRUE, the version returned by
+ * rspamd_fuzzy_backend_version () is incremented and stored together with
+ * the current time
+ * @return TRUE if the updates have been committed, FALSE otherwise
+ */
+gboolean
+rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source, gboolean version_bump)
+{
+       gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
+
+       /* Same guard as the other entry points: backend->db is used below */
+       if (backend == NULL) {
+               return FALSE;
+       }
+
+       /* Get and update version */
+       if (version_bump) {
+               ver = rspamd_fuzzy_backend_version (backend, source);
+               ++ver;
+
+               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_SET_VERSION,
+                               (gint64)ver, (gint64)time (NULL), source);
+       }
+
+       if (rc == SQLITE_OK) {
+               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+               if (rc != SQLITE_OK) {
+                       msg_warn_fuzzy_backend ("cannot commit updates: %s",
+                                       sqlite3_errmsg (backend->db));
+                       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+                       return FALSE;
+               }
+               else {
+                       if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+                               msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+                                               sqlite3_errmsg (backend->db));
+                       }
+                       else if (wal_checkpointed > 0) {
+                               msg_info_fuzzy_backend ("total number of frames in the wal file: "
+                                               "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+                       }
+               }
+       }
+       else {
+               msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+                               sqlite3_errmsg (backend->db));
+               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+/**
+ * Delete a hash from the database.
+ *
+ * The digest is looked up first; the DELETE statement is run only if the
+ * lookup succeeds.
+ *
+ * @param backend backend to modify; NULL is rejected
+ * @param cmd command whose digest has to be removed
+ * @return TRUE if the digest was found and deleted, FALSE otherwise
+ */
+gboolean
+rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd)
+{
+       int rc;
+
+       if (backend == NULL) {
+               return FALSE;
+       }
+
+       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_CHECK,
+                       cmd->digest);
+       /* The lookup statement is released regardless of its outcome */
+       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+       if (rc != SQLITE_OK) {
+               /* Hash is missing */
+               return FALSE;
+       }
+
+       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                       RSPAMD_FUZZY_BACKEND_DELETE,
+                       cmd->digest);
+
+       if (rc != SQLITE_OK) {
+               msg_warn_fuzzy_backend ("cannot delete hash %d -> "
+                               "%*xs: %s", (gint) cmd->flag,
+                               (gint) sizeof (cmd->digest), cmd->digest,
+                               sqlite3_errmsg (backend->db));
+       }
+
+       return (rc == SQLITE_OK);
+}
+
+/**
+ * Perform periodic maintenance of the database.
+ *
+ * Two independent steps are executed, each in its own transaction:
+ *  - if expire > 0, the EXPIRE statement is run with the limit
+ *    time (NULL) - expire and at most 5000 changes;
+ *  - if clean_orphaned is set, shingles that refer to no existing digest are
+ *    collected (the scan stops once more than 5000 are found) and deleted.
+ *
+ * @param backend backend to maintain; NULL is rejected
+ * @param expire maximum age of a hash; values <= 0 disable expiration
+ * @param clean_orphaned whether to remove orphaned shingles
+ * @return TRUE if every attempted step succeeded (including the case when
+ * nothing had to be done), FALSE otherwise
+ */
+gboolean
+rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
+               gint64 expire,
+               gboolean clean_orphaned)
+{
+       struct orphaned_shingle_elt {
+               gint64 value;
+               gint64 number;
+       };
+
+       /* Do not do more than 5k ops per step */
+       const guint64 max_changes = 5000;
+       /* ret is a real boolean; SQLite return codes are kept in rc only */
+       gboolean ret = TRUE;
+       gint64 expire_lim, expired;
+       gint rc, i, orphaned_cnt = 0;
+       static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
+                       "FROM shingles "
+                       "LEFT JOIN digests ON "
+                       "shingles.digest_id=digests.id WHERE "
+                       "digests.id IS NULL;";
+       sqlite3_stmt *stmt;
+       GArray *orphaned;
+       struct orphaned_shingle_elt orphaned_elt, *pelt;
+
+
+       if (backend == NULL) {
+               return FALSE;
+       }
+
+       /* Perform expire */
+       if (expire > 0) {
+               expire_lim = time (NULL) - expire;
+
+               if (expire_lim > 0) {
+                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+                       if (rc == SQLITE_OK) {
+
+                               rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                                               RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
+
+                               if (rc == SQLITE_OK) {
+                                       expired = sqlite3_changes (backend->db);
+
+                                       if (expired > 0) {
+                                               backend->expired += expired;
+                                               msg_info_fuzzy_backend ("expired %L hashes", expired);
+                                       }
+                               }
+                               else {
+                                       ret = FALSE;
+                                       msg_warn_fuzzy_backend (
+                                                       "cannot execute expired statement: %s",
+                                                       sqlite3_errmsg (backend->db));
+                               }
+
+                               rspamd_fuzzy_backend_cleanup_stmt (backend,
+                                               RSPAMD_FUZZY_BACKEND_EXPIRE);
+
+                               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                               RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+                               if (rc != SQLITE_OK) {
+                                       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+                               }
+                       }
+                       /* rc holds either the failed start or the commit result here */
+                       if (rc != SQLITE_OK) {
+                               ret = FALSE;
+                               msg_warn_fuzzy_backend ("cannot expire db: %s",
+                                               sqlite3_errmsg (backend->db));
+                       }
+               }
+       }
+
+       /* Cleanup database */
+       if (clean_orphaned) {
+               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+               if (rc == SQLITE_OK) {
+                       if ((rc = sqlite3_prepare_v2 (backend->db,
+                                       orphaned_shingles,
+                                       -1,
+                                       &stmt,
+                                       NULL)) != SQLITE_OK) {
+                               ret = FALSE;
+                               msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
+                                               sqlite3_errmsg (backend->db));
+                       }
+                       else {
+                               orphaned = g_array_new (FALSE,
+                                               FALSE,
+                                               sizeof (struct orphaned_shingle_elt));
+
+                               /* Collect first, delete afterwards: the select is finalized
+                                * before any row is removed */
+                               while (sqlite3_step (stmt) == SQLITE_ROW) {
+                                       orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
+                                       orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
+                                       g_array_append_val (orphaned, orphaned_elt);
+
+                                       if (orphaned->len > max_changes) {
+                                               break;
+                                       }
+                               }
+
+                               sqlite3_finalize (stmt);
+                               orphaned_cnt = orphaned->len;
+
+                               if (orphaned_cnt > 0) {
+                                       msg_info_fuzzy_backend (
+                                                       "going to delete %ud orphaned shingles",
+                                                       orphaned_cnt);
+                                       /* Need to delete orphaned elements */
+                                       for (i = 0; i < (gint) orphaned_cnt; i++) {
+                                               pelt = &g_array_index (orphaned,
+                                                               struct orphaned_shingle_elt,
+                                                               i);
+                                               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                                               RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+                                                               pelt->value, pelt->number);
+                                       }
+                               }
+
+
+                               g_array_free (orphaned, TRUE);
+                       }
+
+                       rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+                       if (rc == SQLITE_OK) {
+                               msg_info_fuzzy_backend (
+                                               "deleted %ud orphaned shingles",
+                                               orphaned_cnt);
+                       }
+                       else {
+                               ret = FALSE;
+                               msg_warn_fuzzy_backend (
+                                               "cannot synchronize fuzzy backend: %s",
+                                               sqlite3_errmsg (backend->db));
+                               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+                       }
+               }
+               else {
+                       ret = FALSE;
+                       msg_warn_fuzzy_backend ("cannot start transaction for cleanup: %s",
+                                       sqlite3_errmsg (backend->db));
+               }
+       }
+
+       return ret;
+}
+
+
+/**
+ * Release a backend together with everything it owns: prepared statements
+ * and the database handle, the stored path, the memory pool and finally
+ * the structure itself.
+ *
+ * @param backend backend to destroy; NULL is ignored
+ */
+void
+rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend)
+{
+       if (backend == NULL) {
+               return;
+       }
+
+       if (backend->db != NULL) {
+               rspamd_fuzzy_backend_close_stmts (backend);
+               sqlite3_close (backend->db);
+       }
+
+       /* g_free () is a no-op for NULL, so no guard is needed */
+       g_free (backend->path);
+
+       if (backend->pool) {
+               rspamd_mempool_delete (backend->pool);
+       }
+
+       g_slice_free1 (sizeof (*backend), backend);
+}
+
+
+/**
+ * Refresh and return the cached result of the COUNT statement.
+ *
+ * backend->count is overwritten only when the statement succeeds; otherwise
+ * the previously cached value is returned.
+ *
+ * @param backend backend to query
+ * @return backend->count, or 0 if backend is NULL
+ */
+gsize
+rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
+{
+       gint rc;
+
+       if (!backend) {
+               return 0;
+       }
+
+       rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_COUNT);
+
+       if (rc == SQLITE_OK) {
+               backend->count = sqlite3_column_int64 (
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+       }
+
+       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+       return backend->count;
+}
+
+/**
+ * Read the version stored for the given update source.
+ *
+ * @param backend backend to query
+ * @param source name of the update source
+ * @return first column of the VERSION statement result, or -1 if backend is
+ * NULL or the statement does not succeed
+ */
+gint
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
+               const gchar *source)
+{
+       gint version = -1;
+
+       if (!backend) {
+               return version;
+       }
+
+       if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
+                       RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
+               version = sqlite3_column_int64 (
+                               prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
+       }
+
+       rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION);
+
+       return version;
+}
+
+/**
+ * Return the expired counter of the backend.
+ *
+ * @param backend backend to query
+ * @return backend->expired, or 0 if backend is NULL
+ */
+gsize
+rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend)
+{
+       if (backend == NULL) {
+               return 0;
+       }
+
+       return backend->expired;
+}
+
+/**
+ * Return the identifier stored in the backend.
+ *
+ * @param backend backend to query
+ * @return backend->id, or NULL if backend is NULL
+ */
+const gchar *
+rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend)
+{
+       return backend != NULL ? backend->id : NULL;
+}
diff --git a/src/libserver/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend_sqlite.h
new file mode 100644 (file)
index 0000000..dd8a4d0
--- /dev/null
@@ -0,0 +1,98 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Public interface of the sqlite based fuzzy hashes storage
+ * (fuzzy_backend_sqlite.c).
+ *
+ * NOTE(review): the include guard below does not mention sqlite. If
+ * fuzzy_backend.h uses the same FUZZY_BACKEND_H_ name, including both headers
+ * in one translation unit would silently skip the second one - confirm.
+ */
+#ifndef FUZZY_BACKEND_H_
+#define FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "fuzzy_wire.h"
+
+
+/* Opaque handle; the structure is not defined in this header */
+struct rspamd_fuzzy_backend;
+
+/**
+ * Open fuzzy backend
+ * @param path file to open (legacy file will be converted automatically)
+ * @param vacuum presumably requests vacuuming of the database while it is
+ * being opened - TODO confirm against the implementation
+ * @param err error pointer
+ * @return backend structure or NULL
+ */
+struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_open (const gchar *path,
+               gboolean vacuum,
+               GError **err);
+
+/**
+ * Check specified fuzzy in the backend: an exact digest lookup is tried
+ * first, then (if the command carries shingles) a lookup by shingles
+ * @param backend backend to query (a NULL backend yields a zeroed reply)
+ * @param cmd command to check
+ * @param expire maximum age of a hash: a record is treated as expired when
+ * the current time minus its timestamp exceeds this value
+ * @return reply with probability and weight
+ */
+struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
+               struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd,
+               gint64 expire);
+
+/**
+ * Prepare storage for updates (by starting transaction)
+ * @param backend backend to prepare
+ * @param source name of the update source (the sqlite implementation does
+ * not use it in this call)
+ * @return TRUE if the transaction has been started, FALSE if backend is NULL
+ * or the transaction cannot be started
+ */
+gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source);
+
+/**
+ * Add digest to the database. If the digest is already stored, its value is
+ * updated (together with the flag when the flag differs); otherwise the
+ * digest and its shingles, if any, are inserted
+ * @param backend backend to modify
+ * @param cmd command with the digest, flag and value to store
+ * @return TRUE when the hash has been stored or updated, FALSE otherwise
+ * (including a NULL backend)
+ */
+gboolean rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Delete digest from the database
+ * @param backend backend to modify
+ * @param cmd command with the digest to delete
+ * @return TRUE if the digest was found and deleted; FALSE if backend is NULL,
+ * the digest is missing or the deletion failed
+ */
+gboolean rspamd_fuzzy_backend_del (
+               struct rspamd_fuzzy_backend *backend,
+               const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Commit updates to storage
+ * @param backend backend to commit
+ * @param source name of the update source
+ * @param version_bump if TRUE, the version stored for `source` is incremented
+ * before the commit
+ * @return TRUE if the updates have been committed; FALSE if the version
+ * update or the commit failed (the transaction is rolled back then)
+ */
+gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source, gboolean version_bump);
+
+/**
+ * Sync storage: expire old hashes and optionally remove orphaned shingles
+ * @param backend backend to maintain
+ * @param expire maximum age of a hash; expiration is performed only when
+ * this value is positive
+ * @param clean_orphaned if TRUE, shingles that refer to no existing digest
+ * are deleted
+ * @return status of the operation; FALSE is returned for a NULL backend
+ */
+gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
+               gint64 expire,
+               gboolean clean_orphaned);
+
+/**
+ * Close storage and free the backend structure with everything it owns;
+ * the pointer must not be used afterwards
+ * @param backend backend to close (NULL is ignored)
+ */
+void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Get the value reported by the backend's count query (presumably the number
+ * of stored digests - confirm against the SQL); 0 for a NULL backend
+ */
+gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
+/**
+ * Get the version stored for the update source `source`; -1 is returned for
+ * a NULL backend or when the version cannot be read
+ */
+gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
+/**
+ * Get the number of hashes expired by rspamd_fuzzy_backend_sync () so far;
+ * 0 for a NULL backend
+ */
+gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Get the identifier stored in the backend structure
+ */
+const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
+
+#endif /* FUZZY_BACKEND_H_ */
index 9952c26d95d3c463e4152fbe99bc297003f5fa19..bc04b753b3bb0dc668a4f214cedfbd2329affbf2 100644 (file)
@@ -35,7 +35,7 @@
 #include "libutil/map.h"
 #include "libmime/images.h"
 #include "libserver/worker_util.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
 #include "utlist.h"
 #include "cryptobox.h"
 #include "ottery.h"