From: Victor Julien Date: Wed, 13 Nov 2019 08:43:11 +0000 (+0100) Subject: flowworker/stream: use no-lock packet queue X-Git-Tag: suricata-6.0.0-beta1~784 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e3fbdf1948ad2ea243abfe1dd8817f2eb578bb2f;p=thirdparty%2Fsuricata.git flowworker/stream: use no-lock packet queue Use smaller structure for temporary packet queues. --- diff --git a/src/app-layer.c b/src/app-layer.c index 9924b8def0..b614f2712b 100644 --- a/src/app-layer.c +++ b/src/app-layer.c @@ -973,8 +973,8 @@ void AppLayerDeSetupCounters() ThreadVars tv;\ StreamTcpThread *stt = NULL;\ TCPHdr tcph;\ - PacketQueue pq;\ - memset(&pq,0,sizeof(PacketQueue));\ + PacketQueueNoLock pq;\ + memset(&pq,0,sizeof(PacketQueueNoLock));\ memset(p, 0, SIZE_OF_PACKET);\ memset (&f, 0, sizeof(Flow));\ memset(&tv, 0, sizeof (ThreadVars));\ diff --git a/src/flow-worker.c b/src/flow-worker.c index d857532507..74e81c57de 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -65,7 +65,7 @@ typedef struct FlowWorkerThreadData_ { uint16_t both_bypass_pkts; uint16_t both_bypass_bytes; - PacketQueue pq; + PacketQueueNoLock pq; } FlowWorkerThreadData; @@ -142,8 +142,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void * AppLayerRegisterThreadCounters(tv); /* setup pq for stream end pkts */ - memset(&fw->pq, 0, sizeof(PacketQueue)); - SCMutexInit(&fw->pq.mutex_q, NULL); + memset(&fw->pq, 0, sizeof(PacketQueueNoLock)); *data = fw; return TM_ECODE_OK; @@ -170,7 +169,6 @@ static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data) /* free pq */ BUG_ON(fw->pq.len); - SCMutexDestroy(&fw->pq.mutex_q); SC_ATOMIC_DESTROY(fw->detect_thread); SCFree(fw); @@ -250,7 +248,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) /* Packets here can safely access p->flow as it's locked */ SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len); Packet *x; - while ((x = PacketDequeue(&fw->pq))) { + while ((x = PacketDequeueNoLock(&fw->pq))) { SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x); // TODO do we need to call StreamTcp on these pseudo packets or not? diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index e48659a225..45059f78aa 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -1724,7 +1724,7 @@ static int StreamTcpReassembleHandleSegmentUpdateACK (ThreadVars *tv, int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn, TcpStream *stream, - Packet *p, PacketQueue *pq) + Packet *p, PacketQueueNoLock *pq) { SCEnter(); @@ -2057,8 +2057,8 @@ static int StreamTcpReassembleTest33(void) StreamTcpUTInit(&ra_ctx); StreamTcpUTSetupSession(&ssn); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&f, 0, sizeof (Flow)); memset(&tcph, 0, sizeof (TCPHdr)); ThreadVars tv; @@ -2119,8 +2119,8 @@ static int StreamTcpReassembleTest34(void) StreamTcpUTInit(&ra_ctx); StreamTcpUTSetupSession(&ssn); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&f, 0, sizeof (Flow)); memset(&tcph, 0, sizeof (TCPHdr)); ThreadVars tv; @@ -2177,7 +2177,7 @@ static int StreamTcpReassembleTest37(void) TCPHdr tcph; TcpReassemblyThreadCtx *ra_ctx = NULL; uint8_t packet[1460] = ""; - PacketQueue pq; + PacketQueueNoLock pq; ThreadVars tv; memset(&tv, 0, sizeof (ThreadVars)); @@ -2186,7 +2186,7 @@ static int StreamTcpReassembleTest37(void) StreamTcpUTInit(&ra_ctx); StreamTcpUTSetupSession(&ssn); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&f, 0, sizeof (Flow)); memset(&tcph, 0, sizeof (TCPHdr)); memset(&tv, 0, sizeof (ThreadVars)); @@ -2250,8 +2250,8 @@ static int StreamTcpReassembleTest39 (void) ThreadVars tv; StreamTcpThread stt; TCPHdr tcph; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (stt)); @@ -2733,8 +2733,8 @@ static int StreamTcpReassembleTest40 (void) Flow *f = NULL; TCPHdr tcph; TcpSession ssn; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&tcph, 0, sizeof (TCPHdr)); ThreadVars tv; memset(&tv, 0, sizeof (ThreadVars)); @@ -2982,8 +2982,8 @@ static int StreamTcpReassembleTest47 (void) TCPHdr tcph; TcpSession ssn; ThreadVars tv; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&tcph, 0, sizeof (TCPHdr)); memset(&tv, 0, sizeof (ThreadVars)); StreamTcpInitConfig(TRUE); diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index 886318f2bd..82423a75fd 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -82,7 +82,7 @@ typedef struct TcpReassemblyThreadCtx_ { #define OS_POLICY_DEFAULT OS_POLICY_BSD void StreamTcpReassembleInitMemuse(void); -int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueue *); +int StreamTcpReassembleHandleSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpSession *, TcpStream *, Packet *, PacketQueueNoLock *); int StreamTcpReassembleInit(char); void StreamTcpReassembleFree(char); void StreamTcpReassembleRegisterTests(void); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 6a771cfb6b..1ce7ffe6bd 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -94,7 +94,7 @@ #define STREAMTCP_EMERG_EST_TIMEOUT 300 #define STREAMTCP_EMERG_CLOSED_TIMEOUT 20 -static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueue *); +static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *, TcpSession *, Packet *, PacketQueueNoLock *); void StreamTcpReturnStreamSegments (TcpStream *); void StreamTcpInitConfig(char); int StreamTcpGetFlowState(void *); @@ -105,7 +105,7 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *); static int StreamTcpValidateRst(TcpSession * , Packet *); static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *); static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq, + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq, uint8_t state); extern int g_detect_disabled; @@ -899,7 +899,8 @@ static int StreamTcpPacketIsRetransmission(TcpStream *stream, Packet *p) * \retval -1 error */ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, + PacketQueueNoLock *pq) { if (p->tcph->th_flags & TH_RST) { StreamTcpSetEvent(p, STREAM_RST_BUT_NO_SESSION); @@ -1407,7 +1408,8 @@ static inline bool StateSynSentValidateTimestamp(TcpSession *ssn, Packet *p) */ static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, + PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -1732,7 +1734,8 @@ static int StreamTcpPacketStateSynSent(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, + PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -2136,7 +2139,7 @@ static int StreamTcpPacketStateSynRecv(ThreadVars *tv, Packet *p, * \param stt Strean Thread module registered to handle the stream handling */ static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Packet *p, - StreamTcpThread *stt, PacketQueue *pq) + StreamTcpThread *stt, PacketQueueNoLock *pq) { SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 "," "ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len, @@ -2322,7 +2325,7 @@ static int HandleEstablishedPacketToServer(ThreadVars *tv, TcpSession *ssn, Pack * \param stt Strean Thread module registered to handle the stream handling */ static int HandleEstablishedPacketToClient(ThreadVars *tv, TcpSession *ssn, Packet *p, - StreamTcpThread *stt, PacketQueue *pq) + StreamTcpThread *stt, PacketQueueNoLock *pq) { SCLogDebug("ssn %p: =+ pkt (%" PRIu32 ") is to client: SEQ %" PRIu32 "," " ACK %" PRIu32 ", WIN %"PRIu16"", ssn, p->payload_len, @@ -2494,7 +2497,7 @@ static inline uint32_t StreamTcpResetGetMaxAck(TcpStream *stream, uint32_t seq) */ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -2705,7 +2708,7 @@ static int StreamTcpPacketStateEstablished(ThreadVars *tv, Packet *p, */ static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt, - TcpSession *ssn, Packet *p, PacketQueue *pq) + TcpSession *ssn, Packet *p, PacketQueueNoLock *pq) { if (PKT_IS_TOSERVER(p)) { SCLogDebug("ssn %p: pkt (%" PRIu32 ") is to server: SEQ %" PRIu32 "," @@ -2820,7 +2823,7 @@ static int StreamTcpHandleFin(ThreadVars *tv, StreamTcpThread *stt, */ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -3259,7 +3262,7 @@ static int StreamTcpPacketStateFinWait1(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -3558,7 +3561,7 @@ static int StreamTcpPacketStateFinWait2(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -3724,7 +3727,7 @@ static int StreamTcpPacketStateClosing(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { SCEnter(); @@ -4027,7 +4030,7 @@ static int StreamTcpPacketStateCloseWait(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -4154,7 +4157,7 @@ static int StreamTcpPacketStateLastAck(ThreadVars *tv, Packet *p, */ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -4317,7 +4320,7 @@ static int StreamTcpPacketStateTimeWait(ThreadVars *tv, Packet *p, } static int StreamTcpPacketStateClosed(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq) + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq) { if (ssn == NULL) return -1; @@ -4667,7 +4670,7 @@ static int StreamTcpPacketIsBadWindowUpdate(TcpSession *ssn, Packet *p) * \param state current TCP state */ static inline int StreamTcpStateDispatch(ThreadVars *tv, Packet *p, - StreamTcpThread *stt, TcpSession *ssn, PacketQueue *pq, + StreamTcpThread *stt, TcpSession *ssn, PacketQueueNoLock *pq, const uint8_t state) { SCLogDebug("ssn: %p", ssn); @@ -4760,7 +4763,7 @@ static inline void HandleThreadId(ThreadVars *tv, Packet *p, StreamTcpThread *st /* flow is and stays locked */ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, - PacketQueue *pq) + PacketQueueNoLock *pq) { SCEnter(); @@ -4891,7 +4894,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, if (ssn != NULL) { while (stt->pseudo_queue.len > 0) { SCLogDebug("processing pseudo packet / stream end"); - Packet *np = PacketDequeue(&stt->pseudo_queue); + Packet *np = PacketDequeueNoLock(&stt->pseudo_queue); if (np != NULL) { /* process the opposing direction of the original packet */ if (PKT_IS_TOSERVER(np)) { @@ -4905,7 +4908,7 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, } /* enqueue this packet so we inspect it in detect etc */ - PacketEnqueue(pq, np); + PacketEnqueueNoLock(pq, np); } SCLogDebug("processing pseudo packet / stream end done"); } @@ -4961,9 +4964,9 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, error: /* make sure we don't leave packets in our pseudo queue */ while (stt->pseudo_queue.len > 0) { - Packet *np = PacketDequeue(&stt->pseudo_queue); + Packet *np = PacketDequeueNoLock(&stt->pseudo_queue); if (np != NULL) { - PacketEnqueue(pq, np); + PacketEnqueueNoLock(pq, np); } } @@ -5177,7 +5180,7 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn return 0; } -TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueueNoLock *pq) { StreamTcpThread *stt = (StreamTcpThread *)data; @@ -5975,7 +5978,7 @@ Packet *StreamTcpPseudoSetup(Packet *parent, uint8_t *pkt, uint32_t len) */ static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Packet *parent, - TcpSession *ssn, PacketQueue *pq, int dir) + TcpSession *ssn, PacketQueueNoLock *pq, int dir) { SCEnter(); Flow *f = parent->flow; @@ -6146,7 +6149,7 @@ static void StreamTcpPseudoPacketCreateDetectLogFlush(ThreadVars *tv, memcpy(&np->ts, &parent->ts, sizeof(struct timeval)); SCLogDebug("np %p", np); - PacketEnqueue(pq, np); + PacketEnqueueNoLock(pq, np); StatsIncr(tv, stt->counter_tcp_pseudo); SCReturn; @@ -6162,7 +6165,8 @@ error: * Flag TCP engine that data needs to be inspected regardless * of how far we are wrt inspect limits. */ -void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq) +void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, + PacketQueueNoLock *pq) { TcpSession *ssn = f->protoctx; ssn->client.flags |= STREAMTCP_STREAM_FLAG_TRIGGER_RAW; @@ -6325,8 +6329,8 @@ static int StreamTcpTest02 (void) StreamTcpThread stt; uint8_t payload[4]; TCPHdr tcph; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(pq)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); @@ -6412,8 +6416,8 @@ static int StreamTcpTest03 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(pq)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -6488,8 +6492,8 @@ static int StreamTcpTest04 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(pq)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -6558,8 +6562,8 @@ static int StreamTcpTest05 (void) TCPHdr tcph; uint8_t payload[4]; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -6658,8 +6662,8 @@ static int StreamTcpTest06 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&ssn, 0, sizeof (TcpSession)); memset(&tv, 0, sizeof (ThreadVars)); @@ -6719,10 +6723,10 @@ static int StreamTcpTest07 (void) StreamTcpThread stt; TCPHdr tcph; uint8_t payload[1] = {0x42}; - PacketQueue pq; + PacketQueueNoLock pq; memset(p, 0, SIZE_OF_PACKET); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof(StreamTcpThread)); @@ -6783,8 +6787,8 @@ static int StreamTcpTest08 (void) uint8_t payload[1] = {0x42}; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof(StreamTcpThread)); @@ -6846,8 +6850,8 @@ static int StreamTcpTest09 (void) uint8_t payload[1] = {0x42}; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof(StreamTcpThread)); @@ -6916,8 +6920,8 @@ static int StreamTcpTest10 (void) TCPHdr tcph; uint8_t payload[4]; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -6995,8 +6999,8 @@ static int StreamTcpTest11 (void) TCPHdr tcph; uint8_t payload[4]; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7076,8 +7080,8 @@ static int StreamTcpTest12 (void) TCPHdr tcph; uint8_t payload[4]; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7172,8 +7176,8 @@ static int StreamTcpTest13 (void) TCPHdr tcph; uint8_t payload[4]; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7375,8 +7379,8 @@ static int StreamTcpTest14 (void) IPV4Hdr ipv4h; char os_policy_name[10] = "windows"; const char *ip_addr; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -7541,8 +7545,8 @@ static int StreamTcp4WHSTest01 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7623,8 +7627,8 @@ static int StreamTcp4WHSTest02 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7693,8 +7697,8 @@ static int StreamTcp4WHSTest03 (void) StreamTcpThread stt; TCPHdr tcph; memset(p, 0, SIZE_OF_PACKET); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -7777,8 +7781,8 @@ static int StreamTcpTest15 (void) IPV4Hdr ipv4h; char os_policy_name[10] = "windows"; const char *ip_addr; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -7944,8 +7948,8 @@ static int StreamTcpTest16 (void) IPV4Hdr ipv4h; char os_policy_name[10] = "windows"; const char *ip_addr; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -8114,8 +8118,8 @@ static int StreamTcpTest17 (void) IPV4Hdr ipv4h; char os_policy_name[10] = "windows"; const char *ip_addr; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -8529,12 +8533,12 @@ static int StreamTcpTest23(void) TCPHdr tcph; uint8_t packet[1460] = ""; ThreadVars tv; - PacketQueue pq; + PacketQueueNoLock pq; Packet *p = SCMalloc(SIZE_OF_PACKET); FAIL_IF(p == NULL); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset(&f, 0, sizeof (Flow)); memset(&tcph, 0, sizeof (TCPHdr)); @@ -8598,8 +8602,8 @@ static int StreamTcpTest24(void) uint8_t packet[1460] = ""; ThreadVars tv; memset(&tv, 0, sizeof (ThreadVars)); - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); StreamTcpUTInit(&stt.ra_ctx); StreamTcpUTSetupSession(&ssn); @@ -8668,8 +8672,8 @@ static int StreamTcpTest25(void) uint8_t payload[4]; TCPHdr tcph; int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -8761,8 +8765,8 @@ static int StreamTcpTest26(void) uint8_t payload[4]; TCPHdr tcph; int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -8855,8 +8859,8 @@ static int StreamTcpTest27(void) uint8_t payload[4]; TCPHdr tcph; int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9412,8 +9416,8 @@ static int StreamTcpTest32(void) TCPHdr tcph; TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL); int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9502,8 +9506,8 @@ static int StreamTcpTest33 (void) StreamTcpThread stt; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9603,8 +9607,8 @@ static int StreamTcpTest34 (void) StreamTcpThread stt; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9668,8 +9672,8 @@ static int StreamTcpTest35 (void) StreamTcpThread stt; TCPHdr tcph; TcpReassemblyThreadCtx ra_ctx; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx)); memset (&p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9733,8 +9737,8 @@ static int StreamTcpTest36(void) TCPHdr tcph; TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx(NULL); int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9826,8 +9830,8 @@ static int StreamTcpTest37(void) uint8_t payload[4]; TCPHdr tcph; int ret = 0; - PacketQueue pq; - memset(&pq,0,sizeof(PacketQueue)); + PacketQueueNoLock pq; + memset(&pq,0,sizeof(PacketQueueNoLock)); memset(p, 0, SIZE_OF_PACKET); memset (&f, 0, sizeof(Flow)); @@ -9942,13 +9946,13 @@ static int StreamTcpTest38 (void) StreamTcpThread stt; uint8_t payload[128]; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); memset(&tcph, 0, sizeof (TCPHdr)); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); Packet *p = SCMalloc(SIZE_OF_PACKET); if (unlikely(p == NULL)) @@ -10097,13 +10101,13 @@ static int StreamTcpTest39 (void) StreamTcpThread stt; uint8_t payload[4]; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); memset(&tcph, 0, sizeof (TCPHdr)); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); Packet *p = SCMalloc(SIZE_OF_PACKET); if (unlikely(p == NULL)) @@ -10226,7 +10230,7 @@ static int StreamTcpTest42 (void) ThreadVars tv; StreamTcpThread stt; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; Packet *p = SCMalloc(SIZE_OF_PACKET); TcpSession *ssn; @@ -10234,7 +10238,7 @@ static int StreamTcpTest42 (void) return 0; memset(p, 0, SIZE_OF_PACKET); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -10318,7 +10322,7 @@ static int StreamTcpTest43 (void) ThreadVars tv; StreamTcpThread stt; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; Packet *p = SCMalloc(SIZE_OF_PACKET); TcpSession *ssn; @@ -10326,7 +10330,7 @@ static int StreamTcpTest43 (void) return 0; memset(p, 0, SIZE_OF_PACKET); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -10410,7 +10414,7 @@ static int StreamTcpTest44 (void) ThreadVars tv; StreamTcpThread stt; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; Packet *p = SCMalloc(SIZE_OF_PACKET); TcpSession *ssn; @@ -10418,7 +10422,7 @@ static int StreamTcpTest44 (void) return 0; memset(p, 0, SIZE_OF_PACKET); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); @@ -10497,7 +10501,7 @@ static int StreamTcpTest45 (void) ThreadVars tv; StreamTcpThread stt; TCPHdr tcph; - PacketQueue pq; + PacketQueueNoLock pq; Packet *p = SCMalloc(SIZE_OF_PACKET); TcpSession *ssn; @@ -10505,7 +10509,7 @@ static int StreamTcpTest45 (void) return 0; memset(p, 0, SIZE_OF_PACKET); - memset(&pq,0,sizeof(PacketQueue)); + memset(&pq,0,sizeof(PacketQueueNoLock)); memset (&f, 0, sizeof(Flow)); memset(&tv, 0, sizeof (ThreadVars)); memset(&stt, 0, sizeof (StreamTcpThread)); diff --git a/src/stream-tcp.h b/src/stream-tcp.h index 4e5f55c8cc..c1566bf407 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -73,7 +73,7 @@ typedef struct StreamTcpThread_ { /** queue for pseudo packet(s) that were created in the stream * process and need further handling. Currently only used when * receiving (valid) RST packets */ - PacketQueue pseudo_queue; + PacketQueueNoLock pseudo_queue; uint16_t counter_tcp_sessions; /** sessions not picked up because memcap was reached */ @@ -138,7 +138,7 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p, uint64_t *progress_out, bool respect_inspect_depth); void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress); -void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq); +void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueueNoLock *pq); /** ------- Inline functions: ------ */ @@ -172,14 +172,14 @@ enum { STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION = 1, }; -TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *); int StreamNeedsReassembly(const TcpSession *ssn, uint8_t direction); TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **); TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data); void StreamTcpRegisterTests (void); int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt, - PacketQueue *pq); + PacketQueueNoLock *pq); /* clear ssn and return to pool */ void StreamTcpSessionClear(void *ssnptr); /* cleanup ssn, but don't free ssn */