]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Introduce Flow timeout injection api
authorVictor Julien <victor@inliniac.net>
Tue, 25 Nov 2014 14:52:38 +0000 (15:52 +0100)
committerVictor Julien <victor@inliniac.net>
Wed, 17 Dec 2014 13:06:08 +0000 (14:06 +0100)
Add function TmThreadsInjectPacketById that is to be used to inject flow
timeout packets into the threads stream_pq queue.

TmThreadsInjectPacketById will also wake up listening threads if
applicable.

Packets are passed all packets together in an NULL terminated array
to reduce locking overhead.

src/flow-timeout.c
src/tm-threads.c
src/tm-threads.h

index 4f62d89d3a4bf0010f9e89dea6968295d67b1951..d62addd9ab3f7b8d85e0834ac6f42a1397950250 100644 (file)
@@ -481,15 +481,20 @@ int FlowForceReassemblyForFlowV2(Flow *f, int server, int client)
         }
     }
 
-    SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
-    PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p1);
-    if (p2 != NULL)
-        PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p2);
-    if (p3 != NULL)
-        PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3);
-    SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
-    if (stream_pseudo_pkt_decode_TV->inq != NULL) {
-        SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q);
+    /* inject the packet(s) into the appropriate thread */
+    int thread_id = (int)f->thread_id;
+    Packet *packets[4] = { p1, p2 ? p2 : p3, p2 ? p3 : NULL, NULL }; /**< null terminated array of packets */
+    if (unlikely(!(TmThreadsInjectPacketsById(packets, thread_id)))) {
+        FlowDeReference(&p1->flow);
+        TmqhOutputPacketpool(NULL, p1);
+        if (p2) {
+            FlowDeReference(&p2->flow);
+            TmqhOutputPacketpool(NULL, p2);
+        }
+        if (p3) {
+            FlowDeReference(&p3->flow);
+            TmqhOutputPacketpool(NULL, p3);
+        }
     }
 
     /* done, in case of error (no packet) we still tag flow as complete
index bb8ab9be9056ed7f227c4720ef9c702c21aed523..59754c0ae4513038353b06f7a9219a4f71135011 100644 (file)
@@ -2074,3 +2074,33 @@ void TmThreadsUnregisterThread(const int id)
 end:
     SCMutexUnlock(&thread_store_lock);
 }
+
+/**
+ *  \retval r 1 if packet was accepted, 0 otherwise
+ *  \note if packet was not accepted, it's still the responsibility
+ *        of the caller.
+ */
+int TmThreadsInjectPacketsById(Packet **packets, int id)
+{
+    if (id < 0 || id >= (int)thread_store.threads_size)
+        return 0;
+
+    Thread *t = &thread_store.threads[id];
+    ThreadVars *tv = t->tv;
+
+    if (tv == NULL || tv->stream_pq == NULL)
+        return 0;
+
+    SCMutexLock(&tv->stream_pq->mutex_q);
+    while (*packets != NULL) {
+        PacketEnqueue(tv->stream_pq, *packets);
+        packets++;
+    }
+    SCMutexUnlock(&tv->stream_pq->mutex_q);
+
+    /* wake up listening thread(s) if necessary */
+    if (tv->inq != NULL) {
+        SCCondSignal(&trans_q[tv->inq->id].cond_q);
+    }
+    return 1;
+}
index 0684d28b0260b773f4613c94c01337e747f9ce89..4d864d6ebe7597fafb24312e0b5384cbadfe88bb 100644 (file)
@@ -199,5 +199,6 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
 void TmThreadsListThreads(void);
 int TmThreadsRegisterThread(ThreadVars *tv, const int type);
 void TmThreadsUnregisterThread(const int id);
+int TmThreadsInjectPacketsById(Packet **, int id);
 
 #endif /* __TM_THREADS_H__ */