]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flowworker: initial support
authorVictor Julien <victor@inliniac.net>
Tue, 19 Apr 2016 16:06:32 +0000 (18:06 +0200)
committerVictor Julien <victor@inliniac.net>
Fri, 20 May 2016 07:04:17 +0000 (09:04 +0200)
Initial version of the 'FlowWorker' thread module. This module
combines Flow handling, TCP handling, App layer handling and
Detection in a single module. It does all flow related processing
under a single flow lock.

15 files changed:
src/Makefile.am
src/detect-engine.c
src/detect.c
src/flow-worker.c [new file with mode: 0644]
src/flow-worker.h [new file with mode: 0644]
src/runmode-erf-file.c
src/runmode-pcap-file.c
src/runmode-tile.c
src/stream-tcp.c
src/suricata.c
src/tm-modules.c
src/tm-threads-common.h
src/tm-threads.c
src/util-runmodes.c
src/util-validate.h

index b81bb7be562e04f60e2e2c6655cb73c20ee57302..b729388ee337f0cdfa0b06b73d7850c1999977a6 100644 (file)
@@ -222,6 +222,7 @@ flow-storage.c flow-storage.h \
 flow-timeout.c flow-timeout.h \
 flow-util.c flow-util.h \
 flow-var.c flow-var.h \
+flow-worker.c flow-worker.h \
 host.c host.h \
 host-bit.c host-bit.h \
 host-queue.c host-queue.h \
index 6a66a45d9fee2c159372672efa7293f69fef4c9b..34f15652023ba7a3cf44f67537597d2ebf0d1a2b 100644 (file)
@@ -28,6 +28,7 @@
 #include "flow.h"
 #include "flow-private.h"
 #include "flow-util.h"
+#include "flow-worker.h"
 #include "conf.h"
 #include "conf-yaml-loader.h"
 
@@ -695,7 +696,7 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
                 continue;
             }
 
-            old_det_ctx[i] = SC_ATOMIC_GET(slots->slot_data);
+            old_det_ctx[i] = FlowWorkerGetDetectCtxPtr(SC_ATOMIC_GET(slots->slot_data));
             detect_tvs[i] = tv;
 
             new_det_ctx[i] = DetectEngineThreadCtxInitForReload(tv, new_de_ctx, 1);
@@ -733,7 +734,7 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
             }
             SCLogDebug("swapping new det_ctx - %p with older one - %p",
                        new_det_ctx[i], SC_ATOMIC_GET(slots->slot_data));
-            (void)SC_ATOMIC_SET(slots->slot_data, new_det_ctx[i++]);
+            FlowWorkerReplaceDetectCtx(SC_ATOMIC_GET(slots->slot_data), new_det_ctx[i++]);
             break;
         }
         tv = tv->next;
index 03dcad3ab201a465b9f58950f99eacfb61d7b7f3..860b330b3dce89711ef8d8229b2a94c5b5d4024b 100644 (file)
@@ -2052,9 +2052,7 @@ TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQue
 
     if (p->flow) {
         det_ctx->flow_locked = 1;
-        FLOWLOCK_WRLOCK(p->flow);
         DetectFlow(tv, de_ctx, det_ctx, p);
-        FLOWLOCK_UNLOCK(p->flow);
         det_ctx->flow_locked = 0;
     } else {
         DetectNoFlow(tv, de_ctx, det_ctx, p);
diff --git a/src/flow-worker.c b/src/flow-worker.c
new file mode 100644 (file)
index 0000000..d613794
--- /dev/null
@@ -0,0 +1,233 @@
+/* Copyright (C) 2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ *
+ * Flow Workers are single thread modules taking care of (almost)
+ * everything related to packets with flows:
+ *
+ * - Lookup/creation
+ * - Stream tracking, reassembly
+ * - Applayer update
+ * - Detection
+ *
+ * This all while holding the flow lock.
+ *
+ * TODO
+ * - once we have a single entry point into the outputs they
+ *   will have to move into this as well.
+ * - once outputs are here we can also call StreamTcpPrune here
+ *   instead of in the packet pool return code
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+
+#include "decode.h"
+#include "stream-tcp.h"
+#include "app-layer.h"
+#include "detect-engine.h"
+
+#include "util-validate.h"
+
+typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;
+
+typedef struct FlowWorkerThreadData_ {
+    union {
+        StreamTcpThread *stream_thread;
+        void *stream_thread_ptr;
+    };
+
+    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
+
+#if 0
+    void *output_thread; // XXX multiple, not a single state
+#endif
+    PacketQueue pq;
+
+} FlowWorkerThreadData;
+
+/** \brief handle flow for packet
+ *
+ *  Handle flow creation/lookup
+ */
+static void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
+{
+    FlowHandlePacketUpdate(p->flow, p);
+
+    /* handle the app layer part of the UDP packet payload */
+    if (p->proto == IPPROTO_UDP) {
+        AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
+    }
+}
+
+static TmEcode FlowWorkerThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
+    BUG_ON(fw == NULL);
+    SC_ATOMIC_INIT(fw->detect_thread);
+    SC_ATOMIC_SET(fw->detect_thread, NULL);
+
+    /* setup TCP */
+    BUG_ON(StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK);
+
+    if (DetectEngineEnabled()) {
+        /* setup DETECT */
+        void *detect_thread = NULL;
+        BUG_ON(DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK);
+        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
+    }
+#if 0
+    // setup OUTPUTS
+#endif
+
+    /* setup pq for stream end pkts */
+    memset(&fw->pq, 0, sizeof(PacketQueue));
+    SCMutexInit(&fw->pq.mutex_q, NULL);
+
+    *data = fw;
+    return TM_ECODE_OK;
+}
+
+static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
+{
+    FlowWorkerThreadData *fw = data;
+
+    /* free TCP */
+    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
+
+    /* free DETECT */
+    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
+    if (detect_thread != NULL)
+        DetectEngineThreadCtxDeinit(tv, detect_thread);
+        SC_ATOMIC_SET(fw->detect_thread, NULL);
+#if 0
+    // free OUTPUT
+#endif
+
+    /* free pq */
+    BUG_ON(fw->pq.len);
+    SCMutexDestroy(&fw->pq.mutex_q);
+
+    SCFree(fw);
+    return TM_ECODE_OK;
+}
+
+TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq);
+TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+
+TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, PacketQueue *unused)
+{
+    FlowWorkerThreadData *fw = data;
+    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
+
+    SCLogDebug("packet %"PRIu64, p->pcap_cnt);
+
+    /* update time */
+    if (!(PKT_IS_PSEUDOPKT(p)))
+        TimeSetByThread(tv->id, &p->ts);
+
+    /* handle Flow */
+    if (p->flags & PKT_WANTS_FLOW) {
+        FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
+        if (likely(p->flow != NULL)) {
+            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+            FlowUpdate(tv, fw->stream_thread, p);
+        }
+        /* Flow is now LOCKED */
+
+    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
+     * pseudo packet created by the flow manager. */
+    } else if (p->flags & PKT_HAS_FLOW) {
+        FLOWLOCK_WRLOCK(p->flow);
+    }
+
+    SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
+
+    /* handle TCP and app layer */
+    if (PKT_IS_TCP(p)) {
+        SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt);
+        DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+
+        StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL);
+
+        /* Packets here can safely access p->flow as it's locked */
+        SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
+        Packet *x;
+        while ((x = PacketDequeue(&fw->pq))) {
+            SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
+
+            // TODO do we need to call StreamTcp on these pseudo packets or not?
+            //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL);
+            if (detect_thread != NULL)
+                Detect(tv, x, detect_thread, NULL, NULL);
+#if 0
+            //  Outputs
+#endif
+            /* put these packets in the preq queue so that they are
+             * by the other thread modules before packet 'p'. */
+            PacketEnqueue(preq, x);
+        }
+    }
+
+    /* handle Detect */
+    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
+
+    if (detect_thread != NULL) {
+        Detect(tv, p, detect_thread, NULL, NULL);
+    }
+#if 0
+    // Outputs
+
+    // StreamTcpPruneSession (from TmqhOutputPacketpool)
+#endif
+
+    if (p->flow) {
+        DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+        FLOWLOCK_UNLOCK(p->flow);
+    }
+
+    return TM_ECODE_OK;
+}
+
+void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
+{
+    FlowWorkerThreadData *fw = flow_worker;
+
+    SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
+}
+
+void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
+{
+    FlowWorkerThreadData *fw = flow_worker;
+
+    return SC_ATOMIC_GET(fw->detect_thread);
+}
+
+void TmModuleFlowWorkerRegister (void)
+{
+    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
+    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
+    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
+    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
+    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
+    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
+}
diff --git a/src/flow-worker.h b/src/flow-worker.h
new file mode 100644 (file)
index 0000000..a3528e6
--- /dev/null
@@ -0,0 +1,26 @@
+/* Copyright (C) 2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#ifndef __FLOW_WORKER_H__
+#define __FLOW_WORKER_H__
+
+void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
+void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
+
+void TmModuleFlowWorkerRegister (void);
+
+#endif /* __FLOW_WORKER_H__ */
index e2ffde4b4f481f6ce2661694992fc53e7cd81d9f..5e0b097ab60036c4301094bbda9364625cec91fa 100644 (file)
@@ -94,22 +94,13 @@ int RunModeErfFileSingle(void)
     }
     TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-    tm_module = TmModuleGetByName("StreamTcp");
+    tm_module = TmModuleGetByName("FlowWorker");
     if (tm_module == NULL) {
-        printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+        SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
         exit(EXIT_FAILURE);
     }
     TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-    if (DetectEngineEnabled()) {
-        tm_module = TmModuleGetByName("Detect");
-        if (tm_module == NULL) {
-            printf("ERROR: TmModuleGetByName Detect failed\n");
-            exit(EXIT_FAILURE);
-        }
-        TmSlotSetFuncAppend(tv, tm_module, NULL);
-    }
-
     SetupOutputs(tv);
 
     if (TmThreadSpawn(tv) != TM_ECODE_OK) {
@@ -217,22 +208,14 @@ int RunModeErfFileAutoFp(void)
             printf("ERROR: TmThreadsCreate failed\n");
             exit(EXIT_FAILURE);
         }
-        tm_module = TmModuleGetByName("StreamTcp");
+
+        tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                printf("ERROR: TmModuleGetByName Detect failed\n");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
-        }
-
         if (threading_set_cpu_affinity) {
             TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
             /* If we have more than one core/cpu, the first Detect thread
index cf99fac6fc612e1274eb4f3ad74548d4e78ef02c..c6fea2b2defbb97b77de844a353bc73e2bac8e64 100644 (file)
@@ -100,22 +100,13 @@ int RunModeFilePcapSingle(void)
     }
     TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-    tm_module = TmModuleGetByName("StreamTcp");
+    tm_module = TmModuleGetByName("FlowWorker");
     if (tm_module == NULL) {
-        SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+        SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
         exit(EXIT_FAILURE);
     }
     TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-    if (DetectEngineEnabled()) {
-        tm_module = TmModuleGetByName("Detect");
-        if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-            exit(EXIT_FAILURE);
-        }
-        TmSlotSetFuncAppend(tv, tm_module, NULL);
-    }
-
     SetupOutputs(tv);
 
     TmThreadSetCPU(tv, DETECT_CPU_SET);
@@ -249,22 +240,14 @@ int RunModeFilePcapAutoFp(void)
             SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
             exit(EXIT_FAILURE);
         }
-        tm_module = TmModuleGetByName("StreamTcp");
+
+        tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
-        }
-
         TmThreadSetGroupName(tv_detect_ncpu, "Detect");
 
         /* add outputs as well */
index 913ee562f69b59fdc0a0e34d715232d9a9035161..4e94e24dab4e9c01fbd375e1fae15b3db458c199 100644 (file)
@@ -245,22 +245,13 @@ int RunModeTileMpipeWorkers(void)
         }
         TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
 
-        tm_module = TmModuleGetByName("StreamTcp");
+        tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                printf("ERROR: TmModuleGetByName Detect failed\n");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
-        }
-
         tm_module = TmModuleGetByName("RespondReject");
         if (tm_module == NULL) {
             printf("ERROR: TmModuleGetByName for RespondReject failed\n");
index 941211dc4d7b04a0ae994e77dab28131f61baf1f..d1d1470bf3b0f32a7338bb25cee1eba5a61330e4 100644 (file)
@@ -4863,77 +4863,42 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn
     return 0;
 }
 
-void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
-{
-    FlowHandlePacketUpdate(p->flow, p);
-
-    /* handle the app layer part of the UDP packet payload */
-    if (p->proto == IPPROTO_UDP) {
-        AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
-    }
-}
-
 TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
 {
     StreamTcpThread *stt = (StreamTcpThread *)data;
-    TmEcode ret = TM_ECODE_OK;
 
     SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);
 
-    TimeSetByThread(tv->id, &p->ts);
-
-    if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) {
-        FLOWLOCK_WRLOCK(p->flow);
-        AppLayerProfilingReset(stt->ra_ctx->app_tctx);
-        (void)StreamTcpPacket(tv, p, stt, pq);
-        p->flags |= PKT_IGNORE_CHECKSUM;
-        stt->pkts++;
-        FLOWLOCK_UNLOCK(p->flow);
-        return TM_ECODE_OK;
-    }
-
-    if (!(p->flags & PKT_WANTS_FLOW)) {
-        return TM_ECODE_OK;
-    }
-
-    FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
-    if (likely(p->flow != NULL)) {
-        FlowUpdate(tv, stt, p);
-    }
-
     if (!(PKT_IS_TCP(p))) {
-        goto unlock;
+        return TM_ECODE_OK;
     }
 
     if (p->flow == NULL) {
         StatsIncr(tv, stt->counter_tcp_no_flow);
-        goto unlock;
+        return TM_ECODE_OK;
     }
 
     /* only TCP packets with a flow from here */
 
-    if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
-        if (StreamTcpValidateChecksum(p) == 0) {
-            StatsIncr(tv, stt->counter_tcp_invalid_checksum);
-            goto unlock;
+    if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
+        if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
+            if (StreamTcpValidateChecksum(p) == 0) {
+                StatsIncr(tv, stt->counter_tcp_invalid_checksum);
+                return TM_ECODE_OK;
+            }
+        } else {
+            p->flags |= PKT_IGNORE_CHECKSUM;
         }
     } else {
-        p->flags |= PKT_IGNORE_CHECKSUM;
+        p->flags |= PKT_IGNORE_CHECKSUM; //TODO check that this is set at creation
     }
     AppLayerProfilingReset(stt->ra_ctx->app_tctx);
 
-    ret = StreamTcpPacket(tv, p, stt, pq);
-
-    //if (ret)
-      //  return TM_ECODE_FAILED;
+    (void)StreamTcpPacket(tv, p, stt, pq);
 
     stt->pkts++;
 
- unlock:
-    if (p->flow) {
-        FLOWLOCK_UNLOCK(p->flow);
-    }
-    return ret;
+    return TM_ECODE_OK;
 }
 
 TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
index 37ba4aa90f8e4d33397f053e565a085863227942..71a742c421b852df9efeb0124fa8f70d7729381e 100644 (file)
@@ -43,6 +43,7 @@
 #include "packet-queue.h"
 #include "threads.h"
 #include "threadvars.h"
+#include "flow-worker.h"
 
 #include "util-atomic.h"
 #include "util-spm.h"
@@ -856,6 +857,8 @@ void RegisterAllModules()
     TmModuleNapatechStreamRegister();
     TmModuleNapatechDecodeRegister();
 
+    /* flow worker */
+    TmModuleFlowWorkerRegister();
     /* stream engine */
     TmModuleStreamTcpRegister();
     /* detection */
index 06190b6549058d85d12fa3f476d73d0a623c6128..b798d1d4d8364b0c6103541fa6ca234ba6d88a56 100644 (file)
@@ -199,6 +199,7 @@ void TmModuleRegisterTests(void)
 const char * TmModuleTmmIdToString(TmmId id)
 {
     switch (id) {
+        CASE_CODE (TMM_FLOWWORKER);
         CASE_CODE (TMM_RECEIVENFLOG);
         CASE_CODE (TMM_DECODENFLOG);
         CASE_CODE (TMM_DECODENFQ);
index 6a66b41795f9bf561fd3c8640a2d1454ed25a802..dc70af9bc44e94484c72eb366c3bdf745ef369fa 100644 (file)
@@ -31,6 +31,7 @@
  *        in tm-modules.c
  */
 typedef enum {
+    TMM_FLOWWORKER,
     TMM_DECODENFQ,
     TMM_VERDICTNFQ,
     TMM_RECEIVENFQ,
index 89db9812b4215b8dd3f67e4c77b0ff4b0d35941d..506430d3f81790b052eb5a5a685e5c1d8e1a2d7d 100644 (file)
@@ -192,14 +192,18 @@ static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
     int r = TM_ECODE_OK;
 
     for (slot = s; slot != NULL; slot = slot->slot_next) {
-        if (slot->tm_id == TMM_STREAMTCP) {
+        if (slot->tm_id == TMM_FLOWWORKER ||
+            slot->tm_id == TMM_STREAMTCP)
+        {
             stream_slot = slot;
             break;
         }
     }
 
-    if (tv->stream_pq == NULL || stream_slot == NULL)
+    if (tv->stream_pq == NULL || stream_slot == NULL) {
+        SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
         return r;
+    }
 
     SCLogDebug("flow end loop starting");
     while(run) {
@@ -314,11 +318,11 @@ void *TmThreadsSlotPktAcqLoop(void *td)
         SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
 
         /* get the 'pre qeueue' from module before the stream module */
-        if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
+        if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
             SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
             tv->stream_pq = &slot->slot_post_pq;
         /* if the stream module is the first, get the threads input queue */
-        } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
+        } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
             SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
         }
@@ -438,11 +442,11 @@ void *TmThreadsSlotPktAcqLoopAFL(void *td)
         SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
 
         /* get the 'pre qeueue' from module before the stream module */
-        if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
+        if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
             SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
             tv->stream_pq = &slot->slot_post_pq;
         /* if the stream module is the first, get the threads input queue */
-        } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
+        } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
             SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
         }
@@ -559,11 +563,11 @@ void *TmThreadsSlotVar(void *td)
          * from the flow timeout code */
 
         /* get the 'pre qeueue' from module before the stream module */
-        if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) {
+        if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
             SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
             tv->stream_pq = &s->slot_pre_pq;
         /* if the stream module is the first, get the threads input queue */
-        } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) {
+        } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
             SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
         }
index 8242c81724615d2c3d215f8ce5e1fabc4079b314..375295b1044b6ac794db09720cc4e9299a178e0a 100644 (file)
@@ -246,22 +246,13 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser,
             SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
             exit(EXIT_FAILURE);
         }
-        TmModule *tm_module = TmModuleGetByName("StreamTcp");
+        TmModule *tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
-        }
-
         TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
 
         TmThreadSetGroupName(tv_detect_ncpu, "Detect");
@@ -347,22 +338,13 @@ static int RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc Mod
         }
         TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-        tm_module = TmModuleGetByName("StreamTcp");
+        tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv, tm_module, NULL);
-        }
-
         tm_module = TmModuleGetByName("RespondReject");
         if (tm_module == NULL) {
             SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName RespondReject failed");
@@ -541,22 +523,14 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser,
             SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
             exit(EXIT_FAILURE);
         }
-        TmModule *tm_module = TmModuleGetByName("StreamTcp");
+
+        TmModule *tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
-        }
-
         TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
 
         SetupOutputs(tv_detect_ncpu);
@@ -656,22 +630,13 @@ int RunModeSetIPSWorker(ConfigIPSParserFunc ConfigParser,
         }
         TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-        tm_module = TmModuleGetByName("StreamTcp");
+        TmModule *tm_module = TmModuleGetByName("FlowWorker");
         if (tm_module == NULL) {
-            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+            SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
             exit(EXIT_FAILURE);
         }
         TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-        if (DetectEngineEnabled()) {
-            tm_module = TmModuleGetByName("Detect");
-            if (tm_module == NULL) {
-                SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
-                exit(EXIT_FAILURE);
-            }
-            TmSlotSetFuncAppend(tv, tm_module, NULL);
-        }
-
         tm_module = TmModuleGetByName(verdict_mod_name);
         if (tm_module == NULL) {
             SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName %s failed", verdict_mod_name);
index b7647a7d4da85467c498f29697c19f0437c32776..c0dac20b78be49c3faee614af57d6a017389936a 100644 (file)
  */
 #define DEBUG_VALIDATE_FLOW(f) do {                 \
     if ((f) != NULL) {                              \
-        SCMutexLock(&(f)->m);                       \
         BUG_ON((f)->flags & FLOW_IPV4 &&            \
                (f)->flags & FLOW_IPV6);             \
         if ((f)->proto == IPPROTO_TCP) {            \
             BUG_ON((f)->alstate != NULL &&          \
                    (f)->alparser == NULL);          \
         }                                           \
-        SCMutexUnlock(&(f)->m);                     \
     }                                               \
 } while(0)