From: Victor Julien Date: Tue, 19 Apr 2016 16:06:32 +0000 (+0200) Subject: flowworker: initial support X-Git-Tag: suricata-3.1RC1~124 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=52d500c670a343a1503dc959c2b087979eb8346f;p=thirdparty%2Fsuricata.git flowworker: initial support Initial version of the 'FlowWorker' thread module. This module combines Flow handling, TCP handling, App layer handling and Detection in a single module. It does all flow related processing under a single flow lock. --- diff --git a/src/Makefile.am b/src/Makefile.am index b81bb7be56..b729388ee3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -222,6 +222,7 @@ flow-storage.c flow-storage.h \ flow-timeout.c flow-timeout.h \ flow-util.c flow-util.h \ flow-var.c flow-var.h \ +flow-worker.c flow-worker.h \ host.c host.h \ host-bit.c host-bit.h \ host-queue.c host-queue.h \ diff --git a/src/detect-engine.c b/src/detect-engine.c index 6a66a45d9f..34f1565202 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -28,6 +28,7 @@ #include "flow.h" #include "flow-private.h" #include "flow-util.h" +#include "flow-worker.h" #include "conf.h" #include "conf-yaml-loader.h" @@ -695,7 +696,7 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx) continue; } - old_det_ctx[i] = SC_ATOMIC_GET(slots->slot_data); + old_det_ctx[i] = FlowWorkerGetDetectCtxPtr(SC_ATOMIC_GET(slots->slot_data)); detect_tvs[i] = tv; new_det_ctx[i] = DetectEngineThreadCtxInitForReload(tv, new_de_ctx, 1); @@ -733,7 +734,7 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx) } SCLogDebug("swapping new det_ctx - %p with older one - %p", new_det_ctx[i], SC_ATOMIC_GET(slots->slot_data)); - (void)SC_ATOMIC_SET(slots->slot_data, new_det_ctx[i++]); + FlowWorkerReplaceDetectCtx(SC_ATOMIC_GET(slots->slot_data), new_det_ctx[i++]); break; } tv = tv->next; diff --git a/src/detect.c b/src/detect.c index 03dcad3ab2..860b330b3d 100644 --- a/src/detect.c +++ b/src/detect.c @@ -2052,9 +2052,7 @@ TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQue if (p->flow) { det_ctx->flow_locked = 1; - FLOWLOCK_WRLOCK(p->flow); DetectFlow(tv, de_ctx, det_ctx, p); - FLOWLOCK_UNLOCK(p->flow); det_ctx->flow_locked = 0; } else { DetectNoFlow(tv, de_ctx, det_ctx, p); diff --git a/src/flow-worker.c b/src/flow-worker.c new file mode 100644 index 0000000000..d6137944fa --- /dev/null +++ b/src/flow-worker.c @@ -0,0 +1,233 @@ +/* Copyright (C) 2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien + * + * Flow Workers are single thread modules taking care of (almost) + * everything related to packets with flows: + * + * - Lookup/creation + * - Stream tracking, reassembly + * - Applayer update + * - Detection + * + * This all while holding the flow lock. + * + * TODO + * - once we have a single entry point into the outputs they + * will have to move into this as well. + * - once outputs are here we can also call StreamTcpPrune here + * instead of in the packet pool return code + */ + +#include "suricata-common.h" +#include "suricata.h" + +#include "decode.h" +#include "stream-tcp.h" +#include "app-layer.h" +#include "detect-engine.h" + +#include "util-validate.h" + +typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr; + +typedef struct FlowWorkerThreadData_ { + union { + StreamTcpThread *stream_thread; + void *stream_thread_ptr; + }; + + SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread); + +#if 0 + void *output_thread; // XXX multiple, not a single state +#endif + PacketQueue pq; + +} FlowWorkerThreadData; + +/** \brief handle flow for packet + * + * Handle flow creation/lookup + */ +static void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p) +{ + FlowHandlePacketUpdate(p->flow, p); + + /* handle the app layer part of the UDP packet payload */ + if (p->proto == IPPROTO_UDP) { + AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow); + } +} + +static TmEcode FlowWorkerThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw)); + BUG_ON(fw == NULL); + SC_ATOMIC_INIT(fw->detect_thread); + SC_ATOMIC_SET(fw->detect_thread, NULL); + + /* setup TCP */ + BUG_ON(StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK); + + if (DetectEngineEnabled()) { + /* setup DETECT */ + void *detect_thread = NULL; + BUG_ON(DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK); + SC_ATOMIC_SET(fw->detect_thread, detect_thread); + } +#if 0 + // setup OUTPUTS +#endif + + /* setup pq for stream end pkts */ + memset(&fw->pq, 0, sizeof(PacketQueue)); + SCMutexInit(&fw->pq.mutex_q, NULL); + + *data = fw; + return TM_ECODE_OK; +} + +static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data) +{ + FlowWorkerThreadData *fw = data; + + /* free TCP */ + StreamTcpThreadDeinit(tv, (void *)fw->stream_thread); + + /* free DETECT */ + void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); + if (detect_thread != NULL) + DetectEngineThreadCtxDeinit(tv, detect_thread); + SC_ATOMIC_SET(fw->detect_thread, NULL); +#if 0 + // free OUTPUT +#endif + + /* free pq */ + BUG_ON(fw->pq.len); + SCMutexDestroy(&fw->pq.mutex_q); + + SCFree(fw); + return TM_ECODE_OK; +} + +TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq); +TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); + +TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, PacketQueue *unused) +{ + FlowWorkerThreadData *fw = data; + void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); + + SCLogDebug("packet %"PRIu64, p->pcap_cnt); + + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) + TimeSetByThread(tv->id, &p->ts); + + /* handle Flow */ + if (p->flags & PKT_WANTS_FLOW) { + FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars + if (likely(p->flow != NULL)) { + DEBUG_ASSERT_FLOW_LOCKED(p->flow); + FlowUpdate(tv, fw->stream_thread, p); + } + /* Flow is now LOCKED */ + + /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a + * pseudo packet created by the flow manager. */ + } else if (p->flags & PKT_HAS_FLOW) { + FLOWLOCK_WRLOCK(p->flow); + } + + SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no"); + + /* handle TCP and app layer */ + if (PKT_IS_TCP(p)) { + SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt); + DEBUG_ASSERT_FLOW_LOCKED(p->flow); + + StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL); + + /* Packets here can safely access p->flow as it's locked */ + SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len); + Packet *x; + while ((x = PacketDequeue(&fw->pq))) { + SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x); + + // TODO do we need to call StreamTcp on these pseudo packets or not? + //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL); + if (detect_thread != NULL) + Detect(tv, x, detect_thread, NULL, NULL); +#if 0 + // Outputs +#endif + /* put these packets in the preq queue so that they are + * by the other thread modules before packet 'p'. */ + PacketEnqueue(preq, x); + } + } + + /* handle Detect */ + DEBUG_ASSERT_FLOW_LOCKED(p->flow); + SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt); + + if (detect_thread != NULL) { + Detect(tv, p, detect_thread, NULL, NULL); + } +#if 0 + // Outputs + + // StreamTcpPruneSession (from TmqhOutputPacketpool) +#endif + + if (p->flow) { + DEBUG_ASSERT_FLOW_LOCKED(p->flow); + FLOWLOCK_UNLOCK(p->flow); + } + + return TM_ECODE_OK; +} + +void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx) +{ + FlowWorkerThreadData *fw = flow_worker; + + SC_ATOMIC_SET(fw->detect_thread, detect_ctx); +} + +void *FlowWorkerGetDetectCtxPtr(void *flow_worker) +{ + FlowWorkerThreadData *fw = flow_worker; + + return SC_ATOMIC_GET(fw->detect_thread); +} + +void TmModuleFlowWorkerRegister (void) +{ + tmm_modules[TMM_FLOWWORKER].name = "FlowWorker"; + tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit; + tmm_modules[TMM_FLOWWORKER].Func = FlowWorker; + tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit; + tmm_modules[TMM_FLOWWORKER].cap_flags = 0; + tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM; +} diff --git a/src/flow-worker.h b/src/flow-worker.h new file mode 100644 index 0000000000..a3528e6286 --- /dev/null +++ b/src/flow-worker.h @@ -0,0 +1,26 @@ +/* Copyright (C) 2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +#ifndef __FLOW_WORKER_H__ +#define __FLOW_WORKER_H__ + +void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx); +void *FlowWorkerGetDetectCtxPtr(void *flow_worker); + +void TmModuleFlowWorkerRegister (void); + +#endif /* __FLOW_WORKER_H__ */ diff --git a/src/runmode-erf-file.c b/src/runmode-erf-file.c index e2ffde4b4f..5e0b097ab6 100644 --- a/src/runmode-erf-file.c +++ b/src/runmode-erf-file.c @@ -94,22 +94,13 @@ int RunModeErfFileSingle(void) } TmSlotSetFuncAppend(tv, tm_module, NULL); - tm_module = TmModuleGetByName("StreamTcp"); + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName Detect failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv, tm_module, NULL); - } - SetupOutputs(tv); if (TmThreadSpawn(tv) != TM_ECODE_OK) { @@ -217,22 +208,14 @@ int RunModeErfFileAutoFp(void) printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); } - tm_module = TmModuleGetByName("StreamTcp"); + + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName Detect failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - } - if (threading_set_cpu_affinity) { TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu); /* If we have more than one core/cpu, the first Detect thread diff --git a/src/runmode-pcap-file.c b/src/runmode-pcap-file.c index cf99fac6fc..c6fea2b2de 100644 --- a/src/runmode-pcap-file.c +++ b/src/runmode-pcap-file.c @@ -100,22 +100,13 @@ int RunModeFilePcapSingle(void) } TmSlotSetFuncAppend(tv, tm_module, NULL); - tm_module = TmModuleGetByName("StreamTcp"); + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv, tm_module, NULL); - } - SetupOutputs(tv); TmThreadSetCPU(tv, DETECT_CPU_SET); @@ -249,22 +240,14 @@ int RunModeFilePcapAutoFp(void) SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed"); exit(EXIT_FAILURE); } - tm_module = TmModuleGetByName("StreamTcp"); + + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - } - TmThreadSetGroupName(tv_detect_ncpu, "Detect"); /* add outputs as well */ diff --git a/src/runmode-tile.c b/src/runmode-tile.c index 913ee562f6..4e94e24dab 100644 --- a/src/runmode-tile.c +++ b/src/runmode-tile.c @@ -245,22 +245,13 @@ int RunModeTileMpipeWorkers(void) } TmSlotSetFuncAppend(tv_worker, tm_module, NULL); - tm_module = TmModuleGetByName("StreamTcp"); + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv_worker, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName Detect failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_worker, tm_module, NULL); - } - tm_module = TmModuleGetByName("RespondReject"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName for RespondReject failed\n"); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 941211dc4d..d1d1470bf3 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -4863,77 +4863,42 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn return 0; } -void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p) -{ - FlowHandlePacketUpdate(p->flow, p); - - /* handle the app layer part of the UDP packet payload */ - if (p->proto == IPPROTO_UDP) { - AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow); - } -} - TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { StreamTcpThread *stt = (StreamTcpThread *)data; - TmEcode ret = TM_ECODE_OK; SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt); - TimeSetByThread(tv->id, &p->ts); - - if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) { - FLOWLOCK_WRLOCK(p->flow); - AppLayerProfilingReset(stt->ra_ctx->app_tctx); - (void)StreamTcpPacket(tv, p, stt, pq); - p->flags |= PKT_IGNORE_CHECKSUM; - stt->pkts++; - FLOWLOCK_UNLOCK(p->flow); - return TM_ECODE_OK; - } - - if (!(p->flags & PKT_WANTS_FLOW)) { - return TM_ECODE_OK; - } - - FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars - if (likely(p->flow != NULL)) { - FlowUpdate(tv, stt, p); - } - if (!(PKT_IS_TCP(p))) { - goto unlock; + return TM_ECODE_OK; } if (p->flow == NULL) { StatsIncr(tv, stt->counter_tcp_no_flow); - goto unlock; + return TM_ECODE_OK; } /* only TCP packets with a flow from here */ - if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) { - if (StreamTcpValidateChecksum(p) == 0) { - StatsIncr(tv, stt->counter_tcp_invalid_checksum); - goto unlock; + if (!(p->flags & PKT_PSEUDO_STREAM_END)) { + if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) { + if (StreamTcpValidateChecksum(p) == 0) { + StatsIncr(tv, stt->counter_tcp_invalid_checksum); + return TM_ECODE_OK; + } + } else { + p->flags |= PKT_IGNORE_CHECKSUM; } } else { - p->flags |= PKT_IGNORE_CHECKSUM; + p->flags |= PKT_IGNORE_CHECKSUM; //TODO check that this is set at creation } AppLayerProfilingReset(stt->ra_ctx->app_tctx); - ret = StreamTcpPacket(tv, p, stt, pq); - - //if (ret) - // return TM_ECODE_FAILED; + (void)StreamTcpPacket(tv, p, stt, pq); stt->pkts++; - unlock: - if (p->flow) { - FLOWLOCK_UNLOCK(p->flow); - } - return ret; + return TM_ECODE_OK; } TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) diff --git a/src/suricata.c b/src/suricata.c index 37ba4aa90f..71a742c421 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -43,6 +43,7 @@ #include "packet-queue.h" #include "threads.h" #include "threadvars.h" +#include "flow-worker.h" #include "util-atomic.h" #include "util-spm.h" @@ -856,6 +857,8 @@ void RegisterAllModules() TmModuleNapatechStreamRegister(); TmModuleNapatechDecodeRegister(); + /* flow worker */ + TmModuleFlowWorkerRegister(); /* stream engine */ TmModuleStreamTcpRegister(); /* detection */ diff --git a/src/tm-modules.c b/src/tm-modules.c index 06190b6549..b798d1d4d8 100644 --- a/src/tm-modules.c +++ b/src/tm-modules.c @@ -199,6 +199,7 @@ void TmModuleRegisterTests(void) const char * TmModuleTmmIdToString(TmmId id) { switch (id) { + CASE_CODE (TMM_FLOWWORKER); CASE_CODE (TMM_RECEIVENFLOG); CASE_CODE (TMM_DECODENFLOG); CASE_CODE (TMM_DECODENFQ); diff --git a/src/tm-threads-common.h b/src/tm-threads-common.h index 6a66b41795..dc70af9bc4 100644 --- a/src/tm-threads-common.h +++ b/src/tm-threads-common.h @@ -31,6 +31,7 @@ * in tm-modules.c */ typedef enum { + TMM_FLOWWORKER, TMM_DECODENFQ, TMM_VERDICTNFQ, TMM_RECEIVENFQ, diff --git a/src/tm-threads.c b/src/tm-threads.c index 89db9812b4..506430d3f8 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -192,14 +192,18 @@ static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) int r = TM_ECODE_OK; for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->tm_id == TMM_STREAMTCP) { + if (slot->tm_id == TMM_FLOWWORKER || + slot->tm_id == TMM_STREAMTCP) + { stream_slot = slot; break; } } - if (tv->stream_pq == NULL || stream_slot == NULL) + if (tv->stream_pq == NULL || stream_slot == NULL) { + SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot); return r; + } SCLogDebug("flow end loop starting"); while(run) { @@ -314,11 +318,11 @@ void *TmThreadsSlotPktAcqLoop(void *td) SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); /* get the 'pre qeueue' from module before the stream module */ - if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) { + if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) { SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); tv->stream_pq = &slot->slot_post_pq; /* if the stream module is the first, get the threads input queue */ - } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) { + } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); } @@ -438,11 +442,11 @@ void *TmThreadsSlotPktAcqLoopAFL(void *td) SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); /* get the 'pre qeueue' from module before the stream module */ - if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) { + if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) { SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); tv->stream_pq = &slot->slot_post_pq; /* if the stream module is the first, get the threads input queue */ - } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) { + } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); } @@ -559,11 +563,11 @@ void *TmThreadsSlotVar(void *td) * from the flow timeout code */ /* get the 'pre qeueue' from module before the stream module */ - if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) { + if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) { SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq); tv->stream_pq = &s->slot_pre_pq; /* if the stream module is the first, get the threads input queue */ - } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) { + } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq); } diff --git a/src/util-runmodes.c b/src/util-runmodes.c index 8242c81724..375295b104 100644 --- a/src/util-runmodes.c +++ b/src/util-runmodes.c @@ -246,22 +246,13 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser, SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed"); exit(EXIT_FAILURE); } - TmModule *tm_module = TmModuleGetByName("StreamTcp"); + TmModule *tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - } - TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); TmThreadSetGroupName(tv_detect_ncpu, "Detect"); @@ -347,22 +338,13 @@ static int RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc Mod } TmSlotSetFuncAppend(tv, tm_module, NULL); - tm_module = TmModuleGetByName("StreamTcp"); + tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv, tm_module, NULL); - } - tm_module = TmModuleGetByName("RespondReject"); if (tm_module == NULL) { SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName RespondReject failed"); @@ -541,22 +523,14 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser, SCLogError(SC_ERR_RUNMODE, "TmThreadsCreate failed"); exit(EXIT_FAILURE); } - TmModule *tm_module = TmModuleGetByName("StreamTcp"); + + TmModule *tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); - } - TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET); SetupOutputs(tv_detect_ncpu); @@ -656,22 +630,13 @@ int RunModeSetIPSWorker(ConfigIPSParserFunc ConfigParser, } TmSlotSetFuncAppend(tv, tm_module, NULL); - tm_module = TmModuleGetByName("StreamTcp"); + TmModule *tm_module = TmModuleGetByName("FlowWorker"); if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName StreamTcp failed"); + SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName for FlowWorker failed"); exit(EXIT_FAILURE); } TmSlotSetFuncAppend(tv, tm_module, NULL); - if (DetectEngineEnabled()) { - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv, tm_module, NULL); - } - tm_module = TmModuleGetByName(verdict_mod_name); if (tm_module == NULL) { SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName %s failed", verdict_mod_name); diff --git a/src/util-validate.h b/src/util-validate.h index b7647a7d4d..c0dac20b78 100644 --- a/src/util-validate.h +++ b/src/util-validate.h @@ -53,14 +53,12 @@ */ #define DEBUG_VALIDATE_FLOW(f) do { \ if ((f) != NULL) { \ - SCMutexLock(&(f)->m); \ BUG_ON((f)->flags & FLOW_IPV4 && \ (f)->flags & FLOW_IPV6); \ if ((f)->proto == IPPROTO_TCP) { \ BUG_ON((f)->alstate != NULL && \ (f)->alparser == NULL); \ } \ - SCMutexUnlock(&(f)->m); \ } \ } while(0)