return;
}
+static bool ThreadStillHasPackets(ThreadVars *tv)
+{
+ if (tv->inq != NULL) {
+ /* we wait till we dry out all the inq packets, before we
+ * kill this thread. Do note that you should have disabled
+ * packet acquire by now using TmThreadDisableReceiveThreads()*/
+ if (!(strlen(tv->inq->name) == strlen("packetpool") &&
+ strcasecmp(tv->inq->name, "packetpool") == 0)) {
+ PacketQueue *q = &trans_q[tv->inq->id];
+ SCMutexLock(&q->mutex_q);
+ uint32_t len = q->len;
+ SCMutexUnlock(&q->mutex_q);
+ if (len != 0) {
+ return true;
+ }
+ }
+ }
+
+ if (tv->stream_pq != NULL) {
+ SCMutexLock(&tv->stream_pq->mutex_q);
+ uint32_t len = tv->stream_pq->len;
+ SCMutexUnlock(&tv->stream_pq->mutex_q);
+
+ if (len != 0) {
+ return true;
+ }
+ }
+ return false;
+}
+
/**
* \brief Kill a thread.
*
return 1;
}
- if (tv->inq != NULL) {
- /* we wait till we dry out all the inq packets, before we
- * kill this thread. Do note that you should have disabled
- * packet acquire by now using TmThreadDisableReceiveThreads()*/
- if (!(strlen(tv->inq->name) == strlen("packetpool") &&
- strcasecmp(tv->inq->name, "packetpool") == 0)) {
- PacketQueue *q = &trans_q[tv->inq->id];
- if (q->len != 0) {
- return 0;
- }
- }
- }
-
/* set the thread flag informing the thread that it needs to be
* terminated */
TmThreadsSetFlag(tv, THV_KILL);
/** \internal
*
* \brief make sure that all packet threads are done processing their
- * in-flight packets
+ * in-flight packets, including 'injected' flow packets.
*/
static void TmThreadDrainPacketThreads(void)
{
/* all receive threads are part of packet processing threads */
tv = tv_root[TVT_PPT];
while (tv) {
- if (tv->inq != NULL) {
+ if (ThreadStillHasPackets(tv)) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
- if (!(strlen(tv->inq->name) == strlen("packetpool") &&
- strcasecmp(tv->inq->name, "packetpool") == 0)) {
- PacketQueue *q = &trans_q[tv->inq->id];
- if (q->len != 0) {
- SCMutexUnlock(&tv_root_lock);
+ SCMutexUnlock(&tv_root_lock);
- /* sleep outside lock */
- SleepMsec(1);
- goto again;
- }
- }
+ /* sleep outside lock */
+ SleepMsec(1);
+ goto again;
}
-
tv = tv->next;
}
}
if (disable) {
- if (tv->inq != NULL) {
+ if (ThreadStillHasPackets(tv)) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
- if (!(strlen(tv->inq->name) == strlen("packetpool") &&
- strcasecmp(tv->inq->name, "packetpool") == 0)) {
- PacketQueue *q = &trans_q[tv->inq->id];
- if (q->len != 0) {
- SCMutexUnlock(&tv_root_lock);
- /* don't sleep while holding a lock */
- SleepMsec(1);
- goto again;
- }
- }
+ SCMutexUnlock(&tv_root_lock);
+ /* don't sleep while holding a lock */
+ SleepMsec(1);
+ goto again;
}
/* we found a receive TV. Send it a KILL_PKTACQ signal. */
* receive TM amongst the slots in a tv, it indicates we are done
* with all receive threads */
while (tv) {
- if (tv->inq != NULL) {
- /* we wait till we dry out all the inq packets, before we
- * kill this thread. Do note that you should have disabled
- * packet acquire by now using TmThreadDisableReceiveThreads()*/
- if (!(strlen(tv->inq->name) == strlen("packetpool") &&
- strcasecmp(tv->inq->name, "packetpool") == 0)) {
- PacketQueue *q = &trans_q[tv->inq->id];
- if (q->len != 0) {
- SCMutexUnlock(&tv_root_lock);
- /* don't sleep while holding a lock */
- SleepMsec(1);
- goto again;
- }
- }
- }
-
/* we found our receive TV. Send it a KILL signal. This is all
* we need to do to kill receive threads */
TmThreadsSetFlag(tv, THV_KILL);