]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Various fixes in redis pool
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 30 Aug 2016 17:09:47 +0000 (18:09 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 30 Aug 2016 17:09:47 +0000 (18:09 +0100)
src/libserver/redis_pool.c
src/lua/lua_redis.c

index cfc2757bed87dff7294fd6f582756c7f52577a6a..8ef0f9ad58921c242be2fbc65b281d0124076b23 100644 (file)
@@ -20,6 +20,7 @@
 #include "cfg_file.h"
 #include "hiredis/hiredis.h"
 #include "hiredis/async.h"
+#include "hiredis/adapters/libevent.h"
 #include "cryptobox.h"
 #include "ref.h"
 
@@ -78,14 +79,19 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
 static void
 rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
 {
-       if (c->active && c->ctx != NULL) {
-               g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
-               redisAsyncFree (c->ctx);
+       if (c->active) {
+               if (c->ctx) {
+                       g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+                       redisAsyncFree (c->ctx);
+               }
+
                g_queue_unlink (c->elt->active, c->entry);
        }
+       else {
+               if (event_get_base (&c->timeout)) {
+                       event_del (&c->timeout);
+               }
 
-       if (!c->active && event_get_base (&c->timeout)) {
-               event_del (&c->timeout);
                g_queue_unlink (c->elt->inactive, c->entry);
        }
 
@@ -162,20 +168,30 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
        ctx = redisAsyncConnect (ip, port);
 
        if (ctx) {
-               conn = g_slice_alloc0 (sizeof (conn));
-               conn->entry = g_list_prepend (NULL, conn);
-               conn->elt = elt;
-               conn->active = TRUE;
-               g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
-               g_queue_push_head_link (elt->active, conn->entry);
-               conn->ctx = ctx;
-               REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
-
-               if (password) {
-                       redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+
+               if (ctx->err != REDIS_OK) {
+                       redisAsyncFree (ctx);
+
+                       return NULL;
                }
-               if (db) {
-                       redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+               else {
+                       conn = g_slice_alloc0 (sizeof (*conn));
+                       conn->entry = g_list_prepend (NULL, conn);
+                       conn->elt = elt;
+                       conn->active = TRUE;
+                       g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+                       g_queue_push_head_link (elt->active, conn->entry);
+                       conn->ctx = ctx;
+                       REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+                       redisLibeventAttach (ctx, pool->ev_base);
+
+                       if (password) {
+                               redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+                       }
+                       if (db) {
+                               redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+                       }
                }
 
                return conn;
@@ -252,7 +268,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
 
                        conn->active = TRUE;
                        g_queue_push_tail_link (elt->active, conn_entry);
-                       REF_RETAIN (conn);
 
                }
                else {
@@ -260,8 +275,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                        conn = rspamd_redis_pool_new_connection (pool, elt,
                                        db, password, ip, port);
                }
-
-               return conn->ctx;
        }
        else {
                elt = rspamd_redis_pool_new_elt (pool);
@@ -272,6 +285,8 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                                db, password, ip, port);
        }
 
+       REF_RETAIN (conn);
+
        return conn->ctx;
 }
 
@@ -289,7 +304,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
        if (conn != NULL) {
                REF_RELEASE (conn);
 
-               if (is_fatal) {
+               if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) {
                        /* We need to terminate connection forcefully */
                        REF_RELEASE (conn);
                }
index c35d9614bb058fbd615d371087144bbacb182d74..b7fb8f6c400c5d47730278ad330a87d4dfc79fb5 100644 (file)
 #include "dns.h"
 #include "utlist.h"
 
-#ifdef WITH_HIREDIS
-#include "hiredis.h"
-#include "adapters/libevent.h"
-#endif
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
 
 #define REDIS_DEFAULT_TIMEOUT 1.0
 
@@ -155,6 +153,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
        struct lua_redis_userdata *ud;
        struct lua_redis_specific_userdata *cur, *tmp;
        gboolean is_connected = FALSE;
+       struct redisAsyncContext *ac;
 
        if (ctx->async) {
                msg_debug ("desctructing %p", ctx);
@@ -168,7 +167,10 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
                         * still be alive here!
                         */
                        ctx->ref.refcount = 100500;
-                       redisAsyncFree (ud->ctx);
+                       ac = ud->ctx;
+                       ud->ctx = NULL;
+                       rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+                                       ac, FALSE);
                        ctx->ref.refcount = 0;
                        is_connected = TRUE;
                }
@@ -384,8 +386,9 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
                ac = ud->ctx;
                ud->ctx = NULL;
 
-               if (ac != NULL) {
-                       redisAsyncFree (ac);
+               if (ac) {
+                       rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+                                       ac, FALSE);
                }
        }
 
@@ -413,7 +416,8 @@ lua_redis_timeout (int fd, short what, gpointer u)
                 * This will call all callbacks pending so the entire context
                 * will be destructed
                 */
-               redisAsyncFree (ac);
+               rspamd_redis_pool_release_connection (sp_ud->c->task->cfg->redis_pool,
+                               ac, TRUE);
        }
        REDIS_RELEASE (ctx);
 }
@@ -464,22 +468,6 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
        *nargs = top;
 }
 
-static void
-lua_redis_connect_cb (const struct redisAsyncContext *c, int status)
-{
-       /*
-        * Workaround to prevent double close:
-        * https://groups.google.com/forum/#!topic/redis-db/mQm46XkIPOY
-        */
-#if defined(HIREDIS_MAJOR) && HIREDIS_MAJOR == 0 && HIREDIS_MINOR <= 11
-       struct redisAsyncContext *nc = (struct redisAsyncContext *)c;
-       if (status == REDIS_ERR) {
-               nc->c.fd = -1;
-       }
-#endif
-}
-
-
 
 /***
  * @function rspamd_redis.make_request({params})
@@ -662,14 +650,15 @@ lua_redis_make_request (lua_State *L)
        if (ret) {
                ud->terminated = 0;
                ud->timeout = timeout;
-               ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+               ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+                               dbname, password,
+                               rspamd_inet_address_to_string (addr->addr),
                                rspamd_inet_address_get_port (addr->addr));
 
                if (ud->ctx == NULL || ud->ctx->err) {
                        if (ud->ctx) {
                                msg_err_task_check ("cannot connect to redis: %s",
                                                ud->ctx->errstr);
-                               redisAsyncFree (ud->ctx);
                                ud->ctx = NULL;
                        }
                        else {
@@ -683,16 +672,6 @@ lua_redis_make_request (lua_State *L)
                        return 2;
                }
 
-               redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
-               redisLibeventAttach (ud->ctx, ud->task->ev_base);
-
-               if (password) {
-                       redisAsyncCommand (ud->ctx, NULL, NULL, "AUTH %s", password);
-               }
-               if (dbname) {
-                       redisAsyncCommand (ud->ctx, NULL, NULL, "SELECT %s", dbname);
-               }
-
                ret = redisAsyncCommandArgv (ud->ctx,
                                        lua_redis_callback,
                                        sp_ud,
@@ -719,7 +698,8 @@ lua_redis_make_request (lua_State *L)
                }
                else {
                        msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr);
-                       redisAsyncFree (ud->ctx);
+                       rspamd_redis_pool_release_connection (task->cfg->redis_pool,
+                                       ud->ctx, FALSE);
                        ud->ctx = NULL;
                        REDIS_RELEASE (ctx);
                        ret = FALSE;
@@ -936,7 +916,9 @@ lua_redis_connect (lua_State *L)
        if (ret && ctx) {
                ud->terminated = 0;
                ud->timeout = timeout;
-               ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+               ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+                               NULL, NULL,
+                               rspamd_inet_address_to_string (addr->addr),
                                rspamd_inet_address_get_port (addr->addr));
 
                if (ud->ctx == NULL || ud->ctx->err) {
@@ -948,8 +930,6 @@ lua_redis_connect (lua_State *L)
                        return 1;
                }
 
-               redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
-               redisLibeventAttach (ud->ctx, ud->task->ev_base);
                pctx = lua_newuserdata (L, sizeof (ctx));
                *pctx = ctx;
                rspamd_lua_setclass (L, "rspamd{redis}", -1);