]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threads: fix missed logging at shutdown 2518/head
authorVictor Julien <victor@inliniac.net>
Thu, 26 Jan 2017 09:16:53 +0000 (10:16 +0100)
committerVictor Julien <victor@inliniac.net>
Thu, 26 Jan 2017 09:23:13 +0000 (10:23 +0100)
At shutdown, all flows that still need work are handled by the flow
force reassembly logic. This means one or more flow end pseudo packets
are generated and pushed through the engine for final detection and
logging.

In some cases this would not work correctly. This was caused by the
flow timeout logic kicking in before all the 'live' packets were
processed. Before the flow timeout handling runs the receive threads
are disabled, however the engine did not wait for the in-flight
packets to be fully processed. In autofp mode, packets could still
be in the queue between receive thread(s) and flow worker(s).

This patch adds a new function that 'drains' all the packet threads
of any in-progress packets before moving on the flow timeout logic.

Bug #1946.

src/tm-threads.c

index 9df10ff9370ab6a953eb2caaab60f4506c0a2329..bc866aa215dc8d9744ac8bdb818916c95b26c1f5 100644 (file)
@@ -1521,6 +1521,64 @@ void TmThreadKillThread(ThreadVars *tv)
     return;
 }
 
+/** \internal
+ *
+ *  \brief make sure that all packet threads are done processing their
+ *         in-flight packets
+ */
+static void TmThreadDrainPacketThreads(void)
+{
+    /* value in seconds */
+#define THREAD_KILL_MAX_WAIT_TIME 60
+    /* value in microseconds */
+#define WAIT_TIME 1000
+
+    uint64_t total_wait_time = 0;
+
+    ThreadVars *tv = NULL;
+
+again:
+    if (total_wait_time > (THREAD_KILL_MAX_WAIT_TIME * 1000000)) {
+        SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
+                "to process their packets in time");
+        return;
+    }
+
+    SCMutexLock(&tv_root_lock);
+
+    /* all receive threads are part of packet processing threads */
+    tv = tv_root[TVT_PPT];
+
+    while (tv) {
+        if (tv->inq != NULL) {
+            /* we wait till we dry out all the inq packets, before we
+             * kill this thread.  Do note that you should have disabled
+             * packet acquire by now using TmThreadDisableReceiveThreads()*/
+            if (!(strlen(tv->inq->name) == strlen("packetpool") &&
+                        strcasecmp(tv->inq->name, "packetpool") == 0)) {
+                PacketQueue *q = &trans_q[tv->inq->id];
+                if (q->len != 0) {
+                    SCMutexUnlock(&tv_root_lock);
+
+                    total_wait_time += WAIT_TIME;
+
+                    /* don't sleep while holding a lock */
+                    usleep(WAIT_TIME);
+                    goto again;
+                }
+            }
+        }
+
+        tv = tv->next;
+    }
+
+    SCMutexUnlock(&tv_root_lock);
+    return;
+
+#undef THREAD_KILL_MAX_WAIT_TIME
+#undef WAIT_TIME
+}
+
 /**
  *  \brief Disable all threads having the specified TMs.
  *
@@ -1618,6 +1676,11 @@ again:
 
     SCMutexUnlock(&tv_root_lock);
 
+    /* finally wait for all packet threads to have
+     * processed all of their 'live' packets so we
+     * don't process the last live packets together
+     * with FFR packets */
+    TmThreadDrainPacketThreads();
     return;
 }
 
@@ -1635,6 +1698,8 @@ void TmThreadDisablePacketThreads(void)
 
     ThreadVars *tv = NULL;
 
+    /* first drain all packet threads of their packets */
+    TmThreadDrainPacketThreads();
 again:
     SCMutexLock(&tv_root_lock);