struct lua_rspamd_dns_cbdata {
struct thread_entry *thread;
+ guint64 thread_generation;
struct rspamd_task *task;
struct rspamd_dns_resolver *resolver;
struct rspamd_symcache_dynamic_item *item;
if (ret) {
cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+ cbdata->thread_generation = cbdata->thread->generation;
cbdata->s = session;
if (task) {
lua_pushvalue(L, -3);
}
- lua_thread_resume(cbdata->thread, 2);
+ lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 2);
if (cbdata->item) {
rspamd_symcache_item_async_dec_check(cbdata->task, cbdata->item, M);
int fd;
int cbref;
struct thread_entry *thread;
+ guint64 thread_generation;
ref_entry_t ref;
};
rspamd_symcache_set_cur_item(cbd->task, cbd->item);
}
- lua_thread_resume(cbd->thread, 2);
+ lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
}
static gboolean
if (cbd->cbref == -1) {
cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+ cbd->thread_generation = cbd->thread->generation;
}
REF_INIT_RETAIN(cbd, lua_http_cbd_dtor);
if (cbd->cbref == -1) {
cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+ cbd->thread_generation = cbd->thread->generation;
cbd->flags |= RSPAMD_LUA_HTTP_FLAG_YIELDED;
return lua_thread_yield(cbd->thread, 0);
GQueue *replies; /* for sync connection only */
GQueue *events_cleanup; /* for sync connection only */
struct thread_entry *thread; /* for sync mode, set only if there was yield */
+ guint64 thread_generation; /* generation of thread snapshotted at yield */
};
struct lua_redis_result {
if (ctx->thread) {
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
/* somebody yielded and waits for results */
+ guint64 thread_generation = ctx->thread_generation;
thread = ctx->thread;
ctx->thread = NULL;
rspamd_symcache_set_cur_item(ud->task, ud->item);
}
- lua_thread_resume(thread, results);
+ lua_thread_resume_checked(thread, thread_generation, results);
lua_redis_cleanup_events(ctx);
}
else {
}
else {
ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool);
+ ctx->thread_generation = ctx->thread->generation;
return lua_thread_yield(ctx->thread, 0);
}
}
struct rspamd_task *task;
struct rspamd_symcache_dynamic_item *item;
struct thread_entry *thread;
+ guint64 thread_generation;
struct rspamd_config *cfg;
struct rspamd_ssl_connection *ssl_conn;
char *hostname;
lua_tcp_shift_handler(cbd);
// lua_tcp_unregister_event (cbd);
lua_thread_pool_set_running_entry(cbd->cfg->lua_thread_pool, cbd->thread);
- lua_thread_resume(thread, 2);
+ lua_thread_resume_checked(thread, cbd->thread_generation, 2);
TCP_RELEASE(cbd);
}
rspamd_symcache_set_cur_item(cbd->task, cbd->item);
}
- lua_thread_resume(cbd->thread, 2);
+ lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
TCP_RELEASE(cbd);
}
lua_tcp_shift_handler(cbd);
// lua_tcp_unregister_event (cbd);
- lua_thread_resume(cbd->thread, 2);
+ lua_thread_resume_checked(cbd->thread, cbd->thread_generation, 2);
TCP_RELEASE(cbd);
}
cbd->task = task;
cbd->cfg = cfg;
cbd->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+ cbd->thread_generation = cbd->thread ? cbd->thread->generation : 0;
cbd->handlers = g_queue_new();
}
struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool);
+ /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */
+ cbd->thread = thread;
+ cbd->thread_generation = thread->generation;
rh = g_malloc0(sizeof(*rh));
rh->type = LUA_WANT_READ;
}
struct thread_entry *thread = lua_thread_pool_get_running_entry(cbd->cfg->lua_thread_pool);
+ /* Resume targets cbd->thread, so keep it pointing at the coroutine we yield */
+ cbd->thread = thread;
+ cbd->thread_generation = thread->generation;
tp = lua_type(L, 2);
if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) {
ent = thread_entry_new(L);
}
+ /* Only idle threads may be handed out */
+ g_assert(ent->state == LUA_THREAD_FREE);
+ ent->state = LUA_THREAD_RUNNING;
+ ent->generation++;
+
running_entry = ent;
return ent;
{
/* we can't return a running/yielded thread into the pool */
g_assert(lua_status(thread_entry->lua_state) == 0);
+ /*
+ * A thread is only returnable right after it finished executing, i.e.
+ * while it is still the RUNNING one. Hitting this assert means someone
+ * returned a suspended (YIELDED) or already recycled (FREE/DEAD)
+ * thread - a classic source of coroutine corruption.
+ */
+ g_assert(thread_entry->state == LUA_THREAD_RUNNING);
if (running_entry == thread_entry) {
running_entry = NULL;
}
+ thread_entry->state = LUA_THREAD_FREE;
+ thread_entry->generation++;
+
if (available_items.size() <= max_items) {
thread_entry->cd = NULL;
thread_entry->finish_callback = NULL;
running_entry = NULL;
}
+ thread_entry->state = LUA_THREAD_DEAD;
+
msg_debug_lua_threads("%s: terminated thread entry", loc);
thread_entry_free(L, thread_entry);
ent = g_new0(struct thread_entry, 1);
ent->lua_state = lua_newthread(L);
ent->thread_index = luaL_ref(L, LUA_REGISTRYINDEX);
+ ent->state = LUA_THREAD_FREE;
+ ent->generation = 0;
return ent;
}
}
}
+/*
+ * Common precondition check used by lua_thread_resume_full() and
+ * lua_thread_resume_checked_full(). Only an entry whose state is
+ * LUA_THREAD_YIELDED may be resumed; for any other state the attempt is
+ * logged as an error and the caller is told to skip the resume, so that a
+ * duplicate or stale async completion becomes a no-op.
+ *
+ * Returns true when the entry is suspended and may be resumed, false
+ * otherwise.
+ */
+static bool
+lua_thread_resume_guard(struct thread_entry *thread_entry, const char *loc)
+{
+	/* Not referenced by name below; presumably consumed by msg_err_task_check() */
+	struct rspamd_task *task = thread_entry->task;
+	const bool is_suspended = (thread_entry->state == LUA_THREAD_YIELDED);
+
+	if (is_suspended) {
+		/*
+		 * The tracked state and the Lua status are expected to agree: an
+		 * entry marked YIELDED must wrap a lua_State in LUA_YIELD status.
+		 */
+		g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD);
+
+		return true;
+	}
+
+	msg_err_task_check("%s: refusing to resume a coroutine that is not "
+					   "suspended (state=%d, lua_status=%d): likely a "
+					   "duplicate or stale async completion",
+					   loc, (int) thread_entry->state,
+					   lua_status(thread_entry->lua_state));
+
+	return false;
+}
+
void lua_thread_resume_full(struct thread_entry *thread_entry, int narg,
const char *loc)
{
* Another acceptable status is OK (0) but in that case we should push function on stack
* to start the thread from, which is happening in lua_thread_call(), not in this function.
*/
- g_assert(lua_status(thread_entry->lua_state) == LUA_YIELD);
msg_debug_lua_threads("%s: lua_thread_resume_full", loc);
+
+ if (!lua_thread_resume_guard(thread_entry, loc)) {
+ return;
+ }
+
+ thread_entry->state = LUA_THREAD_RUNNING;
+ lua_thread_pool_set_running_entry_for_thread(thread_entry, loc);
+ lua_resume_thread_internal_full(thread_entry, narg, loc);
+}
+
+/*
+ * Generation-checked resume of a pooled coroutine.
+ *
+ * The resume is refused (logged, returns false) when:
+ *  - thread_entry is NULL;
+ *  - thread_entry->generation differs from expected_generation;
+ *  - lua_thread_resume_guard() rejects the entry (it is not YIELDED).
+ * Otherwise the entry is marked LUA_THREAD_RUNNING, registered through
+ * lua_thread_pool_set_running_entry_for_thread() and resumed with narg
+ * arguments; true is returned.
+ *
+ * NOTE(review): the generation test reads thread_entry itself, so it can only
+ * detect recycling of an entry that is still allocated - confirm that callers
+ * cannot hold a pointer to an entry that was terminated and freed.
+ */
+bool lua_thread_resume_checked_full(struct thread_entry *thread_entry,
+									guint64 expected_generation,
+									int narg,
+									const char *loc)
+{
+	/* Presumably consumed by msg_err_task_check(); NULL when there is no entry */
+	struct rspamd_task *task = (thread_entry != NULL) ? thread_entry->task : NULL;
+
+	msg_debug_lua_threads("%s: lua_thread_resume_checked_full", loc);
+
+	if (thread_entry == NULL) {
+		/* Nothing to resume: refuse instead of dereferencing NULL below */
+		msg_err_task_check("%s: refusing to resume a NULL thread entry", loc);
+
+		return false;
+	}
+
+	if (thread_entry->generation != expected_generation) {
+		/*
+		 * The generation is bumped on every acquire and release, so a
+		 * mismatch means the entry was returned to the pool (and possibly
+		 * handed out again) between the yield and this completion. The
+		 * coroutine we captured is gone; resuming now would clobber whoever
+		 * owns the entry now.
+		 */
+		msg_err_task_check("%s: refusing stale coroutine resume: generation "
+						   "mismatch (expected %uL, got %uL)",
+						   loc, expected_generation,
+						   thread_entry->generation);
+
+		return false;
+	}
+
+	if (!lua_thread_resume_guard(thread_entry, loc)) {
+		return false;
+	}
+
+	thread_entry->state = LUA_THREAD_RUNNING;
 	lua_thread_pool_set_running_entry_for_thread(thread_entry, loc);
 	lua_resume_thread_internal_full(thread_entry, narg, loc);
+
+	return true;
 }
void lua_thread_call_full(struct thread_entry *thread_entry,
int narg, const char *loc)
{
g_assert(lua_status(thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
+ g_assert(thread_entry->state == LUA_THREAD_RUNNING); /* must be freshly acquired */
g_assert(thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
lua_resume_thread_internal_full(thread_entry, narg, loc);
const char *loc)
{
g_assert(lua_status(thread_entry->lua_state) == 0);
+ g_assert(thread_entry->state == LUA_THREAD_RUNNING);
msg_debug_lua_threads("%s: lua_thread_yield_full", loc);
+ thread_entry->state = LUA_THREAD_YIELDED;
+
return lua_yield(thread_entry->lua_state, nresults);
}
typedef void (*lua_thread_error_t)(struct thread_entry *thread, int ret, const char *msg);
+/*
+ * Lifecycle of a pooled Lua coroutine. Transitions are enforced by the
+ * thread pool so that a thread can never be resumed twice, returned while
+ * suspended, or reused after it has been recycled into another task.
+ *
+ * Transitions performed by the pool code:
+ *   FREE    -> RUNNING  when an entry is handed out by the pool
+ *   RUNNING -> YIELDED  in lua_thread_yield_full()
+ *   YIELDED -> RUNNING  in lua_thread_resume_full() and
+ *                       lua_thread_resume_checked_full()
+ *   RUNNING -> FREE     when an entry is returned to the pool
+ *           -> DEAD     on termination, just before thread_entry_free()
+ */
+enum thread_entry_state {
+ LUA_THREAD_FREE = 0, /* idle in the pool or freshly created */
+ LUA_THREAD_RUNNING, /* currently executing Lua on the C stack */
+ LUA_THREAD_YIELDED, /* suspended at an async yield point */
+ LUA_THREAD_DEAD, /* errored/terminated, must not be reused */
+};
+
struct thread_entry {
lua_State *lua_state;
int thread_index;
lua_thread_error_t error_callback;
struct rspamd_task *task;
struct rspamd_config *cfg;
+
+ /*
+ * Bumped on every acquire and release; a freshly created entry starts at
+ * 0. Async libraries snapshot it when they capture the thread (normally at
+ * the yield point) and pass it back to lua_thread_resume_checked(): a
+ * mismatch means the entry was recycled while a stale completion was in
+ * flight, so the resume is refused instead of corrupting another task's
+ * coroutine.
+ */
+ guint64 generation;
+ enum thread_entry_state state; /* current lifecycle state; a new entry starts as LUA_THREAD_FREE */
};
struct lua_callback_state {
#define lua_thread_resume(thread_entry, narg) \
lua_thread_resume_full(thread_entry, narg, G_STRLOC)
+/**
+ * Like lua_thread_resume(), but additionally verifies that the entry has not
+ * been recycled since the caller snapshotted its generation at the yield
+ * point. If the generation does not match, or the thread is not suspended,
+ * the resume is refused (logged, no-op) rather than risking memory
+ * corruption. Returns true if the thread was actually resumed.
+ *
+ * Callers normally use the lua_thread_resume_checked() macro, which supplies
+ * G_STRLOC as the loc argument.
+ *
+ * @param thread_entry pool entry that was suspended and should be resumed
+ * @param expected_generation value of thread_entry->generation captured at yield
+ * @param narg argument count forwarded to the internal resume (presumably the
+ *             number of values already pushed for the coroutine - confirm)
+ * @param loc caller location string, used as a prefix in log messages
+ * @return true if the thread was resumed, false if the resume was refused
+ */
+bool lua_thread_resume_checked_full(struct thread_entry *thread_entry,
+ guint64 expected_generation,
+ int narg,
+ const char *loc);
+
+#define lua_thread_resume_checked(thread_entry, expected_generation, narg) \
+ lua_thread_resume_checked_full(thread_entry, expected_generation, narg, G_STRLOC)
+
/**
* Terminates thread pool entry and fill the pool with another thread entry if needed
* @param pool
lua_State *L;
int cbref;
struct thread_entry *thread;
+ guint64 thread_generation;
struct rspamd_async_session *session;
ev_timer ev;
};
}
else if (cbdata->thread) {
/* Sync mode: resume the coroutine */
- lua_thread_resume(cbdata->thread, 0);
+ lua_thread_resume_checked(cbdata->thread, cbdata->thread_generation, 0);
}
g_free(cbdata);
if (cfg && cfg->lua_thread_pool) {
cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+ cbdata->thread_generation = cbdata->thread->generation;
/* Register session event if available so wait_session_events waits for us */
if (session) {