]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Rework upstreams library
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 3 Dec 2015 18:51:21 +0000 (18:51 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 3 Dec 2015 18:51:21 +0000 (18:51 +0000)
Now each address has its own errors count, so rspamd will prefer upstream addrs with no errors to addrs with errors. This might help to resolve issues on systems where ipv6 does not work.

src/libutil/upstream.c
test/rspamd_upstream_test.c

index 9eecb85cf8041257c8f421bbabe613e5f3d1ba53..9044d0633e1a9334ffd8ddc792ac870dfe55704e 100644 (file)
@@ -36,6 +36,11 @@ struct upstream_inet_addr_entry {
        struct upstream_inet_addr_entry *next;
 };
 
+struct upstream_addr_elt {
+       rspamd_inet_addr_t *addr;
+       guint errors;
+};
+
 struct upstream {
        guint weight;
        guint cur_weight;
@@ -51,7 +56,7 @@ struct upstream {
        struct upstream_ctx *ctx;
 
        struct {
-               GPtrArray *addr;
+               GPtrArray *addr; /* struct upstream_addr_elt */
                guint cur;
        } addrs;
 
@@ -192,12 +197,12 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
 static gint
 rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
 {
-       const rspamd_inet_addr_t **ip1 = (const rspamd_inet_addr_t **)a,
-                       **ip2 = (const rspamd_inet_addr_t **)b;
+       const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
+                       **ip2 = (const struct upstream_addr_elt **)b;
        gint w1, w2;
 
-       w1 = rspamd_upstream_af_to_weight (*ip1);
-       w2 = rspamd_upstream_af_to_weight (*ip2);
+       w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
+       w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
 
        return w2 - w1;
 }
@@ -211,6 +216,15 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
        rspamd_mutex_unlock (ls->lock);
 }
 
+static void
+rspamd_upstream_addr_elt_dtor (gpointer a)
+{
+       struct upstream_addr_elt *elt = a;
+
+       rspamd_inet_address_destroy (elt->addr);
+       g_slice_free1 (sizeof (*elt), elt);
+}
+
 static void
 rspamd_upstream_update_addrs (struct upstream *up)
 {
@@ -218,6 +232,7 @@ rspamd_upstream_update_addrs (struct upstream *up)
        guint addr_cnt;
        struct upstream_inet_addr_entry *cur, *tmp;
        GPtrArray *new_addrs;
+       struct upstream_addr_elt *addr_elt;
 
        /*
         * We need first of all get the saved port, since DNS gives us no
@@ -226,8 +241,8 @@ rspamd_upstream_update_addrs (struct upstream *up)
        rspamd_mutex_lock (up->lock);
 
        if (up->addrs.addr->len > 0 && up->new_addrs) {
-               port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr,
-                               0));
+               addr_elt = g_ptr_array_index (up->addrs.addr, 0);
+               port = rspamd_inet_address_get_port (addr_elt->addr);
 
                /* Free old addresses */
                g_ptr_array_free (up->addrs.addr, TRUE);
@@ -237,13 +252,15 @@ rspamd_upstream_update_addrs (struct upstream *up)
                LL_FOREACH (up->new_addrs, cur) {
                        addr_cnt++;
                }
-               new_addrs = g_ptr_array_new_full (addr_cnt,
-                               (GDestroyNotify) rspamd_inet_address_destroy);
+               new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
 
                /* Copy addrs back */
                LL_FOREACH (up->new_addrs, cur) {
                        rspamd_inet_address_set_port (cur->addr, port);
-                       g_ptr_array_add (new_addrs, cur->addr);
+                       addr_elt = g_slice_alloc (sizeof (*addr_elt));
+                       addr_elt->addr = cur->addr;
+                       addr_elt->errors = 0;
+                       g_ptr_array_add (new_addrs, addr_elt);
                }
 
                up->addrs.cur = 0;
@@ -363,6 +380,7 @@ rspamd_upstream_fail (struct upstream *up)
        struct timeval tv;
        gdouble error_rate, max_error_rate;
        gint msec_last, msec_cur;
+       struct upstream_addr_elt *addr_elt;
 
        gettimeofday (&tv, NULL);
 
@@ -392,17 +410,31 @@ rspamd_upstream_fail (struct upstream *up)
                        }
                }
        }
+
+       /* Also increase count of errors for this specific address */
+       if (up->addrs.addr) {
+               addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+               addr_elt->errors ++;
+       }
+
        rspamd_mutex_unlock (up->lock);
 }
 
 void
 rspamd_upstream_ok (struct upstream *up)
 {
+       struct upstream_addr_elt *addr_elt;
+
        rspamd_mutex_lock (up->lock);
        if (up->errors > 0 && up->active_idx != -1) {
                /* We touch upstream if and only if it is active */
                up->errors = 0;
                rspamd_upstream_set_active (up->ls, up);
+
+               if (up->addrs.addr) {
+                       addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+                       addr_elt->errors = 0;
+               }
        }
 
        rspamd_mutex_unlock (up->lock);
@@ -470,30 +502,18 @@ rspamd_upstream_dtor (struct upstream *up)
 rspamd_inet_addr_t*
 rspamd_upstream_addr (struct upstream *up)
 {
-       gint idx, next_idx, w1, w2;
-       /*
-        * We know that addresses are sorted in the way that ipv4 addresses come
-        * first. Therefore, we select only ipv4 addresses if they exist, since
-        * many systems now has poorly supported ipv6
-        */
-       idx = up->addrs.cur;
-       next_idx = (idx + 1) % up->addrs.addr->len;
-       w1 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, idx));
-       w2 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr,
-                       next_idx));
-
-       /*
-        * We don't care about the exact priorities, but we prefer ipv4/unix
-        * addresses before any ipv6 addresses
-        */
-       if (!w1 || w2) {
+       guint idx, next_idx;
+       struct upstream_addr_elt *e1, *e2;
+
+       do {
+               idx = up->addrs.cur;
+               next_idx = (idx + 1) % up->addrs.addr->len;
+               e1 = g_ptr_array_index (up->addrs.addr, idx);
+               e2 = g_ptr_array_index (up->addrs.addr, next_idx);
                up->addrs.cur = next_idx;
-       }
-       else {
-               up->addrs.cur = 0;
-       }
+       } while (e2->errors > e1->errors);
 
-       return g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+       return e2->addr;
 }
 
 const gchar*
@@ -507,15 +527,26 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
                const gchar *str, guint16 def_port, void *data)
 {
        struct upstream *up;
+       GPtrArray *addrs;
+       guint i;
+       rspamd_inet_addr_t *addr;
 
        up = g_slice_alloc0 (sizeof (*up));
 
-       if (!rspamd_parse_host_port_priority (str, &up->addrs.addr,
+       if (!rspamd_parse_host_port_priority (str, &addrs,
                        &up->weight,
                        &up->name, def_port, NULL)) {
                g_slice_free1 (sizeof (*up), up);
                return FALSE;
        }
+       else {
+               for (i = 0; i < addrs->len; i ++) {
+                       addr = g_ptr_array_index (addrs, i);
+                       rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr));
+               }
+
+               g_ptr_array_free (addrs, TRUE);
+       }
 
        g_ptr_array_add (ups->ups, up);
        up->ud = data;
@@ -537,10 +568,17 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
 gboolean
 rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr)
 {
+       struct upstream_addr_elt *elt;
        /*
         * XXX: slow and inefficient
         */
-       g_ptr_array_add (up->addrs.addr, addr);
+       if (up->addrs.addr == NULL) {
+               up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor);
+       }
+
+       elt = g_slice_alloc0 (sizeof (*elt));
+       elt->addr = addr;
+       g_ptr_array_add (up->addrs.addr, elt);
        g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
 
        return TRUE;
index a446cc05d2015984d3a1b37b062a722da1b4b648..61dcd02e97721ed72fa7510259effdb27670c21d 100644 (file)
@@ -84,6 +84,45 @@ rspamd_upstream_test_func (void)
        resolver = dns_resolver_init (NULL, ev_base, cfg);
        rspamd_upstreams_library_config (cfg, cfg->ups_ctx, ev_base, resolver->r);
 
+       /*
+        * Test v4/v6 priorities
+        */
+       nls = rspamd_upstreams_create (cfg->ups_ctx);
+       g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
+       up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
+       rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
+       g_assert (rspamd_upstream_add_addr (up, paddr));
+       rspamd_parse_inet_address (&paddr, "::1", 0);
+       g_assert (rspamd_upstream_add_addr (up, paddr));
+       /* Rewind to start */
+       addr = rspamd_upstream_addr (up);
+       addr = rspamd_upstream_addr (up);
+       /* cur should be zero here */
+       addr = rspamd_upstream_addr (up);
+       next_addr = rspamd_upstream_addr (up);
+       g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+       g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+       next_addr = rspamd_upstream_addr (up);
+       g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+       next_addr = rspamd_upstream_addr (up);
+       g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+       next_addr = rspamd_upstream_addr (up);
+       g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+       next_addr = rspamd_upstream_addr (up);
+       g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+       /* Test errors with IPv6 */
+       rspamd_upstream_fail (up);
+       /* Now we should have merely IPv4 addresses in rotation */
+       addr = rspamd_upstream_addr (up);
+       for (i = 0; i < 256; i++) {
+               next_addr = rspamd_upstream_addr (up);
+               g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+               g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+               g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
+               addr = next_addr;
+       }
+       rspamd_upstreams_destroy (nls);
+
        ls = rspamd_upstreams_create (cfg->ups_ctx);
        g_assert (rspamd_upstreams_parse_line (ls, test_upstream_list, 443, NULL));
        g_assert (rspamd_upstreams_count (ls) == 3);
@@ -128,25 +167,6 @@ rspamd_upstream_test_func (void)
 
        rspamd_upstreams_destroy (nls);
 
-       /*
-        * Test v4/v6 priorities
-        */
-       nls = rspamd_upstreams_create (cfg->ups_ctx);
-       g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
-       up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
-       rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
-       g_assert (rspamd_upstream_add_addr (up, paddr));
-       rspamd_parse_inet_address (&paddr, "::1", 0);
-       g_assert (rspamd_upstream_add_addr (up, paddr));
-       addr = rspamd_upstream_addr (up);
-       for (i = 0; i < 256; i ++) {
-               next_addr = rspamd_upstream_addr (up);
-               g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
-               g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
-               g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
-               addr = next_addr;
-       }
-       rspamd_upstreams_destroy (nls);
 
        /* Upstream fail test */
        evtimer_set (&ev, rspamd_upstream_timeout_handler, resolver);