]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Unify task scan functions.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 22 Apr 2014 13:43:00 +0000 (14:43 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 22 Apr 2014 13:43:00 +0000 (14:43 +0100)
src/libmime/worker_util.c
src/libserver/task.c
src/libserver/task.h
src/worker.c

index d029f5dc4159924a7cd1fefd41b86892d46cc254..aa5719f2cdef93084eb5f92a7265d8f69c2472dd 100644 (file)
@@ -140,116 +140,3 @@ worker_stop_accept (struct rspamd_worker *worker)
                g_list_free (worker->accept_events);
        }
 }
-
-/*
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-gboolean
-rspamd_task_fin (void *arg)
-{
-       struct rspamd_task              *task = (struct rspamd_task *) arg;
-       gint r;
-       GError *err = NULL;
-
-       /* Task is already finished or skipped */
-       if (task->state == WRITE_REPLY) {
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_protocol_write_reply (task);
-               }
-               return TRUE;
-       }
-
-       /* We processed all filters and want to process statfiles */
-       if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
-               /* Process all statfiles */
-               if (task->classify_pool == NULL) {
-                       /* Non-threaded version */
-                       process_statfiles (task);
-               }
-               else {
-                       /* Just process composites */
-                       make_composites (task);
-               }
-               if (task->cfg->post_filters) {
-                       /* More to process */
-                       /* Special state */
-                       task->state = WAIT_POST_FILTER;
-                       return FALSE;
-               }
-
-       }
-
-       /* We are on post-filter waiting state */
-       if (task->state != WAIT_PRE_FILTER) {
-               /* Check if we have all events finished */
-               task->state = WRITE_REPLY;
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_protocol_write_reply (task);
-               }
-       }
-       else {
-               /* We were waiting for pre-filter */
-               if (task->pre_result.action != METRIC_ACTION_NOACTION) {
-                       /* Write result based on pre filters */
-                       task->state = WRITE_REPLY;
-                       if (task->fin_callback) {
-                               task->fin_callback (task->fin_arg);
-                       }
-                       else {
-                               rspamd_protocol_write_reply (task);
-                       }
-                       return TRUE;
-               }
-               else {
-                       task->state = WAIT_FILTER;
-                       r = process_filters (task);
-                       if (r == -1) {
-                               task->last_error = "Filter processing error";
-                               task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_REPLY;
-                               rspamd_protocol_write_reply (task);
-                               return TRUE;
-                       }
-                       /* Add task to classify to classify pool */
-                       if (!task->is_skipped && task->classify_pool) {
-                               register_async_thread (task->s);
-                               g_thread_pool_push (task->classify_pool, task, &err);
-                               if (err != NULL) {
-                                       msg_err ("cannot pull task to the pool: %s", err->message);
-                                       remove_async_thread (task->s);
-                                       g_error_free (err);
-                               }
-                       }
-                       if (task->is_skipped) {
-                               rspamd_protocol_write_reply (task);
-                       }
-                       else {
-                               return FALSE;
-                       }
-               }
-       }
-
-       return TRUE;
-}
-
-/*
- * Called if session was restored inside fin callback
- */
-void
-rspamd_task_restore (void *arg)
-{
-       struct rspamd_task             *task = (struct rspamd_task *) arg;
-
-       /* Call post filters */
-       if (task->state == WAIT_POST_FILTER) {
-               lua_call_post_filters (task);
-       }
-       task->s->wanna_die = TRUE;
-}
index f389793dd051f212f560aadd9040f9dc090264c0..04e385479a911bf2cd3644ab153b15ab9d88cfbc 100644 (file)
@@ -24,7 +24,9 @@
 #include "task.h"
 #include "main.h"
 #include "filter.h"
+#include "protocol.h"
 #include "message.h"
+#include "lua/lua_common.h"
 
 /*
  * Destructor for recipients list in a task
@@ -102,6 +104,115 @@ rspamd_task_new (struct rspamd_worker *worker)
 }
 
 
+static void
+rspamd_task_reply (struct rspamd_task *task)
+{
+       if (task->fin_callback) {
+               task->fin_callback (task->fin_arg);
+       }
+       else {
+               rspamd_protocol_write_reply (task);
+       }
+}
+
+/*
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean
+rspamd_task_fin (void *arg)
+{
+       struct rspamd_task              *task = (struct rspamd_task *) arg;
+       gint r;
+       GError *err = NULL;
+
+       /* Task is already finished or skipped */
+       if (task->state == WRITE_REPLY) {
+               rspamd_task_reply (task);
+               return TRUE;
+       }
+
+       /* We processed all filters and want to process statfiles */
+       if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
+               /* Process all statfiles */
+               if (task->classify_pool == NULL) {
+                       /* Non-threaded version */
+                       process_statfiles (task);
+               }
+               else {
+                       /* Just process composites */
+                       make_composites (task);
+               }
+               if (task->cfg->post_filters) {
+                       /* More to process */
+                       /* Special state */
+                       task->state = WAIT_POST_FILTER;
+                       return FALSE;
+               }
+
+       }
+
+       /* We are on post-filter waiting state */
+       if (task->state != WAIT_PRE_FILTER) {
+               /* Check if we have all events finished */
+               task->state = WRITE_REPLY;
+               rspamd_task_reply (task);
+       }
+       else {
+               /* We were waiting for pre-filter */
+               if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+                       /* Write result based on pre filters */
+                       task->state = WRITE_REPLY;
+                       rspamd_task_reply (task);
+                       return TRUE;
+               }
+               else {
+                       task->state = WAIT_FILTER;
+                       r = process_filters (task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = RSPAMD_FILTER_ERROR;
+                               task->state = WRITE_REPLY;
+                               rspamd_task_reply (task);
+                               return TRUE;
+                       }
+                       /* Add task to classify to classify pool */
+                       if (!task->is_skipped && task->classify_pool) {
+                               register_async_thread (task->s);
+                               g_thread_pool_push (task->classify_pool, task, &err);
+                               if (err != NULL) {
+                                       msg_err ("cannot pull task to the pool: %s", err->message);
+                                       remove_async_thread (task->s);
+                                       g_error_free (err);
+                               }
+                       }
+                       if (task->is_skipped) {
+                               rspamd_task_reply (task);
+                       }
+                       else {
+                               return FALSE;
+                       }
+               }
+       }
+
+       return TRUE;
+}
+
+/*
+ * Called if session was restored inside fin callback
+ */
+void
+rspamd_task_restore (void *arg)
+{
+       struct rspamd_task             *task = (struct rspamd_task *) arg;
+
+       /* Call post filters */
+       if (task->state == WAIT_POST_FILTER && !task->skip_extra_filters) {
+               lua_call_post_filters (task);
+       }
+       task->s->wanna_die = TRUE;
+}
+
 /*
  * Free all structures of worker_task
  */
@@ -142,18 +253,85 @@ rspamd_task_free (struct rspamd_task *task, gboolean is_soft)
        }
 }
 
-void
-rspamd_task_free_hard (gpointer ud)
+void rspamd_task_free_hard (gpointer ud)
 {
-  struct rspamd_task             *task = ud;
+       struct rspamd_task *task = ud;
 
-  rspamd_task_free (task, FALSE);
+       rspamd_task_free (task, FALSE);
 }
 
-void
-rspamd_task_free_soft (gpointer ud)
+void rspamd_task_free_soft (gpointer ud)
+{
+       struct rspamd_task *task = ud;
+
+       rspamd_task_free (task, FALSE);
+}
+
+
+gboolean
+rspamd_task_process (struct rspamd_task *task,
+               struct rspamd_http_message *msg, GThreadPool *classify_pool,
+               gboolean process_extra_filters)
 {
-  struct rspamd_task             *task = ud;
+       gint r;
+       GError *err = NULL;
+
+       if (msg->body->len == 0) {
+               msg_err ("got zero length body");
+               task->last_error = "message's body is empty";
+               return FALSE;
+       }
+
+       task->msg = msg->body;
+
+       debug_task ("got string of length %z", task->msg->len);
+
+       /* We got body, set wanna_die flag */
+       task->s->wanna_die = TRUE;
+
+       r = process_message (task);
+       if (r == -1) {
+               msg_warn ("processing of message failed");
+               task->last_error = "MIME processing error";
+               task->error_code = RSPAMD_FILTER_ERROR;
+               task->state = WRITE_REPLY;
+               return FALSE;
+       }
+       task->skip_extra_filters = !process_extra_filters;
+       if (!process_extra_filters || task->cfg->pre_filters == NULL) {
+               r = process_filters (task);
+               if (r == -1) {
+                       task->last_error = "filter processing error";
+                       task->error_code = RSPAMD_FILTER_ERROR;
+                       task->state = WRITE_REPLY;
+                       return FALSE;
+               }
+               /* Add task to classify to classify pool */
+               if (!task->is_skipped && classify_pool) {
+                       register_async_thread (task->s);
+                       g_thread_pool_push (classify_pool, task, &err);
+                       if (err != NULL) {
+                               msg_err ("cannot pull task to the pool: %s", err->message);
+                               remove_async_thread (task->s);
+                               g_error_free (err);
+                       }
+                       else {
+                               task->classify_pool = classify_pool;
+                       }
+               }
+               if (task->is_skipped) {
+                       /* Call write_socket to write reply and exit */
+                       task->state = WRITE_REPLY;
+                       return TRUE;
+               }
+       }
+       else {
+               lua_call_pre_filters (task);
+               /* We want fin_task after pre filters are processed */
+               task->s->wanna_die = TRUE;
+               task->state = WAIT_PRE_FILTER;
+               check_session_pending (task->s);
+       }
 
-  rspamd_task_free (task, FALSE);
+       return TRUE;
 }
index f8f7c89e386e34900bce892a1073712a2902f017..0891dc6e236b360041aa2c4c6c56dcecf517c107 100644 (file)
@@ -76,7 +76,7 @@ struct rspamd_task {
        gint sock;                                                                                                      /**< socket descriptor                                                          */
        gboolean is_mime;                                           /**< if this task is mime task                      */
        gboolean is_json;                                                                                       /**< output is JSON                                                                     */
-       gboolean allow_learn;                                                                           /**< allow learning                                                                     */
+       gboolean skip_extra_filters;                                                            /**< skip pre and post filters                                          */
        gboolean is_skipped;                                        /**< whether message was skipped by configuration   */
 
        gchar *helo;                                                                                                    /**< helo header value                                                          */
@@ -162,4 +162,16 @@ void rspamd_task_restore (void *arg);
  */
 gboolean rspamd_task_fin (void *arg);
 
+/**
+ * Process task from http message and write reply or call task->fin_handler
+ * @param task task to process
+ * @param msg incoming http message
+ * @param classify_pool classify pool (or NULL)
+ * @param process_extra_filters whether to check pre and post filters
+ * @return task has been successfully parsed and processed
+ */
+gboolean rspamd_task_process (struct rspamd_task *task,
+               struct rspamd_http_message *msg, GThreadPool *classify_pool,
+               gboolean process_extra_filters);
+
 #endif /* TASK_H_ */
index 94ec7445108d5e1f8f5b983889ec9a29dd55ada7..9f6e269bb65e40d502a3115c48091da9e90c5fa1 100644 (file)
@@ -170,8 +170,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 {
        struct rspamd_task             *task = (struct rspamd_task *) conn->ud;
        struct rspamd_worker_ctx       *ctx;
-       ssize_t                         r;
-       GError                         *err = NULL;
 
        ctx = task->worker->ctx;
 
@@ -193,58 +191,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
                return 0;
        }
 
-       task->msg = msg->body;
+       rspamd_task_process (task, msg, ctx->classify_pool, TRUE);
 
-       debug_task ("got string of length %z", task->msg->len);
-
-       /* We got body, set wanna_die flag */
-       task->s->wanna_die = TRUE;
-
-       r = process_message (task);
-       if (r == -1) {
-               msg_warn ("processing of message failed");
-               task->last_error = "MIME processing error";
-               task->error_code = RSPAMD_FILTER_ERROR;
-               task->state = WRITE_REPLY;
-               return 0;
-       }
-       if (task->cmd == CMD_OTHER) {
-               /* Skip filters */
-               task->state = WRITE_REPLY;
-               return 0;
-       }
-       else {
-               if (task->cfg->pre_filters == NULL) {
-                       r = process_filters (task);
-                       if (r == -1) {
-                               task->last_error = "filter processing error";
-                               task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_REPLY;
-                               return 0;
-                       }
-                       /* Add task to classify to classify pool */
-                       if (!task->is_skipped && ctx->classify_pool) {
-                               register_async_thread (task->s);
-                               g_thread_pool_push (ctx->classify_pool, task, &err);
-                               if (err != NULL) {
-                                       msg_err ("cannot pull task to the pool: %s", err->message);
-                                       remove_async_thread (task->s);
-                               }
-                       }
-                       if (task->is_skipped) {
-                               /* Call write_socket to write reply and exit */
-                               task->state = WRITE_REPLY;
-                               return 0;
-                       }
-               }
-               else {
-                       lua_call_pre_filters (task);
-                       /* We want fin_task after pre filters are processed */
-                       task->s->wanna_die = TRUE;
-                       task->state = WAIT_PRE_FILTER;
-                       check_session_pending (task->s);
-               }
-       }
        return 0;
 }
 
@@ -329,7 +277,6 @@ accept_socket (gint fd, short what, void *arg)
        /* Copy some variables */
        new_task->sock = nfd;
        new_task->is_mime = ctx->is_mime;
-       new_task->allow_learn = ctx->allow_learn;
        memcpy (&new_task->client_addr, &addr, sizeof (addr));
 
        worker->srv->stat->connections_count++;