break;
case STATE_LEARN:
session->learn_buf = in;
- task = construct_task (session->worker);
+ task = rspamd_task_new (session->worker);
task->msg = g_string_new_len (in->begin, in->len);
task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
session->state = STATE_REPLY;
if (session->restful) {
r = rspamd_snprintf (out_buf, sizeof (out_buf), "HTTP/1.0 500 Cannot process message" CRLF CRLF);
}
if (!learn_task (session->learn_symbol, task, &err)) {
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
if (err) {
if (session->restful) {
i = rspamd_snprintf (out_buf, sizeof (out_buf), "HTTP/1.0 500 Learn classifier error: %s" CRLF CRLF, err->message);
return TRUE;
}
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
if (session->restful) {
i = rspamd_snprintf (out_buf, sizeof (out_buf), "HTTP/1.0 200 Learn OK" CRLF CRLF);
}
break;
case STATE_LEARN_SPAM_PRE:
session->learn_buf = in;
- task = construct_task (session->worker);
+ task = rspamd_task_new (session->worker);
task->msg = g_string_new_len (in->begin, in->len);
return FALSE;
}
/* Set up async session */
- task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
+ task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, rspamd_task_free_hard, task);
session->learn_task = task;
session->state = STATE_LEARN_SPAM;
rspamd_dispatcher_pause (session->dispatcher);
break;
case STATE_WEIGHTS:
session->learn_buf = in;
- task = construct_task (session->worker);
+ task = rspamd_task_new (session->worker);
task->msg = g_string_new_len (in->begin, in->len);
task->ev_base = session->ev_base;
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
session->state = STATE_REPLY;
r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF END);
if (!session->restful) {
if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer,
session->session_pool, &c, &tokens, FALSE, part->is_utf, part->urls_offset)) {
i = rspamd_snprintf (out_buf, sizeof (out_buf), "weights failed, tokenizer error" CRLF END);
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
if (!session->restful) {
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
return FALSE;
/* Handle messages without text */
if (tokens == NULL) {
i = rspamd_snprintf (out_buf, sizeof (out_buf), "weights failed, no tokens can be extracted (no text data)" CRLF END);
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
return FALSE;
}
}
}
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
session->state = STATE_REPLY;
break;
#define RSPAMD_EVENTS_H
#include "config.h"
+#include "mem_pool.h"
struct rspamd_async_event;
lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
- new_task = construct_task (worker);
+ new_task = rspamd_task_new (worker);
if (su.ss.ss_family == AF_UNIX) {
msg_info ("accepted connection from unix socket");
{
struct rspamd_task **ptask, *task;
- task = construct_task (NULL);
+ task = rspamd_task_new (NULL);
ptask = lua_newuserdata (L, sizeof (gpointer));
lua_setclass (L, "rspamd{task}", -1);
*ptask = task;
data = luaL_checklstring (L, 1, &len);
if (data) {
- task = construct_task (NULL);
+ task = rspamd_task_new (NULL);
ptask = lua_newuserdata (L, sizeof (gpointer));
lua_setclass (L, "rspamd{task}", -1);
*ptask = task;
struct rspamd_task *task = lua_check_task (L);
if (task != NULL) {
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
}
return 0;
/* Worker task manipulations */
-/**
- * Construct new task for worker
- */
-struct rspamd_task* construct_task (struct rspamd_worker *worker);
-/**
- * Destroy task object and remove its IO dispatcher if it exists
- */
-void free_task (struct rspamd_task *task, gboolean is_soft);
-void free_task_hard (gpointer ud);
-void free_task_soft (gpointer ud);
-
-/**
- * Called if session was restored inside fin callback
- */
-void rspamd_restore_task (void *arg);
-
-/**
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-gboolean rspamd_fin_task (void *arg);
-
/**
* Set counter for a symbol
*/
return processed;
err:
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
return -1;
}
}
/* Prepare task */
- task = construct_task (session->worker);
+ task = rspamd_task_new (session->worker);
session->other_data = task;
session->state = STATE_WAIT;
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
- free_task (task, FALSE);
+ rspamd_task_free (task, FALSE);
session->state = STATE_REPLY;
if (session->restful) {
r = rspamd_snprintf (out_buf, sizeof (out_buf), "HTTP/1.0 500 Cannot process message" CRLF CRLF);
cur = g_list_next (cur);
}
- rspamd_mempool_add_destructor (session->session_pool, (rspamd_mempool_destruct_t)free_task_soft, task);
+ rspamd_mempool_add_destructor (session->session_pool, (rspamd_mempool_destruct_t)rspamd_task_free_soft, task);
if (res == -1) {
session->state = STATE_REPLY;
/* Now mmap temp file if it is small enough */
session->temp_size = st.st_size;
if (session->ctx->max_size == 0 || st.st_size < (off_t)session->ctx->max_size) {
- session->task = construct_task (session->worker);
+ session->task = rspamd_task_new (session->worker);
session->task->resolver = session->resolver;
session->task->fin_callback = smtp_write_socket;
session->task->fin_arg = session;
if (session) {
if (session->task) {
- free_task (session->task, FALSE);
+ rspamd_task_free (session->task, FALSE);
if (session->task->msg->str) {
munmap (session->task->msg->str, session->task->msg->len);
}
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+#include "task.h"
+#include "main.h"
+#include "filter.h"
+#include "message.h"
+
+/*
+ * Destructor for recipients list in a task
+ */
+static void
+rcpt_destruct (void *pointer)
+{
+ struct rspamd_task *task = (struct rspamd_task *) pointer;
+
+ if (task->rcpt) {
+ g_list_free (task->rcpt);
+ }
+}
+
+/*
+ * Create new task
+ */
+struct rspamd_task *
+rspamd_task_new (struct rspamd_worker *worker)
+{
+ struct rspamd_task *new_task;
+
+ new_task = g_slice_alloc0 (sizeof (struct rspamd_task));
+
+ new_task->worker = worker;
+ new_task->state = READ_MESSAGE;
+ if (worker) {
+ new_task->cfg = worker->srv->cfg;
+ }
+#ifdef HAVE_CLOCK_GETTIME
+# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
+ clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
+# elif defined(HAVE_CLOCK_VIRTUAL)
+ clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
+# else
+ clock_gettime (CLOCK_REALTIME, &new_task->ts);
+# endif
+#endif
+ if (gettimeofday (&new_task->tv, NULL) == -1) {
+ msg_warn ("gettimeofday failed: %s", strerror (errno));
+ }
+
+ new_task->task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
+
+ /* Add destructor for recipients list (it would be better to use anonymous function here */
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) rcpt_destruct, new_task);
+ new_task->results = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) g_hash_table_destroy,
+ new_task->results);
+ new_task->re_cache = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) g_hash_table_destroy,
+ new_task->re_cache);
+ new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) g_hash_table_destroy,
+ new_task->raw_headers);
+ new_task->emails = g_tree_new (compare_email_func);
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) g_tree_destroy,
+ new_task->emails);
+ new_task->urls = g_tree_new (compare_url_func);
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t) g_tree_destroy,
+ new_task->urls);
+ new_task->sock = -1;
+ new_task->is_mime = TRUE;
+ new_task->pre_result.action = METRIC_ACTION_NOACTION;
+
+ new_task->message_id = new_task->queue_id = "undef";
+
+ return new_task;
+}
+
+
+/*
+ * Free all structures of worker_task
+ */
+void
+rspamd_task_free (struct rspamd_task *task, gboolean is_soft)
+{
+ GList *part;
+ struct mime_part *p;
+
+ if (task) {
+ debug_task ("free pointer %p", task);
+ while ((part = g_list_first (task->parts))) {
+ task->parts = g_list_remove_link (task->parts, part);
+ p = (struct mime_part *) part->data;
+ g_byte_array_free (p->content, TRUE);
+ g_list_free_1 (part);
+ }
+ if (task->text_parts) {
+ g_list_free (task->text_parts);
+ }
+ if (task->images) {
+ g_list_free (task->images);
+ }
+ if (task->messages) {
+ g_list_free (task->messages);
+ }
+ if (task->received) {
+ g_list_free (task->received);
+ }
+ if (task->http_conn != NULL) {
+ rspamd_http_connection_unref (task->http_conn);
+ }
+ if (task->sock != -1) {
+ close (task->sock);
+ }
+ rspamd_mempool_delete (task->task_pool);
+ g_slice_free1 (sizeof (struct rspamd_task), task);
+ }
+}
+
+void
+rspamd_task_free_hard (gpointer ud)
+{
+ struct rspamd_task *task = ud;
+
+ rspamd_task_free (task, FALSE);
+}
+
+void
+rspamd_task_free_soft (gpointer ud)
+{
+ struct rspamd_task *task = ud;
+
+ rspamd_task_free (task, FALSE);
+}
} pre_result; /**< Result of pre-filters */
};
+/**
+ * Construct new task for worker
+ */
+struct rspamd_task* rspamd_task_new (struct rspamd_worker *worker);
+/**
+ * Destroy task object and remove its IO dispatcher if it exists
+ */
+void rspamd_task_free (struct rspamd_task *task, gboolean is_soft);
+void rspamd_task_free_hard (gpointer ud);
+void rspamd_task_free_soft (gpointer ud);
+
+/**
+ * Called if session was restored inside fin callback
+ */
+void rspamd_task_restore (void *arg);
+
+/**
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean rspamd_task_fin (void *arg);
+
#endif /* TASK_H_ */
if (!evb) {
msg_err ("cannot allocate evbuffer for reply");
evhttp_send_reply (cbdata->req, HTTP_INTERNAL, "500 insufficient memory", NULL);
- free_task_hard (cbdata->task);
+ rspamd_task_free_hard (cbdata->task);
return;
}
cbdata->out = evb;
evhttp_send_reply (cbdata->req, HTTP_OK, "OK", evb);
evbuffer_free (evb);
- free_task_hard (cbdata->task);
+ rspamd_task_free_hard (cbdata->task);
}
/* If task is not processed, just do nothing */
return;
}
/* Prepare task */
- task = construct_task (ctx->worker);
+ task = rspamd_task_new (ctx->worker);
if (task == NULL) {
g_set_error (err, g_quark_from_static_string ("webui"), 500, "task cannot be created");
return NULL;
if (process_message (task) == -1) {
msg_warn ("processing of message failed");
g_set_error (err, g_quark_from_static_string ("webui"), 500, "message cannot be processed");
- free_task_hard (task);
+ rspamd_task_free_hard (task);
return NULL;
}
if (process_filters (task) == -1) {
msg_warn ("filtering of message failed");
g_set_error (err, g_quark_from_static_string ("webui"), 500, "message cannot be filtered");
- free_task_hard (task);
+ rspamd_task_free_hard (task);
return NULL;
}
if (!evb) {
msg_err ("cannot allocate evbuffer for reply");
evhttp_send_reply (cbdata->req, HTTP_INTERNAL, "500 insufficient memory", NULL);
- free_task_hard (cbdata->task);
+ rspamd_task_free_hard (cbdata->task);
return;
}
evhttp_send_reply (cbdata->req, HTTP_INTERNAL + err->code, err->message, evb);
evbuffer_free (evb);
g_error_free (err);
- free_task_hard (cbdata->task);
+ rspamd_task_free_hard (cbdata->task);
return;
}
/* Successful learn */
evhttp_send_reply (cbdata->req, HTTP_OK, "OK", evb);
evbuffer_free (evb);
- free_task_hard (cbdata->task);
+ rspamd_task_free_hard (cbdata->task);
}
/* If task is not processed, just do nothing */
return;
return NULL;
}
/* Prepare task */
- task = construct_task (ctx->worker);
+ task = rspamd_task_new (ctx->worker);
if (task == NULL) {
g_set_error (err, g_quark_from_static_string ("webui"), 500, "task cannot be created");
return NULL;
if (process_message (task) == -1) {
msg_warn ("processing of message failed");
g_set_error (err, g_quark_from_static_string ("webui"), 500, "message cannot be processed");
- free_task_hard (task);
+ rspamd_task_free_hard (task);
return NULL;
}
if (process_filters (task) == -1) {
msg_warn ("filtering of message failed");
g_set_error (err, g_quark_from_static_string ("webui"), 500, "message cannot be filtered");
- free_task_hard (task);
+ rspamd_task_free_hard (task);
return NULL;
}
return 0;
}
- task = construct_task (session->ctx->worker);
+ task = rspamd_task_new (session->ctx->worker);
task->ev_base = session->ctx->ev_base;
task->msg = msg->body;
task->ev_base = ctx->ev_base;
task->s = new_async_session (session->pool, rspamd_webui_learn_fin_task, NULL,
- free_task_hard, task);
+ rspamd_task_free_hard, task);
task->s->wanna_die = TRUE;
task->fin_arg = conn_ent;
return;
}
- new_task = construct_task (worker);
+ new_task = rspamd_task_new (worker);
msg_info ("accepted connection from %s port %d",
rspamd_inet_address_to_string (&addr),
rspamd_mempool_add_destructor (new_task->task_pool, (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
- new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task,
- rspamd_restore_task, free_task_hard, new_task);
+ new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
+ rspamd_task_restore, rspamd_task_free_hard, new_task);
new_task->classify_pool = ctx->classify_pool;
extern struct rspamd_main *rspamd_main;
-/*
- * Destructor for recipients list in a task
- */
-static void
-rcpt_destruct (void *pointer)
-{
- struct rspamd_task *task = (struct rspamd_task *) pointer;
-
- if (task->rcpt) {
- g_list_free (task->rcpt);
- }
-}
-
-/*
- * Create new task
- */
-struct rspamd_task *
-construct_task (struct rspamd_worker *worker)
-{
- struct rspamd_task *new_task;
-
- new_task = g_slice_alloc0 (sizeof (struct rspamd_task));
-
- new_task->worker = worker;
- new_task->state = READ_MESSAGE;
- if (worker) {
- new_task->cfg = worker->srv->cfg;
- }
-#ifdef HAVE_CLOCK_GETTIME
-# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID
- clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts);
-# elif defined(HAVE_CLOCK_VIRTUAL)
- clock_gettime (CLOCK_VIRTUAL, &new_task->ts);
-# else
- clock_gettime (CLOCK_REALTIME, &new_task->ts);
-# endif
-#endif
- if (gettimeofday (&new_task->tv, NULL) == -1) {
- msg_warn ("gettimeofday failed: %s", strerror (errno));
- }
-
- new_task->task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
-
- /* Add destructor for recipients list (it would be better to use anonymous function here */
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) rcpt_destruct, new_task);
- new_task->results = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_hash_table_destroy,
- new_task->results);
- new_task->re_cache = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_hash_table_destroy,
- new_task->re_cache);
- new_task->raw_headers = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_hash_table_destroy,
- new_task->raw_headers);
- new_task->emails = g_tree_new (compare_email_func);
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_tree_destroy,
- new_task->emails);
- new_task->urls = g_tree_new (compare_url_func);
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t) g_tree_destroy,
- new_task->urls);
- new_task->sock = -1;
- new_task->is_mime = TRUE;
- new_task->pre_result.action = METRIC_ACTION_NOACTION;
-
- new_task->message_id = new_task->queue_id = "undef";
-
- return new_task;
-}
-
/**
* Return worker's control structure by its type
* @param type
return NULL;
}
-
-/*
- * Free all structures of worker_task
- */
-void
-free_task (struct rspamd_task *task, gboolean is_soft)
-{
- GList *part;
- struct mime_part *p;
-
- if (task) {
- debug_task ("free pointer %p", task);
- while ((part = g_list_first (task->parts))) {
- task->parts = g_list_remove_link (task->parts, part);
- p = (struct mime_part *) part->data;
- g_byte_array_free (p->content, TRUE);
- g_list_free_1 (part);
- }
- if (task->text_parts) {
- g_list_free (task->text_parts);
- }
- if (task->images) {
- g_list_free (task->images);
- }
- if (task->messages) {
- g_list_free (task->messages);
- }
- if (task->received) {
- g_list_free (task->received);
- }
- if (task->http_conn != NULL) {
- rspamd_http_connection_unref (task->http_conn);
- }
- if (task->sock != -1) {
- close (task->sock);
- }
- rspamd_mempool_delete (task->task_pool);
- g_slice_free1 (sizeof (struct rspamd_task), task);
- }
-}
-
-void
-free_task_hard (gpointer ud)
-{
- struct rspamd_task *task = ud;
-
- free_task (task, FALSE);
-}
-
-void
-free_task_soft (gpointer ud)
-{
- struct rspamd_task *task = ud;
-
- free_task (task, FALSE);
-}
-
double
set_counter (const gchar *name, guint32 value)
{
* @return TRUE if session should be terminated
*/
gboolean
-rspamd_fin_task (void *arg)
+rspamd_task_fin (void *arg)
{
struct rspamd_task *task = (struct rspamd_task *) arg;
gint r;
* Called if session was restored inside fin callback
*/
void
-rspamd_restore_task (void *arg)
+rspamd_task_restore (void *arg)
{
struct rspamd_task *task = (struct rspamd_task *) arg;