From: Vsevolod Stakhov Date: Tue, 16 Jun 2026 15:35:34 +0000 (+0100) Subject: [Fix] lua: state management for coroutine thread pool X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=d3a5a89f4f2160cfed285bea01a434df5f3aff71;p=thirdparty%2Frspamd.git [Fix] lua: state management for coroutine thread pool Async libraries (dns/redis/tcp/http/util) capture the "currently running" coroutine at a yield point and resume it later from a C completion callback. Nothing previously guaranteed the entry resumed was still the one captured: a double-fired event, a completion racing task teardown, or an entry recycled into another task would resume the wrong (or freed) coroutine and corrupt memory. These failures are interleaving-dependent and invisible in isolation. Give each pooled thread an explicit lifecycle (FREE/RUNNING/YIELDED/ DEAD) plus a generation counter, both carried in the existing thread_entry and per-request cbdata structs - no new allocations on any hot path: - get/return/terminate/yield/resume enforce legal state transitions, so returning a suspended thread or resuming a non-suspended one now aborts at the exact violation instead of corrupting a core later. - lua_thread_resume_checked() refuses to resume unless the thread is still YIELDED and its generation matches the value snapshotted at the yield point; a stale/duplicate completion becomes a logged no-op rather than a wrong-coroutine resume. All five async libs are migrated to it. - lua_tcp keeps cbd->thread pointing at the coroutine actually yielded by sync read/write, so the resume always targets it. generation is bumped on every acquire and release, so an entry that goes back to the pool and is handed out again no longer matches a completion that was already in flight. --- diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index 2f20b6f5ed..c4aa70ef58 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -28,6 +28,7 @@ void lua_dns_callback(struct rdns_reply *reply, void *arg); struct lua_rspamd_dns_cbdata { struct thread_entry *thread; + guint64 thread_generation; struct rspamd_task *task; struct rspamd_dns_resolver *resolver; struct rspamd_symcache_dynamic_item *item; @@ -137,6 +138,7 @@ lua_dns_request(lua_State *L) if (ret) { cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool); + cbdata->thread_generation = cbdata->thread->generation; cbdata->s = session; if (task) { @@ -176,7 +178,7 @@ void lua_dns_callback(struct rdns_reply *reply, void *arg) lua_pushvalue(L, -3); } - lua_thread_resume(cbdata->thread, 2); + lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 2); if (cbdata->item) { rspamd_symcache_item_async_dec_check(cbdata->task, cbdata->item, M); diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 5bf43769a2..082edd9d6d 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -202,6 +202,7 @@ struct lua_http_cbdata { int fd; int cbref; struct thread_entry *thread; + guint64 thread_generation; ref_entry_t ref; }; @@ -499,7 +500,7 @@ lua_http_resume_handler(struct rspamd_http_connection *conn, rspamd_symcache_set_cur_item(cbd->task, cbd->item); } - lua_thread_resume(cbd->thread, 2); + lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2); } static gboolean @@ -1291,6 +1292,7 @@ lua_http_request(lua_State *L) if (cbd->cbref == -1) { cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool); + cbd->thread_generation = cbd->thread->generation; } REF_INIT_RETAIN(cbd, lua_http_cbd_dtor); @@ -1439,6 +1441,7 @@ lua_http_request(lua_State *L) if (cbd->cbref == -1) { cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool); + cbd->thread_generation = cbd->thread->generation; cbd->flags |= RSPAMD_LUA_HTTP_FLAG_YIELDED; return lua_thread_yield(cbd->thread, 0); diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index b08f448580..ccde17da79 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -149,6 +149,7 @@ struct lua_redis_ctx { GQueue *replies; /* for sync connection only */ GQueue *events_cleanup; /* for sync connection only */ struct thread_entry *thread; /* for sync mode, set only if there was yield */ + guint64 thread_generation; /* generation of thread snapshotted at yield */ }; struct lua_redis_result { @@ -689,6 +690,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) if (ctx->thread) { if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { /* somebody yielded and waits for results */ + guint64 thread_generation = ctx->thread_generation; thread = ctx->thread; ctx->thread = NULL; @@ -698,7 +700,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) rspamd_symcache_set_cur_item(ud->task, ud->item); } - lua_thread_resume(thread, results); + lua_thread_resume_checked(thread, thread_generation, results); lua_redis_cleanup_events(ctx); } else { @@ -1704,6 +1706,7 @@ lua_redis_exec(lua_State *L) } else { ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool); + ctx->thread_generation = ctx->thread->generation; return lua_thread_yield(ctx->thread, 0); } } diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index cf42527fae..8d35cd6730 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -341,6 +341,7 @@ struct lua_tcp_cbdata { struct rspamd_task *task; struct rspamd_symcache_dynamic_item *item; struct thread_entry *thread; + guint64 thread_generation; struct rspamd_config *cfg; struct rspamd_ssl_connection *ssl_conn; char *hostname; @@ -784,7 +785,7 @@ lua_tcp_resume_thread_error_argp(struct lua_tcp_cbdata *cbd, const char *error, lua_tcp_shift_handler(cbd); // lua_tcp_unregister_event (cbd); lua_thread_pool_set_running_entry(cbd->cfg->lua_thread_pool, cbd->thread); - lua_thread_resume(thread, 2); + lua_thread_resume_checked(thread, cbd->thread_generation, 2); TCP_RELEASE(cbd); } @@ -827,7 +828,7 @@ lua_tcp_resume_thread(struct lua_tcp_cbdata *cbd, const uint8_t *str, gsize len) rspamd_symcache_set_cur_item(cbd->task, cbd->item); } - lua_thread_resume(cbd->thread, 2); + lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2); TCP_RELEASE(cbd); } @@ -875,7 +876,7 @@ lua_tcp_connect_helper(struct lua_tcp_cbdata *cbd) lua_tcp_shift_handler(cbd); // lua_tcp_unregister_event (cbd); - lua_thread_resume(cbd->thread, 2); + lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2); TCP_RELEASE(cbd); } @@ -2255,6 +2256,7 @@ lua_tcp_connect_sync(lua_State *L) cbd->task = task; cbd->cfg = cfg; cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool); + cbd->thread_generation = cbd->thread ? cbd->thread->generation : 0; cbd->handlers = g_queue_new(); @@ -2587,6 +2589,9 @@ lua_tcp_sync_read_once(lua_State *L) } struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool); + /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */ + cbd->thread = thread; + cbd->thread_generation = thread->generation; rh = g_malloc0(sizeof(*rh)); rh->type = LUA_WANT_READ; @@ -2618,6 +2623,9 @@ lua_tcp_sync_write(lua_State *L) } struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool); + /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */ + cbd->thread = thread; + cbd->thread_generation = thread->generation; tp = lua_type(L, 2); if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) { diff --git a/src/lua/lua_thread_pool.cxx b/src/lua/lua_thread_pool.cxx index 161d4a85f4..b3d8d5cc4a 100644 --- a/src/lua/lua_thread_pool.cxx +++ b/src/lua/lua_thread_pool.cxx @@ -72,6 +72,11 @@ struct lua_thread_pool { ent = thread_entry_new(L); } + /* Only idle threads may be handed out */ + g_assert(ent->state == LUA_THREAD_FREE); + ent->state = LUA_THREAD_RUNNING; + ent->generation++; + running_entry = ent; return ent; @@ -81,11 +86,21 @@ struct lua_thread_pool { { /* we can't return a running/yielded thread into the pool */ g_assert(lua_status(thread_entry->lua_state) == 0); + /* + * A thread is only returnable right after it finished executing, i.e. + * while it is still the RUNNING one. Hitting this assert means someone + * returned a suspended (YIELDED) or already recycled (FREE/DEAD) + * thread - a classic source of coroutine corruption. + */ + g_assert(thread_entry->state == LUA_THREAD_RUNNING); if (running_entry == thread_entry) { running_entry = NULL; } + thread_entry->state = LUA_THREAD_FREE; + thread_entry->generation++; + if (available_items.size() <= max_items) { thread_entry->cd = NULL; thread_entry->finish_callback = NULL; @@ -123,6 +138,8 @@ struct lua_thread_pool { running_entry = NULL; } + thread_entry->state = LUA_THREAD_DEAD; + msg_debug_lua_threads("%s: terminated thread entry", loc); thread_entry_free(L, thread_entry); @@ -152,6 +169,8 @@ thread_entry_new(lua_State *L) ent = g_new0(struct thread_entry, 1); ent->lua_state = lua_newthread(L); ent->thread_index = luaL_ref(L, LUA_REGISTRYINDEX); + ent->state = LUA_THREAD_FREE; + ent->generation = 0; return ent; } @@ -335,6 +354,37 @@ lua_resume_thread_internal_full(struct thread_entry *thread_entry, } } +/* + * Shared by lua_thread_resume_full()/lua_thread_resume_checked_full(): refuse + * to resume anything that is not currently suspended. This makes a duplicate + * or stale async completion (double-fired event, completion racing task + * teardown, recycled entry) a logged no-op instead of memory corruption. + * Returns true if the thread is safe to resume. + */ +static bool +lua_thread_resume_guard(struct thread_entry *thread_entry, const char *loc) +{ + struct rspamd_task *task = thread_entry->task; + + if (thread_entry->state != LUA_THREAD_YIELDED) { + msg_err_task_check("%s: refusing to resume a coroutine that is not " + "suspended (state=%d, lua_status=%d): likely a " + "duplicate or stale async completion", + loc, (int) thread_entry->state, + lua_status(thread_entry->lua_state)); + + return false; + } + + /* + * State and lua status must stay in lockstep: a YIELDED entry always has a + * suspended lua_State underneath it. + */ + g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD); + + return true; +} + void lua_thread_resume_full(struct thread_entry *thread_entry, int narg, const char *loc) { @@ -343,16 +393,56 @@ void lua_thread_resume_full(struct thread_entry *thread_entry, int narg, * Another acceptable status is OK (0) but in that case we should push function on stack * to start the thread from, which is happening in lua_thread_call(), not in this function. */ - g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD); msg_debug_lua_threads("%s: lua_thread_resume_full", loc); + + if (!lua_thread_resume_guard(thread_entry, loc)) { + return; + } + + thread_entry->state = LUA_THREAD_RUNNING; + lua_thread_pool_set_running_entry_for_thread(thread_entry, loc); + lua_resume_thread_internal_full(thread_entry, narg, loc); +} + +bool lua_thread_resume_checked_full(struct thread_entry *thread_entry, + guint64 expected_generation, + int narg, + const char *loc) +{ + struct rspamd_task *task = thread_entry->task; + + msg_debug_lua_threads("%s: lua_thread_resume_checked_full", loc); + + if (thread_entry->generation != expected_generation) { + /* + * The entry was recycled (returned to the pool and handed out again) + * between the yield and this completion. The coroutine we captured is + * gone; resuming now would clobber whoever owns the entry now. + */ + msg_err_task_check("%s: refusing stale coroutine resume: generation " + "mismatch (expected %uL, got %uL)", + loc, expected_generation, + thread_entry->generation); + + return false; + } + + if (!lua_thread_resume_guard(thread_entry, loc)) { + return false; + } + + thread_entry->state = LUA_THREAD_RUNNING; lua_thread_pool_set_running_entry_for_thread(thread_entry, loc); lua_resume_thread_internal_full(thread_entry, narg, loc); + + return true; } void lua_thread_call_full(struct thread_entry *thread_entry, int narg, const char *loc) { g_assert(lua_status(thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ + g_assert(thread_entry->state == LUA_THREAD_RUNNING); /* must be freshly acquired */ g_assert(thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ lua_resume_thread_internal_full(thread_entry, narg, loc); @@ -363,7 +453,10 @@ int lua_thread_yield_full(struct thread_entry *thread_entry, const char *loc) { g_assert(lua_status(thread_entry->lua_state) == 0); + g_assert(thread_entry->state == LUA_THREAD_RUNNING); msg_debug_lua_threads("%s: lua_thread_yield_full", loc); + thread_entry->state = LUA_THREAD_YIELDED; + return lua_yield(thread_entry->lua_state, nresults); } diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h index c6ad318473..7a108c8c93 100644 --- a/src/lua/lua_thread_pool.h +++ b/src/lua/lua_thread_pool.h @@ -14,6 +14,18 @@ typedef void (*lua_thread_finish_t)(struct thread_entry *thread, int ret); typedef void (*lua_thread_error_t)(struct thread_entry *thread, int ret, const char *msg); +/* + * Lifecycle of a pooled Lua coroutine. Transitions are enforced by the + * thread pool so that a thread can never be resumed twice, returned while + * suspended, or reused after it has been recycled into another task. + */ +enum thread_entry_state { + LUA_THREAD_FREE = 0, /* idle in the pool or freshly created */ + LUA_THREAD_RUNNING, /* currently executing Lua on the C stack */ + LUA_THREAD_YIELDED, /* suspended at an async yield point */ + LUA_THREAD_DEAD, /* errored/terminated, must not be reused */ +}; + struct thread_entry { lua_State *lua_state; int thread_index; @@ -26,6 +38,15 @@ struct thread_entry { lua_thread_error_t error_callback; struct rspamd_task *task; struct rspamd_config *cfg; + + /* + * Bumped on every acquire and release. Async libraries snapshot it at the + * yield point and pass it back to lua_thread_resume_checked(): a mismatch + * means the entry was recycled while a stale completion was in flight, so + * the resume is refused instead of corrupting another task's coroutine. + */ + guint64 generation; + enum thread_entry_state state; }; struct lua_callback_state { @@ -175,6 +196,25 @@ void lua_thread_resume_full(struct thread_entry *thread_entry, #define lua_thread_resume(thread_entry, narg) \ lua_thread_resume_full(thread_entry, narg, G_STRLOC) +/** + * Like lua_thread_resume(), but additionally verifies that the entry has not + * been recycled since the caller snapshotted its generation at the yield + * point. If the generation does not match, or the thread is not suspended, + * the resume is refused (logged, no-op) rather than risking memory + * corruption. Returns true if the thread was actually resumed. + * + * @param thread_entry + * @param expected_generation value of thread_entry->generation captured at yield + * @param narg + */ +bool lua_thread_resume_checked_full(struct thread_entry *thread_entry, + guint64 expected_generation, + int narg, + const char *loc); + +#define lua_thread_resume_checked(thread_entry, expected_generation, narg) \ + lua_thread_resume_checked_full(thread_entry, expected_generation, narg, G_STRLOC) + /** * Terminates thread pool entry and fill the pool with another thread entry if needed * @param pool diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index 1f1e969403..4bc950d5e6 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -4475,6 +4475,7 @@ struct rspamd_ev_base_sleep_cbdata { lua_State *L; int cbref; struct thread_entry *thread; + guint64 thread_generation; struct rspamd_async_session *session; ev_timer ev; }; @@ -4517,7 +4518,7 @@ lua_ev_base_sleep_cb(struct ev_loop *loop, struct ev_timer *t, int events) } else if (cbdata->thread) { /* Sync mode: resume the coroutine */ - lua_thread_resume(cbdata->thread, 0); + lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 0); } g_free(cbdata); @@ -4600,6 +4601,7 @@ lua_ev_base_sleep(lua_State *L) if (cfg && cfg->lua_thread_pool) { cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool); + cbdata->thread_generation = cbdata->thread->generation; /* Register session event if available so wait_session_events waits for us */ if (session) {