* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "unix-std.h"
#include "lua_compress.h"
#include "libmime/email_addr.h"
LUA_FUNCTION_DEF(ev_base, timestamp);
LUA_FUNCTION_DEF(ev_base, pending_events);
LUA_FUNCTION_DEF(ev_base, add_timer);
+LUA_FUNCTION_DEF(ev_base, sleep);
static const struct luaL_reg ev_baselib_m[] = {
LUA_INTERFACE_DEF(ev_base, loop),
LUA_INTERFACE_DEF(ev_base, timestamp),
LUA_INTERFACE_DEF(ev_base, pending_events),
LUA_INTERFACE_DEF(ev_base, add_timer),
+ LUA_INTERFACE_DEF(ev_base, sleep),
{"__tostring", rspamd_lua_class_tostring},
{NULL, NULL}};
return 0;
}
+
+struct rspamd_ev_base_sleep_cbdata {
+ lua_State *L;
+ int cbref;
+ struct thread_entry *thread;
+ ev_timer ev;
+};
+
+static void
+lua_ev_base_sleep_cb(struct ev_loop *loop, struct ev_timer *t, int events)
+{
+ struct rspamd_ev_base_sleep_cbdata *cbdata =
+ (struct rspamd_ev_base_sleep_cbdata *) t->data;
+
+ ev_timer_stop(loop, t);
+
+ if (cbdata->cbref != -1) {
+ /* Async mode: call the callback */
+ lua_State *L = cbdata->L;
+
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ int err_idx = lua_gettop(L);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbdata->cbref);
+
+ if (lua_pcall(L, 0, 0, err_idx) != 0) {
+ msg_err("call to sleep callback failed: %s", lua_tostring(L, -1));
+ }
+
+ lua_settop(L, err_idx - 1);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->cbref);
+ }
+ else if (cbdata->thread) {
+ /* Sync mode: resume the coroutine */
+ lua_thread_resume(cbdata->thread, 0);
+ }
+
+ g_free(cbdata);
+}
+
+/***
+ * @method ev_base:sleep(time[, callback])
+ * Sleep for the specified time. If callback is provided, it's called asynchronously
+ * after the timeout. If no callback, this yields the current coroutine and resumes
+ * it after the timeout (synchronous mode).
+ * @param {number} time timeout in seconds
+ * @param {function} callback optional callback for async mode
+ */
+static int
+lua_ev_base_sleep(lua_State *L)
+{
+ struct ev_loop *ev_base;
+
+ ev_base = lua_check_ev_base(L, 1);
+
+ if (!lua_isnumber(L, 2)) {
+ return luaL_error(L, "invalid arguments: timeout expected");
+ }
+
+ double timeout = lua_tonumber(L, 2);
+
+ struct rspamd_ev_base_sleep_cbdata *cbdata = g_malloc0(sizeof(*cbdata));
+ cbdata->L = L;
+ cbdata->ev.data = cbdata;
+ cbdata->cbref = -1;
+ cbdata->thread = NULL;
+
+ if (lua_isfunction(L, 3)) {
+ /* Async mode with callback */
+ lua_pushvalue(L, 3);
+ cbdata->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+ ev_timer_init(&cbdata->ev, lua_ev_base_sleep_cb, timeout, 0.0);
+ ev_timer_start(ev_base, &cbdata->ev);
+
+ return 0;
+ }
+ else {
+ /* Sync mode using coroutines - get config from global */
+ lua_getglobal(L, "rspamd_config");
+
+ if (lua_type(L, -1) == LUA_TUSERDATA) {
+ struct rspamd_config *cfg = lua_check_config(L, -1);
+ lua_pop(L, 1);
+
+ if (cfg && cfg->lua_thread_pool) {
+ cbdata->thread = lua_thread_pool_get_running_entry(cfg->lua_thread_pool);
+
+ ev_timer_init(&cbdata->ev, lua_ev_base_sleep_cb, timeout, 0.0);
+ ev_timer_start(ev_base, &cbdata->ev);
+
+ return lua_thread_yield(cbdata->thread, 0);
+ }
+ }
+ else {
+ lua_pop(L, 1);
+ }
+
+ /* No thread pool available, cannot do sync sleep */
+ g_free(cbdata);
+ return luaL_error(L, "sync sleep requires lua_thread_pool (use callback for async)");
+ }
+}