struct worker_s **compiled_workers; /**< list of compiled C modules */
GList *dynamic_modules; /**< list of dynamic C modules */
GList *dynamic_workers; /**< list of dynamic C modules */
+ struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination */
struct rspamd_log_format *log_format; /**< parsed log format */
gchar *log_format_str; /**< raw log format string */
{
struct rspamd_dynamic_module *dyn_mod;
struct rspamd_dynamic_worker *dyn_wrk;
+ struct rspamd_config_post_load_script *sc, *sctmp;
GList *cur;
rspamd_map_remove_all (cfg);
cur = g_list_next (cur);
}
+ DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) {
+ luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+ g_slice_free1 (sizeof (*sc), sc);
+ }
+
+ DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) {
+ luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+ g_slice_free1 (sizeof (*sc), sc);
+ }
+
g_list_free (cfg->classifiers);
g_list_free (cfg->metrics_list);
rspamd_symbols_cache_destroy (cfg->cache);
*/
void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname);
+/**
+ * Call finishing script with the specified task
+ * @param L
+ * @param sc
+ * @param task
+ */
+void lua_call_finish_script (lua_State *L, struct
+ rspamd_config_post_load_script *sc,
+ struct rspamd_task *task);
+
#endif /* WITH_LUA */
#endif /* RSPAMD_LUA_H */
*/
LUA_FUNCTION_DEF (config, set_symbol_callback);
+/***
+ * @method register_finish_script(callback)
+ * Adds new callback that is called on worker process termination when all
+ * tasks pending are processed
+ *
+ * @param callback {function} a fucntion with one argument (rspamd_task)
+ */
+LUA_FUNCTION_DEF (config, register_finish_script);
+
static const struct luaL_reg configlib_m[] = {
LUA_INTERFACE_DEF (config, get_module_opt),
LUA_INTERFACE_DEF (config, get_mempool),
LUA_INTERFACE_DEF (config, get_symbols_count),
LUA_INTERFACE_DEF (config, get_symbol_callback),
LUA_INTERFACE_DEF (config, set_symbol_callback),
+ LUA_INTERFACE_DEF (config, register_finish_script),
{"__tostring", rspamd_lua_class_tostring},
{"__newindex", lua_config_newindex},
{NULL, NULL}
return luaL_error (L, "invalid arguments");
}
- sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc));
+ sc = g_slice_alloc0 (sizeof (*sc));
lua_pushvalue (L, 2);
sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
DL_APPEND (cfg->on_load, sc);
return 1;
}
+static gint
+lua_config_register_finish_script (lua_State *L)
+{
+ struct rspamd_config *cfg = lua_check_config (L, 1);
+ struct rspamd_config_post_load_script *sc;
+
+ if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) {
+ sc = g_slice_alloc0 (sizeof (*sc));
+ lua_pushvalue (L, 2);
+ sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ DL_APPEND (cfg->finish_callbacks, sc);
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
+
+ return 0;
+}
+
+
void
luaopen_config (lua_State * L)
{
lua_pop (L, 1);
}
+
+void
+lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc,
+ struct rspamd_task *task)
+{
+ struct rspamd_task **ptask;
+ gint err_idx;
+ GString *tb;
+
+ lua_pushcfunction (L, &rspamd_lua_traceback);
+ err_idx = lua_gettop (L);
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);
+
+ ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (L, "rspamd{task}", -1);
+ *ptask = task;
+
+ if (lua_pcall (L, 1, 0, err_idx) != 0) {
+ tb = lua_touserdata (L, -1);
+ msg_err_task ("call to finishing script failed: %v", tb);
+ g_string_free (tb, TRUE);
+ lua_pop (L, 1);
+ }
+
+ lua_pop (L, 1); /* Error function */
+
+ return;
+}
G_STRFUNC, \
__VA_ARGS__)
+static void
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+ struct rspamd_task *task;
+ struct rspamd_config *cfg = worker->srv->cfg;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_config_post_load_script *sc;
+
+ if (cfg->finish_callbacks) {
+ ctx = worker->ctx;
+ /* Create a fake task object for async events */
+ task = rspamd_task_new (worker, cfg);
+ task->resolver = ctx->resolver;
+ task->ev_base = ctx->ev_base;
+ task->s = rspamd_session_create (task->task_pool,
+ NULL,
+ NULL,
+ (event_finalizer_t) rspamd_task_free,
+ task);
+
+ DL_FOREACH (cfg->finish_callbacks, sc) {
+ lua_call_finish_script (cfg->lua_state, sc, task);
+ }
+ }
+
+}
+
/*
* Reduce number of tasks proceeded
*/
static void
reduce_tasks_count (gpointer arg)
{
- guint *nconns = arg;
+ struct rspamd_worker *worker = arg;
- (*nconns)--;
+ worker->nconns --;
+
+ if (worker->wanna_die && worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
}
static void
task->ev_base = ctx->ev_base;
worker->nconns++;
rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns);
+ (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
/* Set up async session */
task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
return ctx;
}
+static void
+rspamd_worker_on_terminate (struct rspamd_worker *worker)
+{
+ if (worker->nconns == 0) {
+ msg_info ("performing finishing actions");
+ rspamd_worker_call_finish_handlers (worker);
+ }
+}
+
/*
* Start worker process
*/
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+ g_ptr_array_add (worker->finish_actions,
+ (gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,