]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Further cleanup from the watchers
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 13:48:06 +0000 (14:48 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/libserver/symbols_cache.c
src/lua/lua_config.c

index 216f8aabf0abfd0b6f1be0f387f96e105850c6a6..5d30f46dcb0da7411a1028cf0f828273c63a6159 100644 (file)
@@ -1293,51 +1293,6 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task,
        return FALSE;
 }
 
-static void
-rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
-{
-       struct rspamd_task *task = sessiond;
-       struct rspamd_symcache_item *item = ud, *it;
-       struct cache_savepoint *checkpoint;
-       struct symbols_cache *cache;
-       gint i, remain = 0;
-
-       checkpoint = task->checkpoint;
-       cache = task->cfg->cache;
-
-       /* Specify that we are done with this item */
-       setbit (checkpoint->processed_bits, item->id * 2 + 1);
-
-       if (checkpoint->pass > 0) {
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
-               event_base_update_cache_time (task->ev_base);
-#endif
-               for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
-                       it = g_ptr_array_index (checkpoint->waitq, i);
-
-                       if (!isset (checkpoint->processed_bits, it->id * 2)) {
-                               if (!rspamd_symbols_cache_check_deps (task, cache, it,
-                                               checkpoint, 0, TRUE)) {
-                                       remain ++;
-                               }
-                               else {
-                                       msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
-                                                       item->id,
-                                                       item->symbol,
-                                                       it->id,
-                                                       it->symbol);
-                                       rspamd_symbols_cache_check_symbol (task, cache, it,
-                                                       checkpoint);
-                               }
-                       }
-               }
-       }
-
-       msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
-                       item->id, item->symbol,
-                       remain);
-}
-
 static gboolean
 rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                struct symbols_cache *cache,
@@ -1391,10 +1346,11 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
 #endif
                        item->start_ticks = t1;
                        item->async_events = 0;
+                       g_assert (checkpoint->cur_item == NULL);
+                       checkpoint->cur_item = item;
                        checkpoint->items_inflight ++;
                        /* Callback now must finalize itself */
                        item->func (task, item, item->user_data);
-                       rspamd_session_watch_stop (task->s);
 
                        if (checkpoint->items_inflight == 0) {
                                return TRUE;
@@ -1778,17 +1734,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                                        "resolved",
                                                        item->id, item->symbol);
 
-                                       PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
-                                               if (item->id == tmp_it->id) {
-                                                       found = TRUE;
-                                                       break;
-                                               }
-                                       }
-
-                                       if (!found) {
-                                               g_ptr_array_add (checkpoint->waitq, item);
-                                       }
-
                                        continue;
                                }
 
@@ -1801,22 +1746,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                break;
 
        case RSPAMD_CACHE_PASS_WAIT_FILTERS:
-               /* We just go through the blocked symbols and check if they are ready */
-               for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
-                       item = g_ptr_array_index (checkpoint->waitq, i);
-
-                       if (!isset (checkpoint->processed_bits, item->id * 2)) {
-                               if (!rspamd_symbols_cache_check_deps (task, cache, item,
-                                               checkpoint, 0, FALSE)) {
-                                       break;
-                               }
-
-                               rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint);
-                       }
-               }
-
-               if (checkpoint->waitq->len == 0 ||
+               if (checkpoint->items_inflight == 0 ||
                                stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
                        checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
                }
@@ -1879,7 +1809,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                        checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
                }
 
-               if (checkpoint->waitq->len == 0 ||
+               if (checkpoint->items_inflight == 0 ||
                                stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
                        checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
                }
@@ -2644,6 +2574,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
        msg_debug_cache_task ("process finalize for item %s", item->symbol);
        setbit (checkpoint->processed_bits, item->id + 1);
        checkpoint->items_inflight --;
+       checkpoint->cur_item = NULL;
 
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
        event_base_update_cache_time (task->ev_base);
index 1ed13245e8989d87698df0020019262d0da883ee..4042515923e2fa1a07dee9b8c16ff0ed92ec6a73 100644 (file)
@@ -1055,13 +1055,11 @@ struct lua_callback_data {
                gint ref;
        } callback;
        gboolean cb_is_ref;
+
+       /* Dynamic data */
        gint stack_level;
        gint order;
-};
-
-struct lua_watcher_data {
-       struct lua_callback_data *cbd;
-       gint cb_ref;
+       struct rspamd_symcache_item *item;
 };
 
 /*
@@ -1093,120 +1091,12 @@ rspamd_compare_order_func (gconstpointer a, gconstpointer b)
        return cb2->order - cb1->order;
 }
 
-static void
-lua_watcher_callback (gpointer session_data, gpointer ud)
-{
-       struct rspamd_task *task = session_data, **ptask;
-       struct lua_watcher_data *wd = ud;
-       lua_State *L;
-       gint level, nresults, err_idx, ret;
-       GString *tb;
-       struct rspamd_symbol_result *s;
-
-       L = wd->cbd->L;
-       level = lua_gettop (L);
-       lua_pushcfunction (L, &rspamd_lua_traceback);
-       err_idx = lua_gettop (L);
+static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry,
+                                                                                          int ret);
 
-       level ++;
-       lua_rawgeti (L, LUA_REGISTRYINDEX, wd->cb_ref);
-
-       ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
-       rspamd_lua_setclass (L, "rspamd{task}", -1);
-       *ptask = task;
-
-       if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
-               tb = lua_touserdata (L, -1);
-               msg_err_task ("call to (%s) failed (%d): %v",
-                               wd->cbd->symbol, ret, tb);
-
-               if (tb) {
-                       g_string_free (tb, TRUE);
-               }
-       }
-       else {
-               nresults = lua_gettop (L) - level;
-
-               if (nresults >= 1) {
-                       /* Function returned boolean, so maybe we need to insert result? */
-                       gint res = 0;
-                       gint i;
-                       gdouble flag = 1.0;
-                       gint type;
-                       struct lua_watcher_data *nwd;
-
-                       type = lua_type (L, level + 1);
-
-                       if (type == LUA_TBOOLEAN) {
-                               res = lua_toboolean (L, level + 1);
-                       }
-                       else if (type == LUA_TFUNCTION) {
-                               /* Function returned a closure that should be watched for */
-                               nwd = rspamd_mempool_alloc (task->task_pool, sizeof (*nwd));
-                               lua_pushvalue (L, level + 1);
-                               nwd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
-                               nwd->cbd = wd->cbd;
-                               rspamd_session_watcher_push_callback (task->s,
-                                               rspamd_session_get_watcher (task->s),
-                                               lua_watcher_callback, nwd);
-                               /*
-                                * We immediately pop watcher since we have not registered
-                                * any async events from here
-                                */
-                               rspamd_session_watcher_pop (task->s,
-                                               rspamd_session_get_watcher (task->s));
-                       }
-                       else {
-                               res = (gint)lua_tonumber (L, level + 1);
-                       }
-
-                       if (res) {
-                               gint first_opt = 2;
-
-                               if (lua_type (L, level + 2) == LUA_TNUMBER) {
-                                       flag = lua_tonumber (L, level + 2);
-                                       /* Shift opt index */
-                                       first_opt = 3;
-                               }
-                               else {
-                                       flag = res;
-                               }
-
-                               s = rspamd_task_insert_result (task,
-                                               wd->cbd->symbol, flag, NULL);
-
-                               if (s) {
-                                       guint last_pos = lua_gettop (L);
-
-                                       for (i = level + first_opt; i <= last_pos; i++) {
-                                               if (lua_type (L, i) == LUA_TSTRING) {
-                                                       const char *opt = lua_tostring (L, i);
-
-                                                       rspamd_task_add_result_option (task, s, opt);
-                                               }
-                                               else if (lua_type (L, i) == LUA_TTABLE) {
-                                                       lua_pushvalue (L, i);
-
-                                                       for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
-                                                               const char *opt = lua_tostring (L, -1);
-
-                                                               rspamd_task_add_result_option (task, s, opt);
-                                                       }
-
-                                                       lua_pop (L, 1);
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-
-       lua_settop (L, err_idx - 1);
-}
-
-static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);
-
-static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
+static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
+                                                                                         int ret,
+                                                                                         const char *msg);
 
 static void
 lua_metric_symbol_callback (struct rspamd_task *task,
@@ -1217,6 +1107,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,
        struct rspamd_task **ptask;
        struct thread_entry *thread_entry;
 
+       rspamd_symcache_item_async_inc (task, item);
        thread_entry = lua_thread_pool_get_for_task (task);
 
        g_assert(thread_entry->cd == NULL);
@@ -1224,6 +1115,7 @@ lua_metric_symbol_callback (struct rspamd_task *task,
 
        lua_State *thread = thread_entry->lua_state;
        cd->stack_level = lua_gettop (thread);
+       cd->item = item;
 
        if (cd->cb_is_ref) {
                lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1243,11 +1135,15 @@ lua_metric_symbol_callback (struct rspamd_task *task,
 }
 
 static void
-lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
+lua_metric_symbol_callback_error (struct thread_entry *thread_entry,
+                                                                 int ret,
+                                                                 const char *msg)
 {
        struct lua_callback_data *cd = thread_entry->cd;
        struct rspamd_task *task = thread_entry->task;
        msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);
+
+       rspamd_symcache_item_async_dec_check (task, cd->item);
 }
 
 static void
@@ -1270,7 +1166,6 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
                gint i;
                gdouble flag = 1.0;
                gint type;
-               struct lua_watcher_data *wd;
 
                type = lua_type (L, cd->stack_level + 1);
 
@@ -1278,20 +1173,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
                        res = lua_toboolean (L, cd->stack_level + 1);
                }
                else if (type == LUA_TFUNCTION) {
-                       /* Function returned a closure that should be watched for */
-                       wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
-                       lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
-                       wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
-                       wd->cbd = cd;
-                       rspamd_session_watcher_push_callback (task->s,
-                                       rspamd_session_get_watcher (task->s),
-                                       lua_watcher_callback, wd);
-                       /*
-                        * We immediately pop watcher since we have not registered
-                        * any async events from here
-                        */
-                       rspamd_session_watcher_pop (task->s,
-                                       rspamd_session_get_watcher (task->s));
+                       g_assert_not_reached ();
                }
                else {
                        res = lua_tonumber (L, cd->stack_level + 1);
@@ -1342,6 +1224,7 @@ lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
        g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */
 
        cd->stack_level = 0;
+       rspamd_symcache_item_async_dec_check (task, cd->item);
 }
 
 static gint