]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* Implement initial version of greylisting triplets storage
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 17 Jun 2010 16:25:48 +0000 (20:25 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 17 Jun 2010 16:25:48 +0000 (20:25 +0400)
* Fix issues with smtp worker

CMakeLists.txt
src/cfg_xml.c
src/fuzzy_storage.c
src/greylist.h [new file with mode: 0644]
src/greylist_storage.c [new file with mode: 0644]
src/logger.c
src/main.c
src/main.h
src/protocol.c
src/smtp.c

index 3ba509251fb0440f6311f77864beee5c2bacaed6..5abf157cbb5ed6b4070a85e48c5219a5df300b70 100644 (file)
@@ -405,6 +405,7 @@ SET(RSPAMDSRC       src/modules.c
                                src/fstring.c
                                src/fuzzy.c
                                src/fuzzy_storage.c
+                               src/greylist_storage.c
                                src/hash.c
                                src/html.c
                                src/lmtp.c
index 21c01e35909758a693eaf6ff94e7276228b64c50..bfe71c957bd3fb0e978c8f30d49148b5d7ac8373 100644 (file)
@@ -635,6 +635,10 @@ worker_handle_type (struct config_file *cfg, struct rspamd_xml_userdata *ctx, GH
                wrk->type = TYPE_FUZZY;
                wrk->has_socket = FALSE;
        }
+       else if (g_ascii_strcasecmp (data, "greylist") == 0) {
+               wrk->type = TYPE_GREYLIST;
+               wrk->has_socket = FALSE;
+       }
        else {
                msg_err ("unknown worker type: %s", data);
                return FALSE;
index 62f2711d7e30ddb45c34072ceeb59b73f776fa7c..843fe8f094b104bff3b65b793e959aa8a59cd574 100644 (file)
@@ -237,7 +237,7 @@ sigterm_handler (int fd, short what, void *arg)
        struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
        static struct timeval           tv = {
                .tv_sec = 0,
-               .tv_usec = 0,
+               .tv_usec = 0
        };
 
        mods = MOD_LIMIT + 1;
diff --git a/src/greylist.h b/src/greylist.h
new file mode 100644 (file)
index 0000000..b17004c
--- /dev/null
@@ -0,0 +1,47 @@
+#ifndef RSPAMD_GREYLIST_H
+#define RSPAMD_GREYLIST_H
+
+#include "config.h"
+
+#define CHECKSUM_SIZE 16
+/* 5 minutes */
+#define DEFAULT_GREYLIST_TIME 300
+/* 2 days */
+#define DEFAULT_EXPIRE_TIME 60 * 60 * 24 * 2
+
+/**
+ * Item in storage
+ */
+struct rspamd_grey_item {
+       time_t age;                                     /**< age of checksum                    */
+       guint8 data[CHECKSUM_SIZE];     /**< checksum of triplet                */
+};
+
+/**
+ * Protocol command that is used to work with greylist storage
+ */
+struct rspamd_grey_command {
+       enum {
+               GREY_CMD_ADD = 0,
+               GREY_CMD_CHECK,
+               GREY_CMD_DEL
+       } cmd;
+       gint version;
+       guint8 data[CHECKSUM_SIZE];
+};
+
+/**
+ * Reply packet
+ */
+struct rspamd_grey_reply {
+       enum {
+               GREY_OK = 0,
+               GREY_GREYLISTED,
+               GREY_EXPIRED,
+               GREY_NOT_FOUND,
+               GREY_ERR
+       } reply;
+};
+
+
+#endif
diff --git a/src/greylist_storage.c b/src/greylist_storage.c
new file mode 100644 (file)
index 0000000..47b769b
--- /dev/null
@@ -0,0 +1,358 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Store greylisting data in memory
+ */
+
+#include "config.h"
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "url.h"
+#include "modules.h"
+#include "message.h"
+#include "greylist.h"
+
+#ifdef WITH_JUDY
+#include <Judy.h>
+#endif
+
+/* Number of insuccessfull bind retries */
+#define MAX_RETRIES 40
+
+struct greylist_ctx {
+#ifdef WITH_JUDY
+       Pvoid_t                  jtree;
+#else
+       GTree                   *tree;
+#endif
+       time_t                   greylist_time;
+       time_t                   expire_time;
+};
+
+#ifndef HAVE_SA_SIGINFO
+static void
+sig_handler (int signo)
+#else
+static void
+sig_handler (int signo, siginfo_t *info, void *unused)
+#endif
+{
+       switch (signo) {
+       case SIGINT:
+               /* Ignore SIGINT as we should got SIGTERM after it anyway */
+               return;
+       case SIGTERM:
+#ifdef WITH_PROFILER
+               exit (0);
+#else
+               _exit (1);
+#endif
+               break;
+       }
+}
+
+static void
+sigterm_handler (int fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
+       static struct timeval           tv = {
+               .tv_sec = 0,
+               .tv_usec = 0
+       };
+
+       close (worker->cf->listen_sock);
+       (void)event_loopexit (&tv);
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval                  tv;
+
+       tv.tv_sec = SOFT_SHUTDOWN_TIME;
+       tv.tv_usec = 0;
+       event_del (&worker->sig_ev);
+       event_del (&worker->bind_ev);
+       close (worker->cf->listen_sock);
+       do_reopen_log = 1;
+       msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+       event_loopexit (&tv);
+       return;
+}
+
+struct greylist_session {
+       struct rspamd_worker *worker;
+       int fd;
+       socklen_t salen;
+       struct sockaddr_storage sa;
+       guint8 *pos;
+       struct rspamd_grey_command cmd;
+};
+
+static gint
+grey_cmp (gconstpointer a, gconstpointer b, gpointer unused)
+{
+       return memcmp (a, b, CHECKSUM_SIZE);
+}
+
+static gint
+greylist_process_add_command (struct rspamd_grey_command *cmd, struct greylist_ctx *ctx)
+{
+       struct rspamd_grey_reply          reply;
+       struct rspamd_grey_item          *item, **pitem = NULL;
+       
+       item = g_malloc (sizeof (struct rspamd_grey_item));
+       item->age = time (NULL);
+       memcpy (item->data, cmd->data, CHECKSUM_SIZE);
+#ifdef WITH_JUDY
+
+       JHSI (pitem, ctx->jtree, item->data, CHECKSUM_SIZE);
+       if (pitem == PJERR) {
+               reply.reply = GREY_ERR;
+       }
+       else if (*pitem != 0) {
+               g_free (*pitem);
+               *pitem = item;
+       }
+       else {
+               *pitem = item;
+       }
+#else
+       g_tree_insert (ctx->tree, item->data, item);
+       reply.reply = GREY_OK;
+#endif
+
+       return reply.reply;
+}
+
+static gint
+greylist_process_delete_command (struct rspamd_grey_command *cmd, struct greylist_ctx *ctx)
+{
+       struct rspamd_grey_reply          reply;
+#ifdef WITH_JUDY
+       int                               rc;
+       struct rspamd_grey_item         **pitem = NULL;
+
+       JHSG (pitem, ctx->jtree, cmd->data, CHECKSUM_SIZE);
+       if (pitem != NULL) {
+               g_free (*pitem);
+               JHSD (rc, ctx->jtree, cmd->data, CHECKSUM_SIZE);
+               if (rc == 1) {
+                       reply.reply = GREY_OK;
+               }
+               else {
+                       reply.reply = GREY_NOT_FOUND;
+               }
+       }
+       else {
+               reply.reply = GREY_NOT_FOUND;
+       }
+#else
+       if(g_tree_remove (ctx->tree, cmd->data)) {
+               reply.reply = GREY_OK;
+       }
+       else {
+               reply.reply = GREY_NOT_FOUND;
+       }
+#endif
+       return reply.reply;
+}
+
+static gint
+greylist_process_check_command (struct rspamd_grey_command *cmd, struct greylist_ctx *ctx)
+{
+       struct rspamd_grey_reply          reply;
+       struct rspamd_grey_item          *item = NULL, **pitem = NULL;
+       time_t                            now;
+       
+       now = time (NULL);
+#ifdef WITH_JUDY
+       JHSG (pitem, ctx->jtree, cmd->data, CHECKSUM_SIZE);
+       if (pitem != NULL) {
+               item = *pitem;
+       }
+#else
+       item = g_tree_lookup (ctx->tree, cmd->data);
+#endif
+       if (item) {
+               if (now - item->age > ctx->expire_time) {
+                       /* Remove expired item */
+                       reply.reply = GREY_EXPIRED;
+                       greylist_process_delete_command (cmd, ctx);
+               }
+               else if (now - item->age > ctx->greylist_time) {
+                       reply.reply = GREY_OK;
+               }
+               else {
+                       reply.reply = GREY_GREYLISTED;
+               }
+       }
+       else {
+               reply.reply = GREY_NOT_FOUND;
+       }
+
+       return reply.reply;
+}
+
+#define CMD_PROCESS(x)                                                                                                                                                                                         \
+do {                                                                                                                                                                                                                           \
+       reply.reply = greylist_process_##x##_command (&session->cmd, (struct greylist_ctx *)session->worker->ctx);              \
+       if (sendto (session->fd, &reply, sizeof (reply), 0, (struct sockaddr *)&session->sa, session->salen) == -1) {   \
+               msg_err ("error while writing reply: %s", strerror (errno));                                                                                            \
+       }                                                                                                                                                                                                                               \
+} while(0)
+
+static void
+process_greylist_command (struct greylist_session *session)
+{
+       struct rspamd_grey_reply          reply;
+
+       switch (session->cmd.cmd) {
+       case GREY_CMD_CHECK:
+               CMD_PROCESS (check);
+               break;
+       case GREY_CMD_ADD:
+               CMD_PROCESS (add);
+               break;
+       case GREY_CMD_DEL:
+               CMD_PROCESS (delete);
+               break;
+       }
+}
+
+#undef CMD_PROCESS
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_greylist_socket (int fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
+       struct greylist_session         session;
+       ssize_t                         r;
+
+       session.worker = worker;
+       session.fd = fd;
+       session.pos = (guint8 *) & session.cmd;
+       session.salen = sizeof (session.sa);
+
+       /* Got some data */
+       if (what == EV_READ) {
+               if ((r = recvfrom (fd, session.pos, sizeof (struct rspamd_grey_command), MSG_WAITALL, (struct sockaddr *)&session.sa, &session.salen)) == -1) {
+                       msg_err ("got error while reading from socket: %d, %s", errno, strerror (errno));
+                       return;
+               }
+               else if (r == sizeof (struct rspamd_grey_command)) {
+                       /* Assume that the whole command was read */
+                       process_greylist_command (&session);
+               }
+               else {
+                       msg_err ("got incomplete data while reading from socket: %d, %s", errno, strerror (errno));
+                       return;
+               }
+       }
+}
+
+static gboolean
+config_greylist_worker (struct rspamd_worker *worker)
+{
+       struct greylist_ctx            *ctx;
+       char                           *value;
+
+       ctx = g_malloc0 (sizeof (struct greylist_ctx));
+#ifdef WITH_JUDY
+       ctx->jtree = NULL;
+#else
+       ctx->tree = g_tree_new_full (grey_cmp, NULL, NULL, g_free);
+#endif
+       
+       ctx->greylist_time = DEFAULT_GREYLIST_TIME;
+       ctx->expire_time = DEFAULT_EXPIRE_TIME;
+
+       if ((value = g_hash_table_lookup (worker->cf->params, "greylist_time")) != NULL) {
+               ctx->greylist_time = parse_seconds (value) / 1000;
+       }
+       if ((value = g_hash_table_lookup (worker->cf->params, "expire_time")) != NULL) {
+               ctx->expire_time = parse_seconds (value) / 1000;
+       }
+       worker->ctx = ctx;
+
+       return TRUE;
+}
+
+/*
+ * Start worker process
+ */
+void
+start_greylist_storage (struct rspamd_worker *worker)
+{
+       struct sigaction                signals;
+       struct event                    sev;
+       int                             retries = 0;
+
+       worker->srv->pid = getpid ();
+       worker->srv->type = TYPE_GREYLIST;
+
+       event_init ();
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
+       signal_add (&worker->sig_ev, NULL);
+       signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
+       signal_add (&sev, NULL);
+
+       /* Accept event */
+       while ((worker->cf->listen_sock = make_udp_socket (&worker->cf->bind_addr, worker->cf->bind_port, TRUE, TRUE)) == -1) {
+               sleep (1);
+               if (++retries > MAX_RETRIES) {
+                       msg_err ("cannot bind to socket, exiting");
+                       exit (EXIT_SUCCESS);
+               }
+       }
+       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_greylist_socket, (void *)worker);
+       event_add (&worker->bind_ev, NULL);
+
+       gperf_profiler_init (worker->srv->cfg, "greylist");
+
+       if (!config_greylist_worker (worker)) {
+               msg_err ("cannot configure greylisting worker, exiting");
+               exit (EXIT_SUCCESS);
+       }
+
+       event_loop (0);
+       exit (EXIT_SUCCESS);
+}
index 282506df48cd2b868a49138a2e643321b68a2b67..722bcad72137885b8b20b64ae3b2f7109fd1a8a5 100644 (file)
@@ -505,6 +505,9 @@ file_log_function (const gchar * log_domain, const gchar *function, GLogLevelFla
                        case TYPE_FUZZY:
                                cptype = "fuzzy";
                                break;
+                       case TYPE_GREYLIST:
+                               cptype = "greylist";
+                               break;
                }
                if (function == NULL) {
                        r = rspamd_snprintf (tmpbuf, sizeof (tmpbuf), "#%P(%s): %s rspamd ", rspamd_log->pid, cptype, timebuf);
index 2b69afe38e8153361436e989ed85388a8709b59b..3365b67c4ef866801c29e51a3045237316a37a3c 100644 (file)
@@ -361,6 +361,12 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                                msg_info ("starting fuzzy storage process %P", getpid ());
                                start_fuzzy_storage (cur);
                                break;
+                       case TYPE_GREYLIST:
+                               setproctitle ("greylist storage");
+                               pidfile_close (rspamd->pfh);
+                               msg_info ("starting greylist storage process %P", getpid ());
+                               start_greylist_storage (cur);
+                               break;
                        case TYPE_WORKER:
                        default:
                                setproctitle ("worker process");
@@ -529,7 +535,7 @@ spawn_workers (struct rspamd_main *rspamd)
                        cf->listen_sock = listen_sock;
                }
                
-               if (cf->type == TYPE_FUZZY) {
+               if (cf->type == TYPE_FUZZY || cf->type == TYPE_GREYLIST) {
                        if (cf->count > 1) {
                                msg_err ("cannot spawn more than 1 fuzzy storage worker, so spawn one");
                        }
@@ -555,6 +561,8 @@ get_process_type (enum process_type type)
                return "worker";
        case TYPE_FUZZY:
                return "fuzzy";
+       case TYPE_GREYLIST:
+               return "greylist";
        case TYPE_CONTROLLER:
                return "controller";
        case TYPE_LMTP:
index d5f9714683cd47be511dfeb0fa73036e0a7b6a97..89169da109d9e8dab0cdb44ece7ba35dbc691ba1 100644 (file)
@@ -48,7 +48,8 @@ enum process_type {
        TYPE_CONTROLLER,
        TYPE_LMTP,
        TYPE_SMTP,
-       TYPE_FUZZY
+       TYPE_FUZZY,
+       TYPE_GREYLIST
 };
 
 
@@ -246,6 +247,7 @@ struct c_module {
 
 void start_worker (struct rspamd_worker *worker);
 void start_controller (struct rspamd_worker *worker);
+void start_greylist_storage (struct rspamd_worker *worker);
 
 /**
  * Register custom controller function
index e7dc86e9aa76e8aaea05a345efb46073593ad21e..a4c679f96040800ec3ee5feca8506ea9f8338292 100644 (file)
@@ -120,14 +120,13 @@ parse_command (struct worker_task *task, f_str_t * line)
        struct custom_command          *cmd;
        GList                          *cur;
 
+       task->proto_ver = RSPAMC_PROTO_1_1;
        token = separate_command (line, ' ');
        if (line == NULL || token == NULL) {
                debug_task ("bad command: %s", token);
                return -1;
        }
 
-       task->proto_ver = RSPAMC_PROTO_1_1;
-
        switch (token[0]) {
        case 'c':
        case 'C':
index 709df3a7fd8e7b7072692c245f13956087f6a5a4..06fa501b5e73fa5bb0de98690fde2c85adfce170 100644 (file)
@@ -954,7 +954,7 @@ static gboolean
 config_smtp_worker (struct rspamd_worker *worker)
 {
        struct smtp_worker_ctx         *ctx;
-       char                           *value, *err_str;
+       char                           *value;
        uint32_t                        timeout;
 
        ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
@@ -980,14 +980,9 @@ config_smtp_worker (struct rspamd_worker *worker)
        }
        if ((value = g_hash_table_lookup (worker->cf->params, "smtp_timeout")) != NULL) {
                errno = 0;
-               timeout = strtoul (value, &err_str, 10);
-               if (errno != 0 || (err_str && *err_str != '\0')) {
-                       msg_warn ("cannot parse timeout, invalid number: %s: %s", value, strerror (errno));
-               }
-               else {
-                       ctx->smtp_timeout.tv_sec = timeout / 1000;
-                       ctx->smtp_timeout.tv_usec = timeout - ctx->smtp_timeout.tv_sec * 1000;
-               }
+               timeout = parse_seconds (value);
+               ctx->smtp_timeout.tv_sec = timeout / 1000;
+               ctx->smtp_timeout.tv_usec = timeout - ctx->smtp_timeout.tv_sec * 1000;
        }
        if ((value = g_hash_table_lookup (worker->cf->params, "smtp_delay")) != NULL) {
                ctx->smtp_delay = parse_seconds (value);