]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must...
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Feb 2009 18:16:30 +0000 (21:16 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Feb 2009 18:16:30 +0000 (21:16 +0300)
14 files changed:
CMakeLists.txt
src/buffer.c [new file with mode: 0644]
src/buffer.h [new file with mode: 0644]
src/controller.c
src/filter.c
src/fstring.h
src/main.h
src/message.c
src/plugins/regexp.c
src/protocol.c
src/protocol.h
src/util.c
src/util.h
src/worker.c

index 70e14b47241c96416cdceed0e9a6e68ed33e0406..8fedd0bff56f72cce04cfba0d5e83b6d52034e5e 100644 (file)
@@ -211,7 +211,8 @@ SET(RSPAMDSRC       src/modules.c
                                src/fstring.c
                                src/filter.c
                                src/controller.c
-                               src/cfg_utils.c)
+                               src/cfg_utils.c
+                               src/buffer.c)
 
 SET(TOKENIZERSSRC  src/tokenizers/tokenizers.c
                                src/tokenizers/osb.c)
diff --git a/src/buffer.c b/src/buffer.c
new file mode 100644 (file)
index 0000000..46ef988
--- /dev/null
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "buffer.h"
+
+#define G_DISPATCHER_ERROR dispatcher_error_quark()
+
+static void dispatcher_cb (int fd, short what, void *arg);
+
+static inline GQuark
+dispatcher_error_quark (void)
+{
+  return g_quark_from_static_string ("g-dispatcher-error-quark");
+}
+
+#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
+
+static void
+write_buffers (int fd, rspamd_io_dispatcher_t *d)
+{
+       GList *cur;
+       GError *err;
+       rspamd_buffer_t *buf;
+       ssize_t r;
+
+       /* Fix order */
+       if (d->out_buffers) {
+               d->out_buffers = g_list_reverse (d->out_buffers);
+       }
+       cur = g_list_first (d->out_buffers);
+       while (cur) {
+               buf = (rspamd_buffer_t *)cur->data;
+               if (BUFREMAIN (buf) == 0) {
+                       /* Skip empty buffers */
+                       cur = g_list_next (cur);
+                       continue;
+               }
+
+               r = write (fd, buf->pos, BUFREMAIN (buf));
+               if (r == -1 && errno != EAGAIN) {
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return;
+                       }
+               }
+               else if (r > 0) {
+                       buf->pos += r;
+                       if (BUFREMAIN (buf) != 0) {
+                               /* Continue with this buffer */
+                               continue;
+                       }
+               }
+               else if (r == 0) {
+                       /* Got EOF while we wait for data */
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+                               d->err_callback (err, d->user_data);
+                               return;
+                       }
+               }
+               else if (errno == EAGAIN) {
+                       /* Wait for other event */
+                       return;
+               }
+               cur = g_list_next (cur);
+       }
+
+       if (cur == NULL) {
+               /* Disable write event for this time */
+               g_list_free (d->out_buffers);
+               d->out_buffers = NULL;
+
+               if (d->write_callback) {
+                       d->write_callback (d->user_data);
+               }
+
+               event_del (d->ev);
+               event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+       }
+}
+
+static void
+read_buffers (int fd, rspamd_io_dispatcher_t *d)
+{
+       ssize_t r, len;
+       GError *err;
+       f_str_t res;
+       char *c;
+
+       if (d->in_buf == NULL) {
+               d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
+               if (d->policy == BUFFER_LINE) {
+                       d->in_buf->data = fstralloc (d->pool, BUFSIZ);
+               }
+               else {
+                       d->in_buf->data = fstralloc (d->pool, d->nchars);
+               }
+               d->in_buf->pos = d->in_buf->data->begin;
+       }
+       
+       if (BUFREMAIN (d->in_buf) == 0) {
+               /* Buffer is full, try to call callback with overflow error */
+               if (d->err_callback) {
+                       err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
+                       d->err_callback (err, d->user_data);
+                       return;
+               }
+       }
+       else {
+               /* Try to read the whole buffer */
+               r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf));
+               if (r == -1 && errno != EAGAIN) {
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return;
+                       }
+               }
+               else if (r == 0) {
+                       /* Got EOF while we wait for data */
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+                               d->err_callback (err, d->user_data);
+                               return;
+                       }
+               }
+               else if (errno == EAGAIN) {
+                       return;
+               }
+               else {
+                       d->in_buf->pos += r;
+                       d->in_buf->data->len += r;
+               }
+       
+       }
+       
+       c = d->in_buf->data->begin;
+       r = 0;
+       len = d->in_buf->data->len;
+
+       switch (d->policy) {
+               case BUFFER_LINE:
+                       while (r < len) {
+                               if (*c == '\r' || *c == '\n') {
+                                       res.begin = d->in_buf->data->begin;
+                                       res.len = r;
+                                       if (d->read_callback) {
+                                               d->read_callback (&res, d->user_data);
+                                               if (r < len - 1 && *(c + 1) == '\n') {
+                                                       r ++;
+                                                       c ++;
+                                               }
+                                               /* Move remaining string to begin of buffer (draining) */
+                                               memmove (d->in_buf->data->begin, c, len - r);
+                                               c = d->in_buf->data->begin;
+                                               d->in_buf->data->len -= r + 1;
+                                               d->in_buf->pos -= r + 1;
+                                               r = 0;
+                                               len = d->in_buf->data->len;
+                                               continue;
+                                       }
+                               }
+                               r ++;
+                               c ++;
+                       }
+                       break;
+               case BUFFER_CHARACTER:
+                       while (r < len) {
+                               if (r == d->nchars) {
+                                       res.begin = d->in_buf->data->begin;
+                                       res.len = r;
+                                       if (d->read_callback) {
+                                               d->read_callback (&res, d->user_data);
+                                               /* Move remaining string to begin of buffer (draining) */
+                                               memmove (d->in_buf->data->begin, c, len - r);
+                                               c = d->in_buf->data->begin;
+                                               d->in_buf->data->len -= r;
+                                               d->in_buf->pos -= r;
+                                               r = 0;
+                                               len = d->in_buf->data->len;
+                                               continue;
+                                       }
+                               
+                               }
+                               r ++;
+                               c ++;
+                       }
+                       break;
+       }
+}
+
+#undef BUFREMAIN
+
+static void
+dispatcher_cb (int fd, short what, void *arg)
+{
+       rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
+       GError *err;
+
+       switch (what) {
+               case EV_TIMEOUT:
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
+                               d->err_callback (err, d->user_data);
+                       }
+                       break;
+               case EV_WRITE:
+                       /* No data to write, disable further EV_WRITE to this fd */
+                       if (d->out_buffers == NULL) {
+                               event_del (d->ev);
+                               event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+                               event_add (d->ev, d->tv);
+                       }
+                       else {
+                               write_buffers (fd, d);
+                       }
+                       break;
+               case EV_READ:
+                       read_buffers (fd, d);
+                       break;
+       }
+}
+
+
+rspamd_io_dispatcher_t* 
+rspamd_create_dispatcher (int fd, enum io_policy policy,
+                                                       dispatcher_read_callback_t read_cb,
+                                                       dispatcher_write_callback_t write_cb,
+                                                       dispatcher_err_callback_t err_cb,
+                                                       struct timeval *tv, void *user_data)
+{
+       rspamd_io_dispatcher_t *new;
+
+       if (fd == -1) {
+               return NULL;
+       }
+       
+       new = g_malloc (sizeof (rspamd_io_dispatcher_t));
+       bzero (new, sizeof (rspamd_io_dispatcher_t));
+
+       new->pool = memory_pool_new (memory_pool_get_size ());
+       new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval));
+       memcpy (new->tv, tv, sizeof (struct timeval));
+       new->policy = policy;
+       new->read_callback = read_cb;
+       new->write_callback = write_cb;
+       new->err_callback = err_cb;
+       new->user_data = user_data;
+
+       new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
+       new->fd = fd;
+
+       event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new);
+       event_add (new->ev, new->tv);
+
+       return new;
+}
+
+void 
+rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher)
+{
+       if (dispatcher != NULL) {
+               event_del (dispatcher->ev);
+               memory_pool_delete (dispatcher->pool);
+               g_free (dispatcher);
+       }
+}
+
+void 
+rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, 
+                                                                       enum io_policy policy,
+                                                                       size_t nchars)
+{
+       f_str_t *tmp;
+
+       if (d->policy != policy || d->nchars != nchars) {
+               d->policy = policy;
+               d->nchars = nchars ? nchars : BUFSIZ;
+               /* Resize input buffer if needed */
+               if (policy == BUFFER_CHARACTER && nchars != 0) {
+                       if (d->in_buf && d->in_buf->data->size < nchars) {
+                               tmp = fstralloc (d->pool, d->nchars);
+                               memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+                               tmp->len = d->in_buf->data->len;
+                               d->in_buf->data = tmp;
+                       }
+               }
+               else if (policy == BUFFER_LINE) {
+                       if (d->in_buf && d->nchars < BUFSIZ) {
+                               tmp = fstralloc (d->pool, BUFSIZ);
+                               memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+                               tmp->len = d->in_buf->data->len;
+                               d->in_buf->data = tmp;
+                       }
+               }
+       }
+}
+
+void 
+rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+                                                                       void *data,
+                                                                       size_t len, gboolean delayed)
+{
+       rspamd_buffer_t *newbuf;
+
+       newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
+       newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
+
+       newbuf->data->begin = memory_pool_alloc (d->pool, len);
+       memcpy (newbuf->data->begin, data, len);
+       newbuf->data->size = len;
+       newbuf->pos = newbuf->data->begin;
+       
+       d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+
+       if (!delayed) {
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+       }
+}
+
+void 
+rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d)
+{
+       event_del (d->ev);
+}
+
+/* 
+ * vi:ts=4 
+ */
diff --git a/src/buffer.h b/src/buffer.h
new file mode 100644 (file)
index 0000000..93c7818
--- /dev/null
@@ -0,0 +1,99 @@
+/**
+ * @file buffer.h
+ * Implements buffered IO
+ */
+
+#ifndef RSPAMD_BUFFER_H
+#define RSPAMD_BUFFER_H
+
+#include "config.h"
+#include "mem_pool.h"
+#include "fstring.h"
+
+typedef void (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
+typedef void (*dispatcher_write_callback_t)(void *user_data);
+typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
+
+/**
+ * Types of IO handling
+ */
+enum io_policy {
+       BUFFER_LINE,                                                                                                    /**< call handler when we have line ready */
+       BUFFER_CHARACTER,                                                                                               /**< call handler when we have some characters */
+};
+
+/**
+ * Buffer structure
+ */
+typedef struct rspamd_buffer_s {
+       f_str_t *data;                                                                                                  /**< buffer logic                       */
+       char *pos;                                                                                                              /**< current position           */
+} rspamd_buffer_t;
+
+typedef struct rspamd_io_dispatcher_s {
+       rspamd_buffer_t *in_buf;                                                                                /**< input buffer                       */
+       GList *out_buffers;                                                                                             /**< out buffers chain          */
+       struct timeval *tv;                                                                                             /**< io timeout                         */
+       struct event *ev;                                                                                               /**< libevent io event          */
+       memory_pool_t *pool;                                                                                    /**< where to store data        */
+       enum io_policy policy;                                                                                  /**< IO policy                          */
+       size_t nchars;                                                                                                  /**< how many chars to read     */
+       int fd;                                                                                                                 /**< descriptor                         */
+       dispatcher_read_callback_t read_callback;                                               /**< read callback                      */
+       dispatcher_write_callback_t write_callback;                                             /**< write callback                     */
+       dispatcher_err_callback_t err_callback;                                                 /**< error callback                     */
+       void *user_data;                                                                                                /**< user's data for callbacks */
+} rspamd_io_dispatcher_t;
+
+/**
+ * Creates rspamd IO dispatcher for specified descriptor
+ * @param fd descriptor to IO
+ * @param policy IO policy
+ * @param read_cb read callback handler
+ * @param write_cb write callback handler
+ * @param err_cb error callback handler
+ * @param tv IO timeout
+ * @param user_data pointer to user's data
+ * @return new dispatcher object or NULL in case of failure
+ */
+rspamd_io_dispatcher_t* rspamd_create_dispatcher (int fd, 
+                                                                                                 enum io_policy policy,
+                                                                                                 dispatcher_read_callback_t read_cb,
+                                                                                                 dispatcher_write_callback_t write_cb,
+                                                                                                 dispatcher_err_callback_t err_cb,
+                                                                                                 struct timeval *tv,
+                                                                                                 void *user_data);
+
+/**
+ * Set new policy for dispatcher
+ * @param d pointer to dispatcher's object
+ * @param policy IO policy
+ * @param nchars number of characters in buffer for character policy
+ */
+void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, 
+                                                                                                 enum io_policy policy,
+                                                                                                 size_t nchars);
+
+/**
+ * Write data when it would be possible
+ * @param d pointer to dispatcher's object
+ * @param data data to write
+ * @param len length of data
+ */
+void rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
+                                                                                                 void *data,
+                                                                                                 size_t len, gboolean delayed);
+
+/**
+ * Pause IO events on dispatcher
+ * @param d pointer to dispatcher's object
+ */
+void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d);
+
+/**
+ * Frees dispatcher object
+ * @param dispatcher pointer to dispatcher's object
+ */
+void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher);
+
+#endif
index 41f8a964935df60569a0389432856f22cb46ed70..7733bc924e94f07ec302d3525d53e802eeea4e57 100644 (file)
@@ -36,6 +36,9 @@
 #define CRLF "\r\n"
 #define END "END" CRLF
 
+/* 120 seconds for controller's IO */
+#define CONTROLLER_IO_TIMEOUT 120
+
 enum command_type {
        COMMAND_PASSWORD,
        COMMAND_QUIT,
@@ -68,6 +71,7 @@ static GCompletion *comp;
 static time_t start_time;
 
 static char greetingbuf[1024];
+static struct timeval io_tv;
 
 static 
 void sig_handler (int signo)
@@ -110,8 +114,7 @@ free_session (struct controller_session *session)
        struct mime_part *p;
        
        msg_debug ("free_session: freeing session %p", session);
-       bufferevent_disable (session->bev, EV_READ | EV_WRITE);
-       bufferevent_free (session->bev);
+       rspamd_remove_dispatcher (session->dispatcher);
        
        while ((part = g_list_first (session->parts))) {
                session->parts = g_list_remove_link (session->parts, part);
@@ -132,7 +135,7 @@ check_auth (struct controller_command *cmd, struct controller_session *session)
 
        if (cmd->privilleged && !session->authorized) {
                r = snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
-               bufferevent_write (session->bev, out_buf, r);
+               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                return 0;
        }
 
@@ -156,18 +159,18 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                        if (!arg || *arg == '\0') {
                                msg_debug ("process_command: empty password passed");
                                r = snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                return;
                        }
                        if (strncmp (arg, session->cfg->control_password, strlen (arg)) == 0) {
                                session->authorized = 1;
                                r = snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                        }
                        else {
                                session->authorized = 0;
                                r = snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                        }
                        break;
                case COMMAND_QUIT:
@@ -176,7 +179,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                case COMMAND_RELOAD:
                        if (check_auth (cmd, session)) {
                                r = snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                kill (getppid (), SIGHUP);
                        }
                        break;
@@ -199,13 +202,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                          mem_st.shared_chunks_allocated);
                                r += snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %zd" CRLF,
                                                          mem_st.chunks_freed);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                        }
                        break;
                case COMMAND_SHUTDOWN:
                        if (check_auth (cmd, session)) {
                                r = snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                kill (getppid (), SIGTERM);
                        }
                        break;
@@ -235,7 +238,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                                minutes, minutes > 1 ? "s" : " ",
                                                                (int)uptime, uptime > 1 ? "s" : " ");
                                }
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                        }
                        break;
                case COMMAND_LEARN:
@@ -244,37 +247,28 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                if (!arg || *arg == '\0') {
                                        msg_debug ("process_command: no statfile specified in learn command");
                                        r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
-                                       bufferevent_write (session->bev, out_buf, r);
+                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                        return;
                                }
                                arg = *(cmd_args + 1);
                                if (arg == NULL || *arg == '\0') {
                                        msg_debug ("process_command: no statfile size specified in learn command");
                                        r = snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
-                                       bufferevent_write (session->bev, out_buf, r);
+                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                        return;
                                }
                                size = strtoul (arg, &err_str, 10);
                                if (err_str && *err_str != '\0') {
                                        msg_debug ("process_command: statfile size is invalid: %s", arg);
                                        r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
-                                       bufferevent_write (session->bev, out_buf, r);
-                                       return;
-                               }
-                               session->learn_buf = memory_pool_alloc0 (session->session_pool, sizeof (f_str_buf_t));
-                               session->learn_buf->buf = fstralloc (session->session_pool, size);
-                               if (session->learn_buf->buf == NULL) {
-                                       r = snprintf (out_buf, sizeof (out_buf), "allocating buffer for learn failed" CRLF);
-                                       bufferevent_write (session->bev, out_buf, r);
+                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                        return;
                                }
-                               session->learn_buf->pos = session->learn_buf->buf->begin;
-                               update_buf_size (session->learn_buf);
 
                                statfile = g_hash_table_lookup (session->cfg->statfiles, *cmd_args);
                                if (statfile == NULL) {
                                        r = snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
-                                       bufferevent_write (session->bev, out_buf, r);
+                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                        return;
 
                                }
@@ -302,7 +296,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                                arg = *(cmd_args + 1);
                                                                if (!arg || *arg == '\0') {
                                                                        r = snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF, arg);
-                                                                       bufferevent_write (session->bev, out_buf, r);
+                                                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                                                        return;
                                                                }
                                                                session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
@@ -311,7 +305,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                                arg = *(cmd_args + 1);
                                                                if (!arg || *arg == '\0') {
                                                                        r = snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF, arg);
-                                                                       bufferevent_write (session->bev, out_buf, r);
+                                                                       rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                                                        return;
                                                                }
                                                                session->learn_from = memory_pool_strdup (session->session_pool, arg);
@@ -321,7 +315,7 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                                break;
                                                        default:
                                                                r = snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF, arg);
-                                                               bufferevent_write (session->bev, out_buf, r);
+                                                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                                                return;
                                                }
                                        }
@@ -333,15 +327,16 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                        if (statfile_pool_create (session->worker->srv->statfile_pool, 
                                                                        session->learn_filename, statfile->size / sizeof (struct stat_file_block)) == -1) {
                                                r = snprintf (out_buf, sizeof (out_buf), "cannot create statfile %s" CRLF, session->learn_filename);
-                                               bufferevent_write (session->bev, out_buf, r);
+                                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                                return;
                                        }
                                        if (statfile_pool_open (session->worker->srv->statfile_pool, session->learn_filename) == -1) {
                                                r = snprintf (out_buf, sizeof (out_buf), "cannot open statfile %s" CRLF, session->learn_filename);
-                                               bufferevent_write (session->bev, out_buf, r);
+                                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                                                return;
                                        }
                                }
+                rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
                                session->state = STATE_LEARN;
                        }
                        break;
@@ -355,13 +350,13 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                                                        "(*) shutdown - shutdown rspamd" CRLF
                                                        "    stat - show different rspamd stat" CRLF
                                                        "    uptime - rspamd uptime" CRLF);
-                               bufferevent_write (session->bev, out_buf, r);
+                               rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE);
                        break;
        }
 }
 
 static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
        struct classifier_ctx *cls_ctx;
@@ -375,101 +370,73 @@ read_socket (struct bufferevent *bev, void *arg)
 
        switch (session->state) {
                case STATE_COMMAND:
-                       s = buffer_readline (session->session_pool, EVBUFFER_INPUT (bev));
-                       msg_debug ("read_socket: got '%s' string from user", s);
-                       if (s != NULL && *s != 0) {
-                               len = strlen (s);
-                               /* Remove end of line characters from string */
-                               if (s[len - 1] == '\n') {
-                                       if (s[len - 2] == '\r') {
-                                               s[len - 2] = 0;
-                                       }
-                                       s[len - 1] = 0;
-                               }
-                               params = g_strsplit (s, " ", -1);
-                               len = g_strv_length (params);
-                               if (len > 0) {
-                                       cmd = g_strstrip (params[0]);
-                                       comp_list = g_completion_complete (comp, cmd, NULL);
-                                       switch (g_list_length (comp_list)) {
-                                               case 1:
-                                                       process_command ((struct controller_command *)comp_list->data, &params[1], session);
-                                                       break;
-                                               case 0:
-                                                       msg_debug ("Unknown command: '%s'", cmd);
-                                                       i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
-                                                       bufferevent_write (bev, out_buf, i);
-                                                       break;
-                                               default:
-                                                       msg_debug ("Ambigious command: '%s'", cmd);
-                                                       i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
-                                                       bufferevent_write (bev, out_buf, i);
-                                                       break;
-                                       }
+                       s = fstrcstr (in, session->session_pool);
+                       params = g_strsplit (s, " ", -1);
+                       len = g_strv_length (params);
+                       if (len > 0) {
+                               cmd = g_strstrip (params[0]);
+                               comp_list = g_completion_complete (comp, cmd, NULL);
+                               switch (g_list_length (comp_list)) {
+                                       case 1:
+                                               process_command ((struct controller_command *)comp_list->data, &params[1], session);
+                                               break;
+                                       case 0:
+                                               msg_debug ("Unknown command: '%s'", cmd);
+                                               i = snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
+                                               rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+                                               break;
+                                       default:
+                                               msg_debug ("Ambigious command: '%s'", cmd);
+                                               i = snprintf (out_buf, sizeof (out_buf), "Ambigious command" CRLF);
+                                               rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+                                               break;
                                }
-                               if (session->state == STATE_COMMAND) {
-                                       session->state = STATE_REPLY;
-                               }
-                               if (session->state != STATE_LEARN) {
-                                       bufferevent_write (bev, END, sizeof (END) - 1);
-                                       bufferevent_enable (bev, EV_WRITE);
-                               }
-                               g_strfreev (params);
                        }
-            else {
-                               bufferevent_enable (bev, EV_WRITE);
-            }
+                       if (session->state == STATE_COMMAND) {
+                               session->state = STATE_REPLY;
+                       }
+                       if (session->state != STATE_LEARN) {
+                               rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE);
+                       }
+
+                       g_strfreev (params);
                        break;
                case STATE_LEARN:
-                       i = bufferevent_read (bev, session->learn_buf->pos, session->learn_buf->free);
-                       if (i > 0) {
-                               session->learn_buf->pos += i;
-                               update_buf_size (session->learn_buf);
-                               if (session->learn_buf->free == 0) {
-                                       process_learn (session);
-                                       while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
-                                               c.begin = content->data;
-                                               c.len = content->len;
-                                               if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, 
-                                                                       session->session_pool, &c, &tokens)) {
-                                                       i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
-                                                       bufferevent_write (bev, out_buf, i);
-                                                       session->state = STATE_REPLY;
-                                                       return;
-                                               }
-                                       }
-                                       cls_ctx = session->learn_classifier->init_func (session->session_pool);
-                                       session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
-                                                                                                                       session->learn_filename, tokens, session->in_class);
-                                       session->worker->srv->stat->messages_learned ++;
-                                       i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
-                                       bufferevent_write (bev, out_buf, i);
-                                       bufferevent_enable (bev, EV_WRITE);
-
-                                       /* Clean learned parts */
-                                       while ((cur = g_list_first (session->parts))) {
-                                               session->parts = g_list_remove_link (session->parts, cur);
-                                               p = (struct mime_part *)cur->data;
-                                               g_byte_array_free (p->content, FALSE);
-                                               g_list_free_1 (cur);
-                                       }
-
+                       session->learn_buf = in;
+                       process_learn (session);
+                       while ((content = get_next_text_part (session->session_pool, session->parts, &cur)) != NULL) {
+                               c.begin = content->data;
+                               c.len = content->len;
+                               if (!session->learn_tokenizer->tokenize_func (session->learn_tokenizer, 
+                                                       session->session_pool, &c, &tokens)) {
+                                       i = snprintf (out_buf, sizeof (out_buf), "learn fail, tokenizer error" CRLF);
+                                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
                                        session->state = STATE_REPLY;
-                                       break;
+                                       return;
                                }
                        }
-                       else {
-                               i = snprintf (out_buf, sizeof (out_buf), "read error: %d" CRLF, i);
-                               bufferevent_write (bev, out_buf, i);
-                               bufferevent_enable (bev, EV_WRITE);
-                               session->state = STATE_REPLY;
+                       cls_ctx = session->learn_classifier->init_func (session->session_pool);
+                       session->learn_classifier->learn_func (cls_ctx, session->worker->srv->statfile_pool,
+                                                                                                       session->learn_filename, tokens, session->in_class);
+                       session->worker->srv->stat->messages_learned ++;
+                       i = snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF);
+                       rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE);
+
+                       /* Clean learned parts */
+                       while ((cur = g_list_first (session->parts))) {
+                               session->parts = g_list_remove_link (session->parts, cur);
+                               p = (struct mime_part *)cur->data;
+                               g_byte_array_free (p->content, FALSE);
+                               g_list_free_1 (cur);
                        }
+
+                       session->state = STATE_REPLY;
                        break;
        }
 }
 
 static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
        
@@ -482,16 +449,15 @@ write_socket (struct bufferevent *bev, void *arg)
        }
        else if (session->state == STATE_REPLY) {
                session->state = STATE_COMMAND;
+               rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
        }
-       bufferevent_disable (bev, EV_WRITE);
-       bufferevent_enable (bev, EV_READ);
 }
 
 static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
-       msg_info ("closing control connection");
+       msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
        /* Free buffers */
        free_session (session);
 }
@@ -525,10 +491,11 @@ accept_socket (int fd, short what, void *arg)
        new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
        worker->srv->stat->control_connections_count ++;
 
-       /* Read event */
-       new_session->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_session);
-       bufferevent_write (new_session->bev, greetingbuf, strlen (greetingbuf));
-       bufferevent_enable (new_session->bev, EV_WRITE);
+       /* Set up dispatcher */
+       new_session->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+                                                                                                               write_socket, err_socket, &io_tv,
+                                                                                                               (void *)new_session);
+       rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
 }
 
 void
@@ -592,6 +559,9 @@ start_controller (struct rspamd_worker *worker)
 
        /* Send SIGUSR2 to parent */
        kill (getppid (), SIGUSR2);
+       
+       io_tv.tv_sec = CONTROLLER_IO_TIMEOUT;
+       io_tv.tv_usec = 0;
 
        event_loop (0);
 }
index 088ac4376dffff3899a6cff2839a09a80e9cd8aa..a1d1e43fa2178a59438bac5be696050c3fe39b89 100644 (file)
@@ -234,9 +234,6 @@ continue_process_filters (struct worker_task *task)
                        }
                        /* Process all metrics */
                        g_hash_table_foreach (task->results, metric_process_callback, task);
-                       /* All done */
-                       bufferevent_enable (task->bev, EV_WRITE);
-                       evbuffer_drain (task->bev->output, EVBUFFER_LENGTH (task->bev->output));
                        process_statfiles (task);
                        return 1;
        }
index 896cd8dcfa83a182711dd3a27ba73efe85c390e0..000ba74c6959c30bb49ff8d0632431e524f0d5cf 100644 (file)
@@ -5,12 +5,7 @@
 #ifndef FSTRING_H
 #define FSTRING_H
 
-#include <sys/types.h>
 #include "config.h"
-
-#ifdef HAVE_STDINT_H
-#include <stdint.h>
-#endif
 #include "mem_pool.h"
 
 #define update_buf_size(x) (x)->free = (x)->buf->size - ((x)->pos - (x)->buf->begin); (x)->buf->len = (x)->pos - (x)->buf->begin
index e1244d0ba0cdb52410f5f376cccaceaaa905bd07..ec74ad03bedc22cbef9384fbc66097f470a9a3ef 100644 (file)
@@ -14,6 +14,7 @@
 #include "memcached.h"
 #include "protocol.h"
 #include "filter.h"
+#include "buffer.h"
 
 /* Default values */
 #define FIXED_CONFIG_FILE "./rspamd.conf"
@@ -133,14 +134,14 @@ struct controller_session {
        /* Access to authorized commands */
        int authorized;                                                                                         /**< whether this session is authorized                         */
        memory_pool_t *session_pool;                                                            /**< memory pool for session                                            */
-       struct bufferevent *bev;                                                                        /**< buffered event for IO                                                      */
        struct config_file *cfg;                                                                        /**< pointer to config file                                                     */
        char *learn_rcpt;                                                                                       /**< recipient for learning                                                     */
        char *learn_from;                                                                                       /**< from address for learning                                          */
        struct tokenizer *learn_tokenizer;                                                      /**< tokenizer for learning                                                     */
        struct classifier *learn_classifier;                                            /**< classifier for learning                                            */
        char *learn_filename;                                                                           /**< real filename for learning                                         */
-       f_str_buf_t *learn_buf;                                                                         /**< learn input                                                                        */
+       rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
+       f_str_t *learn_buf;                                                                                     /**< learn input                                                                        */
        GList *parts;                                                                                           /**< extracted mime parts                                                       */
        int in_class;                                                                                           /**< positive or negative learn                                         */
 };
@@ -168,8 +169,8 @@ struct worker_task {
        GList *rcpt;                                                                                            /**< recipients list                                                            */
        unsigned int nrcpt;                                                                                     /**< number of recipients                                                       */
        struct in_addr from_addr;                                                                       /**< client addr in numeric form                                        */
-       f_str_buf_t *msg;                                                                                       /**< message buffer                                                                     */
-       struct bufferevent *bev;                                                                        /**< buffered event for IO                                                      */
+       f_str_t *msg;                                                                                           /**< message buffer                                                                     */
+       rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
        memcached_ctx_t *memc_ctx;                                                                      /**< memcached context associated with task                     */
        int parts_count;                                                                                        /**< mime parts count                                                           */
        GMimeMessage *message;                                                                          /**< message, parsed with GMime                                         */
index 5c0f15cf992682b84f008279fb4ef1673f9605f0..d5ec43653782cfee9df8f03370a33e4431dbb219 100644 (file)
@@ -304,7 +304,7 @@ process_message (struct worker_task *task)
        GMimeParser *parser;
        GMimeStream *stream;
 
-       stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len);
+       stream = g_mime_stream_mem_new_with_buffer (task->msg->begin, task->msg->len);
        /* create a new parser object to parse the stream */
        parser = g_mime_parser_new_with_stream (stream);
 
@@ -393,7 +393,7 @@ process_learn (struct controller_session *session)
        GMimeParser *parser;
        GMimeStream *stream;
 
-       stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->buf->begin, session->learn_buf->buf->len);
+       stream = g_mime_stream_mem_new_with_buffer (session->learn_buf->begin, session->learn_buf->len);
        /* create a new parser object to parse the stream */
        parser = g_mime_parser_new_with_stream (stream);
 
index 1ecfcb36dcc1712f39f78e0cd25bab53152f1474..9cc37ea7ea73ffaea8be81a6f2fd9f4c4a3db24b 100644 (file)
@@ -180,7 +180,7 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task)
                        return 0;
                case REGEXP_MESSAGE:
                        msg_debug ("process_message: checking message regexp: /%s/", re->regexp_text);
-                       if (g_regex_match_full (re->regexp, task->msg->buf->begin, task->msg->buf->len, 0, 0, NULL, NULL) == TRUE) {
+                       if (g_regex_match_full (re->regexp, task->msg->begin, task->msg->len, 0, 0, NULL, NULL) == TRUE) {
                                return 1;
                        }
                        return 0;
index 92b331ac2f616a68c819efd5a63e0ab89dde2b8b..ab0db5f781badbee2ab8c553320df28ca99cb811 100644 (file)
@@ -174,14 +174,17 @@ static int
 parse_header (struct worker_task *task, char *line)
 {
        char *headern, *err, *tmp;
-
+       
+       msg_debug ("parse_header: got line from worker: %s", line);
        /* Check end of headers */
        if (*line == '\0') {
+               msg_debug ("parse_header: got empty line, assume it as end of headers");
                if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
                        task->state = WRITE_REPLY;
                }
                else {
                        if (task->content_length > 0) {
+                rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
                                task->state = READ_MESSAGE;
                        }
                        else {
@@ -209,14 +212,7 @@ parse_header (struct worker_task *task, char *line)
                        if (strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
                 if (task->content_length == 0) {
                                    task->content_length = strtoul (line, &err, 10);
-                                   task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_buf_t));
-                                   task->msg->buf = fstralloc (task->task_pool, task->content_length);
-                                   if (task->msg->buf == NULL) {
-                                           msg_err ("read_socket: cannot allocate memory for message buffer");
-                                           return -1;
-                                   }
-                                       task->msg->pos = task->msg->buf->begin;
-                       update_buf_size (task->msg);
+                                       msg_debug ("parse_header: read Content-Length header, value: %lu", (unsigned long int)task->content_length);
                                }
                        }
                        else {
@@ -229,6 +225,7 @@ parse_header (struct worker_task *task, char *line)
                        /* helo */
                        if (strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
                                task->helo = memory_pool_strdup (task->task_pool, line);
+                               msg_debug ("parse_header: read helo header, value: %s", task->helo);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -240,6 +237,7 @@ parse_header (struct worker_task *task, char *line)
                        /* from */
                        if (strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
                                task->from = memory_pool_strdup (task->task_pool, line);
+                               msg_debug ("parse_header: read from header, value: %s", task->from);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -252,6 +250,7 @@ parse_header (struct worker_task *task, char *line)
                        if (strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
                                tmp = memory_pool_strdup (task->task_pool, line);
                                task->rcpt = g_list_prepend (task->rcpt, tmp);
+                               msg_debug ("parse_header: read rcpt header, value: %s", tmp);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -263,6 +262,7 @@ parse_header (struct worker_task *task, char *line)
                        /* nrcpt */
                        if (strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
                                task->nrcpt = strtoul (line, &err, 10);
+                               msg_debug ("parse_header: read rcpt header, value: %d", (int)task->nrcpt);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -277,6 +277,7 @@ parse_header (struct worker_task *task, char *line)
                                        msg_info ("parse_header: bad ip header: '%s'", line);
                                        return -1;
                                }
+                               msg_debug ("parse_header: read IP header, value: %s", line);
                        }
                        else {
                                msg_info ("parse_header: wrong header: %s", headern);
@@ -292,14 +293,14 @@ parse_header (struct worker_task *task, char *line)
 }
 
 int
-read_rspamd_input_line (struct worker_task *task, char *line)
+read_rspamd_input_line (struct worker_task *task, f_str_t *line)
 {
        switch (task->state) {
                case READ_COMMAND:
-                       return parse_command (task, line);
+                       return parse_command (task, fstrcstr (line, task->task_pool));
                        break;
                case READ_HEADER:
-                       return parse_header (task, line);
+                       return parse_header (task, fstrcstr (line, task->task_pool));
                        break;
        }
 }
@@ -323,7 +324,7 @@ show_url_header (struct worker_task *task)
                /* Do header folding */
                if (host.len + r >= OUTBUFSIZ - 3) {
                        outbuf[r ++] = '\r'; outbuf[r ++] = '\n'; outbuf[r] = ' ';
-                       bufferevent_write (task->bev, outbuf, r);
+                       rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
                        r = 0;
                }
                /* Write url host to buf */
@@ -340,7 +341,7 @@ show_url_header (struct worker_task *task)
                        *(host.begin + host.len) = c;
                }
        }
-       bufferevent_write (task->bev, outbuf, r);
+       rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
 }
 
 static void
@@ -363,7 +364,7 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
                r = snprintf (outbuf, sizeof (outbuf), "%s: %s ; %.2f / %.2f" CRLF, (char *)metric_name,
                                        (is_spam) ? "True" : "False", metric_res->score, metric_res->metric->required_score);
        }
-       bufferevent_write (task->bev, outbuf, r);
+       rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
 }
 
 static int
@@ -374,7 +375,7 @@ write_check_reply (struct worker_task *task)
        struct metric_result *metric_res;
 
        r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK");
-       bufferevent_write (task->bev, outbuf, r);
+       rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
        if (task->proto == SPAMC_PROTO) {
                /* Ignore metrics, just write report for 'default' metric */
                metric_res = g_hash_table_lookup (task->results, "default");
@@ -391,7 +392,7 @@ write_check_reply (struct worker_task *task)
                /* URL stat */
                show_url_header (task);
        }
-       bufferevent_write (task->bev, CRLF, sizeof (CRLF) - 1);
+       rspamd_dispatcher_write (task->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE);
 
        return 0;
 }
@@ -423,7 +424,7 @@ show_metric_symbols (gpointer metric_name, gpointer metric_value, void *user_dat
        g_list_free (symbols);
        msg_debug ("show_metric_symbols: write symbols line: %s", outbuf);
        outbuf[r++] = '\r'; outbuf[r++] = '\n';
-       bufferevent_write (task->bev, outbuf, r);
+       rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
 }
 
 static int
@@ -450,6 +451,7 @@ write_symbols_reply (struct worker_task *task)
                /* Write result for each metric separately */
                g_hash_table_foreach (task->results, show_metric_symbols, task);
        }
+
        return 0;
 }
 
@@ -460,9 +462,9 @@ write_process_reply (struct worker_task *task)
        char outbuf[OUTBUFSIZ];
 
        r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF "Content-Length: %zd" CRLF CRLF, 
-                                       (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->buf->len);
-       bufferevent_write (task->bev, outbuf, r);
-       bufferevent_write (task->bev, task->msg->buf->begin, task->msg->buf->len);
+                                       (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER, "OK", task->msg->len);
+       rspamd_dispatcher_write (task->dispatcher, outbuf, r, TRUE);
+       rspamd_dispatcher_write (task->dispatcher, task->msg->begin, task->msg->len, FALSE);
 
        return 0;
 }
@@ -486,7 +488,7 @@ write_reply (struct worker_task *task)
                        msg_debug ("write_reply: writing error: %s", outbuf);
                }
                /* Write to bufferevent error message */
-               bufferevent_write (task->bev, outbuf, r);
+               rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
        }
        else {
                switch (task->cmd) {
@@ -504,11 +506,11 @@ write_reply (struct worker_task *task)
                        case CMD_SKIP:
                                r = snprintf (outbuf, sizeof (outbuf), "%s 0 %s" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER,
                                                                                                                                SPAMD_OK);
-                               bufferevent_write (task->bev, outbuf, r);
+                               rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
                                break;
                        case CMD_PING:
                                r = snprintf (outbuf, sizeof (outbuf), "%s 0 PONG" CRLF, (task->proto == SPAMC_PROTO) ? SPAMD_REPLY_BANNER : RSPAMD_REPLY_BANNER);
-                               bufferevent_write (task->bev, outbuf, r);
+                               rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE);
                                break;
                }
        }
index 243e216aa78a41c097c9217b961adc9d7ea92958..74e7f7f98a8ebd0b3d6df53ae6269b03ce98f9f8 100644 (file)
@@ -36,7 +36,7 @@ enum rspamd_command {
  * @param line line of user's input
  * @return 0 if line was successfully parsed and -1 if we have protocol error
  */
-int read_rspamd_input_line (struct worker_task *task, char *line);
+int read_rspamd_input_line (struct worker_task *task, f_str_t *line);
 
 /**
  * Write reply for specified task command
index 952c3a93eb299de462014aa74f84ee43ac415a9f..cffa5e06b17e685ebc9d5e51903966aa2c82c27a 100644 (file)
@@ -900,51 +900,6 @@ resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *fro
        return new;
 }
 
-/*
- * These functions are from libevent where they are static and not exported anywhere
- * XXX: think how to avoid this
- */
-
-char *
-buffer_readline (memory_pool_t *pool, struct evbuffer *buf)
-{
-       u_char *data = EVBUFFER_DATA (buf);
-       size_t len = EVBUFFER_LENGTH (buf);
-       char *line, fch, sch;
-       unsigned int i;
-
-       for (i = 0; i < len; i++) {
-               if (data[i] == '\r' || data[i] == '\n') {
-                       break;
-               }
-       }
-
-       if (i == len) {
-               return (NULL);
-       }
-       
-       line = memory_pool_alloc (pool, i + 1);
-
-       memcpy (line, data, i);
-       line[i] = '\0';
-
-       /*
-        * Some protocols terminate a line with '\r\n', so check for
-        * that, too.
-        */
-       if ( i < len - 1 ) {
-               fch = data[i], sch = data[i+1];
-
-               /* Drain one more character if needed */
-               if ( (sch == '\r' || sch == '\n') && sch != fch )
-                       i += 1;
-       }
-
-       evbuffer_drain (buf, i + 1);
-
-       return (line);
-}
-
 /*
  * vi:ts=4
  */
index fa5bbfbdb7043e5a2a0180a9220a3c0f87cca0c5..69ae0fe92c69d11682bf3a16fcd829c9b53215ec 100644 (file)
@@ -55,8 +55,4 @@ void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const
 /* Replace %r with rcpt value and %f with from value, new string is allocated in pool */
 char* resolve_stat_filename (memory_pool_t *pool, char *pattern, char *rcpt, char *from);
 
-/* Replace libevent evbuffer_readline with memory_pool variant */
-char* buffer_readline (memory_pool_t *pool, struct evbuffer *buf);
-
-
 #endif
index caacda8384f6a03f2a8c2550de8b89cacf0d2344..6d0c413cd448e5399e5c3ef7405931e59817cfcd 100644 (file)
@@ -40,6 +40,8 @@
 #include <perl.h>                 /* from the Perl distribution     */
 
 #define TASK_POOL_SIZE 4095
+/* 2 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 2
 
 const f_str_t CRLF = {
        /* begin */"\r\n",
@@ -47,8 +49,12 @@ const f_str_t CRLF = {
        /* size */2
 };
 
+static struct timeval io_tv;
+
 extern PerlInterpreter *perl_interpreter;
 
+static void write_socket (void *arg);
+
 static 
 void sig_handler (int signo)
 {
@@ -113,8 +119,7 @@ free_task (struct worker_task *task)
                        g_list_free_1 (part);
                }
                memory_pool_delete (task->task_pool);
-               bufferevent_disable (task->bev, EV_READ | EV_WRITE);
-               bufferevent_free (task->bev);
+               rspamd_remove_dispatcher (task->dispatcher);
                close (task->sock);
                g_free (task);
        }
@@ -124,66 +129,40 @@ free_task (struct worker_task *task)
  * Callback that is called when there is data to read in buffer
  */
 static void
-read_socket (struct bufferevent *bev, void *arg)
+read_socket (f_str_t *in, void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
        ssize_t r;
-       char *s;
 
        switch (task->state) {
                case READ_COMMAND:
                case READ_HEADER:
-                       s = buffer_readline (task->task_pool, EVBUFFER_INPUT (bev));
-                       if (s == NULL) {
-                               msg_debug ("read_socket: got incomplete line from user");
-                               return;
-                       }
-                       if (read_rspamd_input_line (task, s) != 0) {
+                       if (read_rspamd_input_line (task, in) != 0) {
                                task->last_error = "Read error";
                                task->error_code = RSPAMD_NETWORK_ERROR;
                                task->state = WRITE_ERROR;
-                       }
-                       if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
-                               bufferevent_enable (bev, EV_WRITE);
-                               bufferevent_disable (bev, EV_READ);
+                               write_socket (task);
                        }
                        break;
                case READ_MESSAGE:
-                       r = bufferevent_read (bev, task->msg->pos, task->msg->free);
-                       if (r > 0) {
-                               task->msg->pos += r;
-                msg_debug ("read_socket: read %zd bytes from socket, %zd bytes left", r, task->msg->free);
-                               update_buf_size (task->msg);
-                               if (task->msg->free == 0) {
-                                       r = process_message (task);
-                                       r = process_filters (task);
-                                       if (r == -1) {
-                                               task->last_error = "Filter processing error";
-                                               task->error_code = RSPAMD_FILTER_ERROR;
-                                               task->state = WRITE_ERROR;
-                                       }
-                                       else if (r == 0) {
-                                               task->state = WAIT_FILTER;
-                                       }
-                                       else {
-                                               process_statfiles (task);
-                                       }
-                               }
-                               if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
-                                       bufferevent_enable (bev, EV_WRITE);
-                                       bufferevent_disable (bev, EV_READ);
-                                       evbuffer_drain (bev->output, EVBUFFER_LENGTH (bev->output));
-                               }
+                       task->msg = in;
+                       r = process_message (task);
+                       r = process_filters (task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = RSPAMD_FILTER_ERROR;
+                               task->state = WRITE_ERROR;
+                               write_socket (task);
+                       }
+                       else if (r == 0) {
+                               task->state = WAIT_FILTER;
+                               rspamd_dispatcher_pause (task->dispatcher);
                        }
                        else {
-                               msg_warn ("read_socket: cannot read data to buffer (free space: %zd): %ld", task->msg->free, (long int)r);
-                               bufferevent_disable (bev, EV_READ);
-                               free_task (task);
+                               process_statfiles (task);
+                               write_socket (task);
                        }
                        break;
-               case WAIT_FILTER:
-                       bufferevent_disable (bev, EV_READ);
-                       break;
        }
 }
 
@@ -191,7 +170,7 @@ read_socket (struct bufferevent *bev, void *arg)
  * Callback for socket writing
  */
 static void
-write_socket (struct bufferevent *bev, void *arg)
+write_socket (void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
        
@@ -199,12 +178,10 @@ write_socket (struct bufferevent *bev, void *arg)
                case WRITE_REPLY:
                        write_reply (task);
                        task->state = CLOSING_CONNECTION;
-                       bufferevent_disable (bev, EV_READ);
                        break;
                case WRITE_ERROR:
                        write_reply (task);
                        task->state = CLOSING_CONNECTION;
-                       bufferevent_disable (bev, EV_READ);
                        break;
                case CLOSING_CONNECTION:
                        msg_debug ("write_socket: normally closing connection");
@@ -221,10 +198,10 @@ write_socket (struct bufferevent *bev, void *arg)
  * Called if something goes wrong
  */
 static void
-err_socket (struct bufferevent *bev, short what, void *arg)
+err_socket (GError *err, void *arg)
 {
        struct worker_task *task = (struct worker_task *)arg;
-       msg_info ("err_socket: abnormally closing connection");
+       msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
        /* Free buffers */
        free_task (task);
 }
@@ -266,9 +243,10 @@ accept_socket (int fd, short what, void *arg)
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
        worker->srv->stat->connections_count ++;
 
-       /* Read event */
-       new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
-       bufferevent_enable (new_task->bev, EV_READ);
+       /* Set up dispatcher */
+       new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+                                                                                                               write_socket, err_socket, &io_tv,
+                                                                                                               (void *)new_task);
 }
 
 /*
@@ -304,6 +282,9 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
        /* Send SIGUSR2 to parent */
        kill (getppid (), SIGUSR2);
 
+       io_tv.tv_sec = WORKER_IO_TIMEOUT;
+       io_tv.tv_usec = 0;
+
        event_loop (0);
 }