]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
detect: create loader threads
authorVictor Julien <victor@inliniac.net>
Tue, 12 May 2015 09:35:47 +0000 (11:35 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 23 Jul 2015 17:36:15 +0000 (19:36 +0200)
To speed up startup with many tenants, tenant loading will be parallelized.
As no tempary threads should be used for these memory allocation heavy
tasks, this patch adds new type of 'command' thread that can be used to
load and reload tenants.

This patch hardcodes the number of loaders to 4. Future work will make it
dynamic.

The loader thread essentially sleeps constantly. When a tasks is sent to
it, it will wake up and execute it.

src/detect-engine.c
src/detect-engine.h
src/detect.h
src/runmode-unix-socket.c
src/tm-modules.c
src/tm-threads-common.h

index db2c71bff3f79de4afbbd7d0ba9ac4e4b4050490..305f939f9f079355f4f49f69454e8b4b74633f5b 100644 (file)
@@ -1677,11 +1677,12 @@ int DetectEngineMultiTenantEnabled(void)
  *
  *  \param tenant_id the tenant id by which the config is known
  *  \param filename full path of a yaml file
+ *  \param loader_id id of loader thread or -1
  *
  *  \retval 0 ok
  *  \retval -1 failed
  */
-int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename)
+int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id)
 {
     DetectEngineCtx *de_ctx = NULL;
     char prefix[64];
@@ -1744,6 +1745,300 @@ error:
     return -1;
 }
 
+/**
+ * \param ctx function specific data
+ * \param loader_id id of the loader that executed the task
+ */
+typedef int (*LoaderFunc)(void *ctx, int loader_id);
+
+typedef struct DetectLoaderTask_ {
+    LoaderFunc Func;
+    void *ctx;
+    TAILQ_ENTRY(DetectLoaderTask_) next;
+} DetectLoaderTask;
+
+typedef struct DetectLoaderControl_ {
+    int id;
+    int result;     /* 0 for ok, error otherwise */
+    SCMutex m;
+    TAILQ_HEAD(, DetectLoaderTask_) task_list;
+} DetectLoaderControl;
+
+#define NLOADERS 4
+static DetectLoaderControl loaders[NLOADERS];
+static int cur_loader = 0;
+void TmThreadWakeupDetectLoaderThreads(void);
+
+/** \param loader -1 for auto select
+ *  \retval loader_id or negative in case of error */
+int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx)
+{
+    if (loader_id == -1) {
+        loader_id = cur_loader;
+        cur_loader++;
+        if (cur_loader >= NLOADERS)
+            cur_loader = 0;
+    }
+    if (loader_id >= NLOADERS || loader_id < 0) {
+        return -ERANGE;
+    }
+
+    DetectLoaderControl *loader = &loaders[loader_id];
+
+    DetectLoaderTask *t = SCCalloc(1, sizeof(*t));
+    if (t == NULL)
+        return -ENOMEM;
+
+    t->Func = Func;
+    t->ctx = func_ctx;
+
+    SCMutexLock(&loader->m);
+    TAILQ_INSERT_TAIL(&loader->task_list, t, next);
+    SCMutexUnlock(&loader->m);
+
+    TmThreadWakeupDetectLoaderThreads();
+
+    SCLogDebug("%d %p %p", loader_id, Func, func_ctx);
+    return loader_id;
+}
+
+/** \brief wait for loader tasks to complete
+ *  \retval result 0 for ok, -1 for errors */
+int DetectLoadersSync(void)
+{
+    SCLogDebug("waiting");
+    int errors = 0;
+    int i;
+    for (i = 0; i < NLOADERS; i++) {
+        int done = 0;
+        DetectLoaderControl *loader = &loaders[i];
+        while (!done) {
+            SCMutexLock(&loader->m);
+            if (TAILQ_EMPTY(&loader->task_list)) {
+                done = 1;
+            }
+            SCMutexUnlock(&loader->m);
+        }
+        SCMutexLock(&loader->m);
+        if (loader->result != 0) {
+            errors++;
+            loader->result = 0;
+        }
+        SCMutexUnlock(&loader->m);
+
+    }
+    if (errors) {
+        SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors);
+        return -1;
+    }
+    SCLogDebug("done");
+    return 0;
+}
+
+void DetectLoaderInit(DetectLoaderControl *loader)
+{
+    memset(loader, 0x00, sizeof(*loader));
+    SCMutexInit(&loader->m, NULL);
+    TAILQ_INIT(&loader->task_list);
+}
+
+void DetectLoadersInit(void)
+{
+    int i;
+    for (i = 0; i < NLOADERS; i++) {
+        DetectLoaderInit(&loaders[i]);
+    }
+}
+
+typedef struct TenantLoaderCtx_ {
+    uint32_t tenant_id;
+    const char *yaml;
+} TenantLoaderCtx;
+
+static int DetectLoaderFuncLoadTenant(void *vctx, int loader_id)
+{
+    TenantLoaderCtx *ctx = (TenantLoaderCtx *)vctx;
+
+/* TODO we need to somehow store the loader id for when we free */
+    if (DetectEngineMultiTenantLoadTenant(ctx->tenant_id, ctx->yaml, loader_id) != 0) {
+        return -1;
+    }
+    return 0;
+}
+
+int DetectLoaderSetupLoadTenant(uint32_t tenant_id, const char *yaml)
+{
+    TenantLoaderCtx *t = SCCalloc(1, sizeof(*t));
+    if (t == NULL)
+        return -ENOMEM;
+
+    t->tenant_id = tenant_id;
+    t->yaml = yaml;
+
+    return DetectLoaderQueueTask(-1, DetectLoaderFuncLoadTenant, t);
+}
+
+/**
+ * \brief Unpauses all threads present in tv_root
+ */
+void TmThreadWakeupDetectLoaderThreads()
+{
+    ThreadVars *tv = NULL;
+    int i = 0;
+
+    SCMutexLock(&tv_root_lock);
+    for (i = 0; i < TVT_MAX; i++) {
+        tv = tv_root[i];
+        while (tv != NULL) {
+            if (strcmp(tv->name,"DetectLoader") == 0) {
+                BUG_ON(tv->ctrl_cond == NULL);
+                pthread_cond_broadcast(tv->ctrl_cond);
+            }
+            tv = tv->next;
+        }
+    }
+    SCMutexUnlock(&tv_root_lock);
+
+    return;
+}
+
+/**
+ * \brief Unpauses all threads present in tv_root
+ */
+void TmThreadContinueDetectLoaderThreads()
+{
+    ThreadVars *tv = NULL;
+    int i = 0;
+
+    SCMutexLock(&tv_root_lock);
+    for (i = 0; i < TVT_MAX; i++) {
+        tv = tv_root[i];
+        while (tv != NULL) {
+            if (strcmp(tv->name,"DetectLoader") == 0)
+                TmThreadContinue(tv);
+
+            tv = tv->next;
+        }
+    }
+    SCMutexUnlock(&tv_root_lock);
+
+    return;
+}
+
+
+SC_ATOMIC_DECLARE(int, detect_loader_cnt);
+
+typedef struct DetectLoaderThreadData_ {
+    uint32_t instance;
+} DetectLoaderThreadData;
+
+static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+    DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData));
+    if (ftd == NULL)
+        return TM_ECODE_FAILED;
+
+    ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */
+    SCLogDebug("detect loader instance %u", ftd->instance);
+
+    /* pass thread data back to caller */
+    *data = ftd;
+
+    return TM_ECODE_OK;
+}
+
+static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data)
+{
+    SCFree(data);
+    return TM_ECODE_OK;
+}
+
+
+static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
+{
+    /* block usr2. usr2 to be handled by the main thread only */
+    UtilSignalBlock(SIGUSR2);
+
+    DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data;
+    BUG_ON(ftd == NULL);
+
+    SCLogDebug("loader thread started");
+    while (1)
+    {
+        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
+            TmThreadsSetFlag(th_v, THV_PAUSED);
+            TmThreadTestThreadUnPaused(th_v);
+            TmThreadsUnsetFlag(th_v, THV_PAUSED);
+        }
+
+        /* see if we have tasks */
+
+        DetectLoaderControl *loader = &loaders[ftd->instance];
+        SCMutexLock(&loader->m);
+
+        DetectLoaderTask *task = NULL, *tmptask = NULL;
+        TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) {
+            int r = task->Func(task->ctx, ftd->instance);
+            loader->result |= r;
+            TAILQ_REMOVE(&loader->task_list, task, next);
+            SCFree(task);
+        }
+
+        SCMutexUnlock(&loader->m);
+
+        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
+            break;
+        }
+
+        /* just wait until someone wakes us up */
+        SCCtrlMutexLock(th_v->ctrl_mutex);
+        SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex);
+        SCCtrlMutexUnlock(th_v->ctrl_mutex);
+
+        SCLogDebug("woke up...");
+    }
+    return TM_ECODE_OK;
+}
+
+/** \brief spawn the detect loader manager thread */
+void DetectLoaderThreadSpawn()
+{
+    uint32_t u;
+    for (u = 0; u < NLOADERS; u++) {
+        ThreadVars *tv_loader = NULL;
+
+        char name[32] = "";
+        snprintf(name, sizeof(name), "DetectLoader%02u", u+1);
+
+        tv_loader = TmThreadCreateCmdThreadByName("DetectLoader",
+                "DetectLoader", 1);
+        BUG_ON(tv_loader == NULL);
+
+        if (tv_loader == NULL) {
+            printf("ERROR: TmThreadsCreate failed\n");
+            exit(1);
+        }
+        if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) {
+            printf("ERROR: TmThreadSpawn failed\n");
+            exit(1);
+        }
+    }
+    return;
+}
+
+void TmModuleDetectLoaderRegister (void)
+{
+    tmm_modules[TMM_DETECTLOADER].name = "DetectLoader";
+    tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit;
+    tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit;
+    tmm_modules[TMM_DETECTLOADER].Management = DetectLoader;
+    tmm_modules[TMM_DETECTLOADER].cap_flags = 0;
+    tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM;
+    SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name);
+
+    SC_ATOMIC_INIT(detect_loader_cnt);
+}
+
 /**
  *  \brief setup multi-detect / multi-tenancy
  *
@@ -1761,6 +2056,11 @@ void DetectEngineMultiTenantSetup(void)
     int enabled = 0;
     (void)ConfGetBool("multi-detect.enabled", &enabled);
     if (enabled == 1) {
+        DetectLoadersInit();
+        TmModuleDetectLoaderRegister();
+        DetectLoaderThreadSpawn();
+        TmThreadContinueDetectLoaderThreads();
+
         SCMutexLock(&master->lock);
         master->multi_tenant_enabled = 1;
 
@@ -1858,7 +2158,7 @@ void DetectEngineMultiTenantSetup(void)
                 }
                 SCLogInfo("tenant id: %u, %s", tenant_id, yaml_node->val);
 
-                if (DetectEngineMultiTenantLoadTenant(tenant_id, yaml_node->val) != 0) {
+                if (DetectLoaderSetupLoadTenant(tenant_id, yaml_node->val) != 0) {
                     /* error logged already */
                     goto bad_tenant;
                 }
@@ -1869,6 +2169,11 @@ void DetectEngineMultiTenantSetup(void)
                     goto error;
             }
         }
+
+        /* wait for our loaders to complete their tasks */
+        if (DetectLoadersSync() != 0)
+            goto error;
+
         if (DetectEngineMTApply() < 0) {
             SCLogError(SC_ERR_DETECT_PREPARE, "initializing the detection engine failed");
             goto error;
index ee2d89de0af5ae05952ebbab442d618cf90bf4ec..c9ee62e82ef88c559b8ac3cdfdb5c22da588879b 100644 (file)
@@ -86,7 +86,7 @@ int DetectEngineReloadIsStart(void);
 void DetectEngineReloadSetDone(void);
 int DetectEngineReloadIsDone(void);
 
-int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename);
+int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id);
 
 int DetectEngineTentantRegisterVlanId(uint32_t tenant_id, uint16_t vlan_id);
 int DetectEngineTentantUnregisterVlanId(uint32_t tenant_id, uint16_t vlan_id);
index 2ebfb43f96d4fdd78d57a345caeaa4f5d13f5b03..590ce24c6b4180b11a15afd583bd3010ad104853 100644 (file)
@@ -719,6 +719,10 @@ typedef struct DetectEngineCtx_ {
     uint32_t ref_cnt;
     /** list in master: either active or freelist */
     struct DetectEngineCtx_ *next;
+
+    /** id of loader thread 'owning' this de_ctx */
+    int loader_id;
+
 } DetectEngineCtx;
 
 /* Engine groups profiles (low, medium, high, custom) */
index bc3e2955c371bd54b7cd8a62d74aa68ac035a6d4..1fdd49dfb224c3c6ccd610361f22497dbc0bd95a 100644 (file)
@@ -639,7 +639,7 @@ TmEcode UnixSocketRegisterTenant(json_t *cmd, json_t* answer, void *data)
     SCLogDebug("add-tenant: %d %s", tenant_id, filename);
 
     /* 3 load into the system */
-    if (DetectEngineMultiTenantLoadTenant(tenant_id, filename) != 0) {
+    if (DetectEngineMultiTenantLoadTenant(tenant_id, filename, -1) != 0) {
         json_object_set_new(answer, "message", json_string("adding tenant failed"));
         return TM_ECODE_FAILED;
     }
index c6f8c9ac76d68bef7c0e53c7d85cb0dee90a65dc..73e9f235fd9d19017866b45ce44f10e07dddc5ff 100644 (file)
@@ -269,6 +269,7 @@ const char * TmModuleTmmIdToString(TmmId id)
         CASE_CODE (TMM_FLOWMANAGER);
         CASE_CODE (TMM_FLOWRECYCLER);
         CASE_CODE (TMM_UNIXMANAGER);
+        CASE_CODE (TMM_DETECTLOADER);
         CASE_CODE (TMM_LUALOG);
         CASE_CODE (TMM_LOGSTATSLOG);
         CASE_CODE (TMM_RECEIVENETMAP);
index e59c2a8764f7f038c5e60de03f0526d61e209aba..f6629eaa82e50f0f3e66805e3f5becade66030ba 100644 (file)
@@ -104,6 +104,7 @@ typedef enum {
 
     TMM_FLOWMANAGER,
     TMM_FLOWRECYCLER,
+    TMM_DETECTLOADER,
 
     TMM_UNIXMANAGER,