]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow-time: use live threads at shutdown
authorVictor Julien <victor@inliniac.net>
Mon, 1 Dec 2014 17:48:03 +0000 (18:48 +0100)
committerVictor Julien <victor@inliniac.net>
Wed, 17 Dec 2014 14:52:11 +0000 (15:52 +0100)
Update pktacq loop to process flow timeouts in a running engine.

Add a new step to the shutdown phase of packet acquisition loop
threads (pktacqloop).

The shutdown code lets the pktacqloop break out of it's packet
acquisition loop. The thread then enters a flow timeout loop, where
it processes packets from it's tv->stream_pq queue until it's
empty _and_ the KILL flag is set.

Make sure receive threads are done before moving on to flow hash
cleanup (recycle all). Without this the flow recycler could start
it's unconditional hash clean up while detect threads are still
running on the flows.

Update unix socket to match live modes.

src/flow-timeout.c
src/runmode-unix-socket.c
src/suricata.c
src/threadvars.h
src/tm-threads.c
src/tm-threads.h

index 5ab7b2b0eb610f8d3397994f3aef1174624c7685..4f146ab83f81af38e2bbe508d16cf3a5ed59c6a7 100644 (file)
@@ -523,10 +523,10 @@ static inline void FlowForceReassemblyForHash(void)
 {
     Flow *f;
     TcpSession *ssn;
-    int client_ok;
-    int server_ok;
-
+    int client_ok = 0;
+    int server_ok = 0;
     uint32_t idx = 0;
+
 #if 0
     /* We use this packet just for reassembly purpose */
     Packet *reassemble_p = PacketGetFromAlloc();
@@ -676,7 +676,7 @@ static inline void FlowForceReassemblyForHash(void)
 void FlowForceReassembly(void)
 {
     /* Do remember.  We need to have packet acquire disabled by now */
-
+#if 0
     /** ----- Part 1 ------*/
     /* Flush out unattended packets */
     FlowForceReassemblyFlushPendingPseudoPackets();
@@ -721,7 +721,7 @@ void FlowForceReassembly(void)
     }
 
     SCMutexUnlock(&tv_root_lock);
-
+#endif
     /** ----- Part 3 ----- **/
     /* Carry out flow reassembly for unattended flows */
     FlowForceReassemblyForHash();
index 83daaaf0c99b5cf728d2e46879054e811fbb3464..54c85fe739c7f3335a384d292aa29682660cd1a2 100644 (file)
@@ -290,21 +290,23 @@ TmEcode UnixSocketPcapFilesCheck(void *data)
         /* handle graceful shutdown of the flow engine, it's helper
          * threads and the packet threads */
         FlowKillFlowManagerThread();
-        TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
+        TmThreadDisableReceiveThreads();
         FlowForceReassembly();
-        TmThreadKillThreadsFamily(TVT_PPT);
-        TmThreadClearThreadsFamily(TVT_PPT);
+        TmThreadDisablePacketThreads();
         FlowKillFlowRecyclerThread();
 
-        /* kill remaining mgt threads */
+        /* kill the stats threads */
         TmThreadKillThreadsFamily(TVT_MGMT);
         TmThreadClearThreadsFamily(TVT_MGMT);
-        SCPerfReleaseResources();
 
-        RunModeShutDown();
+        /* kill packet threads -- already in 'disabled' state */
+        TmThreadKillThreadsFamily(TVT_PPT);
+        TmThreadClearThreadsFamily(TVT_PPT);
 
         /* mgt and ppt threads killed, we can run non thread-safe
          * shutdown functions */
+        SCPerfReleaseResources();
+        RunModeShutDown();
         FlowShutdown();
         HostCleanup();
         StreamTcpFreeConfig(STREAM_VERBOSE);
index 31a2411fe7bd73a5b53028c80aa4c637919e3c33..25dab7e14923d8fb12818dc60a083adbe53185c5 100644 (file)
@@ -2392,20 +2392,19 @@ int main(int argc, char **argv)
         FlowKillFlowManagerThread();
     }
 
-    /* Disable packet acquire thread first */
-    TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
+    /* Disable packet acquisition first */
+    TmThreadDisableReceiveThreads();
 
     if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
         FlowForceReassembly();
+        /* kill receive threads when they have processed all
+         * flow timeout packets */
+        TmThreadDisablePacketThreads();
     }
 
     SCPrintElapsedTime(&suri);
 
     if (suri.rule_reload == 1) {
-        /* Disable detect threads first.  This is required by live rule swap */
-        TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM |
-                                      TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM);
-
         /* wait if live rule swap is in progress */
         if (UtilSignalIsHandler(SIGUSR2, SignalHandlerSigusr2Idle)) {
             SCLogInfo("Live rule swap in progress.  Waiting for it to end "
@@ -2430,6 +2429,7 @@ int main(int argc, char **argv)
         FlowKillFlowRecyclerThread();
     }
 
+    /* kill remaining threads */
     TmThreadKillThreads();
 
     if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
index f8277b7758c5e60606b67ee6b6d000dfdc15286e..64c215bcca807fbce97ace2bcdb3fc7a9e0839c0 100644 (file)
@@ -45,6 +45,8 @@ struct TmSlot_;
 #define THV_DEINIT    (1 << 7)
 #define THV_RUNNING_DONE (1 << 8) /** thread has completed running and is entering
                                    * the de-init phase */
+#define THV_KILL_PKTACQ (1 << 9)    /**< flag thread to stop packet acq */
+#define THV_FLOW_LOOP (1 << 10)   /**< thread is in flow shutdown loop */
 
 /** Thread flags set and read by threads, to control the threads, when they
  *  encounter certain conditions like failure */
index 60b18c1daf9b7834bb16d0008219e2dc0e2c6b3a..64964c308f5108654818dfdce5b3b5fd844984fd 100644 (file)
@@ -185,6 +185,56 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
     return TM_ECODE_OK;
 }
 
+/** \internal
+ *
+ *  \brief Process flow timeout packets
+ *
+ *  Process flow timeout pseudo packets. During shutdown this loop
+ *  is run until the flow engine kills the thread and the queue is
+ *  empty.
+ */
+static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
+{
+    TmSlot *stream_slot = NULL, *slot = NULL;
+    int run = 1;
+    int r = TM_ECODE_OK;
+
+    for (slot = s; slot != NULL; slot = slot->slot_next) {
+        if (slot->tm_id == TMM_STREAMTCP) {
+            stream_slot = slot;
+            break;
+        }
+    }
+
+    if (tv->stream_pq == NULL || stream_slot == NULL)
+        return r;
+
+    SCLogDebug("flow end loop starting");
+    while(run) {
+        Packet *p;
+        if (tv->stream_pq->len != 0) {
+            SCMutexLock(&tv->stream_pq->mutex_q);
+            p = PacketDequeue(tv->stream_pq);
+            SCMutexUnlock(&tv->stream_pq->mutex_q);
+            BUG_ON(p == NULL);
+
+            if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
+                if (r == TM_ECODE_FAILED)
+                    run = 0;
+            }
+        } else {
+            usleep(1);
+        }
+
+        if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
+            run = 0;
+        }
+    }
+    SCLogDebug("flow end loop complete");
+
+    return r;
+}
+
 /*
 
     pcap/nfq
@@ -297,7 +347,7 @@ void *TmThreadsSlotPktAcqLoop(void *td)
 
         r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
 
-        if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)
+        if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL_PKTACQ)
             || suricata_ctl_flags) {
             run = 0;
         }
@@ -307,6 +357,11 @@ void *TmThreadsSlotPktAcqLoop(void *td)
     }
     SCPerfSyncCounters(tv);
 
+    TmThreadsSetFlag(tv, THV_FLOW_LOOP);
+
+    /* process all pseudo packets the flow timeout may throw at us */
+    TmThreadTimeoutLoop(tv, s);
+
     PacketPoolDestroy();
 
     TmThreadsSetFlag(tv, THV_RUNNING_DONE);
@@ -1442,9 +1497,13 @@ void TmThreadKillThread(ThreadVars *tv)
 }
 
 /**
- * \brief Disable all threads having the specified TMs.
+ *  \brief Disable all threads having the specified TMs.
+ *
+ *  Breaks out of the packet acquisition loop, and bumps
+ *  into the 'flow loop', where it will process packets
+ *  from the flow engine's shutdown handling.
  */
-void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
+void TmThreadDisableReceiveThreads(void)
 {
     /* value in seconds */
 #define THREAD_KILL_MAX_WAIT_TIME 60
@@ -1471,7 +1530,7 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
         while (slots != NULL) {
             TmModule *tm = TmModuleGetById(slots->tm_id);
 
-            if (tm->flags & tm_flags) {
+            if (tm->flags & TM_FLAG_RECEIVE_TM) {
                 disable = 1;
                 break;
             }
@@ -1494,9 +1553,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
                 }
             }
 
-            /* we found our receive TV.  Send it a KILL signal.  This is all
-             * we need to do to kill receive threads */
-            TmThreadsSetFlag(tv, THV_KILL);
+            /* we found a receive TV. Send it a KILL_PKTACQ signal. */
+            TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
 
             if (tv->inq != NULL) {
                 int i;
@@ -1509,7 +1567,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
                 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
             }
 
-            while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+            /* wait for it to enter the 'flow loop' stage */
+            while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
                 usleep(WAIT_TIME);
                 total_wait_time += WAIT_TIME / 1000000.0;
                 if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
@@ -1529,6 +1588,77 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
     return;
 }
 
+/**
+ * \brief Disable all threads having the specified TMs.
+ */
+void TmThreadDisablePacketThreads(void)
+{
+    /* value in seconds */
+#define THREAD_KILL_MAX_WAIT_TIME 60
+    /* value in microseconds */
+#define WAIT_TIME 100
+
+    double total_wait_time = 0;
+
+    ThreadVars *tv = NULL;
+
+    SCMutexLock(&tv_root_lock);
+
+    /* all receive threads are part of packet processing threads */
+    tv = tv_root[TVT_PPT];
+
+    /* we do have to keep in mind that TVs are arranged in the order
+     * right from receive to log.  The moment we fail to find a
+     * receive TM amongst the slots in a tv, it indicates we are done
+     * with all receive threads */
+    while (tv) {
+        if (tv->inq != NULL) {
+            /* we wait till we dry out all the inq packets, before we
+             * kill this thread.  Do note that you should have disabled
+             * packet acquire by now using TmThreadDisableReceiveThreads()*/
+            if (!(strlen(tv->inq->name) == strlen("packetpool") &&
+                        strcasecmp(tv->inq->name, "packetpool") == 0)) {
+                PacketQueue *q = &trans_q[tv->inq->id];
+                while (q->len != 0) {
+                    usleep(1000);
+                }
+            }
+        }
+
+        /* we found our receive TV.  Send it a KILL signal.  This is all
+         * we need to do to kill receive threads */
+        TmThreadsSetFlag(tv, THV_KILL);
+
+        if (tv->inq != NULL) {
+            int i;
+            for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
+                if (tv->inq->q_type == 0)
+                    SCCondSignal(&trans_q[tv->inq->id].cond_q);
+                else
+                    SCCondSignal(&data_queues[tv->inq->id].cond_q);
+            }
+            SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
+        }
+
+        while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+            usleep(WAIT_TIME);
+            total_wait_time += WAIT_TIME / 1000000.0;
+            if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
+                SCLogError(SC_ERR_FATAL, "Engine unable to "
+                        "disable detect thread - \"%s\".  "
+                        "Killing engine", tv->name);
+                exit(EXIT_FAILURE);
+            }
+        }
+
+        tv = tv->next;
+    }
+
+    SCMutexUnlock(&tv_root_lock);
+
+    return;
+}
+
 TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
 {
     ThreadVars *tv = NULL;
index 4d864d6ebe7597fafb24312e0b5384cbadfe88bb..9ffc9efba8106a53826a3fb1ed6dbd1fe2db5bdf 100644 (file)
@@ -130,7 +130,8 @@ void TmThreadWaitForFlag(ThreadVars *, uint16_t);
 TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
 
 ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
-void TmThreadDisableThreadsWithTMS(uint8_t tm_flags);
+void TmThreadDisablePacketThreads(void);
+void TmThreadDisableReceiveThreads(void);
 TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *);
 
 /**