]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threading: fix shutdown race condition
authorVictor Julien <victor@inliniac.net>
Fri, 8 Nov 2019 11:09:24 +0000 (12:09 +0100)
committerVictor Julien <victor@inliniac.net>
Mon, 25 Nov 2019 18:46:08 +0000 (19:46 +0100)
A BUG_ON statement would seemingly randomly trigger during the threading
shutdown logic. After a packet thread reached the THV_RUNNING_DONE state,
it would sometimes still receive flow timeout packets which would then
remain unprocessed.

1 main:   TmThreadDisableReceiveThreads(); <- stop capturing packets
2 worker: -> TmThreadTimeoutLoop (THV_FLOW_LOOP) phase starts
3 main:   FlowForceReassembly();           <- inject packets from flow engine
4 main:   TmThreadDisablePacketThreads();  <- then disable packet threads
5 main:   -> checks if 'worker' is ready processing packets
6 main:   -> sends THV_KILL to worker
7 worker: breaks out of TmThreadTimeoutLoop and changes to THV_RUNNING_DONE.

Part of the problem was with (5) above. When checking if the worker was
already done with its work, TmThreadDisablePacketThreads would not consider
the injected flow timeout packets. The second part of the problem was with (7),
where the worker checked if it was ready with the TmThreadTimeoutLoop in a
thread unsafe way.

As a result TmThreadDisablePacketThreads would not wait long enough for the
worker(s) to finish its work and move the threads to the THV_RUNNING_DONE
phase by issuing the THV_KILL command.

When waiting for packet processing threads to process all in-flight packets,
also consider the 'stream_pq'. This will have received the flow timeout
packets.

Bug #1871.

src/tm-threads.c

index bd1a856988d06b49731fc716dbcd57e91bc92909..08bce9f6ad6f7f8f230512f2d5f929298e226aec 100644 (file)
@@ -1422,6 +1422,36 @@ void TmThreadRemove(ThreadVars *tv, int type)
     return;
 }
 
+static bool ThreadStillHasPackets(ThreadVars *tv)
+{
+    if (tv->inq != NULL) {
+        /* we wait till we dry out all the inq packets, before we
+         * kill this thread.  Do note that you should have disabled
+         * packet acquire by now using TmThreadDisableReceiveThreads()*/
+        if (!(strlen(tv->inq->name) == strlen("packetpool") &&
+              strcasecmp(tv->inq->name, "packetpool") == 0)) {
+            PacketQueue *q = &trans_q[tv->inq->id];
+            SCMutexLock(&q->mutex_q);
+            uint32_t len = q->len;
+            SCMutexUnlock(&q->mutex_q);
+            if (len != 0) {
+                return true;
+            }
+        }
+    }
+
+    if (tv->stream_pq != NULL) {
+        SCMutexLock(&tv->stream_pq->mutex_q);
+        uint32_t len = tv->stream_pq->len;
+        SCMutexUnlock(&tv->stream_pq->mutex_q);
+
+        if (len != 0) {
+            return true;
+        }
+    }
+    return false;
+}
+
 /**
  * \brief Kill a thread.
  *
@@ -1442,19 +1472,6 @@ static int TmThreadKillThread(ThreadVars *tv)
         return 1;
     }
 
-    if (tv->inq != NULL) {
-        /* we wait till we dry out all the inq packets, before we
-         * kill this thread.  Do note that you should have disabled
-         * packet acquire by now using TmThreadDisableReceiveThreads()*/
-        if (!(strlen(tv->inq->name) == strlen("packetpool") &&
-              strcasecmp(tv->inq->name, "packetpool") == 0)) {
-            PacketQueue *q = &trans_q[tv->inq->id];
-            if (q->len != 0) {
-                return 0;
-            }
-        }
-    }
-
     /* set the thread flag informing the thread that it needs to be
      * terminated */
     TmThreadsSetFlag(tv, THV_KILL);
@@ -1499,7 +1516,7 @@ static int TmThreadKillThread(ThreadVars *tv)
 /** \internal
  *
  *  \brief make sure that all packet threads are done processing their
- *         in-flight packets
+ *         in-flight packets, including 'injected' flow packets.
  */
 static void TmThreadDrainPacketThreads(void)
 {
@@ -1521,23 +1538,16 @@ again:
     /* all receive threads are part of packet processing threads */
     tv = tv_root[TVT_PPT];
     while (tv) {
-        if (tv->inq != NULL) {
+        if (ThreadStillHasPackets(tv)) {
             /* we wait till we dry out all the inq packets, before we
              * kill this thread.  Do note that you should have disabled
              * packet acquire by now using TmThreadDisableReceiveThreads()*/
-            if (!(strlen(tv->inq->name) == strlen("packetpool") &&
-                        strcasecmp(tv->inq->name, "packetpool") == 0)) {
-                PacketQueue *q = &trans_q[tv->inq->id];
-                if (q->len != 0) {
-                    SCMutexUnlock(&tv_root_lock);
+            SCMutexUnlock(&tv_root_lock);
 
-                    /* sleep outside lock */
-                    SleepMsec(1);
-                    goto again;
-                }
-            }
+            /* sleep outside lock */
+            SleepMsec(1);
+            goto again;
         }
-
         tv = tv->next;
     }
 
@@ -1593,20 +1603,14 @@ again:
         }
 
         if (disable) {
-            if (tv->inq != NULL) {
+            if (ThreadStillHasPackets(tv)) {
                 /* we wait till we dry out all the inq packets, before we
                  * kill this thread.  Do note that you should have disabled
                  * packet acquire by now using TmThreadDisableReceiveThreads()*/
-                if (!(strlen(tv->inq->name) == strlen("packetpool") &&
-                      strcasecmp(tv->inq->name, "packetpool") == 0)) {
-                    PacketQueue *q = &trans_q[tv->inq->id];
-                    if (q->len != 0) {
-                        SCMutexUnlock(&tv_root_lock);
-                        /* don't sleep while holding a lock */
-                        SleepMsec(1);
-                        goto again;
-                    }
-                }
+                SCMutexUnlock(&tv_root_lock);
+                /* don't sleep while holding a lock */
+                SleepMsec(1);
+                goto again;
             }
 
             /* we found a receive TV. Send it a KILL_PKTACQ signal. */
@@ -1676,22 +1680,6 @@ again:
      * receive TM amongst the slots in a tv, it indicates we are done
      * with all receive threads */
     while (tv) {
-        if (tv->inq != NULL) {
-            /* we wait till we dry out all the inq packets, before we
-             * kill this thread.  Do note that you should have disabled
-             * packet acquire by now using TmThreadDisableReceiveThreads()*/
-            if (!(strlen(tv->inq->name) == strlen("packetpool") &&
-                        strcasecmp(tv->inq->name, "packetpool") == 0)) {
-                PacketQueue *q = &trans_q[tv->inq->id];
-                if (q->len != 0) {
-                    SCMutexUnlock(&tv_root_lock);
-                    /* don't sleep while holding a lock */
-                    SleepMsec(1);
-                    goto again;
-                }
-            }
-        }
-
         /* we found our receive TV.  Send it a KILL signal.  This is all
          * we need to do to kill receive threads */
         TmThreadsSetFlag(tv, THV_KILL);