]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flowworker/stream: use no-lock packet queue
authorVictor Julien <victor@inliniac.net>
Wed, 13 Nov 2019 08:43:11 +0000 (09:43 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 7 Feb 2020 14:43:10 +0000 (15:43 +0100)
Use smaller structure for temporary packet queues.

src/app-layer.c
src/flow-worker.c
src/stream-tcp-reassemble.c
src/stream-tcp-reassemble.h
src/stream-tcp.c
src/stream-tcp.h

index 9924b8def0f7cbf9c25b1de293c195bce3530bde..b614f2712bc63c116a15f4f54df5b7d63251b314 100644 (file)
@@ -973,8 +973,8 @@ void AppLayerDeSetupCounters()
     ThreadVars tv;\
     StreamTcpThread *stt = NULL;\
     TCPHdr tcph;\
-    PacketQueue pq;\
-    memset(&pq,0,sizeof(PacketQueue));\
+    PacketQueueNoLock pq;\
+    memset(&pq,0,sizeof(PacketQueueNoLock));\
     memset(p, 0, SIZE_OF_PACKET);\
     memset (&f, 0, sizeof(Flow));\
     memset(&tv, 0, sizeof (ThreadVars));\
index d857532507d27d726518bcbfcd8de5f6d225e609..74e81c57de37b8d3d498c84b6036666b617c49bc 100644 (file)
@@ -65,7 +65,7 @@ typedef struct FlowWorkerThreadData_ {
     uint16_t both_bypass_pkts;
     uint16_t both_bypass_bytes;
 
-    PacketQueue pq;
+    PacketQueueNoLock pq;
 
 } FlowWorkerThreadData;
 
@@ -142,8 +142,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void *
     AppLayerRegisterThreadCounters(tv);
 
     /* setup pq for stream end pkts */
-    memset(&fw->pq, 0, sizeof(PacketQueue));
-    SCMutexInit(&fw->pq.mutex_q, NULL);
+    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
 
     *data = fw;
     return TM_ECODE_OK;
@@ -170,7 +169,6 @@ static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
 
     /* free pq */
     BUG_ON(fw->pq.len);
-    SCMutexDestroy(&fw->pq.mutex_q);
 
     SC_ATOMIC_DESTROY(fw->detect_thread);
     SCFree(fw);
@@ -250,7 +248,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
         /* Packets here can safely access p->flow as it's locked */
         SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
         Packet *x;
-        while ((x = PacketDequeue(&fw->pq))) {
+        while ((x = PacketDequeueNoLock(&fw->pq))) {
             SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
 
             // TODO do we need to call StreamTcp on these pseudo packets or not?
index e48659a225ae09555131314260b472b90bc49cac..45059f78aa8bc282d25d2ae4609dd9ec1a63c296 100644 (file)
@@ -1724,7 +1724,7 @@ static int StreamTcpReassembleHandleSegmentUpdateACK (ThreadVars *tv,
 
 int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
                                      TcpSession *ssn, TcpStream *stream,
-                                     Packet *p, PacketQueue *pq)
+                                     Packet *p, PacketQueueNoLock *pq)
 {
     SCEnter();
 
@@ -2057,8 +2057,8 @@ static int StreamTcpReassembleTest33(void)
     StreamTcpUTInit(&ra_ctx);
     StreamTcpUTSetupSession(&ssn);
 
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&f, 0, sizeof (Flow));
     memset(&tcph, 0, sizeof (TCPHdr));
     ThreadVars tv;
@@ -2119,8 +2119,8 @@ static int StreamTcpReassembleTest34(void)
 
     StreamTcpUTInit(&ra_ctx);
     StreamTcpUTSetupSession(&ssn);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&f, 0, sizeof (Flow));
     memset(&tcph, 0, sizeof (TCPHdr));
     ThreadVars tv;
@@ -2177,7 +2177,7 @@ static int StreamTcpReassembleTest37(void)
     TCPHdr tcph;
     TcpReassemblyThreadCtx *ra_ctx = NULL;
     uint8_t packet[1460] = "";
-    PacketQueue pq;
+    PacketQueueNoLock pq;
     ThreadVars tv;
     memset(&tv, 0, sizeof (ThreadVars));
 
@@ -2186,7 +2186,7 @@ static int StreamTcpReassembleTest37(void)
 
     StreamTcpUTInit(&ra_ctx);
     StreamTcpUTSetupSession(&ssn);
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&f, 0, sizeof (Flow));
     memset(&tcph, 0, sizeof (TCPHdr));
     memset(&tv, 0, sizeof (ThreadVars));
@@ -2250,8 +2250,8 @@ static int StreamTcpReassembleTest39 (void)
     ThreadVars tv;
     StreamTcpThread stt;
     TCPHdr tcph;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (stt));
@@ -2733,8 +2733,8 @@ static int StreamTcpReassembleTest40 (void)
     Flow *f = NULL;
     TCPHdr tcph;
     TcpSession ssn;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&tcph, 0, sizeof (TCPHdr));
     ThreadVars tv;
     memset(&tv, 0, sizeof (ThreadVars));
@@ -2982,8 +2982,8 @@ static int StreamTcpReassembleTest47 (void)
     TCPHdr tcph;
     TcpSession ssn;
     ThreadVars tv;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&tcph, 0, sizeof (TCPHdr));
     memset(&tv, 0, sizeof (ThreadVars));
     StreamTcpInitConfig(TRUE);
index 886318f2bd99a44941027cb91e03e87ec4ae8ba3..82423a75fdc4a28793595b7f31a1254d08b8b029 100644 (file)
@@ -82,7 +82,7 @@ typedef struct TcpReassemblyThreadCtx_ {
 #define OS_POLICY_DEFAULT   OS_POLICY_BSD
 
 void StreamTcpReassembleInitMemuse(void);
-int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueue *);
+int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueueNoLock *);
 int StreamTcpReassembleInit(char);
 void StreamTcpReassembleFree(char);
 void StreamTcpReassembleRegisterTests(void);
index 6a771cfb6bd89ed096050fcb3bbf60a0d11d42ad..1ce7ffe6bd639ba7fca7f908a9d4098c8346bb3d 100644 (file)
@@ -94,7 +94,7 @@
 #define STREAMTCP_EMERG_EST_TIMEOUT             300
 #define STREAMTCP_EMERG_CLOSED_TIMEOUT          20
 
-static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueue *);
+static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueueNoLock *);
 void StreamTcpReturnStreamSegments (TcpStream *);
 void StreamTcpInitConfig(char);
 int StreamTcpGetFlowState(void *);
@@ -105,7 +105,7 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *);
 static int StreamTcpValidateRst(TcpSession * , Packet *);
 static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *);
 static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
-        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq,
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq,
         uint8_t state);
 
 extern int g_detect_disabled;
@@ -899,7 +899,8 @@ static int StreamTcpPacketIsRetransmission(TcpStream *stream, Packet *p)
  *  \retval -1 error
  */
 static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn,
+        PacketQueueNoLock *pq)
 {
     if (p->tcph->th_flags & TH_RST) {
         StreamTcpSetEvent(p, STREAM_RST_BUT_NO_SESSION);
@@ -1407,7 +1408,8 @@ static inline bool StateSynSentValidateTimestamp(TcpSession *ssn, Packet *p)
  */
 
 static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn,
+        PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -1732,7 +1734,8 @@ static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn,
+        PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -2136,7 +2139,7 @@ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p,
  *  \param  stt     Strean Thread module registered to handle the stream handling
  */
 static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Packet *p,
-                        StreamTcpThread *stt, PacketQueue *pq)
+                        StreamTcpThread *stt, PacketQueueNoLock *pq)
 {
     SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 ","
                "ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len,
@@ -2322,7 +2325,7 @@ static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Pack
  *  \param  stt     Strean Thread module registered to handle the stream handling
  */
 static int HandleEstablishedPacketToClient(ThreadVars *tv, TcpSession *ssn, Packet *p,
-                        StreamTcpThread *stt, PacketQueue *pq)
+                        StreamTcpThread *stt, PacketQueueNoLock *pq)
 {
     SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to client: SEQ %" PRIu32 ","
                " ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len,
@@ -2494,7 +2497,7 @@ static inline uint32_t StreamTcpResetGetMaxAck(TcpStream *stream, uint32_t seq)
  */
 
 static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -2705,7 +2708,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt,
-                                TcpSession *ssn, Packet *p, PacketQueue *pq)
+                                TcpSession *ssn, Packet *p, PacketQueueNoLock *pq)
 {
     if (PKT_IS_TOSERVER(p)) {
         SCLogDebug("ssn %p: pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 ","
@@ -2820,7 +2823,7 @@ static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt,
  */
 
 static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -3259,7 +3262,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -3558,7 +3561,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -3724,7 +3727,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     SCEnter();
 
@@ -4027,7 +4030,7 @@ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -4154,7 +4157,7 @@ static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p,
  */
 
 static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -4317,7 +4320,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p,
 }
 
 static int StreamTcpPacketStateClosed(ThreadVars *tv, Packet *p,
-                        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq)
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq)
 {
     if (ssn == NULL)
         return -1;
@@ -4667,7 +4670,7 @@ static int StreamTcpPacketIsBadWindowUpdate(TcpSession *ssn, Packet *p)
  *  \param state current TCP state
  */
 static inline int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
-        StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq,
+        StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq,
         const uint8_t state)
 {
     SCLogDebug("ssn: %p", ssn);
@@ -4760,7 +4763,7 @@ static inline void HandleThreadId(ThreadVars *tv, Packet *p, StreamTcpThread *st
 
 /* flow is and stays locked */
 int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
-                     PacketQueue *pq)
+                     PacketQueueNoLock *pq)
 {
     SCEnter();
 
@@ -4891,7 +4894,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
     if (ssn != NULL) {
         while (stt->pseudo_queue.len > 0) {
             SCLogDebug("processing pseudo packet / stream end");
-            Packet *np = PacketDequeue(&stt->pseudo_queue);
+            Packet *np = PacketDequeueNoLock(&stt->pseudo_queue);
             if (np != NULL) {
                 /* process the opposing direction of the original packet */
                 if (PKT_IS_TOSERVER(np)) {
@@ -4905,7 +4908,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
                 }
 
                 /* enqueue this packet so we inspect it in detect etc */
-                PacketEnqueue(pq, np);
+                PacketEnqueueNoLock(pq, np);
             }
             SCLogDebug("processing pseudo packet / stream end done");
         }
@@ -4961,9 +4964,9 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
 error:
     /* make sure we don't leave packets in our pseudo queue */
     while (stt->pseudo_queue.len > 0) {
-        Packet *np = PacketDequeue(&stt->pseudo_queue);
+        Packet *np = PacketDequeueNoLock(&stt->pseudo_queue);
         if (np != NULL) {
-            PacketEnqueue(pq, np);
+            PacketEnqueueNoLock(pq, np);
         }
     }
 
@@ -5177,7 +5180,7 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn
     return 0;
 }
 
-TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
+TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueueNoLock *pq)
 {
     StreamTcpThread *stt = (StreamTcpThread *)data;
 
@@ -5975,7 +5978,7 @@ Packet *StreamTcpPseudoSetup(Packet *parent, uint8_t *pkt, uint32_t len)
  */
 static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv,
         StreamTcpThread *stt, Packet *parent,
-        TcpSession *ssn, PacketQueue *pq, int dir)
+        TcpSession *ssn, PacketQueueNoLock *pq, int dir)
 {
     SCEnter();
     Flow *f = parent->flow;
@@ -6146,7 +6149,7 @@ static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv,
     memcpy(&np->ts, &parent->ts, sizeof(struct timeval));
 
     SCLogDebug("np %p", np);
-    PacketEnqueue(pq, np);
+    PacketEnqueueNoLock(pq, np);
 
     StatsIncr(tv, stt->counter_tcp_pseudo);
     SCReturn;
@@ -6162,7 +6165,8 @@ error:
  *         Flag TCP engine that data needs to be inspected regardless
  *         of how far we are wrt inspect limits.
  */
-void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq)
+void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p,
+        PacketQueueNoLock *pq)
 {
     TcpSession *ssn = f->protoctx;
     ssn->client.flags |= STREAMTCP_STREAM_FLAG_TRIGGER_RAW;
@@ -6325,8 +6329,8 @@ static int StreamTcpTest02 (void)
     StreamTcpThread stt;
     uint8_t payload[4];
     TCPHdr tcph;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(pq));
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
@@ -6412,8 +6416,8 @@ static int StreamTcpTest03 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(pq));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -6488,8 +6492,8 @@ static int StreamTcpTest04 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(pq));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -6558,8 +6562,8 @@ static int StreamTcpTest05 (void)
     TCPHdr tcph;
     uint8_t payload[4];
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -6658,8 +6662,8 @@ static int StreamTcpTest06 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&ssn, 0, sizeof (TcpSession));
     memset(&tv, 0, sizeof (ThreadVars));
@@ -6719,10 +6723,10 @@ static int StreamTcpTest07 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     uint8_t payload[1] = {0x42};
-    PacketQueue pq;
+    PacketQueueNoLock pq;
 
     memset(p, 0, SIZE_OF_PACKET);
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof(StreamTcpThread));
@@ -6783,8 +6787,8 @@ static int StreamTcpTest08 (void)
     uint8_t payload[1] = {0x42};
 
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof(StreamTcpThread));
@@ -6846,8 +6850,8 @@ static int StreamTcpTest09 (void)
     uint8_t payload[1] = {0x42};
 
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof(StreamTcpThread));
@@ -6916,8 +6920,8 @@ static int StreamTcpTest10 (void)
     TCPHdr tcph;
     uint8_t payload[4];
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -6995,8 +6999,8 @@ static int StreamTcpTest11 (void)
     TCPHdr tcph;
     uint8_t payload[4];
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7076,8 +7080,8 @@ static int StreamTcpTest12 (void)
     TCPHdr tcph;
     uint8_t payload[4];
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7172,8 +7176,8 @@ static int StreamTcpTest13 (void)
     TCPHdr tcph;
     uint8_t payload[4];
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7375,8 +7379,8 @@ static int StreamTcpTest14 (void)
     IPV4Hdr ipv4h;
     char os_policy_name[10] = "windows";
     const char *ip_addr;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -7541,8 +7545,8 @@ static int StreamTcp4WHSTest01 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7623,8 +7627,8 @@ static int StreamTcp4WHSTest02 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7693,8 +7697,8 @@ static int StreamTcp4WHSTest03 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     memset(p, 0, SIZE_OF_PACKET);
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -7777,8 +7781,8 @@ static int StreamTcpTest15 (void)
     IPV4Hdr ipv4h;
     char os_policy_name[10] = "windows";
     const char *ip_addr;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -7944,8 +7948,8 @@ static int StreamTcpTest16 (void)
     IPV4Hdr ipv4h;
     char os_policy_name[10] = "windows";
     const char *ip_addr;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -8114,8 +8118,8 @@ static int StreamTcpTest17 (void)
     IPV4Hdr ipv4h;
     char os_policy_name[10] = "windows";
     const char *ip_addr;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -8529,12 +8533,12 @@ static int StreamTcpTest23(void)
     TCPHdr tcph;
     uint8_t packet[1460] = "";
     ThreadVars tv;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
 
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     FAIL_IF(p == NULL);
 
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(p, 0, SIZE_OF_PACKET);
     memset(&f, 0, sizeof (Flow));
     memset(&tcph, 0, sizeof (TCPHdr));
@@ -8598,8 +8602,8 @@ static int StreamTcpTest24(void)
     uint8_t packet[1460] = "";
     ThreadVars tv;
     memset(&tv, 0, sizeof (ThreadVars));
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     StreamTcpUTInit(&stt.ra_ctx);
     StreamTcpUTSetupSession(&ssn);
@@ -8668,8 +8672,8 @@ static int StreamTcpTest25(void)
     uint8_t payload[4];
     TCPHdr tcph;
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -8761,8 +8765,8 @@ static int StreamTcpTest26(void)
     uint8_t payload[4];
     TCPHdr tcph;
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -8855,8 +8859,8 @@ static int StreamTcpTest27(void)
     uint8_t payload[4];
     TCPHdr tcph;
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9412,8 +9416,8 @@ static int StreamTcpTest32(void)
     TCPHdr tcph;
     TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL);
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset (&p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9502,8 +9506,8 @@ static int StreamTcpTest33 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     TcpReassemblyThreadCtx ra_ctx;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
     memset (&p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9603,8 +9607,8 @@ static int StreamTcpTest34 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     TcpReassemblyThreadCtx ra_ctx;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
     memset (&p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9668,8 +9672,8 @@ static int StreamTcpTest35 (void)
     StreamTcpThread stt;
     TCPHdr tcph;
     TcpReassemblyThreadCtx ra_ctx;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
     memset (&p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9733,8 +9737,8 @@ static int StreamTcpTest36(void)
     TCPHdr tcph;
     TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL);
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset (&p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9826,8 +9830,8 @@ static int StreamTcpTest37(void)
     uint8_t payload[4];
     TCPHdr tcph;
     int ret = 0;
-    PacketQueue pq;
-    memset(&pq,0,sizeof(PacketQueue));
+    PacketQueueNoLock pq;
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     memset(p, 0, SIZE_OF_PACKET);
     memset (&f, 0, sizeof(Flow));
@@ -9942,13 +9946,13 @@ static int StreamTcpTest38 (void)
     StreamTcpThread stt;
     uint8_t payload[128];
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
 
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
     memset(&tcph, 0, sizeof (TCPHdr));
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     if (unlikely(p == NULL))
@@ -10097,13 +10101,13 @@ static int StreamTcpTest39 (void)
     StreamTcpThread stt;
     uint8_t payload[4];
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
 
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
     memset(&tcph, 0, sizeof (TCPHdr));
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
 
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     if (unlikely(p == NULL))
@@ -10226,7 +10230,7 @@ static int StreamTcpTest42 (void)
     ThreadVars tv;
     StreamTcpThread stt;
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     TcpSession *ssn;
 
@@ -10234,7 +10238,7 @@ static int StreamTcpTest42 (void)
         return 0;
     memset(p, 0, SIZE_OF_PACKET);
 
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -10318,7 +10322,7 @@ static int StreamTcpTest43 (void)
     ThreadVars tv;
     StreamTcpThread stt;
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     TcpSession *ssn;
 
@@ -10326,7 +10330,7 @@ static int StreamTcpTest43 (void)
         return 0;
     memset(p, 0, SIZE_OF_PACKET);
 
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -10410,7 +10414,7 @@ static int StreamTcpTest44 (void)
     ThreadVars tv;
     StreamTcpThread stt;
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     TcpSession *ssn;
 
@@ -10418,7 +10422,7 @@ static int StreamTcpTest44 (void)
         return 0;
     memset(p, 0, SIZE_OF_PACKET);
 
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
@@ -10497,7 +10501,7 @@ static int StreamTcpTest45 (void)
     ThreadVars tv;
     StreamTcpThread stt;
     TCPHdr tcph;
-    PacketQueue pq;
+    PacketQueueNoLock pq;
     Packet *p = SCMalloc(SIZE_OF_PACKET);
     TcpSession *ssn;
 
@@ -10505,7 +10509,7 @@ static int StreamTcpTest45 (void)
         return 0;
     memset(p, 0, SIZE_OF_PACKET);
 
-    memset(&pq,0,sizeof(PacketQueue));
+    memset(&pq,0,sizeof(PacketQueueNoLock));
     memset (&f, 0, sizeof(Flow));
     memset(&tv, 0, sizeof (ThreadVars));
     memset(&stt, 0, sizeof (StreamTcpThread));
index 4e5f55c8cc1541aa241b619cc4b0bd92fc03c488..c1566bf4073106ccbbf30cc6ef27c1720b6febf8 100644 (file)
@@ -73,7 +73,7 @@ typedef struct StreamTcpThread_ {
     /** queue for pseudo packet(s) that were created in the stream
      *  process and need further handling. Currently only used when
      *  receiving (valid) RST packets */
-    PacketQueue pseudo_queue;
+    PacketQueueNoLock pseudo_queue;
 
     uint16_t counter_tcp_sessions;
     /** sessions not picked up because memcap was reached */
@@ -138,7 +138,7 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
         uint64_t *progress_out, bool respect_inspect_depth);
 void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress);
 
-void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq);
+void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueueNoLock *pq);
 
 
 /** ------- Inline functions: ------ */
@@ -172,14 +172,14 @@ enum {
     STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION = 1,
 };
 
-TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);
+TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *);
 int StreamNeedsReassembly(const TcpSession *ssn, uint8_t direction);
 TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **);
 TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data);
 void StreamTcpRegisterTests (void);
 
 int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
-                     PacketQueue *pq);
+                     PacketQueueNoLock *pq);
 /* clear ssn and return to pool */
 void StreamTcpSessionClear(void *ssnptr);
 /* cleanup ssn, but don't free ssn */