/**
* \brief Separate run function so we can call it recursively.
*
- * \todo Deal with post_pq for slots beyond the first.
+ * \note post_pq if only used for first slot
*/
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
/* output the packet */
tv->tmqh_out(tv, p);
- } /* if (p != NULL) */
-
- /* now handle the post_pq packets */
- TmSlot *slot;
- for (slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->slot_post_pq.top != NULL) {
- while (1) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- if (extra_p == NULL)
- break;
-
- if (slot->slot_next != NULL) {
- r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
- if (r == TM_ECODE_FAILED) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- TmqhOutputPacketpool(tv, extra_p);
- TmThreadsSetFlag(tv, THV_FAILED);
- break;
- }
- }
- /* output the packet */
- tv->tmqh_out(tv, extra_p);
- } /* while */
- } /* if */
- } /* for */
+ /* now handle the post_pq packets */
+ TmThreadsSlotHandlePostPQs(tv, s);
+ }
if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;
TmThreadsSetFlag(tv, THV_FAILED);
}
+/**
+ * \brief Handle timeout from the capture layer. Checks
+ * post-pq which may have been filled by the flow
+ * manager.
+ */
+static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
+{
+ /* post process pq: only the first slot will possible have used it */
+ if (s->slot_post_pq.top != NULL) {
+ while (1) {
+ SCMutexLock(&s->slot_post_pq.mutex_q);
+ Packet *extra_p = PacketDequeue(&s->slot_post_pq);
+ SCMutexUnlock(&s->slot_post_pq.mutex_q);
+ if (extra_p == NULL)
+ break;
+
+ if (s->slot_next != NULL) {
+ TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
+ if (r == TM_ECODE_FAILED) {
+ TmThreadsSlotProcessPktFail(tv, s, extra_p);
+ break;
+ }
+ }
+ tv->tmqh_out(tv, extra_p);
+ }
+ }
+}
+
/**
* \brief Process the rest of the functions (if any) and queue.
*/
tv->tmqh_out(tv, p);
- /* post process pq */
- for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->slot_post_pq.top != NULL) {
- while (1) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- if (extra_p == NULL)
- break;
-
- if (slot->slot_next != NULL) {
- r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
- if (r == TM_ECODE_FAILED) {
- TmThreadsSlotProcessPktFail(tv, slot, extra_p);
- break;
- }
- }
- tv->tmqh_out(tv, extra_p);
- }
- } /* if (slot->slot_post_pq.top != NULL) */
- }
- return TM_ECODE_OK;
-}
-
-/**
- * \brief Handle timeout from the capture layer. Checks
- * post-pq which may have been filled by the flow
- * manager.
- */
-static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
-{
- /* post process pq */
- for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->slot_post_pq.top != NULL) {
- while (1) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- if (extra_p == NULL)
- break;
+ TmThreadsSlotHandlePostPQs(tv, s);
- if (slot->slot_next != NULL) {
- TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
- if (r == TM_ECODE_FAILED) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
-
- TmqhOutputPacketpool(tv, extra_p);
- TmThreadsSetFlag(tv, THV_FAILED);
- return TM_ECODE_FAILED;
- }
- }
- tv->tmqh_out(tv, extra_p);
- }
- }
- }
return TM_ECODE_OK;
}