git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Start collection only mode implementation for fuzzy storage
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 26 Jan 2017 14:35:10 +0000 (14:35 +0000)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 26 Jan 2017 15:50:43 +0000 (15:50 +0000)
src/fuzzy_storage.c

index 9738c3f63a5d967f92c7f079566a1bc948cd2a0e..d1cb2053aa938424b560cc9949370dca7b987cdf 100644 (file)
@@ -41,6 +41,7 @@
 #define DEFAULT_KEYPAIR_CACHE_SIZE 512
 #define DEFAULT_MASTER_TIMEOUT 10.0
 #define DEFAULT_UPDATES_MAXFAIL 3
+#define COOKIE_SIZE 128 /* Length, in bytes, of the random cookie kept in struct rspamd_fuzzy_storage_ctx */
 
 static const gchar *local_db_name = "local";
 
@@ -49,15 +50,31 @@ static const gchar *local_db_name = "local";
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_warn_fuzzy_update(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
-               session->name, session->uid, \
+        session->name, session->uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_info_fuzzy_update(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
-               session->name, session->uid, \
+        session->name, session->uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_debug_fuzzy_update(...)  rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
-               session->name, session->uid, \
+        session->name, session->uid, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_err_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+        "fuzzy_collection", session->uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* CRITICAL-level log tagged "fuzzy_collection"; expands session->uid, so a `session` must be in scope */
+#define msg_warn_fuzzy_collection(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+       "fuzzy_collection", session->uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* WARNING-level variant; same `session` requirement */
+#define msg_info_fuzzy_collection(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+       "fuzzy_collection", session->uid, \
+        G_STRFUNC, \
+        __VA_ARGS__) /* INFO-level variant; same `session` requirement */
+#define msg_debug_fuzzy_collection(...)  rspamd_default_log_function (G_LOG_LEVEL_DEBUG, /* DEBUG-level variant */ \
+        "fuzzy_collection", session->uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 
@@ -132,6 +149,9 @@ struct rspamd_fuzzy_storage_ctx {
        struct fuzzy_key *default_key;
        GHashTable *keys;
        gboolean encrypted_only;
+       gboolean collection_mode;
+       struct rspamd_cryptobox_keypair *collection_keypair;
+       struct rspamd_cryptobox_pubkey *collection_sign_key;
        struct rspamd_keypair_cache *keypair_cache;
        rspamd_lru_hash_t *errors_ips;
        struct rspamd_fuzzy_backend *backend;
@@ -141,6 +161,8 @@ struct rspamd_fuzzy_storage_ctx {
        struct rspamd_dns_resolver *resolver;
        struct rspamd_config *cfg;
        struct rspamd_worker *worker;
+       struct rspamd_http_connection_router *collection_rt;
+       guchar cookie[COOKIE_SIZE];
 };
 
 enum fuzzy_cmd_type {
@@ -816,16 +838,33 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
        }
 
        result.flag = cmd->flag;
+
        if (cmd->cmd == FUZZY_CHECK) {
-               REF_RETAIN (session);
-               rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
-                               rspamd_fuzzy_check_callback, session);
+               if (G_UNLIKELY (session->ctx->collection_mode)) {
+                       result.prob = 0;
+                       result.value = 500;
+                       result.flag = 0;
+                       rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               }
+               else {
+                       REF_RETAIN (session);
+                       rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+                                       rspamd_fuzzy_check_callback, session);
+               }
        }
        else if (cmd->cmd == FUZZY_STAT) {
-               result.prob = 1.0;
-               result.value = 0;
-               result.flag = session->ctx->stat.fuzzy_hashes;
-               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               if (G_UNLIKELY (session->ctx->collection_mode)) {
+                       result.prob = 0;
+                       result.value = 500;
+                       result.flag = 0;
+                       rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               }
+               else {
+                       result.prob = 1.0;
+                       result.value = 0;
+                       result.flag = session->ctx->stat.fuzzy_hashes;
+                       rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               }
        }
        else {
                if (rspamd_fuzzy_check_client (session)) {
@@ -1347,6 +1386,210 @@ end:
        return 0;
 }
 
+struct rspamd_fuzzy_collection_session { /* State allocated for each connection accepted on the collection socket */
+       struct rspamd_fuzzy_storage_ctx *ctx; /* Storage context, copied from worker->ctx at accept time */
+       struct rspamd_worker *worker; /* Worker whose accept callback created this session */
+       rspamd_inet_addr_t *from_addr; /* Peer address; destroyed by rspamd_fuzzy_collection_finish_handler () */
+       guchar uid[16]; /* Random id, NUL-terminated at accept time; passed to the logger by msg_*_fuzzy_collection */
+};
+/* Error callback registered with the collection HTTP router: logs err->message and does nothing else. */
+static void
+rspamd_fuzzy_collection_error_handler (struct rspamd_http_connection_entry *conn_ent,
+       GError *err)
+{
+       struct rspamd_fuzzy_collection_session *session = conn_ent->ud; /* referenced by the logging macro below */
+
+       msg_err_fuzzy_collection ("http error occurred: %s", err->message);
+}
+/* Finish callback registered with the collection HTTP router: releases the session stored in conn_ent->ud. */
+static void
+rspamd_fuzzy_collection_finish_handler (struct rspamd_http_connection_entry *conn_ent)
+{
+       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+
+       /* The session holds the peer address stored by accept_fuzzy_collection_socket () */
+       rspamd_inet_address_destroy (session->from_addr);
+
+       /* Counterpart of the g_slice_alloc0 () done in accept_fuzzy_collection_socket () */
+       g_slice_free1 (sizeof (struct rspamd_fuzzy_collection_session), session);
+}
+/* Send an HTTP reply with status `code`; the formatted `error_msg` is used both as msg->status and as the text/plain body. */
+void
+rspamd_fuzzy_collection_send_error (struct rspamd_http_connection_entry *entry,
+       gint code, const gchar *error_msg, ...)
+{
+       struct rspamd_http_message *msg;
+       va_list args;
+       rspamd_fstring_t *reply;
+
+       msg = rspamd_http_new_message (HTTP_RESPONSE);
+
+       va_start (args, error_msg);
+       msg->status = rspamd_fstring_new ();
+       rspamd_vprintf_fstring (&msg->status, error_msg, args); /* error_msg is the format string for the variadic arguments */
+       va_end (args);
+
+       msg->date = time (NULL);
+       msg->code = code;
+       reply = rspamd_fstring_sized_new (msg->status->len + 16);
+       rspamd_printf_fstring (&reply, "%V", msg->status); /* body is built from the status text; %V presumably takes an rspamd_fstring_t * - TODO confirm */
+       rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+       rspamd_http_connection_reset (entry->conn);
+       rspamd_http_router_insert_headers (entry->rt, msg);
+       rspamd_http_connection_write_message (entry->conn,
+               msg,
+               NULL,
+               "text/plain",
+               entry,
+               entry->conn->fd,
+               entry->rt->ptv,
+               entry->rt->ev_base);
+       entry->is_reply = TRUE;
+}
+
+/*
+ * Send `fstr` as the application/octet-stream body of a 200 "OK" reply. Note: this function steals fstr.
+ */
+void
+rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry,
+       rspamd_fstring_t *fstr)
+{
+       struct rspamd_http_message *msg;
+
+       msg = rspamd_http_new_message (HTTP_RESPONSE);
+       msg->status = rspamd_fstring_new_init ("OK", 2);
+       msg->date = time (NULL);
+       msg->code = 200;
+       rspamd_http_message_set_body_from_fstring_steal (msg, fstr); /* fstr is handed over to msg here */
+       rspamd_http_connection_reset (entry->conn);
+       rspamd_http_router_insert_headers (entry->rt, msg);
+       rspamd_http_connection_write_message (entry->conn,
+               msg,
+               NULL,
+               "application/octet-stream",
+               entry,
+               entry->conn->fd,
+               entry->rt->ptv,
+               entry->rt->ev_base);
+       entry->is_reply = TRUE;
+}
+/* Handler registered for the "/cookie" path: replies with a copy of the context's current cookie bytes. */
+static int
+rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent,
+       struct rspamd_http_message *msg) /* msg (the request) is not inspected */
+{
+       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+       rspamd_fstring_t *cookie;
+
+       cookie = rspamd_fstring_new_init (session->ctx->cookie,
+                       sizeof (session->ctx->cookie)); /* copies all COOKIE_SIZE bytes */
+       rspamd_fuzzy_collection_send_fstring (conn_ent, cookie); /* the send helper steals `cookie` */
+
+       return 0;
+}
+/* Handler registered for the "/data" path. NOTE(review): body duplicates the cookie handler - looks like a placeholder, confirm. */
+static int
+rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent,
+       struct rspamd_http_message *msg) /* msg (the request) is not inspected */
+{
+       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+       rspamd_fstring_t *cookie;
+
+       cookie = rspamd_fstring_new_init (session->ctx->cookie,
+                       sizeof (session->ctx->cookie)); /* copies all COOKIE_SIZE bytes */
+       rspamd_fuzzy_collection_send_fstring (conn_ent, cookie); /* the send helper steals `cookie` */
+
+       return 0;
+}
+
+/* Event callback for the collection listen socket `fd`; `arg` is the struct rspamd_worker the event was set up with. */
+static void
+accept_fuzzy_collection_socket (gint fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       rspamd_inet_addr_t *addr;
+       gint nfd;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct rspamd_fuzzy_collection_session *session;
+
+       if ((nfd =
+                       rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+               msg_warn ("accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               return;
+       }
+
+       ctx = worker->ctx;
+       /* Connections are refused outright when no collection keypair is configured */
+       if (!ctx->collection_keypair) {
+               msg_err ("deny request from %s, as no local keypair is specified",
+                               rspamd_inet_address_to_string (addr));
+               rspamd_inet_address_destroy (addr);
+               close (nfd);
+
+               return;
+       }
+       /* Session is passed to the router as user data; presumably it reaches the handlers as conn_ent->ud - TODO confirm */
+       session = g_slice_alloc0 (sizeof (*session));
+       session->ctx = ctx;
+       session->worker = worker;
+       rspamd_random_hex (session->uid, sizeof (session->uid) - 1); /* fills all but the last byte */
+       session->uid[sizeof (session->uid) - 1] = '\0';
+       session->from_addr = addr; /* the session now holds addr; the finish handler destroys it */
+       rspamd_http_router_handle_socket (ctx->collection_rt, nfd, session);
+       msg_info_fuzzy_collection ("accepted connection from %s port %d, session ptr: %p",
+                       rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr),
+                       session);
+}
+/* Timer callback used in collection mode (`ud` is the storage ctx): counts a miss and drops queued updates after too many. */
+static void
+rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+       GList *cur;
+       struct fuzzy_peer_cmd *io_cmd;
+
+       if (++ctx->updates_failed > ctx->updates_maxfail) { /* every invocation counts as one missed collection */
+               msg_err ("cannot store more data in workqueue, discard "
+                               "%ud updates after %d missed collection points",
+                               g_queue_get_length (ctx->updates_pending),
+                               ctx->updates_maxfail);
+               ctx->updates_failed = 0;
+               cur = ctx->updates_pending->head;
+
+               while (cur) {
+                       io_cmd = cur->data;
+                       g_slice_free1 (sizeof (*io_cmd), io_cmd); /* frees the payload only; list nodes go in g_queue_clear () below */
+                       cur = g_list_next (cur);
+               }
+
+               g_queue_clear (ctx->updates_pending);
+               /* Regenerate cookie */
+               ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
+       }
+       else {
+               msg_err ("fuzzy data has not been collected in time, "
+                               "%ud updates are still pending, %d updates left",
+                               g_queue_get_length (ctx->updates_pending),
+                               ctx->updates_maxfail - ctx->updates_failed);
+       }
+       /* When worker->wanna_die is set, ask the event loop to exit with a zero timeout */
+       if (ctx->worker->wanna_die) {
+               /* Plan exit */
+               struct timeval tv;
+
+               tv.tv_sec = 0;
+               tv.tv_usec = 0;
+
+               event_base_loopexit (ctx->ev_base, &tv);
+       }
+}
+
+
 static void
 accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
 {
@@ -2205,6 +2448,30 @@ init_fuzzy (struct rspamd_config *cfg)
                        G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail),
                        RSPAMD_CL_FLAG_UINT,
                        "Maximum number of updates to be failed before discarding");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "collection_only",
+                       rspamd_rcl_parse_struct_boolean,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_mode),
+                       0,
+                       "Start fuzzy in collection only mode");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "collection_signkey",
+                       rspamd_rcl_parse_struct_pubkey,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_sign_key),
+                       RSPAMD_CL_FLAG_SIGNKEY,
+                       "Accept only signed requests with the specified key");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "collection_keypair",
+                       rspamd_rcl_parse_struct_keypair,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair),
+                       0,
+                       "Use the specified keypair to encrypt collection protocol");
 
        return ctx;
 }
@@ -2272,8 +2539,15 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
                        else if (worker->index == 0) {
                                /* We allow TCP listeners only for a update worker */
                                accept_events = g_slice_alloc0 (sizeof (struct event) * 2);
-                               event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
-                                               accept_fuzzy_mirror_socket, worker);
+
+                               if (ctx->collection_mode) {
+                                       event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
+                                                       accept_fuzzy_collection_socket, worker);
+                               }
+                               else {
+                                       event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
+                                                       accept_fuzzy_mirror_socket, worker);
+                               }
                                event_base_set (ctx->ev_base, &accept_events[0]);
                                event_add (&accept_events[0], NULL);
                                worker->accept_events = g_list_prepend (worker->accept_events,
@@ -2318,34 +2592,80 @@ start_fuzzy (struct rspamd_worker *worker)
                        worker->srv->cfg);
        rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
                        ctx->ev_base, ctx->resolver->r);
-
-       /*
-        * Open DB and perform VACUUM
-        */
-       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
-                       worker->cf->options, cfg, &err)) == NULL) {
-               msg_err ("cannot open backend: %e", err);
-               g_error_free (err);
-               exit (EXIT_SUCCESS);
-       }
-
-       rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
-
        if (ctx->keypair_cache_size > 0) {
                /* Create keypairs cache */
                ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size);
        }
 
-       if (worker->index == 0) {
-               ctx->updates_pending = g_queue_new ();
-               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
-                               rspamd_fuzzy_storage_periodic_callback, ctx);
+       if (!ctx->collection_mode) {
+               /*
+                * Open DB and perform VACUUM
+                */
+               if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+                               worker->cf->options, cfg, &err)) == NULL) {
+                       msg_err ("cannot open backend: %e", err);
+                       g_error_free (err);
+                       exit (EXIT_SUCCESS);
+               }
+
+               rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
+
+
+               if (worker->index == 0) {
+                       ctx->updates_pending = g_queue_new ();
+                       rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                                       rspamd_fuzzy_storage_periodic_callback, ctx);
+               }
+
+               double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
+               event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx);
+               event_base_set (ctx->ev_base, &ctx->stat_ev);
+               event_add (&ctx->stat_ev, &ctx->stat_tv);
+
+               /* Register custom reload and stat commands for the control socket */
+               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
+                               rspamd_fuzzy_storage_reload, ctx);
+               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
+                               rspamd_fuzzy_storage_stat, ctx);
+               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
+                               rspamd_fuzzy_storage_sync, ctx);
        }
+       else {
+               /*
+                * In collection mode we do a different thing:
+                * we collect fuzzy hashes in the updates queue and ignore all read commands
+                */
+               if (worker->index == 0) {
+                       ctx->updates_pending = g_queue_new ();
+                       double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
+                       event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST,
+                                       rspamd_fuzzy_collection_periodic, ctx);
+                       event_base_set (ctx->ev_base, &ctx->stat_ev);
+                       event_add (&ctx->stat_ev, &ctx->stat_tv);
+
+                       ctx->collection_rt = rspamd_http_router_new (
+                                       rspamd_fuzzy_collection_error_handler,
+                                       rspamd_fuzzy_collection_finish_handler,
+                                       &ctx->stat_tv,
+                                       ctx->ev_base,
+                                       NULL, ctx->keypair_cache);
+
+                       if (ctx->collection_keypair) {
+                               rspamd_http_router_set_key (ctx->collection_rt,
+                                               ctx->collection_keypair);
+                       }
 
-       double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
-       event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx);
-       event_base_set (ctx->ev_base, &ctx->stat_ev);
-       event_add (&ctx->stat_ev, &ctx->stat_tv);
+                       /* Generate new cookie */
+                       ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
+                       /* Register paths */
+                       rspamd_http_router_add_path (ctx->collection_rt,
+                                       "/cookie",
+                                       rspamd_fuzzy_collection_cookie_handler);
+                       rspamd_http_router_add_path (ctx->collection_rt,
+                                       "/data",
+                                       rspamd_fuzzy_collection_data_handler);
+               }
+       }
 
        if (ctx->mirrors && ctx->mirrors->len != 0) {
                if (ctx->sync_keypair == NULL) {
@@ -2361,14 +2681,6 @@ start_fuzzy (struct rspamd_worker *worker)
                }
        }
 
-       /* Register custom reload and stat commands for the control socket */
-       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
-                       rspamd_fuzzy_storage_reload, ctx);
-       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
-                       rspamd_fuzzy_storage_stat, ctx);
-       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
-                               rspamd_fuzzy_storage_sync, ctx);
-
        /* Create radix trees */
        if (ctx->update_map != NULL) {
                rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->update_map,
@@ -2407,7 +2719,13 @@ start_fuzzy (struct rspamd_worker *worker)
                event_base_loop (ctx->ev_base, 0);
        }
 
-       rspamd_fuzzy_backend_close (ctx->backend);
+       if (!ctx->collection_mode) {
+               rspamd_fuzzy_backend_close (ctx->backend);
+       }
+       else if (worker->index == 0) {
+               rspamd_http_router_free (ctx->collection_rt);
+       }
+
        rspamd_log_close (worker->srv->logger);
 
        if (ctx->peer_fd != -1) {