/* input a packet */
p = tv->tmqh_in(tv);
+ /* if we didn't get a packet see if we need to do some housekeeping */
+ if (unlikely(p == NULL)) {
+ if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+ p = PacketGetFromQueueOrAlloc();
+ if (p != NULL) {
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
+ }
+ }
+ }
+
if (p != NULL) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s);
* manager.
* \param s pipeline to run on these packets.
*/
-static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
+static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
{
PacketQueue *pq = tv->stream_pq_local;
if (pq && pq->len > 0) {
}
tv->tmqh_out(tv, extra_p);
}
+ return true;
+ } else {
+ return false;
}
}
}
}
+/** \brief handle capture timeout
+ * When a capture method times out we check for house keeping
+ * tasks in the capture thread.
+ *
+ * \param p packet. Capture method may have taken a packet from
+ * the pool prior to the timing out call. We will then
+ * use that packet. Otherwise we can get our own.
+ */
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
{
if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
- TmThreadsCaptureInjectPacket(tv, p);
- } else {
- TmThreadsHandleInjectedPackets(tv);
+ TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+ return;
- /* packet could have been passed to us that we won't use
- * return it to the pool. */
- if (p != NULL)
- tv->tmqh_out(tv, p);
+ } else {
+ if (TmThreadsHandleInjectedPackets(tv) == false) {
+ /* see if we have to do some house keeping */
+ if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+ TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+ return;
+ }
+ }
}
+
+ /* packet could have been passed to us that we won't use
+ * return it to the pool. */
+ if (p != NULL)
+ tv->tmqh_out(tv, p);
}
void TmThreadsListThreads(void);