]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add support for separate read and write servers in fuzzy check
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 1 May 2025 08:17:00 +0000 (09:17 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 1 May 2025 08:22:04 +0000 (09:22 +0100)
src/plugins/fuzzy_check.c

index 4ed94a141d363d3396cf6fc390a816504ea8b111..35d7adf422d8ecdc0e4648d671d2f1b0f23a3303 100644 (file)
@@ -78,7 +78,9 @@ enum fuzzy_rule_mode {
 };
 
 struct fuzzy_rule {
-       struct upstream_list *servers;
+       struct upstream_list *servers;      /* For backward compatibility */
+       struct upstream_list *read_servers;  /* Servers for read operations */
+       struct upstream_list *write_servers; /* Servers for write operations */
        const char *symbol;
        const char *algorithm_str;
        const char *name;
@@ -559,6 +561,67 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
                        return -1;
                }
        }
+       else {
+               /* Check for read_servers and write_servers */
+               gboolean has_read = FALSE, has_write = FALSE;
+
+               if ((value = ucl_object_lookup(obj, "read_servers")) != NULL) {
+                       rule->read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+                       rspamd_upstreams_set_limits(rule->read_servers,
+                                                                               (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN,
+                                                                               (unsigned int) fuzzy_module_ctx->max_errors, 0);
+
+                       rspamd_mempool_add_destructor(cfg->cfg_pool,
+                                                                               (rspamd_mempool_destruct_t) rspamd_upstreams_destroy,
+                                                                               rule->read_servers);
+                       if (!rspamd_upstreams_from_ucl(rule->read_servers, value, DEFAULT_PORT, NULL)) {
+                               msg_err_config("cannot read read_servers definition");
+                               return -1;
+                       }
+                       has_read = TRUE;
+               }
+
+               if ((value = ucl_object_lookup(obj, "write_servers")) != NULL) {
+                       rule->write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+                       rspamd_upstreams_set_limits(rule->write_servers,
+                                                                               (double) fuzzy_module_ctx->revive_time, NAN, NAN, NAN,
+                                                                               (unsigned int) fuzzy_module_ctx->max_errors, 0);
+
+                       rspamd_mempool_add_destructor(cfg->cfg_pool,
+                                                                               (rspamd_mempool_destruct_t) rspamd_upstreams_destroy,
+                                                                               rule->write_servers);
+                       if (!rspamd_upstreams_from_ucl(rule->write_servers, value, DEFAULT_PORT, NULL)) {
+                               msg_err_config("cannot read write_servers definition");
+                               return -1;
+                       }
+                       has_write = TRUE;
+               }
+
+               /* If we have both read and write servers, we don't need the common servers list */
+               if (has_read && has_write) {
+                       rule->servers = NULL;
+               }
+               else if (has_read) {
+                       /* Use read_servers for all operations */
+                       rule->servers = rule->read_servers;
+                       rule->write_servers = rule->read_servers;
+               }
+               else if (has_write) {
+                       /* Use write_servers for all operations */
+                       rule->servers = rule->write_servers;
+                       rule->read_servers = rule->write_servers;
+               }
+       }
+
+       /* Ensure all server lists are properly set */
+       if (rule->servers != NULL) {
+               if (rule->read_servers == NULL) {
+                       rule->read_servers = rule->servers;
+               }
+               if (rule->write_servers == NULL) {
+                       rule->write_servers = rule->servers;
+               }
+       }
        if ((value = ucl_object_lookup(obj, "fuzzy_map")) != NULL) {
                it = NULL;
                while ((cur = ucl_object_iterate(value, &it, true)) != NULL) {
@@ -896,6 +959,24 @@ int fuzzy_check_module_init(struct rspamd_config *cfg, struct module_ctx **ctx)
                                                           0,
                                                           NULL,
                                                           0);
+       rspamd_rcl_add_doc_by_path(cfg,
+                                                          "fuzzy_check.rule",
+                                                          "List of servers to check (read-only operations)",
+                                                          "read_servers",
+                                                          UCL_STRING,
+                                                          NULL,
+                                                          0,
+                                                          NULL,
+                                                          0);
+       rspamd_rcl_add_doc_by_path(cfg,
+                                                          "fuzzy_check.rule",
+                                                          "List of servers to learn (write operations)",
+                                                          "write_servers",
+                                                          UCL_STRING,
+                                                          NULL,
+                                                          0,
+                                                          NULL,
+                                                          0);
        rspamd_rcl_add_doc_by_path(cfg,
                                                           "fuzzy_check.rule",
                                                           "If true then never try to learn this fuzzy storage",
@@ -3398,8 +3479,8 @@ register_fuzzy_client_call(struct rspamd_task *task,
        int sock;
 
        if (!rspamd_session_blocked(task->s)) {
-               /* Get upstream */
-               selected = rspamd_upstream_get(rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+               /* Get upstream - use read_servers for check operations */
+               selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
                                                                           NULL, 0);
                if (selected) {
                        addr = rspamd_upstream_addr_next(selected);
@@ -3526,9 +3607,8 @@ register_fuzzy_controller_call(struct rspamd_http_connection_entry *entry,
        int sock;
        int ret = -1;
 
-       /* Get upstream */
-
-       while ((selected = rspamd_upstream_get_forced(rule->servers,
+       /* Get upstream - use write_servers for learn/unlearn operations */
+       while ((selected = rspamd_upstream_get_forced(rule->write_servers,
                                                                                                  RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
                /* Create UDP socket */
                addr = rspamd_upstream_addr_next(selected);
@@ -3898,7 +3978,7 @@ fuzzy_check_send_lua_learn(struct fuzzy_rule *rule,
 
        /* Get upstream */
        if (!rspamd_session_blocked(task->s)) {
-               while ((selected = rspamd_upstream_get(rule->servers,
+               while ((selected = rspamd_upstream_get(rule->write_servers,
                                                                                           RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
                        /* Create UDP socket */
                        addr = rspamd_upstream_addr_next(selected);
@@ -4495,9 +4575,22 @@ fuzzy_lua_list_storages(lua_State *L)
                lua_setfield(L, -2, "read_only");
 
                /* Push servers */
-               lua_createtable(L, rspamd_upstreams_count(rule->servers), 0);
-               rspamd_upstreams_foreach(rule->servers, lua_upstream_str_inserter, L);
-               lua_setfield(L, -2, "servers");
+               if (rule->read_servers == rule->write_servers) {
+                       /* Same servers for both operations */
+                       lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0);
+                       rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L);
+                       lua_setfield(L, -2, "servers");
+               }
+               else {
+                       /* Different servers for read and write */
+                       lua_createtable(L, rspamd_upstreams_count(rule->read_servers), 0);
+                       rspamd_upstreams_foreach(rule->read_servers, lua_upstream_str_inserter, L);
+                       lua_setfield(L, -2, "read_servers");
+                       
+                       lua_createtable(L, rspamd_upstreams_count(rule->write_servers), 0);
+                       rspamd_upstreams_foreach(rule->write_servers, lua_upstream_str_inserter, L);
+                       lua_setfield(L, -2, "write_servers");
+               }
 
                /* Push flags */
                GHashTableIter it;
@@ -4828,4 +4921,4 @@ fuzzy_lua_ping_storage(lua_State *L)
 
        lua_pushboolean(L, TRUE);
        return 1;
-}
\ No newline at end of file
+}