AFPDumpCounters(ptv);
last_dump = current_time;
}
- /* poll timed out, lets see if we need to inject a fake packet */
- TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL);
+ /* poll timed out, lets see handle our timeout path */
+ TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL);
} else if ((r < 0) && (errno != EINTR)) {
SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d) %s",
return r;
}
+/**
+ * \brief Handle timeout from the capture layer. Checks
+ * post-pq which may have been filled by the flow
+ * manager.
+ */
+static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
+{
+ /* post process pq */
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->slot_post_pq.top != NULL) {
+ while (1) {
+ SCMutexLock(&slot->slot_post_pq.mutex_q);
+ Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
+ SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+
+ if (extra_p == NULL)
+ break;
+
+ if (slot->slot_next != NULL) {
+ TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
+ if (r == TM_ECODE_FAILED) {
+ SCMutexLock(&slot->slot_post_pq.mutex_q);
+ TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
+ SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+
+ TmqhOutputPacketpool(tv, extra_p);
+ TmThreadsSetFlag(tv, THV_FAILED);
+ return TM_ECODE_FAILED;
+ }
+ }
+ tv->tmqh_out(tv, extra_p);
+ }
+ }
+ }
+ return TM_ECODE_OK;
+}
+
/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
* Allow caller to supply their own packet
*
* to force a packet through the engine to complete a reload */
static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Packet *p)
{
- if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
- TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
- if (p == NULL)
- p = PacketGetFromQueueOrAlloc();
- if (p != NULL) {
- p->flags |= PKT_PSEUDO_STREAM_END;
- if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
- TmqhOutputPacketpool(tv, p);
- }
+ TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
+ if (p == NULL)
+ p = PacketGetFromQueueOrAlloc();
+ if (p != NULL) {
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
+ TmqhOutputPacketpool(tv, p);
}
}
}
+static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, Packet *p)
+{
+ if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
+ TmThreadsCaptureInjectPacket(tv, slot, p);
+ } else {
+ TmThreadsSlotHandlePostPQs(tv, slot);
+
+ /* packet could have been passed to us that we won't use
+ * return it to the pool. */
+ if (p != NULL)
+ tv->tmqh_out(tv, p);
+ }
+}
+
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);