]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Fix dispatcher timeouts handling
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 20 Feb 2009 15:49:04 +0000 (18:49 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 20 Feb 2009 15:49:04 +0000 (18:49 +0300)
* Add wanna_die flag that can be used in dispatcher's callbacks

src/buffer.c
src/buffer.h
src/controller.c
src/protocol.c
src/worker.c

index 46ef988502f6968d479b7e8495363bd3cb2720cb..268b6cbb1a64e6e65da644aecc4a88705dc68d9a 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "config.h"
 #include "buffer.h"
+#include "main.h"
 
 #define G_DISPATCHER_ERROR dispatcher_error_quark()
 
@@ -57,7 +58,6 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                        cur = g_list_next (cur);
                        continue;
                }
-
                r = write (fd, buf->pos, BUFREMAIN (buf));
                if (r == -1 && errno != EAGAIN) {
                        if (d->err_callback) {
@@ -70,6 +70,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                        buf->pos += r;
                        if (BUFREMAIN (buf) != 0) {
                                /* Continue with this buffer */
+                               msg_debug ("write_buffers: wrote %ld bytes of %ld", (long int)r, (long int)buf->data->len);
                                continue;
                        }
                }
@@ -83,6 +84,9 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                }
                else if (errno == EAGAIN) {
                        /* Wait for other event */
+                       event_del (d->ev);
+                       event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_add (d->ev, d->tv);
                        return;
                }
                cur = g_list_next (cur);
@@ -93,23 +97,37 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                g_list_free (d->out_buffers);
                d->out_buffers = NULL;
 
+               msg_debug ("write_buffers: all buffers were written successfully");
+
                if (d->write_callback) {
                        d->write_callback (d->user_data);
+                       if (d->wanna_die) {
+                               msg_debug ("write_buffers: callback set wanna_die flag, terminating");
+                               rspamd_remove_dispatcher (d);
+                               return;
+                       }
                }
-
+               
                event_del (d->ev);
                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
                event_add (d->ev, d->tv);
        }
+       else {
+               /* Plan other write event */
+               event_del (d->ev);
+               event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+       }
 }
 
 static void
 read_buffers (int fd, rspamd_io_dispatcher_t *d)
 {
-       ssize_t r, len;
+       ssize_t r;
        GError *err;
        f_str_t res;
        char *c;
+       unsigned int len;
 
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -170,12 +188,17 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
                                        res.len = r;
                                        if (d->read_callback) {
                                                d->read_callback (&res, d->user_data);
+                                               if (d->wanna_die) {
+                                                       msg_debug ("read_buffers: callback set wanna_die flag, terminating");
+                                                       rspamd_remove_dispatcher (d);
+                                                       return;
+                                               }
                                                if (r < len - 1 && *(c + 1) == '\n') {
                                                        r ++;
                                                        c ++;
                                                }
                                                /* Move remaining string to begin of buffer (draining) */
-                                               memmove (d->in_buf->data->begin, c, len - r);
+                                               memmove (d->in_buf->data->begin, c + 1, len - r - 1);
                                                c = d->in_buf->data->begin;
                                                d->in_buf->data->len -= r + 1;
                                                d->in_buf->pos -= r + 1;
@@ -274,7 +297,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
        new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
        new->fd = fd;
 
-       event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new);
+       event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
        event_add (new->ev, new->tv);
 
        return new;
@@ -328,18 +351,19 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
        rspamd_buffer_t *newbuf;
 
        newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
-       newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
-
-       newbuf->data->begin = memory_pool_alloc (d->pool, len);
+       newbuf->data = fstralloc (d->pool, len);
+       
+       /* We need to copy data to temporary internal buffer to avoid using of stack variables */
        memcpy (newbuf->data->begin, data, len);
-       newbuf->data->size = len;
        newbuf->pos = newbuf->data->begin;
+       newbuf->data->len = len;
        
        d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
 
        if (!delayed) {
+               msg_debug ("rspamd_dispatcher_write: plan write event");
                event_del (d->ev);
-               event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
                event_add (d->ev, d->tv);
        }
 }
index 93c7818dd4178cca56f67ef43af421d71c1053a5..4c9e5c905655666fd07b07c113baa7537bbe3d31 100644 (file)
@@ -39,6 +39,7 @@ typedef struct rspamd_io_dispatcher_s {
        enum io_policy policy;                                                                                  /**< IO policy                          */
        size_t nchars;                                                                                                  /**< how many chars to read     */
        int fd;                                                                                                                 /**< descriptor                         */
+       gboolean wanna_die;                                                                                             /**< if dispatcher should be stopped */
        dispatcher_read_callback_t read_callback;                                               /**< read callback                      */
        dispatcher_write_callback_t write_callback;                                             /**< write callback                     */
        dispatcher_err_callback_t err_callback;                                                 /**< error callback                     */
index 7733bc924e94f07ec302d3525d53e802eeea4e57..8128d7356945235fa4d4859257d4bf34eeddc4a6 100644 (file)
@@ -457,7 +457,13 @@ static void
 err_socket (GError *err, void *arg)
 {
        struct controller_session *session = (struct controller_session *)arg;
-       msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
+
+       if (err->code == EOF) {
+               msg_info ("err_socket: client closed control connection");
+       }
+       else {
+               msg_info ("err_socket: abnormally closing control connection, error: %s", err->message);
+       }
        /* Free buffers */
        free_session (session);
 }
index ab0db5f781badbee2ab8c553320df28ca99cb811..543507c61b05800e28194e495c0090dbf1d2522e 100644 (file)
@@ -91,6 +91,7 @@ parse_command (struct worker_task *task, char *line)
 {
        char *token;
 
+       msg_debug ("parse_command: got line from worker: %s", line);
        token = strsep (&line, " ");
        if (line == NULL || token == NULL) {
                msg_debug ("parse_command: bad command: %s", token);
@@ -191,6 +192,7 @@ parse_header (struct worker_task *task, char *line)
                                task->last_error = "Unknown content length";
                                task->error_code = RSPAMD_LENGTH_ERROR;
                                task->state = WRITE_ERROR;
+                               return -1;
                        }
                }
                return 0;
index 6d0c413cd448e5399e5c3ef7405931e59817cfcd..f3bbac8ce36dad3ed4d89a8a499220bfe6c10ddd 100644 (file)
@@ -40,8 +40,8 @@
 #include <perl.h>                 /* from the Perl distribution     */
 
 #define TASK_POOL_SIZE 4095
-/* 2 seconds for worker's IO */
-#define WORKER_IO_TIMEOUT 2
+/* 60 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 60
 
 const f_str_t CRLF = {
        /* begin */"\r\n",
@@ -119,7 +119,8 @@ free_task (struct worker_task *task)
                        g_list_free_1 (part);
                }
                memory_pool_delete (task->task_pool);
-               rspamd_remove_dispatcher (task->dispatcher);
+               /* Plan dispatcher shutdown */
+               task->dispatcher->wanna_die = 1;
                close (task->sock);
                g_free (task);
        }
@@ -216,7 +217,8 @@ accept_socket (int fd, short what, void *arg)
        struct sockaddr_storage ss;
        struct worker_task *new_task;
        socklen_t addrlen = sizeof(ss);
-       int nfd;
+       int nfd, on = 1;
+       struct linger linger;
 
        if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
                return;
@@ -224,6 +226,13 @@ accept_socket (int fd, short what, void *arg)
        if (event_make_socket_nonblocking(fd) < 0) {
                return;
        }
+
+       /* Socket options */
+       setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
+       setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
+       linger.l_onoff = 1;
+       linger.l_linger = 2;
+       setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
        
        new_task = g_malloc (sizeof (struct worker_task));
        if (new_task == NULL) {