flow-timeout.c flow-timeout.h \
flow-util.c flow-util.h \
flow-var.c flow-var.h \
+flow-worker.c flow-worker.h \
host.c host.h \
host-bit.c host-bit.h \
host-queue.c host-queue.h \
#include "flow.h"
#include "flow-private.h"
#include "flow-util.h"
+#include "flow-worker.h"
#include "conf.h"
#include "conf-yaml-loader.h"
continue;
}
- old_det_ctx[i] = SC_ATOMIC_GET(slots->slot_data);
+ old_det_ctx[i] = FlowWorkerGetDetectCtxPtr(SC_ATOMIC_GET(slots->slot_data));
detect_tvs[i] = tv;
new_det_ctx[i] = DetectEngineThreadCtxInitForReload(tv, new_de_ctx, 1);
}
SCLogDebug("swapping new det_ctx - %p with older one - %p",
new_det_ctx[i], SC_ATOMIC_GET(slots->slot_data));
- (void)SC_ATOMIC_SET(slots->slot_data, new_det_ctx[i++]);
+ FlowWorkerReplaceDetectCtx(SC_ATOMIC_GET(slots->slot_data), new_det_ctx[i++]);
break;
}
tv = tv->next;
if (p->flow) {
det_ctx->flow_locked = 1;
- FLOWLOCK_WRLOCK(p->flow);
DetectFlow(tv, de_ctx, det_ctx, p);
- FLOWLOCK_UNLOCK(p->flow);
det_ctx->flow_locked = 0;
} else {
DetectNoFlow(tv, de_ctx, det_ctx, p);
--- /dev/null
+/* Copyright (C) 2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ *
+ * Flow Workers are single thread modules taking care of (almost)
+ * everything related to packets with flows:
+ *
+ * - Lookup/creation
+ * - Stream tracking, reassembly
+ * - Applayer update
+ * - Detection
+ *
+ * This all while holding the flow lock.
+ *
+ * TODO
+ * - once we have a single entry point into the outputs they
+ * will have to move into this as well.
+ * - once outputs are here we can also call StreamTcpPrune here
+ * instead of in the packet pool return code
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+
+#include "decode.h"
+#include "stream-tcp.h"
+#include "app-layer.h"
+#include "detect-engine.h"
+
+#include "util-validate.h"
+
+typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;
+
+typedef struct FlowWorkerThreadData_ {
+ union {
+ StreamTcpThread *stream_thread;
+ void *stream_thread_ptr;
+ };
+
+ SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
+
+#if 0
+ void *output_thread; // XXX multiple, not a single state
+#endif
+ PacketQueue pq;
+
+} FlowWorkerThreadData;
+
+/** \brief handle flow for packet
+ *
+ * Handle flow creation/lookup
+ */
+static void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
+{
+ FlowHandlePacketUpdate(p->flow, p);
+
+ /* handle the app layer part of the UDP packet payload */
+ if (p->proto == IPPROTO_UDP) {
+ AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
+ }
+}
+
+static TmEcode FlowWorkerThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+ FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
+ BUG_ON(fw == NULL);
+ SC_ATOMIC_INIT(fw->detect_thread);
+ SC_ATOMIC_SET(fw->detect_thread, NULL);
+
+ /* setup TCP */
+ BUG_ON(StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK);
+
+ if (DetectEngineEnabled()) {
+ /* setup DETECT */
+ void *detect_thread = NULL;
+ BUG_ON(DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK);
+ SC_ATOMIC_SET(fw->detect_thread, detect_thread);
+ }
+#if 0
+ // setup OUTPUTS
+#endif
+
+ /* setup pq for stream end pkts */
+ memset(&fw->pq, 0, sizeof(PacketQueue));
+ SCMutexInit(&fw->pq.mutex_q, NULL);
+
+ *data = fw;
+ return TM_ECODE_OK;
+}
+
+static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
+{
+ FlowWorkerThreadData *fw = data;
+
+ /* free TCP */
+ StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
+
+ /* free DETECT */
+ void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
+ if (detect_thread != NULL)
+ DetectEngineThreadCtxDeinit(tv, detect_thread);
+ SC_ATOMIC_SET(fw->detect_thread, NULL);
+#if 0
+ // free OUTPUT
+#endif
+
+ /* free pq */
+ BUG_ON(fw->pq.len);
+ SCMutexDestroy(&fw->pq.mutex_q);
+
+ SCFree(fw);
+ return TM_ECODE_OK;
+}
+
+TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq);
+TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+
+TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, PacketQueue *unused)
+{
+ FlowWorkerThreadData *fw = data;
+ void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
+
+ SCLogDebug("packet %"PRIu64, p->pcap_cnt);
+
+ /* update time */
+ if (!(PKT_IS_PSEUDOPKT(p)))
+ TimeSetByThread(tv->id, &p->ts);
+
+ /* handle Flow */
+ if (p->flags & PKT_WANTS_FLOW) {
+ FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
+ if (likely(p->flow != NULL)) {
+ DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+ FlowUpdate(tv, fw->stream_thread, p);
+ }
+ /* Flow is now LOCKED */
+
+ /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
+ * pseudo packet created by the flow manager. */
+ } else if (p->flags & PKT_HAS_FLOW) {
+ FLOWLOCK_WRLOCK(p->flow);
+ }
+
+ SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
+
+ /* handle TCP and app layer */
+ if (PKT_IS_TCP(p)) {
+ SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt);
+ DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+
+ StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL);
+
+ /* Packets here can safely access p->flow as it's locked */
+ SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
+ Packet *x;
+ while ((x = PacketDequeue(&fw->pq))) {
+ SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
+
+ // TODO do we need to call StreamTcp on these pseudo packets or not?
+ //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL);
+ if (detect_thread != NULL)
+ Detect(tv, x, detect_thread, NULL, NULL);
+#if 0
+ // Outputs
+#endif
+ /* put these packets in the preq queue so that they are
+ * by the other thread modules before packet 'p'. */
+ PacketEnqueue(preq, x);
+ }
+ }
+
+ /* handle Detect */
+ DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+ SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
+
+ if (detect_thread != NULL) {
+ Detect(tv, p, detect_thread, NULL, NULL);
+ }
+#if 0
+ // Outputs
+
+ // StreamTcpPruneSession (from TmqhOutputPacketpool)
+#endif
+
+ if (p->flow) {
+ DEBUG_ASSERT_FLOW_LOCKED(p->flow);
+ FLOWLOCK_UNLOCK(p->flow);
+ }
+
+ return TM_ECODE_OK;
+}
+
+void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
+{
+ FlowWorkerThreadData *fw = flow_worker;
+
+ SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
+}
+
+void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
+{
+ FlowWorkerThreadData *fw = flow_worker;
+
+ return SC_ATOMIC_GET(fw->detect_thread);
+}
+
+void TmModuleFlowWorkerRegister (void)
+{
+ tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
+ tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
+ tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
+ tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
+ tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
+ tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
+}
--- /dev/null
+/* Copyright (C) 2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#ifndef __FLOW_WORKER_H__
+#define __FLOW_WORKER_H__
+
+void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
+void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
+
+void TmModuleFlowWorkerRegister (void);
+
+#endif /* __FLOW_WORKER_H__ */
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- tm_module = TmModuleGetByName("StreamTcp");
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName Detect failed\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv, tm_module, NULL);
- }
-
SetupOutputs(tv);
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
- tm_module = TmModuleGetByName("StreamTcp");
+
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName Detect failed\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- }
-
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- tm_module = TmModuleGetByName("StreamTcp");
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv, tm_module, NULL);
- }
-
SetupOutputs(tv);
TmThreadSetCPU(tv, DETECT_CPU_SET);
SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
exit(EXIT_FAILURE);
}
- tm_module = TmModuleGetByName("StreamTcp");
+
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- }
-
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
/* add outputs as well */
}
TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
- tm_module = TmModuleGetByName("StreamTcp");
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName Detect failed\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
- }
-
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for RespondReject failed\n");
return 0;
}
-void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
-{
- FlowHandlePacketUpdate(p->flow, p);
-
- /* handle the app layer part of the UDP packet payload */
- if (p->proto == IPPROTO_UDP) {
- AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
- }
-}
-
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
StreamTcpThread *stt = (StreamTcpThread *)data;
- TmEcode ret = TM_ECODE_OK;
SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);
- TimeSetByThread(tv->id, &p->ts);
-
- if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) {
- FLOWLOCK_WRLOCK(p->flow);
- AppLayerProfilingReset(stt->ra_ctx->app_tctx);
- (void)StreamTcpPacket(tv, p, stt, pq);
- p->flags |= PKT_IGNORE_CHECKSUM;
- stt->pkts++;
- FLOWLOCK_UNLOCK(p->flow);
- return TM_ECODE_OK;
- }
-
- if (!(p->flags & PKT_WANTS_FLOW)) {
- return TM_ECODE_OK;
- }
-
- FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
- if (likely(p->flow != NULL)) {
- FlowUpdate(tv, stt, p);
- }
-
if (!(PKT_IS_TCP(p))) {
- goto unlock;
+ return TM_ECODE_OK;
}
if (p->flow == NULL) {
StatsIncr(tv, stt->counter_tcp_no_flow);
- goto unlock;
+ return TM_ECODE_OK;
}
/* only TCP packets with a flow from here */
- if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
- if (StreamTcpValidateChecksum(p) == 0) {
- StatsIncr(tv, stt->counter_tcp_invalid_checksum);
- goto unlock;
+ if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
+ if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
+ if (StreamTcpValidateChecksum(p) == 0) {
+ StatsIncr(tv, stt->counter_tcp_invalid_checksum);
+ return TM_ECODE_OK;
+ }
+ } else {
+ p->flags |= PKT_IGNORE_CHECKSUM;
}
} else {
- p->flags |= PKT_IGNORE_CHECKSUM;
+ p->flags |= PKT_IGNORE_CHECKSUM; //TODO check that this is set at creation
}
AppLayerProfilingReset(stt->ra_ctx->app_tctx);
- ret = StreamTcpPacket(tv, p, stt, pq);
-
- //if (ret)
- // return TM_ECODE_FAILED;
+ (void)StreamTcpPacket(tv, p, stt, pq);
stt->pkts++;
- unlock:
- if (p->flow) {
- FLOWLOCK_UNLOCK(p->flow);
- }
- return ret;
+ return TM_ECODE_OK;
}
TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
#include "packet-queue.h"
#include "threads.h"
#include "threadvars.h"
+#include "flow-worker.h"
#include "util-atomic.h"
#include "util-spm.h"
TmModuleNapatechStreamRegister();
TmModuleNapatechDecodeRegister();
+ /* flow worker */
+ TmModuleFlowWorkerRegister();
/* stream engine */
TmModuleStreamTcpRegister();
/* detection */
const char * TmModuleTmmIdToString(TmmId id)
{
switch (id) {
+ CASE_CODE (TMM_FLOWWORKER);
CASE_CODE (TMM_RECEIVENFLOG);
CASE_CODE (TMM_DECODENFLOG);
CASE_CODE (TMM_DECODENFQ);
* in tm-modules.c
*/
typedef enum {
+ TMM_FLOWWORKER,
TMM_DECODENFQ,
TMM_VERDICTNFQ,
TMM_RECEIVENFQ,
int r = TM_ECODE_OK;
for (slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->tm_id == TMM_STREAMTCP) {
+ if (slot->tm_id == TMM_FLOWWORKER ||
+ slot->tm_id == TMM_STREAMTCP)
+ {
stream_slot = slot;
break;
}
}
- if (tv->stream_pq == NULL || stream_slot == NULL)
+ if (tv->stream_pq == NULL || stream_slot == NULL) {
+ SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
return r;
+ }
SCLogDebug("flow end loop starting");
while(run) {
SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
/* get the 'pre qeueue' from module before the stream module */
- if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
+ if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
tv->stream_pq = &slot->slot_post_pq;
/* if the stream module is the first, get the threads input queue */
- } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
+ } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
}
SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
/* get the 'pre qeueue' from module before the stream module */
- if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
+ if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
tv->stream_pq = &slot->slot_post_pq;
/* if the stream module is the first, get the threads input queue */
- } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
+ } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
}
* from the flow timeout code */
/* get the 'pre qeueue' from module before the stream module */
- if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) {
+ if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
tv->stream_pq = &s->slot_pre_pq;
/* if the stream module is the first, get the threads input queue */
- } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) {
+ } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
}
SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
exit(EXIT_FAILURE);
}
- TmModule *tm_module = TmModuleGetByName("StreamTcp");
+ TmModule *tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- }
-
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- tm_module = TmModuleGetByName("StreamTcp");
+ tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv, tm_module, NULL);
- }
-
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName RespondReject failed");
SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed");
exit(EXIT_FAILURE);
}
- TmModule *tm_module = TmModuleGetByName("StreamTcp");
+
+ TmModule *tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- }
-
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
SetupOutputs(tv_detect_ncpu);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- tm_module = TmModuleGetByName("StreamTcp");
+ TmModule *tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed");
+ SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- if (DetectEngineEnabled()) {
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv, tm_module, NULL);
- }
-
tm_module = TmModuleGetByName(verdict_mod_name);
if (tm_module == NULL) {
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName %s failed", verdict_mod_name);
*/
#define DEBUG_VALIDATE_FLOW(f) do { \
if ((f) != NULL) { \
- SCMutexLock(&(f)->m); \
BUG_ON((f)->flags & FLOW_IPV4 && \
(f)->flags & FLOW_IPV6); \
if ((f)->proto == IPPROTO_TCP) { \
BUG_ON((f)->alstate != NULL && \
(f)->alparser == NULL); \
} \
- SCMutexUnlock(&(f)->m); \
} \
} while(0)