]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Rework symbols processing logic
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 12:29:42 +0000 (13:29 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/plugins/chartable.c

index beb2dc53f1b4e293035e8799a74bfa18e1e39937..90fbf1df898b8b879dc64daf7c06659bf70a7f61 100644 (file)
@@ -103,6 +103,7 @@ struct rspamd_symcache_item {
        guint64 last_count;
 
        /* Per process counter */
+       gdouble start_ticks;
        struct rspamd_counter_data *cd;
        gchar *symbol;
        enum rspamd_symbol_type type;
@@ -165,7 +166,8 @@ struct cache_savepoint {
        guint version;
        struct rspamd_metric_result *rs;
        gdouble lim;
-       GPtrArray *waitq;
+       struct rspamd_symcache_item *cur_item;
+       guint items_inflight;
        struct symbols_cache_order *order;
 };
 
@@ -188,8 +190,7 @@ struct rspamd_cache_refresh_cbdata {
 static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct rspamd_symcache_item *item,
-               struct cache_savepoint *checkpoint,
-               gdouble *total_diff);
+               struct cache_savepoint *checkpoint);
 static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct rspamd_symcache_item *item,
@@ -1321,8 +1322,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
                                                        it->id,
                                                        it->symbol);
                                        rspamd_symbols_cache_check_symbol (task, cache, it,
-                                                       checkpoint,
-                                                       NULL);
+                                                       checkpoint);
                                }
                        }
                }
@@ -1337,16 +1337,12 @@ static gboolean
 rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct rspamd_symcache_item *item,
-               struct cache_savepoint *checkpoint,
-               gdouble *total_diff)
+               struct cache_savepoint *checkpoint)
 {
-       guint pending_before, pending_after;
-       double t1 = 0, t2 = 0;
-       gdouble diff;
+       double t1 = 0;
        struct rspamd_task **ptask;
        lua_State *L;
        gboolean check = TRUE;
-       const gdouble slow_diff_limit = 0.1;
 
        if (item->func) {
 
@@ -1378,12 +1374,6 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                }
 
                if (check) {
-                       pending_before = rspamd_session_events_pending (task->s);
-                       /* Watch for events appeared */
-                       rspamd_session_watch_start (task->s,
-                                       item->id,
-                                       rspamd_symbols_cache_watcher_cb,
-                                       item);
                        msg_debug_task ("execute %s, %d", item->symbol, item->id);
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
                        struct timeval tv;
@@ -1394,43 +1384,13 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
 #else
                        t1 = rspamd_get_ticks (FALSE);
 #endif
-
+                       item->start_ticks = t1;
+                       checkpoint->items_inflight ++;
+                       /* Callback now must finalize itself */
                        item->func (task, item, item->user_data);
-
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
-                       event_base_update_cache_time (task->ev_base);
-                       event_base_gettimeofday_cached (task->ev_base, &tv);
-                       t2 = tv_to_double (&tv);
-#else
-                       t2 = rspamd_get_ticks (FALSE);
-#endif
-
-                       diff = (t2 - t1);
-
-                       if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
-                               rspamd_task_profile_set (task, item->symbol, diff);
-                       }
-
-                       if (total_diff) {
-                               *total_diff += diff;
-                       }
-
-                       if (diff > slow_diff_limit && !(item->type & SYMBOL_TYPE_SQUEEZED)) {
-                               msg_info_task ("slow rule: %s: %.2f ms", item->symbol,
-                                               diff * 1000);
-                       }
-
-                       if (rspamd_worker_is_scanner (task->worker)) {
-                               rspamd_set_counter (item->cd, diff);
-                       }
-
-                       pending_after = rspamd_session_events_pending (task->s);
                        rspamd_session_watch_stop (task->s);
 
-                       if (pending_before == pending_after) {
-                               /* No new events registered */
-                               setbit (checkpoint->processed_bits, item->id * 2 + 1);
-
+                       if (checkpoint->items_inflight == 0) {
                                return TRUE;
                        }
 
@@ -1492,20 +1452,6 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                                                checkpoint,
                                                                recursion + 1,
                                                                check_only)) {
-                                                       gboolean found = FALSE;
-                                                       guint j;
-                                                       struct rspamd_symcache_item *tmp_it;
-
-                                                       PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
-                                                               if (item->id == tmp_it->id) {
-                                                                       found = TRUE;
-                                                                       break;
-                                                               }
-                                                       }
-
-                                                       if (!found) {
-                                                               g_ptr_array_add (checkpoint->waitq, item);
-                                                       }
 
                                                        ret = FALSE;
                                                        msg_debug_task ("delayed dependency %d(%s) for "
@@ -1514,8 +1460,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                                }
                                                else if (!rspamd_symbols_cache_check_symbol (task, cache,
                                                                dep->item,
-                                                               checkpoint,
-                                                               NULL)) {
+                                                               checkpoint)) {
                                                        /* Now started, but has events pending */
                                                        ret = FALSE;
                                                        msg_debug_task ("started check of %d(%s) symbol "
@@ -1566,15 +1511,6 @@ rspamd_symbols_cache_continuation (void *data)
        rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
 }
 
-static void
-rspamd_symbols_cache_tm (gint fd, short what, void *data)
-{
-       struct rspamd_task *task = data;
-
-       rspamd_session_remove_event (task->s, rspamd_symbols_cache_continuation,
-                       data);
-}
-
 static struct cache_savepoint *
 rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
                struct symbols_cache *cache)
@@ -1595,15 +1531,12 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
        /* Bit 0: check started, Bit 1: check finished */
        checkpoint->processed_bits = rspamd_mempool_alloc0 (task->task_pool,
                        NBYTES (cache->used_items) * 2);
-       checkpoint->waitq = g_ptr_array_new ();
        g_assert (cache->items_by_order != NULL);
        checkpoint->version = cache->items_by_order->d->len;
        checkpoint->order = cache->items_by_order;
        REF_RETAIN (checkpoint->order);
        rspamd_mempool_add_destructor (task->task_pool,
                        rspamd_symbols_cache_order_unref, checkpoint->order);
-       rspamd_mempool_add_destructor (task->task_pool,
-                       rspamd_ptr_array_free_hard, checkpoint->waitq);
        checkpoint->pass = RSPAMD_CACHE_PASS_INIT;
        task->checkpoint = checkpoint;
 
@@ -1714,10 +1647,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
        struct rspamd_symcache_item *item = NULL;
        struct cache_savepoint *checkpoint;
        gint i;
-       gdouble total_ticks = 0;
        gboolean all_done;
        gint saved_priority;
-       const gdouble max_ticks = 3e8;
        guint start_events_pending;
 
        g_assert (cache != NULL);
@@ -1775,7 +1706,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_ticks);
+                                               checkpoint);
                        }
                }
 
@@ -1856,34 +1787,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_ticks);
-                       }
-
-                       if (total_ticks > max_ticks) {
-                               /* Maybe we should stop and check pending events? */
-                               if (rspamd_session_events_pending (task->s) > start_events_pending) {
-                                       /* Add some timeout event to avoid too long waiting */
-#if 0
-                                       struct event *ev;
-                                       struct timeval tv;
-
-                                       rspamd_session_add_event (task->s,
-                                                       rspamd_symbols_cache_continuation, task,
-                                                       rspamd_symbols_cache_quark ());
-                                       ev = rspamd_mempool_alloc (task->task_pool, sizeof (*ev));
-                                       event_set (ev, -1, EV_TIMEOUT, rspamd_symbols_cache_tm, task);
-                                       event_base_set (task->ev_base, ev);
-                                       msec_to_tv (50, &tv);
-                                       event_add (ev, &tv);
-                                       rspamd_mempool_add_destructor (task->task_pool,
-                                                       (rspamd_mempool_destruct_t)event_del, ev);
-#endif
-                                       msg_info_task ("trying to check async events after spending "
-                                                       "%.0f ticks processing symbols",
-                                                       total_ticks);
-
-                                       return TRUE;
-                               }
+                                               checkpoint);
                        }
                }
 
@@ -1902,18 +1806,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_ticks);
-                       }
-
-                       if (total_ticks > max_ticks) {
-                               /* Maybe we should stop and check pending events? */
-                               if (rspamd_session_events_pending (task->s) >
-                                               start_events_pending) {
-                                       msg_debug_task ("trying to check async events after spending "
-                                                       "%.0f microseconds processing symbols",
-                                                       total_ticks);
-                                       return TRUE;
-                               }
+                                               checkpoint);
                        }
                }
 
@@ -1958,7 +1851,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_ticks);
+                                               checkpoint);
                        }
                }
                checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
@@ -2016,7 +1909,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                        }
                                }
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_ticks);
+                                               checkpoint);
                        }
                }
                checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
@@ -2694,3 +2587,83 @@ rspamd_symbols_cache_foreach (struct symbols_cache *cache,
                func (item->id, item->symbol, item->type, ud);
        }
 }
+
+struct rspamd_symcache_item *
+rspamd_symbols_cache_get_cur_item (struct rspamd_task *task)
+{
+       struct cache_savepoint *checkpoint = task->checkpoint;
+
+       return checkpoint->cur_item;
+}
+
+/**
+ * Replaces the current item being processed.
+ * Returns the current item being processed (if any)
+ * @param task
+ * @param item
+ * @return
+ */
+struct rspamd_symcache_item *
+rspamd_symbols_cache_set_cur_item (struct rspamd_task *task,
+                                                                  struct rspamd_symcache_item *item)
+{
+       struct cache_savepoint *checkpoint = task->checkpoint;
+       struct rspamd_symcache_item *ex;
+
+       ex = checkpoint->cur_item;
+       checkpoint->cur_item = item;
+
+       return ex;
+}
+
+
+/**
+ * Finalize the current async element potentially calling its deps
+ */
+void
+rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
+                                                                       struct rspamd_symcache_item *item)
+{
+       struct cache_savepoint *checkpoint = task->checkpoint;
+       struct rspamd_symcache_item *rdep;
+       gdouble total_ticks = 0, t2, diff;
+       guint i;
+       struct timeval tv;
+       const gdouble slow_diff_limit = 0.1;
+
+       setbit (checkpoint->processed_bits, item->id + 1);
+       g_assert (checkpoint->items_inflight > 0);
+       checkpoint->items_inflight --;
+
+#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
+       event_base_update_cache_time (task->ev_base);
+       event_base_gettimeofday_cached (task->ev_base, &tv);
+       t2 = tv_to_double (&tv);
+#else
+       t2 = rspamd_get_ticks (FALSE);
+#endif
+
+       diff = (t2 - item->start_ticks);
+
+       if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
+               rspamd_task_profile_set (task, item->symbol, diff);
+       }
+
+       if (!(item->type & SYMBOL_TYPE_SQUEEZED)) {
+               if (diff > slow_diff_limit) {
+                       msg_info_task ("slow rule: %s: %.2f ms", item->symbol,
+                                       diff * 1000);
+               }
+
+               if (rspamd_worker_is_scanner (task->worker)) {
+                       rspamd_set_counter (item->cd, diff);
+               }
+       }
+
+       /* Process all reverse dependencies */
+       PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+               rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
+                               rdep,
+                               checkpoint);
+       }
+}
\ No newline at end of file
index d3cfc2b96a859e7ed9cd89628fe52ea1f1cf7db8..f8764ee8c4a9a21e965ad15dfc424fd86abf01e0 100644 (file)
@@ -308,4 +308,27 @@ void rspamd_symbols_cache_foreach (struct symbols_cache *cache,
                                gint /* flags */, gpointer /* userdata */),
                gpointer ud);
 
+/**
+ * Returns the current item being processed (if any)
+ * @param task
+ * @return
+ */
+struct rspamd_symcache_item *rspamd_symbols_cache_get_cur_item (struct rspamd_task *task);
+
+/**
+ * Replaces the current item being processed.
+ * Returns the current item being processed (if any)
+ * @param task
+ * @param item
+ * @return
+ */
+struct rspamd_symcache_item *rspamd_symbols_cache_set_cur_item (struct rspamd_task *task,
+                                                                                                                               struct rspamd_symcache_item *item);
+
+
+/**
+ * Finalize the current async element potentially calling its deps
+ */
+void rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
+                                                                                struct rspamd_symcache_item *item);
 #endif
index 45b6ff4cfb2d6d7626e8b4edbb6efc96555db44d..208b78d49e7152b0cfb3e577d47fa1a77b9e2ad6 100644 (file)
@@ -667,6 +667,8 @@ chartable_symbol_callback (struct rspamd_task *task,
 
                utext_close (&utxt);
        }
+
+       rspamd_symbols_cache_finalize_item (task, item);
 }
 
 static void
@@ -736,4 +738,6 @@ chartable_url_symbol_callback (struct rspamd_task *task,
                                cur_score, NULL);
 
        }
+
+       rspamd_symbols_cache_finalize_item (task, item);
 }