]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Implement event watchers for upstreams
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 4 Dec 2018 17:05:18 +0000 (17:05 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 4 Dec 2018 17:05:18 +0000 (17:05 +0000)
src/libutil/upstream.c
src/libutil/upstream.h

index 4ed657da48baf7f63477ca162472f1c54dfe9abf..90f792bbeca94161b25271b631dfee362f65e8ce 100644 (file)
@@ -34,6 +34,13 @@ struct upstream_addr_elt {
        guint errors;
 };
 
+struct upstream_list_watcher {
+       rspamd_upstream_watch_func func;
+       gpointer ud;
+       enum rspamd_upstreams_watch_event events_mask;
+       struct upstream_list_watcher *next, *prev;
+};
+
 struct upstream {
        guint weight;
        guint cur_weight;
@@ -73,6 +80,7 @@ struct upstream_list {
        struct upstream_ctx *ctx;
        GPtrArray *ups;
        GPtrArray *alive;
+       struct upstream_list_watcher *watchers;
        rspamd_mutex_t *lock;
        guint64 hash_seed;
        struct upstream_limits limits;
@@ -109,8 +117,9 @@ static guint default_dns_retransmits = 2;
 
 void
 rspamd_upstreams_library_config (struct rspamd_config *cfg,
-               struct upstream_ctx *ctx, struct event_base *ev_base,
-               struct rdns_resolver *resolver)
+                                                                struct upstream_ctx *ctx,
+                                                                struct event_base *ev_base,
+                                                                struct rdns_resolver *resolver)
 {
        g_assert (ctx != NULL);
        g_assert (cfg != NULL);
@@ -405,6 +414,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
        guint i;
        struct upstream *cur;
        struct timeval tv;
+       struct upstream_list_watcher *w;
 
        RSPAMD_UPSTREAM_LOCK (ls->lock);
        g_ptr_array_remove_index (ls->alive, up->active_idx);
@@ -431,6 +441,12 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
                event_add (&up->ev, &tv);
        }
 
+       DL_FOREACH (up->ls->watchers, w) {
+               if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
+                       w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
+               }
+       }
+
        RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
@@ -440,6 +456,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
        gdouble error_rate, max_error_rate;
        gdouble sec_last, sec_cur;
        struct upstream_addr_elt *addr_elt;
+       struct upstream_list_watcher *w;
 
        if (up->ctx && up->active_idx != -1) {
                sec_cur = rspamd_get_ticks (FALSE);
@@ -449,6 +466,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
                        /* We have the first error */
                        up->last_fail = sec_cur;
                        up->errors = 1;
+
+                       DL_FOREACH (up->ls->watchers, w) {
+                               if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+                                       w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
+                               }
+                       }
                }
                else {
                        sec_last = up->last_fail;
@@ -456,6 +479,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
                        if (sec_cur >= sec_last) {
                                up->errors ++;
 
+                               DL_FOREACH (up->ls->watchers, w) {
+                                       if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+                                               w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud);
+                                       }
+                               }
+
                                if (sec_cur > sec_last) {
                                        error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
                                        max_error_rate = ((gdouble)up->ls->limits.max_errors) /
@@ -499,6 +528,7 @@ void
 rspamd_upstream_ok (struct upstream *up)
 {
        struct upstream_addr_elt *addr_elt;
+       struct upstream_list_watcher *w;
 
        RSPAMD_UPSTREAM_LOCK (up->lock);
        if (up->errors > 0 && up->active_idx != -1) {
@@ -509,6 +539,12 @@ rspamd_upstream_ok (struct upstream *up)
                        addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
                        addr_elt->errors = 0;
                }
+
+               DL_FOREACH (up->ls->watchers, w) {
+                       if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
+                               w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
+                       }
+               }
        }
 
        RSPAMD_UPSTREAM_UNLOCK (up->lock);
@@ -831,6 +867,7 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
 {
        guint i;
        struct upstream *up;
+       struct upstream_list_watcher *w, *tmp;
 
        if (ups != NULL) {
                g_ptr_array_free (ups->alive, TRUE);
@@ -841,6 +878,10 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
                        REF_RELEASE (up);
                }
 
+               DL_FOREACH_SAFE (ups->watchers, w, tmp) {
+                       g_free (w);
+               }
+
                g_ptr_array_free (ups->ups, TRUE);
                rspamd_mutex_free (ups->lock);
                g_free (ups);
@@ -852,6 +893,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
 {
        struct upstream *up = (struct upstream *)elt;
        struct upstream_list *ups = (struct upstream_list *)ls;
+       struct upstream_list_watcher *w;
 
        /* Here the upstreams list is already locked */
        RSPAMD_UPSTREAM_LOCK (up->lock);
@@ -862,6 +904,13 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
        g_ptr_array_add (ups->alive, up);
        up->active_idx = ups->alive->len - 1;
        RSPAMD_UPSTREAM_UNLOCK (up->lock);
+
+       DL_FOREACH (up->ls->watchers, w) {
+               if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+                       w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
+               }
+       }
+
        /* For revive event */
        REF_RELEASE (up);
 }
@@ -1125,3 +1174,20 @@ rspamd_upstreams_set_limits (struct upstream_list *ups,
                ups->limits.dns_retransmits = dns_retransmits;
        }
 }
+
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+                                                                                 enum rspamd_upstreams_watch_event events,
+                                                                                 rspamd_upstream_watch_func func,
+                                                                                 gpointer ud)
+{
+       struct upstream_list_watcher *nw;
+
+       g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
+
+       nw = g_malloc (sizeof (*nw));
+       nw->func = func;
+       nw->events_mask = events;
+       nw->ud = ud;
+
+       DL_APPEND (ups->watchers, nw);
+}
index 9b5c7794c6c0a8a9341b5a6c1f45321bf2674dd4..56d6fa6c569c859510d735fe4c3b4ad435ea9a12 100644 (file)
@@ -181,6 +181,31 @@ typedef void (*rspamd_upstream_traverse_func) (struct upstream *up, guint idx,
 void rspamd_upstreams_foreach (struct upstream_list *ups,
                rspamd_upstream_traverse_func cb, void *ud);
 
+enum rspamd_upstreams_watch_event {
+       RSPAMD_UPSTREAM_WATCH_SUCCESS = 1u << 0,
+       RSPAMD_UPSTREAM_WATCH_FAILURE = 1u << 1,
+       RSPAMD_UPSTREAM_WATCH_OFFLINE = 1u << 2,
+       RSPAMD_UPSTREAM_WATCH_ONLINE = 1u << 3,
+       RSPAMD_UPSTREAM_WATCH_ALL = (1u << 0) | (1u << 1) | (1u << 2) | (1u << 3),
+};
+
+typedef void (*rspamd_upstream_watch_func) (struct upstream *up,
+                                                                                       enum rspamd_upstreams_watch_event event,
+                                                                                       guint cur_errors,
+                                                                                       void *ud);
+
+/**
+ * Adds new watcher to the upstreams list
+ * @param ups
+ * @param events
+ * @param func
+ * @param ud
+ */
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+                                                                                 enum rspamd_upstreams_watch_event events,
+                                                                                 rspamd_upstream_watch_func func,
+                                                                                 gpointer ud);
+
 /**
  * Returns the current IP address of the upstream
  * @param up