task->ev_base = ctx->ev_base;
- task->s = new_async_session (session->pool,
+ task->s = rspamd_session_create (session->pool,
rspamd_controller_learn_fin_task,
NULL,
rspamd_task_free_hard,
if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
msg_warn ("filters cannot be processed for %s", task->message_id);
rspamd_controller_send_error (conn_ent, 500, task->last_error);
- destroy_session (task->s);
+ rspamd_session_destroy (task->s);
return 0;
}
session->task = task;
session->cl = cl;
session->is_spam = is_spam;
- check_session_pending (task->s);
+ rspamd_session_pending (task->s);
return 0;
}
task->resolver = ctx->resolver;
task->ev_base = ctx->ev_base;
- task->s = new_async_session (session->pool,
+ task->s = rspamd_session_create (session->pool,
rspamd_controller_check_fin_task,
NULL,
rspamd_task_free_hard,
if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
msg_warn ("filters cannot be processed for %s", task->message_id);
rspamd_controller_send_error (conn_ent, 500, task->last_error);
- destroy_session (task->s);
+ rspamd_session_destroy (task->s);
return 0;
}
session->task = task;
- check_session_pending (task->s);
+ rspamd_session_pending (task->s);
return 0;
}
session->ctx->worker->srv->stat->control_connections_count++;
if (session->task != NULL) {
- destroy_session (session->task->s);
+ rspamd_session_destroy (session->task->s);
}
if (session->pool) {
rspamd_mempool_delete (session->pool);
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (r == 1) {
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (r == 1) {
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (r == 1) {
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (r == 1) {
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (r == 1) {
session->error, 0, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
session->state = SMTP_STATE_AFTER_DATA;
sizeof (CRLF) - 1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else {
- remove_normal_event (session->s,
+ rspamd_session_remove_event (session->s,
(event_finalizer_t)smtp_upstream_finalize_connection,
session);
}
1, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
return;
}
rspamd_upstream_fail (session->upstream);
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
void
session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
session->upstream_state = SMTP_STATE_GREETING;
- register_async_event (session->s,
+ rspamd_session_add_event (session->s,
(event_finalizer_t)smtp_upstream_finalize_connection,
session,
g_quark_from_static_string ("smtp proxy"));
TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
1, FALSE, TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (cd.action <= METRIC_ACTION_ADD_HEADER || cd.action <=
0, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
0, FALSE, TRUE)) {
goto err;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
g_object_unref (stream);
TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
* event removing
*/
rdns_request_retain (reply->request);
- remove_normal_event (reqdata->session, rspamd_dns_fin_cb, reqdata);
+ rspamd_session_remove_event (reqdata->session, rspamd_dns_fin_cb, reqdata);
}
else if (reqdata->pool == NULL) {
g_slice_free1 (sizeof (struct rspamd_dns_request_ud), reqdata);
if (session) {
if (req != NULL) {
- register_async_event (session,
+ rspamd_session_add_event (session,
(event_finalizer_t)rspamd_dns_fin_cb,
reqdata,
g_quark_from_static_string ("dns resolver"));
struct rspamd_async_session *
-new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin,
+rspamd_session_create (rspamd_mempool_t * pool, session_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
}
void
-register_async_event (struct rspamd_async_session *session,
+rspamd_session_add_event (struct rspamd_async_session *session,
event_finalizer_t fin,
void *user_data,
GQuark subsystem)
}
void
-remove_normal_event (struct rspamd_async_session *session,
+rspamd_session_remove_event (struct rspamd_async_session *session,
event_finalizer_t fin,
void *ud)
{
g_hash_table_remove (session->events, found_ev);
- check_session_pending (session);
+ rspamd_session_pending (session);
}
static gboolean
-rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
+rspamd_session_destroy_callback (gpointer k, gpointer v, gpointer unused)
{
struct rspamd_async_event *ev = v;
}
gboolean
-destroy_session (struct rspamd_async_session *session)
+rspamd_session_destroy (struct rspamd_async_session *session)
{
if (session == NULL) {
msg_info ("session is NULL");
session->flags |= RSPAMD_SESSION_FLAG_DESTROYING;
g_hash_table_foreach_remove (session->events,
- rspamd_session_destroy,
+ rspamd_session_destroy_callback,
session);
if (session->cleanup != NULL) {
}
gboolean
-check_session_pending (struct rspamd_async_session *session)
+rspamd_session_pending (struct rspamd_async_session *session)
{
gboolean ret = TRUE;
if (session->restore != NULL) {
session->restore (session->user_data);
/* Call pending once more */
- return check_session_pending (session);
+ return rspamd_session_pending (session);
}
}
else {
* @param user_data abstract user data
* @return
*/
-struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool,
+struct rspamd_async_session * rspamd_session_create (rspamd_mempool_t *pool,
session_finalizer_t fin, event_finalizer_t restore,
event_finalizer_t cleanup, gpointer user_data);
* @param user_data abstract user_data
* @param forced unused
*/
-void register_async_event (struct rspamd_async_session *session,
+void rspamd_session_add_event (struct rspamd_async_session *session,
event_finalizer_t fin, gpointer user_data, GQuark subsystem);
/**
* @param fin final callback
* @param ud user data object
*/
-void remove_normal_event (struct rspamd_async_session *session,
+void rspamd_session_remove_event (struct rspamd_async_session *session,
event_finalizer_t fin,
gpointer ud);
* Must be called at the end of session, it calls fin functions for all non-forced callbacks
* @return true if the whole session was destroyed and false if there are forced events
*/
-gboolean destroy_session (struct rspamd_async_session *session);
+gboolean rspamd_session_destroy (struct rspamd_async_session *session);
/**
* Check session for events pending and call fin callback if no events are pending
* @param session session object
* @return TRUE if session has pending events
*/
-gboolean check_session_pending (struct rspamd_async_session *session);
+gboolean rspamd_session_pending (struct rspamd_async_session *session);
/**
* Start watching for events in the session, so the specified watcher will be added
task->state = WAIT_PRE_FILTER;
}
- check_session_pending (task->s);
+ rspamd_session_pending (task->s);
return TRUE;
}
g_assert (rt->redis != NULL);
redisLibeventAttach (rt->redis, task->ev_base);
- register_async_event (task->s, rspamd_redis_fin, rt,
+ rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
rspamd_redis_stat_quark ());
return rt;
lua_http_maybe_free (struct lua_http_cbdata *cbd)
{
if (cbd->session) {
- remove_normal_event (cbd->session, lua_http_fin, cbd);
+ rspamd_session_remove_event (cbd->session, lua_http_fin, cbd);
}
else {
lua_http_fin (cbd);
cbd->fd = -1;
if (session) {
cbd->session = session;
- register_async_event (session,
+ rspamd_session_add_event (session,
(event_finalizer_t)lua_http_fin,
cbd,
g_quark_from_static_string ("lua http"));
}
if (connected) {
- remove_normal_event (ud->task->s, lua_redis_fin, ud);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, ud);
}
}
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
- remove_normal_event (ud->task->s, lua_redis_fin, ud);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, ud);
}
/**
(const gchar **)ud->args,
NULL);
if (ret == REDIS_OK) {
- register_async_event (ud->task->s,
+ rspamd_session_add_event (ud->task->s,
lua_redis_fin,
ud,
g_quark_from_static_string ("lua redis"));
}
}
- session = new_async_session (mempool,
+ session = rspamd_session_create (mempool,
lua_session_finalizer,
lua_session_restore,
lua_session_cleanup,
session = cbd->session;
if (session) {
- destroy_session (session);
+ rspamd_session_destroy (session);
return 0;
}
else {
lua_pushvalue (L, 1);
cbdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
cbdata->session = session;
- register_async_event (session,
+ rspamd_session_add_event (session,
lua_event_fin,
cbdata,
g_quark_from_static_string ("lua event"));
if (session) {
data = lua_check_event (L, 2);
if (data) {
- remove_normal_event (session, lua_event_fin, data);
+ rspamd_session_remove_event (session, lua_event_fin, data);
return 0;
}
}
lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
{
if (cbd->session) {
- remove_normal_event (cbd->session, lua_tcp_fin, cbd);
+ rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
}
else {
lua_tcp_fin (cbd);
if (session) {
cbd->session = session;
- register_async_event (session,
+ rspamd_session_add_event (session,
(event_finalizer_t)lua_tcp_fin,
cbd,
g_quark_from_static_string ("lua tcp"));
task->fin_callback = lua_util_task_fin;
task->fin_arg = &res;
task->resolver = dns_resolver_init (NULL, base, cfg);
- task->s = new_async_session (task->task_pool, rspamd_task_fin,
+ task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
rspamd_task_restore, rspamd_task_free_hard, task);
if (rspamd_task_process (task, NULL, message, mlen, TRUE)) {
errno,
strerror (errno));
rspamd_upstream_fail (session->server);
- remove_normal_event (session->task->s, fuzzy_io_fin, session);
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
else {
rspamd_upstream_ok (session->server);
if (session->commands->len == 0) {
- remove_normal_event (session->task->s, fuzzy_io_fin, session);
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
}
}
session->server = selected;
session->rule = rule;
event_add (&session->ev, &session->tv);
- register_async_event (task->s,
+ rspamd_session_add_event (task->s,
fuzzy_io_fin,
session,
g_quark_from_static_string ("fuzzy check"));
rspamd_inet_address_to_string (rspamd_upstream_addr (param->redirector)),
err);
rspamd_upstream_fail (param->redirector);
- remove_normal_event (param->task->s, free_redirector_session,
+ rspamd_session_remove_event (param->task->s, free_redirector_session,
param);
}
}
rspamd_upstream_ok (param->redirector);
- remove_normal_event (param->task->s, free_redirector_session,
+ rspamd_session_remove_event (param->task->s, free_redirector_session,
param);
return 0;
timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
double_to_tv (surbl_module_ctx->read_timeout, timeout);
- register_async_event (task->s,
+ rspamd_session_add_event (task->s,
free_redirector_session,
param,
g_quark_from_static_string ("surbl"));
g_list_free (session->rcpt);
}
if (session->upstream) {
- remove_normal_event (session->s,
+ rspamd_session_remove_event (session->s,
smtp_upstream_finalize_connection,
session);
session->upstream = NULL;
/* We want fin_task after pre filters are processed */
session->task->s->wanna_die = TRUE;
session->task->state = WAIT_PRE_FILTER;
- check_session_pending (session->task->s);
+ rspamd_session_pending (session->task->s);
}
}
else {
TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
session->error, 0, FALSE, TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
if (!smtp_write_socket (session)) {
0, FALSE, TRUE)) {
return FALSE;
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
break;
}
if (session->state == SMTP_STATE_QUIT) {
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (session->state == SMTP_STATE_WAIT_UPSTREAM) {
return FALSE;
}
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else if (session->state == SMTP_STATE_END) {
msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
/*
{
struct smtp_session *session = arg;
- remove_normal_event (session->s,
+ rspamd_session_remove_event (session->s,
(event_finalizer_t)event_del,
session->delay_timer);
if (session->state == SMTP_STATE_DELAY) {
evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
- register_async_event (session->s,
+ rspamd_session_add_event (session->s,
(event_finalizer_t)event_del,
tev,
g_quark_from_static_string ("smtp proxy"));
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool,
+ session->s = rspamd_session_create (session->pool,
NULL,
NULL,
free_smtp_session,
}
/* Free buffers */
session->state = SMTP_PROXY_STATE_REJECT;
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
/**
msg_info ("connection with %s got write error: %s",
inet_ntoa (session->client_addr),
strerror (errno));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else {
msg_info ("connection with %s got write error: %s",
inet_ntoa (session->client_addr),
strerror (errno));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
}
/* Proxy sent 500 error */
msg_info ("connection with %s got smtp error for greeting",
rspamd_upstream_name (session->upstream));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else {
msg_info ("connection with %s got read error: %s",
rspamd_upstream_name (session->upstream),
strerror (errno));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else if (session->state == SMTP_PROXY_STATE_XCLIENT) {
msg_info ("connection with %s got write error: %s",
inet_ntoa (session->client_addr),
strerror (errno));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else if (r == -1) {
/* Proxy sent 500 error */
msg_info ("connection with %s got smtp error for xclient",
rspamd_upstream_name (session->upstream));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
}
msg_info ("connection with %s got read event at improper state: %d",
rspamd_upstream_name (session->upstream),
session->state);
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else if (what == EV_WRITE) {
msg_info ("connection with %s got write error: %s",
rspamd_upstream_name (session->upstream),
strerror (errno));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else {
"connection with %s got write event at improper state: %d",
rspamd_upstream_name (session->upstream),
session->state);
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
else {
/* Timeout */
msg_info ("connection with %s timed out",
rspamd_upstream_name (session->upstream));
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
}
{
struct smtp_proxy_session *session = arg;
- remove_normal_event (session->s, (event_finalizer_t) event_del,
+ rspamd_session_remove_event (session->s, (event_finalizer_t) event_del,
session->delay_timer);
if (session->state == SMTP_PROXY_STATE_DELAY) {
/* TODO: Create upstream connection here */
evtimer_set (tev, smtp_delay_handler, session);
evtimer_add (tev, tv);
- register_async_event (session->s, (event_finalizer_t) event_del, tev,
+ rspamd_session_add_event (session->s, (event_finalizer_t) event_del, tev,
g_quark_from_static_string ("smtp proxy"));
session->delay_timer = tev;
}
0, FALSE, TRUE)) {
msg_err ("cannot write smtp error");
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
else {
/* Try to extract data */
0, FALSE, TRUE)) {
msg_err ("cannot write smtp error");
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
if (session->rcpt != NULL) {
0, FALSE, TRUE)) {
msg_err ("cannot write smtp error");
}
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
return rspamd_dispatcher_write (session->dispatcher,
struct smtp_proxy_session *session = arg;
if (session->ctx->instant_reject) {
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
return FALSE;
}
else {
g_error_free (err);
}
/* Free buffers */
- destroy_session (session->s);
+ rspamd_session_destroy (session->s);
}
/*
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool,
+ session->s = rspamd_session_create (session->pool,
NULL,
NULL,
free_smtp_proxy_session,
msg_info ("abnormally closing connection from: %s, error: %e",
rspamd_inet_address_to_string (task->client_addr), err);
/* Terminate session immediately */
- destroy_session (task->s);
+ rspamd_session_destroy (task->s);
}
static gint
/* We are done here */
msg_debug ("normally closing connection from: %s",
rspamd_inet_address_to_string (task->client_addr));
- destroy_session (task->s);
+ rspamd_session_destroy (task->s);
}
else if (task->state == WRITE_REPLY) {
/*
* If all filters have finished their tasks, this function will trigger
* writing a reply.
*/
- check_session_pending (task->s);
+ rspamd_session_pending (task->s);
}
return 0;
(rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
- new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
+ new_task->s = rspamd_session_create (new_task->task_pool, rspamd_task_fin,
rspamd_task_restore, rspamd_task_free_hard, new_task);
if (ctx->key) {
struct rspamd_async_session *s = ud;
g_assert (key != NULL);
- destroy_session (s);
+ rspamd_session_destroy (s);
}
static gboolean
g_assert (ctx != NULL);
/* Key part */
- s = new_async_session (pool, session_fin, NULL, NULL, NULL);
+ s = rspamd_session_create (pool, session_fin, NULL, NULL, NULL);
g_assert (rspamd_get_dkim_key (ctx, resolver, s, test_key_handler, s));
pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
- s = new_async_session (pool, session_fin, NULL, NULL, NULL);
+ s = rspamd_session_create (pool, session_fin, NULL, NULL, NULL);
resolver = dns_resolver_init (NULL, base, cfg);