]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Add finalize process operation.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 9 Jul 2015 23:36:03 +0000 (00:36 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 9 Jul 2015 23:36:03 +0000 (00:36 +0100)
We could use sqlite transaction for reading as well.

src/libstat/backends/backends.h
src/libstat/backends/mmaped_file.c
src/libstat/backends/sqlite3_backend.c
src/libstat/stat_config.c
src/libstat/stat_process.c

index e40bb6c9ccb900a48eed59c94c84072c78bbcb27..55d3535f7ff8bfe50cf97a7c1002530529145e2c 100644 (file)
@@ -47,6 +47,8 @@ struct rspamd_stat_backend {
                        struct rspamd_statfile_config *stcf, gboolean learn, gpointer ctx);
        gboolean (*process_token)(struct rspamd_task *task, struct token_node_s *tok,
                        struct rspamd_token_result *res, gpointer ctx);
+       void (*finalize_process)(struct rspamd_task *task,
+                       gpointer runtime, gpointer ctx);
        gboolean (*learn_token)(struct rspamd_task *task, struct token_node_s *tok,
                        struct rspamd_token_result *res, gpointer ctx);
        gulong (*total_learns)(struct rspamd_task *task,
@@ -71,6 +73,9 @@ struct rspamd_stat_backend {
                                struct token_node_s *tok, \
                                struct rspamd_token_result *res, \
                                gpointer ctx); \
+               void rspamd_##name##_finalize_process (struct rspamd_task *task, \
+                               gpointer runtime, \
+                               gpointer ctx); \
                gboolean rspamd_##name##_learn_token (struct rspamd_task *task, \
                                struct token_node_s *tok, \
                                struct rspamd_token_result *res, \
index b7e5650d3e30e36e38c9741e65f8c6f9a94dfc6a..fb3c4fd43cbc968900c0e9a51e58b7c9271cb0be 100644 (file)
@@ -1089,3 +1089,9 @@ rspamd_mmaped_file_finalize_learn (struct rspamd_task *task, gpointer runtime,
                msync (mf->map, mf->len, MS_INVALIDATE | MS_ASYNC);
        }
 }
+
+void
+rspamd_mmaped_file_finalize_process (struct rspamd_task *task, gpointer runtime,
+               gpointer ctx)
+{
+}
index 31b593c6480da2546f8ddcc1c33592fb700d07c6..836886e1f7c3d615a8f9103ed4c2ec7ec2b5b117 100644 (file)
@@ -81,7 +81,8 @@ static const char *create_tables_sql =
                "COMMIT;";
 
 enum rspamd_stat_sqlite3_stmt_idx {
-       RSPAMD_STAT_BACKEND_TRANSACTION_START = 0,
+       RSPAMD_STAT_BACKEND_TRANSACTION_START_IM = 0,
+       RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF,
        RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT,
        RSPAMD_STAT_BACKEND_TRANSACTION_ROLLBACK,
        RSPAMD_STAT_BACKEND_GET_TOKEN,
@@ -102,13 +103,21 @@ static struct rspamd_sqlite3_prstmt {
 } prepared_stmts[RSPAMD_STAT_BACKEND_MAX] =
 {
        {
-               .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START,
+               .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_IM,
                .sql = "BEGIN IMMEDIATE TRANSACTION;",
                .args = "",
                .stmt = NULL,
                .result = SQLITE_DONE,
                .ret = ""
        },
+       {
+               .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF,
+               .sql = "BEGIN DEFERRED TRANSACTION;",
+               .args = "",
+               .stmt = NULL,
+               .result = SQLITE_DONE,
+               .ret = ""
+       },
        {
                .idx = RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT,
                .sql = "COMMIT;",
@@ -313,6 +322,39 @@ rspamd_sqlite3_close_prstmt (struct rspamd_stat_sqlite3_db *db)
        return;
 }
 
+static gboolean
+rspamd_sqlite3_wait (const gchar *lock)
+{
+       gint fd;
+       struct timespec sleep_ts = {
+               .tv_sec = 0,
+               .tv_nsec = 1000000
+       };
+
+       fd = open (lock, O_RDONLY);
+
+       if (fd == -1) {
+               msg_err ("cannot open lock file %s: %s", lock, strerror (errno));
+
+               return FALSE;
+       }
+
+       while (!rspamd_file_lock (fd, TRUE)) {
+               if (nanosleep (&sleep_ts, NULL) == -1 && errno != EINTR) {
+                       close (fd);
+                       msg_err ("cannot sleep open lock file %s: %s", lock, strerror (errno));
+
+                       return FALSE;
+               }
+       }
+
+       rspamd_file_unlock (fd, FALSE);
+
+       close (fd);
+
+       return TRUE;
+}
+
 static struct rspamd_stat_sqlite3_db *
 rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
                gboolean create, GError **err)
@@ -320,15 +362,35 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
        struct rspamd_stat_sqlite3_db *bk;
        sqlite3 *sqlite;
        sqlite3_stmt *stmt;
-       gint rc, flags;
-       static const char sqlite_wal[] = "PRAGMA journal_mode=WAL;",
-                       fallback_journal[] = "PRAGMA journal_mode=OFF;",
+       gint rc, flags, lock_fd;
+       gchar lock_path[PATH_MAX];
+       static const char sqlite_wal[] = "PRAGMA journal_mode=\"wal\";",
+                       fallback_journal[] = "PRAGMA journal_mode=\"off\";",
                        user_version[] = "PRAGMA user_version;";
 
        flags = SQLITE_OPEN_READWRITE;
 
        if (create) {
                flags |= SQLITE_OPEN_CREATE;
+
+               rspamd_snprintf (lock_path, sizeof (lock_path), "%s.lock", path);
+               lock_fd = open (lock_path, O_WRONLY|O_CREAT|O_EXCL, 00600);
+
+               if (lock_fd == -1 && (errno == EEXIST || errno == EBUSY)) {
+                       if (!rspamd_sqlite3_wait (lock_path)) {
+                               g_set_error (err, rspamd_sqlite3_quark (),
+                                               errno, "cannot create sqlite file %s: %s",
+                                               path, strerror (errno));
+
+                               return NULL;
+                       }
+
+                       /* At this point we have database created */
+                       create = FALSE;
+               }
+               else {
+                       g_assert (rspamd_file_lock (lock_fd, FALSE));
+               }
        }
        else if (access (path, R_OK) == -1) {
                g_set_error (err, rspamd_sqlite3_quark (),
@@ -378,9 +440,16 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
                                        -1, "cannot execute create sql `%s`: %s",
                                        create_tables_sql, sqlite3_errmsg (sqlite));
                        sqlite3_close (sqlite);
+                       rspamd_file_unlock (lock_fd, FALSE);
+                       unlink (lock_path);
+                       close (lock_fd);
 
                        return NULL;
                }
+
+               rspamd_file_unlock (lock_fd, FALSE);
+               unlink (lock_path);
+               close (lock_fd);
        }
 
        bk = g_slice_alloc0 (sizeof (*bk));
@@ -538,6 +607,11 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok
                return FALSE;
        }
 
+       if (!bk->in_transaction) {
+               rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF);
+               bk->in_transaction = TRUE;
+       }
+
        memcpy (&idx, tok->data, sizeof (idx));
 
        /* TODO: language and user support */
@@ -559,6 +633,24 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok
        return TRUE;
 }
 
+void
+rspamd_sqlite3_finalize_process (struct rspamd_task *task, gpointer runtime,
+               gpointer ctx)
+{
+       struct rspamd_stat_sqlite3_rt *rt = runtime;
+       struct rspamd_stat_sqlite3_db *bk;
+
+       g_assert (rt != NULL);
+       bk = rt->db;
+
+       if (bk->in_transaction) {
+               rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT);
+               bk->in_transaction = FALSE;
+       }
+
+       return;
+}
+
 gboolean
 rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok,
                struct rspamd_token_result *res, gpointer p)
@@ -582,7 +674,7 @@ rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok,
        }
 
        if (!bk->in_transaction) {
-               rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START);
+               rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_IM);
                bk->in_transaction = TRUE;
        }
 
index c75b02fe0543a489d37d56970a2df290e109cdf7..6f27d1c10c09fcdc1b8f846428f771c84772b89b 100644 (file)
@@ -60,6 +60,7 @@ static struct rspamd_stat_tokenizer stat_tokenizers[] = {
                .init = rspamd_##eltn##_init, \
                .runtime = rspamd_##eltn##_runtime, \
                .process_token = rspamd_##eltn##_process_token, \
+               .finalize_process = rspamd_##eltn##_finalize_process, \
                .learn_token = rspamd_##eltn##_learn_token, \
                .finalize_learn = rspamd_##eltn##_finalize_learn, \
                .total_learns = rspamd_##eltn##_total_learns, \
index 978264ac49cf7765612a5a194d9266dd0701c9f0..9c261eccd750ca14fab800e110740191d97fce77 100644 (file)
@@ -356,11 +356,12 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err)
        struct rspamd_stat_classifier *cls;
        struct rspamd_classifier_config *clcf;
        struct rspamd_stat_ctx *st_ctx;
+       struct rspamd_statfile_runtime *st_run;
        struct rspamd_tokenizer_runtime *tklist = NULL, *tok;
        struct rspamd_classifier_runtime *cl_run;
        struct classifier_ctx *cl_ctx;
        GList *cl_runtimes;
-       GList *cur;
+       GList *cur, *curst;
        gboolean ret = RSPAMD_STAT_PROCESS_ERROR, compat = TRUE;
        const ucl_object_t *obj;
 
@@ -440,6 +441,16 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err)
                        }
                }
 
+               curst = cl_run->st_runtime;
+
+               while (curst) {
+                       st_run = curst->data;
+                       st_run->backend->finalize_learn (task,
+                                       st_run->backend_runtime,
+                                       st_run->backend->ctx);
+                       curst = g_list_next (curst);
+               }
+
                cur = g_list_next (cur);
        }