#include "contrib/libev/ev.h"
+/* Forward declaration: the function is defined further down, after its first call site */
+static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
+ struct ev_loop *);
+
static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
/**
* Return worker's control structure by its type
rspamd_worker_init_signals (worker, event_loop);
rspamd_control_worker_add_default_handler (worker, event_loop);
+ rspamd_worker_heartbeat_start (worker, event_loop);
#ifdef WITH_HIREDIS
rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
worker->srv->cfg, event_loop);
}
}
+/**
+ * Worker-side heartbeat timer callback.
+ *
+ * Currently a placeholder: it only recovers the owning worker from the
+ * watcher's `data` pointer and performs no further action.
+ *
+ * @param w heartbeat timer; `w->data` holds the owning struct rspamd_worker
+ * @param revents libev event mask (unused)
+ */
+static void
+rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+	struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+
+	/* TODO: send a heartbeat to the main process; until then keep the compiler quiet */
+	(void)wrk;
+	(void)revents;
+}
+
+/**
+ * Arm the heartbeat timer of a worker on the given event loop.
+ *
+ * The timer is initialised with a zero initial delay and a repeat period of
+ * `heartbeat_interval` taken from the server configuration; the worker itself
+ * is attached to the watcher as user data for the callback.
+ *
+ * @param wrk worker whose `hb.heartbeat_ev` watcher is started
+ * @param event_loop loop the watcher is registered in
+ */
+static void
+rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+	ev_timer *hb_timer = &wrk->hb.heartbeat_ev;
+
+	hb_timer->data = (void *)wrk;
+	ev_timer_init (hb_timer, rspamd_worker_heartbeat_cb,
+			0.0, wrk->srv->cfg->heartbeat_interval);
+	ev_timer_start (event_loop, hb_timer);
+}
+
+/**
+ * Format a heartbeat timestamp as "<date> <time>.<fraction>" into `buf`.
+ *
+ * The whole seconds are broken down with rspamd_localtime () and printed via
+ * strftime (); the fractional part is printed with five digits and appended
+ * without its leading "0".
+ *
+ * @param ts timestamp to render
+ * @param buf destination buffer
+ * @param buflen size of `buf` in bytes
+ */
+static void
+rspamd_main_heartbeat_format_time (ev_tstamp ts, gchar *buf, gsize buflen)
+{
+	struct tm tm;
+	gchar usec_buf[16];
+	gsize r;
+
+	rspamd_localtime (ts, &tm);
+	r = strftime (buf, buflen, "%F %H:%M:%S", &tm);
+	rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+			ts - (gdouble)(time_t)ts);
+	/* usec_buf looks like "0.12345": skip the leading digit, keep ".12345" */
+	rspamd_snprintf (buf + r, buflen - r,
+			"%s", usec_buf + 1);
+}
+
+/**
+ * Main-process timer callback that checks whether a worker is still beating.
+ *
+ * `hb.nbeats` is used as a small state machine:
+ *  - when the time since `hb.last_event` exceeds the configured heartbeat
+ *    interval and nbeats is positive, the first loss is logged and nbeats
+ *    becomes -1;
+ *  - on every further overdue check nbeats is decremented and the number of
+ *    lost beats is logged;
+ *  - when the worker is no longer overdue but nbeats is negative, the
+ *    recovery is logged and nbeats is reset to 1.
+ *
+ * Nothing is reported until at least one heartbeat has been recorded
+ * (`hb.last_event` > 0): the timer is armed with a zero initial delay, so the
+ * first check would otherwise always report a lost beat with a timestamp at
+ * the epoch.
+ * NOTE(review): this assumes `hb.last_event` starts at zero and is updated
+ * elsewhere when a beat arrives; neither is visible in this function.
+ *
+ * @param w heartbeat timer; `w->data` holds the monitored struct rspamd_worker
+ * @param revents libev event mask (unused)
+ */
+static void
+rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+	struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+	gdouble time_from_last = ev_time ();
+	/* Presumably consumed implicitly by the msg_*_main macros: do not rename */
+	struct rspamd_main *rspamd_main;
+	gchar timebuf[64];
+
+	time_from_last -= wrk->hb.last_event;
+	rspamd_main = wrk->srv;
+
+	if (wrk->hb.last_event <= 0) {
+		/* No heartbeat has been seen yet, there is nothing to compare with */
+		return;
+	}
+
+	if (time_from_last > 0 && time_from_last > rspamd_main->cfg->heartbeat_interval) {
+		rspamd_main_heartbeat_format_time (wrk->hb.last_event,
+				timebuf, sizeof (timebuf));
+
+		if (wrk->hb.nbeats > 0) {
+			/* First time lost event */
+			msg_warn_main ("lost heartbeat from worker type %s with pid %P, "
+					"last beat on: %s (%L beats received previously)",
+					g_quark_to_string (wrk->type), wrk->pid,
+					timebuf,
+					wrk->hb.nbeats);
+			wrk->hb.nbeats = -1;
+			/* TODO: send notify about worker problem */
+		}
+		else {
+			/* Already in the "lost" state: count one more missed beat */
+			wrk->hb.nbeats --;
+			msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, "
+					"last beat on: %s",
+					-(wrk->hb.nbeats),
+					g_quark_to_string (wrk->type),
+					wrk->pid,
+					timebuf);
+		}
+	}
+	else if (wrk->hb.nbeats < 0) {
+		/* Worker is back in time after one or more missed beats */
+		rspamd_main_heartbeat_format_time (wrk->hb.last_event,
+				timebuf, sizeof (timebuf));
+
+		msg_info_main ("received heartbeat from worker type %s with pid %P, "
+				"last beat on: %s (%L beats lost previously)",
+				g_quark_to_string (wrk->type), wrk->pid,
+				timebuf,
+				-(wrk->hb.nbeats));
+		wrk->hb.nbeats = 1;
+		/* TODO: send notify about worker restoration */
+	}
+}
+
+/**
+ * Arm the main-process heartbeat checker for a worker.
+ *
+ * The watcher stored in `wrk->hb.heartbeat_ev` is initialised with a zero
+ * initial delay and a repeat period of twice the configured
+ * `heartbeat_interval`, carries the worker as user data and is started on
+ * the given loop.
+ *
+ * @param wrk worker to be monitored
+ * @param event_loop loop the watcher is registered in
+ */
+static void
+rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+	ev_timer *check_timer = &wrk->hb.heartbeat_ev;
+
+	check_timer->data = (void *)wrk;
+	ev_timer_init (check_timer, rspamd_main_heartbeat_cb,
+			0.0, wrk->srv->cfg->heartbeat_interval * 2);
+	ev_timer_start (event_loop, check_timer);
+}
+
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
struct rspamd_worker_conf *cf,
wrk->cld_ev.data = wrk;
ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
+ rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);
typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,
struct rspamd_worker *);
+/**
+ * Heartbeat bookkeeping kept per worker (the `hb` member of the worker structure)
+ */
+struct rspamd_worker_heartbeat {
+ ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
+ ev_tstamp last_event; /**< time of the last heartbeat; main compares it against ev_time () */
+ gint64 nbeats; /* positive for beats received, negative for beats missed */
+};
+
/**
* Worker process structure
*/
struct rspamd_main *srv; /**< pointer to server structure */
GQuark type; /**< process type */
GHashTable *signal_events; /**< signal events */
- struct rspamd_worker_accept_event *accept_events; /**< socket events */
+ struct rspamd_worker_accept_event *accept_events; /**< socket events */
struct rspamd_worker_conf *cf; /**< worker config data */
gpointer ctx; /**< worker's specific data */
enum rspamd_worker_flags flags; /**< worker's flags */
gint srv_pipe[2]; /**< used by workers to request something from the
main process. [0] - main, [1] - worker */
ev_io srv_ev; /**< used by main for read workers' requests */
+ struct rspamd_worker_heartbeat hb; /**< heartbeat data */
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
GPtrArray *finish_actions; /**< called when worker is terminated */