]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Try hard to deal with ghost workers
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 26 Mar 2020 17:36:53 +0000 (17:36 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 26 Mar 2020 17:36:53 +0000 (17:36 +0000)
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c
src/rspamd.c
src/rspamd.h

index 4b2cfc733b4285cb2c9ed056603d33ae7bca85e4..30d959e476b09bb3cb92441e1521705973938745 100644 (file)
@@ -40,6 +40,7 @@ struct rspamd_control_reply_elt {
        pid_t wrk_pid;
        gpointer ud;
        gint attached_fd;
+       GHashTable *pending_elts;
        struct rspamd_control_reply_elt *prev, *next;
 };
 
@@ -105,6 +106,17 @@ static const struct rspamd_control_cmd_match {
 
 static void rspamd_control_ignore_io_handler (int fd, short what, void *ud);
 
+static void
+rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt)
+{
+       GHashTable *htb;
+       /* It stops event and frees hash */
+       htb = elt->pending_elts;
+       g_hash_table_remove (elt->pending_elts, elt);
+       /* Release hash reference */
+       g_hash_table_unref (htb);
+}
+
 void
 rspamd_control_send_error (struct rspamd_control_session *session,
                gint code, const gchar *error_msg, ...)
@@ -168,9 +180,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
                        rspamd_inet_address_to_string (session->addr));
 
        DL_FOREACH_SAFE (session->replies, elt, telt) {
-               rspamd_ev_watcher_stop (session->event_loop,
-                               &elt->ev);
-               g_free (elt);
+               rspamd_control_stop_pending (elt);
        }
 
        rspamd_inet_address_free (session->addr);
@@ -385,6 +395,15 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
        }
 }
 
+void
+rspamd_pending_control_free (gpointer p)
+{
+       struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p;
+
+       rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev);
+       g_free (rep_elt);
+}
+
 static struct rspamd_control_reply_elt *
 rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
                                                          struct rspamd_control_command *cmd,
@@ -443,12 +462,14 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
                        rep_elt->wrk_type = wrk->type;
                        rep_elt->event_loop = rspamd_main->event_loop;
                        rep_elt->ud = ud;
+                       rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending);
                        rspamd_ev_watcher_init (&rep_elt->ev,
                                        wrk->control_pipe[0],
                                        EV_READ, handler,
                                        rep_elt);
                        rspamd_ev_watcher_start (rspamd_main->event_loop,
                                        &rep_elt->ev, worker_io_timeout);
+                       g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt);
 
                        DL_APPEND (res, rep_elt);
                }
@@ -750,12 +771,12 @@ rspamd_control_ignore_io_handler (int fd, short what, void *ud)
 {
        struct rspamd_control_reply_elt *elt =
                        (struct rspamd_control_reply_elt *)ud;
+
        struct rspamd_control_reply rep;
 
        /* At this point we just ignore replies from the workers */
        (void)read (fd, &rep, sizeof (rep));
-       rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
-       g_free (elt);
+       rspamd_control_stop_pending (elt);
 }
 
 static void
@@ -767,8 +788,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 
        /* At this point we just ignore replies from the workers */
        (void) read (fd, &rep, sizeof (rep));
-       rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
-       g_free (elt);
+       rspamd_control_stop_pending (elt);
 }
 
 static void
@@ -802,6 +822,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
                REF_RELEASE (child->cf);
                g_hash_table_remove (srv->workers,
                                GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid));
+               g_hash_table_unref (child->control_events_pending);
                g_free (child);
        }
        else {
@@ -816,6 +837,8 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
                child->cf = parent->cf;
                child->ppid = parent->pid;
                REF_RETAIN (child->cf);
+               child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+                               NULL, rspamd_pending_control_free);
                g_hash_table_insert (srv->workers,
                                GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
        }
index d1ce88f3166097e8e460fbbc03608ed43dbba24c..21ab1a663ae23a9a1cc2a896f09d53bbf644676c 100644 (file)
@@ -274,6 +274,12 @@ enum rspamd_control_type rspamd_control_command_from_string (const gchar *str);
  */
 const gchar *rspamd_control_command_to_string (enum rspamd_control_type cmd);
 
+/**
+ * Used to cleanup pending events
+ * @param p
+ */
+void rspamd_pending_control_free (gpointer p);
+
 #ifdef  __cplusplus
 }
 #endif
index d97190f2ba0e75b3694349b7c5a66a00d10e5517..5a2234f29eee74cd8ba42b521f3dbd775170a76c 100644 (file)
@@ -466,6 +466,7 @@ rspamd_worker_init_signals (struct rspamd_worker *worker,
                        rspamd_worker_usr2_handler, NULL);
 }
 
+
 struct ev_loop *
 rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
                                           rspamd_accept_handler hdl)
@@ -979,6 +980,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
        wrk->pid = fork ();
        wrk->cores_throttled = rspamd_main->cores_throttling;
        wrk->term_handler = term_handler;
+       wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+                       NULL, rspamd_pending_control_free);
 
        switch (wrk->pid) {
        case 0:
index dee990c419169a2aa941582ec317207129d3ff82..fb3b93e365ca7dd1d9c5263ba583e4fbf263369d 100644 (file)
@@ -1181,6 +1181,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
        }
 
        REF_RELEASE (wrk->cf);
+       g_hash_table_unref (wrk->control_events_pending);
        g_free (wrk);
 }
 
index 8885480c2a7af70aed328bc9a81469f15b5f1f02..9e50c054a0324613a36d797674993cdf9eb99e35 100644 (file)
@@ -121,6 +121,7 @@ struct rspamd_worker {
        gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
        ev_child cld_ev;                /**< to allow reaping                                                           */
        rspamd_worker_term_cb term_handler; /**< custom term handler                                            */
+       GHashTable *control_events_pending; /**< control events pending indexed by ptr          */
 };
 
 struct rspamd_abstract_worker_ctx {