]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
detect: update detect engine management
authorVictor Julien <victor@inliniac.net>
Sat, 17 Jan 2015 17:44:23 +0000 (18:44 +0100)
committerVictor Julien <victor@inliniac.net>
Thu, 19 Mar 2015 16:51:56 +0000 (17:51 +0100)
Update detect engine management to make it easier to reload the detect
engine.

Core of the new approach is a 'master' ctx, that keeps a list of one or
more detect engines. The detect engines will not be passed to any thread
directly, but instead will only be accessed through the detect engine
thread contexts. As we can replace those atomically, replacing a detect
engine becomes easier.

Each thread keeps a reference to its detect context. When a detect engine
is replaced or removed, it's added to a free list. Once its reference
count reaches 0, it is freed.

src/detect-engine.c
src/detect-engine.h
src/detect.h
src/suricata.c

index 9b2058ddb60a6d6019b7de25ab18fbeb7aabc7cc..b81fe74ef785453004388c9e76499933909c7876 100644 (file)
@@ -101,6 +101,8 @@ static TmEcode DetectEngineThreadCtxInitForLiveRuleSwap(ThreadVars *, void *, vo
 
 static uint8_t DetectEngineCtxLoadConf(DetectEngineCtx *);
 
+static DetectEngineMasterCtx g_master_de_ctx = { SCMUTEX_INITIALIZER, NULL, NULL, };
+
 /* 2 - for each direction */
 DetectEngineAppInspectionEngine *app_inspection_engine[FLOW_PROTO_DEFAULT][ALPROTO_MAX][2];
 
@@ -443,6 +445,208 @@ void DetectEngineRegisterAppInspectionEngine(uint8_t ipproto,
     return;
 }
 
+static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
+{
+    SCEnter();
+
+    int i = 0;
+    int no_of_detect_tvs = 0;
+    ThreadVars *tv = NULL;
+
+    SCLogNotice("rule reload starting");
+
+    /* count detect threads in use */
+    SCMutexLock(&tv_root_lock);
+    tv = tv_root[TVT_PPT];
+    while (tv) {
+        /* obtain the slots for this TV */
+        TmSlot *slots = tv->tm_slots;
+        while (slots != NULL) {
+            TmModule *tm = TmModuleGetById(slots->tm_id);
+
+            if (suricata_ctl_flags != 0) {
+                SCLogInfo("rule reload interupted by engine shutdown");
+                SCMutexUnlock(&tv_root_lock);
+                return -1;
+            }
+
+            if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+                slots = slots->slot_next;
+                continue;
+            }
+            no_of_detect_tvs++;
+            break;
+        }
+
+        tv = tv->next;
+    }
+    SCMutexUnlock(&tv_root_lock);
+
+    if (no_of_detect_tvs == 0) {
+        return -1;
+    }
+
+    /* prepare swap structures */
+    DetectEngineThreadCtx *old_det_ctx[no_of_detect_tvs];
+    DetectEngineThreadCtx *new_det_ctx[no_of_detect_tvs];
+    ThreadVars *detect_tvs[no_of_detect_tvs];
+    memset(old_det_ctx, 0x00, (no_of_detect_tvs * sizeof(DetectEngineThreadCtx *)));
+    memset(new_det_ctx, 0x00, (no_of_detect_tvs * sizeof(DetectEngineThreadCtx *)));
+    memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
+
+    /* start the process of swapping detect threads ctxs */
+
+    /* get reference to tv's and setup new_det_ctx array */
+    SCMutexLock(&tv_root_lock);
+    tv = tv_root[TVT_PPT];
+    while (tv) {
+        /* obtain the slots for this TV */
+        TmSlot *slots = tv->tm_slots;
+        while (slots != NULL) {
+            TmModule *tm = TmModuleGetById(slots->tm_id);
+
+            if (suricata_ctl_flags != 0) {
+                SCMutexUnlock(&tv_root_lock);
+                goto error;
+            }
+
+            if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+                slots = slots->slot_next;
+                continue;
+            }
+
+            old_det_ctx[i] = SC_ATOMIC_GET(slots->slot_data);
+            detect_tvs[i] = tv;
+            TmEcode r = DetectEngineThreadCtxInitForLiveRuleSwap(tv, (void *)new_de_ctx,
+                                                                 (void **)&new_det_ctx[i]);
+            i++;
+            if (r == TM_ECODE_FAILED) {
+                SCLogError(SC_ERR_LIVE_RULE_SWAP, "Detect engine thread init "
+                           "failure in live rule swap.  Let's get out of here");
+                SCMutexUnlock(&tv_root_lock);
+                goto error;
+            }
+            SCLogDebug("live rule swap created new det_ctx - %p and de_ctx "
+                       "- %p\n", new_det_ctx, new_de_ctx);
+            break;
+        }
+
+        tv = tv->next;
+    }
+    BUG_ON(i != no_of_detect_tvs);
+
+    /* atomicly replace the det_ctx data */
+    i = 0;
+    tv = tv_root[TVT_PPT];
+    while (tv) {
+        /* find the correct slot */
+        TmSlot *slots = tv->tm_slots;
+        while (slots != NULL) {
+            if (suricata_ctl_flags != 0) {
+                return -1;
+            }
+
+            TmModule *tm = TmModuleGetById(slots->tm_id);
+            if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+                slots = slots->slot_next;
+                continue;
+            }
+            SCLogDebug("swapping new det_ctx - %p with older one - %p",
+                       new_det_ctx[i], SC_ATOMIC_GET(slots->slot_data));
+            (void)SC_ATOMIC_SET(slots->slot_data, new_det_ctx[i++]);
+            break;
+        }
+        tv = tv->next;
+    }
+    SCMutexUnlock(&tv_root_lock);
+
+    /* threads now all have new data, however they may not have started using
+     * it and may still use the old data */
+
+    SCLogInfo("Live rule swap has swapped %d old det_ctx's with new ones, "
+              "along with the new de_ctx", no_of_detect_tvs);
+
+    /* inject a fake packet if the detect thread isn't using the new ctx yet,
+     * this speeds up the process */
+    for (i = 0; i < no_of_detect_tvs; i++) {
+        int break_out = 0;
+        int pseudo_pkt_inserted = 0;
+        usleep(1000);
+        while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
+            if (suricata_ctl_flags != 0) {
+                break_out = 1;
+                break;
+            }
+
+            if (pseudo_pkt_inserted == 0) {
+                pseudo_pkt_inserted = 1;
+                if (detect_tvs[i]->inq != NULL) {
+                    Packet *p = PacketGetFromAlloc();
+                    if (p != NULL) {
+                        p->flags |= PKT_PSEUDO_STREAM_END;
+                        PacketQueue *q = &trans_q[detect_tvs[i]->inq->id];
+                        SCMutexLock(&q->mutex_q);
+                        PacketEnqueue(q, p);
+                        SCCondSignal(&q->cond_q);
+                        SCMutexUnlock(&q->mutex_q);
+                    }
+                }
+            }
+            usleep(1000);
+        }
+        if (break_out)
+            break;
+        SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
+    }
+
+    /* this is to make sure that if someone initiated shutdown during a live
+     * rule swap, the live rule swap won't clean up the old det_ctx and
+     * de_ctx, till all detect threads have stopped working and sitting
+     * silently after setting RUNNING_DONE flag and while waiting for
+     * THV_DEINIT flag */
+    if (i != no_of_detect_tvs) { // not all threads we swapped
+        ThreadVars *tv = tv_root[TVT_PPT];
+        while (tv) {
+            /* obtain the slots for this TV */
+            TmSlot *slots = tv->tm_slots;
+            while (slots != NULL) {
+                TmModule *tm = TmModuleGetById(slots->tm_id);
+                if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+                    slots = slots->slot_next;
+                    continue;
+                }
+
+                while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+                    usleep(100);
+                }
+
+                slots = slots->slot_next;
+            }
+
+            tv = tv->next;
+        }
+    }
+
+    /* free all the ctxs */
+    for (i = 0; i < no_of_detect_tvs; i++) {
+        SCLogDebug("Freeing old_det_ctx - %p used by detect",
+                   old_det_ctx[i]);
+        DetectEngineThreadCtxDeinit(NULL, old_det_ctx[i]);
+    }
+
+    SRepReloadComplete();
+
+    SCLogNotice("rule reload complete");
+    return 0;
+
+ error:
+    for (i = 0; i < no_of_detect_tvs; i++) {
+        if (new_det_ctx[i] != NULL)
+            DetectEngineThreadCtxDeinit(NULL, new_det_ctx[i]);
+    }
+    return -1;
+}
+
 static void *DetectEngineLiveRuleSwap(void *arg)
 {
     SCEnter();
@@ -762,36 +966,6 @@ void DetectEngineSpawnLiveRuleSwapMgmtThread(void)
     SCReturn;
 }
 
-DetectEngineCtx *DetectEngineGetGlobalDeCtx(void)
-{
-    DetectEngineCtx *de_ctx = NULL;
-
-    SCMutexLock(&tv_root_lock);
-
-    ThreadVars *tv = tv_root[TVT_PPT];
-    while (tv) {
-        /* obtain the slots for this TV */
-        TmSlot *slots = tv->tm_slots;
-        while (slots != NULL) {
-            TmModule *tm = TmModuleGetById(slots->tm_id);
-
-            if (tm->flags & TM_FLAG_DETECT_TM) {
-                DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(slots->slot_data);
-                de_ctx = det_ctx->de_ctx;
-                SCMutexUnlock(&tv_root_lock);
-                return de_ctx;
-            }
-
-            slots = slots->slot_next;
-        }
-
-        tv = tv->next;
-    }
-
-    SCMutexUnlock(&tv_root_lock);
-    return NULL;
-}
-
 DetectEngineCtx *DetectEngineCtxInit(void)
 {
     DetectEngineCtx *de_ctx;
@@ -1363,10 +1537,6 @@ static TmEcode ThreadCtxDoInit (DetectEngineCtx *de_ctx, DetectEngineThreadCtx *
  */
 TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
 {
-    DetectEngineCtx *de_ctx = (DetectEngineCtx *)initdata;
-    if (de_ctx == NULL)
-        return TM_ECODE_FAILED;
-
     /* first register the counter. In delayed detect mode we exit right after if the
      * rules haven't been loaded yet. */
     uint16_t counter_alerts = SCPerfTVRegisterCounter("detect.alert", tv,
@@ -1381,21 +1551,22 @@ TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
     uint16_t counter_match_list = SCPerfTVRegisterAvgCounter("detect.match_list", tv,
                                                       SC_PERF_TYPE_UINT64, "NULL");
 #endif
-    if (de_ctx->delayed_detect == 1 && de_ctx->delayed_detect_initialized == 0) {
-        *data = NULL;
-        return TM_ECODE_OK;
-    }
-
     DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx));
     if (unlikely(det_ctx == NULL))
         return TM_ECODE_FAILED;
     memset(det_ctx, 0, sizeof(DetectEngineThreadCtx));
 
     det_ctx->tv = tv;
-    det_ctx->de_ctx = de_ctx;
+    det_ctx->de_ctx = DetectEngineGetCurrent();
+    if (det_ctx->de_ctx == NULL) {
+        DetectEngineThreadCtxDeinit(tv, det_ctx);
+        return TM_ECODE_FAILED;
+    }
 
-    if (ThreadCtxDoInit(de_ctx, det_ctx) != TM_ECODE_OK)
+    if (ThreadCtxDoInit(det_ctx->de_ctx, det_ctx) != TM_ECODE_OK) {
+        DetectEngineThreadCtxDeinit(tv, det_ctx);
         return TM_ECODE_FAILED;
+    }
 
     /** alert counter setup */
     det_ctx->counter_alerts = counter_alerts;
@@ -1423,19 +1594,18 @@ static TmEcode DetectEngineThreadCtxInitForLiveRuleSwap(ThreadVars *tv, void *in
 {
     *data = NULL;
 
-    DetectEngineCtx *de_ctx = (DetectEngineCtx *)initdata;
-    if (de_ctx == NULL)
-        return TM_ECODE_FAILED;
-
     DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx));
     if (unlikely(det_ctx == NULL))
         return TM_ECODE_FAILED;
     memset(det_ctx, 0, sizeof(DetectEngineThreadCtx));
 
     det_ctx->tv = tv;
-    det_ctx->de_ctx = de_ctx;
+    det_ctx->de_ctx = DetectEngineGetCurrent();
+    if (det_ctx->de_ctx == NULL) {
+        return TM_ECODE_FAILED;
+    }
 
-    if (ThreadCtxDoInit(de_ctx, det_ctx) != TM_ECODE_OK)
+    if (ThreadCtxDoInit(det_ctx->de_ctx, det_ctx) != TM_ECODE_OK)
         return TM_ECODE_FAILED;
 
     /** alert counter setup */
@@ -1535,6 +1705,7 @@ TmEcode DetectEngineThreadCtxDeinit(ThreadVars *tv, void *data)
     }
 
     DetectEngineThreadCtxDeinitKeywords(det_ctx->de_ctx, det_ctx);
+    DetectEngineDeReference(&det_ctx->de_ctx);
     SCFree(det_ctx);
 
     return TM_ECODE_OK;
@@ -1611,6 +1782,187 @@ void *DetectThreadCtxGetKeywordThreadCtx(DetectEngineThreadCtx *det_ctx, int id)
     return det_ctx->keyword_ctxs_array[id];
 }
 
+DetectEngineCtx *DetectEngineGetCurrent(void)
+{
+    DetectEngineMasterCtx *master = &g_master_de_ctx;
+    SCMutexLock(&master->lock);
+
+    if (master->list == NULL) {
+        SCMutexUnlock(&master->lock);
+        return NULL;
+    }
+
+    master->list->ref_cnt++;
+    SCLogDebug("master->list %p ref_cnt %u", master->list, master->list->ref_cnt);
+    SCMutexUnlock(&master->lock);
+    return master->list;
+}
+
+void DetectEngineDeReference(DetectEngineCtx **de_ctx)
+{
+    BUG_ON((*de_ctx)->ref_cnt == 0);
+    (*de_ctx)->ref_cnt--;
+    *de_ctx = NULL;
+}
+
+static int DetectEngineAddToList(DetectEngineCtx *instance)
+{
+    DetectEngineMasterCtx *master = &g_master_de_ctx;
+
+    if (instance == NULL)
+        return -1;
+
+    if (master->list == NULL) {
+        master->list = instance;
+    } else {
+        instance->next = master->list;
+        master->list = instance;
+    }
+
+    return 0;
+}
+
+int DetectEngineAddToMaster(DetectEngineCtx *de_ctx)
+{
+    int r;
+
+    if (de_ctx == NULL)
+        return -1;
+
+    SCLogDebug("adding de_ctx %p to master", de_ctx);
+
+    DetectEngineMasterCtx *master = &g_master_de_ctx;
+    SCMutexLock(&master->lock);
+    r = DetectEngineAddToList(de_ctx);
+    SCMutexUnlock(&master->lock);
+    return r;
+}
+
+int DetectEngineMoveToFreeList(DetectEngineCtx *de_ctx)
+{
+    DetectEngineMasterCtx *master = &g_master_de_ctx;
+
+    SCMutexLock(&master->lock);
+    DetectEngineCtx *instance = master->list;
+    if (instance == NULL) {
+        SCMutexUnlock(&master->lock);
+        return -1;
+    }
+
+    /* remove from active list */
+    if (instance == de_ctx) {
+        master->list = instance->next;
+    } else {
+        DetectEngineCtx *prev = instance;
+        instance = instance->next; /* already checked first element */
+
+        while (instance) {
+            DetectEngineCtx *next = instance->next;
+
+            if (instance == de_ctx) {
+                prev->next = instance->next;
+                break;
+            }
+
+            prev = instance;
+            instance = next;
+        }
+        if (instance == NULL) {
+            SCMutexUnlock(&master->lock);
+            return -1;
+        }
+    }
+
+    /* instance is now detached from list */
+    instance->next = NULL;
+
+    /* add to free list */
+    if (master->free_list == NULL) {
+        master->free_list = instance;
+    } else {
+        instance->next = master->free_list;
+        master->free_list = instance;
+    }
+    SCLogDebug("detect engine %p moved to free list (%u refs)", de_ctx, de_ctx->ref_cnt);
+
+    SCMutexUnlock(&master->lock);
+    return 0;
+}
+
+void DetectEnginePruneFreeList(void)
+{
+    DetectEngineMasterCtx *master = &g_master_de_ctx;
+    SCMutexLock(&master->lock);
+
+    DetectEngineCtx *prev = NULL;
+    DetectEngineCtx *instance = master->free_list;
+    while (instance) {
+        DetectEngineCtx *next = instance->next;
+
+        SCLogDebug("detect engine %p has %u ref(s)", instance, instance->ref_cnt);
+
+        if (instance->ref_cnt == 0) {
+            if (prev == NULL) {
+                master->free_list = next;
+            } else {
+                prev->next = next;
+            }
+
+            SCLogDebug("freeing detect engine %p", instance);
+            DetectEngineCtxFree(instance);
+            instance = NULL;
+        }
+
+        prev = instance;
+        instance = next;
+    }
+    SCMutexUnlock(&master->lock);
+}
+
+int DetectEngineReload(void)
+{
+    DetectEngineCtx *new_de_ctx = NULL;
+    DetectEngineCtx *old_de_ctx = NULL;
+
+    /* get a reference to the current de_ctx */
+    old_de_ctx = DetectEngineGetCurrent();
+    if (old_de_ctx == NULL)
+        return -1;
+    SCLogDebug("get ref to old_de_ctx %p", old_de_ctx);
+
+    /* get new detection engine */
+    new_de_ctx = DetectEngineCtxInit();
+    if (new_de_ctx == NULL) {
+        DetectEngineDeReference(&old_de_ctx);
+        return -1;
+    }
+    if (SigLoadSignatures(new_de_ctx, NULL, 0) != 0) {
+        DetectEngineCtxFree(new_de_ctx);
+        DetectEngineDeReference(&old_de_ctx);
+        return -1;
+    }
+    SCThresholdConfInitContext(new_de_ctx, NULL);
+    SCLogDebug("set up new_de_ctx %p", new_de_ctx);
+
+    /* add to master */
+    DetectEngineAddToMaster(new_de_ctx);
+
+    /* move to old free list */
+    DetectEngineMoveToFreeList(old_de_ctx);
+    DetectEngineDeReference(&old_de_ctx);
+
+    SCLogDebug("going to reload the threads to use new_de_ctx %p", new_de_ctx);
+    /* update the threads */
+    DetectEngineReloadThreads(new_de_ctx);
+    SCLogDebug("threads now run new_de_ctx %p", new_de_ctx);
+
+    /* walk free list, freeing the old_de_ctx */
+    DetectEnginePruneFreeList();
+
+    SCLogDebug("old_de_ctx should have been freed");
+    return 0;
+}
+
 const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type)
 {
     switch (type) {
index 73b3554f20ffefb2bb644606a950ec4668c4928c..14c6e6f18fd74fe003352963896ed627bd1dd9c5 100644 (file)
@@ -57,7 +57,6 @@ extern DetectEngineAppInspectionEngine *app_inspection_engine[FLOW_PROTO_DEFAULT
 void DetectEngineRegisterAppInspectionEngines(void);
 void DetectEngineSpawnLiveRuleSwapMgmtThread(void);
 DetectEngineCtx *DetectEngineCtxInit(void);
-DetectEngineCtx *DetectEngineGetGlobalDeCtx(void);
 void DetectEngineCtxFree(DetectEngineCtx *);
 
 TmEcode DetectEngineThreadCtxInit(ThreadVars *, void *, void **);
@@ -69,6 +68,13 @@ void DetectEngineResetMaxSigId(DetectEngineCtx *);
 void DetectEngineRegisterTests(void);
 const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type);
 
+int DetectEngineAddToMaster(DetectEngineCtx *de_ctx);
+DetectEngineCtx *DetectEngineGetCurrent(void);
+void DetectEnginePruneFreeList(void);
+int DetectEngineMoveToFreeList(DetectEngineCtx *de_ctx);
+void DetectEngineDeReference(DetectEngineCtx **de_ctx);
+int DetectEngineReload(void);
+
 /**
  * \brief Registers an app inspection engine.
  *
index 03658e42aa47fc914f88152cd73aaa4b3a57f2ed..a5cd359ddb1b382488e31aa07272500acdb4bba8 100644 (file)
@@ -736,6 +736,11 @@ typedef struct DetectEngineCtx_ {
     struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx;
     struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx_per_list[DETECT_SM_LIST_MAX];
 #endif
+
+    /** how many de_ctx' are referencing this */
+    uint32_t ref_cnt;
+    /** list in master: either active or freelist */
+    struct DetectEngineCtx_ *next;
 } DetectEngineCtx;
 
 /* Engine groups profiles (low, medium, high, custom) */
@@ -1041,6 +1046,19 @@ typedef struct SigGroupHead_ {
  *  deal with both cases */
 #define SIGMATCH_OPTIONAL_OPT   (1 << 5)
 
+typedef struct DetectEngineMasterCtx_ {
+    SCMutex lock;
+
+    /** list of active detection engines. This list is used to generate the
+     *  threads det_ctx's */
+    DetectEngineCtx *list;
+
+    /** free list, containing detection engines that will be removed but may
+     *  still be referenced by det_ctx's. Freed as soon as all references are
+     *  gone. */
+    DetectEngineCtx *free_list;
+} DetectEngineMasterCtx;
+
 /** Remember to add the options in SignatureIsIPOnly() at detect.c otherwise it wont be part of a signature group */
 
 enum {
index 12458caf70e84b7f5467debdfb789974b473dc4d..96a61b819038b1e53121286187e0f8f35fbaf697 100644 (file)
@@ -2307,6 +2307,7 @@ int main(int argc, char **argv)
                     "context failed.");
             exit(EXIT_FAILURE);
         }
+
 #ifdef __SC_CUDA_SUPPORT__
         if (PatternMatchDefaultMatcher() == MPM_AC_CUDA)
             CudaVarsSetDeCtx(de_ctx);
@@ -2332,6 +2333,7 @@ int main(int argc, char **argv)
             if (suri.run_mode == RUNMODE_ENGINE_ANALYSIS) {
                 exit(EXIT_SUCCESS);
             }
+            DetectEngineAddToMaster(de_ctx);
         }
     }
 
@@ -2482,11 +2484,6 @@ int main(int argc, char **argv)
         }
     }
 
-    DetectEngineCtx *global_de_ctx = DetectEngineGetGlobalDeCtx();
-    if (suri.run_mode != RUNMODE_UNIX_SOCKET && de_ctx != NULL) {
-        BUG_ON(global_de_ctx == NULL);
-    }
-
     /* before TmThreadKillThreads, as otherwise that kills it
      * but more slowly */
     if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
@@ -2517,9 +2514,14 @@ int main(int argc, char **argv)
 
     AppLayerHtpPrintStats();
 
-    if (global_de_ctx) {
-        DetectEngineCtxFree(global_de_ctx);
+    /** TODO this can do into it's own func */
+    de_ctx = DetectEngineGetCurrent();
+    if (de_ctx) {
+        DetectEngineMoveToFreeList(de_ctx);
+        DetectEngineDeReference(&de_ctx);
     }
+    DetectEnginePruneFreeList();
+
     AppLayerDeSetup();
 
     TagDestroyCtx();