From: Vsevolod Stakhov Date: Sat, 30 Apr 2022 18:57:35 +0000 (+0100) Subject: [Project] Implement item finalization X-Git-Tag: 3.3~293^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a1eb112ae959069b4c10767c6f2a7d664786f3e6;p=thirdparty%2Frspamd.git [Project] Implement item finalization --- diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx index bef932488f..af4d364325 100644 --- a/src/libserver/symcache/symcache_c.cxx +++ b/src/libserver/symcache/symcache_c.cxx @@ -543,4 +543,14 @@ rspamd_symcache_process_symbols(struct rspamd_task *task, auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); return cache_runtime->process_symbols(task, *real_cache, stage); +} + +void +rspamd_symcache_finalize_item(struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_item = C_API_SYMCACHE_ITEM(item); + + cache_runtime->finalize_item(task, real_item); } \ No newline at end of file diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index afd98e2868..0dfc23825b 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -20,6 +20,7 @@ #include "libutil/cxx/util.hxx" #include "libserver/task.h" #include "libmime/scan_result.h" +#include "libserver/worker_util.h" #include #include @@ -276,16 +277,14 @@ symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &ca auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item * { /* Lookup in cache */ - if (save_in_cache) { - for (const auto &cache_id: last_id_mappings) { - if (cache_id.first == -1) { - break; - } - if (cache_id.first == id) { - auto *dyn_item = &dynamic_items[cache_id.second]; + for (const auto &cache_id: last_id_mappings) { + if (cache_id.first == -1) { + break; + } + if (cache_id.first == id) { + auto *dyn_item = &dynamic_items[cache_id.second]; - return dyn_item; - } + return dyn_item; } } @@ -345,7 +344,7 @@ auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache break; case RSPAMD_TASK_STAGE_FILTERS: - return process_filters(task, cache,rspamd_session_events_pending(task->s)); + return process_filters(task, cache, rspamd_session_events_pending(task->s)); break; default: @@ -437,7 +436,7 @@ symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int if (!check_item_deps(task, cache, item, dyn_item, false)) { msg_debug_cache_task("blocked execution of %d(%s) unless deps are " - "resolved", item->id, item->symbol.c_str()); + "resolved", item->id, item->symbol.c_str()); return true; } @@ -589,7 +588,7 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache if (!item->deps.empty()) { - for (const auto &dep : item->deps) { + for (const auto &dep: item->deps) { if (!dep.item) { /* Assume invalid deps as done */ msg_debug_cache_task("symbol %d(%s) has invalid dependencies on %d(%s)", @@ -610,26 +609,26 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache ret = false; msg_debug_cache_task("delayed dependency %d(%s) for " - "symbol %d(%s)", + "symbol %d(%s)", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); } - else if (!process_symbol(task, cache,dep.item.get(),dep_dyn_item)) { + else if (!process_symbol(task, cache, dep.item.get(), dep_dyn_item)) { /* Now started, but has events pending */ ret = false; msg_debug_cache_task("started check of %d(%s) symbol " - "as dep for " - "%d(%s)", + "as dep for " + "%d(%s)", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); } else { msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " - "already processed", + "already processed", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); } } else { msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) " - "cannot be started now", + "cannot be started now", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); ret = false; } @@ -637,14 +636,14 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache else { /* Started but not finished */ msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " - "still executing", + "still executing", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); ret = false; } } else { msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is already " - "checked", + "checked", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); } } @@ -656,5 +655,176 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache return inner_functor(0, item, dyn_item, inner_functor); } + +struct rspamd_symcache_delayed_cbdata { + cache_item *item; + struct rspamd_task *task; + symcache_runtime *runtime; + struct rspamd_async_event *event; + struct ev_timer tm; +}; + +static void +rspamd_symcache_delayed_item_fin(gpointer ud) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud; + + cbd->runtime->unset_slow(); + ev_timer_stop(cbd->task->event_loop, &cbd->tm); +} + +static void +rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data; + + cbd->event = NULL; + + /* Timer will be stopped here */ + rspamd_session_remove_event (cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->runtime->process_item_rdeps(cbd->task, cbd->item); + +} + +static void +rspamd_delayed_timer_dtor(gpointer d) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d; + + if (cbd->event) { + /* Event has not been executed */ + rspamd_session_remove_event (cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->event = nullptr; + } +} + +auto +symcache_runtime::finalize_item(struct rspamd_task *task, cache_item *item) -> void +{ + /* Limit to consider a rule as slow (in milliseconds) */ + constexpr const gdouble slow_diff_limit = 300; + /* Sanity checks */ + g_assert (items_inflight > 0); + auto *dyn_item = get_dynamic_item(item->id, false); + + if (dyn_item->async_events > 0) { + /* + * XXX: Race condition + * + * It is possible that some async event is still in flight, but we + * already know its result, however, it is the responsibility of that + * event to decrease async events count and call this function + * one more time + */ + msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d " + "async events pending", + item->symbol.c_str(), item->id, dyn_item->async_events); + + return; + } + + msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id); + dyn_item->finished = true; + items_inflight--; + cur_item = nullptr; + + auto enable_slow_timer = [&]() -> bool { + auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata); + /* Add timer to allow something else to be executed */ + ev_timer *tm = &cbd->tm; + + cbd->event = rspamd_session_add_event (task->s, + rspamd_symcache_delayed_item_fin, cbd, + "symcache"); + + /* + * If no event could be added, then we are already in the destruction + * phase. So the main issue is to deal with has slow here + */ + if (cbd->event) { + ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); + ev_set_priority (tm, EV_MINPRI); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_delayed_timer_dtor, cbd); + + cbd->task = task; + cbd->item = item; + tm->data = cbd; + ev_timer_start(task->event_loop, tm); + } + else { + /* Just reset as no timer is added */ + has_slow = FALSE; + return false; + } + + return true; + }; + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 - + dyn_item->start_msec); + + if (diff > slow_diff_limit) { + + if (!has_slow) { + has_slow = true; + + msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay", + item->symbol.c_str(), item->id, + diff); + + if (enable_slow_timer()) { + /* Allow network execution */ + return; + } + } + else { + msg_info_task ("slow rule: %s(%d): %.2f ms", + item->symbol.c_str(), item->id, + diff); + } + } + + if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) { + rspamd_task_profile_set(task, item->symbol.c_str(), diff); + } + + if (rspamd_worker_is_scanner(task->worker)) { + rspamd_set_counter(item->cd, diff); + } + } + + process_item_rdeps(task, item); +} + +auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void +{ + auto *cache_ptr = reinterpret_cast(task->cfg->cache); + + for (const auto &rdep: item->rdeps) { + if (rdep.item) { + auto *dyn_item = get_dynamic_item(rdep.item->id, true); + if (!dyn_item->started) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + + if (!check_item_deps(task, *cache_ptr, rdep.item.get(), dyn_item, false)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + } + else { + process_symbol(task, *cache_ptr, rdep.item.get(), + dyn_item); + } + } + } + } +} + } diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index e63d236e70..1d77bfd4a2 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -176,6 +176,25 @@ public: * @return */ auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool; + + /** + * Finalize execution of some item in the cache + * @param task + * @param item + */ + auto finalize_item(struct rspamd_task *task, cache_item *item) -> void; + + /** + * Process unblocked reverse dependencies of the specific item + * @param task + * @param item + */ + auto process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void; + + /* XXX: a helper to allow hiding internal implementation of the slow timer structure */ + auto unset_slow() -> void { + has_slow = false; + } };