]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] lua: state management for coroutine thread pool
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 16 Jun 2026 15:35:34 +0000 (16:35 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 16 Jun 2026 15:35:55 +0000 (16:35 +0100)
Async libraries (dns/redis/tcp/http/util) capture the "currently
running" coroutine at a yield point and resume it later from a C
completion callback. Nothing previously guaranteed the entry resumed
was still the one captured: a double-fired event, a completion racing
task teardown, or an entry recycled into another task would resume the
wrong (or freed) coroutine and corrupt memory. These failures are
interleaving-dependent and invisible in isolation.

Give each pooled thread an explicit lifecycle (FREE/RUNNING/YIELDED/
DEAD) plus a generation counter, both carried in the existing
thread_entry and per-request cbdata structs - no new allocations on any
hot path:

  - get/return/terminate/yield/resume enforce legal state transitions,
    so returning a suspended thread or resuming a non-suspended one now
    aborts at the exact violation instead of corrupting a core later.
  - lua_thread_resume_checked() refuses to resume unless the thread is
    still YIELDED and its generation matches the value snapshotted at
    the yield point; a stale/duplicate completion becomes a logged
    no-op rather than a wrong-coroutine resume. All five async libs are
    migrated to it.
  - lua_tcp keeps cbd->thread pointing at the coroutine actually
    yielded by sync read/write, so the resume always targets it.

generation is bumped on every acquire and release, so an entry that
goes back to the pool and is handed out again no longer matches a
completion that was already in flight.

src/lua/lua_dns.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_tcp.c
src/lua/lua_thread_pool.cxx
src/lua/lua_thread_pool.h
src/lua/lua_util.c

index 2f20b6f5edd008d966c955be3681aa43db30406b..c4aa70ef58145322c533d8316a52d62c090a0917 100644 (file)
@@ -28,6 +28,7 @@ void lua_dns_callback(struct rdns_reply *reply, void *arg);
 
 struct lua_rspamd_dns_cbdata {
        struct thread_entry *thread;
+       guint64 thread_generation;
        struct rspamd_task *task;
        struct rspamd_dns_resolver *resolver;
        struct rspamd_symcache_dynamic_item *item;
@@ -137,6 +138,7 @@ lua_dns_request(lua_State *L)
 
        if (ret) {
                cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+               cbdata->thread_generation = cbdata->thread->generation;
                cbdata->s = session;
 
                if (task) {
@@ -176,7 +178,7 @@ void lua_dns_callback(struct rdns_reply *reply, void *arg)
                lua_pushvalue(L, -3);
        }
 
-       lua_thread_resume(cbdata->thread, 2);
+       lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 2);
 
        if (cbdata->item) {
                rspamd_symcache_item_async_dec_check(cbdata->task, cbdata->item, M);
index 5bf43769a2823f4402187f62a469ed7e3b67f9dd..082edd9d6d1e9a09d676c230bfeb4560142d902c 100644 (file)
@@ -202,6 +202,7 @@ struct lua_http_cbdata {
        int fd;
        int cbref;
        struct thread_entry *thread;
+       guint64 thread_generation;
        ref_entry_t ref;
 };
 
@@ -499,7 +500,7 @@ lua_http_resume_handler(struct rspamd_http_connection *conn,
                rspamd_symcache_set_cur_item(cbd->task, cbd->item);
        }
 
-       lua_thread_resume(cbd->thread, 2);
+       lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
 }
 
 static gboolean
@@ -1291,6 +1292,7 @@ lua_http_request(lua_State *L)
 
        if (cbd->cbref == -1) {
                cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+               cbd->thread_generation = cbd->thread->generation;
        }
 
        REF_INIT_RETAIN(cbd, lua_http_cbd_dtor);
@@ -1439,6 +1441,7 @@ lua_http_request(lua_State *L)
 
        if (cbd->cbref == -1) {
                cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+               cbd->thread_generation = cbd->thread->generation;
                cbd->flags |= RSPAMD_LUA_HTTP_FLAG_YIELDED;
 
                return lua_thread_yield(cbd->thread, 0);
index b08f44858062b73525c6969f2d93a64d51c1e496..ccde17da7905b0c56331b8391995beadfcf4782a 100644 (file)
@@ -149,6 +149,7 @@ struct lua_redis_ctx {
        GQueue *replies;             /* for sync connection only */
        GQueue *events_cleanup;      /* for sync connection only */
        struct thread_entry *thread; /* for sync mode, set only if there was yield */
+       guint64 thread_generation;   /* generation of thread snapshotted at yield */
 };
 
 struct lua_redis_result {
@@ -689,6 +690,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
                if (ctx->thread) {
                        if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
                                /* somebody yielded and waits for results */
+                               guint64 thread_generation = ctx->thread_generation;
                                thread = ctx->thread;
                                ctx->thread = NULL;
 
@@ -698,7 +700,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv)
                                        rspamd_symcache_set_cur_item(ud->task, ud->item);
                                }
 
-                               lua_thread_resume(thread, results);
+                               lua_thread_resume_checked(thread, thread_generation, results);
                                lua_redis_cleanup_events(ctx);
                        }
                        else {
@@ -1704,6 +1706,7 @@ lua_redis_exec(lua_State *L)
                }
                else {
                        ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool);
+                       ctx->thread_generation = ctx->thread->generation;
                        return lua_thread_yield(ctx->thread, 0);
                }
        }
index cf42527fae289623f475fefb5e0bf9c0d75e14c5..8d35cd67309fde18c5b67ffafa0d6444c730c269 100644 (file)
@@ -341,6 +341,7 @@ struct lua_tcp_cbdata {
        struct rspamd_task *task;
        struct rspamd_symcache_dynamic_item *item;
        struct thread_entry *thread;
+       guint64 thread_generation;
        struct rspamd_config *cfg;
        struct rspamd_ssl_connection *ssl_conn;
        char *hostname;
@@ -784,7 +785,7 @@ lua_tcp_resume_thread_error_argp(struct lua_tcp_cbdata *cbd, const char *error,
        lua_tcp_shift_handler(cbd);
        // lua_tcp_unregister_event (cbd);
        lua_thread_pool_set_running_entry(cbd->cfg->lua_thread_pool, cbd->thread);
-       lua_thread_resume(thread, 2);
+       lua_thread_resume_checked(thread, cbd->thread_generation, 2);
        TCP_RELEASE(cbd);
 }
 
@@ -827,7 +828,7 @@ lua_tcp_resume_thread(struct lua_tcp_cbdata *cbd, const uint8_t *str, gsize len)
                rspamd_symcache_set_cur_item(cbd->task, cbd->item);
        }
 
-       lua_thread_resume(cbd->thread, 2);
+       lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
 
        TCP_RELEASE(cbd);
 }
@@ -875,7 +876,7 @@ lua_tcp_connect_helper(struct lua_tcp_cbdata *cbd)
        lua_tcp_shift_handler(cbd);
 
        // lua_tcp_unregister_event (cbd);
-       lua_thread_resume(cbd->thread, 2);
+       lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
        TCP_RELEASE(cbd);
 }
 
@@ -2255,6 +2256,7 @@ lua_tcp_connect_sync(lua_State *L)
        cbd->task = task;
        cbd->cfg = cfg;
        cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+       cbd->thread_generation = cbd->thread ? cbd->thread->generation : 0;
 
 
        cbd->handlers = g_queue_new();
@@ -2587,6 +2589,9 @@ lua_tcp_sync_read_once(lua_State *L)
        }
 
        struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool);
+       /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */
+       cbd->thread = thread;
+       cbd->thread_generation = thread->generation;
 
        rh = g_malloc0(sizeof(*rh));
        rh->type = LUA_WANT_READ;
@@ -2618,6 +2623,9 @@ lua_tcp_sync_write(lua_State *L)
        }
 
        struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool);
+       /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */
+       cbd->thread = thread;
+       cbd->thread_generation = thread->generation;
 
        tp = lua_type(L, 2);
        if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) {
index 161d4a85f43322338059a2040239375624e7151b..b3d8d5cc4adaa4950bdeaf87a8a1f1529539ca57 100644 (file)
@@ -72,6 +72,11 @@ struct lua_thread_pool {
                        ent = thread_entry_new(L);
                }
 
+               /* Only idle threads may be handed out */
+               g_assert(ent->state == LUA_THREAD_FREE);
+               ent->state = LUA_THREAD_RUNNING;
+               ent->generation++;
+
                running_entry = ent;
 
                return ent;
@@ -81,11 +86,21 @@ struct lua_thread_pool {
        {
                /* we can't return a running/yielded thread into the pool */
                g_assert(lua_status(thread_entry->lua_state) == 0);
+               /*
+                * A thread is only returnable right after it finished executing, i.e.
+                * while it is still the RUNNING one. Hitting this assert means someone
+                * returned a suspended (YIELDED) or already recycled (FREE/DEAD)
+                * thread - a classic source of coroutine corruption.
+                */
+               g_assert(thread_entry->state == LUA_THREAD_RUNNING);
 
                if (running_entry == thread_entry) {
                        running_entry = NULL;
                }
 
+               thread_entry->state = LUA_THREAD_FREE;
+               thread_entry->generation++;
+
                if (available_items.size() <= max_items) {
                        thread_entry->cd = NULL;
                        thread_entry->finish_callback = NULL;
@@ -123,6 +138,8 @@ struct lua_thread_pool {
                        running_entry = NULL;
                }
 
+               thread_entry->state = LUA_THREAD_DEAD;
+
                msg_debug_lua_threads("%s: terminated thread entry", loc);
                thread_entry_free(L, thread_entry);
 
@@ -152,6 +169,8 @@ thread_entry_new(lua_State *L)
        ent = g_new0(struct thread_entry, 1);
        ent->lua_state = lua_newthread(L);
        ent->thread_index = luaL_ref(L, LUA_REGISTRYINDEX);
+       ent->state = LUA_THREAD_FREE;
+       ent->generation = 0;
 
        return ent;
 }
@@ -335,6 +354,37 @@ lua_resume_thread_internal_full(struct thread_entry *thread_entry,
        }
 }
 
+/*
+ * Shared by lua_thread_resume_full()/lua_thread_resume_checked_full(): refuse
+ * to resume anything that is not currently suspended. This makes a duplicate
+ * or stale async completion (double-fired event, completion racing task
+ * teardown, recycled entry) a logged no-op instead of memory corruption.
+ * Returns true if the thread is safe to resume.
+ */
+static bool
+lua_thread_resume_guard(struct thread_entry *thread_entry, const char *loc)
+{
+       struct rspamd_task *task = thread_entry->task;
+
+       if (thread_entry->state != LUA_THREAD_YIELDED) {
+               msg_err_task_check("%s: refusing to resume a coroutine that is not "
+                                                  "suspended (state=%d, lua_status=%d): likely a "
+                                                  "duplicate or stale async completion",
+                                                  loc, (int) thread_entry->state,
+                                                  lua_status(thread_entry->lua_state));
+
+               return false;
+       }
+
+       /*
+        * State and lua status must stay in lockstep: a YIELDED entry always has a
+        * suspended lua_State underneath it.
+        */
+       g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD);
+
+       return true;
+}
+
 void lua_thread_resume_full(struct thread_entry *thread_entry, int narg,
                                                        const char *loc)
 {
@@ -343,16 +393,56 @@ void lua_thread_resume_full(struct thread_entry *thread_entry, int narg,
         * Another acceptable status is OK (0) but in that case we should push function on stack
         * to start the thread from, which is happening in lua_thread_call(), not in this function.
         */
-       g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD);
        msg_debug_lua_threads("%s: lua_thread_resume_full", loc);
+
+       if (!lua_thread_resume_guard(thread_entry, loc)) {
+               return;
+       }
+
+       thread_entry->state = LUA_THREAD_RUNNING;
+       lua_thread_pool_set_running_entry_for_thread(thread_entry, loc);
+       lua_resume_thread_internal_full(thread_entry, narg, loc);
+}
+
+bool lua_thread_resume_checked_full(struct thread_entry *thread_entry,
+                                                                       guint64 expected_generation,
+                                                                       int narg,
+                                                                       const char *loc)
+{
+       struct rspamd_task *task = thread_entry->task;
+
+       msg_debug_lua_threads("%s: lua_thread_resume_checked_full", loc);
+
+       if (thread_entry->generation != expected_generation) {
+               /*
+                * The entry was recycled (returned to the pool and handed out again)
+                * between the yield and this completion. The coroutine we captured is
+                * gone; resuming now would clobber whoever owns the entry now.
+                */
+               msg_err_task_check("%s: refusing stale coroutine resume: generation "
+                                                  "mismatch (expected %uL, got %uL)",
+                                                  loc, expected_generation,
+                                                  thread_entry->generation);
+
+               return false;
+       }
+
+       if (!lua_thread_resume_guard(thread_entry, loc)) {
+               return false;
+       }
+
+       thread_entry->state = LUA_THREAD_RUNNING;
        lua_thread_pool_set_running_entry_for_thread(thread_entry, loc);
        lua_resume_thread_internal_full(thread_entry, narg, loc);
+
+       return true;
 }
 
 void lua_thread_call_full(struct thread_entry *thread_entry,
                                                  int narg, const char *loc)
 {
        g_assert(lua_status(thread_entry->lua_state) == 0);                /* we can't call running/yielded thread */
+       g_assert(thread_entry->state == LUA_THREAD_RUNNING);               /* must be freshly acquired */
        g_assert(thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
 
        lua_resume_thread_internal_full(thread_entry, narg, loc);
@@ -363,7 +453,10 @@ int lua_thread_yield_full(struct thread_entry *thread_entry,
                                                  const char *loc)
 {
        g_assert(lua_status(thread_entry->lua_state) == 0);
+       g_assert(thread_entry->state == LUA_THREAD_RUNNING);
 
        msg_debug_lua_threads("%s: lua_thread_yield_full", loc);
+       thread_entry->state = LUA_THREAD_YIELDED;
+
        return lua_yield(thread_entry->lua_state, nresults);
 }
index c6ad3184734cb6cbaf68fe3032405659438d622c..7a108c8c93eb22afdf17aad92ade7d4f958f32fb 100644 (file)
@@ -14,6 +14,18 @@ typedef void (*lua_thread_finish_t)(struct thread_entry *thread, int ret);
 
 typedef void (*lua_thread_error_t)(struct thread_entry *thread, int ret, const char *msg);
 
+/*
+ * Lifecycle of a pooled Lua coroutine. Transitions are enforced by the
+ * thread pool so that a thread can never be resumed twice, returned while
+ * suspended, or reused after it has been recycled into another task.
+ */
+enum thread_entry_state {
+       LUA_THREAD_FREE = 0, /* idle in the pool or freshly created */
+       LUA_THREAD_RUNNING,  /* currently executing Lua on the C stack */
+       LUA_THREAD_YIELDED,  /* suspended at an async yield point */
+       LUA_THREAD_DEAD,     /* errored/terminated, must not be reused */
+};
+
 struct thread_entry {
        lua_State *lua_state;
        int thread_index;
@@ -26,6 +38,15 @@ struct thread_entry {
        lua_thread_error_t error_callback;
        struct rspamd_task *task;
        struct rspamd_config *cfg;
+
+       /*
+        * Bumped on every acquire and release. Async libraries snapshot it at the
+        * yield point and pass it back to lua_thread_resume_checked(): a mismatch
+        * means the entry was recycled while a stale completion was in flight, so
+        * the resume is refused instead of corrupting another task's coroutine.
+        */
+       guint64 generation;
+       enum thread_entry_state state;
 };
 
 struct lua_callback_state {
@@ -175,6 +196,25 @@ void lua_thread_resume_full(struct thread_entry *thread_entry,
 #define lua_thread_resume(thread_entry, narg) \
        lua_thread_resume_full(thread_entry, narg, G_STRLOC)
 
+/**
+ * Like lua_thread_resume(), but additionally verifies that the entry has not
+ * been recycled since the caller snapshotted its generation at the yield
+ * point. If the generation does not match, or the thread is not suspended,
+ * the resume is refused (logged, no-op) rather than risking memory
+ * corruption. Returns true if the thread was actually resumed.
+ *
+ * @param thread_entry
+ * @param expected_generation value of thread_entry->generation captured at yield
+ * @param narg
+ */
+bool lua_thread_resume_checked_full(struct thread_entry *thread_entry,
+                                                                       guint64 expected_generation,
+                                                                       int narg,
+                                                                       const char *loc);
+
+#define lua_thread_resume_checked(thread_entry, expected_generation, narg) \
+       lua_thread_resume_checked_full(thread_entry, expected_generation, narg, G_STRLOC)
+
 /**
  * Terminates thread pool entry and fill the pool with another thread entry if needed
  * @param pool
index 1f1e96940364feaa133bc14f6e2779f47095b5a0..4bc950d5e6ec38da28569137568d7d9d0bc2117d 100644 (file)
@@ -4475,6 +4475,7 @@ struct rspamd_ev_base_sleep_cbdata {
        lua_State *L;
        int cbref;
        struct thread_entry *thread;
+       guint64 thread_generation;
        struct rspamd_async_session *session;
        ev_timer ev;
 };
@@ -4517,7 +4518,7 @@ lua_ev_base_sleep_cb(struct ev_loop *loop, struct ev_timer *t, int events)
        }
        else if (cbdata->thread) {
                /* Sync mode: resume the coroutine */
-               lua_thread_resume(cbdata->thread, 0);
+               lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 0);
        }
 
        g_free(cbdata);
@@ -4600,6 +4601,7 @@ lua_ev_base_sleep(lua_State *L)
 
                        if (cfg && cfg->lua_thread_pool) {
                                cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+                               cbdata->thread_generation = cbdata->thread->generation;
 
                                /* Register session event if available so wait_session_events waits for us */
                                if (session) {