/**
* \brief Separate run function so we can call it recursively.
- *
- * \note post_pq if only used for first slot
*/
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
/* handle error */
if (unlikely(r == TM_ECODE_FAILED)) {
/* Encountered error. Return packets to packetpool and return */
- TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
-
- SCMutexLock(&s->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
- SCMutexUnlock(&s->slot_post_pq.mutex_q);
-
- TmThreadsSetFlag(tv, THV_FAILED);
+ TmThreadsSlotProcessPktFail(tv, s, NULL);
return TM_ECODE_FAILED;
}
if (s->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
if (unlikely(r == TM_ECODE_FAILED)) {
- TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
-
- SCMutexLock(&s->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
- SCMutexUnlock(&s->slot_post_pq.mutex_q);
-
- TmqhOutputPacketpool(tv, extra_p);
- TmThreadsSetFlag(tv, THV_FAILED);
+ TmThreadsSlotProcessPktFail(tv, s, extra_p);
return TM_ECODE_FAILED;
}
}
}
/** \internal
- * \brief check 'slot' pre_pq and post_pq at thread cleanup
+ * \brief check 'slot' pre_pq and thread cleanup
* and dump detailed info about the state of the packets
* and threads if in a unexpected state.
*/
static void CheckSlot(const TmSlot *slot)
{
- if (slot->slot_pre_pq.len || slot->slot_post_pq.len) {
+ if (slot->slot_pre_pq.len) {
for (Packet *xp = slot->slot_pre_pq.top; xp != NULL; xp = xp->next) {
SCLogNotice("pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
slot->id, slot->tm_id, slot->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
}
- for (Packet *xp = slot->slot_post_pq.top; xp != NULL; xp = xp->next) {
- SCLogNotice("post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
- slot->id, slot->tm_id, slot->slot_post_pq.len, PktSrcToString(xp->pkt_src));
- }
TmThreadDumpThreads();
abort();
}
}
memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
- memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
- SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
-
- /* get the 'pre qeueue' from module before the stream module */
- if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
- SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
- tv->stream_pq = &slot->slot_post_pq;
- /* if the stream module is the first, get the threads input queue */
- } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
+
+ /* if the flowworker module is the first, get the threads input queue */
+ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
- SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
+ SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+ /* setup a queue */
+ } else if (slot->tm_id == TMM_FLOWWORKER) {
+ tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+ if (tv->stream_pq_local == NULL)
+ FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+ SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+ tv->stream_pq = tv->stream_pq_local;
+ SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
}
}
}
memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
- memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
- SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
-
- /* get the 'pre qeueue' from module before the stream module */
- if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
- SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
- tv->stream_pq = &slot->slot_post_pq;
- /* if the stream module is the first, get the threads input queue */
- } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
+
+ /* if the flowworker module is the first, get the threads input queue */
+ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
- SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
+ SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+ /* setup a queue */
+ } else if (slot->tm_id == TMM_FLOWWORKER) {
+ tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+ if (tv->stream_pq_local == NULL)
+ FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+ SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+ tv->stream_pq = tv->stream_pq_local;
+ SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
}
}
}
#endif
-/**
- * \todo Only the first "slot" currently makes the "post_pq" available
- * to the thread module.
- */
static void *TmThreadsSlotVar(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
}
memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
- memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
- SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
/* special case: we need to access the stream queue
* from the flow timeout code */
- /* get the 'pre qeueue' from module before the stream module */
- if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
- SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
- tv->stream_pq = &s->slot_pre_pq;
- /* if the stream module is the first, get the threads input queue */
- } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
+ /* if the flowworker module is the first, get the threads input queue */
+ if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id];
- SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
+ SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+ /* setup a queue */
+ } else if (s->tm_id == TMM_FLOWWORKER) {
+ tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+ if (tv->stream_pq_local == NULL)
+ FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+ SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+ tv->stream_pq = tv->stream_pq_local;
+ SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
}
}
/* output the packet */
tv->tmqh_out(tv, p);
- /* now handle the post_pq packets */
- TmThreadsSlotHandlePostPQs(tv, s);
+ /* now handle the stream pq packets */
+ TmThreadsHandleInjectedPackets(tv, s);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
(void)SC_ATOMIC_SET(s->slot_data, slot_data);
}
memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
- memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
StatsSetupPrivate(tv);
SCFree(tv->printable_name);
}
+ if (tv->stream_pq_local) {
+ BUG_ON(tv->stream_pq_local->len);
+ SCMutexDestroy(&tv->stream_pq_local->mutex_q);
+ SCFree(tv->stream_pq_local);
+ }
+
s = (TmSlot *)tv->tm_slots;
while (s) {
ps = s;
{
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
TmModule *m = TmModuleGetById(s->tm_id);
- SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s %s",
- tv, s, s->id, s->tm_id, m->name, (tv->type == 0 && tv->stream_pq == &s->slot_post_pq) ? "<==== stream_pq" : "");
- if (tv->type == 0 && tv->stream_pq == &s->slot_pre_pq) {
- SCLogNotice("tv %p: -> slot %p/%d holds stream_pq %p IN PRE_PQ SUPER WEIRD", tv, s, s->id, tv->stream_pq);
- }
+ SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s",
+ tv, s, s->id, s->tm_id, m->name);
for (Packet *xp = s->slot_pre_pq.top; xp != NULL; xp = xp->next) {
SCLogNotice("tv %p: ==> pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
tv, s->id, s->tm_id, s->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
}
- for (Packet *xp = s->slot_post_pq.top; xp != NULL; xp = xp->next) {
- SCLogNotice("tv %p: ==> post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
- tv, s->id, s->tm_id, s->slot_post_pq.len, PktSrcToString(xp->pkt_src));
- }
}
}
tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) {
SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
+ } else if (tv->stream_pq_local != NULL) {
+ for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
+ SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
+ tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
+ }
}
TmThreadDoDumpSlots(tv);
tv = tv->next;
* The locks in the queue are NOT used */
PacketQueue slot_pre_pq;
- /* queue filled by the SlotFunc with packets that will
- * be processed futher _after_ the current packet. The
- * locks in the queue are NOT used */
- PacketQueue slot_post_pq;
-
/* store the thread module id */
int tm_id;
static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
{
- TmqhOutputPacketpool(tv, p);
- for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
- SCMutexLock(&slot->slot_post_pq.mutex_q);
- TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
- SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+ if (p != NULL) {
+ TmqhOutputPacketpool(tv, p);
+ }
+ TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
+ if (tv->stream_pq_local) {
+ SCMutexLock(&tv->stream_pq_local->mutex_q);
+ TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
+ SCMutexUnlock(&tv->stream_pq_local->mutex_q);
}
TmThreadsSetFlag(tv, THV_FAILED);
}
/**
* \brief Handle timeout from the capture layer. Checks
- * post-pq which may have been filled by the flow
+ * stream_pq which may have been filled by the flow
* manager.
+ * \param s pipeline to run on these packets.
*/
-static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
+static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv, TmSlot *s)
{
- /* post process pq: only the first slot will possible have used it */
- if (s->slot_post_pq.top != NULL) {
+ PacketQueue *pq = tv->stream_pq_local;
+ if (pq && pq->len > 0) {
while (1) {
- SCMutexLock(&s->slot_post_pq.mutex_q);
- Packet *extra_p = PacketDequeue(&s->slot_post_pq);
- SCMutexUnlock(&s->slot_post_pq.mutex_q);
+ SCMutexLock(&pq->mutex_q);
+ Packet *extra_p = PacketDequeue(pq);
+ SCMutexUnlock(&pq->mutex_q);
if (extra_p == NULL)
break;
-
- if (s->slot_next != NULL) {
- TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
- if (r == TM_ECODE_FAILED) {
- TmThreadsSlotProcessPktFail(tv, s, extra_p);
- break;
- }
+ TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s);
+ if (r == TM_ECODE_FAILED) {
+ TmThreadsSlotProcessPktFail(tv, s, extra_p);
+ break;
}
tv->tmqh_out(tv, extra_p);
}
tv->tmqh_out(tv, p);
- TmThreadsSlotHandlePostPQs(tv, s);
+ TmThreadsHandleInjectedPackets(tv, s);
return TM_ECODE_OK;
}
if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
TmThreadsCaptureInjectPacket(tv, slot, p);
} else {
- TmThreadsSlotHandlePostPQs(tv, slot);
+ TmThreadsHandleInjectedPackets(tv, slot);
/* packet could have been passed to us that we won't use
* return it to the pool. */