redisAsyncContext *redis;
guint64 learned;
gint id;
- enum rspamd_redis_connection_state conn_state;
- ref_entry_t ref;
};
/* Used to get statistics from redis */
rspamd_redis_fin (gpointer data)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+ redisAsyncContext *redis;
- if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
+ /* Stop timeout */
+ if (event_get_base (&rt->timeout_event)) {
event_del (&rt->timeout_event);
- REF_RELEASE (rt);
+ }
+
+ if (rt->redis) {
+ redis = rt->redis;
+ rt->redis = NULL;
+ /* This calls for all callbacks pending */
+ redisAsyncFree (redis);
}
}
rspamd_redis_fin_learn (gpointer data)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+ redisAsyncContext *redis;
- if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
+ /* Stop timeout */
+ if (event_get_base (&rt->timeout_event)) {
event_del (&rt->timeout_event);
- REF_RELEASE (rt);
+ }
+
+ if (rt->redis) {
+ redis = rt->redis;
+ rt->redis = NULL;
+ /* This calls for all callbacks pending */
+ redisAsyncFree (redis);
}
}
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
struct rspamd_task *task;
+ redisAsyncContext *redis;
task = rt->task;
- REF_RETAIN (rt);
msg_err_task_check ("connection to redis server %s timed out",
rspamd_upstream_name (rt->selected));
- rspamd_upstream_fail (rt->selected);
-
- if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) {
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
- }
-
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
if (rt->redis) {
- redisAsyncFree (rt->redis);
+ redis = rt->redis;
+ rt->redis = NULL;
+ /* This calls for all callbacks pending */
+ redisAsyncFree (redis);
}
-
- rt->redis = NULL;
- REF_RELEASE (rt);
}
/* Called when we have connected to the redis server and got stats */
task = rt->task;
- if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
- /* Task has disappeared already */
- REF_RELEASE (rt);
- return;
- }
-
if (c->err == 0) {
if (r != NULL) {
if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
}
rt->learned = val;
- REF_RETAIN (rt);
msg_debug_task ("connected to redis server, tokens learned for %s: %uL",
rt->redis_object_expanded, rt->learned);
rspamd_upstream_ok (rt->selected);
- /* This also set state to terminated state */
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
- rt->conn_state = RSPAMD_REDIS_CONNECTED;
- }
- else {
- /* This could be caused by removing redis context forcefully */
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
}
else {
msg_err_task ("error getting reply from redis server %s: %s",
rspamd_upstream_name (rt->selected), c->errstr);
rspamd_upstream_fail (rt->selected);
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
- REF_RELEASE (rt);
}
/* Called when we have received tokens values from redis */
task = rt->task;
- if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
- /* Task has disappeared already */
- REF_RELEASE (rt);
- return;
- }
-
if (c->err == 0) {
if (r != NULL) {
if (reply->type == REDIS_REPLY_ARRAY) {
}
}
else {
- msg_err_task ("got invalid length of reply vector from redis: "
+ msg_err_task_check ("got invalid length of reply vector from redis: "
"%d, expected: %d",
(gint)reply->elements,
(gint)task->tokens->len);
}
}
else {
- msg_err_task ("got invalid reply from redis: %d",
+ msg_err_task_check ("got invalid reply from redis: %d",
reply->type);
}
- msg_debug_task ("received tokens for %s: %d processed, %d found",
+ msg_debug_task_check ("received tokens for %s: %d processed, %d found",
rt->redis_object_expanded, processed, found);
rspamd_upstream_ok (rt->selected);
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
- }
- else {
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
-
- rt->conn_state = RSPAMD_REDIS_CONNECTED;
}
else {
msg_err_task ("error getting reply from redis server %s: %s",
rspamd_upstream_name (rt->selected), c->errstr);
rspamd_upstream_fail (rt->selected);
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
- REF_RELEASE (rt);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
/* Called when we have set tokens during learning */
task = rt->task;
- if (rt->conn_state == RSPAMD_REDIS_TERMINATED) {
- /* Task has disappeared already */
- REF_RELEASE (rt);
- return;
- }
-
if (c->err == 0) {
rspamd_upstream_ok (rt->selected);
rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
}
else {
- msg_err_task ("error getting reply from redis server %s: %s",
+ msg_err_task_check ("error getting reply from redis server %s: %s",
rspamd_upstream_name (rt->selected), c->errstr);
rspamd_upstream_fail (rt->selected);
rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
}
-
- if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
- redisAsyncFree (rt->redis);
- }
-
- REF_RELEASE (rt);
}
static gboolean
return (gpointer)backend;
}
-static void
-rspamd_redis_runtime_dtor (struct redis_stat_runtime *rt)
-{
- if (event_get_base (&rt->timeout_event)) {
- event_del (&rt->timeout_event);
- }
-
- g_slice_free1 (sizeof (*rt), rt);
-}
-
gpointer
rspamd_redis_runtime (struct rspamd_task *task,
struct rspamd_statfile_config *stcf,
struct redis_stat_runtime *rt;
struct upstream *up;
rspamd_inet_addr_t *addr;
- struct timeval tv;
g_assert (ctx != NULL);
g_assert (stcf != NULL);
return NULL;
}
- rt = g_slice_alloc0 (sizeof (*rt));
- REF_INIT_RETAIN (rt, rspamd_redis_runtime_dtor);
+ rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
rspamd_redis_expand_object (ctx->redis_object, ctx, task,
&rt->redis_object_expanded);
rt->selected = up;
rt->task = task;
rt->ctx = ctx;
rt->stcf = stcf;
- rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
addr = rspamd_upstream_addr (up);
g_assert (addr != NULL);
rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
rspamd_inet_address_get_port (addr));
- g_assert (rt->redis != NULL);
+
+ if (rt->redis == NULL) {
+ msg_err_task ("cannot connect redis");
+ return NULL;
+ }
redisLibeventAttach (rt->redis, task->ev_base);
rspamd_redis_maybe_auth (ctx, rt->redis);
- if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
- rt->redis_object_expanded, "learns") == REDIS_OK) {
- rt->conn_state = RSPAMD_REDIS_REQUEST_SENT;
-
- rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
- rspamd_redis_stat_quark ());
-
- event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
- event_base_set (task->ev_base, &rt->timeout_event);
- double_to_tv (ctx->timeout, &tv);
- event_add (&rt->timeout_event, &tv);
- /* Cleared by timeout */
- REF_RETAIN (rt);
- }
-
return rt;
}
struct timeval tv;
gint ret;
- if (tokens == NULL || tokens->len == 0 || rt->redis == NULL ||
- rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+ if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
return FALSE;
}
rt->id = id;
- query = rspamd_redis_tokens_to_query (task, tokens,
- "HMGET", rt->redis_object_expanded, FALSE, -1,
- rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
- g_assert (query != NULL);
- rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
- ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
- query->str, query->len);
- if (ret == REDIS_OK) {
- rt->conn_state = RSPAMD_REDIS_REQUEST_SENT;
+ if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
+ rt->redis_object_expanded, "learns") == REDIS_OK) {
+
rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
rspamd_redis_stat_quark ());
- /* Reset timeout */
- event_del (&rt->timeout_event);
+
+ if (event_get_base (&rt->timeout_event)) {
+ event_del (&rt->timeout_event);
+ }
+ event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+ event_base_set (task->ev_base, &rt->timeout_event);
double_to_tv (rt->ctx->timeout, &tv);
event_add (&rt->timeout_event, &tv);
- return TRUE;
- }
- else {
- msg_err_task ("call to redis failed: %s", rt->redis->errstr);
+ query = rspamd_redis_tokens_to_query (task, tokens,
+ "HMGET", rt->redis_object_expanded, FALSE, -1,
+ rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
+ g_assert (query != NULL);
+ rspamd_mempool_add_destructor (task->task_pool,
+ (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
+
+ ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
+ query->str, query->len);
+
+ if (ret == REDIS_OK) {
+ return TRUE;
+ }
+ else {
+ msg_err_task ("call to redis failed: %s", rt->redis->errstr);
+ }
}
return FALSE;
gpointer ctx)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+ redisAsyncContext *redis;
- if (rt->conn_state != RSPAMD_REDIS_TERMINATED) {
+ if (event_get_base (&rt->timeout_event)) {
event_del (&rt->timeout_event);
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
+ }
- redisAsyncFree (rt->redis);
+ if (rt->redis) {
+ redis = rt->redis;
rt->redis = NULL;
- REF_RELEASE (rt);
+ redisAsyncFree (redis);
}
}
rspamd_token_t *tok;
gint ret;
- if (rt->conn_state == RSPAMD_REDIS_CONNECTED) {
- /* We are likely in some bad state */
- msg_err_task ("invalid state for function: %d", rt->conn_state);
-
- return FALSE;
- }
-
up = rspamd_upstream_get (rt->ctx->write_servers,
RSPAMD_UPSTREAM_MASTER_SLAVE,
NULL,
if (ret == REDIS_OK) {
rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt,
rspamd_redis_stat_quark ());
- /* Reset timeout */
- event_del (&rt->timeout_event);
+ /* Set timeout */
+ if (event_get_base (&rt->timeout_event)) {
+ event_del (&rt->timeout_event);
+ }
+ event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+ event_base_set (task->ev_base, &rt->timeout_event);
double_to_tv (rt->ctx->timeout, &tv);
event_add (&rt->timeout_event, &tv);
- rt->conn_state = RSPAMD_REDIS_CONNECTED;
return TRUE;
}
gpointer ctx)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+ redisAsyncContext *redis;
- if (rt->conn_state == RSPAMD_REDIS_CONNECTED) {
+ if (event_get_base (&rt->timeout_event)) {
event_del (&rt->timeout_event);
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
- redisAsyncFree (rt->redis);
+ }
+
+ if (rt->redis) {
+ redis = rt->redis;
rt->redis = NULL;
- REF_RELEASE (rt);
+ redisAsyncFree (redis);
}
}
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
struct rspamd_redis_stat_elt *st;
+ redisAsyncContext *redis;
if (rt->ctx->stat_elt) {
st = rt->ctx->stat_elt->ud;
if (rt->redis) {
- if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) {
- rspamd_session_remove_event (rt->task->s, rspamd_redis_fin, rt);
- }
- event_del (&rt->timeout_event);
- rt->conn_state = RSPAMD_REDIS_TERMINATED;
- redisAsyncFree (rt->redis);
+ redis = rt->redis;
rt->redis = NULL;
+ redisAsyncFree (redis);
}
if (st->stat) {
return NULL;
}
-
#endif