SCCtrlCondT flow_manager_ctrl_cond;
SCCtrlMutex flow_manager_ctrl_mutex;
+SCCtrlCondT flow_recycler_ctrl_cond;
+SCCtrlMutex flow_recycler_ctrl_mutex;
void FlowTimeoutsInit(void)
{
FlowQueuePrivateAppendFlow(&recycle, f);
if (recycle.len == 100) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+ FlowWakeupFlowRecyclerThread();
}
cnt++;
}
if (recycle.len) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+ FlowWakeupFlowRecyclerThread();
}
return cnt;
}
FBLOCK_UNLOCK(fb);
if (local_queue.len >= 25) {
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+ FlowWakeupFlowRecyclerThread();
}
}
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+ FlowWakeupFlowRecyclerThread();
return cnt;
}
*/
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
- struct timeval ts;
- uint64_t recycled_cnt = 0;
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
BUG_ON(ftd == NULL);
-
+ const bool time_is_live = TimeModeIsLive();
+ uint64_t recycled_cnt = 0;
+ struct timeval ts;
memset(&ts, 0, sizeof(ts));
uint32_t fr_passes = 0;
break;
}
-#ifdef FM_PROFILE
- struct timeval sleep_startts;
- memset(&sleep_startts, 0, sizeof(sleep_startts));
- gettimeofday(&sleep_startts, NULL);
-#endif
- usleep(250);
-#ifdef FM_PROFILE
- struct timeval sleep_endts;
- memset(&sleep_endts, 0, sizeof(sleep_endts));
- gettimeofday(&sleep_endts, NULL);
- struct timeval sleep_time;
- memset(&sleep_time, 0, sizeof(sleep_time));
- timersub(&sleep_endts, &sleep_startts, &sleep_time);
- timeradd(&sleeping, &sleep_time, &sleeping);
-#endif
+ const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
+ if (emerg || !time_is_live) {
+ usleep(250);
+ } else {
+ struct timeval cond_tv;
+ gettimeofday(&cond_tv, NULL);
+ cond_tv.tv_sec += 1;
+ struct timespec cond_time = FROM_TIMEVAL(cond_tv);
+ SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
+ while (1) {
+ int rc = SCCtrlCondTimedwait(
+ &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
+ if (rc == ETIMEDOUT || rc < 0) {
+ break;
+ }
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
+ break;
+ }
+ if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
+ break;
+ }
+ }
+ SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
+ }
SCLogDebug("woke up...");
}
flowrec_number = (uint32_t)setting;
+ SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
+ SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
+
SCLogConfig("using %u flow recycler threads", flowrec_number);
for (uint32_t u = 0; u < flowrec_number; u++) {
/* make sure all flows are processed */
do {
+ FlowWakeupFlowRecyclerThread();
usleep(10);
} while (FlowRecyclerReadyToShutdown() == false);
{
if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
SCMutexUnlock(&tv_root_lock);
+ FlowWakeupFlowRecyclerThread();
/* sleep outside lock */
SleepMsec(1);
goto again;