#include "cfg_file.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
+#include "hiredis/adapters/libevent.h"
#include "cryptobox.h"
#include "ref.h"
static void
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
{
- if (c->active && c->ctx != NULL) {
- g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
- redisAsyncFree (c->ctx);
+ if (c->active) {
+ if (c->ctx) {
+ g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+ redisAsyncFree (c->ctx);
+ }
+
g_queue_unlink (c->elt->active, c->entry);
}
+ else {
+ if (event_get_base (&c->timeout)) {
+ event_del (&c->timeout);
+ }
- if (!c->active && event_get_base (&c->timeout)) {
- event_del (&c->timeout);
g_queue_unlink (c->elt->inactive, c->entry);
}
ctx = redisAsyncConnect (ip, port);
if (ctx) {
- conn = g_slice_alloc0 (sizeof (conn));
- conn->entry = g_list_prepend (NULL, conn);
- conn->elt = elt;
- conn->active = TRUE;
- g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
- g_queue_push_head_link (elt->active, conn->entry);
- conn->ctx = ctx;
- REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
-
- if (password) {
- redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+
+ if (ctx->err != REDIS_OK) {
+ redisAsyncFree (ctx);
+
+ return NULL;
}
- if (db) {
- redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ else {
+ conn = g_slice_alloc0 (sizeof (*conn));
+ conn->entry = g_list_prepend (NULL, conn);
+ conn->elt = elt;
+ conn->active = TRUE;
+ g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+ g_queue_push_head_link (elt->active, conn->entry);
+ conn->ctx = ctx;
+ REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+ redisLibeventAttach (ctx, pool->ev_base);
+
+ if (password) {
+ redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+ }
+ if (db) {
+ redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ }
}
return conn;
conn->active = TRUE;
g_queue_push_tail_link (elt->active, conn_entry);
- REF_RETAIN (conn);
}
else {
conn = rspamd_redis_pool_new_connection (pool, elt,
db, password, ip, port);
}
-
- return conn->ctx;
}
else {
elt = rspamd_redis_pool_new_elt (pool);
db, password, ip, port);
}
+ REF_RETAIN (conn);
+
return conn->ctx;
}
if (conn != NULL) {
REF_RELEASE (conn);
- if (is_fatal) {
+ if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) {
/* We need to terminate connection forcefully */
REF_RELEASE (conn);
}
#include "dns.h"
#include "utlist.h"
-#ifdef WITH_HIREDIS
-#include "hiredis.h"
-#include "adapters/libevent.h"
-#endif
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
#define REDIS_DEFAULT_TIMEOUT 1.0
struct lua_redis_userdata *ud;
struct lua_redis_specific_userdata *cur, *tmp;
gboolean is_connected = FALSE;
+ struct redisAsyncContext *ac;
if (ctx->async) {
msg_debug ("desctructing %p", ctx);
* still be alive here!
*/
ctx->ref.refcount = 100500;
- redisAsyncFree (ud->ctx);
+ ac = ud->ctx;
+ ud->ctx = NULL;
+ rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+ ac, FALSE);
ctx->ref.refcount = 0;
is_connected = TRUE;
}
ac = ud->ctx;
ud->ctx = NULL;
- if (ac != NULL) {
- redisAsyncFree (ac);
+ if (ac) {
+ rspamd_redis_pool_release_connection (ud->task->cfg->redis_pool,
+ ac, FALSE);
}
}
* This will call all callbacks pending so the entire context
* will be destructed
*/
- redisAsyncFree (ac);
+ rspamd_redis_pool_release_connection (sp_ud->c->task->cfg->redis_pool,
+ ac, TRUE);
}
REDIS_RELEASE (ctx);
}
*nargs = top;
}
-static void
-lua_redis_connect_cb (const struct redisAsyncContext *c, int status)
-{
- /*
- * Workaround to prevent double close:
- * https://groups.google.com/forum/#!topic/redis-db/mQm46XkIPOY
- */
-#if defined(HIREDIS_MAJOR) && HIREDIS_MAJOR == 0 && HIREDIS_MINOR <= 11
- struct redisAsyncContext *nc = (struct redisAsyncContext *)c;
- if (status == REDIS_ERR) {
- nc->c.fd = -1;
- }
-#endif
-}
-
-
/***
* @function rspamd_redis.make_request({params})
if (ret) {
ud->terminated = 0;
ud->timeout = timeout;
- ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+ ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+ dbname, password,
+ rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
if (ud->ctx == NULL || ud->ctx->err) {
if (ud->ctx) {
msg_err_task_check ("cannot connect to redis: %s",
ud->ctx->errstr);
- redisAsyncFree (ud->ctx);
ud->ctx = NULL;
}
else {
return 2;
}
- redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
- redisLibeventAttach (ud->ctx, ud->task->ev_base);
-
- if (password) {
- redisAsyncCommand (ud->ctx, NULL, NULL, "AUTH %s", password);
- }
- if (dbname) {
- redisAsyncCommand (ud->ctx, NULL, NULL, "SELECT %s", dbname);
- }
-
ret = redisAsyncCommandArgv (ud->ctx,
lua_redis_callback,
sp_ud,
}
else {
msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr);
- redisAsyncFree (ud->ctx);
+ rspamd_redis_pool_release_connection (task->cfg->redis_pool,
+ ud->ctx, FALSE);
ud->ctx = NULL;
REDIS_RELEASE (ctx);
ret = FALSE;
if (ret && ctx) {
ud->terminated = 0;
ud->timeout = timeout;
- ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+ ud->ctx = rspamd_redis_pool_connect (task->cfg->redis_pool,
+ NULL, NULL,
+ rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
if (ud->ctx == NULL || ud->ctx->err) {
return 1;
}
- redisAsyncSetConnectCallback (ud->ctx, lua_redis_connect_cb);
- redisLibeventAttach (ud->ctx, ud->task->ev_base);
pctx = lua_newuserdata (L, sizeof (ctx));
*pctx = ctx;
rspamd_lua_setclass (L, "rspamd{redis}", -1);