From 081b0e05a23cf7eeb41b85c0e60dd098d6e99c3c Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Wed, 28 Mar 2012 20:26:05 +0530 Subject: [PATCH] restructure disabling receive threads. Introduce new flag to indicate that threads have finised running --- src/counters.c | 4 ++++ src/cuda-packet-batcher.c | 3 ++- src/flow-manager.c | 1 + src/threadvars.h | 2 ++ src/tm-threads.c | 40 +++++++++++++++++++++++++++++++-------- src/util-mpm-b2g-cuda.c | 3 ++- 6 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/counters.c b/src/counters.c index 75e037aaec..eae1bdfe29 100644 --- a/src/counters.c +++ b/src/counters.c @@ -448,6 +448,7 @@ static void *SCPerfMgmtThread(void *arg) if (sc_perf_op_ctx == NULL) { SCLogError(SC_ERR_PERF_STATS_NOT_INIT, "Perf Counter API not init" "SCPerfInitCounterApi() has to be called first"); + TmThreadsSetFlag(tv_local, THV_CLOSED | THV_RUNNING_DONE); return NULL; } @@ -469,6 +470,7 @@ static void *SCPerfMgmtThread(void *arg) } } + TmThreadsSetFlag(tv_local, THV_RUNNING_DONE); TmThreadWaitForFlag(tv_local, THV_DEINIT); TmThreadsSetFlag(tv_local, THV_CLOSED); @@ -502,6 +504,7 @@ static void *SCPerfWakeupThread(void *arg) if (sc_perf_op_ctx == NULL) { SCLogError(SC_ERR_PERF_STATS_NOT_INIT, "Perf Counter API not init" "SCPerfInitCounterApi() has to be called first"); + TmThreadsSetFlag(tv_local, THV_CLOSED | THV_RUNNING_DONE); return NULL; } @@ -555,6 +558,7 @@ static void *SCPerfWakeupThread(void *arg) } } + TmThreadsSetFlag(tv_local, THV_RUNNING_DONE); TmThreadWaitForFlag(tv_local, THV_DEINIT); TmThreadsSetFlag(tv_local, THV_CLOSED); diff --git a/src/cuda-packet-batcher.c b/src/cuda-packet-batcher.c index d850c3bd26..cf1a3cff69 100644 --- a/src/cuda-packet-batcher.c +++ b/src/cuda-packet-batcher.c @@ -327,7 +327,7 @@ void *SCCudaPBTmThreadsSlot1(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -372,6 +372,7 @@ void *SCCudaPBTmThreadsSlot1(void *td) } } + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { diff --git a/src/flow-manager.c b/src/flow-manager.c index da9ac57cf5..0e3dbe2d50 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -532,6 +532,7 @@ void *FlowManagerThread(void *td) SCPerfSyncCountersIfSignalled(th_v, 0); } + TmThreadsSetFlag(th_v, THV_RUNNING_DONE); TmThreadWaitForFlag(th_v, THV_DEINIT); FlowHashDebugDeinit(); diff --git a/src/threadvars.h b/src/threadvars.h index c8f760e558..b119204fc7 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -42,6 +42,8 @@ struct TmSlot_; /* used to indicate the thread is going through de-init. Introduced as more * of a hack for solving stream-timeout-shutdown. Is set by the main thread. */ #define THV_DEINIT 0x40 +#define THV_RUNNING_DONE 0x80 /** thread has completed running and is entering + * the de-init phase */ /** Thread flags set and read by threads, to control the threads, when they * encounter certain conditions like failure */ diff --git a/src/tm-threads.c b/src/tm-threads.c index ba7a7318ca..1f493bb7fb 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -140,7 +140,7 @@ void *TmThreadsSlot1NoIn(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -190,6 +190,7 @@ void *TmThreadsSlot1NoIn(void *td) } } /* while (run) */ + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { @@ -230,7 +231,7 @@ void *TmThreadsSlot1NoOut(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -262,6 +263,7 @@ void *TmThreadsSlot1NoOut(void *td) } } /* while (run) */ + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { @@ -303,7 +305,7 @@ void *TmThreadsSlot1NoInOut(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -329,6 +331,7 @@ void *TmThreadsSlot1NoInOut(void *td) } } /* while (run) */ + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { @@ -371,7 +374,7 @@ void *TmThreadsSlot1(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -435,6 +438,7 @@ void *TmThreadsSlot1(void *td) } } /* while (run) */ + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { @@ -569,7 +573,7 @@ void *TmThreadsSlotPktAcqLoop(void *td) { s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } @@ -579,7 +583,7 @@ void *TmThreadsSlotPktAcqLoop(void *td) { if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -602,6 +606,7 @@ void *TmThreadsSlotPktAcqLoop(void *td) { } SCPerfSyncCounters(tv, 0); + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); for (slot = s; slot != NULL; slot = slot->slot_next) { @@ -649,7 +654,7 @@ void *TmThreadsSlotVar(void *td) if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } @@ -659,7 +664,7 @@ void *TmThreadsSlotVar(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -729,6 +734,7 @@ void *TmThreadsSlotVar(void *td) } /* while (run) */ SCPerfSyncCounters(tv, 0); + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); s = (TmSlot *)tv->tm_slots; @@ -1447,6 +1453,13 @@ void TmThreadKillThread(ThreadVars *tv) */ void TmThreadDisableReceiveThreads(void) { + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 100 + + double total_wait_time = 0; + ThreadVars *tv = NULL; SCMutexLock(&tv_root_lock); @@ -1472,6 +1485,17 @@ void TmThreadDisableReceiveThreads(void) * we need to do to kill receive threads */ TmThreadsSetFlag(tv, THV_KILL); + while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + usleep(WAIT_TIME); + total_wait_time += WAIT_TIME / 1000000.0; + if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { + SCLogError(SC_ERR_FATAL, "Engine unable to " + "disable receive thread - \"%s\". " + "Killing engine", tv->name); + exit(EXIT_FAILURE); + } + } + tv = tv->next; } diff --git a/src/util-mpm-b2g-cuda.c b/src/util-mpm-b2g-cuda.c index b3a01bfd85..3ef10d27fc 100644 --- a/src/util-mpm-b2g-cuda.c +++ b/src/util-mpm-b2g-cuda.c @@ -2238,7 +2238,7 @@ void *CudaMpmB2gThreadsSlot1(void *td) if (r != TM_ECODE_OK) { EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); pthread_exit((void *) -1); } } @@ -2287,6 +2287,7 @@ void *CudaMpmB2gThreadsSlot1(void *td) } } + TmThreadsSetFlag(tv, THV_RUNNING_DONE); TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { -- 2.47.2