OutputLoggerExitPrintStats(tv, fw->output_thread);
}
+static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
+{
+ FlowWorkerThreadData *fw = flow_worker;
+ if (fw->pq.len)
+ return true;
+ if (fw->fls.work_queue.len)
+ return true;
+
+ if (tv->flow_queue) {
+ FQLOCK_LOCK(tv->flow_queue);
+ bool fq_done = (tv->flow_queue->qlen == 0);
+ FQLOCK_UNLOCK(tv->flow_queue);
+ if (!fq_done) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
void TmModuleFlowWorkerRegister (void)
{
tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
+ tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
return 1;
}
+static bool ThreadBusy(ThreadVars *tv)
+{
+ for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
+ TmModule *tm = TmModuleGetById(s->tm_id);
+ if (tm && tm->ThreadBusy != NULL) {
+ if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
+ return true;
+ }
+ }
+ return false;
+}
+
/** \internal
*
* \brief make sure that all packet threads are done processing their
SleepMsec(1);
goto again;
}
- if (tv->flow_queue) {
- FQLOCK_LOCK(tv->flow_queue);
- bool fq_done = (tv->flow_queue->qlen == 0);
- FQLOCK_UNLOCK(tv->flow_queue);
- if (!fq_done) {
- SCMutexUnlock(&tv_root_lock);
-
- Packet *p = PacketGetFromAlloc();
- if (p != NULL) {
- p->flags |= PKT_PSEUDO_STREAM_END;
- PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
- PacketQueue *q = tv->stream_pq;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
- }
+ if (ThreadBusy(tv)) {
+ SCMutexUnlock(&tv_root_lock);
- /* don't sleep while holding a lock */
- SleepMsec(1);
- goto again;
+ Packet *p = PacketGetFromAlloc();
+ if (p != NULL) {
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
+ PacketQueue *q = tv->stream_pq;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
}
+
+ /* don't sleep while holding a lock */
+ SleepMsec(1);
+ goto again;
}
tv = tv->next;
}
goto again;
}
- if (tv->flow_queue) {
- FQLOCK_LOCK(tv->flow_queue);
- bool fq_done = (tv->flow_queue->qlen == 0);
- FQLOCK_UNLOCK(tv->flow_queue);
- if (!fq_done) {
- SCMutexUnlock(&tv_root_lock);
-
- Packet *p = PacketGetFromAlloc();
- if (p != NULL) {
- p->flags |= PKT_PSEUDO_STREAM_END;
- PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
- PacketQueue *q = tv->stream_pq;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
- }
+ if (ThreadBusy(tv)) {
+ SCMutexUnlock(&tv_root_lock);
- /* don't sleep while holding a lock */
- SleepMsec(1);
- goto again;
+ Packet *p = PacketGetFromAlloc();
+ if (p != NULL) {
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
+ PacketQueue *q = tv->stream_pq;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
}
+
+ /* don't sleep while holding a lock */
+ SleepMsec(1);
+ goto again;
}
/* we found a receive TV. Send it a KILL_PKTACQ signal. */