]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Implement new fuzzy updates architecture
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 17:28:59 +0000 (17:28 +0000)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 17:28:59 +0000 (17:28 +0000)
So far, fuzzy storage can run in multiple processes. However, only one process is responsible for changes, whilst the others just work as proxies when dealing with updates. That should fix SQLite concurrency issues.

src/fuzzy_storage.c

index 0a3ab89afc86d6437dc82291e32307d46f1f8ca5..b1b1b27086a426b1d23726ceba58cd9661b01ae4 100644 (file)
@@ -78,10 +78,13 @@ struct rspamd_fuzzy_storage_ctx {
        gchar *update_map;
        guint keypair_cache_size;
        struct event_base *ev_base;
+       gint peer_fd;
+       struct event peer_ev;
        /* Local keypair */
        gpointer key;
        struct rspamd_keypair_cache *keypair_cache;
        struct rspamd_fuzzy_backend *backend;
+       GQueue *updates_pending;
 };
 
 enum fuzzy_cmd_type {
@@ -114,6 +117,18 @@ struct fuzzy_session {
        guchar nm[rspamd_cryptobox_MAX_NMBYTES];
 };
 
+/*
+ * Update command in the form that is written to and read from the peer
+ * socket: the union is sized to hold either a plain command or a shingle
+ * command.
+ */
+struct fuzzy_peer_cmd {
+       union {
+               struct rspamd_fuzzy_cmd normal;
+               struct rspamd_fuzzy_shingle_cmd shingle;
+       } cmd;
+};
+
+/*
+ * One pending attempt to forward an update to the peer: io_ev is the write
+ * event registered on the peer fd and cmd is the payload that gets written
+ * when that event fires.
+ */
+struct fuzzy_peer_request {
+       struct event io_ev;
+       struct fuzzy_peer_cmd cmd;
+};
+
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
 static gboolean
@@ -128,6 +143,35 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session)
        return TRUE;
 }
 
+/*
+ * Apply every queued update to the backend and leave the queue empty.
+ *
+ * Commands whose type is FUZZY_WRITE are added to the backend; any other
+ * command is treated as a deletion. Each command is released once it has
+ * been applied.
+ *
+ * The queue is allocated in fuzzy_peer_rep and only under some conditions,
+ * whereas this function is called from the periodic sync and from the
+ * shutdown path regardless, so a missing queue is tolerated and simply means
+ * there is nothing to apply. The hash counter in server_stat is refreshed in
+ * either case.
+ */
+static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+{
+       struct fuzzy_peer_cmd *cmd;
+       guint nupdates = 0;
+
+       if (ctx->updates_pending != NULL) {
+               /* Popping detaches each element, so no separate clear is needed */
+               while ((cmd = g_queue_pop_head (ctx->updates_pending)) != NULL) {
+                       if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
+                               rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
+                       }
+                       else {
+                               rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
+                       }
+
+                       g_slice_free1 (sizeof (*cmd), cmd);
+                       nupdates ++;
+               }
+       }
+
+       server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+
+       msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
+}
+
 static void
 rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
 {
@@ -173,26 +217,49 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
        }
 }
 
+/*
+ * Write callback for a pending peer request: tries to send the stored
+ * command with a single write and then discards the request, whether or not
+ * the write went through. A failed or partial write is only logged.
+ */
+static void
+fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
+{
+       struct fuzzy_peer_request *req = d;
+       const gsize cmd_len = sizeof (req->cmd);
+       gssize written = write (fd, &req->cmd, cmd_len);
+
+       if ((gsize) written != cmd_len) {
+               msg_err ("cannot send update request to the peer: %s", strerror (errno));
+       }
+
+       /* The request is one-shot: unregister its event and release it */
+       event_del (&req->io_ev);
+       g_slice_free1 (sizeof (*req), req);
+}
+
 static void
 rspamd_fuzzy_process_command (struct fuzzy_session *session)
 {
-       gboolean res = FALSE, encrypted = FALSE;
+       gboolean encrypted = FALSE;
        struct rspamd_fuzzy_cmd *cmd;
        struct rspamd_fuzzy_reply result;
+       struct fuzzy_peer_cmd *up_cmd;
+       struct fuzzy_peer_request *up_req;
+       gsize up_len;
 
        switch (session->cmd_type) {
        case CMD_NORMAL:
                cmd = &session->cmd.normal;
+               up_len = sizeof (session->cmd.normal);
                break;
        case CMD_SHINGLE:
                cmd = &session->cmd.shingle.basic;
+               up_len = sizeof (session->cmd.shingle);
                break;
        case CMD_ENCRYPTED_NORMAL:
                cmd = &session->cmd.enc_normal.cmd;
+               up_len = sizeof (session->cmd.normal);
                encrypted = TRUE;
                break;
        case CMD_ENCRYPTED_SHINGLE:
                cmd = &session->cmd.enc_shingle.cmd.basic;
+               up_len = sizeof (session->cmd.shingle);
                encrypted = TRUE;
                break;
        }
@@ -210,27 +277,30 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
        else {
                result.flag = cmd->flag;
                if (rspamd_fuzzy_check_client (session)) {
-                       if (cmd->cmd == FUZZY_WRITE) {
-                               res = rspamd_fuzzy_backend_add (session->ctx->backend, cmd);
-                       }
-                       else {
-                               res = rspamd_fuzzy_backend_del (session->ctx->backend, cmd);
-                       }
-                       if (!res) {
-                               result.value = 404;
-                               result.prob = 0.0;
+
+                       if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+                               /* Just add to the queue */
+                               up_cmd = g_slice_alloc (sizeof (*up_cmd));
+                               memcpy (up_cmd, cmd, up_len);
+                               g_queue_push_tail (session->ctx->updates_pending, up_cmd);
                        }
                        else {
-                               result.value = 0;
-                               result.prob = 1.0;
+                               /* We need to send request to the peer */
+                               up_req = g_slice_alloc (sizeof (*up_req));
+                               memcpy (&up_req->cmd, cmd, up_len);
+                               event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
+                                               fuzzy_peer_send_io, up_req);
+                               event_base_set (session->ctx->ev_base, &up_req->io_ev);
+                               event_add (&up_req->io_ev, NULL);
                        }
+
+                       result.value = 0;
+                       result.prob = 1.0;
                }
                else {
                        result.value = 403;
                        result.prob = 0.0;
                }
-
-               server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (session->ctx->backend);
        }
 
        result.tag = cmd->tag;
@@ -489,6 +559,7 @@ sync_callback (gint fd, short what, void *arg)
        ctx = worker->ctx;
 
        if (ctx->backend) {
+               rspamd_fuzzy_process_updates_queue (ctx);
                /* Call backend sync */
                old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
                rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -605,6 +676,67 @@ init_fuzzy (struct rspamd_config *cfg)
        return ctx;
 }
 
+/*
+ * Read callback for the peer fd: reads exactly one peer command and, when
+ * the whole command was received, appends a heap copy of it to the pending
+ * updates queue. Anything other than a full-sized read is logged and
+ * dropped.
+ */
+static void
+rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = d;
+       struct fuzzy_peer_cmd incoming, *queued;
+       gssize nread = read (fd, &incoming, sizeof (incoming));
+
+       if (nread != sizeof (incoming)) {
+               msg_err ("cannot read command from peers: %s", strerror (errno));
+               return;
+       }
+
+       /* The stack buffer does not outlive this call, so queue a copy */
+       queued = g_slice_alloc (sizeof (*queued));
+       memcpy (queued, &incoming, sizeof (incoming));
+       g_queue_push_tail (ctx->updates_pending, queued);
+}
+
+/*
+ * Reply handler for the socketpair request that start_fuzzy sends to the
+ * main process.
+ *
+ * Stores the received peer fd in the context (-1 means none was obtained),
+ * starts accepting on every valid listen socket of the worker, allocates
+ * the pending updates queue when this worker is going to queue updates
+ * locally, and, for worker 0 with a valid peer fd, starts reading update
+ * commands sent by the other workers.
+ *
+ * worker: the worker this reply belongs to
+ * rep:    reply structure (not inspected here)
+ * rep_fd: fd passed along with the reply, or -1
+ * ud:     the fuzzy storage context
+ */
+static void
+fuzzy_peer_rep (struct rspamd_worker *worker,
+               struct rspamd_srv_reply *rep, gint rep_fd,
+               gpointer ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+       GList *cur;
+       gint listen_socket;
+       struct event *accept_event;
+
+       ctx->peer_fd = rep_fd;
+
+       if (rep_fd == -1) {
+               msg_warn ("cannot receive peer fd from the main process");
+       }
+
+       /* Start listening */
+       for (cur = worker->cf->listen_socks; cur != NULL; cur = g_list_next (cur)) {
+               listen_socket = GPOINTER_TO_INT (cur->data);
+
+               if (listen_socket == -1) {
+                       continue;
+               }
+
+               accept_event = g_slice_alloc0 (sizeof (struct event));
+               event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+                               accept_fuzzy_socket, worker);
+               event_base_set (ctx->ev_base, accept_event);
+               event_add (accept_event, NULL);
+               worker->accept_events = g_list_prepend (worker->accept_events,
+                               accept_event);
+       }
+
+       /*
+        * rspamd_fuzzy_process_command pushes updates to the local queue when
+        * this is worker 0 or when there is no peer fd, so the queue must
+        * exist in exactly those cases; otherwise updates would be pushed to
+        * a NULL queue.
+        */
+       if (worker->index == 0 || ctx->peer_fd == -1) {
+               ctx->updates_pending = g_queue_new ();
+       }
+
+       if (worker->index == 0 && ctx->peer_fd != -1) {
+               /* Listen for peer requests */
+               event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST,
+                               rspamd_fuzzy_peer_io, ctx);
+               event_base_set (ctx->ev_base, &ctx->peer_ev);
+               event_add (&ctx->peer_ev, NULL);
+       }
+}
+
 /*
  * Start worker process
  */
@@ -614,10 +746,12 @@ start_fuzzy (struct rspamd_worker *worker)
        struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
        GError *err = NULL;
        gdouble next_check;
+       struct rspamd_srv_command srv_cmd;
 
        ctx->ev_base = rspamd_prepare_worker (worker,
                        "fuzzy",
-                       accept_fuzzy_socket);
+                       NULL);
+       ctx->peer_fd = -1;
        server_stat = worker->srv->stat;
 
        /*
@@ -666,13 +800,34 @@ start_fuzzy (struct rspamd_worker *worker)
        /* Maps events */
        rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
 
+       /* Get peer pipe */
+       srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
+       srv_cmd.id = ottery_rand_uint64 ();
+       srv_cmd.cmd.spair.af = SOCK_DGRAM;
+       srv_cmd.cmd.spair.pair_num = worker->index;
+       memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
+       memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
+
+       rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, fuzzy_peer_rep, ctx);
+
        event_base_loop (ctx->ev_base, 0);
        rspamd_worker_block_signals ();
 
-       rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+       if (worker->index == 0) {
+               rspamd_fuzzy_process_updates_queue (ctx);
+               rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+       }
+
        rspamd_fuzzy_backend_close (ctx->backend);
        rspamd_log_close (worker->srv->logger);
 
+       if (ctx->peer_fd != -1) {
+               if (worker->index == 0) {
+                       event_del (&ctx->peer_ev);
+               }
+               close (ctx->peer_fd);
+       }
+
        if (ctx->keypair_cache) {
                rspamd_keypair_cache_destroy (ctx->keypair_cache);
        }