]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Implement the rest functions for redis fuzzy backend
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 5 Sep 2016 14:26:08 +0000 (15:26 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 5 Sep 2016 14:26:08 +0000 (15:26 +0100)
src/libserver/fuzzy_backend.c
src/libserver/fuzzy_backend.h
src/libserver/fuzzy_backend_redis.c

index 84f2289e802f16b383bed9818aae5caf6a5fa83a..030028389df4c693f5f9c67f949104430838881e 100644 (file)
@@ -442,3 +442,9 @@ rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
 {
        return backend->ev_base;
 }
+
+gdouble
+rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend)
+{
+       return backend->expire;
+}
index 1eaa0fe2b465677b0a190c67e633fbf6a516a772..6c880d9c8a3734bae68c6e1a6114c88cf35afec6 100644 (file)
@@ -102,6 +102,7 @@ void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
                void *ud);
 
 struct event_base* rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend);
+gdouble rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend);
 
 /**
  * Closes backend
index 007b75856253545b98c90d9a05f0b15182d7bbfa..66490d386cf84df0ab0a1ea9534329fa68292b13 100644 (file)
@@ -634,15 +634,6 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
        }
 }
 
-void
-rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
-               GQueue *updates, const gchar *src,
-               rspamd_fuzzy_update_cb cb, void *ud,
-               void *subr_ud)
-{
-       struct rspamd_fuzzy_backend_redis *backend = subr_ud;
-}
-
 static void
 rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
                gpointer priv)
@@ -759,6 +750,47 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
        }
 }
 
+static void
+rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
+               gpointer priv)
+{
+       struct rspamd_fuzzy_redis_session *session = priv;
+       redisReply *reply = r;
+       gulong nelts;
+
+       event_del (&session->timeout);
+
+       if (c->err == 0) {
+               rspamd_upstream_ok (session->up);
+
+               if (reply->type == REDIS_REPLY_INTEGER) {
+                       if (session->callback.cb_version) {
+                               session->callback.cb_version (reply->integer, session->cbdata);
+                       }
+               }
+               else if (reply->type == REDIS_REPLY_STRING) {
+                       nelts = strtoul (reply->str, NULL, 10);
+
+                       if (session->callback.cb_version) {
+                               session->callback.cb_version (nelts, session->cbdata);
+                       }
+               }
+               else {
+                       if (session->callback.cb_version) {
+                               session->callback.cb_version (0, session->cbdata);
+                       }
+               }
+       }
+       else {
+               if (session->callback.cb_version) {
+                       session->callback.cb_version (0, session->cbdata);
+               }
+               rspamd_upstream_fail (session->up);
+       }
+
+       rspamd_fuzzy_redis_session_dtor (session);
+}
+
 void
 rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
                const gchar *src,
@@ -766,8 +798,73 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
                void *subr_ud)
 {
        struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+       struct rspamd_fuzzy_redis_session *session;
+       struct upstream *up;
+       struct timeval tv;
+       rspamd_inet_addr_t *addr;
+       GString *key;
 
        g_assert (backend != NULL);
+
+       session = g_slice_alloc0 (sizeof (*session));
+       session->backend = backend;
+       REF_RETAIN (session->backend);
+
+       session->callback.cb_version = cb;
+       session->cbdata = ud;
+       session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
+       session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+
+       session->nargs = 2;
+       session->argv = g_malloc (sizeof (gchar *) * 2);
+       session->argv_lens = g_malloc (sizeof (gsize) * 2);
+       key = g_string_new (backend->redis_object);
+       g_string_append (key, src);
+       session->argv[0] = g_strdup ("GET");
+       session->argv_lens[0] = 3;
+       session->argv[1] = key->str;
+       session->argv_lens[1] = key->len;
+       g_string_free (key, FALSE); /* Do not free underlying array */
+
+       up = rspamd_upstream_get (backend->read_servers,
+                       RSPAMD_UPSTREAM_ROUND_ROBIN,
+                       NULL,
+                       0);
+
+       session->up = up;
+       addr = rspamd_upstream_addr (up);
+       g_assert (addr != NULL);
+       session->ctx = rspamd_redis_pool_connect (backend->pool,
+                       backend->dbname, backend->password,
+                       rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr));
+
+       if (session->ctx == NULL) {
+               rspamd_fuzzy_redis_session_dtor (session);
+
+               if (cb) {
+                       cb (0, subr_ud);
+               }
+       }
+       else {
+               if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
+                               session, session->nargs,
+                               (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+                       rspamd_fuzzy_redis_session_dtor (session);
+
+                       if (cb) {
+                               cb (0, subr_ud);
+                       }
+               }
+               else {
+                       /* Add timeout */
+                       event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
+                                       session);
+                       event_base_set (session->ev_base, &session->timeout);
+                       double_to_tv (backend->timeout, &tv);
+                       event_add (&session->timeout, &tv);
+               }
+       }
 }
 
 const gchar*
@@ -789,6 +886,340 @@ rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
        g_assert (backend != NULL);
 }
 
+static gboolean
+rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
+               struct rspamd_fuzzy_redis_session *session,
+               struct fuzzy_peer_cmd *io_cmd, guint *shift)
+{
+       GString *key, *value;
+       guint cur_shift = *shift;
+       struct rspamd_fuzzy_cmd *cmd;
+
+       if (io_cmd->is_shingle) {
+               cmd = &io_cmd->cmd.shingle.basic;
+
+               if (cmd->cmd == FUZZY_WRITE) {
+
+               }
+       }
+       else {
+               cmd = &io_cmd->cmd.normal;
+
+       }
+
+       if (cmd->cmd == FUZZY_WRITE) {
+               /*
+                * For each normal hash addition we do 3 redis commands:
+                * HSET <key> F <flag>
+                * HINCRBY <key> V <weight>
+                * EXPIRE <key> <expire>
+                * Where <key> is <prefix> || <digest>
+                */
+
+               /* HSET */
+               key = g_string_new (session->backend->redis_object);
+               g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+               value = g_string_sized_new (32);
+               rspamd_printf_gstring (value, "%d", cmd->flag);
+               session->argv[cur_shift] = g_strdup ("HSET");
+               session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
+               session->argv[cur_shift] = key->str;
+               session->argv_lens[cur_shift++] = key->len;
+               session->argv[cur_shift] = g_strdup ("F");
+               session->argv_lens[cur_shift++] = sizeof ("F") - 1;
+               session->argv[cur_shift] = value->str;
+               session->argv_lens[cur_shift++] = value->len;
+               g_string_free (key, FALSE);
+               g_string_free (value, FALSE);
+
+               if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+                               3,
+                               (const gchar **)&session->argv[cur_shift - 4],
+                               &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+                       return FALSE;
+               }
+
+               /* HINCRBY */
+               key = g_string_new (session->backend->redis_object);
+               g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+               value = g_string_sized_new (32);
+               rspamd_printf_gstring (value, "%d", cmd->value);
+               session->argv[cur_shift] = g_strdup ("HINCRBY");
+               session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
+               session->argv[cur_shift] = key->str;
+               session->argv_lens[cur_shift++] = key->len;
+               session->argv[cur_shift] = g_strdup ("V");
+               session->argv_lens[cur_shift++] = sizeof ("V") - 1;
+               session->argv[cur_shift] = value->str;
+               session->argv_lens[cur_shift++] = value->len;
+               g_string_free (key, FALSE);
+               g_string_free (value, FALSE);
+
+               if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+                               3,
+                               (const gchar **)&session->argv[cur_shift - 4],
+                               &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+                       return FALSE;
+               }
+
+               /* EXPIRE */
+               key = g_string_new (session->backend->redis_object);
+               g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+               value = g_string_sized_new (32);
+               rspamd_printf_gstring (value, "%d",
+                               (gint)rspamd_fuzzy_backend_get_expire (bk));
+               session->argv[cur_shift] = g_strdup ("EXPIRE");
+               session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
+               session->argv[cur_shift] = key->str;
+               session->argv_lens[cur_shift++] = key->len;
+               session->argv[cur_shift] = value->str;
+               session->argv_lens[cur_shift++] = value->len;
+               g_string_free (key, FALSE);
+               g_string_free (value, FALSE);
+
+               if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+                               3,
+                               (const gchar **)&session->argv[cur_shift - 3],
+                               &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+                       return FALSE;
+               }
+       }
+
+       *shift = cur_shift;
+
+       return TRUE;
+}
+
+static void
+rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
+               gpointer priv)
+{
+       struct rspamd_fuzzy_redis_session *session = priv;
+       redisReply *reply = r;
+       event_del (&session->timeout);
+
+       if (c->err == 0) {
+               rspamd_upstream_ok (session->up);
+
+               if (reply->type == REDIS_REPLY_ARRAY) {
+                       /* TODO: check all replies somehow */
+                       if (session->callback.cb_update) {
+                               session->callback.cb_update (TRUE, session->cbdata);
+                       }
+               }
+               else {
+                       if (session->callback.cb_update) {
+                               session->callback.cb_update (FALSE, session->cbdata);
+                       }
+               }
+       }
+       else {
+               if (session->callback.cb_update) {
+                       session->callback.cb_update (FALSE, session->cbdata);
+               }
+
+               rspamd_upstream_fail (session->up);
+       }
+
+       rspamd_fuzzy_redis_session_dtor (session);
+}
+
+void
+rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
+               GQueue *updates, const gchar *src,
+               rspamd_fuzzy_update_cb cb, void *ud,
+               void *subr_ud)
+{
+       struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+       struct rspamd_fuzzy_redis_session *session;
+       struct upstream *up;
+       struct timeval tv;
+       rspamd_inet_addr_t *addr;
+       GList *cur;
+       GString *key;
+       struct fuzzy_peer_cmd *io_cmd;
+       struct rspamd_fuzzy_cmd *cmd;
+       guint nargs, ncommands, cur_shift;
+
+       g_assert (backend != NULL);
+
+       session = g_slice_alloc0 (sizeof (*session));
+       session->backend = backend;
+       REF_RETAIN (session->backend);
+
+       /*
+        * For each normal hash addition we do 3 redis commands:
+        * HSET <key> F <flag>
+        * HINCRBY <key> V <weight>
+        * EXPIRE <key> <expire>
+        *
+        * Where <key> is <prefix> || <digest>
+        *
+        * For each command with shingles we additionally emit 32 commands:
+        * SETEX <prefix>_<number>_<value> <expire> <digest>
+        *
+        * For each delete command we emit:
+        * DEL <key>
+        *
+        * For each delete command with shingles we emit also 32 commands:
+        * DEL <prefix>_<number>_<value>
+        */
+
+       ncommands = 3; /* For MULTI + EXEC */
+       nargs = 5;
+
+       for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) {
+               io_cmd = cur->data;
+
+               if (io_cmd->is_shingle) {
+                       cmd = &io_cmd->cmd.shingle.basic;
+               }
+               else {
+                       cmd = &io_cmd->cmd.normal;
+               }
+
+               if (cmd->cmd == FUZZY_WRITE) {
+                       ncommands += 3;
+                       nargs += 11;
+
+                       if (io_cmd->is_shingle) {
+                               ncommands += RSPAMD_SHINGLE_SIZE;
+                               nargs += RSPAMD_SHINGLE_SIZE * 4;
+                       }
+
+               }
+               else if (cmd->cmd == FUZZY_DEL) {
+                       ncommands += 1;
+                       nargs += 2;
+
+                       if (io_cmd->is_shingle) {
+                               ncommands += RSPAMD_SHINGLE_SIZE;
+                               nargs += RSPAMD_SHINGLE_SIZE * 2;
+                       }
+               }
+       }
+
+       /* Now we need to create a new request */
+       session->callback.cb_update = cb;
+       session->cbdata = ud;
+       session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
+       session->cmd = cmd;
+       session->prob = 1.0;
+       session->ev_base = rspamd_fuzzy_backend_event_base (bk);
+
+       /* First of all check digest */
+       session->nargs = nargs;
+       session->argv = g_malloc (sizeof (gchar *) * session->nargs);
+       session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
+
+       up = rspamd_upstream_get (backend->write_servers,
+                       RSPAMD_UPSTREAM_MASTER_SLAVE,
+                       NULL,
+                       0);
+
+       session->up = up;
+       addr = rspamd_upstream_addr (up);
+       g_assert (addr != NULL);
+       session->ctx = rspamd_redis_pool_connect (backend->pool,
+                       backend->dbname, backend->password,
+                       rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr));
+
+       if (session->ctx == NULL) {
+               rspamd_fuzzy_redis_session_dtor (session);
+
+               if (cb) {
+                       cb (FALSE, subr_ud);
+               }
+       }
+       else {
+               /* Start with MULTI command */
+               session->argv[0] = g_strdup ("MULTI");
+               session->argv_lens[0] = 5;
+
+               if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+                               1,
+                               (const gchar **)session->argv,
+                               session->argv_lens) != REDIS_OK) {
+
+                       if (cb) {
+                               cb (FALSE, subr_ud);
+                       }
+                       rspamd_fuzzy_redis_session_dtor (session);
+
+                       return;
+               }
+
+               /* Now split the rest of commands in packs and emit them command by command */
+               cur_shift = 1;
+
+               for (cur = updates->head; cur != NULL; cur = g_list_next (cur)) {
+                       io_cmd = cur->data;
+
+                       if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
+                                       &cur_shift)) {
+                               if (cb) {
+                                       cb (FALSE, subr_ud);
+                               }
+                               rspamd_fuzzy_redis_session_dtor (session);
+
+                               return;
+                       }
+               }
+
+               /* Now INCR command for the source */
+               key = g_string_new (backend->redis_object);
+               g_string_append (key, src);
+               session->argv[cur_shift] = g_strdup ("INCR");
+               session->argv_lens[cur_shift ++] = 4;
+               session->argv[cur_shift] = key->str;
+               session->argv_lens[cur_shift ++] = key->len;
+               g_string_free (key, FALSE);
+
+               if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+                               2,
+                               (const gchar **)&session->argv[cur_shift - 2],
+                               &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+                       if (cb) {
+                               cb (FALSE, subr_ud);
+                       }
+                       rspamd_fuzzy_redis_session_dtor (session);
+
+                       return;
+               }
+
+               /* Finally we call EXEC with a specific callback */
+               session->argv[cur_shift] = g_strdup ("EXEC");
+               session->argv_lens[cur_shift] = 4;
+
+               if (redisAsyncCommandArgv (session->ctx,
+                               rspamd_fuzzy_redis_update_callback, session,
+                               1,
+                               (const gchar **)&session->argv[cur_shift],
+                               &session->argv_lens[cur_shift]) != REDIS_OK) {
+
+                       if (cb) {
+                               cb (FALSE, subr_ud);
+                       }
+                       rspamd_fuzzy_redis_session_dtor (session);
+
+                       return;
+               }
+               else {
+                       /* Add timeout */
+                       event_set (&session->timeout, -1, EV_TIMEOUT, rspamd_fuzzy_redis_timeout,
+                                       session);
+                       event_base_set (session->ev_base, &session->timeout);
+                       double_to_tv (backend->timeout, &tv);
+                       event_add (&session->timeout, &tv);
+               }
+       }
+}
+
 void
 rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
                void *subr_ud)