]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add scanning support for milter protocol
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 6 May 2017 18:21:09 +0000 (19:21 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 6 May 2017 18:21:09 +0000 (19:21 +0100)
src/libserver/milter.c
src/rspamd_proxy.c

index 15835a8017fc51e9921a0a6026b17e5307a281f0..576fb644cf5cd8b1d88522237fb9ee58d73604dd 100644 (file)
@@ -67,11 +67,11 @@ rspamd_milter_obuf_free (struct rspamd_milter_outbuf *obuf)
 }
 
 #define RSPAMD_MILTER_RESET_COMMON (1 << 0)
-#define RSPAMD_MILTER_RESET_OUT (1 << 1)
+#define RSPAMD_MILTER_RESET_IO (1 << 1)
 #define RSPAMD_MILTER_RESET_ADDR (1 << 2)
 #define RSPAMD_MILTER_RESET_MACRO (1 << 3)
 #define RSPAMD_MILTER_RESET_ALL (RSPAMD_MILTER_RESET_COMMON | \
-       RSPAMD_MILTER_RESET_OUT | \
+       RSPAMD_MILTER_RESET_IO | \
        RSPAMD_MILTER_RESET_ADDR | \
        RSPAMD_MILTER_RESET_MACRO)
 #define RSPAMD_MILTER_RESET_QUIT_NC (RSPAMD_MILTER_RESET_COMMON | \
@@ -88,19 +88,19 @@ rspamd_milter_session_reset (struct rspamd_milter_session *session,
        struct rspamd_email_address *cur;
        guint i;
 
-       if (how & RSPAMD_MILTER_RESET_OUT) {
+       if (how & RSPAMD_MILTER_RESET_IO) {
                DL_FOREACH_SAFE (priv->out_chain, obuf, obuf_tmp) {
                        rspamd_milter_obuf_free (obuf);
                }
 
                priv->out_chain = NULL;
-       }
 
-       if (how & RSPAMD_MILTER_RESET_COMMON) {
                if (priv->parser.buf) {
                        priv->parser.buf->len = 0;
                }
+       }
 
+       if (how & RSPAMD_MILTER_RESET_COMMON) {
                if (session->message) {
                        session->message->len = 0;
                }
@@ -173,10 +173,6 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session)
                        rspamd_fstring_free (session->hostname);
                }
 
-               if (priv->fd) {
-                       close (priv->fd);
-               }
-
                g_free (session);
        }
 }
@@ -585,6 +581,8 @@ rspamd_milter_process_command (struct rspamd_milter_session *session,
        case RSPAMD_MILTER_CMD_QUIT:
                msg_debug_milter ("quit command");
                priv->state = RSPAMD_MILTER_WANNA_DIE;
+               REF_RETAIN (session);
+               priv->fin_cb (priv->fd, session, priv->ud);
                REF_RELEASE (session);
                break;
        case RSPAMD_MILTER_CMD_RCPT:
@@ -631,7 +629,6 @@ rspamd_milter_process_command (struct rspamd_milter_session *session,
                        session->message = rspamd_fstring_sized_new (
                                        RSPAMD_MILTER_MESSAGE_CHUNK);
                }
-
                msg_debug_milter ("got data command");
                /* We do not need reply as specified */
                break;
@@ -681,6 +678,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
        end = priv->parser.buf->str + priv->parser.buf->len;
 
        while (p < end) {
+               msg_debug_milter("offset: %d, state: %d", (gint)(p - (const guchar *)priv->parser.buf->str), priv->parser.state);
                switch (priv->parser.state) {
                case st_len_1:
                        /* The first length byte in big endian order */
@@ -746,10 +744,10 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
                                return FALSE;
                        }
                        if (priv->parser.buf->allocated < priv->parser.datalen) {
+                               priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
                                priv->parser.buf = rspamd_fstring_grow (priv->parser.buf,
-                                               priv->parser.pos + priv->parser.datalen);
+                                               priv->parser.buf->len + priv->parser.datalen);
                                /* This can realloc buffer */
-                               p = priv->parser.buf->str + priv->parser.pos;
                                rspamd_milter_plan_io (session, priv, EV_READ);
                                goto end;
                        }
@@ -768,6 +766,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
                                }
                                else {
                                        /* Need to read more */
+                                       priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
                                        rspamd_milter_plan_io (session, priv, EV_READ);
                                        goto end;
                                }
@@ -775,9 +774,38 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
                        break;
                }
        }
+
+       /* Leftover */
+       switch (priv->parser.state) {
+       case st_read_data:
+               if (p + priv->parser.datalen <= end) {
+                       if (!rspamd_milter_process_command (session, priv)) {
+                               return FALSE;
+                       }
+
+                       priv->parser.state = st_len_1;
+                       priv->parser.cur_cmd = '\0';
+                       priv->parser.cmd_start = 0;
+               }
+               break;
+       default:
+               /* No need to do anything */
+               break;
+       }
+
+       if (p == end) {
+               priv->parser.buf->len = 0;
+               priv->parser.pos = 0;
+       }
+
+       if (priv->out_chain) {
+               rspamd_milter_plan_io (session, priv, EV_READ|EV_WRITE);
+       }
+       else {
+               rspamd_milter_plan_io (session, priv, EV_READ);
+       }
 end:
 
-       priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
        return TRUE;
 }
 
@@ -801,6 +829,9 @@ rspamd_milter_handle_session (struct rspamd_milter_session *session,
                r = read (priv->fd, priv->parser.buf->str + priv->parser.buf->len,
                                priv->parser.buf->allocated - priv->parser.buf->len);
 
+               msg_debug_milter ("read %z bytes, %z remain, %z allocated",
+                               r, priv->parser.buf->len, priv->parser.buf->allocated);
+
                if (r == -1) {
                        if (errno == EAGAIN || errno == EINTR) {
                                rspamd_milter_plan_io (session, priv, EV_READ);
index 6eabcf109e293bfde9d005f0e4d5c99d276cc7f6..5728419b95c93c918cf31b4ca4e7f666258df7bf 100644 (file)
@@ -859,6 +859,10 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
                rspamd_http_connection_unref (session->client_conn);
        }
 
+       if (session->client_milter_conn) {
+               rspamd_milter_session_unref (session->client_milter_conn);
+       }
+
        for (i = 0; i < session->mirror_conns->len; i ++) {
                conn = g_ptr_array_index (session->mirror_conns, i);
 
@@ -1142,12 +1146,18 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
 {
        struct rspamd_http_message *reply;
 
-       reply = rspamd_http_new_message (HTTP_RESPONSE);
-       reply->code = code;
-       reply->status = rspamd_fstring_new_init (status, strlen (status));
-       rspamd_http_connection_write_message (session->client_conn,
-                       reply, NULL, NULL, session, session->client_sock,
-                       &session->ctx->io_tv, session->ctx->ev_base);
+       if (session->client_milter_conn) {
+               rspamd_milter_send_action (session->client_milter_conn,
+                               RSPAMD_MILTER_TEMPFAIL);
+       }
+       else {
+               reply = rspamd_http_new_message (HTTP_RESPONSE);
+               reply->code = code;
+               reply->status = rspamd_fstring_new_init (status, strlen (status));
+               rspamd_http_connection_write_message (session->client_conn,
+                               reply, NULL, NULL, session, session->client_sock,
+                               &session->ctx->io_tv, session->ctx->ev_base);
+       }
 }
 
 static void
@@ -1180,7 +1190,8 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
                else {
                        msg_info_session ("retry connection to: %s"
                                        " retries left: %d",
-                                       rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
+                                       rspamd_inet_address_to_string (
+                                                       rspamd_upstream_addr (session->master_conn->up)),
                                        session->ctx->max_retries - session->retries);
                }
        }
@@ -1223,9 +1234,18 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
 
        rspamd_upstream_ok (bk_conn->up);
 
-       rspamd_http_connection_write_message (session->client_conn,
-                       msg, NULL, NULL, session, session->client_sock,
-                       bk_conn->io_tv, session->ctx->ev_base);
+       if (session->client_milter_conn) {
+               /*
+                * TODO: convert reply to milter reply
+                */
+               rspamd_milter_send_action (session->client_milter_conn,
+                               RSPAMD_MILTER_ACCEPT);
+       }
+       else {
+               rspamd_http_connection_write_message (session->client_conn,
+                               msg, NULL, NULL, session, session->client_sock,
+                               bk_conn->io_tv, session->ctx->ev_base);
+       }
 
        return 0;
 }
@@ -1262,15 +1282,27 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
                break;
        }
 
-       rspamd_http_connection_reset (session->client_conn);
        session->master_conn->flags |= RSPAMD_BACKEND_CLOSED;
        session->master_conn->results = rep;
-       rspamd_http_connection_write_message (session->client_conn, msg, NULL,
-                       ctype,
-                       session,
-                       session->client_sock,
-                       NULL,
-                       session->ctx->ev_base);
+
+       if (session->client_milter_conn) {
+               /*
+                * TODO: convert reply to milter reply
+                */
+               rspamd_milter_send_action (session->client_milter_conn,
+                               RSPAMD_MILTER_ACCEPT);
+       }
+       else {
+               rspamd_http_connection_reset (session->client_conn);
+               rspamd_http_connection_write_message (session->client_conn,
+                               msg,
+                               NULL,
+                               ctype,
+                               session,
+                               session->client_sock,
+                               NULL,
+                               session->ctx->ev_base);
+       }
 }
 
 static gboolean
@@ -1548,6 +1580,32 @@ proxy_milter_finish_handler (gint fd,
                void *ud)
 {
        struct rspamd_proxy_session *session = ud;
+       struct rspamd_http_message *msg;
+
+       if (!session->master_conn) {
+               session->client_milter_conn = rms;
+               msg = rspamd_milter_to_http (rms);
+               session->master_conn = rspamd_mempool_alloc0 (session->pool,
+                               sizeof (*session->master_conn));
+               session->master_conn->s = session;
+               session->master_conn->name = "master";
+               session->client_message = msg;
+
+               if (msg->body_buf.len == 0) {
+                       msg_info_session ("incomplete master connection");
+                       proxy_backend_close_connection (session->master_conn);
+                       REF_RELEASE (session);
+               }
+               else {
+                       proxy_open_mirror_connections (session);
+                       proxy_send_master_message (session);
+               }
+       }
+       else {
+               msg_info_session ("finished master connection");
+               proxy_backend_close_connection (session->master_conn);
+               REF_RELEASE (session);
+       }
 }
 
 static void