#include "config.h"
#include "buffer.h"
+#include "main.h"
#define G_DISPATCHER_ERROR dispatcher_error_quark()
cur = g_list_next (cur);
continue;
}
-
r = write (fd, buf->pos, BUFREMAIN (buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
buf->pos += r;
if (BUFREMAIN (buf) != 0) {
/* Continue with this buffer */
+ msg_debug ("write_buffers: wrote %ld bytes of %ld", (long int)r, (long int)buf->data->len);
continue;
}
}
}
else if (errno == EAGAIN) {
/* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
return;
}
cur = g_list_next (cur);
g_list_free (d->out_buffers);
d->out_buffers = NULL;
+ msg_debug ("write_buffers: all buffers were written successfully");
+
if (d->write_callback) {
d->write_callback (d->user_data);
+ if (d->wanna_die) {
+ msg_debug ("write_buffers: callback set wanna_die flag, terminating");
+ rspamd_remove_dispatcher (d);
+ return;
+ }
}
-
+
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
}
+ else {
+ /* Plan other write event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
}
static void
read_buffers (int fd, rspamd_io_dispatcher_t *d)
{
- ssize_t r, len;
+ ssize_t r;
GError *err;
f_str_t res;
char *c;
+ unsigned int len;
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
res.len = r;
if (d->read_callback) {
d->read_callback (&res, d->user_data);
+ if (d->wanna_die) {
+ msg_debug ("read_buffers: callback set wanna_die flag, terminating");
+ rspamd_remove_dispatcher (d);
+ return;
+ }
if (r < len - 1 && *(c + 1) == '\n') {
r ++;
c ++;
}
/* Move remaining string to begin of buffer (draining) */
- memmove (d->in_buf->data->begin, c, len - r);
+ memmove (d->in_buf->data->begin, c + 1, len - r - 1);
c = d->in_buf->data->begin;
d->in_buf->data->len -= r + 1;
d->in_buf->pos -= r + 1;
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
- event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new);
+ event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_add (new->ev, new->tv);
return new;
rspamd_buffer_t *newbuf;
newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
- newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
-
- newbuf->data->begin = memory_pool_alloc (d->pool, len);
+ newbuf->data = fstralloc (d->pool, len);
+
+ /* We need to copy data to temporary internal buffer to avoid using of stack variables */
memcpy (newbuf->data->begin, data, len);
- newbuf->data->size = len;
newbuf->pos = newbuf->data->begin;
+ newbuf->data->len = len;
d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
if (!delayed) {
+ msg_debug ("rspamd_dispatcher_write: plan write event");
event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
event_add (d->ev, d->tv);
}
}
#include <perl.h> /* from the Perl distribution */
#define TASK_POOL_SIZE 4095
-/* 2 seconds for worker's IO */
-#define WORKER_IO_TIMEOUT 2
+/* 60 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 60
const f_str_t CRLF = {
/* begin */"\r\n",
g_list_free_1 (part);
}
memory_pool_delete (task->task_pool);
- rspamd_remove_dispatcher (task->dispatcher);
+ /* Plan dispatcher shutdown */
+ task->dispatcher->wanna_die = 1;
close (task->sock);
g_free (task);
}
struct sockaddr_storage ss;
struct worker_task *new_task;
socklen_t addrlen = sizeof(ss);
- int nfd;
+ int nfd, on = 1;
+ struct linger linger;
if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
return;
if (event_make_socket_nonblocking(fd) < 0) {
return;
}
+
+ /* Socket options */
+ setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
+ setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
+ linger.l_onoff = 1;
+ linger.l_linger = 2;
+ setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
new_task = g_malloc (sizeof (struct worker_task));
if (new_task == NULL) {