pid_t wrk_pid;
gpointer ud;
gint attached_fd;
+ GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
static void rspamd_control_ignore_io_handler (int fd, short what, void *ud);
+static void
+rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt)
+{
+ GHashTable *htb;
+ /* It stops event and frees hash */
+ htb = elt->pending_elts;
+ g_hash_table_remove (elt->pending_elts, elt);
+ /* Release hash reference */
+ g_hash_table_unref (htb);
+}
+
void
rspamd_control_send_error (struct rspamd_control_session *session,
gint code, const gchar *error_msg, ...)
rspamd_inet_address_to_string (session->addr));
DL_FOREACH_SAFE (session->replies, elt, telt) {
- rspamd_ev_watcher_stop (session->event_loop,
- &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
rspamd_inet_address_free (session->addr);
}
}
+void
+rspamd_pending_control_free (gpointer p)
+{
+ struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p;
+
+ rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev);
+ g_free (rep_elt);
+}
+
static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
+ rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending);
rspamd_ev_watcher_init (&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start (rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
+ g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt);
DL_APPEND (res, rep_elt);
}
{
struct rspamd_control_reply_elt *elt =
(struct rspamd_control_reply_elt *)ud;
+
struct rspamd_control_reply rep;
/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
static void
/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
- g_free (elt);
+ rspamd_control_stop_pending (elt);
}
static void
REF_RELEASE (child->cf);
g_hash_table_remove (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid));
+ g_hash_table_unref (child->control_events_pending);
g_free (child);
}
else {
child->cf = parent->cf;
child->ppid = parent->pid;
REF_RETAIN (child->cf);
+ child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, rspamd_pending_control_free);
g_hash_table_insert (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
}