]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Give easy access for thread stream packet queue
authorVictor Julien <victor@inliniac.net>
Tue, 25 Nov 2014 14:05:06 +0000 (15:05 +0100)
committerVictor Julien <victor@inliniac.net>
Wed, 17 Dec 2014 13:06:06 +0000 (14:06 +0100)
Access it from ThreadVars. This allows for easy injection of packets
into the stream engine.

src/threadvars.h
src/tm-threads.c

index 0e04af2ca93cfc10daf294b2d5d47b2763c8363e..f8277b7758c5e60606b67ee6b6d000dfdc15286e 100644 (file)
@@ -87,6 +87,9 @@ typedef struct ThreadVars_ {
     void *(*tm_func)(void *);
     struct TmSlot_ *tm_slots;
 
+    /** stream packet queue for flow time out injection */
+    struct PacketQueue_ *stream_pq;
+
     uint8_t thread_setup_flags;
 
     /** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
index f9dd12c07fe5cf932e4b311fb3690e5801340bd2..bb8ab9be9056ed7f227c4720ef9c702c21aed523 100644 (file)
@@ -270,6 +270,16 @@ void *TmThreadsSlotPktAcqLoop(void *td)
         SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
         memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
         SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
+
+        /* get the 'pre qeueue' from module before the stream module */
+        if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) {
+            SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
+            tv->stream_pq = &slot->slot_post_pq;
+        /* if the stream module is the first, get the threads input queue */
+        } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) {
+            tv->stream_pq = &trans_q[tv->inq->id];
+            SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
+        }
     }
 
     tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
@@ -314,14 +324,19 @@ void *TmThreadsSlotPktAcqLoop(void *td)
                 goto error;
             }
         }
+
+        BUG_ON(slot->slot_pre_pq.len);
+        BUG_ON(slot->slot_post_pq.len);
     }
 
+    tv->stream_pq = NULL;
     SCLogDebug("%s ending", tv->name);
     TmThreadsSetFlag(tv, THV_CLOSED);
     pthread_exit((void *) 0);
     return NULL;
 
 error:
+    tv->stream_pq = NULL;
     pthread_exit((void *) -1);
     return NULL;
 }
@@ -378,6 +393,19 @@ void *TmThreadsSlotVar(void *td)
         SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
         memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
         SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
+
+        /* special case: we need to access the stream queue
+         * from the flow timeout code */
+
+        /* get the 'pre qeueue' from module before the stream module */
+        if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) {
+            SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
+            tv->stream_pq = &s->slot_pre_pq;
+        /* if the stream module is the first, get the threads input queue */
+        } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) {
+            tv->stream_pq = &trans_q[tv->inq->id];
+            SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
+        }
     }
 
     tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
@@ -465,14 +493,18 @@ void *TmThreadsSlotVar(void *td)
                 goto error;
             }
         }
+        BUG_ON(s->slot_pre_pq.len);
+        BUG_ON(s->slot_post_pq.len);
     }
 
     SCLogDebug("%s ending", tv->name);
+    tv->stream_pq = NULL;
     TmThreadsSetFlag(tv, THV_CLOSED);
     pthread_exit((void *) 0);
     return NULL;
 
 error:
+    tv->stream_pq = NULL;
     pthread_exit((void *) -1);
     return NULL;
 }