]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Adopt lua redis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 16:01:10 +0000 (17:01 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 20 Oct 2018 18:43:32 +0000 (19:43 +0100)
src/lua/lua_redis.c

index 65c0609f93ccca683cef2226541c9da7173cf78a..8d884fab03fd58b6d6850aad70febb74535ff414 100644 (file)
@@ -94,12 +94,12 @@ struct lua_redis_request_specific_userdata;
 struct lua_redis_userdata {
        redisAsyncContext *ctx;
        struct rspamd_task *task;
+       struct rspamd_symcache_item *item;
        struct rspamd_async_session *s;
        struct event_base *ev_base;
        struct rspamd_config *cfg;
        struct rspamd_redis_pool *pool;
        gchar *server;
-       gchar *reqline;
        struct lua_redis_request_specific_userdata *specific;
        gdouble timeout;
        guint16 port;
@@ -119,7 +119,6 @@ struct lua_redis_request_specific_userdata {
        guint nargs;
        gchar **args;
        gsize *arglens;
-       struct rspamd_async_watcher *w;
        struct lua_redis_userdata *c;
        struct lua_redis_ctx *ctx;
        struct lua_redis_request_specific_userdata *next;
@@ -140,8 +139,9 @@ struct lua_redis_ctx {
 struct lua_redis_result {
        gboolean is_error;
        gint result_ref;
-       struct rspamd_async_watcher *w;
+       struct rspamd_symcache_item *item;
        struct rspamd_async_session *s;
+       struct rspamd_task *task;
        struct lua_redis_request_specific_userdata *sp_ud;
 };
 
@@ -291,7 +291,10 @@ lua_redis_push_error (const gchar *err,
                sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
 
                if (connected && ud->s) {
-                       rspamd_session_watcher_pop (ud->s, sp_ud->w);
+                       if (ud->item) {
+                               rspamd_symcache_item_async_dec_check (ud->task, ud->item);
+                       }
+
                        rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
                }
                else {
@@ -374,7 +377,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
                sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
 
                if (ud->s) {
-                       rspamd_session_watcher_pop (ud->s, sp_ud->w);
+                       if (ud->item) {
+                               rspamd_symcache_item_async_dec_check (ud->task, ud->item);
+                       }
+
                        rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
                }
                else {
@@ -491,7 +497,10 @@ lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
        while (!g_queue_is_empty (ctx->events_cleanup)) {
                struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
 
-               rspamd_session_watcher_pop (result->s, result->w);
+               if (result->item) {
+                       rspamd_symcache_item_async_dec_check (result->task, result->item);
+               }
+
                rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
 
                g_free (result);
@@ -584,7 +593,8 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
 
        result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
        result->s = ud->s;
-       result->w = sp_ud->w;
+       result->item = ud->item;
+       result->task = ud->task;
        result->sp_ud = sp_ud;
 
        g_queue_push_tail (ctx->replies, result);
@@ -911,12 +921,17 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy
                                ctx->events_cleanup = g_queue_new ();
 
                        }
+
                        ud->s = session;
                        ud->cfg = cfg;
                        ud->pool = cfg->redis_pool;
                        ud->ev_base = ev_base;
                        ud->task = task;
 
+                       if (task) {
+                               ud->item = rspamd_symbols_cache_get_cur_item (task);
+                       }
+
                        ret = TRUE;
                }
                else {
@@ -1020,6 +1035,7 @@ lua_redis_make_request (lua_State *L)
                                &sp_ud->nargs);
                lua_pop (L, 1);
                LL_PREPEND (ud->specific, sp_ud);
+
                ret = redisAsyncCommandArgv (ud->ctx,
                                lua_redis_callback,
                                sp_ud,
@@ -1029,12 +1045,13 @@ lua_redis_make_request (lua_State *L)
 
                if (ret == REDIS_OK) {
                        if (ud->s) {
-                               rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
-                               sp_ud->w = rspamd_session_get_watcher (ud->s);
-                               rspamd_session_watcher_push (ud->s);
-                       }
-                       else {
-                               sp_ud->w = NULL;
+                               rspamd_session_add_event (ud->s,
+                                               lua_redis_fin, sp_ud,
+                                               g_quark_from_static_string ("lua redis"));
+
+                               if (ud->item) {
+                                       rspamd_symcache_item_async_inc (ud->task, ud->item);
+                               }
                        }
 
                        REDIS_RETAIN (ctx); /* Cleared by fin event */
@@ -1396,18 +1413,27 @@ lua_redis_add_cmd (lua_State *L)
 
                if (ret == REDIS_OK) {
                        if (ud->s) {
-                               rspamd_session_add_event (ud->s, NULL, lua_redis_fin, sp_ud, g_quark_from_static_string ("lua redis"));
-                               sp_ud->w = rspamd_session_get_watcher (ud->s);
-                               rspamd_session_watcher_push (ud->s);
+                               rspamd_session_add_event (ud->s,
+                                               lua_redis_fin,
+                                               sp_ud,
+                                               g_quark_from_static_string ("lua redis"));
+
+                               if (ud->item) {
+                                       rspamd_symcache_item_async_inc (ud->task, ud->item);
+                               }
                        }
 
                        double_to_tv (sp_ud->c->timeout, &tv);
+
                        if (IS_ASYNC (ctx)) {
-                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
+                                               lua_redis_timeout, sp_ud);
                        }
                        else {
-                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud);
+                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT,
+                                               lua_redis_timeout_sync, sp_ud);
                        }
+
                        event_base_set (ud->ev_base, &sp_ud->timeout);
                        event_add (&sp_ud->timeout, &tv);
                        REDIS_RETAIN (ctx);