]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threads: improve flow timeout loop
authorVictor Julien <victor@inliniac.net>
Wed, 12 Jun 2019 07:56:26 +0000 (09:56 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 17 Jun 2019 18:06:50 +0000 (20:06 +0200)
Improve thread safety and remove BUG_ON

src/tm-threads.c

index ffca6b218aff5220181b9cae40a6336d0764640a..895d3ed7fa95a7f9097cf0f556df9ed37d6ffb61 100644 (file)
@@ -185,42 +185,44 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
  */
 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
 {
-    TmSlot *stream_slot = NULL, *slot = NULL;
-    int run = 1;
+    TmSlot *fw_slot = NULL;
     int r = TM_ECODE_OK;
 
-    for (slot = s; slot != NULL; slot = slot->slot_next) {
-        if (slot->tm_id == TMM_FLOWWORKER)
-        {
-            stream_slot = slot;
+    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+        if (slot->tm_id == TMM_FLOWWORKER) {
+            fw_slot = slot;
             break;
         }
     }
 
-    if (tv->stream_pq == NULL || stream_slot == NULL) {
-        SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
+    if (tv->stream_pq == NULL || fw_slot == NULL) {
+        SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
         return r;
     }
 
     SCLogDebug("flow end loop starting");
-    while(run) {
-        Packet *p;
-        if (tv->stream_pq->len != 0) {
-            SCMutexLock(&tv->stream_pq->mutex_q);
-            p = PacketDequeue(tv->stream_pq);
-            SCMutexUnlock(&tv->stream_pq->mutex_q);
-            BUG_ON(p == NULL);
-
-            if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
-                if (r == TM_ECODE_FAILED)
-                    run = 0;
+    while (1) {
+        SCMutexLock(&tv->stream_pq->mutex_q);
+        uint32_t len = tv->stream_pq->len;
+        SCMutexUnlock(&tv->stream_pq->mutex_q);
+        if (len > 0) {
+            while (len--) {
+                SCMutexLock(&tv->stream_pq->mutex_q);
+                Packet *p = PacketDequeue(tv->stream_pq);
+                SCMutexUnlock(&tv->stream_pq->mutex_q);
+                if (likely(p)) {
+                    if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
+                        if (r == TM_ECODE_FAILED)
+                            break;
+                    }
+                }
             }
         } else {
             SleepUsec(1);
         }
 
         if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
-            run = 0;
+            break;
         }
     }
     SCLogDebug("flow end loop complete");