]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Try to fix more issues
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 20 Jun 2019 14:07:58 +0000 (15:07 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/libserver/rspamd_control.c
src/libserver/worker_util.c
src/libutil/map.c
src/rspamd.c

index 62ca2464372b634bdfc9c6f39514a333d72b8c42..1d161f6bc9fb7ff6b539317a2e652d8ee521f2d5 100644 (file)
@@ -727,7 +727,7 @@ rspamd_control_hs_io_handler (int fd, short what, void *ud)
 
        /* At this point we just ignore replies from the workers */
        (void)read (fd, &rep, sizeof (rep));
-       rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+       rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
        g_free (elt);
 }
 
@@ -740,7 +740,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 
        /* At this point we just ignore replies from the workers */
        (void) read (fd, &rep, sizeof (rep));
-       rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+       rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
        g_free (elt);
 }
 
index 70d349c2c4c1f7b81aaff6f5192bf812b5126351..f7b4ee9ab29070c2b41e1840b15f24b718eb655f 100644 (file)
@@ -321,6 +321,8 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 
        event_loop = ev_default_loop (EVFLAG_SIGNALFD);
 
+       worker->srv->event_loop = event_loop;
+
        rspamd_worker_init_signals (worker, event_loop);
        rspamd_control_worker_add_default_handler (worker, event_loop);
 #ifdef WITH_HIREDIS
index 3ca94806ff8dc37e18bb609fd04db82eede8f182..eadf0279cebc5ac794c38d01714c3b1be6d9ed2a 100644 (file)
@@ -274,7 +274,12 @@ free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new)
 
 
        MAP_RELEASE (cbd->bk, "rspamd_map_backend");
-       MAP_RELEASE (periodic, "periodic");
+
+       if (periodic) {
+               /* Detached in case of HTTP error */
+               MAP_RELEASE (periodic, "periodic");
+       }
+
        g_free (cbd);
 }
 
@@ -325,7 +330,11 @@ http_map_error (struct rspamd_http_connection *conn,
                        cbd->bk->uri,
                        cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
                        err);
+       MAP_RETAIN (cbd->periodic, "periodic");
        rspamd_map_process_periodic (cbd->periodic);
+       MAP_RELEASE (cbd->periodic, "periodic");
+       /* Detach periodic as rspamd_map_process_periodic will destroy it */
+       cbd->periodic = NULL;
        MAP_RELEASE (cbd, "http_callback_data");
 }
 
@@ -2236,6 +2245,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
        switch (bk->protocol) {
        case MAP_PROTO_FILE:
                if (bk->data.fd) {
+                       ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev);
                        g_free (bk->data.fd->filename);
                        g_free (bk->data.fd);
                }
index 765b4bd2bc189c06dffa7722ec1407726e6de5b1..d079617571baea03d1551aa1c41e0fc0cb679669 100644 (file)
@@ -719,6 +719,7 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
        if (!w->wanna_die) {
                w->wanna_die = TRUE;
                kill (w->pid, SIGUSR2);
+               ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
                msg_info_main ("send signal to worker %P", w->pid);
        }
        else {
@@ -727,9 +728,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
 }
 
 static gboolean
-wait_for_workers (gpointer key, gpointer value, gpointer unused)
+rspamd_worker_wait (struct rspamd_worker *w)
 {
-       struct rspamd_worker *w = value;
        struct rspamd_main *rspamd_main;
        gint res = 0;
        gboolean nowait = FALSE;
@@ -756,7 +756,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
                                if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
                                        if (term_attempts % 10 == 0) {
                                                msg_info_main ("waiting for worker %s(%P) to sync, "
-                                                               "%d seconds remain",
+                                                                          "%d seconds remain",
                                                                g_quark_to_string (w->type), w->pid,
                                                                (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
                                                kill (w->pid, SIGTERM);
@@ -768,7 +768,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
                                }
                                else {
                                        msg_err_main ("data corruption warning: terminating "
-                                                       "special worker %s(%P) with SIGKILL",
+                                                                 "special worker %s(%P) with SIGKILL",
                                                        g_quark_to_string (w->type), w->pid);
                                        kill (w->pid, SIGKILL);
                                        if (nowait && errno == ESRCH) {
@@ -798,7 +798,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
        msg_info_main ("%s process %P terminated %s",
                        g_quark_to_string (w->type), w->pid,
                        nowait ? "with no result available" :
-                                       (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
+                       (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
        if (w->srv_pipe[0] != -1) {
                /* Ugly workaround */
                if (w->tmp_data) {
@@ -817,6 +817,25 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
        return TRUE;
 }
 
+static gboolean
+hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
+{
+       return rspamd_worker_wait ((struct rspamd_worker *)value);
+}
+
+static void
+rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
+{
+       struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
+                       NULL);
+
+       if (g_hash_table_size (rspamd_main->workers) == 0) {
+               ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+       }
+}
+
+
 struct core_check_cbdata {
        struct rspamd_config *cfg;
        gsize total_count;
@@ -983,6 +1002,15 @@ do_encrypt_password (void)
        rspamd_fprintf (stderr, "use rspamadm pw for this operation\n");
 }
 
+static void
+stop_srv_ev (gpointer key, gpointer value, gpointer ud)
+{
+       struct rspamd_worker *cur = (struct rspamd_worker *)value;
+       struct rspamd_main *rspamd_main = (struct rspamd_main *)ud;
+
+       ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
+}
+
 /* Signal handlers */
 static void
 rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
@@ -991,6 +1019,8 @@ rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
 
        msg_info_main ("catch termination signal, waiting for children");
        rspamd_log_nolock (rspamd_main->logger);
+       /* Stop srv events to avoid false notifications */
+       g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
        rspamd_pass_signal (rspamd_main->workers, w->signum);
 
        ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1033,22 +1063,41 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
        guint i;
        gint res = 0;
        struct rspamd_worker *cur;
-       pid_t wrk;
-       gboolean need_refork = TRUE;
+       pid_t pid;
+       gboolean need_refork = TRUE, found_proc = FALSE;
 
        /* Turn off locking for logger */
        rspamd_log_nolock (rspamd_main->logger);
 
-       msg_info_main ("catch SIGCHLD signal, finding terminated workers");
+       msg_info_main ("got SIGCHLD signal, finding terminated workers");
        /* Remove dead child form children list */
-       while ((wrk = waitpid (0, &res, WNOHANG)) > 0) {
+       for (;;) {
+               pid = waitpid (0, &res, WNOHANG);
+
+               if (pid == -1) {
+                       if (errno != EINTR) {
+                               msg_warn_main ("got unexpected system error when waiting: %s",
+                                               strerror (errno));
+                               break;
+                       }
+                       else {
+                               continue;
+                       }
+               }
+               else if (pid == 0) {
+                       /* No more processes to wait */
+                       break;
+               }
+
+               found_proc = TRUE;
+
                if ((cur =
                                g_hash_table_lookup (rspamd_main->workers,
-                                               GSIZE_TO_POINTER (wrk))) != NULL) {
+                                               GSIZE_TO_POINTER (pid))) != NULL) {
                        /* Unlink dead process from queue and hash table */
 
                        g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
-                                       wrk));
+                                       pid));
 
                        if (cur->wanna_die) {
                                /* Do not refork workers that are intended to be terminated */
@@ -1155,14 +1204,19 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
                }
                else {
                        for (i = 0; i < other_workers->len; i++) {
-                               if (g_array_index (other_workers, pid_t, i) == wrk) {
+                               if (g_array_index (other_workers, pid_t, i) == pid) {
                                        g_array_remove_index_fast (other_workers, i);
-                                       msg_info_main ("related process %P terminated", wrk);
+                                       msg_info_main ("related process %P terminated", pid);
                                }
                        }
                }
        }
 
+       if (!found_proc) {
+               msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
+                               strerror (errno));
+       }
+
        rspamd_log_lock (rspamd_main->logger);
 }
 
@@ -1173,7 +1227,7 @@ rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
 
        term_attempts--;
 
-       g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
 
        if (g_hash_table_size (rspamd_main->workers) == 0) {
                ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1245,8 +1299,8 @@ main (gint argc, gchar **argv, gchar **env)
        GQuark type;
        rspamd_inet_addr_t *control_addr = NULL;
        struct ev_loop *event_loop;
-       ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
-       ev_io control_ev;
+       static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
+       static ev_io control_ev;
        struct rspamd_main *rspamd_main;
        gboolean skip_pid = FALSE, valgrind_mode = FALSE;
 
@@ -1568,7 +1622,10 @@ main (gint argc, gchar **argv, gchar **env)
 
 
        /* Wait for workers termination */
-       g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+       ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
+       ev_signal_start (event_loop, &cld_ev);
+
+       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
 
        static ev_timer ev_finale;