return 1;
}
-static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
- void *detect_thread, // TODO proper type?
- FlowTimeoutCounters *counters,
+static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
FlowQueuePrivate *fq)
{
Flow *f;
if (f->proto == IPPROTO_TCP) {
if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) && !FlowIsBypassed(f) &&
FlowForceReassemblyNeedReassembly(f) == 1 && f->ffr != 0) {
+ /* read detect thread in case we're doing a reload */
+ void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
int cnt = FlowFinish(tv, f, fw, detect_thread);
counters->flows_aside_pkt_inject += cnt;
counters->flows_aside_needs_work++;
/** \internal
* \brief process flows injected into our queue by other threads
*/
-static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
- FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
+static inline void FlowWorkerProcessInjectedFlows(
+ ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
/* take injected flows and append to our work queue */
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);
FlowTimeoutCounters counters = { 0, 0, };
- CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
+ CheckWorkQueue(tv, fw, &counters, &injected);
UpdateCounters(tv, fw, &counters);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
/** \internal
* \brief process flows set aside locally during flow lookup
*/
-static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
- FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
+static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
if (fw->fls.work_queue.len) {
StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
FlowTimeoutCounters counters = { 0, 0, };
- CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
+ CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue);
UpdateCounters(tv, fw, &counters);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
housekeeping:
/* take injected flows and process them */
- FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
+ FlowWorkerProcessInjectedFlows(tv, fw, p);
/* process local work queue */
- FlowWorkerProcessLocalFlows(tv, fw, p, detect_thread);
+ FlowWorkerProcessLocalFlows(tv, fw, p);
return TM_ECODE_OK;
}