From: Victor Julien Date: Fri, 8 Nov 2019 11:09:24 +0000 (+0100) Subject: threading: fix shutdown race condition X-Git-Tag: suricata-5.0.1~91 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fe9aeed0f0e65da7e7f61b5722580efc86f6355a;p=thirdparty%2Fsuricata.git threading: fix shutdown race condition A BUG_ON statement would seemingly randomly trigger during the threading shutdown logic. After a packet thread reached the THV_RUNNING_DONE state, it would sometimes still receive flow timeout packets which would then remain unprocessed. 1 main: TmThreadDisableReceiveThreads(); <- stop capturing packets 2 worker: -> TmThreadTimeoutLoop (THV_FLOW_LOOP) phase starts 3 main: FlowForceReassembly(); <- inject packets from flow engine 4 main: TmThreadDisablePacketThreads(); <- then disable packet threads 5 main: -> checks if 'worker' is ready processing packets 6 main: -> sends THV_KILL to worker 7 worker: breaks out of TmThreadTimeoutLoop and changes to THV_RUNNING_DONE. Part of the problem was with (5) above. When checking if the worker was already done with its work, TmThreadDisablePacketThreads would not consider the injected flow timeout packets. The second part of the problem was with (7), where the worker checked if it was ready with the TmThreadTimeoutLoop in a thread unsafe way. As a result TmThreadDisablePacketThreads would not wait long enough for the worker(s) to finish its work and move the threads to the THV_RUNNING_DONE phase by issuing the THV_KILL command. When waiting for packet processing threads to process all in-flight packets, also consider the 'stream_pq'. This will have received the flow timeout packets. Bug #1871. --- diff --git a/src/tm-threads.c b/src/tm-threads.c index bd1a856988..08bce9f6ad 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1422,6 +1422,36 @@ void TmThreadRemove(ThreadVars *tv, int type) return; } +static bool ThreadStillHasPackets(ThreadVars *tv) +{ + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + SCMutexLock(&q->mutex_q); + uint32_t len = q->len; + SCMutexUnlock(&q->mutex_q); + if (len != 0) { + return true; + } + } + } + + if (tv->stream_pq != NULL) { + SCMutexLock(&tv->stream_pq->mutex_q); + uint32_t len = tv->stream_pq->len; + SCMutexUnlock(&tv->stream_pq->mutex_q); + + if (len != 0) { + return true; + } + } + return false; +} + /** * \brief Kill a thread. * @@ -1442,19 +1472,6 @@ static int TmThreadKillThread(ThreadVars *tv) return 1; } - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - return 0; - } - } - } - /* set the thread flag informing the thread that it needs to be * terminated */ TmThreadsSetFlag(tv, THV_KILL); @@ -1499,7 +1516,7 @@ static int TmThreadKillThread(ThreadVars *tv) /** \internal * * \brief make sure that all packet threads are done processing their - * in-flight packets + * in-flight packets, including 'injected' flow packets. */ static void TmThreadDrainPacketThreads(void) { @@ -1521,23 +1538,16 @@ again: /* all receive threads are part of packet processing threads */ tv = tv_root[TVT_PPT]; while (tv) { - if (tv->inq != NULL) { + if (ThreadStillHasPackets(tv)) { /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); + SCMutexUnlock(&tv_root_lock); - /* sleep outside lock */ - SleepMsec(1); - goto again; - } - } + /* sleep outside lock */ + SleepMsec(1); + goto again; } - tv = tv->next; } @@ -1593,20 +1603,14 @@ again: } if (disable) { - if (tv->inq != NULL) { + if (ThreadStillHasPackets(tv)) { /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; - } - } + SCMutexUnlock(&tv_root_lock); + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } /* we found a receive TV. Send it a KILL_PKTACQ signal. */ @@ -1676,22 +1680,6 @@ again: * receive TM amongst the slots in a tv, it indicates we are done * with all receive threads */ while (tv) { - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; - } - } - } - /* we found our receive TV. Send it a KILL signal. This is all * we need to do to kill receive threads */ TmThreadsSetFlag(tv, THV_KILL);