{
struct rspamd_updates_cbdata *cbdata = ud;
struct rspamd_fuzzy_mirror *m;
- guint nupdates = 0, i;
+ guint i;
struct rspamd_fuzzy_storage_ctx *ctx;
const gchar *source;
GList *cur;
if (success) {
rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
- if (nupdates > 0) {
+ if (g_queue_get_length (ctx->updates_pending) > 0) {
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);
}
}
+static gboolean
+rspamd_fuzzy_storage_periodic_callback (void *ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+
+ if (g_queue_get_length (ctx->updates_pending) > 0) {
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
static gboolean
rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
rep.reply.fuzzy_sync.status = 0;
if (ctx->backend && worker->index == 0) {
- rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+ rspamd_fuzzy_storage_periodic_callback, ctx);
}
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
}
if (ctx->backend && worker->index == 0) {
- rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+ rspamd_fuzzy_storage_periodic_callback, ctx);
}
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
}
if (worker->index == 0) {
- rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+ rspamd_fuzzy_storage_periodic_callback, ctx);
}
if (ctx->mirrors && ctx->mirrors->len != 0) {
rspamd_fuzzy_version_cb cb, void *ud,
void *subr_ud);
const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
- void (*expire) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
};
.count = rspamd_fuzzy_backend_count_sqlite,
.version = rspamd_fuzzy_backend_version_sqlite,
.id = rspamd_fuzzy_backend_id_sqlite,
- .expire = rspamd_fuzzy_backend_expire_sqlite,
+ .periodic = rspamd_fuzzy_backend_expire_sqlite,
.close = rspamd_fuzzy_backend_close_sqlite,
}
};
gdouble expire;
gdouble sync;
struct event_base *ev_base;
+ rspamd_fuzzy_periodic_cb periodic_cb;
+ void *periodic_ud;
const struct rspamd_fuzzy_backend_subr *subr;
void *subr_ud;
- struct event expire_event;
+ struct event periodic_event;
};
static GQuark
return NULL;
}
+static inline void
+rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
+{
+ if (bk->periodic_cb) {
+ if (bk->periodic_cb (bk->periodic_ud)) {
+ if (bk->subr->periodic) {
+ bk->subr->periodic (bk, bk->subr_ud);
+ }
+ }
+ }
+ else {
+ if (bk->subr->periodic) {
+ bk->subr->periodic (bk, bk->subr_ud);
+ }
+ }
+}
+
static void
-rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)
+rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud)
{
struct rspamd_fuzzy_backend *bk = ud;
gdouble jittered;
jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
double_to_tv (jittered, &tv);
- event_del (&bk->expire_event);
- bk->subr->expire (bk, bk->subr_ud);
- event_add (&bk->expire_event, &tv);
+ event_del (&bk->periodic_event);
+ rspamd_fuzzy_backend_periodic_sync (bk);
+ event_add (&bk->periodic_event, &tv);
}
void
-rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *bk,
- gdouble timeout)
+rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
+ gdouble timeout,
+ rspamd_fuzzy_periodic_cb cb,
+ void *ud)
{
gdouble jittered;
struct timeval tv;
g_assert (bk != NULL);
- if (bk->subr->expire) {
+ if (bk->subr->periodic) {
if (bk->sync > 0.0) {
- event_del (&bk->expire_event);
+ event_del (&bk->periodic_event);
+ }
+
+ if (cb) {
+ bk->periodic_cb = cb;
+ bk->periodic_ud = ud;
}
- bk->subr->expire (bk, bk->subr_ud);
+ rspamd_fuzzy_backend_periodic_sync (bk);
bk->sync = timeout;
jittered = rspamd_time_jitter (timeout, timeout / 2.0);
double_to_tv (jittered, &tv);
- event_set (&bk->expire_event, -1, EV_TIMEOUT,
- rspamd_fuzzy_backend_expire_cb, bk);
- event_base_set (bk->ev_base, &bk->expire_event);
- event_add (&bk->expire_event, &tv);
+ event_set (&bk->periodic_event, -1, EV_TIMEOUT,
+ rspamd_fuzzy_backend_periodic_cb, bk);
+ event_base_set (bk->ev_base, &bk->periodic_event);
+ event_add (&bk->periodic_event, &tv);
}
}
{
g_assert (bk != NULL);
- bk->subr->close (bk, bk->subr_ud);
-
if (bk->sync > 0.0) {
- bk->subr->expire (bk, bk->subr_ud);
- event_del (&bk->expire_event);
+ rspamd_fuzzy_backend_periodic_sync (bk);
+ event_del (&bk->periodic_event);
}
+ bk->subr->close (bk, bk->subr_ud);
+
g_slice_free1 (sizeof (*bk), bk);
}