]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Implement counter for async events in symcache item
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 12:42:37 +0000 (13:42 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/plugins/surbl.c

index 90fbf1df898b8b879dc64daf7c06659bf70a7f61..b5db5c7f9658305e1b45bb590d668961a1d1db4d 100644 (file)
         rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
         G_STRFUNC, \
         __VA_ARGS__)
+#define msg_debug_cache_task(...)  rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
+        G_STRFUNC, \
+        __VA_ARGS__)
 
 INIT_LOG_MODULE(symcache)
 
@@ -104,6 +108,7 @@ struct rspamd_symcache_item {
 
        /* Per process counter */
        gdouble start_ticks;
+       guint async_events;
        struct rspamd_counter_data *cd;
        gchar *symbol;
        enum rspamd_symbol_type type;
@@ -1316,7 +1321,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
                                        remain ++;
                                }
                                else {
-                                       msg_debug_task ("watcher for %d(%s), unblocked item %d(%s)",
+                                       msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
                                                        item->id,
                                                        item->symbol,
                                                        it->id,
@@ -1328,7 +1333,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
                }
        }
 
-       msg_debug_task ("finished watcher for %d(%s), %ud symbols waiting",
+       msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
                        item->id, item->symbol,
                        remain);
 }
@@ -1374,7 +1379,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                }
 
                if (check) {
-                       msg_debug_task ("execute %s, %d", item->symbol, item->id);
+                       msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
                        struct timeval tv;
 
@@ -1385,6 +1390,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                        t1 = rspamd_get_ticks (FALSE);
 #endif
                        item->start_ticks = t1;
+                       item->async_events = 0;
                        checkpoint->items_inflight ++;
                        /* Callback now must finalize itself */
                        item->func (task, item, item->user_data);
@@ -1397,7 +1403,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                        return FALSE;
                }
                else {
-                       msg_debug_task ("skipping check of %s as its start condition is false",
+                       msg_debug_cache_task ("skipping check of %s as its start condition is false",
                                        item->symbol);
                        setbit (checkpoint->processed_bits, item->id * 2 + 1);
 
@@ -1438,7 +1444,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
 
                        if (dep->item == NULL) {
                                /* Assume invalid deps as done */
-                               msg_debug_task ("symbol %d(%s) has invalid dependencies on %d(%s)",
+                               msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)",
                                                item->id, item->symbol, dep->id, dep->sym);
                                continue;
                        }
@@ -1454,7 +1460,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                                                check_only)) {
 
                                                        ret = FALSE;
-                                                       msg_debug_task ("delayed dependency %d(%s) for "
+                                                       msg_debug_cache_task ("delayed dependency %d(%s) for "
                                                                                        "symbol %d(%s)",
                                                                        dep->id, dep->sym, item->id, item->symbol);
                                                }
@@ -1463,19 +1469,19 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                                                checkpoint)) {
                                                        /* Now started, but has events pending */
                                                        ret = FALSE;
-                                                       msg_debug_task ("started check of %d(%s) symbol "
+                                                       msg_debug_cache_task ("started check of %d(%s) symbol "
                                                                                        "as dep for "
                                                                                        "%d(%s)",
                                                                        dep->id, dep->sym, item->id, item->symbol);
                                                }
                                                else {
-                                                       msg_debug_task ("dependency %d(%s) for symbol %d(%s) is "
+                                                       msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is "
                                                                        "already processed",
                                                                        dep->id, dep->sym, item->id, item->symbol);
                                                }
                                        }
                                        else {
-                                               msg_debug_task ("dependency %d(%s) for symbol %d(%s) "
+                                               msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) "
                                                                                "cannot be started now",
                                                                dep->id, dep->sym,
                                                                item->id, item->symbol);
@@ -1484,7 +1490,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                }
                                else {
                                        /* Started but not finished */
-                                       msg_debug_task ("dependency %d(%s) for symbol %d(%s) is "
+                                       msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is "
                                                                        "still executing",
                                                        dep->id, dep->sym,
                                                        item->id, item->symbol);
@@ -1492,7 +1498,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                }
                        }
                        else {
-                               msg_debug_task ("dependency %d(%s) for symbol %d(%s) is already "
+                               msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already "
                                                "checked",
                                                dep->id, dep->sym,
                                                item->id, item->symbol);
@@ -1671,7 +1677,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
        }
 
-       msg_debug_task ("symbols processing stage at pass: %d", checkpoint->pass);
+       msg_debug_cache_task ("symbols processing stage at pass: %d", checkpoint->pass);
        start_events_pending = rspamd_session_events_pending (task->s);
 
        switch (checkpoint->pass) {
@@ -1768,7 +1774,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                        guint j;
                                        struct rspamd_symcache_item *tmp_it;
 
-                                       msg_debug_task ("blocked execution of %d(%s) unless deps are "
+                                       msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
                                                        "resolved",
                                                        item->id, item->symbol);
 
@@ -2367,10 +2373,10 @@ rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task,
                        setbit (checkpoint->processed_bits, item->id * 2);
                        setbit (checkpoint->processed_bits, item->id * 2 + 1);
 
-                       msg_debug_task ("disable execution of %s", symbol);
+                       msg_debug_cache_task ("disable execution of %s", symbol);
                }
                else {
-                       msg_debug_task ("skip squeezed symbol %s", symbol);
+                       msg_debug_cache_task ("skip squeezed symbol %s", symbol);
                }
        }
        else {
@@ -2403,7 +2409,7 @@ rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task,
                clrbit (checkpoint->processed_bits, item->id * 2);
                clrbit (checkpoint->processed_bits, item->id * 2 + 1);
 
-               msg_debug_task ("enable execution of %s (%d)", symbol, id);
+               msg_debug_cache_task ("enable execution of %s (%d)", symbol, id);
        }
        else {
                msg_info_task ("cannot enable %s: not found", symbol);
@@ -2631,8 +2637,12 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
        struct timeval tv;
        const gdouble slow_diff_limit = 0.1;
 
-       setbit (checkpoint->processed_bits, item->id + 1);
+       /* Sanity checks */
        g_assert (checkpoint->items_inflight > 0);
+       g_assert (item->async_events == 0);
+
+       msg_debug_cache_task ("process post"
+       setbit (checkpoint->processed_bits, item->id + 1);
        checkpoint->items_inflight --;
 
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
@@ -2666,4 +2676,24 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
                                rdep,
                                checkpoint);
        }
+}
+
+guint
+rspamd_symcahe_item_async_inc (struct rspamd_task *task,
+               struct rspamd_symcache_item *item)
+{
+       msg_debug_cache_task ("increase async events counter for %s = %d + 1",
+                       item->symbol, item->async_events);
+       return ++item->async_events;
+}
+
+guint
+rspamd_symcahe_item_async_dec (struct rspamd_task *task,
+               struct rspamd_symcache_item *item)
+{
+       msg_debug_cache_task ("decrease async events counter for %s = %d - 1",
+                       item->symbol, item->async_events);
+       g_assert (item->async_events > 0);
+
+       return --item->async_events;
 }
\ No newline at end of file
index f8764ee8c4a9a21e965ad15dfc424fd86abf01e0..adf7d2b26983af48e1c0d51f03ea05e83e41febb 100644 (file)
@@ -331,4 +331,15 @@ struct rspamd_symcache_item *rspamd_symbols_cache_set_cur_item (struct rspamd_ta
  */
 void rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
                                                                                 struct rspamd_symcache_item *item);
+
+/*
+ * Increase number of async events pending for an item
+ */
+guint rspamd_symcahe_item_async_inc (struct rspamd_task *task,
+               struct rspamd_symcache_item *item);
+/*
+ * Decrease number of async events pending for an item, asserts if no events pending
+ */
+guint rspamd_symcahe_item_async_dec (struct rspamd_task *task,
+               struct rspamd_symcache_item *item);
 #endif
index fa4b9216243073b0bf96b8528b691445757a7d50..88c6a0823f2797806022203f1407a3cf967fbe09 100644 (file)
@@ -108,7 +108,7 @@ struct dns_param {
        struct rspamd_task *task;
        gchar *host_resolve;
        struct suffix_item *suffix;
-       struct rspamd_async_watcher *w;
+       struct rspamd_symcache_item *item;
        struct surbl_module_ctx *ctx;
 };
 
@@ -120,7 +120,7 @@ struct redirector_param {
        struct rspamd_http_connection *conn;
        GHashTable *tree;
        struct suffix_item *suffix;
-       struct rspamd_async_watcher *w;
+       struct rspamd_symcache_item *item;
        gint sock;
        guint redirector_requests;
 };
@@ -1323,6 +1323,7 @@ format_surbl_request (rspamd_mempool_t * pool,
 
 static void
 make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
+                                        struct rspamd_symcache_item *item,
                                         struct suffix_item *suffix,
                                         gboolean forced, GHashTable *tree,
                                         struct surbl_ctx *surbl_module_ctx)
@@ -1379,8 +1380,7 @@ make_surbl_requests (struct rspamd_url *url, struct rspamd_task *task,
                        if (make_dns_request_task (task,
                                        surbl_dns_ip_callback,
                                        (void *) param, RDNS_REQUEST_A, surbl_req)) {
-                               param->w = rspamd_session_get_watcher (task->s);
-                               rspamd_session_watcher_push (task->s);
+                               param->item = item;
                        }
                }
        }