uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
+static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
+{
+ TmqhOutputPacketpool(tv, p);
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+ SCMutexLock(&slot->slot_post_pq.mutex_q);
+ TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
+ SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+ }
+ TmThreadsSetFlag(tv, THV_FAILED);
+}
+
/**
* \brief Process the rest of the functions (if any) and queue.
*/
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
- TmEcode r = TM_ECODE_OK;
-
if (s == NULL) {
tv->tmqh_out(tv, p);
- return r;
+ return TM_ECODE_OK;
}
- if (TmThreadsSlotVarRun(tv, p, s) == TM_ECODE_FAILED) {
- TmqhOutputPacketpool(tv, p);
- TmSlot *slot = s;
- while (slot != NULL) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+ TmEcode r = TmThreadsSlotVarRun(tv, p, s);
+ if (unlikely(r == TM_ECODE_FAILED)) {
+ TmThreadsSlotProcessPktFail(tv, s, p);
+ return TM_ECODE_FAILED;
+ }
- slot = slot->slot_next;
- }
- TmThreadsSetFlag(tv, THV_FAILED);
- r = TM_ECODE_FAILED;
+ tv->tmqh_out(tv, p);
- } else {
- tv->tmqh_out(tv, p);
+ /* post process pq */
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->slot_post_pq.top != NULL) {
+ while (1) {
+ SCMutexLock(&slot->slot_post_pq.mutex_q);
+ Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
+ SCMutexUnlock(&slot->slot_post_pq.mutex_q);
- /* post process pq */
- TmSlot *slot = s;
- while (slot != NULL) {
- if (slot->slot_post_pq.top != NULL) {
- while (1) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+ if (extra_p == NULL)
+ break;
- if (extra_p == NULL)
+ if (slot->slot_next != NULL) {
+ r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
+ if (r == TM_ECODE_FAILED) {
+ TmThreadsSlotProcessPktFail(tv, slot, extra_p);
break;
-
- if (slot->slot_next != NULL) {
- r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
- if (r == TM_ECODE_FAILED) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- TmqhOutputPacketpool(tv, extra_p);
- TmThreadsSetFlag(tv, THV_FAILED);
- break;
- }
}
- tv->tmqh_out(tv, extra_p);
}
- } /* if (slot->slot_post_pq.top != NULL) */
- slot = slot->slot_next;
- } /* while (slot != NULL) */
+ tv->tmqh_out(tv, extra_p);
+ }
+ } /* if (slot->slot_post_pq.top != NULL) */
}
-
- return r;
+ return TM_ECODE_OK;
}
/**