]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Fix dispatcher bugs and add some debug output
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 2 Mar 2009 06:21:24 +0000 (09:21 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 2 Mar 2009 06:21:24 +0000 (09:21 +0300)
* Fix log output for surbl

src/buffer.c
src/plugins/surbl.c
src/worker.c

index 6cffbea9eec3e289fdeb8e294650911f3cf811b5..8883e314d3b22cec1f6c03e3e68ebf062a594f7f 100644 (file)
@@ -82,7 +82,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
                                return;
                        }
                }
-               else if (errno == EAGAIN) {
+               else if (r == -1 && errno == EAGAIN) {
+                       msg_debug ("write_buffers: partially write data, retry");
                        /* Wait for other event */
                        event_del (d->ev);
                        event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -128,6 +129,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
        f_str_t res;
        char *c;
        unsigned int len;
+       enum io_policy saved_policy;
 
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -166,7 +168,8 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
                                return;
                        }
                }
-               else if (errno == EAGAIN) {
+               else if (r == -1 && errno == EAGAIN) {
+                       msg_debug ("read_buffers: partially read data, retry");
                        return;
                }
                else {
@@ -176,6 +179,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
        
        }
        
+       msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld", 
+                       (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
+                       (long int)d->nchars);
+       saved_policy = d->policy;
        c = d->in_buf->data->begin;
        r = 0;
        len = d->in_buf->data->len;
@@ -204,6 +211,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
                                                d->in_buf->pos -= r + 1;
                                                r = 0;
                                                len = d->in_buf->data->len;
+                                               if (d->policy != saved_policy) {
+                                                       msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+                                                       read_buffers (fd, d);
+                                                       return;
+                                               }
                                                continue;
                                        }
                                }
@@ -212,7 +224,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
                        }
                        break;
                case BUFFER_CHARACTER:
-                       while (r < len) {
+                       while (r <= len) {
                                if (r == d->nchars) {
                                        res.begin = d->in_buf->data->begin;
                                        res.len = r;
@@ -225,6 +237,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d)
                                                d->in_buf->pos -= r;
                                                r = 0;
                                                len = d->in_buf->data->len;
+                                               if (d->policy != saved_policy) {
+                                                       msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+                                                       read_buffers (fd, d);
+                                                       return;
+                                               }
                                                continue;
                                        }
                                
@@ -244,6 +261,8 @@ dispatcher_cb (int fd, short what, void *arg)
        rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
        GError *err;
 
+       msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
+
        switch (what) {
                case EV_TIMEOUT:
                        if (d->err_callback) {
@@ -325,6 +344,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
                                                                        size_t nchars)
 {
        f_str_t *tmp;
+       int t;
 
        if (d->policy != policy) {
                d->policy = policy;
@@ -334,19 +354,25 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
                        if (d->in_buf && d->in_buf->data->size < nchars) {
                                tmp = fstralloc (d->pool, d->nchars);
                                memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+                               t = d->in_buf->pos - d->in_buf->data->begin;
                                tmp->len = d->in_buf->data->len;
                                d->in_buf->data = tmp;
+                               d->in_buf->pos = d->in_buf->data->begin + t;
                        }
                }
                else if (policy == BUFFER_LINE) {
                        if (d->in_buf && d->nchars < BUFSIZ) {
                                tmp = fstralloc (d->pool, BUFSIZ);
                                memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+                               t = d->in_buf->pos - d->in_buf->data->begin;
                                tmp->len = d->in_buf->data->len;
                                d->in_buf->data = tmp;
+                               d->in_buf->pos = d->in_buf->data->begin + t;
                        }
                }
        }
+
+       msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
 }
 
 void 
index e048ed132f36b083f88e7779dbf239fc90324c3e..330e9cc661919f0bd68ff12d1065d04bc9685177 100644 (file)
@@ -310,8 +310,11 @@ static void
 dns_callback (int result, char type, int count, int ttl, void *addresses, void *data)
 {
        struct memcached_param *param = (struct memcached_param *)data;
+       char c;
        
        msg_debug ("dns_callback: in surbl request callback");
+       c = *(param->url->host + param->url->hostlen);
+       *(param->url->host + param->url->hostlen) = 0;
        /* If we have result from DNS server, this url exists in SURBL, so increase score */
        if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
                msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix);
@@ -320,6 +323,7 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
        else {
                msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, surbl_module_ctx->suffix);
        }
+       *(param->url->host + param->url->hostlen) = c;
 
        param->task->save.saved --;
        if (param->task->save.saved == 0) {
index 143cb54deca63b78545db56edc95d7bf7e59e26e..03912930fd68ce128dff27adf19681463b460af8 100644 (file)
@@ -144,6 +144,7 @@ read_socket (f_str_t *in, void *arg)
                        break;
                case READ_MESSAGE:
                        task->msg = in;
+                       msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
                        r = process_message (task);
                        r = process_filters (task);
                        if (r == -1) {
@@ -242,6 +243,8 @@ accept_socket (int fd, short what, void *arg)
        new_task->state = READ_COMMAND;
        new_task->sock = nfd;
        new_task->cfg = worker->srv->cfg;
+       io_tv.tv_sec = WORKER_IO_TIMEOUT;
+       io_tv.tv_usec = 0;
        TAILQ_INIT (&new_task->urls);
        new_task->task_pool = memory_pool_new (memory_pool_get_size ());
        /* Add destructor for recipients list (it would be better to use anonymous function here */
@@ -289,9 +292,6 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
        /* Send SIGUSR2 to parent */
        kill (getppid (), SIGUSR2);
 
-       io_tv.tv_sec = WORKER_IO_TIMEOUT;
-       io_tv.tv_usec = 0;
-
        event_loop (0);
 }