/* handle graceful shutdown of the flow engine, it's helper
* threads and the packet threads */
FlowKillFlowManagerThread();
- TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
+ TmThreadDisableReceiveThreads();
FlowForceReassembly();
- TmThreadKillThreadsFamily(TVT_PPT);
- TmThreadClearThreadsFamily(TVT_PPT);
+ TmThreadDisablePacketThreads();
FlowKillFlowRecyclerThread();
- /* kill remaining mgt threads */
+ /* kill the stats threads */
TmThreadKillThreadsFamily(TVT_MGMT);
TmThreadClearThreadsFamily(TVT_MGMT);
- SCPerfReleaseResources();
- RunModeShutDown();
+ /* kill packet threads -- already in 'disabled' state */
+ TmThreadKillThreadsFamily(TVT_PPT);
+ TmThreadClearThreadsFamily(TVT_PPT);
/* mgt and ppt threads killed, we can run non thread-safe
* shutdown functions */
+ SCPerfReleaseResources();
+ RunModeShutDown();
FlowShutdown();
HostCleanup();
StreamTcpFreeConfig(STREAM_VERBOSE);
FlowKillFlowManagerThread();
}
- /* Disable packet acquire thread first */
- TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
+ /* Disable packet acquisition first */
+ TmThreadDisableReceiveThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
FlowForceReassembly();
+ /* kill receive threads when they have processed all
+ * flow timeout packets */
+ TmThreadDisablePacketThreads();
}
SCPrintElapsedTime(&suri);
if (suri.rule_reload == 1) {
- /* Disable detect threads first. This is required by live rule swap */
- TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM |
- TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM);
-
/* wait if live rule swap is in progress */
if (UtilSignalIsHandler(SIGUSR2, SignalHandlerSigusr2Idle)) {
SCLogInfo("Live rule swap in progress. Waiting for it to end "
FlowKillFlowRecyclerThread();
}
+ /* kill remaining threads */
TmThreadKillThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
return TM_ECODE_OK;
}
+/** \internal
+ *
+ * \brief Process flow timeout packets
+ *
+ * Process flow timeout pseudo packets. During shutdown this loop
+ * is run until the flow engine kills the thread and the queue is
+ * empty.
+ */
+static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
+{
+ TmSlot *stream_slot = NULL, *slot = NULL;
+ int run = 1;
+ int r = TM_ECODE_OK;
+
+ for (slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->tm_id == TMM_STREAMTCP) {
+ stream_slot = slot;
+ break;
+ }
+ }
+
+ if (tv->stream_pq == NULL || stream_slot == NULL)
+ return r;
+
+ SCLogDebug("flow end loop starting");
+ while(run) {
+ Packet *p;
+ if (tv->stream_pq->len != 0) {
+ SCMutexLock(&tv->stream_pq->mutex_q);
+ p = PacketDequeue(tv->stream_pq);
+ SCMutexUnlock(&tv->stream_pq->mutex_q);
+ BUG_ON(p == NULL);
+
+ if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
+ if (r == TM_ECODE_FAILED)
+ run = 0;
+ }
+ } else {
+ usleep(1);
+ }
+
+ if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
+ run = 0;
+ }
+ }
+ SCLogDebug("flow end loop complete");
+
+ return r;
+}
+
/*
pcap/nfq
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
- if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)
+ if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL_PKTACQ)
|| suricata_ctl_flags) {
run = 0;
}
}
SCPerfSyncCounters(tv);
+ TmThreadsSetFlag(tv, THV_FLOW_LOOP);
+
+ /* process all pseudo packets the flow timeout may throw at us */
+ TmThreadTimeoutLoop(tv, s);
+
PacketPoolDestroy();
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
}
/**
- * \brief Disable all threads having the specified TMs.
+ * \brief Disable all threads having the specified TMs.
+ *
+ * Breaks out of the packet acquisition loop, and bumps
+ * into the 'flow loop', where it will process packets
+ * from the flow engine's shutdown handling.
*/
-void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
+void TmThreadDisableReceiveThreads(void)
{
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
- if (tm->flags & tm_flags) {
+ if (tm->flags & TM_FLAG_RECEIVE_TM) {
disable = 1;
break;
}
}
}
- /* we found our receive TV. Send it a KILL signal. This is all
- * we need to do to kill receive threads */
- TmThreadsSetFlag(tv, THV_KILL);
+ /* we found a receive TV. Send it a KILL_PKTACQ signal. */
+ TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
if (tv->inq != NULL) {
int i;
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
- while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+ /* wait for it to enter the 'flow loop' stage */
+ while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
usleep(WAIT_TIME);
total_wait_time += WAIT_TIME / 1000000.0;
if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
return;
}
+/**
+ * \brief Disable all threads having the specified TMs.
+ */
+void TmThreadDisablePacketThreads(void)
+{
+ /* value in seconds */
+#define THREAD_KILL_MAX_WAIT_TIME 60
+ /* value in microseconds */
+#define WAIT_TIME 100
+
+ double total_wait_time = 0;
+
+ ThreadVars *tv = NULL;
+
+ SCMutexLock(&tv_root_lock);
+
+ /* all receive threads are part of packet processing threads */
+ tv = tv_root[TVT_PPT];
+
+ /* we do have to keep in mind that TVs are arranged in the order
+ * right from receive to log. The moment we fail to find a
+ * receive TM amongst the slots in a tv, it indicates we are done
+ * with all receive threads */
+ while (tv) {
+ if (tv->inq != NULL) {
+ /* we wait till we dry out all the inq packets, before we
+ * kill this thread. Do note that you should have disabled
+ * packet acquire by now using TmThreadDisableReceiveThreads()*/
+ if (!(strlen(tv->inq->name) == strlen("packetpool") &&
+ strcasecmp(tv->inq->name, "packetpool") == 0)) {
+ PacketQueue *q = &trans_q[tv->inq->id];
+ while (q->len != 0) {
+ usleep(1000);
+ }
+ }
+ }
+
+ /* we found our receive TV. Send it a KILL signal. This is all
+ * we need to do to kill receive threads */
+ TmThreadsSetFlag(tv, THV_KILL);
+
+ if (tv->inq != NULL) {
+ int i;
+ for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
+ if (tv->inq->q_type == 0)
+ SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ else
+ SCCondSignal(&data_queues[tv->inq->id].cond_q);
+ }
+ SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
+ }
+
+ while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+ usleep(WAIT_TIME);
+ total_wait_time += WAIT_TIME / 1000000.0;
+ if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
+ SCLogError(SC_ERR_FATAL, "Engine unable to "
+ "disable detect thread - \"%s\". "
+ "Killing engine", tv->name);
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ tv = tv->next;
+ }
+
+ SCMutexUnlock(&tv_root_lock);
+
+ return;
+}
+
TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
{
ThreadVars *tv = NULL;