]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Implement finish scripts for worker processes
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 25 Aug 2016 11:42:13 +0000 (12:42 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 25 Aug 2016 11:42:13 +0000 (12:42 +0100)
src/libserver/cfg_file.h
src/libserver/cfg_utils.c
src/lua/lua_common.h
src/lua/lua_config.c
src/worker.c

index f66361a417318aa63e815df3a5f353311489a4ed..7c47f1e4bba36ad1146722443d34ae43c2b383c0 100644 (file)
@@ -400,6 +400,7 @@ struct rspamd_config {
        struct worker_s **compiled_workers;                             /**< list of compiled C modules                                                 */
        GList *dynamic_modules;                                                 /**< list of dynamic C modules                                                  */
        GList *dynamic_workers;                                                 /**< list of dynamic C modules                                                  */
+       struct rspamd_config_post_load_script *finish_callbacks; /**< list of callbacks called on worker's termination  */
        struct rspamd_log_format *log_format;                   /**< parsed log format                                                                  */
        gchar *log_format_str;                                                  /**< raw log format string                                                              */
 
index b0ae4a3aec86a4f212f5c6c8fdc2e666de50dacb..9fed83c9a50360ed287ea1a0460e3b2701f73f37 100644 (file)
@@ -175,6 +175,7 @@ rspamd_config_free (struct rspamd_config *cfg)
 {
        struct rspamd_dynamic_module *dyn_mod;
        struct rspamd_dynamic_worker *dyn_wrk;
+       struct rspamd_config_post_load_script *sc, *sctmp;
        GList *cur;
 
        rspamd_map_remove_all (cfg);
@@ -221,6 +222,16 @@ rspamd_config_free (struct rspamd_config *cfg)
                cur = g_list_next (cur);
        }
 
+       DL_FOREACH_SAFE (cfg->finish_callbacks, sc, sctmp) {
+               luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+               g_slice_free1 (sizeof (*sc), sc);
+       }
+
+       DL_FOREACH_SAFE (cfg->on_load, sc, sctmp) {
+               luaL_unref (cfg->lua_state, LUA_REGISTRYINDEX, sc->cbref);
+               g_slice_free1 (sizeof (*sc), sc);
+       }
+
        g_list_free (cfg->classifiers);
        g_list_free (cfg->metrics_list);
        rspamd_symbols_cache_destroy (cfg->cache);
index 2ec23548519d468c6f50bd9e85c8968ab06b65dc..1fefdc2cb16e5fe4eb4b21b0ca84d49f42f8b68d 100644 (file)
@@ -354,5 +354,15 @@ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf,
  */
 void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname);
 
+/**
+ * Call finishing script with the specified task
+ * @param L
+ * @param sc
+ * @param task
+ */
+void lua_call_finish_script (lua_State *L, struct
+               rspamd_config_post_load_script *sc,
+               struct rspamd_task *task);
+
 #endif /* WITH_LUA */
 #endif /* RSPAMD_LUA_H */
index ddd5d9146125530b7a6a3460a55460532919542e..1bb8ecc7784606c0d46d283052990231d15e3916 100644 (file)
@@ -442,6 +442,15 @@ LUA_FUNCTION_DEF (config, get_symbol_callback);
  */
 LUA_FUNCTION_DEF (config, set_symbol_callback);
 
+/***
+ * @method register_finish_script(callback)
+ * Adds new callback that is called on worker process termination when all
+ * tasks pending are processed
+ *
+ * @param callback {function} a fucntion with one argument (rspamd_task)
+ */
+LUA_FUNCTION_DEF (config, register_finish_script);
+
 static const struct luaL_reg configlib_m[] = {
        LUA_INTERFACE_DEF (config, get_module_opt),
        LUA_INTERFACE_DEF (config, get_mempool),
@@ -474,6 +483,7 @@ static const struct luaL_reg configlib_m[] = {
        LUA_INTERFACE_DEF (config, get_symbols_count),
        LUA_INTERFACE_DEF (config, get_symbol_callback),
        LUA_INTERFACE_DEF (config, set_symbol_callback),
+       LUA_INTERFACE_DEF (config, register_finish_script),
        {"__tostring", rspamd_lua_class_tostring},
        {"__newindex", lua_config_newindex},
        {NULL, NULL}
@@ -1766,7 +1776,7 @@ lua_config_add_on_load (lua_State *L)
                return luaL_error (L, "invalid arguments");
        }
 
-       sc = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*sc));
+       sc = g_slice_alloc0 (sizeof (*sc));
        lua_pushvalue (L, 2);
        sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
        DL_APPEND (cfg->on_load, sc);
@@ -1860,6 +1870,26 @@ lua_config_set_symbol_callback (lua_State *L)
        return 1;
 }
 
+static gint
+lua_config_register_finish_script (lua_State *L)
+{
+       struct rspamd_config *cfg = lua_check_config (L, 1);
+       struct rspamd_config_post_load_script *sc;
+
+       if (cfg != NULL && lua_type (L, 2) == LUA_TFUNCTION) {
+               sc = g_slice_alloc0 (sizeof (*sc));
+               lua_pushvalue (L, 2);
+               sc->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+               DL_APPEND (cfg->finish_callbacks, sc);
+       }
+       else {
+               return luaL_error (L, "invalid arguments");
+       }
+
+       return 0;
+}
+
+
 void
 luaopen_config (lua_State * L)
 {
@@ -1867,3 +1897,32 @@ luaopen_config (lua_State * L)
 
        lua_pop (L, 1);
 }
+
+void
+lua_call_finish_script (lua_State *L, struct rspamd_config_post_load_script *sc,
+               struct rspamd_task *task)
+{
+       struct rspamd_task **ptask;
+       gint err_idx;
+       GString *tb;
+
+       lua_pushcfunction (L, &rspamd_lua_traceback);
+       err_idx = lua_gettop (L);
+
+       lua_rawgeti (L, LUA_REGISTRYINDEX, sc->cbref);
+
+       ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
+       rspamd_lua_setclass (L, "rspamd{task}", -1);
+       *ptask = task;
+
+       if (lua_pcall (L, 1, 0, err_idx) != 0) {
+               tb = lua_touserdata (L, -1);
+               msg_err_task ("call to finishing script failed: %v", tb);
+               g_string_free (tb, TRUE);
+               lua_pop (L, 1);
+       }
+
+       lua_pop (L, 1); /* Error function */
+
+       return;
+}
index 60c39b2af184e16fcaa8c8089b25bb8b8768a061..097f7c2e193a79f30acce237c17c63b24eed38d2 100644 (file)
@@ -72,15 +72,47 @@ worker_t normal_worker = {
         G_STRFUNC, \
         __VA_ARGS__)
 
+static void
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+       struct rspamd_task *task;
+       struct rspamd_config *cfg = worker->srv->cfg;
+       struct rspamd_worker_ctx *ctx;
+       struct rspamd_config_post_load_script *sc;
+
+       if (cfg->finish_callbacks) {
+               ctx = worker->ctx;
+               /* Create a fake task object for async events */
+               task = rspamd_task_new (worker, cfg);
+               task->resolver = ctx->resolver;
+               task->ev_base = ctx->ev_base;
+               task->s = rspamd_session_create (task->task_pool,
+                               NULL,
+                               NULL,
+                               (event_finalizer_t) rspamd_task_free,
+                               task);
+
+               DL_FOREACH (cfg->finish_callbacks, sc) {
+                       lua_call_finish_script (cfg->lua_state, sc, task);
+               }
+       }
+
+}
+
 /*
  * Reduce number of tasks proceeded
  */
 static void
 reduce_tasks_count (gpointer arg)
 {
-       guint *nconns = arg;
+       struct rspamd_worker *worker = arg;
 
-       (*nconns)--;
+       worker->nconns --;
+
+       if (worker->wanna_die && worker->nconns == 0) {
+               msg_info ("performing finishing actions");
+               rspamd_worker_call_finish_handlers (worker);
+       }
 }
 
 static void
@@ -344,7 +376,7 @@ accept_socket (gint fd, short what, void *arg)
        task->ev_base = ctx->ev_base;
        worker->nconns++;
        rspamd_mempool_add_destructor (task->task_pool,
-               (rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns);
+               (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
 
        /* Set up async session */
        task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
@@ -525,6 +557,15 @@ init_worker (struct rspamd_config *cfg)
        return ctx;
 }
 
+static void
+rspamd_worker_on_terminate (struct rspamd_worker *worker)
+{
+       if (worker->nconns == 0) {
+               msg_info ("performing finishing actions");
+               rspamd_worker_call_finish_handlers (worker);
+       }
+}
+
 /*
  * Start worker process
  */
@@ -549,6 +590,8 @@ start_worker (struct rspamd_worker *worker)
        /* XXX: stupid default */
        ctx->keys_cache = rspamd_keypair_cache_new (256);
        rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+       g_ptr_array_add (worker->finish_actions,
+                       (gpointer) rspamd_worker_on_terminate);
 
 #ifdef WITH_HYPERSCAN
        rspamd_control_worker_add_cmd_handler (worker,