From: Victor Julien Date: Thu, 11 Nov 2021 07:34:43 +0000 (+0100) Subject: flow/recycler: bring back pthread_cond_t sleep X-Git-Tag: suricata-7.0.0-beta1~444 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f271fb457522d77a1befeb1d097c125afcbdeeb9;p=thirdparty%2Fsuricata.git flow/recycler: bring back pthread_cond_t sleep Bug #4379. --- diff --git a/src/flow-manager.c b/src/flow-manager.c index fb10375e41..171feb8d2c 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -93,6 +93,8 @@ SC_ATOMIC_EXTERN(unsigned int, flow_flags); SCCtrlCondT flow_manager_ctrl_cond; SCCtrlMutex flow_manager_ctrl_mutex; +SCCtrlCondT flow_recycler_ctrl_cond; +SCCtrlMutex flow_recycler_ctrl_mutex; void FlowTimeoutsInit(void) { @@ -300,11 +302,13 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount FlowQueuePrivateAppendFlow(&recycle, f); if (recycle.len == 100) { FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); } cnt++; } if (recycle.len) { FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); } return cnt; } @@ -586,9 +590,11 @@ static uint32_t FlowCleanupHash(void) FBLOCK_UNLOCK(fb); if (local_queue.len >= 25) { FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); } } FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); return cnt; } @@ -1052,6 +1058,7 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) { FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; BUG_ON(ftd == NULL); + const bool time_is_live = TimeModeIsLive(); uint64_t recycled_cnt = 0; struct timeval ts; memset(&ts, 0, sizeof(ts)); @@ -1084,7 +1091,30 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) break; } - usleep(250); + const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY); + if (emerg || !time_is_live) { + usleep(250); + } else { + struct timeval cond_tv; + gettimeofday(&cond_tv, NULL); + cond_tv.tv_sec += 1; + struct timespec cond_time = FROM_TIMEVAL(cond_tv); + SCCtrlMutexLock(&flow_recycler_ctrl_mutex); + while (1) { + int rc = SCCtrlCondTimedwait( + &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time); + if (rc == ETIMEDOUT || rc < 0) { + break; + } + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { + break; + } + if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) { + break; + } + } + SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); + } SCLogDebug("woke up..."); @@ -1120,6 +1150,9 @@ void FlowRecyclerThreadSpawn() } flowrec_number = (uint32_t)setting; + SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); + SCLogConfig("using %u flow recycler threads", flowrec_number); for (uint32_t u = 0; u < flowrec_number; u++) { @@ -1161,6 +1194,7 @@ void FlowDisableFlowRecyclerThread(void) /* make sure all flows are processed */ do { + FlowWakeupFlowRecyclerThread(); usleep(10); } while (FlowRecyclerReadyToShutdown() == false); @@ -1194,6 +1228,7 @@ again: { if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { SCMutexUnlock(&tv_root_lock); + FlowWakeupFlowRecyclerThread(); /* sleep outside lock */ SleepMsec(1); goto again; diff --git a/src/flow-manager.h b/src/flow-manager.h index 25ac22dc4e..157358d170 100644 --- a/src/flow-manager.h +++ b/src/flow-manager.h @@ -28,6 +28,9 @@ extern SCCtrlCondT flow_manager_ctrl_cond; extern SCCtrlMutex flow_manager_ctrl_mutex; #define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond) +extern SCCtrlCondT flow_recycler_ctrl_cond; +extern SCCtrlMutex flow_recycler_ctrl_mutex; +#define FlowWakeupFlowRecyclerThread() SCCtrlCondSignal(&flow_recycler_ctrl_cond) #define FlowTimeoutsReset() FlowTimeoutsInit() void FlowTimeoutsInit(void);