From: Vsevolod Stakhov Date: Thu, 25 Aug 2016 11:42:13 +0000 (+0100) Subject: [Feature] Implement finish scripts for worker processes X-Git-Tag: 1.4.0~563 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=23f831e0c58959e22c24ecd2bbd21a8a91183823;p=thirdparty%2Frspamd.git [Feature] Implement finish scripts for worker processes --- diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index f66361a417..7c47f1e4bb 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -400,6 +400,7 @@ struct rspamd_config { struct worker_s **compiled_workers; /**< list of compiled C modules */ GList *dynamic_modules; /**< list of dynamic C modules */ GList *dynamic_workers; /**< list of dynamic C modules */ + struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination */ struct rspamd_log_format *log_format; /**< parsed log format */ gchar *log_format_str; /**< raw log format string */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index b0ae4a3aec..9fed83c9a5 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -175,6 +175,7 @@ rspamd_config_free (struct rspamd_config *cfg) { struct rspamd_dynamic_module *dyn_mod; struct rspamd_dynamic_worker *dyn_wrk; + struct rspamd_config_post_load_script *sc, *sctmp; GList *cur; rspamd_map_remove_all (cfg); @@ -221,6 +222,16 @@ rspamd_config_free (struct rspamd_config *cfg) cur = g_list_next (cur); } + DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) { + luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref); + g_slice_free1 (sizeof (*sc), sc); + } + + DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) { + luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref); + g_slice_free1 (sizeof (*sc), sc); + } + g_list_free (cfg->classifiers); g_list_free (cfg->metrics_list); rspamd_symbols_cache_destroy (cfg->cache); diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 2ec2354851..1fefdc2cb1 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -354,5 +354,15 @@ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf, */ void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname); +/** + * Call finishing script with the specified task + * @param L + * @param sc + * @param task + */ +void lua_call_finish_script (lua_State *L, struct + rspamd_config_post_load_script *sc, + struct rspamd_task *task); + #endif /* WITH_LUA */ #endif /* RSPAMD_LUA_H */ diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index ddd5d91461..1bb8ecc778 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -442,6 +442,15 @@ LUA_FUNCTION_DEF (config, get_symbol_callback); */ LUA_FUNCTION_DEF (config, set_symbol_callback); +/*** + * @method register_finish_script(callback) + * Adds new callback that is called on worker process termination when all + * tasks pending are processed + * + * @param callback {function} a fucntion with one argument (rspamd_task) + */ +LUA_FUNCTION_DEF (config, register_finish_script); + static const struct luaL_reg configlib_m[] = { LUA_INTERFACE_DEF (config, get_module_opt), LUA_INTERFACE_DEF (config, get_mempool), @@ -474,6 +483,7 @@ static const struct luaL_reg configlib_m[] = { LUA_INTERFACE_DEF (config, get_symbols_count), LUA_INTERFACE_DEF (config, get_symbol_callback), LUA_INTERFACE_DEF (config, set_symbol_callback), + LUA_INTERFACE_DEF (config, register_finish_script), {"__tostring", rspamd_lua_class_tostring}, {"__newindex", lua_config_newindex}, {NULL, NULL} @@ -1766,7 +1776,7 @@ lua_config_add_on_load (lua_State *L) return luaL_error (L, "invalid arguments"); } - sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc)); + sc = g_slice_alloc0 (sizeof (*sc)); lua_pushvalue (L, 2); sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX); DL_APPEND (cfg->on_load, sc); @@ -1860,6 +1870,26 @@ lua_config_set_symbol_callback (lua_State *L) return 1; } +static gint +lua_config_register_finish_script (lua_State *L) +{ + struct rspamd_config *cfg = lua_check_config (L, 1); + struct rspamd_config_post_load_script *sc; + + if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) { + sc = g_slice_alloc0 (sizeof (*sc)); + lua_pushvalue (L, 2); + sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX); + DL_APPEND (cfg->finish_callbacks, sc); + } + else { + return luaL_error (L, "invalid arguments"); + } + + return 0; +} + + void luaopen_config (lua_State * L) { @@ -1867,3 +1897,32 @@ luaopen_config (lua_State * L) lua_pop (L, 1); } + +void +lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc, + struct rspamd_task *task) +{ + struct rspamd_task **ptask; + gint err_idx; + GString *tb; + + lua_pushcfunction (L, &rspamd_lua_traceback); + err_idx = lua_gettop (L); + + lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref); + + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 0, err_idx) != 0) { + tb = lua_touserdata (L, -1); + msg_err_task ("call to finishing script failed: %v", tb); + g_string_free (tb, TRUE); + lua_pop (L, 1); + } + + lua_pop (L, 1); /* Error function */ + + return; +} diff --git a/src/worker.c b/src/worker.c index 60c39b2af1..097f7c2e19 100644 --- a/src/worker.c +++ b/src/worker.c @@ -72,15 +72,47 @@ worker_t normal_worker = { G_STRFUNC, \ __VA_ARGS__) +static void +rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) +{ + struct rspamd_task *task; + struct rspamd_config *cfg = worker->srv->cfg; + struct rspamd_worker_ctx *ctx; + struct rspamd_config_post_load_script *sc; + + if (cfg->finish_callbacks) { + ctx = worker->ctx; + /* Create a fake task object for async events */ + task = rspamd_task_new (worker, cfg); + task->resolver = ctx->resolver; + task->ev_base = ctx->ev_base; + task->s = rspamd_session_create (task->task_pool, + NULL, + NULL, + (event_finalizer_t) rspamd_task_free, + task); + + DL_FOREACH (cfg->finish_callbacks, sc) { + lua_call_finish_script (cfg->lua_state, sc, task); + } + } + +} + /* * Reduce number of tasks proceeded */ static void reduce_tasks_count (gpointer arg) { - guint *nconns = arg; + struct rspamd_worker *worker = arg; - (*nconns)--; + worker->nconns --; + + if (worker->wanna_die && worker->nconns == 0) { + msg_info ("performing finishing actions"); + rspamd_worker_call_finish_handlers (worker); + } } static void @@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg) task->ev_base = ctx->ev_base; worker->nconns++; rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns); + (rspamd_mempool_destruct_t)reduce_tasks_count, worker); /* Set up async session */ task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, @@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg) return ctx; } +static void +rspamd_worker_on_terminate (struct rspamd_worker *worker) +{ + if (worker->nconns == 0) { + msg_info ("performing finishing actions"); + rspamd_worker_call_finish_handlers (worker); + } +} + /* * Start worker process */ @@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker) /* XXX: stupid default */ ctx->keys_cache = rspamd_keypair_cache_new (256); rspamd_stat_init (worker->srv->cfg, ctx->ev_base); + g_ptr_array_add (worker->finish_actions, + (gpointer) rspamd_worker_on_terminate); #ifdef WITH_HYPERSCAN rspamd_control_worker_add_cmd_handler (worker,