*/
static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
{
- TmSlot *stream_slot = NULL, *slot = NULL;
- int run = 1;
+ TmSlot *fw_slot = NULL;
int r = TM_ECODE_OK;
- for (slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->tm_id == TMM_FLOWWORKER)
- {
- stream_slot = slot;
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->tm_id == TMM_FLOWWORKER) {
+ fw_slot = slot;
break;
}
}
- if (tv->stream_pq == NULL || stream_slot == NULL) {
- SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
+ if (tv->stream_pq == NULL || fw_slot == NULL) {
+ SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
return r;
}
SCLogDebug("flow end loop starting");
- while(run) {
- Packet *p;
- if (tv->stream_pq->len != 0) {
- SCMutexLock(&tv->stream_pq->mutex_q);
- p = PacketDequeue(tv->stream_pq);
- SCMutexUnlock(&tv->stream_pq->mutex_q);
- BUG_ON(p == NULL);
-
- if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
- if (r == TM_ECODE_FAILED)
- run = 0;
+ while (1) {
+ SCMutexLock(&tv->stream_pq->mutex_q);
+ uint32_t len = tv->stream_pq->len;
+ SCMutexUnlock(&tv->stream_pq->mutex_q);
+ if (len > 0) {
+ while (len--) {
+ SCMutexLock(&tv->stream_pq->mutex_q);
+ Packet *p = PacketDequeue(tv->stream_pq);
+ SCMutexUnlock(&tv->stream_pq->mutex_q);
+ if (likely(p)) {
+ if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
+ if (r == TM_ECODE_FAILED)
+ break;
+ }
+ }
}
} else {
SleepUsec(1);
}
if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
- run = 0;
+ break;
}
}
SCLogDebug("flow end loop complete");