From: Victor Julien Date: Thu, 11 Nov 2021 07:34:43 +0000 (+0100) Subject: flow/recycler: bring back pthread_cond_t sleep X-Git-Tag: suricata-6.0.7~26 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0a6e2b9d1ae5cf9cde597a9b8b43818a16df50e8;p=thirdparty%2Fsuricata.git flow/recycler: bring back pthread_cond_t sleep Bug #4379. (cherry picked from commit f271fb457522d77a1befeb1d097c125afcbdeeb9) --- diff --git a/src/flow-manager.c b/src/flow-manager.c index d003e36076..71e15828db 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -90,6 +90,8 @@ SC_ATOMIC_EXTERN(unsigned int, flow_flags); SCCtrlCondT flow_manager_ctrl_cond; SCCtrlMutex flow_manager_ctrl_mutex; +SCCtrlCondT flow_recycler_ctrl_cond; +SCCtrlMutex flow_recycler_ctrl_mutex; void FlowTimeoutsInit(void) { @@ -292,11 +294,13 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount FlowQueuePrivateAppendFlow(&recycle, f); if (recycle.len == 100) { FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); } cnt++; } if (recycle.len) { FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); } return cnt; } @@ -566,9 +570,11 @@ static uint32_t FlowCleanupHash(void) FBLOCK_UNLOCK(fb); if (local_queue.len >= 25) { FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); } } FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); return cnt; } @@ -1093,11 +1099,11 @@ static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) */ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) { - struct timeval ts; - uint64_t recycled_cnt = 0; FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; BUG_ON(ftd == NULL); - + const bool time_is_live = TimeModeIsLive(); + uint64_t recycled_cnt = 0; + struct timeval ts; memset(&ts, 0, sizeof(ts)); uint32_t fr_passes = 0; @@ -1176,21 +1182,30 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) break; } -#ifdef FM_PROFILE - struct timeval sleep_startts; - memset(&sleep_startts, 0, sizeof(sleep_startts)); - gettimeofday(&sleep_startts, NULL); -#endif - usleep(250); -#ifdef FM_PROFILE - struct timeval sleep_endts; - memset(&sleep_endts, 0, sizeof(sleep_endts)); - gettimeofday(&sleep_endts, NULL); - struct timeval sleep_time; - memset(&sleep_time, 0, sizeof(sleep_time)); - timersub(&sleep_endts, &sleep_startts, &sleep_time); - timeradd(&sleeping, &sleep_time, &sleeping); -#endif + const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY); + if (emerg || !time_is_live) { + usleep(250); + } else { + struct timeval cond_tv; + gettimeofday(&cond_tv, NULL); + cond_tv.tv_sec += 1; + struct timespec cond_time = FROM_TIMEVAL(cond_tv); + SCCtrlMutexLock(&flow_recycler_ctrl_mutex); + while (1) { + int rc = SCCtrlCondTimedwait( + &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time); + if (rc == ETIMEDOUT || rc < 0) { + break; + } + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { + break; + } + if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) { + break; + } + } + SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); + } SCLogDebug("woke up..."); @@ -1239,6 +1254,9 @@ void FlowRecyclerThreadSpawn() } flowrec_number = (uint32_t)setting; + SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); + SCLogConfig("using %u flow recycler threads", flowrec_number); for (uint32_t u = 0; u < flowrec_number; u++) { @@ -1280,6 +1298,7 @@ void FlowDisableFlowRecyclerThread(void) /* make sure all flows are processed */ do { + FlowWakeupFlowRecyclerThread(); usleep(10); } while (FlowRecyclerReadyToShutdown() == false); @@ -1313,6 +1332,7 @@ again: { if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { SCMutexUnlock(&tv_root_lock); + FlowWakeupFlowRecyclerThread(); /* sleep outside lock */ SleepMsec(1); goto again; diff --git a/src/flow-manager.h b/src/flow-manager.h index 25ac22dc4e..157358d170 100644 --- a/src/flow-manager.h +++ b/src/flow-manager.h @@ -28,6 +28,9 @@ extern SCCtrlCondT flow_manager_ctrl_cond; extern SCCtrlMutex flow_manager_ctrl_mutex; #define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond) +extern SCCtrlCondT flow_recycler_ctrl_cond; +extern SCCtrlMutex flow_recycler_ctrl_mutex; +#define FlowWakeupFlowRecyclerThread() SCCtrlCondSignal(&flow_recycler_ctrl_cond) #define FlowTimeoutsReset() FlowTimeoutsInit() void FlowTimeoutsInit(void);