]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Allow listening on multiply addresses.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Jul 2013 17:42:29 +0000 (18:42 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Jul 2013 17:42:29 +0000 (18:42 +0100)
Now rspamd can listen on multiply ipv4/ipv6 addresses.
Removed legacy workers (lmtp, kvstorage) as they are never used
in production.
Try to unify workers initialization.

16 files changed:
CMakeLists.txt
config.h.in
src/cfg_file.h
src/controller.c
src/fuzzy_storage.c
src/kvstorage_server.c
src/lua_worker.c
src/main.c
src/main.h
src/smtp.c
src/smtp_proxy.c
src/util.c
src/util.h
src/webui.c
src/worker.c
src/worker_util.c

index a062647740fba88aa15aa28809cf43eb2ed28a7c..c520889ef75e08133a91ef6e487bd166b2a96ff5 100644 (file)
@@ -1018,8 +1018,6 @@ INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/src" "${CMAKE_BINARY_DIR}/src")
 SET(RSPAMDSRC  src/modules.c
                                src/controller.c
                                src/fuzzy_storage.c
-                               src/kvstorage_server.c
-                               src/lmtp.c
                                src/lua_worker.c
                                src/main.c
                                src/map.c
@@ -1036,7 +1034,7 @@ SET(PLUGINSSRC    src/plugins/surbl.c
                                src/plugins/dkim_check.c)
                                
 SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua webui)
+SET(WORKERS_LIST normal controller smtp smtp_proxy fuzzy lua webui)
 
 AddModules(MODULES_LIST WORKERS_LIST)
 
index e930e520f0b28bdfb5312c0c53a512eeba87d9ee..69df3346998b2156e3f3c313e60dc98d3235495b 100644 (file)
@@ -467,6 +467,7 @@ typedef struct worker_s {
        gboolean unique;
        gboolean threaded;
        gboolean killable;
+       gint listen_type;
 } worker_t;
 
 extern module_t *modules[];
index 3b7059786fb9a9b95303bed8aba909a1f2b422b4..829b4b453ee1db42b2053443ca2fa3843c21a7b1 100644 (file)
@@ -257,7 +257,7 @@ struct worker_conf {
        guint16 bind_port;                                                              /**< bind port in case of TCP socket                                    */
        guint16 bind_family;                                                    /**< bind type (AF_UNIX or AF_INET)                                             */
        guint16 count;                                                                  /**< number of workers                                                                  */
-       gint listen_sock;                                                               /**< listening socket desctiptor                                                */
+       GList *listen_socks;                                                    /**< listening sockets desctiptors                                              */
        guint32 rlimit_nofile;                                                  /**< max files limit                                                                    */
        guint32 rlimit_maxcore;                                                 /**< maximum core file size                                                             */
        GHashTable *params;                                                             /**< params for worker                                                                  */
index 45c0264255b9964b2e3b56caf76f3fde9889a0c6..b712895d2b45cfba24b2f881f4ccd5a43ff78405 100644 (file)
@@ -60,7 +60,8 @@ worker_t controller_worker = {
        TRUE,                                           /* Has socket */
        FALSE,                                          /* Non unique */
        FALSE,                                          /* Non threaded */
-       TRUE                                            /* Killable */
+       TRUE,                                           /* Killable */
+       SOCK_STREAM                                     /* TCP socket */
 };
 
 enum command_type {
@@ -171,7 +172,7 @@ sigusr2_handler (gint fd, short what, void *arg)
        tv.tv_usec = 0;
        event_del (&worker->sig_ev_usr1);
        event_del (&worker->sig_ev_usr2);
-       event_del (&worker->bind_ev);
+       worker_stop_accept (worker);
        msg_info ("controller's shutdown is pending in %d sec", 2);
        event_loopexit (&tv);
        return;
@@ -1886,22 +1887,15 @@ init_controller (void)
 void
 start_controller (struct rspamd_worker *worker)
 {
-       struct sigaction                signals;
        gchar                          *hostbuf;
        gsize                           hostmax;
-       struct rspamd_controller_ctx   *ctx;
+       struct rspamd_controller_ctx   *ctx = worker->ctx;
        GError                         *err = NULL;
        struct timeval                  tv;
 
-       worker->srv->pid = getpid ();
-       ctx = worker->ctx;
-
-       ctx->ev_base = event_init ();
+       ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_socket);
        g_mime_init (0);
 
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
        event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -1949,16 +1943,10 @@ start_controller (struct rspamd_worker *worker)
        gethostname (hostbuf, hostmax);
        hostbuf[hostmax - 1] = '\0';
        rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
-       /* Accept event */
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
 
        start_map_watch (worker->srv->cfg, ctx->ev_base);
        ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
-       gperf_profiler_init (worker->srv->cfg, "controller");
-
        event_base_loop (ctx->ev_base, 0);
 
        close_log (worker->srv->logger);
index c5ceb1b1c37fa780b725a16af779af7c61af08db..7bd950014fe26f6a50c3baa02f2ed403eabe72c4 100644 (file)
@@ -71,10 +71,11 @@ worker_t fuzzy_worker = {
        "fuzzy",                                        /* Name */
        init_fuzzy,                                     /* Init function */
        start_fuzzy,                            /* Start function */
-       FALSE,                                          /* No socket */
+       TRUE,                                           /* No socket */
        TRUE,                                           /* Unique */
        TRUE,                                           /* Threaded */
-       FALSE                                           /* Non killable */
+       FALSE,                                          /* Non killable */
+       SOCK_DGRAM                                      /* UDP socket */
 };
 
 static GQueue                  *hashes[BUCKETS];
@@ -364,8 +365,6 @@ sigterm_handler (gint fd, short what, void *arg)
        ctx = worker->ctx;
        event_del (&worker->sig_ev_usr1);
        event_del (&worker->sig_ev_usr2);
-       event_del (&worker->bind_ev);
-       close (worker->cf->listen_sock);
 
        rspamd_mutex_lock (ctx->update_mtx);
        mods = ctx->max_mods + 1;
@@ -392,8 +391,7 @@ sigusr2_handler (gint fd, short what, void *arg)
        tv.tv_usec = 0;
        event_del (&worker->sig_ev_usr1);
        event_del (&worker->sig_ev_usr2);
-       event_del (&worker->bind_ev);
-       close (worker->cf->listen_sock);
+       worker_stop_accept (worker);
        msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
        rspamd_mutex_lock (ctx->update_mtx);
        mods = ctx->max_mods + 1;
@@ -1025,21 +1023,14 @@ init_fuzzy (void)
 void
 start_fuzzy (struct rspamd_worker *worker)
 {
-       struct sigaction                 signals;
        struct event                     sev;
-       gint                              retries = 0;
        struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
        GError                          *err = NULL;
 
-       worker->srv->pid = getpid ();
-
-       ctx->ev_base = event_init ();
+       ctx->ev_base = prepare_worker (worker, "controller", sig_handler, accept_fuzzy_socket);
 
        server_stat = worker->srv->stat;
 
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
        event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -1054,16 +1045,6 @@ start_fuzzy (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &sev);
        signal_add (&sev, NULL);
 
-       /* Listen event */
-       while ((worker->cf->listen_sock =
-                       make_universal_socket (worker->cf->bind_addr, worker->cf->bind_port, SOCK_DGRAM, TRUE, TRUE, FALSE)) == -1) {
-               sleep (1);
-               if (++retries > MAX_RETRIES) {
-                       msg_err ("cannot bind to socket, exiting");
-                       exit (0);
-               }
-       }
-
        /* Init bloom filter */
        bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
        /* Try to read hashes from file */
@@ -1078,10 +1059,6 @@ start_fuzzy (struct rspamd_worker *worker)
        tmv.tv_usec = 0;
        evtimer_add (&tev, &tmv);
 
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
-
        /* Create radix tree */
        if (ctx->update_map != NULL) {
                if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses",
@@ -1095,8 +1072,6 @@ start_fuzzy (struct rspamd_worker *worker)
        /* Maps events */
        start_map_watch (worker->srv->cfg, ctx->ev_base);
 
-       gperf_profiler_init (worker->srv->cfg, "fuzzy");
-
        ctx->update_thread = rspamd_create_thread ("fuzzy update", sync_cache, worker, &err);
        if (ctx->update_thread == NULL) {
                msg_err ("error creating update thread: %s", err->message);
index 92090d4f6f3f56401b50abf6f3a4e19f449f17b5..b493eee46ed7698f85012a935927e5026c856058 100644 (file)
@@ -74,7 +74,8 @@ worker_t keystorage_worker = {
        TRUE,                                           /* Has socket */
        FALSE,                                          /* Non unique */
        TRUE,                                           /* Non threaded */
-       FALSE                                           /* Non killable */
+       FALSE,                                          /* Non killable */
+       SOCK_STREAM                                     /* TCP socket */
 };
 
 #ifndef HAVE_SA_SIGINFO
index 0cadb7eb2d1299a56af99d8a9beeb71b511b834c..6262af85bd243b64b5b8acd8cbaef0805e31c9d8 100644 (file)
@@ -53,7 +53,8 @@ worker_t lua_worker = {
        TRUE,                                   /* Has socket */
        FALSE,                                  /* Non unique */
        FALSE,                                  /* Non threaded */
-       TRUE                                    /* Killable */
+       TRUE,                                   /* Killable */
+       SOCK_STREAM                             /* TCP socket */
 };
 
 /*
@@ -302,7 +303,7 @@ sigusr2_handler (gint fd, short what, void *arg)
                tv.tv_usec = 0;
                event_del (&worker->sig_ev_usr1);
                event_del (&worker->sig_ev_usr2);
-               event_del (&worker->bind_ev);
+               worker_stop_accept (worker);
                msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
                event_loopexit (&tv);
        }
@@ -439,7 +440,6 @@ init_lua_worker (void)
 void
 start_lua_worker (struct rspamd_worker *worker)
 {
-       struct sigaction                signals;
        struct rspamd_lua_worker_ctx   *ctx = worker->ctx, **pctx;
        lua_State                                          *L;
 
@@ -448,18 +448,12 @@ start_lua_worker (struct rspamd_worker *worker)
        monstartup ((u_long) & _start, (u_long) & etext);
 #endif
 
-       gperf_profiler_init (worker->srv->cfg, "lua_worker");
-
-       worker->srv->pid = getpid ();
-
-       ctx->ev_base = event_init ();
+       ctx->ev_base = prepare_worker (worker, "lua_worker", sig_handler, lua_accept_socket);
 
        L = worker->srv->cfg->lua_state;
        ctx->L = L;
        ctx->cfg = worker->srv->cfg;
 
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -471,12 +465,6 @@ start_lua_worker (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
        signal_add (&worker->sig_ev_usr1, NULL);
 
-       /* Accept event */
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
-                       lua_accept_socket, (void *) worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
-
        ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
        /* Open worker's lib */
index 9b5a121088c5466c04c40aa9ce255ef1556acb63..6076857207f1b6d714e85084fe79f9c1496cf444 100644 (file)
@@ -550,20 +550,26 @@ dump_cfg_vars (struct config_file *cfg)
        g_hash_table_foreach (cfg->variables, dump_all_variables, NULL);
 }
 
-static gint
-create_listen_socket (const gchar *addr, gint port, gint family)
+static GList *
+create_listen_socket (const gchar *addr, gint port, gint family, gint listen_type)
 {
        gint                            listen_sock = -1;
-       /* Create listen socket */
-       listen_sock = make_universal_socket (addr, port, SOCK_STREAM, TRUE, TRUE, TRUE);
-
-       if (listen_sock != -1) {
-               if (listen (listen_sock, -1) == -1) {
-                       msg_err ("cannot listen on socket. %s", strerror (errno));
+       GList                          *result, *cur;
+       /* Create listen sockets */
+       result = make_universal_sockets_list (addr, port, listen_type, TRUE, TRUE, TRUE);
+
+       cur = result;
+       while (cur != NULL) {
+               listen_sock = GPOINTER_TO_INT (cur->data);
+               if (listen_sock != -1) {
+                       if (listen (listen_sock, -1) == -1) {
+                               msg_err ("cannot listen on socket. %s", strerror (errno));
+                       }
                }
+               cur = g_list_next (cur);
        }
 
-       return listen_sock;
+       return result;
 }
 
 static void
@@ -596,9 +602,9 @@ make_listen_key (const gchar *addr, gint port, gint family)
 static void
 spawn_workers (struct rspamd_main *rspamd)
 {
-       GList                          *cur;
+       GList                          *cur, *ls;
        struct worker_conf             *cf;
-       gint                            i, listen_sock;
+       gint                            i;
        gpointer                        p;
 
        cur = rspamd->cfg->workers;
@@ -614,19 +620,20 @@ spawn_workers (struct rspamd_main *rspamd)
                                if ((p = g_hash_table_lookup (listen_sockets, GINT_TO_POINTER (
                                                make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)))) == NULL) {
                                        /* Create listen socket */
-                                       listen_sock = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family);
-                                       if (listen_sock == -1) {
+                                       ls = create_listen_socket (cf->bind_addr, cf->bind_port, cf->bind_family,
+                                                       cf->worker->listen_type);
+                                       if (ls == NULL) {
                                                exit (-errno);
                                        }
                                        g_hash_table_insert (listen_sockets, GINT_TO_POINTER (
                                                        make_listen_key (cf->bind_addr, cf->bind_port, cf->bind_family)),
-                                                       GINT_TO_POINTER (listen_sock));
+                                                       ls);
                                }
                                else {
                                        /* We had socket for this type of worker */
-                                       listen_sock = GPOINTER_TO_INT (p);
+                                       ls = p;
                                }
-                               cf->listen_sock = listen_sock;
+                               cf->listen_socks = ls;
                        }
 
                        if (cf->worker->unique) {
index 521e0740a582fc77b249aa9aa090ae3906f74e41..052c6ce6d63106fc949feeaa2bfd6116f875d596 100644 (file)
@@ -53,7 +53,7 @@ struct rspamd_worker {
        GQuark type;                                                                                            /**< process type                                                                       */
        struct event sig_ev_usr1;                                                                       /**< signals event                                                                      */
        struct event sig_ev_usr2;                                                                       /**< signals event                                                                      */
-       struct event bind_ev;                                                                           /**< socket events                                                                      */
+       GList *accept_events;                                                                           /**< socket events                                                                      */
        struct worker_conf *cf;                                                                         /**< worker config data                                                         */
        gpointer ctx;                                                                                           /**< worker's specific data                                                     */
 };
@@ -321,6 +321,31 @@ void free_task_soft (gpointer ud);
  */
 double set_counter (const gchar *name, guint32 value);
 
+#ifndef HAVE_SA_SIGINFO
+typedef void (*rspamd_sig_handler_t) (gint);
+#else
+typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *);
+#endif
+
+/**
+ * Prepare worker's startup
+ * @param worker worker structure
+ * @param name name of the worker
+ * @param sig_handler handler of main signals
+ * @param accept_handler handler of accept event for listen sockets
+ * @return event base suitable for a worker
+ */
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+               rspamd_sig_handler_t sig_handler,
+               void (*accept_handler)(evutil_socket_t, short, void *));
+
+/**
+ * Stop accepting new connections for a worker
+ * @param worker
+ */
+void worker_stop_accept (struct rspamd_worker *worker);
+
 #endif
 
 /* 
index 075e1b6630b5ead9fe099aa2f7d6144b3706c982..f4033c9542a6a984b7606826055611386bc47733 100644 (file)
@@ -61,7 +61,8 @@ worker_t smtp_worker = {
        TRUE,                                           /* Has socket */
        FALSE,                                          /* Non unique */
        FALSE,                                          /* Non threaded */
-       TRUE                                            /* Killable */
+       TRUE,                                           /* Killable */
+       SOCK_STREAM                                     /* TCP socket */
 };
 
 #ifndef HAVE_SA_SIGINFO
@@ -105,7 +106,7 @@ sigusr2_handler (gint fd, short what, void *arg)
                tv.tv_usec = 0;
                event_del (&worker->sig_ev_usr1);
                event_del (&worker->sig_ev_usr2);
-               event_del (&worker->bind_ev);
+               worker_stop_accept (worker);
                msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
                event_loopexit (&tv);
        }
@@ -944,13 +945,10 @@ config_smtp_worker (struct rspamd_worker *worker)
 void
 start_smtp (struct rspamd_worker *worker)
 {
-       struct sigaction                signals;
        struct smtp_worker_ctx         *ctx = worker->ctx;
 
-       gperf_profiler_init (worker->srv->cfg, "worker");
+       ctx->ev_base = prepare_worker (worker, "smtp_worker", sig_handler, accept_socket);
 
-       worker->srv->pid = getpid ();
-       ctx->ev_base = event_init ();
 
        /* Set smtp options */
        if ( !config_smtp_worker (worker)) {
@@ -958,9 +956,6 @@ start_smtp (struct rspamd_worker *worker)
                exit (EXIT_SUCCESS);
        }
 
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
-
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
        event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
@@ -971,11 +966,6 @@ start_smtp (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
        signal_add (&worker->sig_ev_usr1, NULL);
 
-       /* Accept event */
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
-
        /* Maps events */
        start_map_watch (worker->srv->cfg, ctx->ev_base);
 
index e90be31fe50cf69e1cc892c55ab298f8ca7d7611..d3bcd3866b874738c590f0163af271ec1cad13b9 100644 (file)
@@ -62,7 +62,8 @@ worker_t smtp_proxy_worker = {
        TRUE,                                           /* Has socket */
        FALSE,                                          /* Non unique */
        FALSE,                                          /* Non threaded */
-       TRUE                                            /* Killable */
+       TRUE,                                           /* Killable */
+       SOCK_STREAM                                     /* TCP socket */
 };
 
 struct smtp_proxy_ctx {
@@ -179,7 +180,7 @@ sigusr2_handler (gint fd, short what, void *arg)
                tv.tv_usec = 0;
                event_del (&worker->sig_ev_usr1);
                event_del (&worker->sig_ev_usr2);
-               event_del (&worker->bind_ev);
+               worker_stop_accept (worker);
                msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
                event_loopexit (&tv);
        }
@@ -1021,13 +1022,9 @@ config_smtp_proxy_worker (struct rspamd_worker *worker)
 void
 start_smtp_proxy (struct rspamd_worker *worker)
 {
-       struct sigaction                signals;
        struct smtp_proxy_ctx         *ctx = worker->ctx;
 
-       gperf_profiler_init (worker->srv->cfg, "worker");
-
-       worker->srv->pid = getpid ();
-       ctx->ev_base = event_init ();
+       ctx->ev_base = prepare_worker (worker, "smtp_proxy", sig_handler, accept_socket);
 
        /* Set smtp options */
        if ( !config_smtp_proxy_worker (worker)) {
@@ -1035,8 +1032,6 @@ start_smtp_proxy (struct rspamd_worker *worker)
                exit (EXIT_SUCCESS);
        }
 
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -1048,11 +1043,6 @@ start_smtp_proxy (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
        signal_add (&worker->sig_ev_usr1, NULL);
 
-       /* Accept event */
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
-
        /* DNS resolver */
        ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
index 441ce007a8b961ecf1d99e09f5d2f2fa3dd86567..4936b37dcd2ed4b550ead655ccc8edcbb39543c0 100644 (file)
@@ -85,7 +85,7 @@ poll_sync_socket (gint fd, gint timeout, short events)
 }
 
 static gint
-make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async)
+make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean async, GList **list)
 {
        gint                            fd, r, optlen, on = 1, s_error;
        struct addrinfo               *cur;
@@ -146,7 +146,13 @@ make_inet_socket (gint type, struct addrinfo *addr, gboolean is_server, gboolean
                                goto out;
                        }
                }
-               break;
+               if (list == NULL) {
+                       /* Go out immediately */
+                       break;
+               }
+               else if (fd != -1) {
+                       *list = g_list_prepend (*list, GINT_TO_POINTER (fd));
+               }
 out:
                if (fd != -1) {
                        close (fd);
@@ -160,13 +166,13 @@ out:
 gint
 make_tcp_socket (struct addrinfo *addr, gboolean is_server, gboolean async)
 {
-       return make_inet_socket (SOCK_STREAM, addr, is_server, async);
+       return make_inet_socket (SOCK_STREAM, addr, is_server, async, NULL);
 }
 
 gint
 make_udp_socket (struct addrinfo *addr, gboolean is_server, gboolean async)
 {
-       return make_inet_socket (SOCK_DGRAM, addr, is_server, async);
+       return make_inet_socket (SOCK_DGRAM, addr, is_server, async, NULL);
 }
 
 gint
@@ -284,7 +290,7 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole
 }
 
 /**
- * Make universal stream socket
+ * Make a universal socket
  * @param credits host, ip or path to unix socket
  * @param port port (used for network sockets)
  * @param async make this socket asynced
@@ -292,7 +298,8 @@ make_unix_socket (const gchar *path, struct sockaddr_un *addr, gint type, gboole
  * @param try_resolve try name resolution for a socket (BLOCKING)
  */
 gint
-make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean async, gboolean is_server, gboolean try_resolve)
+make_universal_socket (const gchar *credits, guint16 port,
+               gint type, gboolean async, gboolean is_server, gboolean try_resolve)
 {
        struct sockaddr_un              un;
        struct stat                     st;
@@ -347,7 +354,7 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a
 
                rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port);
                if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) {
-                       r = make_inet_socket (type, res, is_server, async);
+                       r = make_inet_socket (type, res, is_server, async, NULL);
                        freeaddrinfo (res);
                        return r;
                }
@@ -358,6 +365,120 @@ make_universal_socket (const gchar *credits, guint16 port, gint type, gboolean a
        }
 }
 
+/**
+ * Make universal stream socket
+ * @param credits host, ip or path to unix socket
+ * @param port port (used for network sockets)
+ * @param async make this socket asynced
+ * @param is_server make this socket as server socket
+ * @param try_resolve try name resolution for a socket (BLOCKING)
+ */
+GList*
+make_universal_sockets_list (const gchar *credits, guint16 port,
+               gint type, gboolean async, gboolean is_server, gboolean try_resolve)
+{
+       struct sockaddr_un              un;
+       struct stat                     st;
+       struct addrinfo                 hints, *res;
+       gint                             r, fd, serrno;
+       gchar                            portbuf[8], **strv, **cur;
+       GList                           *result = NULL, *rcur;
+
+       strv = g_strsplit_set (credits, ",", -1);
+       if (strv == NULL) {
+               msg_err ("invalid sockets credentials: %s", credits);
+               return NULL;
+       }
+       cur = strv;
+       while (*cur != NULL) {
+               if (*credits == '/') {
+                       r = stat (credits, &st);
+                       if (is_server) {
+                               if (r == -1) {
+                                       fd = make_unix_socket (credits, &un, type, is_server, async);
+                               }
+                               else {
+                                       /* Unix socket exists, it must be unlinked first */
+                                       errno = EEXIST;
+                                       goto err;
+                               }
+                       }
+                       else {
+                               if (r == -1) {
+                                       /* Unix socket doesn't exists it must be created first */
+                                       errno = ENOENT;
+                                       goto err;
+                               }
+                               else {
+                                       if ((st.st_mode & S_IFSOCK) == 0) {
+                                               /* Path is not valid socket */
+                                               errno = EINVAL;
+                                               goto err;
+                                       }
+                                       else {
+                                               fd = make_unix_socket (credits, &un, type, is_server, async);
+                                       }
+                               }
+                       }
+                       if (fd != -1) {
+                               result = g_list_prepend (result, GINT_TO_POINTER (fd));
+                       }
+                       else {
+                               goto err;
+                       }
+               }
+               else {
+                       /* TCP related part */
+                       memset (&hints, 0, sizeof (hints));
+                       hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
+                       hints.ai_socktype = type; /* Type of the socket */
+                       hints.ai_flags = is_server ? AI_PASSIVE : 0;
+                       hints.ai_protocol = 0;           /* Any protocol */
+                       hints.ai_canonname = NULL;
+                       hints.ai_addr = NULL;
+                       hints.ai_next = NULL;
+
+                       if (!try_resolve) {
+                               hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
+                       }
+
+                       rspamd_snprintf (portbuf, sizeof (portbuf), "%d", (int)port);
+                       if ((r = getaddrinfo (credits, portbuf, &hints, &res)) == 0) {
+                               r = make_inet_socket (type, res, is_server, async, &result);
+                               freeaddrinfo (res);
+                               if (r == -1) {
+                                       goto err;
+                               }
+                       }
+                       else {
+                               msg_err ("address resolution for %s failed: %s", credits, gai_strerror (r));
+                               goto err;
+                       }
+               }
+               cur ++;
+       }
+
+       g_strfreev (strv);
+       return result;
+
+err:
+       g_strfreev (strv);
+       serrno = errno;
+       rcur = result;
+       while (rcur != NULL) {
+               fd = GPOINTER_TO_INT (rcur->data);
+               if (fd != -1) {
+                       close (fd);
+               }
+       }
+       if (result != NULL) {
+               g_list_free (result);
+       }
+
+       errno = serrno;
+       return NULL;
+}
+
 gint
 make_socketpair (gint pair[2])
 {
index b1e07c53815767db4bd68c4b33e1e6e3488c458e..5fa2eaff955e7867417e96a8e25c1c3f8d92e391 100644 (file)
@@ -32,7 +32,7 @@ gint accept_from_socket (gint listen_sock, struct sockaddr *addr, socklen_t *len
 gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean is_server, gboolean async);
 
 /**
- * Make universal stream socket
+ * Make a universal socket
  * @param credits host, ip or path to unix socket
  * @param port port (used for network sockets)
  * @param type type of socket (SO_STREAM or SO_DGRAM)
@@ -43,6 +43,17 @@ gint make_unix_socket (const gchar *, struct sockaddr_un *, gint type, gboolean
 gint make_universal_socket (const gchar *credits, guint16 port, gint type,
                gboolean async, gboolean is_server, gboolean try_resolve);
 
+/**
+ * Make a universal sockets
+ * @param credits host, ip or path to unix socket (several items may be separated by ',')
+ * @param port port (used for network sockets)
+ * @param type type of socket (SO_STREAM or SO_DGRAM)
+ * @param async make this socket asynced
+ * @param is_server make this socket as server socket
+ * @param try_resolve try name resolution for a socket (BLOCKING)
+ */
+GList* make_universal_sockets_list (const gchar *credits, guint16 port, gint type,
+               gboolean async, gboolean is_server, gboolean try_resolve);
 /*
  * Create socketpair
  */
index 0260632fb5dfe189523dc9bbaa4f482b48e598f9..eb9cf8ba818727971ae8f73a9de4ccdfae74bc89 100644 (file)
@@ -103,7 +103,8 @@ worker_t webui_worker = {
        TRUE,                                   /* Has socket */
        TRUE,                                   /* Non unique */
        FALSE,                                  /* Non threaded */
-       TRUE                                    /* Killable */
+       TRUE,                                   /* Killable */
+       SOCK_STREAM                             /* TCP socket */
 };
 
 #if defined(LIBEVENT_EVHTTP) || (defined(_EVENT_NUMERIC_VERSION) && (_EVENT_NUMERIC_VERSION > 0x02010000))
@@ -181,7 +182,6 @@ sigusr2_handler (gint fd, short what, void *arg)
                tv.tv_usec = 0;
                event_del (&worker->sig_ev_usr1);
                event_del (&worker->sig_ev_usr2);
-               event_del (&worker->bind_ev);
                msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
                event_loopexit (&tv);
        }
@@ -1741,6 +1741,7 @@ start_webui_worker (struct rspamd_worker *worker)
 {
        struct sigaction                signals;
        struct rspamd_webui_worker_ctx *ctx = worker->ctx;
+       GList                           *cur;
 
 #ifdef WITH_PROFILER
        extern void                     _start (void), etext (void);
@@ -1773,7 +1774,12 @@ start_webui_worker (struct rspamd_worker *worker)
        ctx->worker = worker;
        /* Accept event */
        ctx->http = evhttp_new (ctx->ev_base);
-       evhttp_accept_socket (ctx->http, worker->cf->listen_sock);
+
+       cur = worker->cf->listen_socks;
+       while (cur) {
+               evhttp_accept_socket (ctx->http, GPOINTER_TO_INT (cur->data));
+               cur = g_list_next (cur);
+       }
 
        if (ctx->use_ssl) {
 #ifdef HAVE_WEBUI_SSL
index 973395f356b99d96c72089a753088883883aba94..68ef89cb5ae039e405084cf334cbb11f4b82f91c 100644 (file)
@@ -57,7 +57,8 @@ worker_t normal_worker = {
        TRUE,                                           /* Has socket */
        FALSE,                                          /* Non unique */
        FALSE,                                          /* Non threaded */
-       TRUE                                            /* Killable */
+       TRUE,                                           /* Killable */
+       SOCK_STREAM                                     /* TCP socket */
 };
 
 #ifndef BUILD_STATIC
@@ -157,7 +158,7 @@ sigusr2_handler (gint fd, short what, void *arg)
                tv.tv_usec = 0;
                event_del (&worker->sig_ev_usr1);
                event_del (&worker->sig_ev_usr2);
-               event_del (&worker->bind_ev);
+               worker_stop_accept (worker);
                msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
                event_loopexit (&tv);
        }
@@ -816,25 +817,12 @@ init_worker (void)
 void
 start_worker (struct rspamd_worker *worker)
 {
-       struct sigaction                signals;
        gchar                          *is_custom_str;
        struct rspamd_worker_ctx       *ctx = worker->ctx;
        GError                                             *err = NULL;
        struct lua_locked_state            *nL;
 
-#ifdef WITH_PROFILER
-       extern void                     _start (void), etext (void);
-       monstartup ((u_long) & _start, (u_long) & etext);
-#endif
-
-       gperf_profiler_init (worker->srv->cfg, "worker");
-
-       worker->srv->pid = getpid ();
-
-       ctx->ev_base = event_init ();
-
-       init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+       ctx->ev_base = prepare_worker (worker, "normal", sig_handler, accept_socket);
 
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
@@ -846,12 +834,6 @@ start_worker (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
        signal_add (&worker->sig_ev_usr1, NULL);
 
-       /* Accept event */
-       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
-                       accept_socket, (void *) worker);
-       event_base_set (ctx->ev_base, &worker->bind_ev);
-       event_add (&worker->bind_ev, NULL);
-
 
 #ifndef BUILD_STATIC
        /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
index 541d4f1e4a0bb2c36d9aff3a66c031f75192a432..dd020e7cc1804d49994361f3e34e23688751c471 100644 (file)
@@ -213,3 +213,66 @@ set_counter (const gchar *name, guint32 value)
 
        return cd->value;
 }
+
+struct event_base *
+prepare_worker (struct rspamd_worker *worker, const char *name,
+               rspamd_sig_handler_t sig_handler,
+               void (*accept_handler)(evutil_socket_t, short, void *))
+{
+       struct event_base                *ev_base;
+       struct event                     *accept_event;
+       struct sigaction                  signals;
+       GList                             *cur;
+       gint                               listen_socket;
+
+#ifdef WITH_PROFILER
+       extern void                     _start (void), etext (void);
+       monstartup ((u_long) & _start, (u_long) & etext);
+#endif
+
+       gperf_profiler_init (worker->srv->cfg, "worker");
+
+       worker->srv->pid = getpid ();
+
+       ev_base = event_init ();
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* Accept all sockets */
+       cur = worker->cf->listen_socks;
+       while (cur) {
+               listen_socket = GPOINTER_TO_INT (cur->data);
+               if (listen_socket != -1) {
+                       accept_event = g_slice_alloc0 (sizeof (struct event));
+                       event_set (accept_event, listen_socket, EV_READ | EV_PERSIST,
+                                       accept_handler, worker);
+                       event_base_set (ev_base, accept_event);
+                       event_add (accept_event, NULL);
+                       worker->accept_events = g_list_prepend (worker->accept_events, accept_event);
+               }
+               cur = g_list_next (cur);
+       }
+
+       return ev_base;
+}
+
+void
+worker_stop_accept (struct rspamd_worker *worker)
+{
+       GList                             *cur;
+       struct event                     *event;
+
+       /* Remove all events */
+       cur = worker->accept_events;
+       while (cur) {
+               event = cur->data;
+               event_del (event);
+               cur = g_list_next (cur);
+               g_slice_free1 (sizeof (struct event), event);
+       }
+
+       if (worker->accept_events != NULL) {
+               g_list_free (worker->accept_events);
+       }
+}