]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Handle failures for inactive pooled connections
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 8 Sep 2016 14:25:28 +0000 (15:25 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 8 Sep 2016 14:25:28 +0000 (15:25 +0100)
src/libserver/redis_pool.c

index dd5adf2210f97b2d92c0838b209ff427a30a5a34..ba39bb2f81686c7bbc2d744eff15c8c905da2cef 100644 (file)
@@ -115,6 +115,10 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
                        event_del (&conn->timeout);
                }
 
+               if (conn->ctx) {
+                       g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
+               }
+
                g_queue_unlink (conn->elt->inactive, conn->entry);
        }
 
@@ -180,6 +184,28 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
        event_add (&conn->timeout, &tv);
 }
 
+static void
+rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
+               void *ud)
+{
+       struct rspamd_redis_pool_connection *conn = ud;
+
+       /*
+        * Here, we know that redis itself will free this connection
+        * so, we need to do something very clever about it
+        */
+
+       if (!conn->active) {
+               /* Do nothing for active connections as it is already handled somewhere */
+               if (conn->ctx) {
+                       msg_info_rpool ("inactive connection terminated: %s",
+                               conn->ctx->errstr);
+               }
+
+               REF_RELEASE (conn);
+       }
+}
+
 static struct rspamd_redis_pool_connection *
 rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
                struct rspamd_redis_pool_elt *elt,
@@ -214,6 +240,8 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
                        msg_debug_rpool ("created new connection to %s:%d", ip, port);
 
                        redisLibeventAttach (ctx, pool->ev_base);
+                       redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect,
+                                       conn);
 
                        if (password) {
                                redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
@@ -295,9 +323,16 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                                event_del (&conn->timeout);
                        }
 
-                       conn->active = TRUE;
-                       g_queue_push_tail_link (elt->active, conn_entry);
-                       msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
+                       if (conn->ctx->err == REDIS_OK) {
+                               conn->active = TRUE;
+                               g_queue_push_tail_link (elt->active, conn_entry);
+                               msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
+                       }
+                       else {
+                               REF_RELEASE (conn);
+                               conn = rspamd_redis_pool_new_connection (pool, elt,
+                                               db, password, ip, port);
+                       }
 
                }
                else {
@@ -307,6 +342,7 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
                }
        }
        else {
+               /* Need to create a pool */
                elt = rspamd_redis_pool_new_elt (pool);
                elt->key = key;
                g_hash_table_insert (pool->elts_by_key, &elt->key, elt);