]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Implement item finalization
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 30 Apr 2022 18:57:35 +0000 (19:57 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 30 Apr 2022 18:57:35 +0000 (19:57 +0100)
src/libserver/symcache/symcache_c.cxx
src/libserver/symcache/symcache_runtime.cxx
src/libserver/symcache/symcache_runtime.hxx

index bef932488fecab44ca79b9221074bea3558e2884..af4d3643255ce82fcd6fef6dcd1bed2245c45afd 100644 (file)
@@ -543,4 +543,14 @@ rspamd_symcache_process_symbols(struct rspamd_task *task,
 
        auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
        return cache_runtime->process_symbols(task, *real_cache, stage);
+}
+
+void
+rspamd_symcache_finalize_item(struct rspamd_task *task,
+                                                         struct rspamd_symcache_item *item)
+{
+       auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
+       auto *real_item = C_API_SYMCACHE_ITEM(item);
+
+       cache_runtime->finalize_item(task, real_item);
 }
\ No newline at end of file
index afd98e28682846a199b101dccfa8049e72679a94..0dfc23825ba2cf6a150c32120a046bb4d0b0923f 100644 (file)
@@ -20,6 +20,7 @@
 #include "libutil/cxx/util.hxx"
 #include "libserver/task.h"
 #include "libmime/scan_result.h"
+#include "libserver/worker_util.h"
 #include <limits>
 #include <cmath>
 
@@ -276,16 +277,14 @@ symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &ca
 auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *
 {
        /* Lookup in cache */
-       if (save_in_cache) {
-               for (const auto &cache_id: last_id_mappings) {
-                       if (cache_id.first == -1) {
-                               break;
-                       }
-                       if (cache_id.first == id) {
-                               auto *dyn_item = &dynamic_items[cache_id.second];
+       for (const auto &cache_id: last_id_mappings) {
+               if (cache_id.first == -1) {
+                       break;
+               }
+               if (cache_id.first == id) {
+                       auto *dyn_item = &dynamic_items[cache_id.second];
 
-                               return dyn_item;
-                       }
+                       return dyn_item;
                }
        }
 
@@ -345,7 +344,7 @@ auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache
                break;
 
        case RSPAMD_TASK_STAGE_FILTERS:
-               return process_filters(task, cache,rspamd_session_events_pending(task->s));
+               return process_filters(task, cache, rspamd_session_events_pending(task->s));
                break;
 
        default:
@@ -437,7 +436,7 @@ symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int
                        if (!check_item_deps(task, cache, item,
                                        dyn_item, false)) {
                                msg_debug_cache_task("blocked execution of %d(%s) unless deps are "
-                                                                         "resolved", item->id, item->symbol.c_str());
+                                                                        "resolved", item->id, item->symbol.c_str());
 
                                return true;
                        }
@@ -589,7 +588,7 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
 
                if (!item->deps.empty()) {
 
-                       for (const auto &dep : item->deps) {
+                       for (const auto &dep: item->deps) {
                                if (!dep.item) {
                                        /* Assume invalid deps as done */
                                        msg_debug_cache_task("symbol %d(%s) has invalid dependencies on %d(%s)",
@@ -610,26 +609,26 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
 
                                                                ret = false;
                                                                msg_debug_cache_task("delayed dependency %d(%s) for "
-                                                                                                         "symbol %d(%s)",
+                                                                                                        "symbol %d(%s)",
                                                                                dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                                        }
-                                                       else if (!process_symbol(task, cache,dep.item.get(),dep_dyn_item)) {
+                                                       else if (!process_symbol(task, cache, dep.item.get(), dep_dyn_item)) {
                                                                /* Now started, but has events pending */
                                                                ret = false;
                                                                msg_debug_cache_task("started check of %d(%s) symbol "
-                                                                                                         "as dep for "
-                                                                                                         "%d(%s)",
+                                                                                                        "as dep for "
+                                                                                                        "%d(%s)",
                                                                                dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                                        }
                                                        else {
                                                                msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is "
-                                                                                                         "already processed",
+                                                                                                        "already processed",
                                                                                dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                                        }
                                                }
                                                else {
                                                        msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) "
-                                                                                                 "cannot be started now",
+                                                                                                "cannot be started now",
                                                                        dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                                        ret = false;
                                                }
@@ -637,14 +636,14 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
                                        else {
                                                /* Started but not finished */
                                                msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is "
-                                                                                         "still executing",
+                                                                                        "still executing",
                                                                dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                                ret = false;
                                        }
                                }
                                else {
                                        msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is already "
-                                                                                 "checked",
+                                                                                "checked",
                                                        dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                                }
                        }
@@ -656,5 +655,176 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
        return inner_functor(0, item, dyn_item, inner_functor);
 }
 
+
+struct rspamd_symcache_delayed_cbdata {
+       cache_item *item;
+       struct rspamd_task *task;
+       symcache_runtime *runtime;
+       struct rspamd_async_event *event;
+       struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_fin(gpointer ud)
+{
+       auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud;
+
+       cbd->runtime->unset_slow();
+       ev_timer_stop(cbd->task->event_loop, &cbd->tm);
+}
+
+static void
+rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what)
+{
+       auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data;
+
+       cbd->event = NULL;
+
+       /* Timer will be stopped here */
+       rspamd_session_remove_event (cbd->task->s,
+                       rspamd_symcache_delayed_item_fin, cbd);
+       cbd->runtime->process_item_rdeps(cbd->task, cbd->item);
+
+}
+
+static void
+rspamd_delayed_timer_dtor(gpointer d)
+{
+       auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d;
+
+       if (cbd->event) {
+               /* Event has not been executed */
+               rspamd_session_remove_event (cbd->task->s,
+                               rspamd_symcache_delayed_item_fin, cbd);
+               cbd->event = nullptr;
+       }
+}
+
+auto
+symcache_runtime::finalize_item(struct rspamd_task *task, cache_item *item) -> void
+{
+       /* Limit to consider a rule as slow (in milliseconds) */
+       constexpr const gdouble slow_diff_limit = 300;
+       /* Sanity checks */
+       g_assert (items_inflight > 0);
+       auto *dyn_item = get_dynamic_item(item->id, false);
+
+       if (dyn_item->async_events > 0) {
+               /*
+                * XXX: Race condition
+                *
+                * It is possible that some async event is still in flight, but we
+                * already know its result, however, it is the responsibility of that
+                * event to decrease async events count and call this function
+                * one more time
+                */
+               msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d "
+                                                        "async events pending",
+                               item->symbol.c_str(), item->id, dyn_item->async_events);
+
+               return;
+       }
+
+       msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
+       dyn_item->finished = true;
+       items_inflight--;
+       cur_item = nullptr;
+
+       auto enable_slow_timer = [&]() -> bool {
+               auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata);
+               /* Add timer to allow something else to be executed */
+               ev_timer *tm = &cbd->tm;
+
+               cbd->event = rspamd_session_add_event (task->s,
+                               rspamd_symcache_delayed_item_fin, cbd,
+                               "symcache");
+
+               /*
+                * If no event could be added, then we are already in the destruction
+                * phase. So the main issue is to deal with has slow here
+                */
+               if (cbd->event) {
+                       ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+                       ev_set_priority (tm, EV_MINPRI);
+                       rspamd_mempool_add_destructor (task->task_pool,
+                                       rspamd_delayed_timer_dtor, cbd);
+
+                       cbd->task = task;
+                       cbd->item = item;
+                       tm->data = cbd;
+                       ev_timer_start(task->event_loop, tm);
+               }
+               else {
+                       /* Just reset as no timer is added */
+                       has_slow = FALSE;
+                       return false;
+               }
+
+               return true;
+       };
+
+       if (profile) {
+               ev_now_update_if_cheap(task->event_loop);
+               auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 -
+                                        dyn_item->start_msec);
+
+               if (diff > slow_diff_limit) {
+
+                       if (!has_slow) {
+                               has_slow = true;
+
+                               msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay",
+                                               item->symbol.c_str(), item->id,
+                                               diff);
+
+                               if (enable_slow_timer()) {
+                                       /* Allow network execution */
+                                       return;
+                               }
+                       }
+                       else {
+                               msg_info_task ("slow rule: %s(%d): %.2f ms",
+                                               item->symbol.c_str(), item->id,
+                                               diff);
+                       }
+               }
+
+               if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
+                       rspamd_task_profile_set(task, item->symbol.c_str(), diff);
+               }
+
+               if (rspamd_worker_is_scanner(task->worker)) {
+                       rspamd_set_counter(item->cd, diff);
+               }
+       }
+
+       process_item_rdeps(task, item);
+}
+
+auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void
+{
+       auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache);
+
+       for (const auto &rdep: item->rdeps) {
+               if (rdep.item) {
+                       auto *dyn_item = get_dynamic_item(rdep.item->id, true);
+                       if (!dyn_item->started) {
+                               msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+                                               rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
+
+                               if (!check_item_deps(task, *cache_ptr, rdep.item.get(), dyn_item, false)) {
+                                       msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+                                                                                 "unless deps are resolved",
+                                                       rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
+                               }
+                               else {
+                                       process_symbol(task, *cache_ptr, rdep.item.get(),
+                                                       dyn_item);
+                               }
+                       }
+               }
+       }
+}
+
 }
 
index e63d236e70ff5205a89543817f43b2ead5d5ee63..1d77bfd4a2a802ed67f67187fedc4180517b3c89 100644 (file)
@@ -176,6 +176,25 @@ public:
         * @return
         */
        auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool;
+
+       /**
+        * Finalize execution of some item in the cache
+        * @param task
+        * @param item
+        */
+       auto finalize_item(struct rspamd_task *task, cache_item *item) -> void;
+
+       /**
+        * Process unblocked reverse dependencies of the specific item
+        * @param task
+        * @param item
+        */
+       auto process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void;
+
+       /* XXX: a helper to allow hiding internal implementation of the slow timer structure */
+       auto unset_slow() -> void {
+               has_slow = false;
+       }
 };