}
}
- SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
- PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p1);
- if (p2 != NULL)
- PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p2);
- if (p3 != NULL)
- PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3);
- SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
- if (stream_pseudo_pkt_decode_TV->inq != NULL) {
- SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q);
+ /* inject the packet(s) into the appropriate thread */
+ int thread_id = (int)f->thread_id;
+ Packet *packets[4] = { p1, p2 ? p2 : p3, p2 ? p3 : NULL, NULL }; /**< null terminated array of packets */
+ if (unlikely(!(TmThreadsInjectPacketsById(packets, thread_id)))) {
+ FlowDeReference(&p1->flow);
+ TmqhOutputPacketpool(NULL, p1);
+ if (p2) {
+ FlowDeReference(&p2->flow);
+ TmqhOutputPacketpool(NULL, p2);
+ }
+ if (p3) {
+ FlowDeReference(&p3->flow);
+ TmqhOutputPacketpool(NULL, p3);
+ }
}
/* done, in case of error (no packet) we still tag flow as complete
end:
SCMutexUnlock(&thread_store_lock);
}
+
+/**
+ * \retval r 1 if packet was accepted, 0 otherwise
+ * \note if packet was not accepted, it's still the responsibility
+ * of the caller.
+ */
+int TmThreadsInjectPacketsById(Packet **packets, int id)
+{
+ if (id < 0 || id >= (int)thread_store.threads_size)
+ return 0;
+
+ Thread *t = &thread_store.threads[id];
+ ThreadVars *tv = t->tv;
+
+ if (tv == NULL || tv->stream_pq == NULL)
+ return 0;
+
+ SCMutexLock(&tv->stream_pq->mutex_q);
+ while (*packets != NULL) {
+ PacketEnqueue(tv->stream_pq, *packets);
+ packets++;
+ }
+ SCMutexUnlock(&tv->stream_pq->mutex_q);
+
+ /* wake up listening thread(s) if necessary */
+ if (tv->inq != NULL) {
+ SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ }
+ return 1;
+}
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);
+int TmThreadsInjectPacketsById(Packet **, int id);
#endif /* __TM_THREADS_H__ */