return 1;
}
+/** \param[in] max_work Max flows to process. 0 if unlimited. */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
void *detect_thread, // TODO proper type?
- FlowTimeoutCounters *counters,
- FlowQueuePrivate *fq)
+ FlowTimeoutCounters *counters, FlowQueuePrivate *fq, const uint32_t max_work)
{
+ uint32_t i = 0;
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
FLOWLOCK_WRLOCK(f);
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
+
if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
FlowSparePoolReturnFlow(f);
} else {
FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
}
+
+ if (max_work != 0 && ++i == max_work)
+ break;
}
}
if (p->pkt_src == PKT_SRC_WIRE)
StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
- FlowTimeoutCounters counters = { 0, 0, };
- CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
- UpdateCounters(tv, fw, &counters);
+ /* move to local queue so we can process over the course of multiple packets */
+ FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
}
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
{
+ uint32_t max_work = 2;
+ if (PKT_IS_PSEUDOPKT(p))
+ max_work = 0;
+
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
if (fw->fls.work_queue.len) {
StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
FlowTimeoutCounters counters = { 0, 0, };
- CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
+ CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue, max_work);
UpdateCounters(tv, fw, &counters);
}
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
housekeeping:
- /* take injected flows and process them */
+ /* take injected flows and add them to our local queue */
FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
/* process local work queue */