*
* \param tenant_id the tenant id by which the config is known
* \param filename full path of a yaml file
+ * \param loader_id id of loader thread or -1
*
* \retval 0 ok
* \retval -1 failed
*/
-int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename)
+int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id)
{
DetectEngineCtx *de_ctx = NULL;
char prefix[64];
return -1;
}
+/**
+ * \param ctx function specific data
+ * \param loader_id id of the loader that executed the task
+ */
+typedef int (*LoaderFunc)(void *ctx, int loader_id);
+
+typedef struct DetectLoaderTask_ {
+ LoaderFunc Func;
+ void *ctx;
+ TAILQ_ENTRY(DetectLoaderTask_) next;
+} DetectLoaderTask;
+
+typedef struct DetectLoaderControl_ {
+ int id;
+ int result; /* 0 for ok, error otherwise */
+ SCMutex m;
+ TAILQ_HEAD(, DetectLoaderTask_) task_list;
+} DetectLoaderControl;
+
+#define NLOADERS 4
+static DetectLoaderControl loaders[NLOADERS];
+static int cur_loader = 0;
+void TmThreadWakeupDetectLoaderThreads(void);
+
+/** \param loader -1 for auto select
+ * \retval loader_id or negative in case of error */
+int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx)
+{
+ if (loader_id == -1) {
+ loader_id = cur_loader;
+ cur_loader++;
+ if (cur_loader >= NLOADERS)
+ cur_loader = 0;
+ }
+ if (loader_id >= NLOADERS || loader_id < 0) {
+ return -ERANGE;
+ }
+
+ DetectLoaderControl *loader = &loaders[loader_id];
+
+ DetectLoaderTask *t = SCCalloc(1, sizeof(*t));
+ if (t == NULL)
+ return -ENOMEM;
+
+ t->Func = Func;
+ t->ctx = func_ctx;
+
+ SCMutexLock(&loader->m);
+ TAILQ_INSERT_TAIL(&loader->task_list, t, next);
+ SCMutexUnlock(&loader->m);
+
+ TmThreadWakeupDetectLoaderThreads();
+
+ SCLogDebug("%d %p %p", loader_id, Func, func_ctx);
+ return loader_id;
+}
+
+/** \brief wait for loader tasks to complete
+ * \retval result 0 for ok, -1 for errors */
+int DetectLoadersSync(void)
+{
+ SCLogDebug("waiting");
+ int errors = 0;
+ int i;
+ for (i = 0; i < NLOADERS; i++) {
+ int done = 0;
+ DetectLoaderControl *loader = &loaders[i];
+ while (!done) {
+ SCMutexLock(&loader->m);
+ if (TAILQ_EMPTY(&loader->task_list)) {
+ done = 1;
+ }
+ SCMutexUnlock(&loader->m);
+ }
+ SCMutexLock(&loader->m);
+ if (loader->result != 0) {
+ errors++;
+ loader->result = 0;
+ }
+ SCMutexUnlock(&loader->m);
+
+ }
+ if (errors) {
+ SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors);
+ return -1;
+ }
+ SCLogDebug("done");
+ return 0;
+}
+
+void DetectLoaderInit(DetectLoaderControl *loader)
+{
+ memset(loader, 0x00, sizeof(*loader));
+ SCMutexInit(&loader->m, NULL);
+ TAILQ_INIT(&loader->task_list);
+}
+
+void DetectLoadersInit(void)
+{
+ int i;
+ for (i = 0; i < NLOADERS; i++) {
+ DetectLoaderInit(&loaders[i]);
+ }
+}
+
+typedef struct TenantLoaderCtx_ {
+ uint32_t tenant_id;
+ const char *yaml;
+} TenantLoaderCtx;
+
+static int DetectLoaderFuncLoadTenant(void *vctx, int loader_id)
+{
+ TenantLoaderCtx *ctx = (TenantLoaderCtx *)vctx;
+
+/* TODO we need to somehow store the loader id for when we free */
+ if (DetectEngineMultiTenantLoadTenant(ctx->tenant_id, ctx->yaml, loader_id) != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int DetectLoaderSetupLoadTenant(uint32_t tenant_id, const char *yaml)
+{
+ TenantLoaderCtx *t = SCCalloc(1, sizeof(*t));
+ if (t == NULL)
+ return -ENOMEM;
+
+ t->tenant_id = tenant_id;
+ t->yaml = yaml;
+
+ return DetectLoaderQueueTask(-1, DetectLoaderFuncLoadTenant, t);
+}
+
+/**
+ * \brief Unpauses all threads present in tv_root
+ */
+void TmThreadWakeupDetectLoaderThreads()
+{
+ ThreadVars *tv = NULL;
+ int i = 0;
+
+ SCMutexLock(&tv_root_lock);
+ for (i = 0; i < TVT_MAX; i++) {
+ tv = tv_root[i];
+ while (tv != NULL) {
+ if (strcmp(tv->name,"DetectLoader") == 0) {
+ BUG_ON(tv->ctrl_cond == NULL);
+ pthread_cond_broadcast(tv->ctrl_cond);
+ }
+ tv = tv->next;
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ return;
+}
+
+/**
+ * \brief Unpauses all threads present in tv_root
+ */
+void TmThreadContinueDetectLoaderThreads()
+{
+ ThreadVars *tv = NULL;
+ int i = 0;
+
+ SCMutexLock(&tv_root_lock);
+ for (i = 0; i < TVT_MAX; i++) {
+ tv = tv_root[i];
+ while (tv != NULL) {
+ if (strcmp(tv->name,"DetectLoader") == 0)
+ TmThreadContinue(tv);
+
+ tv = tv->next;
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ return;
+}
+
+
+SC_ATOMIC_DECLARE(int, detect_loader_cnt);
+
+typedef struct DetectLoaderThreadData_ {
+ uint32_t instance;
+} DetectLoaderThreadData;
+
+static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+ DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData));
+ if (ftd == NULL)
+ return TM_ECODE_FAILED;
+
+ ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */
+ SCLogDebug("detect loader instance %u", ftd->instance);
+
+ /* pass thread data back to caller */
+ *data = ftd;
+
+ return TM_ECODE_OK;
+}
+
+static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data)
+{
+ SCFree(data);
+ return TM_ECODE_OK;
+}
+
+
+static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
+{
+ /* block usr2. usr2 to be handled by the main thread only */
+ UtilSignalBlock(SIGUSR2);
+
+ DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data;
+ BUG_ON(ftd == NULL);
+
+ SCLogDebug("loader thread started");
+ while (1)
+ {
+ if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
+ TmThreadsSetFlag(th_v, THV_PAUSED);
+ TmThreadTestThreadUnPaused(th_v);
+ TmThreadsUnsetFlag(th_v, THV_PAUSED);
+ }
+
+ /* see if we have tasks */
+
+ DetectLoaderControl *loader = &loaders[ftd->instance];
+ SCMutexLock(&loader->m);
+
+ DetectLoaderTask *task = NULL, *tmptask = NULL;
+ TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) {
+ int r = task->Func(task->ctx, ftd->instance);
+ loader->result |= r;
+ TAILQ_REMOVE(&loader->task_list, task, next);
+ SCFree(task);
+ }
+
+ SCMutexUnlock(&loader->m);
+
+ if (TmThreadsCheckFlag(th_v, THV_KILL)) {
+ break;
+ }
+
+ /* just wait until someone wakes us up */
+ SCCtrlMutexLock(th_v->ctrl_mutex);
+ SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex);
+ SCCtrlMutexUnlock(th_v->ctrl_mutex);
+
+ SCLogDebug("woke up...");
+ }
+ return TM_ECODE_OK;
+}
+
+/** \brief spawn the detect loader manager thread */
+void DetectLoaderThreadSpawn()
+{
+ uint32_t u;
+ for (u = 0; u < NLOADERS; u++) {
+ ThreadVars *tv_loader = NULL;
+
+ char name[32] = "";
+ snprintf(name, sizeof(name), "DetectLoader%02u", u+1);
+
+ tv_loader = TmThreadCreateCmdThreadByName("DetectLoader",
+ "DetectLoader", 1);
+ BUG_ON(tv_loader == NULL);
+
+ if (tv_loader == NULL) {
+ printf("ERROR: TmThreadsCreate failed\n");
+ exit(1);
+ }
+ if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) {
+ printf("ERROR: TmThreadSpawn failed\n");
+ exit(1);
+ }
+ }
+ return;
+}
+
+void TmModuleDetectLoaderRegister (void)
+{
+ tmm_modules[TMM_DETECTLOADER].name = "DetectLoader";
+ tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit;
+ tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit;
+ tmm_modules[TMM_DETECTLOADER].Management = DetectLoader;
+ tmm_modules[TMM_DETECTLOADER].cap_flags = 0;
+ tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM;
+ SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name);
+
+ SC_ATOMIC_INIT(detect_loader_cnt);
+}
+
/**
* \brief setup multi-detect / multi-tenancy
*
int enabled = 0;
(void)ConfGetBool("multi-detect.enabled", &enabled);
if (enabled == 1) {
+ DetectLoadersInit();
+ TmModuleDetectLoaderRegister();
+ DetectLoaderThreadSpawn();
+ TmThreadContinueDetectLoaderThreads();
+
SCMutexLock(&master->lock);
master->multi_tenant_enabled = 1;
}
SCLogInfo("tenant id: %u, %s", tenant_id, yaml_node->val);
- if (DetectEngineMultiTenantLoadTenant(tenant_id, yaml_node->val) != 0) {
+ if (DetectLoaderSetupLoadTenant(tenant_id, yaml_node->val) != 0) {
/* error logged already */
goto bad_tenant;
}
goto error;
}
}
+
+ /* wait for our loaders to complete their tasks */
+ if (DetectLoadersSync() != 0)
+ goto error;
+
if (DetectEngineMTApply() < 0) {
SCLogError(SC_ERR_DETECT_PREPARE, "initializing the detection engine failed");
goto error;