#include "libutil/cxx/util.hxx"
#include "libserver/task.h"
#include "libmime/scan_result.h"
+#include "libserver/worker_util.h"
#include <limits>
#include <cmath>
auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *
{
/* Lookup in cache */
- if (save_in_cache) {
- for (const auto &cache_id: last_id_mappings) {
- if (cache_id.first == -1) {
- break;
- }
- if (cache_id.first == id) {
- auto *dyn_item = &dynamic_items[cache_id.second];
+ for (const auto &cache_id: last_id_mappings) {
+ if (cache_id.first == -1) {
+ break;
+ }
+ if (cache_id.first == id) {
+ auto *dyn_item = &dynamic_items[cache_id.second];
- return dyn_item;
- }
+ return dyn_item;
}
}
break;
case RSPAMD_TASK_STAGE_FILTERS:
- return process_filters(task, cache,rspamd_session_events_pending(task->s));
+ return process_filters(task, cache, rspamd_session_events_pending(task->s));
break;
default:
if (!check_item_deps(task, cache, item,
dyn_item, false)) {
msg_debug_cache_task("blocked execution of %d(%s) unless deps are "
- "resolved", item->id, item->symbol.c_str());
+ "resolved", item->id, item->symbol.c_str());
return true;
}
if (!item->deps.empty()) {
- for (const auto &dep : item->deps) {
+ for (const auto &dep: item->deps) {
if (!dep.item) {
/* Assume invalid deps as done */
msg_debug_cache_task("symbol %d(%s) has invalid dependencies on %d(%s)",
ret = false;
msg_debug_cache_task("delayed dependency %d(%s) for "
- "symbol %d(%s)",
+ "symbol %d(%s)",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
- else if (!process_symbol(task, cache,dep.item.get(),dep_dyn_item)) {
+ else if (!process_symbol(task, cache, dep.item.get(), dep_dyn_item)) {
/* Now started, but has events pending */
ret = false;
msg_debug_cache_task("started check of %d(%s) symbol "
- "as dep for "
- "%d(%s)",
+ "as dep for "
+ "%d(%s)",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
else {
msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is "
- "already processed",
+ "already processed",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
else {
msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) "
- "cannot be started now",
+ "cannot be started now",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
ret = false;
}
else {
/* Started but not finished */
msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is "
- "still executing",
+ "still executing",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
ret = false;
}
}
else {
msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is already "
- "checked",
+ "checked",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
return inner_functor(0, item, dyn_item, inner_functor);
}
+
+struct rspamd_symcache_delayed_cbdata {
+ cache_item *item;
+ struct rspamd_task *task;
+ symcache_runtime *runtime;
+ struct rspamd_async_event *event;
+ struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_fin(gpointer ud)
+{
+ auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud;
+
+ cbd->runtime->unset_slow();
+ ev_timer_stop(cbd->task->event_loop, &cbd->tm);
+}
+
+static void
+rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what)
+{
+ auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data;
+
+ cbd->event = NULL;
+
+ /* Timer will be stopped here */
+ rspamd_session_remove_event (cbd->task->s,
+ rspamd_symcache_delayed_item_fin, cbd);
+ cbd->runtime->process_item_rdeps(cbd->task, cbd->item);
+
+}
+
+static void
+rspamd_delayed_timer_dtor(gpointer d)
+{
+ auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d;
+
+ if (cbd->event) {
+ /* Event has not been executed */
+ rspamd_session_remove_event (cbd->task->s,
+ rspamd_symcache_delayed_item_fin, cbd);
+ cbd->event = nullptr;
+ }
+}
+
+auto
+symcache_runtime::finalize_item(struct rspamd_task *task, cache_item *item) -> void
+{
+ /* Limit to consider a rule as slow (in milliseconds) */
+ constexpr const gdouble slow_diff_limit = 300;
+ /* Sanity checks */
+ g_assert (items_inflight > 0);
+ auto *dyn_item = get_dynamic_item(item->id, false);
+
+ if (dyn_item->async_events > 0) {
+ /*
+ * XXX: Race condition
+ *
+ * It is possible that some async event is still in flight, but we
+ * already know its result, however, it is the responsibility of that
+ * event to decrease async events count and call this function
+ * one more time
+ */
+ msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d "
+ "async events pending",
+ item->symbol.c_str(), item->id, dyn_item->async_events);
+
+ return;
+ }
+
+ msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
+ dyn_item->finished = true;
+ items_inflight--;
+ cur_item = nullptr;
+
+ auto enable_slow_timer = [&]() -> bool {
+ auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata);
+ /* Add timer to allow something else to be executed */
+ ev_timer *tm = &cbd->tm;
+
+ cbd->event = rspamd_session_add_event (task->s,
+ rspamd_symcache_delayed_item_fin, cbd,
+ "symcache");
+
+ /*
+ * If no event could be added, then we are already in the destruction
+ * phase. So the main issue is to deal with has slow here
+ */
+ if (cbd->event) {
+ ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+ ev_set_priority (tm, EV_MINPRI);
+ rspamd_mempool_add_destructor (task->task_pool,
+ rspamd_delayed_timer_dtor, cbd);
+
+ cbd->task = task;
+ cbd->item = item;
+ tm->data = cbd;
+ ev_timer_start(task->event_loop, tm);
+ }
+ else {
+ /* Just reset as no timer is added */
+ has_slow = FALSE;
+ return false;
+ }
+
+ return true;
+ };
+
+ if (profile) {
+ ev_now_update_if_cheap(task->event_loop);
+ auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 -
+ dyn_item->start_msec);
+
+ if (diff > slow_diff_limit) {
+
+ if (!has_slow) {
+ has_slow = true;
+
+ msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay",
+ item->symbol.c_str(), item->id,
+ diff);
+
+ if (enable_slow_timer()) {
+ /* Allow network execution */
+ return;
+ }
+ }
+ else {
+ msg_info_task ("slow rule: %s(%d): %.2f ms",
+ item->symbol.c_str(), item->id,
+ diff);
+ }
+ }
+
+ if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
+ rspamd_task_profile_set(task, item->symbol.c_str(), diff);
+ }
+
+ if (rspamd_worker_is_scanner(task->worker)) {
+ rspamd_set_counter(item->cd, diff);
+ }
+ }
+
+ process_item_rdeps(task, item);
+}
+
+auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void
+{
+ auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache);
+
+ for (const auto &rdep: item->rdeps) {
+ if (rdep.item) {
+ auto *dyn_item = get_dynamic_item(rdep.item->id, true);
+ if (!dyn_item->started) {
+ msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+ rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
+
+ if (!check_item_deps(task, *cache_ptr, rdep.item.get(), dyn_item, false)) {
+ msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+ "unless deps are resolved",
+ rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
+ }
+ else {
+ process_symbol(task, *cache_ptr, rdep.item.get(),
+ dyn_item);
+ }
+ }
+ }
+ }
+}
+
}