git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Rework task processing.
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 Jun 2015 13:19:55 +0000 (14:19 +0100)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 Jun 2015 14:39:37 +0000 (15:39 +0100)
src/libserver/task.c
src/libserver/task.h
src/lmtp.c
src/lua/lua_task.c
src/plugins/dkim_check.c
src/plugins/fuzzy_check.c
src/smtp.c

index c632244157963f9bfd65b7590f0e2298847c68d9..33a03049fd271b387c575ce85b008f005eb775c1 100644 (file)
@@ -329,47 +329,141 @@ rspamd_task_load_message (struct rspamd_task *task,
        return TRUE;
 }
 
+/*
+ * Select the next stage to run: the bit right above the highest one set in
+ * task->processed_stages. Stages missing from `stages` are marked as processed
+ * and skipped; RSPAMD_TASK_STAGE_DONE is returned when no stage is left.
+ */
+static gint
+rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages)
+{
+       guint done;
+       gint st;
+
+       for (;;) {
+               /* Find the highest set bit by hand: ffs() reports the lowest one */
+               st = 1;
+               for (done = task->processed_stages;
+                               done != 0 && st < RSPAMD_TASK_STAGE_DONE; done >>= 1) {
+                       st <<= 1;
+               }
+               /* st stops at RSPAMD_TASK_STAGE_DONE once all stages are passed */
+               if (st == RSPAMD_TASK_STAGE_DONE || (stages & st)) {
+                       return st;
+               }
+               /* We assume that the stage that was not requested is done */
+               task->processed_stages |= st;
+       }
+}
+
+/*
+ * Run the symbols cache for a task. Marks the task as skipped when its settings
+ * carry a "whitelist" key and stops planning checks as soon as some metric is
+ * already over its reject score. Every path returns TRUE.
+ */
+static gboolean
+rspamd_process_filters (struct rspamd_task *task)
+{
+       gpointer cache_item = NULL;
+       struct metric *m;
+       GList *node;
+
+       /* The default metric must always exist (TODO: preprocess only once) */
+       rspamd_create_metric_result (task, DEFAULT_METRIC);
+       if (task->settings != NULL &&
+               ucl_object_find_key (task->settings, "whitelist") != NULL) {
+               msg_info ("<%s> is whitelisted", task->message_id);
+               task->flags |= RSPAMD_TASK_FLAG_SKIP;
+               return TRUE;
+       }
+
+       /* Walk the symbols; after each one look for a metric that already rejects */
+       while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache,
+                       &cache_item)) {
+               for (node = task->cfg->metrics_list; node; node = g_list_next (node)) {
+                       m = node->data;
+                       if ((task->flags & RSPAMD_TASK_FLAG_PASS_ALL) ||
+                               !(m->actions[METRIC_ACTION_REJECT].score > 0)) {
+                               continue;
+                       }
+                       if (check_metric_is_spam (task, m)) {
+                               msg_info ("<%s> has already scored more than %.2f, so do not "
+                                               "plan any more checks", task->message_id,
+                                               m->actions[METRIC_ACTION_REJECT].score);
+                               return TRUE;
+                       }
+               }
+       }
+
+       return TRUE;
+}
+
 gboolean
-rspamd_task_process (struct rspamd_task *task,
-       struct rspamd_http_message *msg, const gchar *start, gsize len,
-       guint stages)
+rspamd_task_process (struct rspamd_task *task, guint stages)
 {
-       gint r;
+       gint st;
 
-       if (stages & RSPAMD_TASK_STAGE_READ_MESSAGE) {
-               /* Process message itself */
-               r = process_message (task);
-               if (r == -1) {
-                       msg_warn ("processing of message failed");
-                       task->last_error = "MIME processing error";
-                       task->error_code = RSPAMD_FILTER_ERROR;
+       if (RSPAMD_TASK_IS_PROCESSED (task)) {
+               return TRUE;
+       }
+
+       st = rspamd_task_select_processing_stage (task, stages);
+
+       switch (st) {
+       case RSPAMD_TASK_STAGE_READ_MESSAGE:
+               if (!rspamd_message_parse (task)) {
                        return FALSE;
                }
-       if (!process_extra_filters) {
-               task->flags |= RSPAMD_TASK_FLAG_SKIP_EXTRA;
-       }
-       if (!process_extra_filters || task->cfg->pre_filters == NULL) {
-               r = rspamd_process_filters (task);
+               break;
 
-               if (r == -1) {
-                       task->last_error = "filter processing error";
-                       task->error_code = RSPAMD_FILTER_ERROR;
-                       task->state = WRITE_REPLY;
+       case RSPAMD_TASK_STAGE_PRE_FILTERS:
+               rspamd_lua_call_pre_filters (task);
+               break;
+
+       case RSPAMD_TASK_STAGE_FILTERS:
+               if (!rspamd_process_filters (task)) {
                        return FALSE;
                }
+               break;
 
-               if (RSPAMD_TASK_IS_SKIPPED (task)) {
-                       /* Call write_socket to write reply and exit */
-                       task->state = WRITE_REPLY;
+       case RSPAMD_TASK_STAGE_CLASSIFIERS:
+               if (!rspamd_stat_classify (task, task->cfg->lua_state, &task->err)) {
+                       return FALSE;
                }
+               break;
+
+       case RSPAMD_TASK_STAGE_COMPOSITES:
+               rspamd_make_composites (task);
+               break;
+
+       case RSPAMD_TASK_STAGE_POST_FILTERS:
+               rspamd_lua_call_post_filters (task);
+               break;
+
+       case RSPAMD_TASK_STAGE_DONE:
+               return TRUE;
+
+       default:
+               /* TODO: not implemented stage */
+               break;
+       }
+
+       if (RSPAMD_TASK_IS_SKIPPED (task)) {
+               task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
+               return TRUE;
+       }
 
+       if (rspamd_session_events_pending (task->s) != 0) {
+               /* We have events pending, so we consider this stage as incomplete */
+               msg_debug ("need more work on stage %d", st);
        }
        else {
-               rspamd_lua_call_pre_filters (task);
-               /* We want fin_task after pre filters are processed */
-               if (rspamd_session_events_pending (task->s) != 0) {
-                       task->state = WAIT_PRE_FILTER;
-               }
+               /* Mark the current stage as done and go to the next stage */
+               msg_debug ("completed stage %d", st);
+               task->processed_stages |= st;
+
+               /* Tail recursion */
+               return rspamd_task_process (task, stages);
        }
 
        return TRUE;
@@ -548,48 +642,3 @@ check_metric_is_spam (struct rspamd_task *task, struct metric *metric)
 
        return FALSE;
 }
-
-gint
-rspamd_process_filters (struct rspamd_task *task)
-{
-       GList *cur;
-       struct metric *metric;
-       gpointer item = NULL;
-
-       /* Insert default metric to be sure that it exists all the time */
-       rspamd_create_metric_result (task, DEFAULT_METRIC);
-       if (task->settings) {
-               const ucl_object_t *wl;
-
-               wl = ucl_object_find_key (task->settings, "whitelist");
-               if (wl != NULL) {
-                       msg_info ("<%s> is whitelisted", task->message_id);
-                       task->flags |= RSPAMD_TASK_FLAG_SKIP;
-                       return 0;
-               }
-       }
-
-       /* Process metrics symbols */
-       while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
-               /* Check reject actions */
-               cur = task->cfg->metrics_list;
-               while (cur) {
-                       metric = cur->data;
-                       if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
-                               metric->actions[METRIC_ACTION_REJECT].score > 0 &&
-                               check_metric_is_spam (task, metric)) {
-                               msg_info ("<%s> has already scored more than %.2f, so do not "
-                                               "plan any more checks", task->message_id,
-                                               metric->actions[METRIC_ACTION_REJECT].score);
-                               return 1;
-                       }
-                       cur = g_list_next (cur);
-               }
-       }
-
-       if (rspamd_session_events_pending (task->s) != 0) {
-               task->state = WAIT_FILTER;
-       }
-
-       return 1;
-}
index 01a59b98cc5c75af97b0260830f13688b4800090..d7698a130da2c10d283a3b52a2b33d854deae657 100644 (file)
@@ -58,8 +58,9 @@ enum rspamd_task_stage {
        RSPAMD_TASK_STAGE_PRE_FILTERS = (1 << 3),
        RSPAMD_TASK_STAGE_FILTERS = (1 << 4),
        RSPAMD_TASK_STAGE_CLASSIFIERS = (1 << 5),
-       RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 6),
-       RSPAMD_TASK_STAGE_WRITE_REPLY = (1 << 7)
+       RSPAMD_TASK_STAGE_COMPOSITES = (1 << 6),
+       RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 7),
+       RSPAMD_TASK_STAGE_DONE = (1 << 8)
 };
 
 #define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
@@ -89,6 +90,7 @@ enum rspamd_task_stage {
 #define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP))
 #define RSPAMD_TASK_IS_JSON(task) (((task)->flags & RSPAMD_TASK_FLAG_JSON))
 #define RSPAMD_TASK_IS_SPAMC(task) (((task)->flags & RSPAMD_TASK_FLAG_SPAMC))
+#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE))
 
 typedef gint (*protocol_reply_func)(struct rspamd_task *task);
 
@@ -157,7 +159,6 @@ struct rspamd_task {
        double time_virtual;
        struct timeval tv;
        guint32 scan_milliseconds;                                  /**< how much milliseconds passed                                   */
-       guint32 parser_recursion;                                   /**< for avoiding recursion stack overflow                  */
        gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< calback for filters finalizing                                      */
        void *fin_arg;                                              /**< argument for fin callback                                              */
 
index 91fbff17eb0a22ff6c8929e9bf06bb909cbfce76..8adb466778ade6371e17eeaf9b994d2e56a54c8e 100644 (file)
@@ -166,7 +166,7 @@ lmtp_read_socket (rspamd_fstring_t * in, void *arg)
                }
                break;
        case READ_MESSAGE:
-               r = process_message (lmtp->task);
+               r = rspamd_message_parse (lmtp->task);
                r = rspamd_process_filters (lmtp->task);
                if (r == -1) {
                        return FALSE;
index 8a8f6c640dbdc5e66b0d8d0375217b57104d9ad9..36f10044916aed0ee12478e25027fa4ba0d8316d 100644 (file)
@@ -71,7 +71,7 @@ LUA_FUNCTION_DEF (task, create_empty);
 LUA_FUNCTION_DEF (task, create_from_buffer);
 /* Task methods */
 LUA_FUNCTION_DEF (task, get_message);
-LUA_FUNCTION_DEF (task, process_message);
+LUA_FUNCTION_DEF (task, rspamd_message_parse);
 /***
  * @method task:get_cfg()
  * Get configuration object for a task.
@@ -474,7 +474,7 @@ static const struct luaL_reg tasklib_f[] = {
 static const struct luaL_reg tasklib_m[] = {
        LUA_INTERFACE_DEF (task, get_message),
        LUA_INTERFACE_DEF (task, destroy),
-       LUA_INTERFACE_DEF (task, process_message),
+       LUA_INTERFACE_DEF (task, rspamd_message_parse),
        LUA_INTERFACE_DEF (task, set_cfg),
        LUA_INTERFACE_DEF (task, get_cfg),
        LUA_INTERFACE_DEF (task, get_mempool),
@@ -623,7 +623,7 @@ lua_task_process_message (lua_State *L)
        struct rspamd_task *task = lua_check_task (L, 1);
 
        if (task != NULL && task->msg.len > 0) {
-               if (process_message (task) == 0) {
+               if (rspamd_message_parse (task) == 0) {
                        lua_pushboolean (L, TRUE);
                }
                else {
index 4ffbb96728fa8156d12fa533a4d944c1b723f81c..9787ac5a5a5e276ba7c1ddcb5f68c225747947f6 100644 (file)
@@ -425,7 +425,7 @@ dkim_symbol_callback (struct rspamd_task *task, void *unused)
        struct dkim_check_result *res = NULL, *cur;
        /* First check if a message has its signature */
 
-       hlist = message_get_header (task,
+       hlist = rspamd_message_get_header (task,
                        DKIM_SIGNHEADER,
                        FALSE);
        if (hlist != NULL) {
index 848d5d54b4634f4560e4604e928f28232acc344b..c585df7a4a1aded0c63cee52a7bae9ffeddb2cee 100644 (file)
@@ -1227,7 +1227,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
 
        saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint));
        err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *));
-       r = process_message (task);
+       r = rspamd_message_parse (task);
        if (r == -1) {
                msg_warn ("<%s>: cannot process message for fuzzy", task->message_id);
                rspamd_task_free (task, FALSE);
index 74e2323a2413c7c6fa08bd82b6ff3631f7b0157d..51d5753aafe08c1f0ed609e549dd1b6c279cea24 100644 (file)
@@ -305,7 +305,7 @@ process_smtp_data (struct smtp_session *session)
                        sizeof (struct in_addr));
                session->task->cmd = CMD_CHECK;
 
-               if (process_message (session->task) == -1) {
+               if (rspamd_message_parse (session->task) == -1) {
                        msg_err ("cannot process message");
                        munmap (session->task->msg->str, st.st_size);
                        goto err;