return TRUE;
}
+static gint
+rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages)
+{
+ gint st;
+
+ st = ffs (task->processed_stages);
+
+ if (st == -1) {
+ st = (1 << 0);
+ }
+ else {
+ st = (1 << (st + 1));
+ }
+
+ if (stages & st) {
+ return st;
+ }
+ else if (st < RSPAMD_TASK_STAGE_DONE) {
+ /* We assume that the stage that was not requested is done */
+ task->processed_stages |= st;
+ return rspamd_task_select_processing_stage (task, stages);
+ }
+
+ /* We are done */
+ return RSPAMD_TASK_STAGE_DONE;
+}
+
+static gboolean
+rspamd_process_filters (struct rspamd_task *task)
+{
+ GList *cur;
+ struct metric *metric;
+ gpointer item = NULL;
+
+ /* Insert default metric to be sure that it exists all the time */
+ /* TODO: make preprocessing only once */
+ rspamd_create_metric_result (task, DEFAULT_METRIC);
+ if (task->settings) {
+ const ucl_object_t *wl;
+
+ wl = ucl_object_find_key (task->settings, "whitelist");
+ if (wl != NULL) {
+ msg_info ("<%s> is whitelisted", task->message_id);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ return TRUE;
+ }
+ }
+
+ /* Process metrics symbols */
+ while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
+ /* Check reject actions */
+ cur = task->cfg->metrics_list;
+ while (cur) {
+ metric = cur->data;
+ if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
+ metric->actions[METRIC_ACTION_REJECT].score > 0 &&
+ check_metric_is_spam (task, metric)) {
+ msg_info ("<%s> has already scored more than %.2f, so do not "
+ "plan any more checks", task->message_id,
+ metric->actions[METRIC_ACTION_REJECT].score);
+ return TRUE;
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+ return TRUE;
+}
+
gboolean
-rspamd_task_process (struct rspamd_task *task,
- struct rspamd_http_message *msg, const gchar *start, gsize len,
- guint stages)
+rspamd_task_process (struct rspamd_task *task, guint stages)
{
- gint r;
+ gint st;
- if (stages & RSPAMD_TASK_STAGE_READ_MESSAGE) {
- /* Process message itself */
- r = process_message (task);
- if (r == -1) {
- msg_warn ("processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ return TRUE;
+ }
+
+ st = rspamd_task_select_processing_stage (task, stages);
+
+ switch (st) {
+ case RSPAMD_TASK_STAGE_READ_MESSAGE:
+ if (!rspamd_message_parse (task)) {
return FALSE;
}
- if (!process_extra_filters) {
- task->flags |= RSPAMD_TASK_FLAG_SKIP_EXTRA;
- }
- if (!process_extra_filters || task->cfg->pre_filters == NULL) {
- r = rspamd_process_filters (task);
+ break;
- if (r == -1) {
- task->last_error = "filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ rspamd_lua_call_pre_filters (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_FILTERS:
+ if (!rspamd_process_filters (task)) {
return FALSE;
}
+ break;
- if (RSPAMD_TASK_IS_SKIPPED (task)) {
- /* Call write_socket to write reply and exit */
- task->state = WRITE_REPLY;
+ case RSPAMD_TASK_STAGE_CLASSIFIERS:
+ if (!rspamd_stat_classify (task, task->cfg->lua_state, &task->err)) {
+ return FALSE;
}
+ break;
+
+ case RSPAMD_TASK_STAGE_COMPOSITES:
+ rspamd_make_composites (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_POST_FILTERS:
+ rspamd_lua_call_post_filters (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_DONE:
+ return TRUE;
+
+ default:
+ /* TODO: not implemented stage */
+ break;
+ }
+
+ if (RSPAMD_TASK_IS_SKIPPED (task)) {
+ task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
+ return TRUE;
+ }
+ if (rspamd_session_events_pending (task->s) != 0) {
+ /* We have events pending, so we consider this stage as incomplete */
+ msg_debug ("need more work on stage %d", st);
}
else {
- rspamd_lua_call_pre_filters (task);
- /* We want fin_task after pre filters are processed */
- if (rspamd_session_events_pending (task->s) != 0) {
- task->state = WAIT_PRE_FILTER;
- }
+ /* Mark the current stage as done and go to the next stage */
+ msg_debug ("completed stage %d", st);
+ task->processed_stages |= st;
+
+ /* Tail recursion */
+ return rspamd_task_process (task, stages);
}
return TRUE;
return FALSE;
}
-
-gint
-rspamd_process_filters (struct rspamd_task *task)
-{
- GList *cur;
- struct metric *metric;
- gpointer item = NULL;
-
- /* Insert default metric to be sure that it exists all the time */
- rspamd_create_metric_result (task, DEFAULT_METRIC);
- if (task->settings) {
- const ucl_object_t *wl;
-
- wl = ucl_object_find_key (task->settings, "whitelist");
- if (wl != NULL) {
- msg_info ("<%s> is whitelisted", task->message_id);
- task->flags |= RSPAMD_TASK_FLAG_SKIP;
- return 0;
- }
- }
-
- /* Process metrics symbols */
- while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
- /* Check reject actions */
- cur = task->cfg->metrics_list;
- while (cur) {
- metric = cur->data;
- if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
- metric->actions[METRIC_ACTION_REJECT].score > 0 &&
- check_metric_is_spam (task, metric)) {
- msg_info ("<%s> has already scored more than %.2f, so do not "
- "plan any more checks", task->message_id,
- metric->actions[METRIC_ACTION_REJECT].score);
- return 1;
- }
- cur = g_list_next (cur);
- }
- }
-
- if (rspamd_session_events_pending (task->s) != 0) {
- task->state = WAIT_FILTER;
- }
-
- return 1;
-}
RSPAMD_TASK_STAGE_PRE_FILTERS = (1 << 3),
RSPAMD_TASK_STAGE_FILTERS = (1 << 4),
RSPAMD_TASK_STAGE_CLASSIFIERS = (1 << 5),
- RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 6),
- RSPAMD_TASK_STAGE_WRITE_REPLY = (1 << 7)
+ RSPAMD_TASK_STAGE_COMPOSITES = (1 << 6),
+ RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 7),
+ RSPAMD_TASK_STAGE_DONE = (1 << 8)
};
#define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
#define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP))
#define RSPAMD_TASK_IS_JSON(task) (((task)->flags & RSPAMD_TASK_FLAG_JSON))
#define RSPAMD_TASK_IS_SPAMC(task) (((task)->flags & RSPAMD_TASK_FLAG_SPAMC))
+#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE))
typedef gint (*protocol_reply_func)(struct rspamd_task *task);
double time_virtual;
struct timeval tv;
guint32 scan_milliseconds; /**< how much milliseconds passed */
- guint32 parser_recursion; /**< for avoiding recursion stack overflow */
gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< calback for filters finalizing */
void *fin_arg; /**< argument for fin callback */
LUA_FUNCTION_DEF (task, create_from_buffer);
/* Task methods */
LUA_FUNCTION_DEF (task, get_message);
-LUA_FUNCTION_DEF (task, process_message);
+LUA_FUNCTION_DEF (task, rspamd_message_parse);
/***
* @method task:get_cfg()
* Get configuration object for a task.
static const struct luaL_reg tasklib_m[] = {
LUA_INTERFACE_DEF (task, get_message),
LUA_INTERFACE_DEF (task, destroy),
- LUA_INTERFACE_DEF (task, process_message),
+ LUA_INTERFACE_DEF (task, rspamd_message_parse),
LUA_INTERFACE_DEF (task, set_cfg),
LUA_INTERFACE_DEF (task, get_cfg),
LUA_INTERFACE_DEF (task, get_mempool),
struct rspamd_task *task = lua_check_task (L, 1);
if (task != NULL && task->msg.len > 0) {
- if (process_message (task) == 0) {
+ if (rspamd_message_parse (task) == 0) {
lua_pushboolean (L, TRUE);
}
else {