]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Start implementation of lazy redis statistics.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 9 Jan 2016 14:43:24 +0000 (14:43 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 9 Jan 2016 14:43:24 +0000 (14:43 +0000)
src/libstat/backends/redis_backend.c

index 683ba0fda064d9fd21766568deba62195e66214e..cf6f90d06fbeb00356c00be2fd6ff83ebc22913d 100644 (file)
 #define REDIS_DEFAULT_PORT 6379
 #define REDIS_DEFAULT_OBJECT "%s%l"
 #define REDIS_DEFAULT_TIMEOUT 0.5
+#define REDIS_STAT_TIMEOUT 30
 
 struct redis_stat_ctx {
+       struct rspamd_statfile_config *stcf;
        struct upstream_list *read_servers;
        struct upstream_list *write_servers;
-
+       struct rspamd_stat_async_elt *stat_elt;
        const gchar *redis_object;
        gdouble timeout;
 };
@@ -66,6 +68,24 @@ struct redis_stat_runtime {
        enum rspamd_redis_connection_state conn_state;
 };
 
+/* Used to get statistics from redis */
+struct rspamd_redis_stat_cbdata;
+
+struct rspamd_redis_stat_elt {
+       struct event_base *ev_base;
+       ucl_object_t *stat;
+       struct rspamd_redis_stat_cbdata *cbdata;
+};
+
+struct rspamd_redis_stat_cbdata {
+       struct rspamd_redis_stat_elt *elt;
+       redisAsyncContext *redis;
+       ucl_object_t *cur;
+       GPtrArray *cur_keys;
+       struct upstream *selected;
+       guint inflight;
+};
+
 #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
 
 static GQuark
@@ -348,6 +368,121 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, GPtrArray *tokens,
        return out;
 }
 
+static void
+rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
+{
+       guint i;
+       gchar *k;
+
+       if (cbdata) {
+               redisAsyncFree (cbdata->redis);
+               ucl_object_unref (cbdata->cur);
+
+               for (i = 0; i < cbdata->cur_keys->len; i ++) {
+                       k = g_ptr_array_index (cbdata->cur_keys, i);
+                       g_free (k);
+               }
+
+               g_ptr_array_free (cbdata->cur_keys, TRUE);
+
+               if (cbdata->elt) {
+                       cbdata->elt->cbdata = NULL;
+               }
+
+               g_slice_free1 (sizeof (*cbdata), cbdata);
+       }
+}
+
+/* Called when we have connected to the redis server and got keys to check */
+static void
+rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       redisReply *reply = r, *elt;
+       gchar **k;
+       guint i, processed = 0;
+
+       if (c->err == 0 && r != NULL) {
+               if (reply->type == REDIS_REPLY_ARRAY) {
+                       g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
+
+                       for (i = 0; i < reply->elements; i ++) {
+                               elt = reply->element[i];
+
+                               if (elt->type == REDIS_REPLY_STRING) {
+                                       k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
+                                       *k = g_malloc (elt->len + 1);
+                                       rspamd_strlcpy (*k, elt->str, elt->len + 1);
+                                       processed ++;
+                               }
+                       }
+
+                       if (processed) {
+
+                       }
+               }
+       }
+       else {
+               msg_err ("cannot get keys to gather stat");
+               rspamd_upstream_fail (cbdata->selected);
+               rspamd_redis_async_cbdata_cleanup (cbdata);
+       }
+}
+
+static void
+rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
+{
+       struct redis_stat_ctx *ctx = REDIS_CTX (d);
+       struct rspamd_redis_stat_elt *redis_elt = elt->ud;
+       struct rspamd_redis_stat_cbdata *cbdata;
+       rspamd_inet_addr_t *addr;
+
+       g_assert (redis_elt != NULL);
+
+       if (redis_elt->cbdata) {
+               /* We have some other process pending */
+               rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+       }
+
+       /* Disable further events unless needed */
+       elt->enabled = FALSE;
+
+       cbdata = g_slice_alloc0 (sizeof (*cbdata));
+       cbdata->selected = rspamd_upstream_get (ctx->read_servers,
+                                       RSPAMD_UPSTREAM_ROUND_ROBIN,
+                                       NULL,
+                                       0);
+
+       g_assert (cbdata->selected != NULL);
+       addr = rspamd_upstream_addr (cbdata->selected);
+       g_assert (addr != NULL);
+       cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr));
+       g_assert (cbdata->redis != NULL);
+
+       redisLibeventAttach (cbdata->redis, redis_elt->ev_base);
+
+       cbdata->inflight = 1;
+       cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
+       cbdata->elt = redis_elt;
+       cbdata->cur_keys = g_ptr_array_new ();
+       redis_elt->cbdata = cbdata;
+
+       /* XXX: deal with timeouts maybe */
+       /* Get keys in redis that match our symbol */
+       redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+                       "KEYS %s*",
+                       ctx->stcf->symbol);
+}
+
+static void
+rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
+{
+       struct rspamd_redis_stat_elt *redis_elt = elt->ud;
+
+       rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+}
+
 /* Called on connection termination */
 static void
 rspamd_redis_fin (gpointer data)
@@ -542,6 +677,7 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
 {
        struct redis_stat_ctx *backend;
        struct rspamd_statfile_config *stf = st->stcf;
+       struct rspamd_redis_stat_elt *st_elt;
        const ucl_object_t *elt;
 
        backend = g_slice_alloc0 (sizeof (*backend));
@@ -605,6 +741,15 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
        }
 
        stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
+       backend->stcf = stf;
+
+       st_elt = g_slice_alloc0 (sizeof (*st_elt));
+       st_elt->ev_base = ctx->ev_base;
+       backend->stat_elt = rspamd_stat_ctx_register_async (
+                       rspamd_redis_async_stat_cb,
+                       rspamd_redis_async_stat_fin,
+                       st_elt,
+                       REDIS_STAT_TIMEOUT);
 
        return (gpointer)backend;
 }