]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Add timer update before timer setting
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Jan 2025 13:39:04 +0000 (13:39 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Jan 2025 13:39:04 +0000 (13:39 +0000)
src/lua/lua_redis.c

index d20c496ed98fe98bc996f3882fb15a139429d274..491007df34fe27acf6ac26cbe3cbb59b1cf4456f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Vsevolod Stakhov
+ * Copyright 2025 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -130,7 +130,7 @@ struct lua_redis_request_specific_userdata {
        unsigned int nargs;
        char **args;
        gsize *arglens;
-       struct lua_redis_userdata *c;
+       struct lua_redis_userdata *common_ud;
        struct lua_redis_ctx *ctx;
        struct lua_redis_request_specific_userdata *next;
        ev_timer timeout_ev;
@@ -262,7 +262,7 @@ lua_redis_fin(void *arg)
        struct lua_redis_ctx *ctx;
 
        ctx = sp_ud->ctx;
-       ud = sp_ud->c;
+       ud = sp_ud->common_ud;
 
        if (ev_can_stop(&sp_ud->timeout_ev)) {
                ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
@@ -290,7 +290,7 @@ lua_redis_push_error(const char *err,
                                         gboolean connected,
                                         ...)
 {
-       struct lua_redis_userdata *ud = sp_ud->c;
+       struct lua_redis_userdata *ud = sp_ud->common_ud;
        struct lua_callback_state cbs;
        lua_State *L;
 
@@ -390,7 +390,7 @@ static void
 lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx,
                                        struct lua_redis_request_specific_userdata *sp_ud)
 {
-       struct lua_redis_userdata *ud = sp_ud->c;
+       struct lua_redis_userdata *ud = sp_ud->common_ud;
        struct lua_callback_state cbs;
        lua_State *L;
 
@@ -467,14 +467,14 @@ lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv)
        redisAsyncContext *ac;
 
        ctx = sp_ud->ctx;
-       ud = sp_ud->c;
+       ud = sp_ud->common_ud;
 
        if (ud->terminated || !rspamd_lua_is_initialised()) {
                /* We are already at the termination stage, just go out */
                return;
        }
 
-       msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx,
+       msg_debug_lua_redis("got async reply from redis %p for query %p", sp_ud->common_ud->ctx,
                                                sp_ud);
 
        REDIS_RETAIN(ctx);
@@ -601,7 +601,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
        int results;
 
        ctx = sp_ud->ctx;
-       ud = sp_ud->c;
+       ud = sp_ud->common_ud;
        lua_State *L = ctx->async.cfg->lua_state;
 
        sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -620,7 +620,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
        }
 
        if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
-               msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud);
+               msg_debug_lua_redis("got sync reply from redis: %p for query %p", ac, sp_ud);
 
                struct lua_redis_result *result = g_malloc0(sizeof *result);
 
@@ -653,17 +653,17 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
                /* if error happened, we should terminate the connection,
                   and release it */
 
-               if (result->is_error && sp_ud->c->ctx) {
-                       ac = sp_ud->c->ctx;
+               if (result->is_error && sp_ud->common_ud->ctx) {
+                       ac = sp_ud->common_ud->ctx;
                        /* Set to NULL to avoid double free in dtor */
-                       sp_ud->c->ctx = NULL;
+                       sp_ud->common_ud->ctx = NULL;
                        ctx->flags |= LUA_REDIS_TERMINATED;
 
                        /*
                         * This will call all callbacks pending so the entire context
                         * will be destructed
                         */
-                       rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+                       rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
                                                                                                 RSPAMD_REDIS_RELEASE_FATAL);
                }
 
@@ -679,6 +679,8 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
        ctx->cmds_pending--;
 
        if (ctx->cmds_pending == 0) {
+               msg_debug_lua_redis("no more commands left for: %p for query %p", ac, sp_ud);
+
                if (ctx->thread) {
                        if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
                                /* somebody yielded and waits for results */
@@ -717,16 +719,16 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents)
                return;
        }
 
-       ud = sp_ud->c;
+       ud = sp_ud->common_ud;
        ctx = sp_ud->ctx;
        msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
-                                               sp_ud->c->ctx);
+                                               sp_ud->common_ud->ctx);
 
-       if (sp_ud->c->ctx) {
-               ac = sp_ud->c->ctx;
+       if (sp_ud->common_ud->ctx) {
+               ac = sp_ud->common_ud->ctx;
 
                /* Set to NULL to avoid double free in dtor */
-               sp_ud->c->ctx = NULL;
+               sp_ud->common_ud->ctx = NULL;
                ac->err = REDIS_ERR_IO;
                errno = ETIMEDOUT;
                ctx->flags |= LUA_REDIS_TERMINATED;
@@ -735,7 +737,7 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents)
                 * This will call all callbacks pending so the entire context
                 * will be destructed
                 */
-               rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+               rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
                                                                                         RSPAMD_REDIS_RELEASE_FATAL);
        }
 }
@@ -754,24 +756,24 @@ lua_redis_timeout(EV_P_ ev_timer *w, int revents)
        }
 
        ctx = sp_ud->ctx;
-       ud = sp_ud->c;
+       ud = sp_ud->common_ud;
 
        REDIS_RETAIN(ctx);
        msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud,
-                                               sp_ud->c->ctx);
+                                               sp_ud->common_ud->ctx);
        lua_redis_push_error("timeout while connecting the server (%.2f sec)", ctx, sp_ud, TRUE, ud->timeout);
 
-       if (sp_ud->c->ctx) {
-               ac = sp_ud->c->ctx;
+       if (sp_ud->common_ud->ctx) {
+               ac = sp_ud->common_ud->ctx;
                /* Set to NULL to avoid double free in dtor */
-               sp_ud->c->ctx = NULL;
+               sp_ud->common_ud->ctx = NULL;
                ac->err = REDIS_ERR_IO;
                errno = ETIMEDOUT;
                /*
                 * This will call all callbacks pending so the entire context
                 * will be destructed
                 */
-               rspamd_redis_pool_release_connection(sp_ud->c->pool, ac,
+               rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac,
                                                                                         RSPAMD_REDIS_RELEASE_FATAL);
        }
 
@@ -1095,8 +1097,8 @@ rspamd_lua_redis_prepare_connection(lua_State *L, int *pcbref, gboolean is_async
                        return NULL;
                }
 
-               msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p",
-                                                       host, ctx, ud);
+               msg_debug_lua_redis("opened redis connection host=%s; lua_ctx=%p; redis_ctx=%p; ud=%p",
+                                                       host, ctx, ud->ctx, ud);
 
                return ctx;
        }
@@ -1137,7 +1139,7 @@ lua_redis_make_request(lua_State *L)
                ud = &ctx->async;
                sp_ud = g_malloc0(sizeof(*sp_ud));
                sp_ud->cbref = cbref;
-               sp_ud->c = ud;
+               sp_ud->common_ud = ud;
                sp_ud->ctx = ctx;
 
                lua_pushstring(L, "cmd");
@@ -1501,21 +1503,18 @@ lua_redis_add_cmd(lua_State *L)
                }
 
                sp_ud = g_malloc0(sizeof(*sp_ud));
+               sp_ud->common_ud = &ctx->async;
+               ud = &ctx->async;
                if (IS_ASYNC(ctx)) {
-                       sp_ud->c = &ctx->async;
-                       ud = &ctx->async;
                        sp_ud->cbref = cbref;
                }
-               else {
-                       sp_ud->c = &ctx->async;
-                       ud = &ctx->async;
-               }
+
                sp_ud->ctx = ctx;
 
                lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args,
                                                         &sp_ud->arglens, &sp_ud->nargs);
 
-               LL_PREPEND(sp_ud->c->specific, sp_ud);
+               LL_PREPEND(sp_ud->common_ud->specific, sp_ud);
 
                if (ud->s && rspamd_session_blocked(ud->s)) {
                        lua_pushboolean(L, 0);
@@ -1525,7 +1524,7 @@ lua_redis_add_cmd(lua_State *L)
                }
 
                if (IS_ASYNC(ctx)) {
-                       ret = redisAsyncCommandArgv(sp_ud->c->ctx,
+                       ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx,
                                                                                lua_redis_callback,
                                                                                sp_ud,
                                                                                sp_ud->nargs,
@@ -1533,7 +1532,7 @@ lua_redis_add_cmd(lua_State *L)
                                                                                sp_ud->arglens);
                }
                else {
-                       ret = redisAsyncCommandArgv(sp_ud->c->ctx,
+                       ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx,
                                                                                lua_redis_callback_sync,
                                                                                sp_ud,
                                                                                sp_ud->nargs,
@@ -1554,25 +1553,28 @@ lua_redis_add_cmd(lua_State *L)
                        }
 
                        sp_ud->timeout_ev.data = sp_ud;
+                       ev_now_update_if_cheap(ud->event_loop);
 
                        if (IS_ASYNC(ctx)) {
                                ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout,
-                                                         sp_ud->c->timeout, 0.0);
+                                                         sp_ud->common_ud->timeout, 0.0);
                        }
                        else {
                                ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync,
-                                                         sp_ud->c->timeout, 0.0);
+                                                         sp_ud->common_ud->timeout, 0.0);
                        }
 
                        ev_timer_start(ud->event_loop, &sp_ud->timeout_ev);
+                       msg_debug_lua_redis("added timeout %f for %p", sp_ud->common_ud->timeout, sp_ud);
+
                        REDIS_RETAIN(ctx);
                        ctx->cmds_pending++;
                }
                else {
                        msg_info("call to redis failed: %s",
-                                        sp_ud->c->ctx->errstr);
+                                        sp_ud->common_ud->ctx->errstr);
                        lua_pushboolean(L, 0);
-                       lua_pushstring(L, sp_ud->c->ctx->errstr);
+                       lua_pushstring(L, sp_ud->common_ud->ctx->errstr);
 
                        return 2;
                }
@@ -1606,11 +1608,20 @@ lua_redis_exec(lua_State *L)
                return 0;
        }
        else {
-               if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) {
+               struct lua_redis_userdata *ud = &ctx->async;
+               int replies_pending = g_queue_get_length(ctx->replies);
+
+               msg_debug_lua_redis("execute pending commands for %p; commands pending = %d; replies pending = %d",
+                       ctx,
+                       ctx->cmds_pending,
+                       replies_pending);
+
+               if (ctx->cmds_pending == 0 && replies_pending == 0) {
                        lua_pushstring(L, "No pending commands to execute");
                        lua_error(L);
                }
-               if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) {
+
+               if (ctx->cmds_pending == 0 && replies_pending > 0) {
                        int results = lua_redis_push_results(ctx, L);
                        return results;
                }