return;
}
}
- else if (errno == EAGAIN) {
+ else if (r == -1 && errno == EAGAIN) {
+ msg_debug ("write_buffers: partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
f_str_t res;
char *c;
unsigned int len;
+ enum io_policy saved_policy;
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
return;
}
}
- else if (errno == EAGAIN) {
+ else if (r == -1 && errno == EAGAIN) {
+ msg_debug ("read_buffers: partially read data, retry");
return;
}
else {
}
+ msg_debug ("read_buffers: read %ld characters, policy is %s, watermark is: %ld",
+ (long int)r, d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
+ (long int)d->nchars);
+ saved_policy = d->policy;
c = d->in_buf->data->begin;
r = 0;
len = d->in_buf->data->len;
d->in_buf->pos -= r + 1;
r = 0;
len = d->in_buf->data->len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d);
+ return;
+ }
continue;
}
}
}
break;
case BUFFER_CHARACTER:
- while (r < len) {
+ while (r <= len) {
if (r == d->nchars) {
res.begin = d->in_buf->data->begin;
res.len = r;
d->in_buf->pos -= r;
r = 0;
len = d->in_buf->data->len;
+ if (d->policy != saved_policy) {
+ msg_debug ("read_buffers: policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d);
+ return;
+ }
continue;
}
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
GError *err;
+ msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
+
switch (what) {
case EV_TIMEOUT:
if (d->err_callback) {
size_t nchars)
{
f_str_t *tmp;
+ int t;
if (d->policy != policy) {
d->policy = policy;
if (d->in_buf && d->in_buf->data->size < nchars) {
tmp = fstralloc (d->pool, d->nchars);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
+ d->in_buf->pos = d->in_buf->data->begin + t;
}
}
else if (policy == BUFFER_LINE) {
if (d->in_buf && d->nchars < BUFSIZ) {
tmp = fstralloc (d->pool, BUFSIZ);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
+ d->in_buf->pos = d->in_buf->data->begin + t;
}
}
}
+
+ msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
void
dns_callback (int result, char type, int count, int ttl, void *addresses, void *data)
{
struct memcached_param *param = (struct memcached_param *)data;
+ char c;
msg_debug ("dns_callback: in surbl request callback");
+ c = *(param->url->host + param->url->hostlen);
+ *(param->url->host + param->url->hostlen) = 0;
/* If we have result from DNS server, this url exists in SURBL, so increase score */
if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
msg_info ("surbl_check: url %s is in surbl %s", param->url->host, surbl_module_ctx->suffix);
else {
msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, surbl_module_ctx->suffix);
}
+ *(param->url->host + param->url->hostlen) = c;
param->task->save.saved --;
if (param->task->save.saved == 0) {
break;
case READ_MESSAGE:
task->msg = in;
+ msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
r = process_message (task);
r = process_filters (task);
if (r == -1) {
new_task->state = READ_COMMAND;
new_task->sock = nfd;
new_task->cfg = worker->srv->cfg;
+ io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ io_tv.tv_usec = 0;
TAILQ_INIT (&new_task->urls);
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
- io_tv.tv_sec = WORKER_IO_TIMEOUT;
- io_tv.tv_usec = 0;
-
event_loop (0);
}