From: Vsevolod Stakhov Date: Sat, 20 Oct 2018 12:29:42 +0000 (+0100) Subject: [Project] Rework symbols processing logic X-Git-Tag: 1.8.2~183 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=afa07e499f51eac10dc1ed6a566752904f905f80;p=thirdparty%2Frspamd.git [Project] Rework symbols processing logic --- diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index beb2dc53f1..90fbf1df89 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -103,6 +103,7 @@ struct rspamd_symcache_item { guint64 last_count; /* Per process counter */ + gdouble start_ticks; struct rspamd_counter_data *cd; gchar *symbol; enum rspamd_symbol_type type; @@ -165,7 +166,8 @@ struct cache_savepoint { guint version; struct rspamd_metric_result *rs; gdouble lim; - GPtrArray *waitq; + struct rspamd_symcache_item *cur_item; + guint items_inflight; struct symbols_cache_order *order; }; @@ -188,8 +190,7 @@ struct rspamd_cache_refresh_cbdata { static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct symbols_cache *cache, struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint, - gdouble *total_diff); + struct cache_savepoint *checkpoint); static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task, struct symbols_cache *cache, struct rspamd_symcache_item *item, @@ -1321,8 +1322,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) it->id, it->symbol); rspamd_symbols_cache_check_symbol (task, cache, it, - checkpoint, - NULL); + checkpoint); } } } @@ -1337,16 +1337,12 @@ static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct symbols_cache *cache, struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint, - gdouble *total_diff) + struct cache_savepoint *checkpoint) { - guint pending_before, pending_after; - double t1 = 0, t2 = 0; - gdouble diff; + double t1 = 0; struct rspamd_task **ptask; lua_State *L; gboolean check = TRUE; - const gdouble slow_diff_limit = 0.1; if (item->func) { @@ -1378,12 +1374,6 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, } if (check) { - pending_before = rspamd_session_events_pending (task->s); - /* Watch for events appeared */ - rspamd_session_watch_start (task->s, - item->id, - rspamd_symbols_cache_watcher_cb, - item); msg_debug_task ("execute %s, %d", item->symbol, item->id); #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC struct timeval tv; @@ -1394,43 +1384,13 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, #else t1 = rspamd_get_ticks (FALSE); #endif - + item->start_ticks = t1; + checkpoint->items_inflight ++; + /* Callback now must finalize itself */ item->func (task, item, item->user_data); - -#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC - event_base_update_cache_time (task->ev_base); - event_base_gettimeofday_cached (task->ev_base, &tv); - t2 = tv_to_double (&tv); -#else - t2 = rspamd_get_ticks (FALSE); -#endif - - diff = (t2 - t1); - - if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { - rspamd_task_profile_set (task, item->symbol, diff); - } - - if (total_diff) { - *total_diff += diff; - } - - if (diff > slow_diff_limit && !(item->type & SYMBOL_TYPE_SQUEEZED)) { - msg_info_task ("slow rule: %s: %.2f ms", item->symbol, - diff * 1000); - } - - if (rspamd_worker_is_scanner (task->worker)) { - rspamd_set_counter (item->cd, diff); - } - - pending_after = rspamd_session_events_pending (task->s); rspamd_session_watch_stop (task->s); - if (pending_before == pending_after) { - /* No new events registered */ - setbit (checkpoint->processed_bits, item->id * 2 + 1); - + if (checkpoint->items_inflight == 0) { return TRUE; } @@ -1492,20 +1452,6 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, checkpoint, recursion + 1, check_only)) { - gboolean found = FALSE; - guint j; - struct rspamd_symcache_item *tmp_it; - - PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) { - if (item->id == tmp_it->id) { - found = TRUE; - break; - } - } - - if (!found) { - g_ptr_array_add (checkpoint->waitq, item); - } ret = FALSE; msg_debug_task ("delayed dependency %d(%s) for " @@ -1514,8 +1460,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, } else if (!rspamd_symbols_cache_check_symbol (task, cache, dep->item, - checkpoint, - NULL)) { + checkpoint)) { /* Now started, but has events pending */ ret = FALSE; msg_debug_task ("started check of %d(%s) symbol " @@ -1566,15 +1511,6 @@ rspamd_symbols_cache_continuation (void *data) rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); } -static void -rspamd_symbols_cache_tm (gint fd, short what, void *data) -{ - struct rspamd_task *task = data; - - rspamd_session_remove_event (task->s, rspamd_symbols_cache_continuation, - data); -} - static struct cache_savepoint * rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, struct symbols_cache *cache) @@ -1595,15 +1531,12 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, /* Bit 0: check started, Bit 1: check finished */ checkpoint->processed_bits = rspamd_mempool_alloc0 (task->task_pool, NBYTES (cache->used_items) * 2); - checkpoint->waitq = g_ptr_array_new (); g_assert (cache->items_by_order != NULL); checkpoint->version = cache->items_by_order->d->len; checkpoint->order = cache->items_by_order; REF_RETAIN (checkpoint->order); rspamd_mempool_add_destructor (task->task_pool, rspamd_symbols_cache_order_unref, checkpoint->order); - rspamd_mempool_add_destructor (task->task_pool, - rspamd_ptr_array_free_hard, checkpoint->waitq); checkpoint->pass = RSPAMD_CACHE_PASS_INIT; task->checkpoint = checkpoint; @@ -1714,10 +1647,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, struct rspamd_symcache_item *item = NULL; struct cache_savepoint *checkpoint; gint i; - gdouble total_ticks = 0; gboolean all_done; gint saved_priority; - const gdouble max_ticks = 3e8; guint start_events_pending; g_assert (cache != NULL); @@ -1775,7 +1706,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_ticks); + checkpoint); } } @@ -1856,34 +1787,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_ticks); - } - - if (total_ticks > max_ticks) { - /* Maybe we should stop and check pending events? */ - if (rspamd_session_events_pending (task->s) > start_events_pending) { - /* Add some timeout event to avoid too long waiting */ -#if 0 - struct event *ev; - struct timeval tv; - - rspamd_session_add_event (task->s, - rspamd_symbols_cache_continuation, task, - rspamd_symbols_cache_quark ()); - ev = rspamd_mempool_alloc (task->task_pool, sizeof (*ev)); - event_set (ev, -1, EV_TIMEOUT, rspamd_symbols_cache_tm, task); - event_base_set (task->ev_base, ev); - msec_to_tv (50, &tv); - event_add (ev, &tv); - rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)event_del, ev); -#endif - msg_info_task ("trying to check async events after spending " - "%.0f ticks processing symbols", - total_ticks); - - return TRUE; - } + checkpoint); } } @@ -1902,18 +1806,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_ticks); - } - - if (total_ticks > max_ticks) { - /* Maybe we should stop and check pending events? */ - if (rspamd_session_events_pending (task->s) > - start_events_pending) { - msg_debug_task ("trying to check async events after spending " - "%.0f microseconds processing symbols", - total_ticks); - return TRUE; - } + checkpoint); } } @@ -1958,7 +1851,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_ticks); + checkpoint); } } checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS; @@ -2016,7 +1909,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_ticks); + checkpoint); } } checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT; @@ -2694,3 +2587,83 @@ rspamd_symbols_cache_foreach (struct symbols_cache *cache, func (item->id, item->symbol, item->type, ud); } } + +struct rspamd_symcache_item * +rspamd_symbols_cache_get_cur_item (struct rspamd_task *task) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + + return checkpoint->cur_item; +} + +/** + * Replaces the current item being processed. + * Returns the current item being processed (if any) + * @param task + * @param item + * @return + */ +struct rspamd_symcache_item * +rspamd_symbols_cache_set_cur_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct rspamd_symcache_item *ex; + + ex = checkpoint->cur_item; + checkpoint->cur_item = item; + + return ex; +} + + +/** + * Finalize the current async element potentially calling its deps + */ +void +rspamd_symbols_cache_finalize_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct rspamd_symcache_item *rdep; + gdouble total_ticks = 0, t2, diff; + guint i; + struct timeval tv; + const gdouble slow_diff_limit = 0.1; + + setbit (checkpoint->processed_bits, item->id + 1); + g_assert (checkpoint->items_inflight > 0); + checkpoint->items_inflight --; + +#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC + event_base_update_cache_time (task->ev_base); + event_base_gettimeofday_cached (task->ev_base, &tv); + t2 = tv_to_double (&tv); +#else + t2 = rspamd_get_ticks (FALSE); +#endif + + diff = (t2 - item->start_ticks); + + if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { + rspamd_task_profile_set (task, item->symbol, diff); + } + + if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { + if (diff > slow_diff_limit) { + msg_info_task ("slow rule: %s: %.2f ms", item->symbol, + diff * 1000); + } + + if (rspamd_worker_is_scanner (task->worker)) { + rspamd_set_counter (item->cd, diff); + } + } + + /* Process all reverse dependencies */ + PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { + rspamd_symbols_cache_check_symbol (task, task->cfg->cache, + rdep, + checkpoint); + } +} \ No newline at end of file diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h index d3cfc2b96a..f8764ee8c4 100644 --- a/src/libserver/symbols_cache.h +++ b/src/libserver/symbols_cache.h @@ -308,4 +308,27 @@ void rspamd_symbols_cache_foreach (struct symbols_cache *cache, gint /* flags */, gpointer /* userdata */), gpointer ud); +/** + * Returns the current item being processed (if any) + * @param task + * @return + */ +struct rspamd_symcache_item *rspamd_symbols_cache_get_cur_item (struct rspamd_task *task); + +/** + * Replaces the current item being processed. + * Returns the current item being processed (if any) + * @param task + * @param item + * @return + */ +struct rspamd_symcache_item *rspamd_symbols_cache_set_cur_item (struct rspamd_task *task, + struct rspamd_symcache_item *item); + + +/** + * Finalize the current async element potentially calling its deps + */ +void rspamd_symbols_cache_finalize_item (struct rspamd_task *task, + struct rspamd_symcache_item *item); #endif diff --git a/src/plugins/chartable.c b/src/plugins/chartable.c index 45b6ff4cfb..208b78d49e 100644 --- a/src/plugins/chartable.c +++ b/src/plugins/chartable.c @@ -667,6 +667,8 @@ chartable_symbol_callback (struct rspamd_task *task, utext_close (&utxt); } + + rspamd_symbols_cache_finalize_item (task, item); } static void @@ -736,4 +738,6 @@ chartable_url_symbol_callback (struct rspamd_task *task, cur_score, NULL); } + + rspamd_symbols_cache_finalize_item (task, item); }