]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Adopt lua tcp
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 16:23:45 +0000 (17:23 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/lua/lua_tcp.c

index f9c1a477d151626748aa798b4c32d960d938fc2b..c6e96825b61723c00f190fbd8ec199086de4845b 100644 (file)
@@ -336,11 +336,11 @@ struct lua_tcp_cbdata {
        guint port;
        guint flags;
        gchar tag[7];
-       struct rspamd_async_watcher *w;
        struct event ev;
        struct lua_tcp_dtor *dtors;
        ref_entry_t ref;
        struct rspamd_task *task;
+       struct rspamd_symcache_item *item;
        struct thread_entry *thread;
        struct rspamd_config *cfg;
        gboolean eof;
@@ -482,10 +482,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
                 * Object is owned by lua and will be destroyed on __gc()
                 */
 
-               if (cbd->w) {
-                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+               if (cbd->item) {
+                       rspamd_symcache_item_async_dec_check (cbd->task, cbd->item);
+                       cbd->item = NULL;
                }
-               cbd->w = NULL;
 
                if (cbd->async_ev) {
                        rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
@@ -494,10 +494,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
                cbd->async_ev = NULL;
        }
        else {
-               if (cbd->w) {
-                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+               if (cbd->item) {
+                       rspamd_symcache_item_async_dec_check (cbd->task, cbd->item);
+                       cbd->item = NULL;
                }
-               cbd->w = NULL;
 
                if (cbd->async_ev) {
                        rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
@@ -525,7 +525,6 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
        struct lua_tcp_handler *hdl;
        gint cbref, top;
        struct lua_callback_state cbs;
-       struct rspamd_async_watcher *existing_watcher = NULL;
        lua_State *L;
 
        if (cbd->thread) {
@@ -572,21 +571,14 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
                        rspamd_lua_setclass (L, "rspamd{tcp}", -1);
                        TCP_RETAIN (cbd);
 
-                       if (cbd->w) {
-                               /* Replace watcher to deal with nested calls */
-                               existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+                       if (cbd->item) {
+                               rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
                        }
 
                        if (lua_pcall (L, 3, 0, 0) != 0) {
                                msg_info ("callback call failed: %s", lua_tostring (L, -1));
                        }
 
-
-                       if (cbd->w) {
-                               /* Restore existing watcher */
-                               rspamd_session_replace_watcher (cbd->session, existing_watcher);
-                       }
-
                        lua_settop (L, top);
 
                        TCP_RELEASE (cbd);
@@ -617,7 +609,6 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        gint cbref, arg_cnt, top;
        struct lua_callback_state cbs;
        lua_State *L;
-       struct rspamd_async_watcher *existing_watcher = NULL;
 
        if (cbd->thread) {
                lua_tcp_resume_thread (cbd, str, len);
@@ -663,20 +654,14 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 
                TCP_RETAIN (cbd);
 
-               if (cbd->w) {
-                       /* Replace watcher to deal with nested calls */
-                       existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+               if (cbd->item) {
+                       rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
                }
 
                if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
                        msg_info ("callback call failed: %s", lua_tostring (L, -1));
                }
 
-               if (cbd->w) {
-                       /* Restore existing watcher */
-                       rspamd_session_replace_watcher (cbd->session, existing_watcher);
-               }
-
                lua_settop (L, top);
                TCP_RELEASE (cbd);
        }
@@ -720,7 +705,6 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 
        lua_State *L = cbd->thread->lua_state;
        struct lua_tcp_handler *hdl;
-       struct rspamd_async_watcher *existing_watcher = NULL;
 
        hdl = g_queue_peek_head (cbd->handlers);
 
@@ -735,18 +719,12 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        lua_tcp_shift_handler (cbd);
        lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
 
-       if (cbd->w) {
-               /* Replace watcher to deal with nested calls */
-               existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+       if (cbd->item) {
+               rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
        }
 
        lua_thread_resume (cbd->thread, 2);
 
-       if (cbd->w) {
-               /* Restore existing watcher */
-               rspamd_session_replace_watcher (cbd->session, existing_watcher);
-       }
-
        TCP_RELEASE (cbd);
 }
 
@@ -1042,7 +1020,6 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                                if (cbd->connect_cb != -1) {
                                        struct lua_tcp_cbdata **pcbd;
                                        gint top;
-                                       struct rspamd_async_watcher *existing_watcher = NULL;
 
                                        lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
                                        L = cbs.L;
@@ -1054,26 +1031,16 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                                        TCP_RETAIN (cbd);
                                        rspamd_lua_setclass (L, "rspamd{tcp}", -1);
 
-                                       if (cbd->w) {
-                                               /* Replace watcher to deal with nested calls */
-                                               existing_watcher = rspamd_session_replace_watcher (
-                                                               cbd->session, cbd->w);
+                                       if (cbd->item) {
+                                               rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
                                        }
 
                                        if (lua_pcall (L, 1, 0, 0) != 0) {
                                                msg_info ("callback call failed: %s", lua_tostring (L, -1));
                                        }
 
-                                       if (cbd->w) {
-                                               /* Restore existing watcher */
-                                               rspamd_session_replace_watcher (cbd->session,
-                                                               existing_watcher);
-                                       }
-
                                        lua_settop (L, top);
-
                                        TCP_RELEASE (cbd);
-
                                        lua_thread_pool_restore_callback (&cbs);
                                }
                        }
@@ -1194,7 +1161,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
        if (cbd->session) {
                event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;
 
-               cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd,
+               cbd->async_ev = rspamd_session_add_event (cbd->session, fin, cbd,
                                g_quark_from_static_string ("lua tcp"));
 
                if (!cbd->async_ev) {
@@ -1208,12 +1175,8 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
 static void
 lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
 {
-       if (cbd->session) {
-               cbd->w = rspamd_session_get_watcher (cbd->session);
-
-               if (cbd->w) {
-                       rspamd_session_watcher_push (cbd->session);
-               }
+       if (cbd->item) {
+               rspamd_symcache_item_async_inc (cbd->task, cbd->item);
        }
 }
 
@@ -1590,6 +1553,11 @@ lua_tcp_request (lua_State *L)
        }
 
        cbd->task = task;
+
+       if (task) {
+               cbd->item = rspamd_symbols_cache_get_cur_item (task);
+       }
+
        cbd->cfg = cfg;
        h = rspamd_random_uint64_fast ();
        rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);