]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Finally rework and simplify redis backend for statistics
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 17 Jun 2016 10:20:06 +0000 (11:20 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 17 Jun 2016 10:20:06 +0000 (11:20 +0100)
src/libstat/backends/redis_backend.c

index 823a5f71bb0ae1db8daa96c1c2351d8d5b14990a..c0d54325e8287928946f6b52ece8e285d8efc7f0 100644 (file)
@@ -66,8 +66,6 @@ struct redis_stat_runtime {
        redisAsyncContext *redis;
        guint64 learned;
        gint id;
-       enum rspamd_redis_connection_state conn_state;
-       ref_entry_t ref;
 };
 
 /* Used to get statistics from redis */
@@ -679,11 +677,18 @@ static void
 rspamd_redis_fin (gpointer data)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+       redisAsyncContext *redis;
 
-       if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
-               rt->conn_state = RSPAMD_REDIS_TERMINATED;
+       /* Stop timeout */
+       if (event_get_base (&rt->timeout_event)) {
                event_del (&rt->timeout_event);
-               REF_RELEASE (rt);
+       }
+
+       if (rt->redis) {
+               redis = rt->redis;
+               rt->redis = NULL;
+               /* This calls for all callbacks pending */
+               redisAsyncFree (redis);
        }
 }
 
@@ -691,11 +696,18 @@ static void
 rspamd_redis_fin_learn (gpointer data)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+       redisAsyncContext *redis;
 
-       if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
-               rt->conn_state = RSPAMD_REDIS_TERMINATED;
+       /* Stop timeout */
+       if (event_get_base (&rt->timeout_event)) {
                event_del (&rt->timeout_event);
-               REF_RELEASE (rt);
+       }
+
+       if (rt->redis) {
+               redis = rt->redis;
+               rt->redis = NULL;
+               /* This calls for all callbacks pending */
+               redisAsyncFree (redis);
        }
 }
 
@@ -704,26 +716,19 @@ rspamd_redis_timeout (gint fd, short what, gpointer d)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
        struct rspamd_task *task;
+       redisAsyncContext *redis;
 
        task = rt->task;
 
-       REF_RETAIN (rt);
        msg_err_task_check ("connection to redis server %s timed out",
                        rspamd_upstream_name (rt->selected));
-       rspamd_upstream_fail (rt->selected);
-
-       if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) {
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
-       }
-
-       rt->conn_state = RSPAMD_REDIS_TERMINATED;
 
        if (rt->redis) {
-               redisAsyncFree (rt->redis);
+               redis = rt->redis;
+               rt->redis = NULL;
+               /* This calls for all callbacks pending */
+               redisAsyncFree (redis);
        }
-
-       rt->redis = NULL;
-       REF_RELEASE (rt);
 }
 
 /* Called when we have connected to the redis server and got stats */
@@ -737,12 +742,6 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
 
        task = rt->task;
 
-       if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
-               /* Task has disappeared already */
-               REF_RELEASE (rt);
-               return;
-       }
-
        if (c->err == 0) {
                if (r != NULL) {
                        if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
@@ -767,27 +766,17 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
                        }
 
                        rt->learned = val;
-                       REF_RETAIN (rt);
                        msg_debug_task ("connected to redis server, tokens learned for %s: %uL",
                                        rt->redis_object_expanded, rt->learned);
                        rspamd_upstream_ok (rt->selected);
-                       /* This also set state to terminated state */
-                       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
-                       rt->conn_state = RSPAMD_REDIS_CONNECTED;
-               }
-               else {
-                       /* This could be caused by removing redis context forcefully */
-                       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
                }
        }
        else {
                msg_err_task ("error getting reply from redis server %s: %s",
                                rspamd_upstream_name (rt->selected), c->errstr);
                rspamd_upstream_fail (rt->selected);
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
        }
 
-       REF_RELEASE (rt);
 }
 
 /* Called when we have received tokens values from redis */
@@ -804,12 +793,6 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 
        task = rt->task;
 
-       if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
-               /* Task has disappeared already */
-               REF_RELEASE (rt);
-               return;
-       }
-
        if (c->err == 0) {
                if (r != NULL) {
                        if (reply->type == REDIS_REPLY_ARRAY) {
@@ -851,36 +834,29 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
                                        }
                                }
                                else {
-                                       msg_err_task ("got invalid length of reply vector from redis: "
+                                       msg_err_task_check ("got invalid length of reply vector from redis: "
                                                        "%d, expected: %d",
                                                        (gint)reply->elements,
                                                        (gint)task->tokens->len);
                                }
                        }
                        else {
-                               msg_err_task ("got invalid reply from redis: %d",
+                               msg_err_task_check ("got invalid reply from redis: %d",
                                                reply->type);
                        }
 
-                       msg_debug_task ("received tokens for %s: %d processed, %d found",
+                       msg_debug_task_check ("received tokens for %s: %d processed, %d found",
                                        rt->redis_object_expanded, processed, found);
                        rspamd_upstream_ok (rt->selected);
-                       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
-               }
-               else {
-                       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
                }
-
-               rt->conn_state = RSPAMD_REDIS_CONNECTED;
        }
        else {
                msg_err_task ("error getting reply from redis server %s: %s",
                                rspamd_upstream_name (rt->selected), c->errstr);
                rspamd_upstream_fail (rt->selected);
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
        }
 
-       REF_RELEASE (rt);
+       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
 }
 
 /* Called when we have set tokens during learning */
@@ -892,29 +868,16 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
 
        task = rt->task;
 
-       if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
-               /* Task has disappeared already */
-               REF_RELEASE (rt);
-               return;
-       }
-
        if (c->err == 0) {
                rspamd_upstream_ok (rt->selected);
                rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
        }
        else {
-               msg_err_task ("error getting reply from redis server %s: %s",
+               msg_err_task_check ("error getting reply from redis server %s: %s",
                                rspamd_upstream_name (rt->selected), c->errstr);
                rspamd_upstream_fail (rt->selected);
                rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
        }
-
-       if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
-               rt->conn_state = RSPAMD_REDIS_TERMINATED;
-               redisAsyncFree (rt->redis);
-       }
-
-       REF_RELEASE (rt);
 }
 
 static gboolean
@@ -1094,16 +1057,6 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
        return (gpointer)backend;
 }
 
-static void
-rspamd_redis_runtime_dtor (struct redis_stat_runtime *rt)
-{
-       if (event_get_base (&rt->timeout_event)) {
-               event_del (&rt->timeout_event);
-       }
-
-       g_slice_free1 (sizeof (*rt), rt);
-}
-
 gpointer
 rspamd_redis_runtime (struct rspamd_task *task,
                struct rspamd_statfile_config *stcf,
@@ -1113,7 +1066,6 @@ rspamd_redis_runtime (struct rspamd_task *task,
        struct redis_stat_runtime *rt;
        struct upstream *up;
        rspamd_inet_addr_t *addr;
-       struct timeval tv;
 
        g_assert (ctx != NULL);
        g_assert (stcf != NULL);
@@ -1141,40 +1093,27 @@ rspamd_redis_runtime (struct rspamd_task *task,
                return NULL;
        }
 
-       rt = g_slice_alloc0 (sizeof (*rt));
-       REF_INIT_RETAIN (rt, rspamd_redis_runtime_dtor);
+       rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
        rspamd_redis_expand_object (ctx->redis_object, ctx, task,
                        &rt->redis_object_expanded);
        rt->selected = up;
        rt->task = task;
        rt->ctx = ctx;
        rt->stcf = stcf;
-       rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
 
        addr = rspamd_upstream_addr (up);
        g_assert (addr != NULL);
        rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
                        rspamd_inet_address_get_port (addr));
-       g_assert (rt->redis != NULL);
+
+       if (rt->redis == NULL) {
+               msg_err_task ("cannot connect redis");
+               return NULL;
+       }
 
        redisLibeventAttach (rt->redis, task->ev_base);
        rspamd_redis_maybe_auth (ctx, rt->redis);
 
-       if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
-                       rt->redis_object_expanded, "learns") == REDIS_OK) {
-               rt->conn_state = RSPAMD_REDIS_REQUEST_SENT;
-
-               rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
-                               rspamd_redis_stat_quark ());
-
-               event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
-               event_base_set (task->ev_base, &rt->timeout_event);
-               double_to_tv (ctx->timeout, &tv);
-               event_add (&rt->timeout_event, &tv);
-               /* Cleared by timeout */
-               REF_RETAIN (rt);
-       }
-
        return rt;
 }
 
@@ -1204,34 +1143,42 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
        struct timeval tv;
        gint ret;
 
-       if (tokens == NULL || tokens->len == 0 || rt->redis == NULL ||
-                       rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+       if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
                return FALSE;
        }
 
        rt->id = id;
-       query = rspamd_redis_tokens_to_query (task, tokens,
-                       "HMGET", rt->redis_object_expanded, FALSE, -1,
-                       rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
-       g_assert (query != NULL);
-       rspamd_mempool_add_destructor (task->task_pool,
-                               (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
 
-       ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
-                       query->str, query->len);
-       if (ret == REDIS_OK) {
-               rt->conn_state = RSPAMD_REDIS_REQUEST_SENT;
+       if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
+                       rt->redis_object_expanded, "learns") == REDIS_OK) {
+
                rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
                                rspamd_redis_stat_quark ());
-               /* Reset timeout */
-               event_del (&rt->timeout_event);
+
+               if (event_get_base (&rt->timeout_event)) {
+                       event_del (&rt->timeout_event);
+               }
+               event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+               event_base_set (task->ev_base, &rt->timeout_event);
                double_to_tv (rt->ctx->timeout, &tv);
                event_add (&rt->timeout_event, &tv);
 
-               return TRUE;
-       }
-       else {
-               msg_err_task ("call to redis failed: %s", rt->redis->errstr);
+               query = rspamd_redis_tokens_to_query (task, tokens,
+                               "HMGET", rt->redis_object_expanded, FALSE, -1,
+                               rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
+               g_assert (query != NULL);
+               rspamd_mempool_add_destructor (task->task_pool,
+                               (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
+
+               ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
+                               query->str, query->len);
+
+               if (ret == REDIS_OK) {
+                       return TRUE;
+               }
+               else {
+                       msg_err_task ("call to redis failed: %s", rt->redis->errstr);
+               }
        }
 
        return FALSE;
@@ -1242,14 +1189,16 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
                gpointer ctx)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+       redisAsyncContext *redis;
 
-       if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
+       if (event_get_base (&rt->timeout_event)) {
                event_del (&rt->timeout_event);
-               rt->conn_state = RSPAMD_REDIS_TERMINATED;
+       }
 
-               redisAsyncFree (rt->redis);
+       if (rt->redis) {
+               redis = rt->redis;
                rt->redis = NULL;
-               REF_RELEASE (rt);
+               redisAsyncFree (redis);
        }
 }
 
@@ -1266,13 +1215,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
        rspamd_token_t *tok;
        gint ret;
 
-       if (rt->conn_state == RSPAMD_REDIS_CONNECTED) {
-               /* We are likely in some bad state */
-               msg_err_task ("invalid state for function: %d", rt->conn_state);
-
-               return FALSE;
-       }
-
        up = rspamd_upstream_get (rt->ctx->write_servers,
                        RSPAMD_UPSTREAM_MASTER_SLAVE,
                        NULL,
@@ -1359,11 +1301,14 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
        if (ret == REDIS_OK) {
                rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt,
                                rspamd_redis_stat_quark ());
-               /* Reset timeout */
-               event_del (&rt->timeout_event);
+               /* Set timeout */
+               if (event_get_base (&rt->timeout_event)) {
+                       event_del (&rt->timeout_event);
+               }
+               event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+               event_base_set (task->ev_base, &rt->timeout_event);
                double_to_tv (rt->ctx->timeout, &tv);
                event_add (&rt->timeout_event, &tv);
-               rt->conn_state = RSPAMD_REDIS_CONNECTED;
 
                return TRUE;
        }
@@ -1380,13 +1325,16 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
                gpointer ctx)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+       redisAsyncContext *redis;
 
-       if (rt->conn_state == RSPAMD_REDIS_CONNECTED) {
+       if (event_get_base (&rt->timeout_event)) {
                event_del (&rt->timeout_event);
-               rt->conn_state = RSPAMD_REDIS_TERMINATED;
-               redisAsyncFree (rt->redis);
+       }
+
+       if (rt->redis) {
+               redis = rt->redis;
                rt->redis = NULL;
-               REF_RELEASE (rt);
+               redisAsyncFree (redis);
        }
 }
 
@@ -1434,18 +1382,15 @@ rspamd_redis_get_stat (gpointer runtime,
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
        struct rspamd_redis_stat_elt *st;
+       redisAsyncContext *redis;
 
        if (rt->ctx->stat_elt) {
                st = rt->ctx->stat_elt->ud;
 
                if (rt->redis) {
-                       if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) {
-                               rspamd_session_remove_event (rt->task->s, rspamd_redis_fin, rt);
-                       }
-                       event_del (&rt->timeout_event);
-                       rt->conn_state = RSPAMD_REDIS_TERMINATED;
-                       redisAsyncFree (rt->redis);
+                       redis = rt->redis;
                        rt->redis = NULL;
+                       redisAsyncFree (redis);
                }
 
                if (st->stat) {
@@ -1463,5 +1408,4 @@ rspamd_redis_load_tokenizer_config (gpointer runtime,
        return NULL;
 }
 
-
 #endif