guint64 last_count;
/* Per process counter */
+ gdouble start_ticks;
struct rspamd_counter_data *cd;
gchar *symbol;
enum rspamd_symbol_type type;
guint version;
struct rspamd_metric_result *rs;
gdouble lim;
- GPtrArray *waitq;
+ struct rspamd_symcache_item *cur_item;
+ guint items_inflight;
struct symbols_cache_order *order;
};
static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
struct rspamd_symcache_item *item,
- struct cache_savepoint *checkpoint,
- gdouble *total_diff);
+ struct cache_savepoint *checkpoint);
static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
struct symbols_cache *cache,
struct rspamd_symcache_item *item,
it->id,
it->symbol);
rspamd_symbols_cache_check_symbol (task, cache, it,
- checkpoint,
- NULL);
+ checkpoint);
}
}
}
rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
struct rspamd_symcache_item *item,
- struct cache_savepoint *checkpoint,
- gdouble *total_diff)
+ struct cache_savepoint *checkpoint)
{
- guint pending_before, pending_after;
- double t1 = 0, t2 = 0;
- gdouble diff;
+ double t1 = 0;
struct rspamd_task **ptask;
lua_State *L;
gboolean check = TRUE;
- const gdouble slow_diff_limit = 0.1;
if (item->func) {
}
if (check) {
- pending_before = rspamd_session_events_pending (task->s);
- /* Watch for events appeared */
- rspamd_session_watch_start (task->s,
- item->id,
- rspamd_symbols_cache_watcher_cb,
- item);
msg_debug_task ("execute %s, %d", item->symbol, item->id);
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
struct timeval tv;
#else
t1 = rspamd_get_ticks (FALSE);
#endif
-
+ item->start_ticks = t1;
+ checkpoint->items_inflight ++;
+ /* Callback now must finalize itself */
item->func (task, item, item->user_data);
-
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- event_base_update_cache_time (task->ev_base);
- event_base_gettimeofday_cached (task->ev_base, &tv);
- t2 = tv_to_double (&tv);
-#else
- t2 = rspamd_get_ticks (FALSE);
-#endif
-
- diff = (t2 - t1);
-
- if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
- rspamd_task_profile_set (task, item->symbol, diff);
- }
-
- if (total_diff) {
- *total_diff += diff;
- }
-
- if (diff > slow_diff_limit && !(item->type & SYMBOL_TYPE_SQUEEZED)) {
- msg_info_task ("slow rule: %s: %.2f ms", item->symbol,
- diff * 1000);
- }
-
- if (rspamd_worker_is_scanner (task->worker)) {
- rspamd_set_counter (item->cd, diff);
- }
-
- pending_after = rspamd_session_events_pending (task->s);
rspamd_session_watch_stop (task->s);
- if (pending_before == pending_after) {
- /* No new events registered */
- setbit (checkpoint->processed_bits, item->id * 2 + 1);
-
+ if (checkpoint->items_inflight == 0) {
return TRUE;
}
checkpoint,
recursion + 1,
check_only)) {
- gboolean found = FALSE;
- guint j;
- struct rspamd_symcache_item *tmp_it;
-
- PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
- if (item->id == tmp_it->id) {
- found = TRUE;
- break;
- }
- }
-
- if (!found) {
- g_ptr_array_add (checkpoint->waitq, item);
- }
ret = FALSE;
msg_debug_task ("delayed dependency %d(%s) for "
}
else if (!rspamd_symbols_cache_check_symbol (task, cache,
dep->item,
- checkpoint,
- NULL)) {
+ checkpoint)) {
/* Now started, but has events pending */
ret = FALSE;
msg_debug_task ("started check of %d(%s) symbol "
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
}
-static void
-rspamd_symbols_cache_tm (gint fd, short what, void *data)
-{
- struct rspamd_task *task = data;
-
- rspamd_session_remove_event (task->s, rspamd_symbols_cache_continuation,
- data);
-}
-
static struct cache_savepoint *
rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
struct symbols_cache *cache)
/* Bit 0: check started, Bit 1: check finished */
checkpoint->processed_bits = rspamd_mempool_alloc0 (task->task_pool,
NBYTES (cache->used_items) * 2);
- checkpoint->waitq = g_ptr_array_new ();
g_assert (cache->items_by_order != NULL);
checkpoint->version = cache->items_by_order->d->len;
checkpoint->order = cache->items_by_order;
REF_RETAIN (checkpoint->order);
rspamd_mempool_add_destructor (task->task_pool,
rspamd_symbols_cache_order_unref, checkpoint->order);
- rspamd_mempool_add_destructor (task->task_pool,
- rspamd_ptr_array_free_hard, checkpoint->waitq);
checkpoint->pass = RSPAMD_CACHE_PASS_INIT;
task->checkpoint = checkpoint;
struct rspamd_symcache_item *item = NULL;
struct cache_savepoint *checkpoint;
gint i;
- gdouble total_ticks = 0;
gboolean all_done;
gint saved_priority;
- const gdouble max_ticks = 3e8;
guint start_events_pending;
g_assert (cache != NULL);
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_ticks);
+ checkpoint);
}
}
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_ticks);
- }
-
- if (total_ticks > max_ticks) {
- /* Maybe we should stop and check pending events? */
- if (rspamd_session_events_pending (task->s) > start_events_pending) {
- /* Add some timeout event to avoid too long waiting */
-#if 0
- struct event *ev;
- struct timeval tv;
-
- rspamd_session_add_event (task->s,
- rspamd_symbols_cache_continuation, task,
- rspamd_symbols_cache_quark ());
- ev = rspamd_mempool_alloc (task->task_pool, sizeof (*ev));
- event_set (ev, -1, EV_TIMEOUT, rspamd_symbols_cache_tm, task);
- event_base_set (task->ev_base, ev);
- msec_to_tv (50, &tv);
- event_add (ev, &tv);
- rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)event_del, ev);
-#endif
- msg_info_task ("trying to check async events after spending "
- "%.0f ticks processing symbols",
- total_ticks);
-
- return TRUE;
- }
+ checkpoint);
}
}
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_ticks);
- }
-
- if (total_ticks > max_ticks) {
- /* Maybe we should stop and check pending events? */
- if (rspamd_session_events_pending (task->s) >
- start_events_pending) {
- msg_debug_task ("trying to check async events after spending "
- "%.0f microseconds processing symbols",
- total_ticks);
- return TRUE;
- }
+ checkpoint);
}
}
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_ticks);
+ checkpoint);
}
}
checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
}
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_ticks);
+ checkpoint);
}
}
checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
func (item->id, item->symbol, item->type, ud);
}
}
+
+struct rspamd_symcache_item *
+rspamd_symbols_cache_get_cur_item (struct rspamd_task *task)
+{
+ struct cache_savepoint *checkpoint = task->checkpoint;
+
+ return checkpoint->cur_item;
+}
+
+/**
+ * Replaces the current item being processed.
+ * Returns the current item being processed (if any)
+ * @param task
+ * @param item
+ * @return
+ */
+struct rspamd_symcache_item *
+rspamd_symbols_cache_set_cur_item (struct rspamd_task *task,
+ struct rspamd_symcache_item *item)
+{
+ struct cache_savepoint *checkpoint = task->checkpoint;
+ struct rspamd_symcache_item *ex;
+
+ ex = checkpoint->cur_item;
+ checkpoint->cur_item = item;
+
+ return ex;
+}
+
+
+/**
+ * Finalize the current async element potentially calling its deps
+ */
+void
+rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
+ struct rspamd_symcache_item *item)
+{
+ struct cache_savepoint *checkpoint = task->checkpoint;
+ struct rspamd_symcache_item *rdep;
+ gdouble total_ticks = 0, t2, diff;
+ guint i;
+ struct timeval tv;
+ const gdouble slow_diff_limit = 0.1;
+
+ setbit (checkpoint->processed_bits, item->id + 1);
+ g_assert (checkpoint->items_inflight > 0);
+ checkpoint->items_inflight --;
+
+#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
+ event_base_update_cache_time (task->ev_base);
+ event_base_gettimeofday_cached (task->ev_base, &tv);
+ t2 = tv_to_double (&tv);
+#else
+ t2 = rspamd_get_ticks (FALSE);
+#endif
+
+ diff = (t2 - item->start_ticks);
+
+ if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
+ rspamd_task_profile_set (task, item->symbol, diff);
+ }
+
+ if (!(item->type & SYMBOL_TYPE_SQUEEZED)) {
+ if (diff > slow_diff_limit) {
+ msg_info_task ("slow rule: %s: %.2f ms", item->symbol,
+ diff * 1000);
+ }
+
+ if (rspamd_worker_is_scanner (task->worker)) {
+ rspamd_set_counter (item->cd, diff);
+ }
+ }
+
+ /* Process all reverse dependencies */
+ PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+ rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
+ rdep,
+ checkpoint);
+ }
+}
\ No newline at end of file