From: Vsevolod Stakhov Date: Mon, 31 Oct 2016 17:03:00 +0000 (+0000) Subject: [CritFix] Fix workers scripts by sharing workers configs X-Git-Tag: 1.4.0~163 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=098b9ae4891070d2951094a0324fedd835db74bb;p=thirdparty%2Frspamd.git [CritFix] Fix workers scripts by sharing workers configs --- diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 1ca6118a5c..1980c1455c 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -195,6 +195,7 @@ struct rspamd_worker_conf { gpointer *ctx; /**< worker's context */ ucl_object_t *options; /**< other worker's options */ struct rspamd_worker_lua_script *scripts; /**< registered lua scripts */ + ref_entry_t ref; }; enum rspamd_log_format_type { diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index f5cf5d2326..9daa90af8e 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -28,6 +28,7 @@ #include "unix-std.h" #include "libutil/multipattern.h" #include "monitored.h" +#include "ref.h" #include #define DEFAULT_SCORE 10.0 @@ -939,22 +940,32 @@ rspamd_config_new_group (struct rspamd_config *cfg, struct metric *metric, return gr; } +static void +rspamd_worker_conf_dtor (struct rspamd_worker_conf *wcf) +{ + if (wcf) { + g_queue_free (wcf->active_workers); + g_hash_table_unref (wcf->params); + g_slice_free1 (sizeof (*wcf), wcf); + } +} + +static void +rspamd_worker_conf_cfg_fin (gpointer d) +{ + struct rspamd_worker_conf *wcf = d; + + REF_RELEASE (wcf); +} + struct rspamd_worker_conf * rspamd_config_new_worker (struct rspamd_config *cfg, struct rspamd_worker_conf *c) { if (c == NULL) { - c = - rspamd_mempool_alloc0 (cfg->cfg_pool, - sizeof (struct rspamd_worker_conf)); + c = g_slice_alloc0 (sizeof (struct rspamd_worker_conf)); c->params = g_hash_table_new (rspamd_str_hash, rspamd_str_equal); c->active_workers = g_queue_new (); - rspamd_mempool_add_destructor (cfg->cfg_pool, - (rspamd_mempool_destruct_t)g_hash_table_destroy, - c->params); - rspamd_mempool_add_destructor (cfg->cfg_pool, - (rspamd_mempool_destruct_t)g_queue_free, - c->active_workers); #ifdef HAVE_SC_NPROCESSORS_ONLN c->count = sysconf (_SC_NPROCESSORS_ONLN); #else @@ -962,6 +973,10 @@ rspamd_config_new_worker (struct rspamd_config *cfg, #endif c->rlimit_nofile = 0; c->rlimit_maxcore = 0; + + REF_INIT_RETAIN (c, rspamd_worker_conf_dtor); + rspamd_mempool_add_destructor (cfg->cfg_pool, + rspamd_worker_conf_cfg_fin, c); } return c; diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 4c1bb4a1b2..5d0747a662 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -550,8 +550,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, wrk->srv = rspamd_main; wrk->type = cf->type; - wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf)); - memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf)); + wrk->cf = cf; + REF_RETAIN (cf); wrk->index = index; wrk->ctx = cf->ctx; wrk->finish_actions = g_ptr_array_new (); diff --git a/src/log_helper.c b/src/log_helper.c index 96075ea478..74541f62b0 100644 --- a/src/log_helper.c +++ b/src/log_helper.c @@ -173,6 +173,8 @@ start_log_helper (struct rspamd_worker *worker) { struct log_helper_ctx *ctx = worker->ctx; gssize r = -1; + gint nscripts = 0; + struct rspamd_worker_lua_script *tmp; static struct rspamd_srv_command srv_cmd; ctx->ev_base = rspamd_prepare_worker (worker, @@ -183,6 +185,9 @@ start_log_helper (struct rspamd_worker *worker) ctx->scripts = worker->cf->scripts; ctx->L = ctx->cfg->lua_state; + DL_COUNT (worker->cf->scripts, tmp, nscripts); + msg_info ("started log_helper worker with %d scripts", nscripts); + #ifdef HAVE_SOCK_SEQPACKET r = socketpair (AF_LOCAL, SOCK_SEQPACKET, 0, ctx->pair); #endif diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index b9a727efad..7025d8fdac 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -2009,7 +2009,6 @@ lua_config_register_worker_script (lua_State *L) for (cur = g_list_first (cfg->workers); cur != NULL; cur = g_list_next (cur)) { cf = cur->data; - wtype = g_quark_to_string (cf->type); if (g_ascii_strcasecmp (wtype, worker_type) == 0) { diff --git a/src/plugins/lua/fann_scores.lua b/src/plugins/lua/fann_scores.lua index 6a63e2e6eb..0d9e00435b 100644 --- a/src/plugins/lua/fann_scores.lua +++ b/src/plugins/lua/fann_scores.lua @@ -559,7 +559,7 @@ else if opts['train']['max_epoch'] then max_epoch = opts['train']['max_epoch'] end - cfg:register_worker_script("log_helper", + local ret = cfg:register_worker_script("log_helper", function(score, req_score, results, cf, id, extra) -- map (snd x) (filter (fst x == module_id) extra) local extra_fann = map(function(e) return e[2] end, @@ -572,6 +572,10 @@ else opts['train'], extra_fann) end end) + + if not ret then + rspamd_logger.errx(cfg, 'cannot find worker "log_helper"') + end end) rspamd_plugins["fann_score"] = { log_callback = function(task) diff --git a/src/rspamd.c b/src/rspamd.c index f12a3b5e1d..d9f2b9a76f 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -692,7 +692,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) WTERMSIG (res) == SIGKILL ? "hardly" : "softly"); event_del (&w->srv_ev); g_ptr_array_free (w->finish_actions, TRUE); - g_free (w->cf); + REF_RELEASE (w->cf); g_free (w); return TRUE;