]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Remove threading support at all.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 May 2015 10:37:48 +0000 (11:37 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 May 2015 10:37:48 +0000 (11:37 +0100)
src/controller.c
src/libmime/filter.c
src/libmime/filter.h
src/libserver/task.c
src/libserver/task.h
src/lua/lua_util.c
src/worker.c

index 52a3dd12ac77cf3947e8bc249c48e7cf663bf84d..0d8520058a5904fe75b411d98c6f2c029d945e4b 100644 (file)
@@ -1000,14 +1000,13 @@ rspamd_controller_handle_learn_common (
                        NULL,
                        rspamd_task_free_hard,
                        task);
-       task->s->wanna_die = TRUE;
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref (conn_ent->conn);;
        task->sock = conn_ent->conn->fd;
 
 
        /* XXX: Handle encrypted messages */
-       if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) {
+       if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
                msg_warn ("filters cannot be processed for %s", task->message_id);
                rspamd_controller_send_error (conn_ent, 500, task->last_error);
                destroy_session (task->s);
@@ -1091,13 +1090,12 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
                        NULL,
                        rspamd_task_free_hard,
                        task);
-       task->s->wanna_die = TRUE;
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref (conn_ent->conn);
        task->sock = conn_ent->conn->fd;
 
        /* XXX: handle encrypted messages */
-       if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) {
+       if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
                msg_warn ("filters cannot be processed for %s", task->message_id);
                rspamd_controller_send_error (conn_ent, 500, task->last_error);
                destroy_session (task->s);
index 70e5c3c6b7043d0e8e90086c6b9ae627836e9786..e23bb8f87a8baf96e84556d63759a7e64163d22a 100644 (file)
@@ -746,24 +746,6 @@ rspamd_process_statistics (struct rspamd_task *task)
        rspamd_make_composites (task);
 }
 
-void
-rspamd_process_statistic_threaded (gpointer data, gpointer user_data)
-{
-       struct rspamd_task *task = (struct rspamd_task *)data;
-       struct lua_locked_state *nL = user_data;
-
-       if (RSPAMD_TASK_IS_SKIPPED (task)) {
-               remove_async_thread (task->s);
-               return;
-       }
-
-       /* TODO: handle err here */
-       rspamd_mutex_lock (nL->m);
-       rspamd_stat_classify (task, nL->L, NULL);
-       rspamd_mutex_unlock (nL->m);
-       remove_async_thread (task->s);
-}
-
 static void
 insert_metric_header (gpointer metric_name, gpointer metric_value,
        gpointer data)
index f763c3b8ab685c1a85ccbc9cc291724d427e32e8..67dc600104bad406930fcb35118d297f10dd5f8d 100644 (file)
@@ -108,12 +108,6 @@ gint rspamd_process_filters (struct rspamd_task *task);
  */
 void rspamd_process_statistics (struct rspamd_task *task);
 
-/**
- * Process message with statfiles threaded
- * @param data worker's task that present message from user
- */
-void rspamd_process_statistic_threaded (gpointer data, gpointer user_data);
-
 /**
  * Insert a result to task
  * @param task worker's task that present message from user
index 1874847de7ad8ae98ab099aaea39908aa4ab3395..d4fae99db89772adc0c140d3e82c18ee4ac97a48 100644 (file)
@@ -122,7 +122,6 @@ rspamd_task_fin (void *arg)
 {
        struct rspamd_task *task = (struct rspamd_task *) arg;
        gint r;
-       GError *err = NULL;
 
        /* Task is already finished or skipped */
        if (task->state == WRITE_REPLY) {
@@ -133,14 +132,9 @@ rspamd_task_fin (void *arg)
        /* We processed all filters and want to process statfiles */
        if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
                /* Process all statfiles */
-               if (task->classify_pool == NULL) {
-                       /* Non-threaded version */
-                       rspamd_process_statistics (task);
-               }
-               else {
-                       /* Just process composites */
-                       rspamd_make_composites (task);
-               }
+               /* Non-threaded version */
+               rspamd_process_statistics (task);
+
                if (task->cfg->post_filters) {
                        /* More to process */
                        /* Special state */
@@ -174,16 +168,7 @@ rspamd_task_fin (void *arg)
                                rspamd_task_reply (task);
                                return TRUE;
                        }
-                       /* Add task to classify to classify pool */
-                       if (!RSPAMD_TASK_IS_SKIPPED (task) && task->classify_pool) {
-                               register_async_thread (task->s);
-                               g_thread_pool_push (task->classify_pool, task, &err);
-                               if (err != NULL) {
-                                       msg_err ("cannot pull task to the pool: %s", err->message);
-                                       remove_async_thread (task->s);
-                                       g_error_free (err);
-                               }
-                       }
+
                        if (RSPAMD_TASK_IS_SKIPPED (task)) {
                                rspamd_task_reply (task);
                        }
@@ -209,7 +194,6 @@ rspamd_task_restore (void *arg)
                        !(task->flags & RSPAMD_TASK_FLAG_SKIP_EXTRA)) {
                rspamd_lua_call_post_filters (task);
        }
-       task->s->wanna_die = TRUE;
 }
 
 /*
@@ -294,22 +278,17 @@ rspamd_task_free_soft (gpointer ud)
 gboolean
 rspamd_task_process (struct rspamd_task *task,
        struct rspamd_http_message *msg, const gchar *start, gsize len,
-       GThreadPool *classify_pool,
        gboolean process_extra_filters)
 {
        gint r;
        guint control_len;
        struct ucl_parser *parser;
        ucl_object_t *control_obj;
-       GError *err = NULL;
 
        task->msg.start = start;
        task->msg.len = len;
        debug_task ("got string of length %z", task->msg.len);
 
-       /* We got body, set wanna_die flag */
-       task->s->wanna_die = TRUE;
-
        if (msg) {
                rspamd_protocol_handle_headers (task, msg);
        }
@@ -359,35 +338,23 @@ rspamd_task_process (struct rspamd_task *task,
        }
        if (!process_extra_filters || task->cfg->pre_filters == NULL) {
                r = rspamd_process_filters (task);
+
                if (r == -1) {
                        task->last_error = "filter processing error";
                        task->error_code = RSPAMD_FILTER_ERROR;
                        task->state = WRITE_REPLY;
                        return FALSE;
                }
-               /* Add task to classify to classify pool */
-               if (!RSPAMD_TASK_IS_SKIPPED (task) && classify_pool) {
-                       register_async_thread (task->s);
-                       g_thread_pool_push (classify_pool, task, &err);
-                       if (err != NULL) {
-                               msg_err ("cannot pull task to the pool: %s", err->message);
-                               remove_async_thread (task->s);
-                               g_error_free (err);
-                       }
-                       else {
-                               task->classify_pool = classify_pool;
-                       }
-               }
+
                if (RSPAMD_TASK_IS_SKIPPED (task)) {
                        /* Call write_socket to write reply and exit */
                        task->state = WRITE_REPLY;
                }
-               task->s->wanna_die = TRUE;
+
        }
        else {
                rspamd_lua_call_pre_filters (task);
                /* We want fin_task after pre filters are processed */
-               task->s->wanna_die = TRUE;
                task->state = WAIT_PRE_FILTER;
        }
 
index 606e4dcea7e5aae3c4c7ba84a2716bd2c63d6385..05a50ecdc8dccb896c70d6cfef16ec1507d81050 100644 (file)
@@ -151,7 +151,6 @@ struct rspamd_task {
        struct rspamd_dns_resolver *resolver;                       /**< DNS resolver                                                                   */
        struct event_base *ev_base;                                 /**< Event base                                                                             */
 
-       GThreadPool *classify_pool;                                 /**< A pool of classify threads                     */
        gpointer classify_data;                                                                         /**< Opaque classifiers data                                            */
 
        struct {
@@ -188,13 +187,11 @@ gboolean rspamd_task_fin (void *arg);
  * Process task from http message and write reply or call task->fin_handler
  * @param task task to process
  * @param msg incoming http message
- * @param classify_pool classify pool (or NULL)
  * @param process_extra_filters whether to check pre and post filters
  * @return task has been successfully parsed and processed
  */
 gboolean rspamd_task_process (struct rspamd_task *task,
        struct rspamd_http_message *msg, const gchar *start, gsize len,
-       GThreadPool *classify_pool,
        gboolean process_extra_filters);
 
 /**
index e84b3748bafb87e4b585973a0b2ab2daaddc4c85..4442a31612689e2de52d8784cc1a512600e4452a 100644 (file)
@@ -193,7 +193,7 @@ lua_util_process_message (lua_State *L)
                task->s = new_async_session (task->task_pool, rspamd_task_fin,
                                        rspamd_task_restore, rspamd_task_free_hard, task);
 
-               if (rspamd_task_process (task, NULL, message, mlen, NULL, TRUE)) {
+               if (rspamd_task_process (task, NULL, message, mlen, TRUE)) {
                        event_base_loop (base, 0);
 
                        if (res != NULL) {
index 7c72b05159533c53cdab3cd85a6c58a2fd66c04c..dec3d679fa8108061cd3ce4f06fe5f64dd0b492f 100644 (file)
 
 #include "lua/lua_common.h"
 
-#ifdef WITH_GPERF_TOOLS
-#   include <glib/gprintf.h>
-#endif
-
 /* 60 seconds for worker's IO */
 #define DEFAULT_WORKER_IO_TIMEOUT 60000
 
@@ -81,10 +77,6 @@ struct rspamd_worker_ctx {
        guint32 tasks;
        /* Limit of tasks */
        guint32 max_tasks;
-       /* Classify threads */
-       guint32 classify_threads;
-       /* Classify threads */
-       GThreadPool *classify_pool;
        /* Events base */
        struct event_base *ev_base;
        /* Encryption key */
@@ -133,7 +125,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
        }
 
 
-       if (!rspamd_task_process (task, msg, chunk, len, ctx->classify_pool, TRUE)) {
+       if (!rspamd_task_process (task, msg, chunk, len, TRUE)) {
                task->state = WRITE_REPLY;
        }
 
@@ -180,7 +172,6 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
                 * If all filters have finished their tasks, this function will trigger
                 * writing a reply.
                 */
-               task->s->wanna_die = TRUE;
                check_session_pending (task->s);
        }
 
@@ -254,8 +245,6 @@ accept_socket (gint fd, short what, void *arg)
        new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
                        rspamd_task_restore, rspamd_task_free_hard, new_task);
 
-       new_task->classify_pool = ctx->classify_pool;
-
        if (ctx->key) {
                rspamd_http_connection_set_key (new_task->http_conn, ctx->key);
        }
@@ -279,7 +268,6 @@ init_worker (struct rspamd_config *cfg)
 
        ctx->is_mime = TRUE;
        ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
-       ctx->classify_threads = 1;
 
        rspamd_rcl_register_worker_option (cfg, type, "mime",
                rspamd_rcl_parse_struct_boolean, ctx,
@@ -307,12 +295,6 @@ init_worker (struct rspamd_config *cfg)
                G_STRUCT_OFFSET (struct rspamd_worker_ctx,
                max_tasks), RSPAMD_CL_FLAG_INT_32);
 
-       rspamd_rcl_register_worker_option (cfg, type, "classify_threads",
-               rspamd_rcl_parse_struct_integer, ctx,
-               G_STRUCT_OFFSET (struct rspamd_worker_ctx,
-               classify_threads), RSPAMD_CL_FLAG_INT_32);
-
-
        rspamd_rcl_register_worker_option (cfg, type, "keypair",
                rspamd_rcl_parse_struct_keypair, ctx,
                G_STRUCT_OFFSET (struct rspamd_worker_ctx,
@@ -328,8 +310,6 @@ void
 start_worker (struct rspamd_worker *worker)
 {
        struct rspamd_worker_ctx *ctx = worker->ctx;
-       GError *err = NULL;
-       struct lua_locked_state *nL;
 
        ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
        msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -344,22 +324,6 @@ start_worker (struct rspamd_worker *worker)
        rspamd_upstreams_library_init (ctx->resolver->r, ctx->ev_base);
        rspamd_upstreams_library_config (worker->srv->cfg);
 
-       /* Create classify pool */
-       ctx->classify_pool = NULL;
-       if (ctx->classify_threads > 1) {
-               nL = rspamd_init_lua_locked (worker->srv->cfg);
-               ctx->classify_pool = g_thread_pool_new (rspamd_process_statistic_threaded,
-                               nL,
-                               ctx->classify_threads,
-                               TRUE,
-                               &err);
-               if (err != NULL) {
-                       msg_err ("pool create failed: %e", err);
-                       g_error_free (err);
-                       ctx->classify_pool = NULL;
-               }
-       }
-
        /* XXX: stupid default */
        ctx->keys_cache = rspamd_keypair_cache_new (256);