#include "util-profiling.h"
#include "util-signal.h"
#include "queue.h"
+#include "util-validate.h"
#ifdef PROFILE_LOCKING
thread_local uint64_t mutex_lock_contention;
SC_ATOMIC_AND(tv->flags, ~flag);
}
+TmEcode TmThreadsProcessDecodePseudoPackets(
+ ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
+{
+ while (decode_pq->top != NULL) {
+ Packet *extra_p = PacketDequeueNoLock(decode_pq);
+ if (unlikely(extra_p == NULL))
+ continue;
+ DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
+
+ if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ }
+ SCReturnInt(TM_ECODE_OK);
+}
+
/**
* \brief Separate run function so we can call it recursively.
*/
return TM_ECODE_FAILED;
}
- /* handle new packets */
- while (tv->decode_pq.top != NULL) {
- Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
- if (unlikely(extra_p == NULL))
- continue;
-
- /* see if we need to process the packet */
- if (s->slot_next != NULL) {
- r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
- if (unlikely(r == TM_ECODE_FAILED)) {
- TmThreadsSlotProcessPktFail(tv, s, extra_p);
- return TM_ECODE_FAILED;
- }
- }
- tv->tmqh_out(tv, extra_p);
+ if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) != TM_ECODE_OK) {
+ return TM_ECODE_FAILED;
}
}