#include "worker_util.h"
#include "libutil/http_connection.h"
#include "libutil/http_private.h"
+#include "libutil/libev_helper.h"
#include "unix-std.h"
#include "utlist.h"
#include <sys/resource.h>
#endif
-static struct timeval io_timeout = {
- .tv_sec = 30,
- .tv_usec = 0
-};
-static struct timeval worker_io_timeout = {
- .tv_sec = 0,
- .tv_usec = 500000
-};
+static ev_tstamp io_timeout = 30.0;
+static ev_tstamp worker_io_timeout = 0.5;
struct rspamd_control_session;
struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
- struct event io_ev;
+ struct rspamd_io_ev ev;
struct rspamd_worker *wrk;
gpointer ud;
gint attached_fd;
struct rspamd_control_session {
gint fd;
+ struct ev_loop *event_loop;
struct rspamd_main *rspamd_main;
struct rspamd_http_connection *conn;
struct rspamd_control_command cmd;
NULL,
"application/json",
session,
- &io_timeout);
+ io_timeout);
}
static void
NULL,
"application/json",
session,
- &io_timeout);
+ io_timeout);
}
static void
rspamd_inet_address_to_string (session->addr));
DL_FOREACH_SAFE (session->replies, elt, telt) {
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (session->event_loop,
+ &elt->ev);
g_free (elt);
}
}
session->replies_remain --;
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (session->event_loop,
+ &elt->ev);
if (session->replies_remain == 0) {
rspamd_control_write_reply (session);
rep_elt = g_malloc0 (sizeof (*rep_elt));
rep_elt->wrk = wrk;
rep_elt->ud = ud;
- event_set (&rep_elt->io_ev, wrk->control_pipe[0],
- EV_READ | EV_PERSIST, handler,
+ rspamd_ev_watcher_init (&rep_elt->ev,
+ wrk->control_pipe[0],
+ EV_READ, handler,
rep_elt);
- event_base_set (rspamd_main->event_loop,
- &rep_elt->io_ev);
- event_add (&rep_elt->io_ev, &worker_io_timeout);
+ rspamd_ev_watcher_start (rspamd_main->event_loop,
+ &rep_elt->ev, worker_io_timeout);
DL_APPEND (res, rep_elt);
}
session->rspamd_main = rspamd_main;
session->addr = addr;
rspamd_http_connection_read_message (session->conn, session,
- &io_timeout);
+ io_timeout);
}
struct rspamd_worker_control_data {
- struct event io_ev;
+ ev_io io_ev;
struct rspamd_worker *worker;
struct ev_loop *ev_base;
struct {
}
static void
-rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
+rspamd_control_default_worker_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker_control_data *cd = ud;
+ struct rspamd_worker_control_data *cd =
+ (struct rspamd_worker_control_data *)w->data;
static struct rspamd_control_command cmd;
static struct msghdr msg;
static struct iovec iov;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read request from the control socket: %s",
strerror (errno));
if (errno != EAGAIN && errno != EINTR) {
- event_del (&cd->io_ev);
- close (fd);
+ ev_io_stop (cd->ev_base, &cd->io_ev);
+ close (w->fd);
}
}
else if (r < (gint)sizeof (cmd)) {
(gint)sizeof (cmd));
if (r == 0) {
- event_del (&cd->io_ev);
- close (fd);
+ ev_io_stop (cd->ev_base, &cd->io_ev);
+ close (w->fd);
}
}
else if ((gint)cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
if (cd->handlers[cmd.type].handler) {
cd->handlers[cmd.type].handler (cd->worker->srv,
cd->worker,
- fd,
+ w->fd,
rfd,
&cmd,
cd->handlers[cmd.type].ud);
}
else {
- rspamd_control_default_cmd_handler (fd, rfd, cd, &cmd);
+ rspamd_control_default_cmd_handler (w->fd, rfd, cd, &cmd);
}
}
else {
cd->worker = worker;
cd->ev_base = ev_base;
- event_set (&cd->io_ev, worker->control_pipe[1], EV_READ | EV_PERSIST,
- rspamd_control_default_worker_handler, cd);
- event_base_set (ev_base, &cd->io_ev);
- event_add (&cd->io_ev, NULL);
+ cd->io_ev.data = cd;
+ ev_io_init (&cd->io_ev, rspamd_control_default_worker_handler,
+ worker->control_pipe[1], EV_READ);
+ ev_io_start (ev_base, &cd->io_ev);
worker->control_data = cd;
}
};
static void
-rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_hs_io_handler (int fd, short what, void *ud)
{
- struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_reply_elt *elt =
+ (struct rspamd_control_reply_elt *)ud;
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
g_free (elt);
}
static void
-rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
{
- struct rspamd_control_reply_elt *elt = ud;
+ struct rspamd_control_reply_elt *elt =
+ (struct rspamd_control_reply_elt *)ud;
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
- event_del (&elt->io_ev);
+ rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
g_free (elt);
}
}
static void
-rspamd_srv_handler (gint fd, short what, gpointer ud)
+rspamd_srv_handler (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker;
static struct rspamd_srv_command cmd;
struct rspamd_control_command wcmd;
gssize r;
- if (what == EV_READ) {
- worker = ud;
+ if (revents == EV_READ) {
+ worker = (struct rspamd_worker *)w->data;
srv = worker->srv;
iov.iov_base = &cmd;
iov.iov_len = sizeof (cmd);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read from worker's srv pipe: %s",
* Usually this means that a worker is dead, so do not try to read
* anything
*/
- event_del (&worker->srv_ev);
+ ev_io_stop (EV_A_ w);
}
else if (r != sizeof (cmd)) {
msg_err ("cannot read from worker's srv pipe incomplete command: %d",
}
/* Now plan write event and send data back */
- event_del (&worker->srv_ev);
- event_set (&worker->srv_ev,
- worker->srv_pipe[0],
- EV_WRITE,
- rspamd_srv_handler,
- rdata);
- event_add (&worker->srv_ev, NULL);
+ w->data = rdata;
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, worker->srv_pipe[0], EV_WRITE);
+ ev_io_start (EV_A_ w);
}
}
- else if (what == EV_WRITE) {
- rdata = ud;
+ else if (revents == EV_WRITE) {
+ rdata = (struct rspamd_srv_reply_data *)w->data;
worker = rdata->worker;
worker->tmp_data = NULL; /* Avoid race */
srv = rdata->srv;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = sendmsg (fd, &msg, 0);
+ r = sendmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot write to worker's srv pipe: %s",
}
g_free (rdata);
- event_del (&worker->srv_ev);
- event_set (&worker->srv_ev,
- worker->srv_pipe[0],
- EV_READ | EV_PERSIST,
- rspamd_srv_handler,
- worker);
- event_add (&worker->srv_ev, NULL);
+ w->data = worker;
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, worker->srv_pipe[0], EV_READ);
+ ev_io_start (EV_A_ w);
}
}
g_assert (worker != NULL);
worker->tmp_data = NULL;
- event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
- rspamd_srv_handler, worker);
- event_base_set (ev_base, &worker->srv_ev);
- event_add (&worker->srv_ev, NULL);
+ worker->srv_ev.data = worker;
+ ev_io_init (&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
+ ev_io_start (ev_base, &worker->srv_ev);
}
struct rspamd_srv_request_data {
gint attached_fd;
struct rspamd_srv_reply rep;
rspamd_srv_reply_handler handler;
- struct event io_ev;
+ ev_io io_ev;
gpointer ud;
};
static void
-rspamd_srv_request_handler (gint fd, short what, gpointer ud)
+rspamd_srv_request_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_srv_request_data *rd = ud;
+ struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *)w->data;
struct msghdr msg;
struct iovec iov;
guchar fdspace[CMSG_SPACE(sizeof (int))];
gssize r;
gint rfd = -1;
- if (what == EV_WRITE) {
+ if (revents == EV_WRITE) {
/* Send request to server */
memset (&msg, 0, sizeof (msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = sendmsg (fd, &msg, 0);
+ r = sendmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot write to server pipe: %s", strerror (errno));
goto cleanup;
}
- event_del (&rd->io_ev);
- event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ,
- rspamd_srv_request_handler, rd);
- event_add (&rd->io_ev, NULL);
+ ev_io_stop (EV_A_ w);
+ ev_io_set (w, rd->worker->srv_pipe[1], EV_READ);
+ ev_io_start (EV_A_ w);
}
else {
iov.iov_base = &rd->rep;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- r = recvmsg (fd, &msg, 0);
+ r = recvmsg (w->fd, &msg, 0);
if (r == -1) {
msg_err ("cannot read from server pipe: %s", strerror (errno));
if (rd->handler) {
rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
}
- event_del (&rd->io_ev);
+
+ ev_io_stop (EV_A_ w);
g_free (rd);
}
rd->rep.type = cmd->type;
rd->attached_fd = attached_fd;
- event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE,
- rspamd_srv_request_handler, rd);
- event_base_set (ev_base, &rd->io_ev);
- event_add (&rd->io_ev, NULL);
+ rd->io_ev.data = rd;
+ ev_io_init (&rd->io_ev, rspamd_srv_request_handler,
+ rd->worker->srv_pipe[1], EV_WRITE);
+ ev_io_start (ev_base, &rd->io_ev);
}
#include "libstat/stat_api.h"
#include <math.h>
#include <src/libmime/message.h>
+#include "libutil/libev_helper.h"
#define DEFAULT_SYMBOL "R_FUZZY_HASH"
struct rspamd_symcache_item *item;
struct upstream *server;
struct fuzzy_rule *rule;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint state;
gint fd;
guint retransmits;
struct upstream *server;
struct fuzzy_rule *rule;
struct rspamd_task *task;
- struct event ev;
- struct event timev;
- struct timeval tv;
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
gint fd;
guint retransmits;
};
g_ptr_array_free (session->results, TRUE);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
}
return FALSE;
}
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct rspamd_task *task;
+
+ task = session->task;
+
+ /* We might be here because of other checks being slow */
+ if (fuzzy_check_try_read (session) > 0) {
+ if (fuzzy_check_session_is_completed (session)) {
+ return;
+ }
+ }
+
+ if (session->retransmits >= session->rule->ctx->retransmits) {
+ msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_name (session->server),
+ rspamd_inet_address_to_string_pretty (
+ rspamd_upstream_addr_cur (session->server)),
+ session->retransmits);
+ rspamd_upstream_fail (session->server, TRUE);
+
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ }
+ rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+ }
+ else {
+ /* Plan write event */
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
+ session->retransmits ++;
+ }
+}
+
/* Fuzzy check callback */
static void
fuzzy_check_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
gint r;
enum {
}
}
else {
- /* Should not happen */
- g_assert (0);
+ fuzzy_check_timer_callback (fd, what, arg);
+ return;
}
if (ret == return_want_more) {
/* Processed write, switch to reading */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
else if (ret == return_error) {
/* Error state */
/* Read something from network */
if (!fuzzy_check_session_is_completed (session)) {
/* Need to read more */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, session->fd, EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
}
}
}
-/* Fuzzy check timeout callback */
+
static void
-fuzzy_check_timer_callback (gint fd, short what, void *arg)
+fuzzy_lua_fin (void *ud)
{
- struct fuzzy_client_session *session = arg;
+ struct fuzzy_learn_session *session = ud;
+
+ (*session->saved)--;
+
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
+ close (session->fd);
+}
+
+/* Controller IO */
+
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+ struct fuzzy_learn_session *session = arg;
struct rspamd_task *task;
- struct ev_loop *ev_base;
task = session->task;
- /* We might be here because of other checks being slow */
- if (fuzzy_check_try_read (session) > 0) {
- if (fuzzy_check_session_is_completed (session)) {
- return;
- }
- }
-
if (session->retransmits >= session->rule->ctx->retransmits) {
- msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+ rspamd_upstream_fail (session->server, TRUE);
+ msg_err_task_check ("got IO timeout with server %s(%s), "
+ "after %d retransmits",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr_cur (session->server)),
session->retransmits);
- rspamd_upstream_fail (session->server, TRUE);
- if (session->item) {
- rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+ if (session->session) {
+ rspamd_session_remove_event (session->session, fuzzy_lua_fin,
+ session);
+ }
+ else {
+ if (session->http_entry) {
+ rspamd_controller_send_error (session->http_entry,
+ 500, "IO timeout with fuzzy storage");
+ }
+
+ if (*session->saved > 0 ) {
+ (*session->saved)--;
+ if (*session->saved == 0) {
+ if (session->http_entry) {
+ rspamd_task_free (session->task);
+ }
+
+ session->task = NULL;
+ }
+ }
+
+ if (session->http_entry) {
+ rspamd_http_connection_unref (session->http_entry->conn);
+ }
+
+ rspamd_ev_watcher_stop (session->event_loop,
+ &session->ev);
+ close (session->fd);
}
- rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
}
else {
/* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_check_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ|EV_WRITE);
session->retransmits ++;
}
}
-static void
-fuzzy_lua_fin (void *ud)
-{
- struct fuzzy_learn_session *session = ud;
-
- (*session->saved)--;
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
-}
-
-/* Controller IO */
static void
fuzzy_controller_io_callback (gint fd, short what, void *arg)
{
struct fuzzy_cmd_io *io;
struct rspamd_fuzzy_cmd *cmd = NULL;
const gchar *symbol, *ftype;
- struct ev_loop *ev_base;
gint r;
enum {
return_error = 0,
if (what & EV_READ) {
if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
}
}
else {
- g_assert (0);
+ fuzzy_controller_timer_callback (fd, what, arg);
+
+ return;
}
if (ret == return_want_more) {
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
+ rspamd_ev_watcher_reschedule (session->event_loop,
+ &session->ev, EV_READ);
return;
}
rspamd_http_connection_unref (session->http_entry->conn);
}
- event_del (&session->ev);
- event_del (&session->timev);
+ rspamd_ev_watcher_stop (session->event_loop, &session->ev);
close (session->fd);
if (*session->saved == 0) {
if (session->http_entry) {
ucl_object_t *reply, *hashes;
- guint i;
gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
reply = ucl_object_typed_new (UCL_OBJECT);
}
-static void
-fuzzy_controller_timer_callback (gint fd, short what, void *arg)
-{
- struct fuzzy_learn_session *session = arg;
- struct rspamd_task *task;
- struct ev_loop *ev_base;
-
- task = session->task;
-
- if (session->retransmits >= session->rule->ctx->retransmits) {
- rspamd_upstream_fail (session->server, TRUE);
- msg_err_task_check ("got IO timeout with server %s(%s), "
- "after %d retransmits",
- rspamd_upstream_name (session->server),
- rspamd_inet_address_to_string_pretty (
- rspamd_upstream_addr_cur (session->server)),
- session->retransmits);
-
- if (session->session) {
- rspamd_session_remove_event (session->session, fuzzy_lua_fin,
- session);
- }
- else {
- if (session->http_entry) {
- rspamd_controller_send_error (session->http_entry,
- 500, "IO timeout with fuzzy storage");
- }
-
- if (*session->saved > 0 ) {
- (*session->saved)--;
- if (*session->saved == 0) {
- if (session->http_entry) {
- rspamd_task_free (session->task);
- }
-
- session->task = NULL;
- }
- }
-
- if (session->http_entry) {
- rspamd_http_connection_unref (session->http_entry->conn);
- }
-
- event_del (&session->ev);
- event_del (&session->timev);
- close (session->fd);
- }
- }
- else {
- /* Plan write event */
- ev_base = event_get_base (&session->ev);
- event_del (&session->ev);
- event_set (&session->ev, fd, EV_WRITE|EV_READ,
- fuzzy_controller_io_callback, session);
- event_base_set (ev_base, &session->ev);
- event_add (&session->ev, NULL);
-
- /* Plan new retransmit timer */
- ev_base = event_get_base (&session->timev);
- event_del (&session->timev);
- event_base_set (ev_base, &session->timev);
- event_add (&session->timev, &session->tv);
- session->retransmits ++;
- }
-}
-
static GPtrArray *
fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule,
gint c, gint flag, guint32 value, guint flags)
session =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_client_session));
- msec_to_tv (rule->ctx->io_timeout, &session->tv);
session->state = 0;
session->commands = commands;
session->task = task;
session->server = selected;
session->rule = rule;
session->results = g_ptr_array_sized_new (32);
+ session->event_loop = task->event_loop;
- event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
- session);
- event_base_set (session->task->event_loop, &session->ev);
- event_add (&session->ev, NULL);
-
- evtimer_set (&session->timev, fuzzy_check_timer_callback,
+ rspamd_ev_watcher_init (&session->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_check_io_callback,
session);
- event_base_set (session->task->event_loop, &session->timev);
- event_add (&session->timev, &session->tv);
+ rspamd_ev_watcher_start (session->event_loop, &session->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s, fuzzy_io_fin, session, M);
session->item = rspamd_symcache_get_cur_item (task);
struct rspamd_controller_session *session = entry->ud;
gint sock;
gint ret = -1;
- struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
/* Get upstream */
rspamd_mempool_alloc0 (session->pool,
sizeof (struct fuzzy_learn_session));
- msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = entry;
s->fd = sock;
s->err = err;
s->rule = rule;
+ s->event_loop = task->event_loop;
/* We ref connection to avoid freeing before we process fuzzy rule */
rspamd_http_connection_ref (entry->conn);
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (entry->rt->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
(*saved)++;
ret = 1;
s =
rspamd_mempool_alloc0 (task->task_pool,
sizeof (struct fuzzy_learn_session));
-
- msec_to_tv (rule->ctx->io_timeout, &s->tv);
s->task = task;
s->commands = commands;
s->http_entry = NULL;
s->err = err;
s->rule = rule;
s->session = task->s;
+ s->event_loop = task->event_loop;
- event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
- event_base_set (task->event_loop, &s->ev);
- event_add (&s->ev, NULL);
-
- evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
- event_base_set (s->task->event_loop, &s->timev);
- event_add (&s->timev, &s->tv);
+ rspamd_ev_watcher_init (&s->ev,
+ sock,
+ EV_WRITE,
+ fuzzy_controller_io_callback,
+ s);
+ rspamd_ev_watcher_start (s->event_loop, &s->ev,
+ ((double)rule->ctx->io_timeout) / 1000.0);
rspamd_session_add_event (task->s,
fuzzy_lua_fin,
fuzzy_lua_learn_handler (lua_State *L)
{
struct rspamd_task *task = lua_check_task (L, 1);
- guint flag = 0, weight = 1.0, send_flags = 0;
+ guint flag = 0, weight = 1, send_flags = 0;
const gchar *symbol;
struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);