}
}
-static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool emerg)
+static inline bool FlowIsTimedOut(
+ const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg)
{
SCTime_t timesout_at;
if (emerg) {
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
- /* do the timeout check */
- if (SCTIME_CMP_LT(ts, timesout_at)) {
- return false;
+ /* if time is live, we just use the pktts */
+ if (TimeModeIsLive() || ftid == f->thread_id[0] || f->thread_id[0] == 0) {
+ if (SCTIME_CMP_LT(pktts, timesout_at)) {
+ return false;
+ }
+ } else {
+ SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
+ /* do the timeout check */
+ if (SCTIME_CMP_LT(checkts, timesout_at)) {
+ return false;
+ }
}
return true;
}
+static inline uint16_t GetTvId(const ThreadVars *tv)
+{
+ uint16_t tv_id;
+#ifdef UNITTESTS
+ if (RunmodeIsUnittests()) {
+ tv_id = 0;
+ } else {
+ tv_id = (uint16_t)tv->id;
+ }
+#else
+ tv_id = (uint16_t)tv->id;
+#endif
+ return tv_id;
+}
+
/** \brief Get Flow for packet
*
* Hash retrieval function for flows. Looks up the hash bucket containing the
return f;
}
+ const uint16_t tv_id = GetTvId(tv);
const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
do {
Flow *next_f = NULL;
FLOWLOCK_WRLOCK(f);
- const bool timedout =
- (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg));
+ const bool timedout = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) &&
+ FlowIsTimedOut(tv_id, f, p->ts, emerg));
if (timedout) {
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);