]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] More libserver adoptions
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 19 Jun 2019 12:16:25 +0000 (13:16 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
contrib/libev/CMakeLists.txt
src/libserver/rspamd_control.c
src/plugins/fuzzy_check.c
src/plugins/surbl.c

index e681aeb916d9fac82bd7aaec27915fb1345f4aaf..c99b4dd3245c8341ddcae6c1f2d6fa1aa963749d 100644 (file)
@@ -64,4 +64,8 @@ ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
                )
 IF(HAVE_EVENTFD)
        ADD_DEFINITIONS(-DEV_USE_EVENTFD=1)
+ENDIF()
+
+IF(ENABLE_FULL_DEBUG MATCHES "ON")
+       ADD_DEFINITIONS(-DEV_VERIFY=3)
 ENDIF()
\ No newline at end of file
index 9788d47edd7e5cbe3e4ac4e1c5bd72be16bc3d7c..62ca2464372b634bdfc9c6f39514a333d72b8c42 100644 (file)
@@ -19,6 +19,7 @@
 #include "worker_util.h"
 #include "libutil/http_connection.h"
 #include "libutil/http_private.h"
+#include "libutil/libev_helper.h"
 #include "unix-std.h"
 #include "utlist.h"
 
 #include <sys/resource.h>
 #endif
 
-static struct timeval io_timeout = {
-               .tv_sec = 30,
-               .tv_usec = 0
-};
-static struct timeval worker_io_timeout = {
-               .tv_sec = 0,
-               .tv_usec = 500000
-};
+static ev_tstamp io_timeout = 30.0;
+static ev_tstamp worker_io_timeout = 0.5;
 
 struct rspamd_control_session;
 
 struct rspamd_control_reply_elt {
        struct rspamd_control_reply reply;
-       struct event io_ev;
+       struct rspamd_io_ev ev;
        struct rspamd_worker *wrk;
        gpointer ud;
        gint attached_fd;
@@ -48,6 +43,7 @@ struct rspamd_control_reply_elt {
 
 struct rspamd_control_session {
        gint fd;
+       struct ev_loop *event_loop;
        struct rspamd_main *rspamd_main;
        struct rspamd_http_connection *conn;
        struct rspamd_control_command cmd;
@@ -131,7 +127,7 @@ rspamd_control_send_error (struct rspamd_control_session *session,
                        NULL,
                        "application/json",
                        session,
-                       &io_timeout);
+                       io_timeout);
 }
 
 static void
@@ -154,7 +150,7 @@ rspamd_control_send_ucl (struct rspamd_control_session *session,
                        NULL,
                        "application/json",
                        session,
-                       &io_timeout);
+                       io_timeout);
 }
 
 static void
@@ -168,7 +164,8 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
                        rspamd_inet_address_to_string (session->addr));
 
        DL_FOREACH_SAFE (session->replies, elt, telt) {
-               event_del (&elt->io_ev);
+               rspamd_ev_watcher_stop (session->event_loop,
+                               &elt->ev);
                g_free (elt);
        }
 
@@ -358,7 +355,8 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud)
        }
 
        session->replies_remain --;
-       event_del (&elt->io_ev);
+       rspamd_ev_watcher_stop (session->event_loop,
+                       &elt->ev);
 
        if (session->replies_remain == 0) {
                rspamd_control_write_reply (session);
@@ -434,12 +432,12 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
                        rep_elt = g_malloc0 (sizeof (*rep_elt));
                        rep_elt->wrk = wrk;
                        rep_elt->ud = ud;
-                       event_set (&rep_elt->io_ev, wrk->control_pipe[0],
-                                       EV_READ | EV_PERSIST, handler,
+                       rspamd_ev_watcher_init (&rep_elt->ev,
+                                       wrk->control_pipe[0],
+                                       EV_READ, handler,
                                        rep_elt);
-                       event_base_set (rspamd_main->event_loop,
-                                       &rep_elt->io_ev);
-                       event_add (&rep_elt->io_ev, &worker_io_timeout);
+                       rspamd_ev_watcher_start (rspamd_main->event_loop,
+                                       &rep_elt->ev, worker_io_timeout);
 
                        DL_APPEND (res, rep_elt);
                }
@@ -527,11 +525,11 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main,
        session->rspamd_main = rspamd_main;
        session->addr = addr;
        rspamd_http_connection_read_message (session->conn, session,
-                       &io_timeout);
+                       io_timeout);
 }
 
 struct rspamd_worker_control_data {
-       struct event io_ev;
+       ev_io io_ev;
        struct rspamd_worker *worker;
        struct ev_loop *ev_base;
        struct {
@@ -613,9 +611,10 @@ rspamd_control_default_cmd_handler (gint fd,
 }
 
 static void
-rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
+rspamd_control_default_worker_handler (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_worker_control_data *cd = ud;
+       struct rspamd_worker_control_data *cd =
+                       (struct rspamd_worker_control_data *)w->data;
        static struct rspamd_control_command cmd;
        static struct msghdr msg;
        static struct iovec iov;
@@ -631,15 +630,15 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
 
-       r = recvmsg (fd, &msg, 0);
+       r = recvmsg (w->fd, &msg, 0);
 
        if (r == -1) {
                msg_err ("cannot read request from the control socket: %s",
                                strerror (errno));
 
                if (errno != EAGAIN && errno != EINTR) {
-                       event_del (&cd->io_ev);
-                       close (fd);
+                       ev_io_stop (cd->ev_base, &cd->io_ev);
+                       close (w->fd);
                }
        }
        else if (r < (gint)sizeof (cmd)) {
@@ -647,8 +646,8 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
                                (gint)sizeof (cmd));
 
                if (r == 0) {
-                       event_del (&cd->io_ev);
-                       close (fd);
+                       ev_io_stop (cd->ev_base, &cd->io_ev);
+                       close (w->fd);
                }
        }
        else if ((gint)cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
@@ -660,13 +659,13 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
                if (cd->handlers[cmd.type].handler) {
                        cd->handlers[cmd.type].handler (cd->worker->srv,
                                        cd->worker,
-                                       fd,
+                                       w->fd,
                                        rfd,
                                        &cmd,
                                        cd->handlers[cmd.type].ud);
                }
                else {
-                       rspamd_control_default_cmd_handler (fd, rfd, cd, &cmd);
+                       rspamd_control_default_cmd_handler (w->fd, rfd, cd, &cmd);
                }
        }
        else {
@@ -684,10 +683,10 @@ rspamd_control_worker_add_default_handler (struct rspamd_worker *worker,
        cd->worker = worker;
        cd->ev_base = ev_base;
 
-       event_set (&cd->io_ev, worker->control_pipe[1], EV_READ | EV_PERSIST,
-                       rspamd_control_default_worker_handler, cd);
-       event_base_set (ev_base, &cd->io_ev);
-       event_add (&cd->io_ev, NULL);
+       cd->io_ev.data = cd;
+       ev_io_init (&cd->io_ev, rspamd_control_default_worker_handler,
+                       worker->control_pipe[1], EV_READ);
+       ev_io_start (ev_base, &cd->io_ev);
 
        worker->control_data = cd;
 }
@@ -720,26 +719,28 @@ struct rspamd_srv_reply_data {
 };
 
 static void
-rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_hs_io_handler (int fd, short what, void *ud)
 {
-       struct rspamd_control_reply_elt *elt = ud;
+       struct rspamd_control_reply_elt *elt =
+                       (struct rspamd_control_reply_elt *)ud;
        struct rspamd_control_reply rep;
 
        /* At this point we just ignore replies from the workers */
        (void)read (fd, &rep, sizeof (rep));
-       event_del (&elt->io_ev);
+       rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
        g_free (elt);
 }
 
 static void
-rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 {
-       struct rspamd_control_reply_elt *elt = ud;
+       struct rspamd_control_reply_elt *elt =
+                       (struct rspamd_control_reply_elt *)ud;
        struct rspamd_control_reply rep;
 
        /* At this point we just ignore replies from the workers */
        (void) read (fd, &rep, sizeof (rep));
-       event_del (&elt->io_ev);
+       rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
        g_free (elt);
 }
 
@@ -794,7 +795,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
 }
 
 static void
-rspamd_srv_handler (gint fd, short what, gpointer ud)
+rspamd_srv_handler (EV_P_ ev_io *w, int revents)
 {
        struct rspamd_worker *worker;
        static struct rspamd_srv_command cmd;
@@ -809,8 +810,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
        struct rspamd_control_command wcmd;
        gssize r;
 
-       if (what == EV_READ) {
-               worker = ud;
+       if (revents == EV_READ) {
+               worker = (struct rspamd_worker *)w->data;
                srv = worker->srv;
                iov.iov_base = &cmd;
                iov.iov_len = sizeof (cmd);
@@ -820,7 +821,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                msg.msg_iov = &iov;
                msg.msg_iovlen = 1;
 
-               r = recvmsg (fd, &msg, 0);
+               r = recvmsg (w->fd, &msg, 0);
 
                if (r == -1) {
                        msg_err ("cannot read from worker's srv pipe: %s",
@@ -831,7 +832,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                         * Usually this means that a worker is dead, so do not try to read
                         * anything
                         */
-                       event_del (&worker->srv_ev);
+                       ev_io_stop (EV_A_ w);
                }
                else if (r != sizeof (cmd)) {
                        msg_err ("cannot read from worker's srv pipe incomplete command: %d",
@@ -919,17 +920,14 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                        }
 
                        /* Now plan write event and send data back */
-                       event_del (&worker->srv_ev);
-                       event_set (&worker->srv_ev,
-                                       worker->srv_pipe[0],
-                                       EV_WRITE,
-                                       rspamd_srv_handler,
-                                       rdata);
-                       event_add (&worker->srv_ev, NULL);
+                       w->data = rdata;
+                       ev_io_stop (EV_A_ w);
+                       ev_io_set (w, worker->srv_pipe[0], EV_WRITE);
+                       ev_io_start (EV_A_ w);
                }
        }
-       else if (what == EV_WRITE) {
-               rdata = ud;
+       else if (revents == EV_WRITE) {
+               rdata = (struct rspamd_srv_reply_data *)w->data;
                worker = rdata->worker;
                worker->tmp_data = NULL; /* Avoid race */
                srv = rdata->srv;
@@ -953,7 +951,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                msg.msg_iov = &iov;
                msg.msg_iovlen = 1;
 
-               r = sendmsg (fd, &msg, 0);
+               r = sendmsg (w->fd, &msg, 0);
 
                if (r == -1) {
                        msg_err ("cannot write to worker's srv pipe: %s",
@@ -961,13 +959,10 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                }
 
                g_free (rdata);
-               event_del (&worker->srv_ev);
-               event_set (&worker->srv_ev,
-                               worker->srv_pipe[0],
-                               EV_READ | EV_PERSIST,
-                               rspamd_srv_handler,
-                               worker);
-               event_add (&worker->srv_ev, NULL);
+               w->data = worker;
+               ev_io_stop (EV_A_ w);
+               ev_io_set (w, worker->srv_pipe[0], EV_READ);
+               ev_io_start (EV_A_ w);
        }
 }
 
@@ -979,10 +974,9 @@ rspamd_srv_start_watching (struct rspamd_main *srv,
        g_assert (worker != NULL);
 
        worker->tmp_data = NULL;
-       event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
-                       rspamd_srv_handler, worker);
-       event_base_set (ev_base, &worker->srv_ev);
-       event_add (&worker->srv_ev, NULL);
+       worker->srv_ev.data = worker;
+       ev_io_init (&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
+       ev_io_start (ev_base, &worker->srv_ev);
 }
 
 struct rspamd_srv_request_data {
@@ -991,14 +985,14 @@ struct rspamd_srv_request_data {
        gint attached_fd;
        struct rspamd_srv_reply rep;
        rspamd_srv_reply_handler handler;
-       struct event io_ev;
+       ev_io io_ev;
        gpointer ud;
 };
 
 static void
-rspamd_srv_request_handler (gint fd, short what, gpointer ud)
+rspamd_srv_request_handler (EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_srv_request_data *rd = ud;
+       struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *)w->data;
        struct msghdr msg;
        struct iovec iov;
        guchar fdspace[CMSG_SPACE(sizeof (int))];
@@ -1006,7 +1000,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
        gssize r;
        gint rfd = -1;
 
-       if (what == EV_WRITE) {
+       if (revents == EV_WRITE) {
                /* Send request to server */
                memset (&msg, 0, sizeof (msg));
 
@@ -1027,17 +1021,16 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
                msg.msg_iov = &iov;
                msg.msg_iovlen = 1;
 
-               r = sendmsg (fd, &msg, 0);
+               r = sendmsg (w->fd, &msg, 0);
 
                if (r == -1) {
                        msg_err ("cannot write to server pipe: %s", strerror (errno));
                        goto cleanup;
                }
 
-               event_del (&rd->io_ev);
-               event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ,
-                               rspamd_srv_request_handler, rd);
-               event_add (&rd->io_ev, NULL);
+               ev_io_stop (EV_A_ w);
+               ev_io_set (w, rd->worker->srv_pipe[1], EV_READ);
+               ev_io_start (EV_A_ w);
        }
        else {
                iov.iov_base = &rd->rep;
@@ -1048,7 +1041,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
                msg.msg_iov = &iov;
                msg.msg_iovlen = 1;
 
-               r = recvmsg (fd, &msg, 0);
+               r = recvmsg (w->fd, &msg, 0);
 
                if (r == -1) {
                        msg_err ("cannot read from server pipe: %s", strerror (errno));
@@ -1075,7 +1068,8 @@ cleanup:
        if (rd->handler) {
                rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
        }
-       event_del (&rd->io_ev);
+
+       ev_io_stop (EV_A_ w);
        g_free (rd);
 }
 
@@ -1102,8 +1096,8 @@ rspamd_srv_send_command (struct rspamd_worker *worker,
        rd->rep.type = cmd->type;
        rd->attached_fd = attached_fd;
 
-       event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE,
-                       rspamd_srv_request_handler, rd);
-       event_base_set (ev_base, &rd->io_ev);
-       event_add (&rd->io_ev, NULL);
+       rd->io_ev.data = rd;
+       ev_io_init (&rd->io_ev, rspamd_srv_request_handler,
+                       rd->worker->srv_pipe[1], EV_WRITE);
+       ev_io_start (ev_base, &rd->io_ev);
 }
index 2c91869d66130069f178930dd945042f524937ca..75df2a6456e17c73f48a8aef03f201aad3003b19 100644 (file)
@@ -47,6 +47,7 @@
 #include "libstat/stat_api.h"
 #include <math.h>
 #include <src/libmime/message.h>
+#include "libutil/libev_helper.h"
 
 #define DEFAULT_SYMBOL "R_FUZZY_HASH"
 
@@ -129,9 +130,8 @@ struct fuzzy_client_session {
        struct rspamd_symcache_item *item;
        struct upstream *server;
        struct fuzzy_rule *rule;
-       struct event ev;
-       struct event timev;
-       struct timeval tv;
+       struct ev_loop *event_loop;
+       struct rspamd_io_ev ev;
        gint state;
        gint fd;
        guint retransmits;
@@ -146,9 +146,8 @@ struct fuzzy_learn_session {
        struct upstream *server;
        struct fuzzy_rule *rule;
        struct rspamd_task *task;
-       struct event ev;
-       struct event timev;
-       struct timeval tv;
+       struct ev_loop *event_loop;
+       struct rspamd_io_ev ev;
        gint fd;
        guint retransmits;
 };
@@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud)
                g_ptr_array_free (session->results, TRUE);
        }
 
-       event_del (&session->ev);
-       event_del (&session->timev);
+       rspamd_ev_watcher_stop (session->event_loop, &session->ev);
        close (session->fd);
 }
 
@@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session)
        return FALSE;
 }
 
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+       struct fuzzy_client_session *session = arg;
+       struct rspamd_task *task;
+
+       task = session->task;
+
+       /* We might be here because of other checks being slow */
+       if (fuzzy_check_try_read (session) > 0) {
+               if (fuzzy_check_session_is_completed (session)) {
+                       return;
+               }
+       }
+
+       if (session->retransmits >= session->rule->ctx->retransmits) {
+               msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+                               rspamd_upstream_name (session->server),
+                               rspamd_inet_address_to_string_pretty (
+                                               rspamd_upstream_addr_cur (session->server)),
+                               session->retransmits);
+               rspamd_upstream_fail (session->server, TRUE);
+
+               if (session->item) {
+                       rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+               }
+               rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+       }
+       else {
+               /* Plan write event */
+               rspamd_ev_watcher_reschedule (session->event_loop,
+                               &session->ev, EV_READ|EV_WRITE);
+               session->retransmits ++;
+       }
+}
+
 /* Fuzzy check callback */
 static void
 fuzzy_check_io_callback (gint fd, short what, void *arg)
 {
        struct fuzzy_client_session *session = arg;
        struct rspamd_task *task;
-       struct ev_loop *ev_base;
        gint r;
 
        enum {
@@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
                }
        }
        else {
-               /* Should not happen */
-               g_assert (0);
+               fuzzy_check_timer_callback (fd, what, arg);
+               return;
        }
 
        if (ret == return_want_more) {
                /* Processed write, switch to reading */
-               ev_base = event_get_base (&session->ev);
-               event_del (&session->ev);
-               event_set (&session->ev, fd, EV_READ,
-                               fuzzy_check_io_callback, session);
-               event_base_set (ev_base, &session->ev);
-               event_add (&session->ev, NULL);
+               rspamd_ev_watcher_reschedule (session->event_loop,
+                               &session->ev, EV_READ);
        }
        else if (ret == return_error) {
                /* Error state */
@@ -2258,77 +2288,81 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
                /* Read something from network */
                if (!fuzzy_check_session_is_completed (session)) {
                        /* Need to read more */
-                       ev_base = event_get_base (&session->ev);
-                       event_del (&session->ev);
-                       event_set (&session->ev, session->fd, EV_READ,
-                                       fuzzy_check_io_callback, session);
-                       event_base_set (ev_base, &session->ev);
-                       event_add (&session->ev, NULL);
+                       rspamd_ev_watcher_reschedule (session->event_loop,
+                                       &session->ev, EV_READ);
                }
        }
 }
 
-/* Fuzzy check timeout callback */
+
 static void
-fuzzy_check_timer_callback (gint fd, short what, void *arg)
+fuzzy_lua_fin (void *ud)
 {
-       struct fuzzy_client_session *session = arg;
+       struct fuzzy_learn_session *session = ud;
+
+       (*session->saved)--;
+
+       rspamd_ev_watcher_stop (session->event_loop, &session->ev);
+       close (session->fd);
+}
+
+/* Controller IO */
+
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+       struct fuzzy_learn_session *session = arg;
        struct rspamd_task *task;
-       struct ev_loop *ev_base;
 
        task = session->task;
 
-       /* We might be here because of other checks being slow */
-       if (fuzzy_check_try_read (session) > 0) {
-               if (fuzzy_check_session_is_completed (session)) {
-                       return;
-               }
-       }
-
        if (session->retransmits >= session->rule->ctx->retransmits) {
-               msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+               rspamd_upstream_fail (session->server, TRUE);
+               msg_err_task_check ("got IO timeout with server %s(%s), "
+                                                       "after %d retransmits",
                                rspamd_upstream_name (session->server),
                                rspamd_inet_address_to_string_pretty (
                                                rspamd_upstream_addr_cur (session->server)),
                                session->retransmits);
-               rspamd_upstream_fail (session->server, TRUE);
 
-               if (session->item) {
-                       rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+               if (session->session) {
+                       rspamd_session_remove_event (session->session, fuzzy_lua_fin,
+                                       session);
+               }
+               else {
+                       if (session->http_entry) {
+                               rspamd_controller_send_error (session->http_entry,
+                                               500, "IO timeout with fuzzy storage");
+                       }
+
+                       if (*session->saved > 0 ) {
+                               (*session->saved)--;
+                               if (*session->saved == 0) {
+                                       if (session->http_entry) {
+                                               rspamd_task_free (session->task);
+                                       }
+
+                                       session->task = NULL;
+                               }
+                       }
+
+                       if (session->http_entry) {
+                               rspamd_http_connection_unref (session->http_entry->conn);
+                       }
+
+                       rspamd_ev_watcher_stop (session->event_loop,
+                                       &session->ev);
+                       close (session->fd);
                }
-               rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
        }
        else {
                /* Plan write event */
-               ev_base = event_get_base (&session->ev);
-               event_del (&session->ev);
-               event_set (&session->ev, fd, EV_WRITE|EV_READ,
-                               fuzzy_check_io_callback, session);
-               event_base_set (ev_base, &session->ev);
-               event_add (&session->ev, NULL);
-
-               /* Plan new retransmit timer */
-               ev_base = event_get_base (&session->timev);
-               event_del (&session->timev);
-               event_base_set (ev_base, &session->timev);
-               event_add (&session->timev, &session->tv);
+               rspamd_ev_watcher_reschedule (session->event_loop,
+                               &session->ev, EV_READ|EV_WRITE);
                session->retransmits ++;
        }
 }
 
-static void
-fuzzy_lua_fin (void *ud)
-{
-       struct fuzzy_learn_session *session = ud;
-
-       (*session->saved)--;
-
-       event_del (&session->ev);
-       event_del (&session->timev);
-       close (session->fd);
-}
-
-/* Controller IO */
 static void
 fuzzy_controller_io_callback (gint fd, short what, void *arg)
 {
@@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
        struct fuzzy_cmd_io *io;
        struct rspamd_fuzzy_cmd *cmd = NULL;
        const gchar *symbol, *ftype;
-       struct ev_loop *ev_base;
        gint r;
        enum {
                return_error = 0,
@@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
        if (what & EV_READ) {
                if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
-                               event_add (&session->ev, NULL);
+                               rspamd_ev_watcher_reschedule (session->event_loop,
+                                               &session->ev, EV_READ);
                                return;
                        }
 
@@ -2482,16 +2516,14 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
                        }
                }
        else {
-               g_assert (0);
+               fuzzy_controller_timer_callback (fd, what, arg);
+
+               return;
        }
 
        if (ret == return_want_more) {
-               ev_base = event_get_base (&session->ev);
-               event_del (&session->ev);
-               event_set (&session->ev, fd, EV_READ,
-                               fuzzy_controller_io_callback, session);
-               event_base_set (ev_base, &session->ev);
-               event_add (&session->ev, NULL);
+               rspamd_ev_watcher_reschedule (session->event_loop,
+                               &session->ev, EV_READ);
 
                return;
        }
@@ -2518,8 +2550,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
                        rspamd_http_connection_unref (session->http_entry->conn);
                }
 
-               event_del (&session->ev);
-               event_del (&session->timev);
+               rspamd_ev_watcher_stop (session->event_loop, &session->ev);
                close (session->fd);
 
                if (*session->saved == 0) {
@@ -2555,7 +2586,6 @@ cleanup:
 
                if (session->http_entry) {
                        ucl_object_t *reply, *hashes;
-                       guint i;
                        gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
 
                        reply = ucl_object_typed_new (UCL_OBJECT);
@@ -2588,72 +2618,6 @@ cleanup:
 
 }
 
-static void
-fuzzy_controller_timer_callback (gint fd, short what, void *arg)
-{
-       struct fuzzy_learn_session *session = arg;
-       struct rspamd_task *task;
-       struct ev_loop *ev_base;
-
-       task = session->task;
-
-       if (session->retransmits >= session->rule->ctx->retransmits) {
-               rspamd_upstream_fail (session->server, TRUE);
-               msg_err_task_check ("got IO timeout with server %s(%s), "
-                               "after %d retransmits",
-                               rspamd_upstream_name (session->server),
-                               rspamd_inet_address_to_string_pretty (
-                                               rspamd_upstream_addr_cur (session->server)),
-                               session->retransmits);
-
-               if (session->session) {
-                       rspamd_session_remove_event (session->session, fuzzy_lua_fin,
-                                       session);
-               }
-               else {
-                       if (session->http_entry) {
-                               rspamd_controller_send_error (session->http_entry,
-                                               500, "IO timeout with fuzzy storage");
-                       }
-
-                       if (*session->saved > 0 ) {
-                               (*session->saved)--;
-                               if (*session->saved == 0) {
-                                       if (session->http_entry) {
-                                               rspamd_task_free (session->task);
-                                       }
-
-                                       session->task = NULL;
-                               }
-                       }
-
-                       if (session->http_entry) {
-                               rspamd_http_connection_unref (session->http_entry->conn);
-                       }
-
-                       event_del (&session->ev);
-                       event_del (&session->timev);
-                       close (session->fd);
-               }
-       }
-       else {
-               /* Plan write event */
-               ev_base = event_get_base (&session->ev);
-               event_del (&session->ev);
-               event_set (&session->ev, fd, EV_WRITE|EV_READ,
-                               fuzzy_controller_io_callback, session);
-               event_base_set (ev_base, &session->ev);
-               event_add (&session->ev, NULL);
-
-               /* Plan new retransmit timer */
-               ev_base = event_get_base (&session->timev);
-               event_del (&session->timev);
-               event_base_set (ev_base, &session->timev);
-               event_add (&session->timev, &session->tv);
-               session->retransmits ++;
-       }
-}
-
 static GPtrArray *
 fuzzy_generate_commands (struct rspamd_task *task, struct fuzzy_rule *rule,
                gint c, gint flag, guint32 value, guint flags)
@@ -2774,7 +2738,6 @@ register_fuzzy_client_call (struct rspamd_task *task,
                                session =
                                                rspamd_mempool_alloc0 (task->task_pool,
                                                                sizeof (struct fuzzy_client_session));
-                               msec_to_tv (rule->ctx->io_timeout, &session->tv);
                                session->state = 0;
                                session->commands = commands;
                                session->task = task;
@@ -2782,16 +2745,15 @@ register_fuzzy_client_call (struct rspamd_task *task,
                                session->server = selected;
                                session->rule = rule;
                                session->results = g_ptr_array_sized_new (32);
+                               session->event_loop = task->event_loop;
 
-                               event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
-                                               session);
-                               event_base_set (session->task->event_loop, &session->ev);
-                               event_add (&session->ev, NULL);
-
-                               evtimer_set (&session->timev, fuzzy_check_timer_callback,
+                               rspamd_ev_watcher_init (&session->ev,
+                                               sock,
+                                               EV_WRITE,
+                                               fuzzy_check_io_callback,
                                                session);
-                               event_base_set (session->task->event_loop, &session->timev);
-                               event_add (&session->timev, &session->tv);
+                               rspamd_ev_watcher_start (session->event_loop, &session->ev,
+                                               ((double)rule->ctx->io_timeout) / 1000.0);
 
                                rspamd_session_add_event (task->s, fuzzy_io_fin, session, M);
                                session->item = rspamd_symcache_get_cur_item (task);
@@ -2881,7 +2843,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
        struct rspamd_controller_session *session = entry->ud;
        gint sock;
        gint ret = -1;
-       struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
 
        /* Get upstream */
 
@@ -2899,7 +2860,6 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
                                rspamd_mempool_alloc0 (session->pool,
                                        sizeof (struct fuzzy_learn_session));
 
-                       msec_to_tv (fuzzy_module_ctx->io_timeout, &s->tv);
                        s->task = task;
                        s->commands = commands;
                        s->http_entry = entry;
@@ -2908,17 +2868,17 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
                        s->fd = sock;
                        s->err = err;
                        s->rule = rule;
+                       s->event_loop = task->event_loop;
                        /* We ref connection to avoid freeing before we process fuzzy rule */
                        rspamd_http_connection_ref (entry->conn);
 
-                       event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
-                       event_base_set (entry->rt->event_loop, &s->ev);
-                       event_add (&s->ev, NULL);
-
-                       evtimer_set (&s->timev, fuzzy_controller_timer_callback,
+                       rspamd_ev_watcher_init (&s->ev,
+                                       sock,
+                                       EV_WRITE,
+                                       fuzzy_controller_io_callback,
                                        s);
-                       event_base_set (s->task->event_loop, &s->timev);
-                       event_add (&s->timev, &s->tv);
+                       rspamd_ev_watcher_start (s->event_loop, &s->ev,
+                                       ((double)rule->ctx->io_timeout) / 1000.0);
 
                        (*saved)++;
                        ret = 1;
@@ -3258,8 +3218,6 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
                                s =
                                                rspamd_mempool_alloc0 (task->task_pool,
                                                                sizeof (struct fuzzy_learn_session));
-
-                               msec_to_tv (rule->ctx->io_timeout, &s->tv);
                                s->task = task;
                                s->commands = commands;
                                s->http_entry = NULL;
@@ -3269,14 +3227,15 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
                                s->err = err;
                                s->rule = rule;
                                s->session = task->s;
+                               s->event_loop = task->event_loop;
 
-                               event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
-                               event_base_set (task->event_loop, &s->ev);
-                               event_add (&s->ev, NULL);
-
-                               evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
-                               event_base_set (s->task->event_loop, &s->timev);
-                               event_add (&s->timev, &s->tv);
+                               rspamd_ev_watcher_init (&s->ev,
+                                               sock,
+                                               EV_WRITE,
+                                               fuzzy_controller_io_callback,
+                                               s);
+                               rspamd_ev_watcher_start (s->event_loop, &s->ev,
+                                               ((double)rule->ctx->io_timeout) / 1000.0);
 
                                rspamd_session_add_event (task->s,
                                                fuzzy_lua_fin,
@@ -3367,7 +3326,7 @@ static gint
 fuzzy_lua_learn_handler (lua_State *L)
 {
        struct rspamd_task *task = lua_check_task (L, 1);
-       guint flag = 0, weight = 1.0, send_flags = 0;
+       guint flag = 0, weight = 1, send_flags = 0;
        const gchar *symbol;
        struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context (task->cfg);
 
index 338bdaa241bf5f20e7ac62f18e256efce7bb8b11..3c467615c3923fcdd75ae2d49d2846d361e6b155 100644 (file)
@@ -1815,7 +1815,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
        const gchar *rule)
 {
        struct redirector_param *param;
-       struct timeval *timeout;
        struct upstream *selected;
        struct rspamd_http_message *msg;
        struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
@@ -1851,8 +1850,6 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
                msg = rspamd_http_new_message (HTTP_REQUEST);
                msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
                param->redirector = selected;
-               timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
-               double_to_tv (surbl_module_ctx->read_timeout, timeout);
 
                rspamd_session_add_event (task->s,
                                free_redirector_session, param,
@@ -1864,7 +1861,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
                }
 
                rspamd_http_connection_write_message (param->conn, msg, NULL,
-                               NULL, param, timeout);
+                               NULL, param, surbl_module_ctx->read_timeout);
 
                msg_info_surbl (
                                "<%s> registered redirector call for %*s to %s, according to rule: %s",