]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Fix periodic updates
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 2 Sep 2016 14:30:22 +0000 (15:30 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 2 Sep 2016 14:30:22 +0000 (15:30 +0100)
src/fuzzy_storage.c
src/libserver/fuzzy_backend.c
src/libserver/fuzzy_backend.h

index d663a89e4fd68757a7caff39df1ebebf76b2317c..56f62ba543e4cd959d6f0655944b512d267a6f89 100644 (file)
@@ -451,7 +451,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
 {
        struct rspamd_updates_cbdata *cbdata = ud;
        struct rspamd_fuzzy_mirror *m;
-       guint nupdates = 0, i;
+       guint i;
        struct rspamd_fuzzy_storage_ctx *ctx;
        const gchar *source;
        GList *cur;
@@ -463,7 +463,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
        if (success) {
                rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
-               if (nupdates > 0) {
+               if (g_queue_get_length (ctx->updates_pending) > 0) {
                        for (i = 0; i < ctx->mirrors->len; i ++) {
                                m = g_ptr_array_index (ctx->mirrors, i);
 
@@ -1458,6 +1458,20 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
        }
 }
 
+static gboolean
+rspamd_fuzzy_storage_periodic_callback (void *ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+
+       if (g_queue_get_length (ctx->updates_pending) > 0) {
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
 static gboolean
 rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
                struct rspamd_worker *worker, gint fd,
@@ -1471,7 +1485,9 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
        rep.reply.fuzzy_sync.status = 0;
 
        if (ctx->backend && worker->index == 0) {
-               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                               rspamd_fuzzy_storage_periodic_callback, ctx);
        }
 
        if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
@@ -1515,7 +1531,8 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
        }
 
        if (ctx->backend && worker->index == 0) {
-               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                               rspamd_fuzzy_storage_periodic_callback, ctx);
        }
 
        if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
@@ -2255,7 +2272,8 @@ start_fuzzy (struct rspamd_worker *worker)
        }
 
        if (worker->index == 0) {
-               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                               rspamd_fuzzy_storage_periodic_callback, ctx);
        }
 
        if (ctx->mirrors && ctx->mirrors->len != 0) {
index 44493e17b8d22288faebc74a449429d19b5b7c14..98fd5a5dc645558e1feeef25866996e8ac3e1eae 100644 (file)
@@ -68,7 +68,7 @@ struct rspamd_fuzzy_backend_subr {
                        rspamd_fuzzy_version_cb cb, void *ud,
                        void *subr_ud);
        const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
-       void (*expire) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+       void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
        void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
 };
 
@@ -80,7 +80,7 @@ static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
                .count = rspamd_fuzzy_backend_count_sqlite,
                .version = rspamd_fuzzy_backend_version_sqlite,
                .id = rspamd_fuzzy_backend_id_sqlite,
-               .expire = rspamd_fuzzy_backend_expire_sqlite,
+               .periodic = rspamd_fuzzy_backend_expire_sqlite,
                .close = rspamd_fuzzy_backend_close_sqlite,
        }
 };
@@ -90,9 +90,11 @@ struct rspamd_fuzzy_backend {
        gdouble expire;
        gdouble sync;
        struct event_base *ev_base;
+       rspamd_fuzzy_periodic_cb periodic_cb;
+       void *periodic_ud;
        const struct rspamd_fuzzy_backend_subr *subr;
        void *subr_ud;
-       struct event expire_event;
+       struct event periodic_event;
 };
 
 static GQuark
@@ -349,8 +351,25 @@ rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk)
        return NULL;
 }
 
+static inline void
+rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
+{
+       if (bk->periodic_cb) {
+               if (bk->periodic_cb (bk->periodic_ud)) {
+                       if (bk->subr->periodic) {
+                               bk->subr->periodic (bk, bk->subr_ud);
+                       }
+               }
+       }
+       else {
+               if (bk->subr->periodic) {
+                       bk->subr->periodic (bk, bk->subr_ud);
+               }
+       }
+}
+
 static void
-rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)
+rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud)
 {
        struct rspamd_fuzzy_backend *bk = ud;
        gdouble jittered;
@@ -358,33 +377,40 @@ rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)
 
        jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
        double_to_tv (jittered, &tv);
-       event_del (&bk->expire_event);
-       bk->subr->expire (bk, bk->subr_ud);
-       event_add (&bk->expire_event, &tv);
+       event_del (&bk->periodic_event);
+       rspamd_fuzzy_backend_periodic_sync (bk);
+       event_add (&bk->periodic_event, &tv);
 }
 
 void
-rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *bk,
-               gdouble timeout)
+rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
+               gdouble timeout,
+               rspamd_fuzzy_periodic_cb cb,
+               void *ud)
 {
        gdouble jittered;
        struct timeval tv;
 
        g_assert (bk != NULL);
 
-       if (bk->subr->expire) {
+       if (bk->subr->periodic) {
                if (bk->sync > 0.0) {
-                       event_del (&bk->expire_event);
+                       event_del (&bk->periodic_event);
+               }
+
+               if (cb) {
+                       bk->periodic_cb = cb;
+                       bk->periodic_ud = ud;
                }
 
-               bk->subr->expire (bk, bk->subr_ud);
+               rspamd_fuzzy_backend_periodic_sync (bk);
                bk->sync = timeout;
                jittered = rspamd_time_jitter (timeout, timeout / 2.0);
                double_to_tv (jittered, &tv);
-               event_set (&bk->expire_event, -1, EV_TIMEOUT,
-                               rspamd_fuzzy_backend_expire_cb, bk);
-               event_base_set (bk->ev_base, &bk->expire_event);
-               event_add (&bk->expire_event, &tv);
+               event_set (&bk->periodic_event, -1, EV_TIMEOUT,
+                               rspamd_fuzzy_backend_periodic_cb, bk);
+               event_base_set (bk->ev_base, &bk->periodic_event);
+               event_add (&bk->periodic_event, &tv);
        }
 }
 
@@ -393,12 +419,12 @@ rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
 {
        g_assert (bk != NULL);
 
-       bk->subr->close (bk, bk->subr_ud);
-
        if (bk->sync > 0.0) {
-               bk->subr->expire (bk, bk->subr_ud);
-               event_del (&bk->expire_event);
+               rspamd_fuzzy_backend_periodic_sync (bk);
+               event_del (&bk->periodic_event);
        }
 
+       bk->subr->close (bk, bk->subr_ud);
+
        g_slice_free1 (sizeof (*bk), bk);
 }
index adb7e507555b8a6ce18ac6ce36f71273fc396994..a9385c2f6fcc6ad6586de87adba2cb0f6f7c737a 100644 (file)
@@ -29,6 +29,7 @@ typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud)
 typedef void (*rspamd_fuzzy_update_cb) (gboolean success, void *ud);
 typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud);
 typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud);
+typedef gboolean (*rspamd_fuzzy_periodic_cb) (void *ud);
 
 /**
  * Open fuzzy backend
@@ -92,8 +93,10 @@ const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
  * Starts expire process for the backend
  * @param backend
  */
-void rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *backend,
-               gdouble timeout);
+void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
+               gdouble timeout,
+               rspamd_fuzzy_periodic_cb cb,
+               void *ud);
 
 /**
  * Closes backend