]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Add initial LMTP support and LDA delivery to rspamd
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 24 Feb 2009 17:16:53 +0000 (20:16 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 24 Feb 2009 17:16:53 +0000 (20:16 +0300)
17 files changed:
CMakeLists.txt
config.h.in
rspamd.conf.sample
src/cfg_file.h
src/cfg_file.l
src/cfg_file.y
src/cfg_utils.c
src/controller.c
src/fstring.c
src/fstring.h
src/lmtp.c [new file with mode: 0644]
src/lmtp.h [new file with mode: 0644]
src/lmtp_proto.c [new file with mode: 0644]
src/lmtp_proto.h [new file with mode: 0644]
src/main.c
src/main.h
src/worker.c

index 8fedd0bff56f72cce04cfba0d5e83b6d52034e5e..e11320de1ca3320ada5dbb75aba94664c6e2d13a 100644 (file)
@@ -145,7 +145,7 @@ CHECK_INCLUDE_FILES(netinet/in.h  HAVE_NETINET_IN_H)
 CHECK_INCLUDE_FILES(arpa/inet.h  HAVE_ARPA_INET_H)
 CHECK_INCLUDE_FILES(netdb.h  HAVE_NETDB_H)
 CHECK_INCLUDE_FILES(syslog.h HAVE_SYSLOG_H)
-
+CHECK_INCLUDE_FILES(libgen.h HAVE_LIBGEN_H)
 
 CHECK_FUNCTION_EXISTS(setproctitle HAVE_SETPROCTITLE)
 CHECK_FUNCTION_EXISTS(getpagesize HAVE_GETPAGESIZE)
@@ -212,7 +212,9 @@ SET(RSPAMDSRC       src/modules.c
                                src/filter.c
                                src/controller.c
                                src/cfg_utils.c
-                               src/buffer.c)
+                               src/buffer.c
+                               src/lmtp.c
+                               src/lmtp_proto.c)
 
 SET(TOKENIZERSSRC  src/tokenizers/tokenizers.c
                                src/tokenizers/osb.c)
index 04dd043d91b093b5030fd04dbcb911e36f99ba47..b77d3145415cea070daba1805fd5a9dcfc1e59e7 100644 (file)
@@ -37,6 +37,8 @@
 
 #cmakedefine HAVE_LIBUTIL_H      1
 
+#cmakedefine HAVE_LIBGEN_H       1
+
 #cmakedefine HAVE_ENDIAN_H       1
 #cmakedefine HAVE_SYS_ENDIAN_H   1
 #cmakedefine HAVE_MACHINE_ENDIAN_H  1
 #include <math.h>
 #endif
 
+/* libutil */
 #ifdef HAVE_LIBUTIL_H
 #include <libutil.h>
 #endif
 
+/* syslog */
 #ifdef HAVE_SYSLOG_H
 #include <syslog.h>
 #endif
 
+#ifdef HAVE_LIBGEN_H
+#include <libgen.h>
+#define HAVE_DIRNAME 1
+#endif
+
 #include <errno.h>
 #include <signal.h>
 #include <event.h>
index f0e2a9411e95291b3fa2373e167b5d65a192265e..0c6ba3189d68a4b574848f9cf08b21b3765ee6e3 100644 (file)
@@ -76,4 +76,14 @@ factors {
        "winnow" = 5.5;
 };
 
+lmtp {
+       enabled = yes;
+       bind_socket = localhost:11335;
+};
+
+delivery {
+       enabled = yes;
+       agent = "/dev/null";
+};
+
 url_filters = "surbl";
index a5d092c73ee9154199db83d6ef568f6d1ed2b5f9..7bacb8aa9f4326944b8bec27541df5c4c2d096c8 100644 (file)
@@ -14,6 +14,7 @@
 
 #define DEFAULT_BIND_PORT 768
 #define DEFAULT_CONTROL_PORT 7608
+#define DEFAULT_LMTP_PORT 7609
 #define MAX_MEMCACHED_SERVERS 48
 #define DEFAULT_MEMCACHED_PORT 11211
 /* Memcached timeouts */
@@ -38,6 +39,16 @@ struct classifier;
 
 enum { VAL_UNDEF=0, VAL_TRUE, VAL_FALSE };
 
+/**
+ * Types of rspamd bind lines
+ */
+enum rspamd_cred_type {
+       CRED_NORMAL,
+       CRED_CONTROL,
+       CRED_LMTP,
+       CRED_DELIVERY,
+};
+
 /**
  * Regexp type: /H - header, /M - mime, /U - url
  */
@@ -161,6 +172,21 @@ struct config_file {
        unsigned int memcached_maxerrors;                               /**< maximum number of errors                                                   */
        unsigned int memcached_connect_timeout;                 /**< connection timeout                                                                 */
 
+       gboolean lmtp_enable;                                                   /**< is lmtp agent is enabled                                                   */
+       char *lmtp_host;                                                                /**< host for lmtp agent                                                                */
+       struct in_addr lmtp_addr;                                               /**< bind address for lmtp                                                              */
+       uint16_t lmtp_port;                                                             /**< bind port for lmtp agent                                                   */
+       uint16_t lmtp_family;                                                   /**< bind family for lmtp agent                                                 */
+       char *lmtp_metric;                                                              /**< metric to use in lmtp module                                               */
+
+       gboolean delivery_enable;                                               /**< is delivery agent is enabled                                               */
+       char *deliver_host;                                                             /**< host for mail deliviring                                                   */
+       struct in_addr deliver_addr;                                    /**< its address                                                                                */
+       uint16_t deliver_port;                                                  /**< port for deliviring                                                                */
+       uint16_t deliver_family;                                                /**< socket family for delivirnig                                               */
+       char *deliver_agent_path;                                               /**< deliver to pipe instead of socket                                  */
+       gboolean deliver_lmtp;                                                  /**< use LMTP instead of SMTP                                                   */
+
        LIST_HEAD (modulesq, perl_module) perl_modules; /**< linked list of perl modules to load                                */
 
        LIST_HEAD (headersq, filter) header_filters;    /**< linked list of all header's filters                                */
@@ -193,10 +219,10 @@ int add_memcached_server (struct config_file *cf, char *str);
  * Parse bind credits
  * @param cf config file to use
  * @param str line that presents bind line
- * @param is_control flag that defines whether this credits are for controller
+ * @param type type of credits
  * @return 1 if line was successfully parsed and 0 in case of error
  */
-int parse_bind_line (struct config_file *cf, char *str, char is_control);
+int parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type);
 
 /**
  * Init default values
index 1845166b2c3824554f33aa76a3b6a45062d8b636..d2810c5bb1ce01f7e25d97cb0e9eb77750d5f74c 100644 (file)
@@ -47,6 +47,10 @@ required_score                                       return REQUIRED_SCORE;
 function                                               return FUNCTION;
 control                                                        return CONTROL;
 password                                               return PASSWORD;
+lmtp                                                   return LMTP;
+enabled                                                        return ENABLED;
+delivery                                               return DELIVERY;
+agent                                                  return AGENT;
 
 statfile                                               return STATFILE;
 alias                                                  return ALIAS;
index 73455976df454888b6e3fb6e35c49c09279d74ee..6667d01a12311165345622decc3ca1e4fad5fd55 100644 (file)
@@ -43,6 +43,7 @@ struct statfile *cur_statfile = NULL;
 %token  LOGGING LOG_TYPE LOG_TYPE_CONSOLE LOG_TYPE_SYSLOG LOG_TYPE_FILE
 %token  LOG_LEVEL LOG_LEVEL_DEBUG LOG_LEVEL_INFO LOG_LEVEL_WARNING LOG_LEVEL_ERROR LOG_FACILITY LOG_FILENAME
 %token  STATFILE ALIAS PATTERN WEIGHT STATFILE_POOL_SIZE SIZE TOKENIZER CLASSIFIER
+%token DELIVERY LMTP ENABLED AGENT
 
 %type  <string>        STRING
 %type  <string>        VARIABLE
@@ -84,6 +85,8 @@ command       :
        | logging
        | statfile
        | statfile_pool_size
+       | lmtp
+       | delivery
        ;
 
 tempdir :
@@ -125,7 +128,7 @@ controlcmd:
 
 controlsock:
        BINDSOCK EQSIGN bind_cred {
-               if (!parse_bind_line (cfg, $3, 1)) {
+               if (!parse_bind_line (cfg, $3, CRED_CONTROL)) {
                        yyerror ("yyparse: parse_bind_line");
                        YYERROR;
                }
@@ -141,7 +144,7 @@ controlpassword:
 
 bindsock:
        BINDSOCK EQSIGN bind_cred {
-               if (!parse_bind_line (cfg, $3, 0)) {
+               if (!parse_bind_line (cfg, $3, CRED_NORMAL)) {
                        yyerror ("yyparse: parse_bind_line");
                        YYERROR;
                }               
@@ -659,6 +662,85 @@ statfile_pool_size:
                cfg->max_statfile_size = $3;
        }
        ;
+
+lmtp:
+       LMTP OBRACE lmtpbody EBRACE
+       ;
+
+lmtpbody:
+       lmtpcmd SEMICOLON
+       | lmtpbody lmtpcmd SEMICOLON
+       ;
+
+lmtpcmd:
+       lmtpenabled
+       | lmtpsock
+       | lmtpmetric
+       ;
+
+lmtpenabled:
+       ENABLED EQSIGN FLAG {
+               cfg->lmtp_enable = $3;
+       }
+       ;
+
+lmtpsock:
+       BINDSOCK EQSIGN bind_cred {
+               if (!parse_bind_line (cfg, $3, CRED_LMTP)) {
+                       yyerror ("yyparse: parse_bind_line");
+                       YYERROR;
+               }
+               free ($3);
+       }
+       ;
+lmtpmetric:
+       METRIC EQSIGN QUOTEDSTRING {
+               cfg->lmtp_metric = memory_pool_strdup (cfg->cfg_pool, $3);
+       }
+       ;
+
+delivery:
+       DELIVERY OBRACE deliverybody EBRACE
+       ;
+
+deliverybody:
+       deliverycmd SEMICOLON
+       | deliverybody deliverycmd SEMICOLON
+       ;
+
+deliverycmd:
+       deliveryenabled
+       | deliverysock
+       | deliveryagent
+       | deliverylmtp
+       ;
+
+deliveryenabled:
+       ENABLED EQSIGN FLAG {
+               cfg->delivery_enable = $3;
+       }
+       ;
+
+deliverysock:
+       BINDSOCK EQSIGN bind_cred {
+               if (!parse_bind_line (cfg, $3, CRED_DELIVERY)) {
+                       yyerror ("yyparse: parse_bind_line");
+                       YYERROR;
+               }
+               free ($3);
+       }
+       ;
+deliverylmtp:
+       LMTP EQSIGN FLAG {
+               cfg->deliver_lmtp = $3;
+       }
+       ;
+deliveryagent:
+       AGENT EQSIGN QUOTEDSTRING {
+               cfg->deliver_agent_path = memory_pool_strdup (cfg->cfg_pool, $3);
+       }
+       ;
+
 %%
 /* 
  * vi:ts=4 
index df8a2265f14cca2176b779a61c2e7c696c3f1d29..2785e18d9cf8a4a008f9f0a2a705fb47e46e8d4f 100644 (file)
@@ -81,85 +81,107 @@ add_memcached_server (struct config_file *cf, char *str)
 }
 
 int
-parse_bind_line (struct config_file *cf, char *str, char is_control)
+parse_bind_line (struct config_file *cf, char *str, enum rspamd_cred_type type)
 {
        char *cur_tok, *err_str;
        struct hostent *hent;
        size_t s;
+       char **host;
+       int16_t *family, *port;
+       struct in_addr *addr;
        
        if (str == NULL) return 0;
        cur_tok = strsep (&str, ":");
+
+       switch (type) {
+               case CRED_NORMAL:
+                       host = &cf->bind_host;
+                       port = &cf->bind_port;
+                       *port = DEFAULT_BIND_PORT;
+                       family = &cf->bind_family;
+                       addr = &cf->bind_addr;
+                       break;
+               case CRED_CONTROL:
+                       host = &cf->control_host;
+                       port = &cf->control_port;
+                       *port = DEFAULT_CONTROL_PORT;
+                       family = &cf->control_family;
+                       addr = &cf->control_addr;
+                       break;
+               case CRED_LMTP:
+                       host = &cf->lmtp_host;
+                       port = &cf->lmtp_port;
+                       *port = DEFAULT_LMTP_PORT;
+                       family = &cf->lmtp_family;
+                       addr = &cf->lmtp_addr;
+                       break;
+               case CRED_DELIVERY:
+                       host = &cf->deliver_host;
+                       port = &cf->deliver_port;
+                       *port = 25;
+                       family = &cf->deliver_family;
+                       addr = &cf->deliver_addr;
+                       break;
+       }
        
        if (cur_tok[0] == '/' || cur_tok[0] == '.') {
-               if (is_control) {
-                       cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
-                       cf->control_family = AF_UNIX;
-               }
-               else {
-                       cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
-                       cf->bind_family = AF_UNIX;
-               }
-               return 1;
-
-       } else {
-               if (str == '\0') {
-                       if (is_control) {
-                               cf->control_port = DEFAULT_CONTROL_PORT;
+#ifdef HAVE_DIRNAME
+               /* Try to check path of bind credit */
+               struct stat st;
+               int fd;
+               char *copy = memory_pool_strdup (cf->cfg_pool, cur_tok);
+               if (stat (copy, &st) == -1) {
+                       if (errno == ENOENT) {
+                               if ((fd = open (cur_tok, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+                                       yyerror ("parse_bind_line: cannot open path %s for making socket, %m", cur_tok);
+                                       return 0;
+                               }
+                               else {
+                                       close (fd);
+                                       unlink (cur_tok);
+                               }
                        }
                        else {
-                               cf->bind_port = DEFAULT_BIND_PORT;
+                               yyerror ("parse_bind_line: cannot stat path %s for making socket, %m", cur_tok);
+                               return 0;
                        }
                }
                else {
-                       if (is_control) {
-                               cf->control_port = (uint16_t)strtoul (str, &err_str, 10);
-                       }
-                       else {
-                               cf->bind_port = (uint16_t)strtoul (str, &err_str, 10);
+                       if (unlink (cur_tok) == -1) {
+                               yyerror ("parse_bind_line: cannot remove path %s for making socket, %m", cur_tok);
+                               return 0;
                        }
+               }
+#endif
+               *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+               *family = AF_UNIX;
+               return 1;
+
+       } else {
+               if (*str != '\0') {
+                       *port = (uint16_t)strtoul (str, &err_str, 10);
                        if (*err_str != '\0') {
+                               yyerror ("parse_bind_line: cannot read numeric value: %s", err_str);
                                return 0;
                        }
                }
                
-               if (is_control) {
-                       if (!inet_aton (cur_tok, &cf->control_addr)) {
-                               /* Try to call gethostbyname */
-                               hent = gethostbyname (cur_tok);
-                               if (hent == NULL) {
-                                       return 0;
-                               }
-                               else {
-                                       cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
-                                       memcpy((char *)&cf->control_addr, hent->h_addr, sizeof(struct in_addr));
-                                       s = strlen (cur_tok) + 1;
-                               }
+               if (!inet_aton (cur_tok, addr)) {
+                       /* Try to call gethostbyname */
+                       hent = gethostbyname (cur_tok);
+                       if (hent == NULL) {
+                               return 0;
                        }
                        else {
-                               cf->control_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+                               *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
+                               memcpy((char *)addr, hent->h_addr, sizeof(struct in_addr));
+                               s = strlen (cur_tok) + 1;
                        }
-
-                       cf->control_family = AF_INET;
                }
                else {
-                       if (!inet_aton (cur_tok, &cf->bind_addr)) {
-                               /* Try to call gethostbyname */
-                               hent = gethostbyname (cur_tok);
-                               if (hent == NULL) {
-                                       return 0;
-                               }
-                               else {
-                                       cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
-                                       memcpy((char *)&cf->bind_addr, hent->h_addr, sizeof(struct in_addr));
-                                       s = strlen (cur_tok) + 1;
-                               }
-                       }
-                       else {
-                               cf->bind_host = memory_pool_strdup (cf->cfg_pool, cur_tok);
-                       }
-
-                       cf->bind_family = AF_INET;
+                       *host = memory_pool_strdup (cf->cfg_pool, cur_tok);
                }
+               *family = AF_INET;
 
                return 1;
        }
@@ -191,6 +213,7 @@ init_defaults (struct config_file *cfg)
        cfg->composite_symbols = g_hash_table_new (g_str_hash, g_str_equal);
        cfg->statfiles = g_hash_table_new (g_str_hash, g_str_equal);
        cfg->cfg_params = g_hash_table_new (g_str_hash, g_str_equal);
+       cfg->lmtp_metric = "default";
 
        def_metric = memory_pool_alloc (cfg->cfg_pool, sizeof (struct metric));
        def_metric->name = "default";
@@ -512,6 +535,11 @@ fill_cfg_params (struct config_file *cfg)
 void
 post_load_config (struct config_file *cfg)
 {
+       if (cfg->lmtp_enable && !cfg->delivery_enable) {
+               yywarn ("post_load_config: lmtp is enabled, but delivery is not enabled, disabling lmtp");
+               cfg->lmtp_enable = FALSE;
+       }
+
        g_hash_table_foreach (cfg->variables, substitute_all_variables, cfg);
        g_hash_table_foreach (cfg->modules_opts, substitute_module_variables, cfg);
        parse_filters_str (cfg, cfg->header_filters_str, SCRIPT_HEADER);
index 8128d7356945235fa4d4859257d4bf34eeddc4a6..2b1b5099e2c8ff8d698c691139757cde3760e4ec 100644 (file)
@@ -534,7 +534,7 @@ start_controller (struct rspamd_worker *worker)
        }
        else {
                un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
-               if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->bind_host, un_addr)) == -1) {
+               if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->control_host, un_addr)) == -1) {
                        msg_err ("start_controller: cannot create unix listen socket. %m");
                        exit(-errno);
                }
index 75be14442a8818b5fada4ac558a13f5ce8e1f220..82d0b095e7c484bfbc58c9a9101b45faa23bbfd5 100644 (file)
@@ -91,6 +91,37 @@ fstrstr (f_str_t *orig, f_str_t *pattern)
 
 }
 
+/*
+ * Search for pattern in orig ignoring case
+ */
+ssize_t
+fstrstri (f_str_t *orig, f_str_t *pattern)
+{
+       register ssize_t cur = 0, pcur = 0;
+
+       if (pattern->len > orig->len) {
+               return -1;
+       }
+
+       while (cur < orig->len) {
+               if (tolower (*(orig->begin + cur)) == tolower (*pattern->begin)) {
+                       while (cur < orig->len && pcur < pattern->len) {
+                               if (tolower (*(orig->begin + cur)) != tolower (*(pattern->begin + pcur))) {
+                                       pcur = 0;
+                                       break;
+                               }
+                               cur ++;
+                               pcur ++;
+                       }
+                       return cur - pattern->len;
+               }
+               cur ++;
+       }
+
+       return -1;
+
+}
+
 /*
  * Split string by tokens
  * word contains parsed word
index 000ba74c6959c30bb49ff8d0632431e524f0d5cf..f5d7fffa3e368b9fc3d7e252f46e0c9034aaf66c 100644 (file)
@@ -42,6 +42,11 @@ ssize_t fstrrchr (f_str_t *src, char c);
  */
 ssize_t fstrstr (f_str_t *orig, f_str_t *pattern);
 
+/*
+ * Search for pattern in orig ignoring case
+ */
+ssize_t fstrstri (f_str_t *orig, f_str_t *pattern);
+
 /*
  * Split string by tokens
  * word contains parsed word
@@ -88,7 +93,6 @@ f_str_t* fstrgrow (memory_pool_t *pool, f_str_t *orig, size_t newlen);
  */
 uint32_t fstrhash (f_str_t *str);
 
-
 /*
  * Make copy of string to 0-terminated string
  */
diff --git a/src/lmtp.c b/src/lmtp.c
new file mode 100644 (file)
index 0000000..ba03cd9
--- /dev/null
@@ -0,0 +1,314 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "buffer.h"
+#include "main.h"
+#include "lmtp.h"
+#include "lmtp_proto.h"
+#include "cfg_file.h"
+#include "url.h"
+#include "modules.h"
+#include "message.h"
+
+static char greetingbuf[1024];
+static struct timeval io_tv;
+
+static void write_socket (void *arg);
+
+static 
+void sig_handler (int signo)
+{
+       switch (signo) {
+               case SIGINT:
+               case SIGTERM:
+                       _exit (1);
+                       break;
+       }
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval tv;
+       tv.tv_sec = SOFT_SHUTDOWN_TIME;
+       tv.tv_usec = 0;
+       event_del (&worker->sig_ev);
+       event_del (&worker->bind_ev);
+       do_reopen_log = 1;
+       msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+       event_loopexit (&tv);
+       return;
+}
+
+/*
+ * Destructor for recipients list
+ */
+static void
+rcpt_destruct (void *pointer)
+{
+       struct worker_task *task = (struct worker_task *)pointer;
+
+       if (task->rcpt) {
+               g_list_free (task->rcpt);
+       }
+}
+
+/*
+ * Free all structures of lmtp proto
+ */
+static void
+free_task (struct rspamd_lmtp_proto *lmtp)
+{
+       GList *part;
+       struct mime_part *p;
+
+       if (lmtp) {
+               msg_debug ("free_task: free pointer %p", lmtp->task);
+               if (lmtp->task->memc_ctx) {
+                       memc_close_ctx (lmtp->task->memc_ctx);
+               }
+               while ((part = g_list_first (lmtp->task->parts))) {
+                       lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
+                       p = (struct mime_part *)part->data;
+                       g_byte_array_free (p->content, FALSE);
+                       g_list_free_1 (part);
+               }
+               memory_pool_delete (lmtp->task->task_pool);
+               /* Plan dispatcher shutdown */
+               lmtp->task->dispatcher->wanna_die = 1;
+               close (lmtp->task->sock);
+               g_free (lmtp->task);
+               g_free (lmtp);
+       }
+}
+
+/*
+ * Callback that is called when there is data to read in buffer
+ */
+static void
+read_socket (f_str_t *in, void *arg)
+{
+       struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+       struct worker_task *task = lmtp->task;
+       ssize_t r;
+
+       switch (task->state) {
+               case READ_COMMAND:
+               case READ_HEADER:
+                       if (read_lmtp_input_line (lmtp, in) != 0) {
+                               msg_info ("read_lmtp_socket: closing lmtp connection due to protocol error");
+                               lmtp->task->state = CLOSING_CONNECTION;
+                       }
+                       /* Task was read, recall read handler once more with new state to process message and write reply */
+                       if (task->state == READ_MESSAGE) {
+                               read_socket (in, arg);
+                       }
+                       break;
+               case READ_MESSAGE:
+                       r = process_message (lmtp->task);
+                       r = process_filters (lmtp->task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = LMTP_FAILURE;
+                               task->state = WRITE_ERROR;
+                               write_socket (lmtp);
+                       }
+                       else if (r == 0) {
+                               task->state = WAIT_FILTER;
+                               rspamd_dispatcher_pause (lmtp->task->dispatcher);
+                       }
+                       else {
+                               process_statfiles (lmtp->task);
+                               task->state = WRITE_REPLY;
+                               write_socket (lmtp);
+                       }
+                       break;
+       }
+}
+
+/*
+ * Callback for socket writing
+ */
+static void
+write_socket (void *arg)
+{
+       struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+       
+       switch (lmtp->task->state) {
+               case WRITE_REPLY:
+                       write_lmtp_reply (lmtp);
+                       lmtp->task->state = CLOSING_CONNECTION;
+                       break;
+               case WRITE_ERROR:
+                       write_lmtp_reply (lmtp);
+                       lmtp->task->state = CLOSING_CONNECTION;
+                       break;
+               case CLOSING_CONNECTION:
+                       msg_debug ("lmtp_write_socket: normally closing connection");
+                       free_task (lmtp);
+                       break;
+       }
+}
+
+/*
+ * Called if something goes wrong
+ */
+static void
+err_socket (GError *err, void *arg)
+{
+       struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg;
+       msg_info ("lmtp_err_socket: abnormally closing connection, error: %s", err->message);
+       /* Free buffers */
+       free_task (lmtp);
+}
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       struct sockaddr_storage ss;
+       struct worker_task *new_task;
+       struct rspamd_lmtp_proto *lmtp;
+       socklen_t addrlen = sizeof(ss);
+       int nfd, on = 1;
+       struct linger linger;
+
+       if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+               return;
+       }
+       if (event_make_socket_nonblocking(fd) < 0) {
+               return;
+       }
+
+       /* Socket options */
+       setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
+       setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
+       linger.l_onoff = 1;
+       linger.l_linger = 2;
+       setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger));
+
+       lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto));
+       new_task = g_malloc (sizeof (struct worker_task));
+       bzero (new_task, sizeof (struct worker_task));
+       new_task->worker = worker;
+       new_task->state = READ_COMMAND;
+       new_task->sock = nfd;
+       new_task->cfg = worker->srv->cfg;
+       TAILQ_INIT (&new_task->urls);
+       new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+       /* Add destructor for recipients list (it would be better to use anonymous function here */
+       memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
+       new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
+       memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
+       worker->srv->stat->connections_count ++;
+       lmtp->task = new_task;
+       lmtp->state = LMTP_READ_LHLO;
+
+       /* Set up dispatcher */
+       new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
+                                                                                                               write_socket, err_socket, &io_tv,
+                                                                                                               (void *)lmtp);
+       rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, strlen (greetingbuf), FALSE);
+}
+
+/*
+ * Start lmtp worker process
+ */
+void
+start_lmtp_worker (struct rspamd_worker *worker)
+{
+       struct sigaction signals;
+       int listen_sock, i;
+       struct sockaddr_un *un_addr;
+       char *hostbuf;
+       long int hostmax;
+
+       worker->srv->pid = getpid ();
+       worker->srv->type = TYPE_LMTP;
+       event_init ();
+       g_mime_init (0);
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+       signal_add (&worker->sig_ev, NULL);
+       
+       /* Create listen socket */
+       if (worker->srv->cfg->lmtp_family == AF_INET) {
+               if ((listen_sock = make_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port)) == -1) {
+                       msg_err ("start_lmtp: cannot create tcp listen socket. %m");
+                       exit(-errno);
+               }
+       }
+       else {
+               un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+               if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr)) == -1) {
+                       msg_err ("start_lmtp: cannot create unix listen socket. %m");
+                       exit(-errno);
+               }
+       }
+       
+       if (listen (listen_sock, -1) == -1) {
+               msg_err ("start_lmtp: cannot listen on socket. %m");
+               exit(-errno);
+       }
+       /* Accept event */
+       event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_add(&worker->bind_ev, NULL);
+
+       /* Perform modules configuring */
+       for (i = 0; i < MODULES_NUM; i ++) {
+               modules[i].module_config_func (worker->srv->cfg);
+       }
+
+       /* Fill hostname buf */
+       hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
+       hostbuf = alloca (hostmax);
+       gethostname (hostbuf, hostmax);
+       hostbuf[hostmax - 1] = '\0';
+       snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf);
+
+       /* Send SIGUSR2 to parent */
+       kill (getppid (), SIGUSR2);
+
+       io_tv.tv_sec = WORKER_IO_TIMEOUT;
+       io_tv.tv_usec = 0;
+
+       event_loop (0);
+}
+
+/* 
+ * vi:ts=4 
+ */
diff --git a/src/lmtp.h b/src/lmtp.h
new file mode 100644 (file)
index 0000000..d7c13c4
--- /dev/null
@@ -0,0 +1,20 @@
+#ifndef RSPAMD_LMTP_H
+#define RSPAMD_LMTP_H
+
+#include "config.h"
+#include "main.h"
+
+#define LMTP_GREETING       220
+#define LMTP_QUIT       221
+#define LMTP_OK         250
+#define LMTP_DATA       354
+#define LMTP_ERROR_PROCESS  500
+#define LMTP_FAILURE        530
+#define LMTP_AUTH_ERROR     503
+#define LMTP_BAD_CMD        503
+#define LMTP_NO_RCPT        554
+#define LMTP_TEMP_FAIL      421
+
+void start_lmtp_worker (struct rspamd_worker *worker);
+
+#endif
diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c
new file mode 100644 (file)
index 0000000..df53f69
--- /dev/null
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "cfg_file.h"
+#include "lmtp.h"
+#include "lmtp_proto.h"
+
+/* Max line size as it is defined in rfc2822 */
+#define OUTBUFSIZ 1000
+
+/* LMTP commands */
+static f_str_t lhlo_command = {
+       .begin = "LHLO",
+       .len = sizeof ("LHLO") - 1
+};
+static f_str_t mail_command = {
+       .begin = "MAIL FROM:",
+       .len = sizeof ("MAIL FROM:") - 1
+};
+static f_str_t rcpt_command = {
+       .begin = "RCPT TO:",
+       .len = sizeof ("RCPT TO:") - 1
+};
+static f_str_t data_command = {
+       .begin = "DATA",
+       .len = sizeof ("DATA") - 1
+};
+static f_str_t data_dot = {
+       .begin = ".\r\n",
+       .len = sizeof (".\r\n") - 1
+};
+
+static void
+out_lmtp_reply (struct rspamd_lmtp_proto *lmtp, int code, char *rcode, char *msg)
+{
+       char outbuf[OUTBUFSIZ];
+       int r;
+       
+       if (*rcode == '\0') {
+               r = snprintf (outbuf, OUTBUFSIZ, "%d %s\r\n", code, msg);
+       }
+       else {
+               r = snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, msg);
+       }
+       rspamd_dispatcher_write (lmtp->task->dispatcher, outbuf, r, FALSE);
+}
+
+int 
+read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line)
+{
+       char *c, *rcpt;
+       unsigned int i = 0, l = 0, size;
+
+       switch (lmtp->state) {
+               case LMTP_READ_LHLO:
+                       /* Search LHLO line */
+                       if ((i = fstrstri (line, &lhlo_command)) == -1) {
+                               msg_info ("read_lmtp_input_line: LHLO expected but not found");
+                               out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need LHLO here");
+                               return -1;
+                       }
+                       else {
+                               i += lhlo_command.len;
+                               c = line->begin + i;
+                               /* Skip spaces */
+                               while (isspace (*c) && i < line->len) {
+                                       i ++;
+                                       c ++;
+                               }
+                               lmtp->task->helo = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
+                               /* Strlcpy makes string null terminated by design */
+                               g_strlcpy (lmtp->task->helo, c, line->len - i + 1);
+                               lmtp->state = LMTP_READ_FROM;
+                               out_lmtp_reply (lmtp, LMTP_OK, "", "Ok");
+                               return 0;
+                       }
+                       break;
+               case LMTP_READ_FROM:
+                       /* Search MAIL FROM: line */
+                       if ((i = fstrstri (line, &mail_command)) == -1) {
+                               msg_info ("read_lmtp_input_line: MAIL expected but not found");
+                               out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need MAIL here");
+                               return -1;
+                       }
+                       else {
+                               i += mail_command.len;
+                               c = line->begin + i;
+                               /* Get data from brackets (<>)*/
+                               while (*c++ != '<' && i < line->len) {
+                                       i ++;
+                               }
+                               while (*c != '>' && i < line->len) {
+                                       l ++;
+                                       c ++;
+                                       i ++;
+                               }
+
+                               lmtp->task->from = memory_pool_alloc (lmtp->task->task_pool, l + 1);
+                               /* Strlcpy makes string null terminated by design */
+                               g_strlcpy (lmtp->task->from, c - l, l + 1);
+                               lmtp->state = LMTP_READ_RCPT;
+                               out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Sender ok");
+                               return 0;
+                       }
+                       break;
+               case LMTP_READ_RCPT:
+                       /* Search RCPT_TO: line */
+                       if ((i = fstrstri (line, &rcpt_command)) == -1) {
+                               msg_info ("read_lmtp_input_line: RCPT expected but not found");
+                               out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need RCPT here");
+                               return -1;
+                       }
+                       else {
+                               i += rcpt_command.len;
+                               c = line->begin + i;
+                               /* Get data from brackets (<>)*/
+                               while (*c++ != '<' && i < line->len) {
+                                       i ++;
+                               }
+                               while (*c != '>' && i < line->len) {
+                                       l ++;
+                                       c ++;
+                                       i ++;
+                               }
+                               rcpt = memory_pool_alloc (lmtp->task->task_pool, l + 1);
+                               /* Strlcpy makes string null terminated by design */
+                               g_strlcpy (rcpt, c - l, l + 1);
+                               lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
+                               lmtp->state = LMTP_READ_DATA;
+                               out_lmtp_reply (lmtp, LMTP_OK, "2.1.0", "Recipient ok");
+                               return 0;
+                       }
+                       break;
+               case LMTP_READ_DATA:
+                       /* Search DATA line */
+                       if ((i = fstrstri (line, &data_command)) == -1) {
+                               msg_info ("read_lmtp_input_line: DATA expected but not found");
+                               out_lmtp_reply (lmtp, LMTP_BAD_CMD, "5.0.0", "Need DATA here");
+                               return -1;
+                       }
+                       else {
+                               i += rcpt_command.len;
+                               c = line->begin + i;
+                               /* Skip spaces */
+                               while (isspace (*c++)) {
+                                       i ++;
+                               }
+                               rcpt = memory_pool_alloc (lmtp->task->task_pool, line->len - i + 1);
+                               /* Strlcpy makes string null terminated by design */
+                               g_strlcpy (rcpt, c, line->len - i + 1);
+                               lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt);
+                               lmtp->state = LMTP_READ_MESSAGE;
+                               out_lmtp_reply (lmtp, LMTP_DATA, "", "Enter message, ending with \".\" on a line by itself");
+                               lmtp->task->msg = fstralloc (lmtp->task->task_pool, BUFSIZ);
+                               return 0;
+                       }
+                       break;
+               case LMTP_READ_MESSAGE:
+                       if (strncmp (line->begin, data_dot.begin, line->len) == 0) {
+                               lmtp->state = LMTP_READ_DOT;
+                               lmtp->task->state = READ_MESSAGE;
+                               return 0;
+                       }
+                       else {
+                               l = lmtp->task->msg->len;
+                               size = lmtp->task->msg->size;
+                               if (l + line->len > size) {
+                                       /* Grow buffer */
+                                       if (line->len > size) {
+                                               size += line->len << 1;
+                                       }
+                                       else {
+                                               /* size *= 2 */
+                                               size <<= 1;
+                                       }
+                                       lmtp->task->msg = fstrgrow (lmtp->task->task_pool, lmtp->task->msg, size);
+                               }
+                               fstrcat (lmtp->task->msg, line);
+                               return 0;
+                       }
+                       break;
+               case LMTP_READ_DOT:
+                       /* We have some input after reading dot, close connection as we have no currently support of multiply 
+                        * messages per session
+                        */
+                       out_lmtp_reply (lmtp, LMTP_QUIT, "", "Bye");
+                       return 0;
+                       break;
+       }       
+}
+
+static char*
+format_lda_args (struct worker_task *task)
+{
+       char *arg, *res, *c, *r;
+       size_t len;
+       GList *rcpt;
+       gboolean got_args = FALSE;
+
+       c = task->cfg->deliver_agent_path;
+       /* Find first arg */
+       if ((c = strchr (c, ' ')) == NULL) {
+               return task->cfg->deliver_agent_path;
+       }
+       
+       /* Calculate length of result string */
+       len = strlen (task->cfg->deliver_agent_path);
+       while (*c) {
+               if (*c == '%') {
+                       c++;
+                       switch (*c) {
+                               case 'f':
+                                       /* Insert from */
+                                       len += strlen (task->from) - 2;
+                                       break;
+                               case 'r':
+                                       /* Insert list of recipients */
+                                       rcpt = g_list_first (task->rcpt);
+                                       len -= 2;
+                                       while (rcpt) {
+                                               len += strlen ((char *)rcpt->data) + 1;
+                                               rcpt = g_list_next (rcpt);
+                                       }
+                                       break;
+                       }
+               }
+               c ++;
+               len ++;
+       }
+       res = memory_pool_alloc (task->task_pool, len + 1);
+       r = res;
+       c = task->cfg->deliver_agent_path;
+       
+       while (*c) {
+               if (*c == ' ') {
+                       got_args = TRUE;
+               }
+               if (got_args && *c == '%') {
+                       switch (*(c + 1)) {
+                               case 'f':
+                                       /* Insert from */
+                                       c += 2;
+                                       len = strlen (task->from);
+                                       memcpy (r, task->from, len);
+                                       r += len;
+                                       break;
+                               case 'r':
+                                       /* Insert list of recipients */
+                                       c += 2;
+                                       rcpt = g_list_first (task->rcpt);
+                                       while (rcpt) {
+                                               len = strlen ((char *)rcpt->data) + 1;
+                                               memcpy (r, rcpt->data, len);
+                                               r += len;
+                                               *r++ = ' ';
+                                               rcpt = g_list_next (rcpt);
+                                       }
+                                       break;
+                               default:
+                                       *r = *c;
+                                       r ++;
+                                       c ++;
+                                       break;
+                       }
+               }
+               else {
+                       *r = *c;
+                       r ++;
+                       c ++;
+               }
+       }
+
+       return res;
+}
+
+static int
+lmtp_deliver_lda (struct worker_task *task)
+{
+       char *args;
+       FILE *lda;
+       GMimeStream *stream;
+       int rc, ecode;
+
+       if ((args = format_lda_args (task)) == NULL) {
+               return -1;
+       }
+
+       lda = popen (args, "w");
+       if (lda == NULL) {
+               msg_info ("lmtp_deliver_lda: cannot deliver to lda, %m");
+               return -1;
+       }
+
+       stream = g_mime_stream_file_new (lda);
+
+       if (g_mime_object_write_to_stream ((GMimeObject *)task->message, stream) == -1) {
+               msg_info ("lmtp_deliver_lda: cannot write stream to lda");
+               return -1;
+       }
+
+       rc = pclose (lda);
+       if (rc == -1) {
+               msg_info ("lmtp_deliver_lda: lda returned error code");
+               return -1;
+       }
+       else if (WIFEXITED (rc)) {
+               ecode = WEXITSTATUS (rc);
+               if (ecode == 0) {
+                       return 0;
+               }
+               else {
+                       msg_info ("lmtp_deliver_lda: lda returned error code %d", ecode);
+                       return -1;
+               }
+       }
+}
+
+int
+lmtp_deliver_message (struct worker_task *task)
+{
+       if (task->cfg->deliver_agent_path != NULL) {
+               /* Do deliver to LDA */
+               return lmtp_deliver_lda (task);
+       }
+       else {
+               /* XXX: do lmtp/smtp client */
+               return -1;
+       }
+}
+
+int
+write_lmtp_reply (struct rspamd_lmtp_proto *lmtp)
+{
+       int r;
+       char outbuf[OUTBUFSIZ];
+
+       msg_debug ("write_lmtp_reply: writing reply to client");
+       if (lmtp->task->error_code != 0) {
+               out_lmtp_reply (lmtp, lmtp->task->error_code, "", lmtp->task->last_error);
+       }
+       else {
+               /* Do delivery */
+               if (lmtp_deliver_message (lmtp->task) == -1) {
+                       out_lmtp_reply (lmtp, LMTP_FAILURE, "", "Delivery failure");
+                       return -1;
+               }
+               else {
+                       out_lmtp_reply (lmtp, LMTP_OK, "", "Delivery completed");
+               }
+       }
+
+       return 0;
+}
+
+/* 
+ * vi:ts=4 
+ */
diff --git a/src/lmtp_proto.h b/src/lmtp_proto.h
new file mode 100644 (file)
index 0000000..24cba2c
--- /dev/null
@@ -0,0 +1,44 @@
+#ifndef RSPAMD_LMTP_PROTO_H
+#define RSPAMD_LMTP_PROTO_H
+
+#include "config.h"
+
+struct worker_task;
+
+enum lmtp_state {
+       LMTP_READ_LHLO,
+       LMTP_READ_FROM,
+       LMTP_READ_RCPT,
+       LMTP_READ_DATA,
+       LMTP_READ_MESSAGE,
+       LMTP_READ_DOT,
+};
+
+struct rspamd_lmtp_proto {
+       struct worker_task *task;
+       enum lmtp_state state;
+};
+
+/**
+ * Read one line of user's input for specified task
+ * @param lmtp lmtp object
+ * @param line line of user's input
+ * @return 0 if line was successfully parsed and -1 if we have protocol error
+ */
+int read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, f_str_t *line);
+
+/**
+ * Deliver message via lmtp/smtp or pipe to LDA
+ * @param task task object
+ * @return 0 if we wrote message and -1 if there was some error
+ */
+int lmtp_deliver_message (struct worker_task *task);
+
+/**
+ * Write reply for specified lmtp object
+ * @param lmtp lmtp object
+ * @return 0 if we wrote reply and -1 if there was some error
+ */
+int write_lmtp_reply (struct rspamd_lmtp_proto *lmtp);
+
+#endif
index 782f0acff99007f2d4344d989bf159e66b773cbd..6a170726a15167e9982aa95c9a8385cdad07ba92 100644 (file)
@@ -27,6 +27,7 @@
 #include "cfg_file.h"
 #include "util.h"
 #include "perl.h"
+#include "lmtp.h"
 
 /* 2 seconds to fork new process in place of dead one */
 #define SOFT_FORK_TIME 2
@@ -178,6 +179,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
                                                msg_info ("fork_worker: starting controller process %d", getpid ());
                                                start_controller (cur);
                                                break;
+                                       case TYPE_LMTP:
+                                               setproctitle ("lmtp process");
+                                               pidfile_close (rspamd->pfh);
+                                               msg_info ("fork_worker: starting lmtp process %d", getpid ());
+                                               start_lmtp_worker (cur);
                                        case TYPE_WORKER:
                                        default:
                                                setproctitle ("worker process");
@@ -368,6 +374,11 @@ main (int argc, char **argv, char **env)
        if (cfg->controller_enabled) {
                fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
        }
+       
+       /* Start lmtp if enabled */
+       if (cfg->lmtp_enable) {
+               fork_worker (rspamd, listen_sock, 0, TYPE_LMTP);
+       }
 
        /* Signal processing cycle */
        for (;;) {
@@ -394,17 +405,17 @@ main (int argc, char **argv, char **env)
                                        if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
                                                /* Normal worker termination, do not fork one more */
                                                msg_info ("main: %s process %d terminated normally", 
-                                                                       (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
+                                                                       (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
                                        }
                                        else {
                                                if (WIFSIGNALED (res)) {
                                                        msg_warn ("main: %s process %d terminated abnormally by signal: %d", 
-                                                                               (cur->type == TYPE_WORKER) ? "worker" : "controller",
+                                                                               (cur->type != TYPE_WORKER) ? "controller" : "worker",
                                                                                cur->pid, WTERMSIG(res));
                                                }
                                                else {
                                                        msg_warn ("main: %s process %d terminated abnormally", 
-                                                                               (cur->type == TYPE_WORKER) ? "worker" : "controller", cur->pid);
+                                                                               (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
                                                }
                                                /* Fork another worker in replace of dead one */
                                                delay_fork (cur->type);
index ec74ad03bedc22cbef9384fbc66097f470a9a3ef..270817a37a1060e5b3c5c037ce5e48361a935d3e 100644 (file)
@@ -22,6 +22,8 @@
 #define SOFT_SHUTDOWN_TIME 60
 /* Default metric name */
 #define DEFAULT_METRIC "default"
+/* 60 seconds for worker's IO */
+#define WORKER_IO_TIMEOUT 60
 
 /* Logging in postfix style */
 #define msg_err g_critical
@@ -36,6 +38,7 @@ enum process_type {
        TYPE_MAIN,
        TYPE_WORKER,
        TYPE_CONTROLLER,
+       TYPE_LMTP,
 };
 
 /** 
index f3bbac8ce36dad3ed4d89a8a499220bfe6c10ddd..334e819cb6aa0ecd006c7ef8ec0347688b0fd0c0 100644 (file)
@@ -40,8 +40,6 @@
 #include <perl.h>                 /* from the Perl distribution     */
 
 #define TASK_POOL_SIZE 4095
-/* 60 seconds for worker's IO */
-#define WORKER_IO_TIMEOUT 60
 
 const f_str_t CRLF = {
        /* begin */"\r\n",