From: Victor Julien Date: Tue, 12 May 2015 09:35:47 +0000 (+0200) Subject: detect: create loader threads X-Git-Tag: suricata-3.0RC1~194 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eb09118d64003b06edaf455bd2bee0cab1fa467b;p=thirdparty%2Fsuricata.git detect: create loader threads To speed up startup with many tenants, tenant loading will be parallelized. As no tempary threads should be used for these memory allocation heavy tasks, this patch adds new type of 'command' thread that can be used to load and reload tenants. This patch hardcodes the number of loaders to 4. Future work will make it dynamic. The loader thread essentially sleeps constantly. When a tasks is sent to it, it will wake up and execute it. --- diff --git a/src/detect-engine.c b/src/detect-engine.c index db2c71bff3..305f939f9f 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -1677,11 +1677,12 @@ int DetectEngineMultiTenantEnabled(void) * * \param tenant_id the tenant id by which the config is known * \param filename full path of a yaml file + * \param loader_id id of loader thread or -1 * * \retval 0 ok * \retval -1 failed */ -int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename) +int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id) { DetectEngineCtx *de_ctx = NULL; char prefix[64]; @@ -1744,6 +1745,300 @@ error: return -1; } +/** + * \param ctx function specific data + * \param loader_id id of the loader that executed the task + */ +typedef int (*LoaderFunc)(void *ctx, int loader_id); + +typedef struct DetectLoaderTask_ { + LoaderFunc Func; + void *ctx; + TAILQ_ENTRY(DetectLoaderTask_) next; +} DetectLoaderTask; + +typedef struct DetectLoaderControl_ { + int id; + int result; /* 0 for ok, error otherwise */ + SCMutex m; + TAILQ_HEAD(, DetectLoaderTask_) task_list; +} DetectLoaderControl; + +#define NLOADERS 4 +static DetectLoaderControl loaders[NLOADERS]; +static int cur_loader = 0; +void TmThreadWakeupDetectLoaderThreads(void); + +/** \param loader -1 for auto select + * \retval loader_id or negative in case of error */ +int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx) +{ + if (loader_id == -1) { + loader_id = cur_loader; + cur_loader++; + if (cur_loader >= NLOADERS) + cur_loader = 0; + } + if (loader_id >= NLOADERS || loader_id < 0) { + return -ERANGE; + } + + DetectLoaderControl *loader = &loaders[loader_id]; + + DetectLoaderTask *t = SCCalloc(1, sizeof(*t)); + if (t == NULL) + return -ENOMEM; + + t->Func = Func; + t->ctx = func_ctx; + + SCMutexLock(&loader->m); + TAILQ_INSERT_TAIL(&loader->task_list, t, next); + SCMutexUnlock(&loader->m); + + TmThreadWakeupDetectLoaderThreads(); + + SCLogDebug("%d %p %p", loader_id, Func, func_ctx); + return loader_id; +} + +/** \brief wait for loader tasks to complete + * \retval result 0 for ok, -1 for errors */ +int DetectLoadersSync(void) +{ + SCLogDebug("waiting"); + int errors = 0; + int i; + for (i = 0; i < NLOADERS; i++) { + int done = 0; + DetectLoaderControl *loader = &loaders[i]; + while (!done) { + SCMutexLock(&loader->m); + if (TAILQ_EMPTY(&loader->task_list)) { + done = 1; + } + SCMutexUnlock(&loader->m); + } + SCMutexLock(&loader->m); + if (loader->result != 0) { + errors++; + loader->result = 0; + } + SCMutexUnlock(&loader->m); + + } + if (errors) { + SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors); + return -1; + } + SCLogDebug("done"); + return 0; +} + +void DetectLoaderInit(DetectLoaderControl *loader) +{ + memset(loader, 0x00, sizeof(*loader)); + SCMutexInit(&loader->m, NULL); + TAILQ_INIT(&loader->task_list); +} + +void DetectLoadersInit(void) +{ + int i; + for (i = 0; i < NLOADERS; i++) { + DetectLoaderInit(&loaders[i]); + } +} + +typedef struct TenantLoaderCtx_ { + uint32_t tenant_id; + const char *yaml; +} TenantLoaderCtx; + +static int DetectLoaderFuncLoadTenant(void *vctx, int loader_id) +{ + TenantLoaderCtx *ctx = (TenantLoaderCtx *)vctx; + +/* TODO we need to somehow store the loader id for when we free */ + if (DetectEngineMultiTenantLoadTenant(ctx->tenant_id, ctx->yaml, loader_id) != 0) { + return -1; + } + return 0; +} + +int DetectLoaderSetupLoadTenant(uint32_t tenant_id, const char *yaml) +{ + TenantLoaderCtx *t = SCCalloc(1, sizeof(*t)); + if (t == NULL) + return -ENOMEM; + + t->tenant_id = tenant_id; + t->yaml = yaml; + + return DetectLoaderQueueTask(-1, DetectLoaderFuncLoadTenant, t); +} + +/** + * \brief Unpauses all threads present in tv_root + */ +void TmThreadWakeupDetectLoaderThreads() +{ + ThreadVars *tv = NULL; + int i = 0; + + SCMutexLock(&tv_root_lock); + for (i = 0; i < TVT_MAX; i++) { + tv = tv_root[i]; + while (tv != NULL) { + if (strcmp(tv->name,"DetectLoader") == 0) { + BUG_ON(tv->ctrl_cond == NULL); + pthread_cond_broadcast(tv->ctrl_cond); + } + tv = tv->next; + } + } + SCMutexUnlock(&tv_root_lock); + + return; +} + +/** + * \brief Unpauses all threads present in tv_root + */ +void TmThreadContinueDetectLoaderThreads() +{ + ThreadVars *tv = NULL; + int i = 0; + + SCMutexLock(&tv_root_lock); + for (i = 0; i < TVT_MAX; i++) { + tv = tv_root[i]; + while (tv != NULL) { + if (strcmp(tv->name,"DetectLoader") == 0) + TmThreadContinue(tv); + + tv = tv->next; + } + } + SCMutexUnlock(&tv_root_lock); + + return; +} + + +SC_ATOMIC_DECLARE(int, detect_loader_cnt); + +typedef struct DetectLoaderThreadData_ { + uint32_t instance; +} DetectLoaderThreadData; + +static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data) +{ + DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */ + SCLogDebug("detect loader instance %u", ftd->instance); + + /* pass thread data back to caller */ + *data = ftd; + + return TM_ECODE_OK; +} + +static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data) +{ + SCFree(data); + return TM_ECODE_OK; +} + + +static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data) +{ + /* block usr2. usr2 to be handled by the main thread only */ + UtilSignalBlock(SIGUSR2); + + DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data; + BUG_ON(ftd == NULL); + + SCLogDebug("loader thread started"); + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + /* see if we have tasks */ + + DetectLoaderControl *loader = &loaders[ftd->instance]; + SCMutexLock(&loader->m); + + DetectLoaderTask *task = NULL, *tmptask = NULL; + TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) { + int r = task->Func(task->ctx, ftd->instance); + loader->result |= r; + TAILQ_REMOVE(&loader->task_list, task, next); + SCFree(task); + } + + SCMutexUnlock(&loader->m); + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + break; + } + + /* just wait until someone wakes us up */ + SCCtrlMutexLock(th_v->ctrl_mutex); + SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex); + SCCtrlMutexUnlock(th_v->ctrl_mutex); + + SCLogDebug("woke up..."); + } + return TM_ECODE_OK; +} + +/** \brief spawn the detect loader manager thread */ +void DetectLoaderThreadSpawn() +{ + uint32_t u; + for (u = 0; u < NLOADERS; u++) { + ThreadVars *tv_loader = NULL; + + char name[32] = ""; + snprintf(name, sizeof(name), "DetectLoader%02u", u+1); + + tv_loader = TmThreadCreateCmdThreadByName("DetectLoader", + "DetectLoader", 1); + BUG_ON(tv_loader == NULL); + + if (tv_loader == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } + return; +} + +void TmModuleDetectLoaderRegister (void) +{ + tmm_modules[TMM_DETECTLOADER].name = "DetectLoader"; + tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit; + tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit; + tmm_modules[TMM_DETECTLOADER].Management = DetectLoader; + tmm_modules[TMM_DETECTLOADER].cap_flags = 0; + tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name); + + SC_ATOMIC_INIT(detect_loader_cnt); +} + /** * \brief setup multi-detect / multi-tenancy * @@ -1761,6 +2056,11 @@ void DetectEngineMultiTenantSetup(void) int enabled = 0; (void)ConfGetBool("multi-detect.enabled", &enabled); if (enabled == 1) { + DetectLoadersInit(); + TmModuleDetectLoaderRegister(); + DetectLoaderThreadSpawn(); + TmThreadContinueDetectLoaderThreads(); + SCMutexLock(&master->lock); master->multi_tenant_enabled = 1; @@ -1858,7 +2158,7 @@ void DetectEngineMultiTenantSetup(void) } SCLogInfo("tenant id: %u, %s", tenant_id, yaml_node->val); - if (DetectEngineMultiTenantLoadTenant(tenant_id, yaml_node->val) != 0) { + if (DetectLoaderSetupLoadTenant(tenant_id, yaml_node->val) != 0) { /* error logged already */ goto bad_tenant; } @@ -1869,6 +2169,11 @@ void DetectEngineMultiTenantSetup(void) goto error; } } + + /* wait for our loaders to complete their tasks */ + if (DetectLoadersSync() != 0) + goto error; + if (DetectEngineMTApply() < 0) { SCLogError(SC_ERR_DETECT_PREPARE, "initializing the detection engine failed"); goto error; diff --git a/src/detect-engine.h b/src/detect-engine.h index ee2d89de0a..c9ee62e82e 100644 --- a/src/detect-engine.h +++ b/src/detect-engine.h @@ -86,7 +86,7 @@ int DetectEngineReloadIsStart(void); void DetectEngineReloadSetDone(void); int DetectEngineReloadIsDone(void); -int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename); +int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id); int DetectEngineTentantRegisterVlanId(uint32_t tenant_id, uint16_t vlan_id); int DetectEngineTentantUnregisterVlanId(uint32_t tenant_id, uint16_t vlan_id); diff --git a/src/detect.h b/src/detect.h index 2ebfb43f96..590ce24c6b 100644 --- a/src/detect.h +++ b/src/detect.h @@ -719,6 +719,10 @@ typedef struct DetectEngineCtx_ { uint32_t ref_cnt; /** list in master: either active or freelist */ struct DetectEngineCtx_ *next; + + /** id of loader thread 'owning' this de_ctx */ + int loader_id; + } DetectEngineCtx; /* Engine groups profiles (low, medium, high, custom) */ diff --git a/src/runmode-unix-socket.c b/src/runmode-unix-socket.c index bc3e2955c3..1fdd49dfb2 100644 --- a/src/runmode-unix-socket.c +++ b/src/runmode-unix-socket.c @@ -639,7 +639,7 @@ TmEcode UnixSocketRegisterTenant(json_t *cmd, json_t* answer, void *data) SCLogDebug("add-tenant: %d %s", tenant_id, filename); /* 3 load into the system */ - if (DetectEngineMultiTenantLoadTenant(tenant_id, filename) != 0) { + if (DetectEngineMultiTenantLoadTenant(tenant_id, filename, -1) != 0) { json_object_set_new(answer, "message", json_string("adding tenant failed")); return TM_ECODE_FAILED; } diff --git a/src/tm-modules.c b/src/tm-modules.c index c6f8c9ac76..73e9f235fd 100644 --- a/src/tm-modules.c +++ b/src/tm-modules.c @@ -269,6 +269,7 @@ const char * TmModuleTmmIdToString(TmmId id) CASE_CODE (TMM_FLOWMANAGER); CASE_CODE (TMM_FLOWRECYCLER); CASE_CODE (TMM_UNIXMANAGER); + CASE_CODE (TMM_DETECTLOADER); CASE_CODE (TMM_LUALOG); CASE_CODE (TMM_LOGSTATSLOG); CASE_CODE (TMM_RECEIVENETMAP); diff --git a/src/tm-threads-common.h b/src/tm-threads-common.h index e59c2a8764..f6629eaa82 100644 --- a/src/tm-threads-common.h +++ b/src/tm-threads-common.h @@ -104,6 +104,7 @@ typedef enum { TMM_FLOWMANAGER, TMM_FLOWRECYCLER, + TMM_DETECTLOADER, TMM_UNIXMANAGER,