]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Write initialization for redis cache
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Jan 2016 09:54:59 +0000 (09:54 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Jan 2016 10:40:42 +0000 (10:40 +0000)
src/libstat/learn_cache/learn_cache.h
src/libstat/learn_cache/redis_cache.c
src/libstat/learn_cache/sqlite3_cache.c
src/libstat/stat_process.c

index 1ebe2864afaaf74dfe76d513b987c1106b2d5602..263252695f43c908fa9721600f2631ff47b5068a 100644 (file)
@@ -42,7 +42,7 @@ struct rspamd_stat_cache {
                        struct rspamd_statfile *st,
                        const ucl_object_t *cf);
        gpointer (*runtime)(struct rspamd_task *task,
-                       gpointer ctx);
+                       gpointer ctx, gboolean learn);
        gint (*check)(struct rspamd_task *task,
                        gboolean is_spam,
                        gpointer runtime,
@@ -61,7 +61,7 @@ struct rspamd_stat_cache {
                                struct rspamd_statfile *st, \
                                const ucl_object_t *cf); \
                gpointer rspamd_stat_cache_##name##_runtime (struct rspamd_task *task, \
-                               gpointer ctx); \
+                               gpointer ctx, gboolean learn); \
                gint rspamd_stat_cache_##name##_check (struct rspamd_task *task, \
                                gboolean is_spam, \
                                gpointer runtime, \
index de88936be1b22173940745a26ad20ba0e1d69dff..56c651fc978bac9065a1521e4e2db7184f98c878 100644 (file)
 #include "hiredis/hiredis.h"
 #include "hiredis/adapters/libevent.h"
 
+#define REDIS_DEFAULT_TIMEOUT 0.5
+#define REDIS_STAT_TIMEOUT 30
+#define REDIS_DEFAULT_PORT 6379
+#define DEFAULT_REDIS_KEY "learned_ids"
+
+struct rspamd_redis_cache_ctx {
+       struct rspamd_statfile_config *stcf;
+       struct upstream_list *read_servers;
+       struct upstream_list *write_servers;
+       const gchar *redis_object;
+       gdouble timeout;
+};
+
+struct rspamd_redis_cache_runtime {
+       struct rspamd_redis_cache_ctx *ctx;
+       struct rspamd_task *task;
+       struct upstream *selected;
+       struct event timeout_event;
+       redisAsyncContext *redis;
+};
+
+static GQuark
+rspamd_stat_cache_redis_quark (void)
+{
+       return g_quark_from_static_string ("redis-statistics");
+}
+
+/* Called on connection termination */
+static void
+rspamd_redis_cache_fin (gpointer data)
+{
+       struct rspamd_redis_cache_runtime *rt = data;
+
+       event_del (&rt->timeout_event);
+       redisAsyncFree (rt->redis);
+}
+
+static void
+rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+{
+       struct rspamd_redis_cache_runtime *rt = d;
+       struct rspamd_task *task;
+
+       task = rt->task;
+
+       msg_err_task ("connection to redis server %s timed out",
+                       rspamd_upstream_name (rt->selected));
+       rspamd_upstream_fail (rt->selected);
+       rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+}
+
+static void
+rspamd_stat_cache_redis_generate_id (struct rspamd_task *task)
+{
+       rspamd_cryptobox_hash_state_t st;
+       rspamd_token_t *tok;
+       guint i;
+       guchar out[rspamd_cryptobox_HASHBYTES];
+       gchar *b32out;
+
+       rspamd_cryptobox_hash_init (&st, NULL, 0);
+
+       for (i = 0; i < task->tokens->len; i ++) {
+               tok = g_ptr_array_index (task->tokens, i);
+               rspamd_cryptobox_hash_update (&st, tok->data, tok->datalen);
+       }
+
+       rspamd_cryptobox_hash_final (&st, out);
+
+       b32out = rspamd_encode_base32 (out, sizeof (out));
+       g_assert (b32out != NULL);
+       rspamd_mempool_set_variable (task->task_pool, "words_hash", b32out, g_free);
+}
+
 gpointer
 rspamd_stat_cache_redis_init (struct rspamd_stat_ctx *ctx,
                struct rspamd_config *cfg,
                struct rspamd_statfile *st,
                const ucl_object_t *cf)
 {
-       return NULL;
+       struct rspamd_redis_cache_ctx *cache_ctx;
+       struct rspamd_statfile_config *stf = st->stcf;
+       const ucl_object_t *elt;
+
+       cache_ctx = g_slice_alloc0 (sizeof (*cache_ctx));
+
+       elt = ucl_object_find_key (stf->opts, "read_servers");
+       if (elt == NULL) {
+               elt = ucl_object_find_key (stf->opts, "servers");
+       }
+       if (elt == NULL) {
+               msg_err ("statfile %s has no redis servers", stf->symbol);
+
+               return NULL;
+       }
+       else {
+               cache_ctx->read_servers = rspamd_upstreams_create (cfg->ups_ctx);
+               if (!rspamd_upstreams_from_ucl (cache_ctx->read_servers, elt,
+                               REDIS_DEFAULT_PORT, NULL)) {
+                       msg_err ("statfile %s cannot read servers configuration",
+                                       stf->symbol);
+                       return NULL;
+               }
+       }
+
+       elt = ucl_object_find_key (stf->opts, "write_servers");
+       if (elt == NULL) {
+               msg_err ("statfile %s has no write redis servers, "
+                               "so learning is impossible", stf->symbol);
+               cache_ctx->write_servers = NULL;
+       }
+       else {
+               cache_ctx->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
+               if (!rspamd_upstreams_from_ucl (cache_ctx->write_servers, elt,
+                               REDIS_DEFAULT_PORT, NULL)) {
+                       msg_err ("statfile %s cannot write servers configuration",
+                                       stf->symbol);
+                       rspamd_upstreams_destroy (cache_ctx->write_servers);
+                       cache_ctx->write_servers = NULL;
+               }
+       }
+
+       elt = ucl_object_find_key (stf->opts, "key");
+       if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+               cache_ctx->redis_object = DEFAULT_REDIS_KEY;
+       }
+       else {
+               cache_ctx->redis_object = ucl_object_tostring (elt);
+       }
+
+       elt = ucl_object_find_key (stf->opts, "timeout");
+       if (elt) {
+               cache_ctx->timeout = ucl_object_todouble (elt);
+       }
+       else {
+               cache_ctx->timeout = REDIS_DEFAULT_TIMEOUT;
+       }
+
+       cache_ctx->stcf = stf;
+
+       return (gpointer)cache_ctx;
 }
 
 gpointer
 rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
-               gpointer ctx)
+               gpointer c, gboolean learn)
 {
-       return NULL;
+       struct rspamd_redis_cache_ctx *ctx = c;
+       struct rspamd_redis_cache_runtime *rt;
+       struct upstream *up;
+       rspamd_inet_addr_t *addr;
+       struct timeval tv;
+
+       g_assert (ctx != NULL);
+
+       if (learn && ctx->write_servers == NULL) {
+               msg_err_task ("no write servers defined for %s, cannot learn",
+                               ctx->stcf->symbol);
+               return NULL;
+       }
+
+       if (learn) {
+               up = rspamd_upstream_get (ctx->write_servers,
+                               RSPAMD_UPSTREAM_MASTER_SLAVE,
+                               NULL,
+                               0);
+       }
+       else {
+               up = rspamd_upstream_get (ctx->read_servers,
+                               RSPAMD_UPSTREAM_ROUND_ROBIN,
+                               NULL,
+                               0);
+       }
+
+       if (up == NULL) {
+               msg_err_task ("no upstreams reachable");
+               return NULL;
+       }
+
+       rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
+       rt->selected = up;
+       rt->task = task;
+       rt->ctx = ctx;
+
+       addr = rspamd_upstream_addr (up);
+       g_assert (addr != NULL);
+       rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr));
+       g_assert (rt->redis != NULL);
+
+       redisLibeventAttach (rt->redis, task->ev_base);
+       rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+                       rspamd_stat_cache_redis_quark ());
+
+       /* Now check stats */
+       event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
+       event_base_set (task->ev_base, &rt->timeout_event);
+       double_to_tv (ctx->timeout, &tv);
+       event_add (&rt->timeout_event, &tv);
+
+       if (!learn) {
+               rspamd_stat_cache_redis_generate_id (task);
+       }
+
+       return rt;
 }
 
 gint
@@ -54,6 +245,12 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
                gpointer runtime,
                gpointer c)
 {
+       struct rspamd_redis_cache_runtime *rt = runtime;
+       gchar *h;
+
+       h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
+       g_assert (h != NULL);
+
        return RSPAMD_LEARN_OK;
 }
 
index 4d97e308410526b514768bd380caff3e483cfea7..9594941ba8f00eeab03b9a53fb9b3574dcc2b75a 100644 (file)
@@ -171,7 +171,7 @@ rspamd_stat_cache_sqlite3_init (struct rspamd_stat_ctx *ctx,
 
 gpointer
 rspamd_stat_cache_sqlite3_runtime (struct rspamd_task *task,
-                               gpointer ctx)
+                               gpointer ctx, gboolean learn)
 {
        /* No need of runtime for this type of classifier */
        return NULL;
index 864336a614285d47968176edf078e5523397cc83..74b226407db51a3bae77b78c4d9bf87beb9a54cb 100644 (file)
@@ -390,7 +390,7 @@ rspamd_stat_cache_check (struct rspamd_stat_ctx *st_ctx,
                }
 
                if (cl->cache && cl->cachecf) {
-                       rt = cl->cache->runtime (task, cl->cachecf);
+                       rt = cl->cache->runtime (task, cl->cachecf, FALSE);
                        learn_res = cl->cache->check (task, spam,
                                        cl->cachecf, rt);
                }
@@ -575,7 +575,7 @@ rspamd_stat_backends_post_learn (struct rspamd_stat_ctx *st_ctx,
                }
 
                if (cl->cache) {
-                       cache_run = cl->cache->runtime (task, cl->cachecf);
+                       cache_run = cl->cache->runtime (task, cl->cachecf, TRUE);
                        cl->cache->learn (task, spam, cache_run, cl->cachecf);
                }