]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Check messages received via smtp proxy
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 10 Jun 2010 17:47:22 +0000 (21:47 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 10 Jun 2010 17:47:22 +0000 (21:47 +0400)
* Add support for sendfile in io dispatcher
* Fix issues with compatibility of worker_task and smtp proxy
* Proxy DATA command

CMakeLists.txt
config.h.in
src/buffer.c
src/buffer.h
src/filter.c
src/main.h
src/smtp.c
src/smtp.h
src/smtp_proto.c
src/smtp_proto.h

index 9e929ec1e43efdb5b7097d5c6edbbf1ea8cd85d5..72bf8b97abbc29a4a38f4ba2850ee3b73dddaf5a 100644 (file)
@@ -281,6 +281,7 @@ CHECK_INCLUDE_FILES(pwd.h HAVE_PWD_H)
 CHECK_INCLUDE_FILES(grp.h HAVE_GRP_H)
 CHECK_INCLUDE_FILES(glob.h HAVE_GLOB_H)
 CHECK_INCLUDE_FILES(poll.h HAVE_POLL_H)
+CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
 
 IF(HAVE_SYS_WAIT_H)
        LIST(APPEND CMAKE_REQUIRED_INCLUDES sys/wait.h)
@@ -294,6 +295,7 @@ CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4)
 CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID)
 CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK)
 CHECK_FUNCTION_EXISTS(tanhl HAVE_TANHL)
+CHECK_FUNCTION_EXISTS(sendfile HAVE_SENDFILE)
 
 CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
 CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
index ebb48f50833764fb6ab0f72427c638e8c16726ff..bbac937c1e5edf3771f83cf8458dfb444c6b2376 100644 (file)
 
 #cmakedefine BUILD_STATIC        1
 
+#cmakedefine HAVE_SENDFILE       1
+#cmakedefine HAVE_SYS_SENDFILE_H 1
+
 #define RVERSION          "${RSPAMD_VERSION}"
 #define RSPAMD_MASTER_SITE_URL "${RSPAMD_MASTER_SITE_URL}"
 
 #define HAVE_SETLOCALE 1
 #endif
 
+#ifdef HAVE_SYS_SENDFILE_H
+#include <sys/sendfile.h>
+#endif
+
 #ifdef WITH_GPERF_TOOLS
 #include <google/profiler.h>
 #endif
index 7dd43d2ade9886b2d8c23678453b56447576ed16..5eb2c81d145fca26246312fe4979418206d35d51 100644 (file)
@@ -36,6 +36,126 @@ dispatcher_error_quark (void)
        return g_quark_from_static_string ("g-dispatcher-error-quark");
 }
 
+static gboolean
+sendfile_callback (rspamd_io_dispatcher_t *d)
+{
+       ssize_t                         r;
+       GError                         *err;
+
+#ifdef HAVE_SENDFILE
+       #if defined(FREEBSD)
+       off_t                           off = 0;
+       /* FreeBSD version */
+       if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) {
+               if (errno != EAGAIN) {
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return FALSE;
+                       }
+               }
+               else {
+                       debug_ip (d->peer_addr, "partially write data, retry");
+                       /* Wait for other event */
+                       d->offset += off;
+                       event_del (d->ev);
+                       event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_add (d->ev, d->tv);
+               }
+       }
+       else {
+               if (d->write_callback) {
+                       if (!d->write_callback (d->user_data)) {
+                               debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+                               return FALSE;
+                       }
+               }
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+               d->in_sendfile = FALSE;
+       }
+       #else
+       /* Linux version */
+       r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
+       if (r == -1) {
+               if (errno != EAGAIN) {
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return FALSE;
+                       }
+               }
+               else {
+                       debug_ip (d->peer_addr, "partially write data, retry");
+                       /* Wait for other event */
+                       event_del (d->ev);
+                       event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_add (d->ev, d->tv);
+               }
+       }
+       else if (r + d->offset < d->file_size) {
+               debug_ip (d->peer_addr, "partially write data, retry");
+               /* Wait for other event */
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+       }
+       else {
+               if (d->write_callback) {
+                       if (!d->write_callback (d->user_data)) {
+                               debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+                               return FALSE;
+                       }
+               }
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+               d->in_sendfile = FALSE;
+       }
+       #endif
+#else
+       r = write (d->fd, d->map, d->file_size - d->offset);
+       if (r == -1) {
+               if (errno != EAGAIN) {
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return FALSE;
+                       }
+               }
+               else {
+                       debug_ip (d->peer_addr, "partially write data, retry");
+                       /* Wait for other event */
+                       event_del (d->ev);
+                       event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_add (d->ev, d->tv);
+               }
+       }
+       else if (r + d->offset < d->file_size) {
+               d->offset += r;
+               debug_ip (d->peer_addr, "partially write data, retry");
+               /* Wait for other event */
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+       }
+       else {
+               if (d->write_callback) {
+                       if (!d->write_callback (d->user_data)) {
+                               debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+                               return FALSE;
+                       }
+               }
+               event_del (d->ev);
+               event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+               event_add (d->ev, d->tv);
+               d->in_sendfile = FALSE;
+       }
+#endif
+       return TRUE;
+}
+
 #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
 
 static                          gboolean
@@ -139,7 +259,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
 
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
-               if (d->policy == BUFFER_LINE) {
+               if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
                        d->in_buf->data = fstralloc (d->pool, BUFSIZ);
                }
                else {
@@ -254,6 +374,22 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
                        }
                }
                break;
+       case BUFFER_ANY:
+               res.begin = d->in_buf->data->begin;
+               res.len = *len;
+               if (d->read_callback) {
+                       if (!d->read_callback (&res, d->user_data)) {
+                               return;
+                       }
+                       if (d->policy != saved_policy) {
+                               debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
+                               read_buffers (fd, d, TRUE);
+                               return;
+                       }
+               }
+               d->in_buf->pos = d->in_buf->data->begin;
+               d->in_buf->data->len = 0;
+               break;
        }
 }
 
@@ -276,14 +412,19 @@ dispatcher_cb (int fd, short what, void *arg)
                break;
        case EV_WRITE:
                /* No data to write, disable further EV_WRITE to this fd */
-               if (d->out_buffers == NULL) {
-                       event_del (d->ev);
-                       event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
-                       event_add (d->ev, d->tv);
+               if (d->in_sendfile) {
+                       sendfile_callback (d);
                }
                else {
-                       /* Delayed write */
-                       write_buffers (fd, d, TRUE);
+                       if (d->out_buffers == NULL) {
+                               event_del (d->ev);
+                               event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+                               event_add (d->ev, d->tv);
+                       }
+                       else {
+                               /* Delayed write */
+                               write_buffers (fd, d, TRUE);
+                       }
                }
                break;
        case EV_READ:
@@ -315,6 +456,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
                new->tv = NULL;
        }
        new->nchars = 0;
+       new->in_sendfile = FALSE;
        new->policy = policy;
        new->read_callback = read_cb;
        new->write_callback = write_cb;
@@ -363,7 +505,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
                                d->in_buf->pos = d->in_buf->data->begin + t;
                        }
                }
-               else if (policy == BUFFER_LINE) {
+               else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
                        if (d->in_buf && d->nchars < BUFSIZ) {
                                tmp = fstralloc (d->pool, BUFSIZ);
                                memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
@@ -413,6 +555,34 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
        return TRUE;
 }
 
+
+gboolean 
+rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len)
+{
+       if (lseek (fd, 0, SEEK_SET) == -1) {
+               msg_warn ("lseek failed: %s", strerror (errno));
+               return FALSE;
+       }
+
+       d->offset = 0;
+       d->in_sendfile = TRUE;
+       d->sendfile_fd = fd;
+       d->file_size = len;
+
+#ifndef HAVE_SENDFILE
+       #ifdef HAVE_MMAP_NOCORE
+       if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
+       #else
+       if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+       #endif
+               msg_warn ("mmap failed: %s", strerror (errno));
+               return FALSE;
+       }
+#endif
+
+       return sendfile_callback (d);
+}
+
 void
 rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
 {
index 4cf9de5558af80dbbddd87facede85012add9ec6..9f3897d1c4161b40877ad4ee2ad2c936b2d41c35 100644 (file)
@@ -20,6 +20,7 @@ typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
 enum io_policy {
        BUFFER_LINE,                                                                                                    /**< call handler when we have line ready */
        BUFFER_CHARACTER,                                                                                               /**< call handler when we have some characters */
+       BUFFER_ANY                                                                                                              /**< call handler whenever we got data in buffer */
 };
 
 /**
@@ -45,6 +46,13 @@ typedef struct rspamd_io_dispatcher_s {
        dispatcher_write_callback_t write_callback;                                             /**< write callback                     */
        dispatcher_err_callback_t err_callback;                                                 /**< error callback                     */
        void *user_data;                                                                                                /**< user's data for callbacks */
+       off_t offset;                                                                                                   /**< for sendfile use           */
+       size_t file_size;
+       int sendfile_fd;
+       gboolean in_sendfile;                                                                                   /**< whether buffer is in sendfile mode */
+#ifndef HAVE_SENDFILE
+       void *map;
+#endif
 } rspamd_io_dispatcher_t;
 
 /**
@@ -86,6 +94,14 @@ gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
                                                                                                  void *data,
                                                                                                  size_t len, gboolean delayed, gboolean allocated);
 
+/**
+ * Send specified descriptor to dispatcher
+ * @param d pointer to dispatcher's object
+ * @param fd descriptor of file
+ * @param len length of data
+ */
+gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len);
+
 /**
  * Pause IO events on dispatcher
  * @param d pointer to dispatcher's object
index 0a18bd793dce7fdd118fb3f47025df2d781d3036..c6f936087277731d54b7998c1efc52aae9dcdf31 100644 (file)
@@ -310,7 +310,12 @@ continue_process_filters (struct worker_task *task)
        /* Process all statfiles */
        process_statfiles (task);
        /* XXX: ugly direct call */
-       task->dispatcher->write_callback (task);
+       if (task->fin_callback) {
+               task->fin_callback (task->fin_arg);
+       }
+       else {
+               task->dispatcher->write_callback (task);
+       }
        return 1;
 }
 
index 9371652d329a8695101fd9f25bb47fc0484d29e4..d5f9714683cd47be511dfeb0fa73036e0a7b6a97 100644 (file)
@@ -225,6 +225,8 @@ struct worker_task {
        gboolean view_checked;
        gboolean pass_all_filters;                                                                      /**< pass task throught every rule                                      */
        uint32_t parser_recursion;                                                                      /**< for avoiding recursion stack overflow                      */
+       gboolean (*fin_callback)(void *arg);                                            /**< calback for filters finalizing                                     */
+       void *fin_arg;                                                                                          /**< argument for fin callback                                          */
 };
 
 /**
index 1bf135d3090ff6360c8b90e2b4c8f0be6b09f5b3..3f366f7fd7a761a71fadc7e8d432ee8bb6212777 100644 (file)
@@ -29,6 +29,8 @@
 #include "smtp.h"
 #include "smtp_proto.h"
 #include "map.h"
+#include "message.h"
+#include "settings.h"
 #include "evdns/evdns.h"
 
 /* Max line size as it is defined in rfc2822 */
@@ -39,6 +41,9 @@
 #define DEFAULT_UPSTREAM_DEAD_TIME 300
 #define DEFAULT_UPSTREAM_MAXERRORS 10
 
+
+#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected"
+
 static gboolean smtp_write_socket (void *arg);
 
 static sig_atomic_t                    wanna_die = 0;
@@ -89,6 +94,9 @@ free_smtp_session (gpointer arg)
                }
                memory_pool_delete (session->pool);
                close (session->sock);
+               if (session->temp_fd != -1) {
+                       close (session->temp_fd);
+               }
                g_free (session);
        }
 }
@@ -146,7 +154,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
        }
        /* Create a dispatcher for upstream connection */
        session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, 
-                                                       smtp_upstream_read_socket, NULL, smtp_upstream_err_socket, 
+                                                       smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, 
                                                        &session->ctx->smtp_timeout, session);
        session->state = SMTP_STATE_WAIT_UPSTREAM;
        session->upstream_state = SMTP_STATE_GREETING;
@@ -159,6 +167,8 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
 {
        /* XXX: write dialog implementation */
        struct smtp_command             *cmd;
+       char                             outbuf[BUFSIZ];
+       int                              r;
        
        if (! parse_smtp_command (session, line, &cmd)) {
                session->error = SMTP_ERROR_BAD_COMMAND;
@@ -201,10 +211,22 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
                        if (session->state == SMTP_STATE_RCPT) {
                                if (parse_smtp_rcpt (session, cmd)) {
                                        /* Make upstream connection */
-                                       if (!create_smtp_upstream_connection (session)) {
-                                               session->error = SMTP_ERROR_UPSTREAM;
-                                               session->state = SMTP_STATE_CRITICAL_ERROR;
-                                               return FALSE;
+                                       if (session->upstream == NULL) {
+                                               if (!create_smtp_upstream_connection (session)) {
+                                                       session->error = SMTP_ERROR_UPSTREAM;
+                                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                                       return FALSE;
+                                               }
+                                       }
+                                       else {
+                                               /* Send next rcpt to upstream */
+                                               session->state = SMTP_STATE_WAIT_UPSTREAM;
+                                               session->upstream_state = SMTP_STATE_BEFORE_DATA;
+                                               rspamd_dispatcher_restore (session->upstream_dispatcher);
+                                               r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
+                                               r += smtp_upstream_write_list (session->rcpt->data, outbuf + r, sizeof (outbuf) - r);
+                                               session->cur_rcpt = NULL;
+                                               return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
                                        }
                                        session->state = SMTP_STATE_WAIT_UPSTREAM;
                                        return TRUE;
@@ -222,6 +244,10 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
                        if (session->rcpt) {
                                g_list_free (session->rcpt);
                        }
+                       if (session->upstream) {
+                               remove_normal_event (session->s, smtp_upstream_finalize_connection, session);
+                               session->upstream = NULL;
+                       }
                        session->state = SMTP_STATE_GREETING; 
                        break;
                case SMTP_COMMAND_DATA:
@@ -230,7 +256,19 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
                                        session->error = SMTP_ERROR_RECIPIENTS;
                                        return FALSE;
                                }
-                               session->error = SMTP_ERROR_DATA_OK;
+                               if (session->upstream == NULL) {
+                                       session->error = SMTP_ERROR_UPSTREAM;
+                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                       return FALSE;
+                               }
+                               else {
+                                       session->upstream_state = SMTP_STATE_DATA;
+                                       rspamd_dispatcher_restore (session->upstream_dispatcher);
+                                       r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
+                                       session->state = SMTP_STATE_WAIT_UPSTREAM;
+                                       session->error = SMTP_ERROR_DATA_OK;
+                                       return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
+                               }
                        }
                        else {
                                goto improper_sequence;
@@ -250,6 +288,88 @@ improper_sequence:
        return FALSE;
 }
 
+static gboolean
+smtp_send_upstream_message (struct smtp_session *session)
+{
+       rspamd_dispatcher_pause (session->dispatcher);
+       rspamd_dispatcher_restore (session->upstream_dispatcher);
+       
+       if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) {
+               goto err;
+       }
+       session->upstream_state = SMTP_STATE_IN_SENDFILE;
+       session->state = SMTP_STATE_WAIT_UPSTREAM;
+       return TRUE;
+
+err:
+       session->error = SMTP_ERROR_FILE;
+       session->state = SMTP_STATE_CRITICAL_ERROR;
+       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+       destroy_session (session->s);
+       return FALSE;
+}
+
+static gboolean
+process_smtp_data (struct smtp_session *session)
+{
+       struct stat                     st;
+       int                             r;
+
+       if (fstat (session->temp_fd, &st) == -1) {
+               goto err;
+       }
+       /* Now mmap temp file if it is small enough */
+       session->temp_size = st.st_size;
+       if (session->ctx->max_size == 0 || st.st_size < session->ctx->max_size) {
+               session->task = construct_task (session->worker);
+               session->task->fin_callback = smtp_write_socket;
+               session->task->fin_arg = session;
+               session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t));
+#ifdef HAVE_MMAP_NOCORE
+               if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
+#else
+               if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
+#endif
+                       goto err;
+               }
+               session->task->msg->len = st.st_size;
+               if (process_message (session->task) == -1) {
+                       msg_err ("cannot process message");
+                       munmap (session->task->msg->begin, st.st_size);
+                       goto err;
+               }
+               session->task->helo = session->helo;
+               memcpy (&session->task->from_addr, &session->client_addr, sizeof (struct in_addr));
+               session->task->cmd = CMD_CHECK;
+               r = process_filters (session->task);
+               if (r == -1) {
+                       munmap (session->task->msg->begin, st.st_size);
+                       msg_err ("cannot process filters");
+                       goto err;
+               }
+               else if (r == 0) {
+                       session->state = SMTP_STATE_END;
+                       rspamd_dispatcher_pause (session->dispatcher);
+               }
+               else {
+                       process_statfiles (session->task);
+                       session->state = SMTP_STATE_END;
+                       return smtp_write_socket (session);
+               }
+       }
+       else {
+               return smtp_send_upstream_message (session);
+       }
+
+       return TRUE;
+err:
+       session->error = SMTP_ERROR_FILE;
+       session->state = SMTP_STATE_CRITICAL_ERROR;
+       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+       destroy_session (session->s);
+       return FALSE;
+}
+
 /*
  * Callback that is called when there is data to read in buffer
  */
@@ -257,6 +377,8 @@ static                          gboolean
 smtp_read_socket (f_str_t * in, void *arg)
 {
        struct smtp_session            *session = arg;
+       char                           *p;
+       gboolean                        do_write;
 
        switch (session->state) {
                case SMTP_STATE_RESOLVE_REVERSE:
@@ -275,6 +397,69 @@ smtp_read_socket (f_str_t * in, void *arg)
                                smtp_write_socket (session);
                        }
                        break;
+               case SMTP_STATE_AFTER_DATA:
+                       if (in->len == 0) {
+                               return TRUE;
+                       }
+                       p = in->begin + in->len;
+                       do_write = TRUE;
+                       if (in->len > sizeof (session->data_end)) {
+                               /* New data is more than trailer buffer */
+                               if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+                                       msg_err ("cannot write to temp file: %s", strerror (errno));
+                                       session->error = SMTP_ERROR_FILE;
+                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                                       destroy_session (session->s);
+                                       return FALSE;
+                               }
+                               memcpy (session->data_end, p - sizeof (session->data_end) + 1, sizeof (session->data_end));
+                               session->data_idx = 5;
+                       }
+                       else if (session->data_idx + in->len < sizeof (session->data_end)){
+                               /* New data is less than trailer buffer plus index */
+                               memcpy (session->data_end + session->data_idx, in->begin, in->len);
+                               session->data_idx += in->len;
+                               do_write = FALSE;
+                       }
+                       else {
+                               /* Save remaining bytes */
+                               if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+                                       msg_err ("cannot write to temp file: %s", strerror (errno));
+                                       session->error = SMTP_ERROR_FILE;
+                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                                       destroy_session (session->s);
+                                       return FALSE;
+                               }
+                               /* Move bytes */
+                               session->data_idx = sizeof (session->data_end) - in->len;
+                               memmove (session->data_end, session->data_end + (sizeof (session->data_end) - in->len) + 1, sizeof (session->data_end) - in->len);
+                               memcpy (session->data_end + session->data_idx, in->begin, in->len);
+                               session->data_idx = 5;
+                       }
+                       if (do_write) {
+                               if (memcmp (session->data_end, DATA_END_TRAILER, sizeof (session->data_end)) == 0) {
+                                       return process_smtp_data (session);
+                               }
+                               else {
+                                       if (session->data_idx < in->len) {
+                                               if (in->len - session->data_idx != 0 && 
+                                                               write (session->temp_fd, in->begin, in->len - session->data_idx) != in->len - session->data_idx) {
+                                                       msg_err ("cannot write to temp file: %s", strerror (errno));
+                                                       session->error = SMTP_ERROR_FILE;
+                                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                                                       destroy_session (session->s);
+                                                       return FALSE;
+                                               }
+                                       }
+                               }
+                       }
+                       break;
+               case SMTP_STATE_WAIT_UPSTREAM:
+                       rspamd_dispatcher_pause (session->dispatcher);
+                       break;
                default:
                        session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0");
                        session->state = SMTP_STATE_ERROR;
@@ -299,6 +484,13 @@ static                          gboolean
 smtp_write_socket (void *arg)
 {
        struct smtp_session            *session = arg;
+       double                          ms = 0, rs = 0;
+       int                             r;
+       struct metric_result           *metric_res;
+       struct metric                  *m;
+       char                            logbuf[1024];
+       gboolean                        is_spam = FALSE;
+       GList                          *symbols, *cur;  
 
        if (session->state == SMTP_STATE_CRITICAL_ERROR) {
                if (session->error != NULL) {
@@ -307,6 +499,47 @@ smtp_write_socket (void *arg)
                destroy_session (session->s);
                return FALSE;
        }
+       else if (session->state == SMTP_STATE_END) {
+               /* Check metric */
+               m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric);
+               metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric);
+               if (m != NULL && metric_res != NULL) {
+                       if (!check_metric_settings (session->task, m, &ms, &rs)) {
+                               ms = m->required_score;
+                               rs = m->reject_score;
+                       }
+                       if (metric_res->score >= ms) {
+                               is_spam = TRUE;
+                       }
+
+                       r = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id);
+                       r += snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [", 
+                                       (char *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs);
+                       symbols = g_hash_table_get_keys (metric_res->symbols);
+                       cur = symbols;
+                       while (cur) {
+                               if (g_list_next (cur) != NULL) {
+                                       r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (char *)cur->data);
+                               }
+                               else {
+                                       r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (char *)cur->data);
+                               }
+                               cur = g_list_next (cur);
+                       }
+                       g_list_free (symbols);
+                       r += snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %ld, time: %sms",
+                               (long int)session->task->msg->len, calculate_check_time (&session->task->ts, session->cfg->clock_res));
+                       msg_info ("%s", logbuf);
+
+                       if (is_spam) {
+                               rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+                               destroy_session (session->s);
+                               return FALSE;
+                       }
+               }
+               return smtp_send_upstream_message (session);
+       }
        else {
                if (session->error != NULL) {
                        rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
@@ -501,8 +734,10 @@ accept_socket (int fd, short what, void *arg)
        }
 
        session->sock = nfd;
+       session->temp_fd = -1;
        session->worker = worker;
        session->ctx = worker->ctx;
+       session->cfg = worker->srv->cfg;
        session->session_time = time (NULL);
        worker->srv->stat->connections_count++;
 
@@ -762,6 +997,18 @@ config_smtp_worker (struct rspamd_worker *worker)
        if ((value = g_hash_table_lookup (worker->cf->params, "smtp_capabilities")) != NULL) {
                make_capabilities (ctx, value);
        }
+       if ((value = g_hash_table_lookup (worker->cf->params, "smtp_metric")) != NULL) {
+               ctx->metric = memory_pool_strdup (ctx->pool, value);
+       }
+       else {
+               ctx->metric = DEFAULT_METRIC;
+       }
+       if ((value = g_hash_table_lookup (worker->cf->params, "smtp_reject_message")) != NULL) {
+               ctx->reject_message = memory_pool_strdup (ctx->pool, value);
+       }
+       else {
+               ctx->reject_message = DEFAULT_REJECT_MESSAGE;
+       }
 
        /* Set ctx */
        worker->ctx = ctx;
index 36319bb4c31c1d4bb34dfb7b0e2d2009b86f93f7..4208df4f213ba34ddc957e62472debd1bffccdb0 100644 (file)
@@ -29,6 +29,9 @@ struct smtp_worker_ctx {
        gboolean use_xclient;
        gboolean helo_required;
        char *smtp_capabilities;
+       char *reject_message;
+       size_t max_size;
+       char *metric;
 };
 
 enum rspamd_smtp_state {
@@ -41,9 +44,10 @@ enum rspamd_smtp_state {
        SMTP_STATE_RCPT,
        SMTP_STATE_BEFORE_DATA,
        SMTP_STATE_DATA,
-       SMTP_STATE_EOD,
+       SMTP_STATE_AFTER_DATA,
        SMTP_STATE_END,
        SMTP_STATE_WAIT_UPSTREAM,
+       SMTP_STATE_IN_SENDFILE,
        SMTP_STATE_ERROR,
        SMTP_STATE_CRITICAL_ERROR,
        SMTP_STATE_WRITE_ERROR
@@ -51,6 +55,7 @@ enum rspamd_smtp_state {
 
 struct smtp_session {
        struct smtp_worker_ctx *ctx;
+       struct config_file *cfg;
        memory_pool_t *pool;
 
        enum rspamd_smtp_state state;
@@ -62,6 +67,8 @@ struct smtp_session {
        char *error;
        int sock;
        int upstream_sock;
+       int temp_fd;
+       size_t temp_size;
        time_t session_time;
 
        gchar *helo;
@@ -74,6 +81,10 @@ struct smtp_session {
        rspamd_io_dispatcher_t *upstream_dispatcher;
 
        struct smtp_upstream *upstream;
+
+       char data_end[5];
+       char data_idx;
+
        gboolean resolved;
        gboolean esmtp;
 };
index 82fffa690f0256920bd9e2f10358e2d09ebf0b61..bef52b6b55898e13ace9012c64a82f7d10e05674 100644 (file)
@@ -163,7 +163,7 @@ parse_smtp_command (struct smtp_session *session, f_str_t *line, struct smtp_com
                                break;
                        case SMTP_PARSE_ARGUMENT:
                                if (ch == ' ' || ch == ':' || ch == CR || ch == LF || i == line->len - 1) {
-                                       if (i == line->len - 1) {
+                                       if (i == line->len - 1 && (ch != ' ' && ch != CR && ch != LF)) {
                                                p ++;
                                        }
                                        arg->len = p - c;
@@ -329,14 +329,14 @@ parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd)
 
 /* Return -1 if there are some error, 1 if all is ok and 0 in case of incomplete reply */
 static int
-check_smtp_ustream_reply (f_str_t *in)
+check_smtp_ustream_reply (f_str_t *in, char success_code)
 {
        char                           *p;
 
        /* Check for 250 at the begin of line */
        if (in->len >= sizeof ("220 ") - 1) {
                p = in->begin;
-               if (p[0] == '2') {
+               if (p[0] == success_code) {
                        /* Last reply line */
                        if (p[3] == ' ') {
                                return 1;
@@ -353,7 +353,7 @@ check_smtp_ustream_reply (f_str_t *in)
        return -1;
 }
 
-static size_t
+size_t
 smtp_upstream_write_list (GList *args, char *buf, size_t buflen)
 {
        GList                          *cur = args;
@@ -373,23 +373,36 @@ smtp_upstream_write_list (GList *args, char *buf, size_t buflen)
        return r;
 }
 
+gboolean 
+smtp_upstream_write_socket (void *arg)
+{
+       struct smtp_session            *session = arg;
+       
+       if (session->upstream_state == SMTP_STATE_IN_SENDFILE) {
+               session->upstream_state = SMTP_STATE_END;
+               return rspamd_dispatcher_write (session->upstream_dispatcher, DATA_END_TRAILER, sizeof (DATA_END_TRAILER) - 1, FALSE, TRUE);
+       }
+
+       return TRUE;
+}
+
 gboolean 
 smtp_upstream_read_socket (f_str_t * in, void *arg)
 {
        struct smtp_session            *session = arg;
-       char                            outbuf[BUFSIZ];
+       char                            outbuf[BUFSIZ], *tmppattern;
        int                             r;
        
        switch (session->upstream_state) {
                case SMTP_STATE_GREETING:
-                       r = check_smtp_ustream_reply (in);
+                       r = check_smtp_ustream_reply (in, '2');
                        if (r == -1) {
-                               session->error = memory_pool_alloc (session->pool, in->len + 3);
+                               session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                /* XXX: assume upstream errors as critical errors */
                                session->state = SMTP_STATE_CRITICAL_ERROR;
                                rspamd_dispatcher_restore (session->dispatcher);
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
                                destroy_session (session->s);
                                return FALSE;
@@ -417,14 +430,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                        }
                        break;
                case SMTP_STATE_HELO:
-                       r = check_smtp_ustream_reply (in);
+                       r = check_smtp_ustream_reply (in, '2');
                        if (r == -1) {
                                session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                /* XXX: assume upstream errors as critical errors */
                                session->state = SMTP_STATE_CRITICAL_ERROR;
                                rspamd_dispatcher_restore (session->dispatcher);
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
                                destroy_session (session->s);
                                return FALSE;
@@ -443,14 +456,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                        }
                        break;
                case SMTP_STATE_FROM:
-                       r = check_smtp_ustream_reply (in);
+                       r = check_smtp_ustream_reply (in, '2');
                        if (r == -1) {
                                session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                /* XXX: assume upstream errors as critical errors */
                                session->state = SMTP_STATE_CRITICAL_ERROR;
                                rspamd_dispatcher_restore (session->dispatcher);
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
                                destroy_session (session->s);
                                return FALSE;
@@ -463,14 +476,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                        }
                        break;
                case SMTP_STATE_RCPT:
-                       r = check_smtp_ustream_reply (in);
+                       r = check_smtp_ustream_reply (in, '2');
                        if (r == -1) {
                                session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                /* XXX: assume upstream errors as critical errors */
                                session->state = SMTP_STATE_CRITICAL_ERROR;
                                rspamd_dispatcher_restore (session->dispatcher);
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
                                destroy_session (session->s);
                                return FALSE;
@@ -485,14 +498,20 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                        }
                        break;
                case SMTP_STATE_BEFORE_DATA:
-                       r = check_smtp_ustream_reply (in);
+                       r = check_smtp_ustream_reply (in, '2');
                        if (r == -1) {
                                session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                rspamd_dispatcher_restore (session->dispatcher);
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
-                               session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt);
+                               if (session->cur_rcpt) {
+                                       session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt);
+                               }
+                               else {
+                                       session->rcpt = g_list_delete_link (session->rcpt, session->rcpt);
+                               }
+                               session->state = SMTP_STATE_RCPT;
                                return TRUE;
                        }
                        else if (r == 1) {
@@ -500,6 +519,7 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                                        r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
                                        r += smtp_upstream_write_list (session->cur_rcpt, outbuf + r, sizeof (outbuf) - r);
                                        session->cur_rcpt = g_list_next (session->cur_rcpt);
+                                       rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
                                }
                                else {
                                        session->upstream_state = SMTP_STATE_DATA;
@@ -508,15 +528,68 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
                                session->error = memory_pool_alloc (session->pool, in->len + 1);
                                g_strlcpy (session->error, in->begin, in->len + 1);
                                /* Write to client */
-                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
                                rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
                                if (session->state == SMTP_STATE_WAIT_UPSTREAM) {
                                        rspamd_dispatcher_restore (session->dispatcher);
                                        session->state = SMTP_STATE_RCPT;
                                }
-                               return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
                        }
                        break;
+               case SMTP_STATE_DATA:
+                       r = check_smtp_ustream_reply (in, '3');
+                       if (r == -1) {
+                               session->error = memory_pool_alloc (session->pool, in->len + 1);
+                               g_strlcpy (session->error, in->begin, in->len + 1);
+                               /* XXX: assume upstream errors as critical errors */
+                               session->state = SMTP_STATE_CRITICAL_ERROR;
+                               rspamd_dispatcher_restore (session->dispatcher);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+                               destroy_session (session->s);
+                               return FALSE;
+                       }
+                       else if (r == 1) {
+                               r = strlen (session->cfg->temp_dir) + sizeof ("/rspamd-XXXXXX.tmp");
+                               tmppattern = alloca (r);
+                               snprintf (tmppattern, r, "%s/rspamd-XXXXXX.tmp", session->cfg->temp_dir);
+                               session->temp_fd = g_mkstemp_full (tmppattern, O_RDWR, S_IWUSR | S_IRUSR);
+                               if (session->temp_fd == -1) {
+                                       session->error = SMTP_ERROR_FILE;
+                                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                                       rspamd_dispatcher_restore (session->dispatcher);
+                                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                                       destroy_session (session->s);
+                                       return FALSE;
+                               }
+                               session->state = SMTP_STATE_AFTER_DATA;
+                               session->error = SMTP_ERROR_DATA_OK;
+                               rspamd_dispatcher_restore (session->dispatcher);
+                               rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                               rspamd_dispatcher_pause (session->upstream_dispatcher);
+                               rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_ANY, 0);
+                               session->data_idx = 0;
+                               memset (session->data_end, 0, sizeof (session->data_end));
+                               return TRUE;
+                       }
+                       break;
+               case SMTP_STATE_END:
+                       session->error = memory_pool_alloc (session->pool, in->len + 1);
+                       g_strlcpy (session->error, in->begin, in->len + 1);
+                       session->state = SMTP_STATE_END;
+                       rspamd_dispatcher_restore (session->dispatcher);
+                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                       rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+                       destroy_session (session->s);
+                       return FALSE;
+               default:
+                       msg_err ("got upstream reply at unexpected state: %d, reply: %V", session->upstream_state, in);
+                       session->state = SMTP_STATE_CRITICAL_ERROR;
+                       rspamd_dispatcher_restore (session->dispatcher);
+                       rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+                       rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+                       destroy_session (session->s);
+                       return FALSE;
        }
 
        return TRUE;
index c78cfb0948e94d5c76d7820b6fde48499e62d4f5..e850e4a5d7640db016e54ece8da8be78a40f6626 100644 (file)
 #define SMTP_ERROR_RECIPIENTS "554 No valid recipients" CRLF
 #define SMTP_ERROR_UNIMPLIMENTED "502 Command not implemented" CRLF
 #define SMTP_ERROR_UPSTREAM "421 Service not available, closing transmission channel" CRLF
+#define SMTP_ERROR_FILE "420 Service not available, filesystem error" CRLF
 #define SMTP_ERROR_OK "250 Requested mail action okay, completed" CRLF
 #define SMTP_ERROR_DATA_OK "354 Start mail input; end with <CRLF>.<CRLF>" CRLF
 
+#define DATA_END_TRAILER CRLF "." CRLF
+
 
 struct smtp_command {
        enum {
@@ -40,7 +43,9 @@ gboolean parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd
 
 /* Upstream SMTP */
 gboolean smtp_upstream_read_socket (f_str_t * in, void *arg);
+gboolean smtp_upstream_write_socket (void *arg);
 void smtp_upstream_err_socket (GError *err, void *arg);
 void smtp_upstream_finalize_connection (gpointer data);
+size_t smtp_upstream_write_list (GList *args, char *buf, size_t buflen);
 
 #endif