]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: move flow handling into worker threads
authorVictor Julien <victor@inliniac.net>
Fri, 15 Apr 2016 15:08:50 +0000 (17:08 +0200)
committerVictor Julien <victor@inliniac.net>
Fri, 20 May 2016 06:56:18 +0000 (08:56 +0200)
Instead of handling the packet update during flow lookup, handle
it in the stream/detect threads. This lowers the load of the
capture thread(s) in autofp mode.

The decoders now set a flag in the packet if the packet needs a
flow lookup. Then the workers will take care of this. The decoders
also already calculate the raw flow hash value. This is so that
this value can be used in flow balancing in autofp.

Because the flow lookup/creation is now done in the worker threads,
the flow balancing can no longer use the flow. It's not yet
available. Autofp load balancing uses raw hash values instead.

In the same line, move UDP AppLayer out of the DecodeUDP module,
and also into the stream/detect threads.

Handle TCP session reuse inside the flow engine itself. If a looked up
flow matches the packet, but is a TCP stream starter, check if the
ssn needs to be reused. If that is the case handle it within the
lookup function. Simplies the locking and removes potential race
conditions.

18 files changed:
src/app-layer.c
src/decode-icmpv4.c
src/decode-icmpv6.c
src/decode-ipv4.c
src/decode-sctp.c
src/decode-tcp.c
src/decode-udp.c
src/decode.h
src/flow-hash.c
src/flow.c
src/flow.h
src/runmode-erf-file.c
src/runmode-pcap-file.c
src/stream-tcp.c
src/stream-tcp.h
src/tmqh-flow.c
src/util-runmodes.c
src/util-unittest-helper.c

index b0955769c99594cccb4ae39e75e426cbe69c4a00..db4fa3abed95714206757eb97c231b1f835e4d82 100644 (file)
@@ -461,7 +461,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
  *  If the protocol is yet unknown, the proto detection code is run first.
  *
  *  \param dp_ctx Thread app layer detect context
- *  \param f unlocked flow
+ *  \param f *locked* flow
  *  \param p UDP packet
  *
  *  \retval 0 ok
@@ -473,8 +473,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
 
     int r = 0;
 
-    FLOWLOCK_WRLOCK(f);
-
     uint8_t flags = 0;
     if (p->flowflags & FLOW_PKT_TOSERVER) {
         flags |= STREAM_TOSERVER;
@@ -527,7 +525,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
         }
     }
 
-    FLOWLOCK_UNLOCK(f);
     PACKET_PROFILING_APP_STORE(tctx, p);
 
     SCReturnInt(r);
index 3e08daa1975336d445ece06ae6f7d19610e0df18..1cfaf780780a53e42d07552ae5fdf61967a2af3e 100644 (file)
@@ -194,7 +194,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt,
                     {
                         /* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */
                         if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
-                            FlowHandlePacket(tv, dtv, p);
+                            FlowSetupPacket(p);
                         }
                     }
                 }
index 238a6cf6faeb2b7ada03cf60a95142eb3783f3a5..26a48b7439afc5ad2eea503369ba74d73c1ff551 100644 (file)
@@ -362,8 +362,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p,
         SCLogDebug("Unknown Type, ICMPV6_UNKNOWN_TYPE");
 #endif
 
-    /* Flow is an integral part of us */
-    FlowHandlePacket(tv, dtv, p);
+    FlowSetupPacket(p);
 
     return TM_ECODE_OK;
 }
index a41c35bfd7b06e579283599cbcf2094769d31524..c21d224600457e954a203432803ea53f0861fc07 100644 (file)
@@ -1518,7 +1518,6 @@ int DecodeIPV4DefragTest03(void)
         0x80, 0x00, 0xb1, 0xa3, 0x00, 0x00
     };
 
-    Flow *f = NULL;
     Packet *p = PacketGetFromAlloc();
     if (unlikely(p == NULL))
         return 0;
@@ -1542,12 +1541,11 @@ int DecodeIPV4DefragTest03(void)
         result = 0;
         goto end;
     }
-    if (p->flow == NULL) {
+    if (!(p->flags & PKT_WANTS_FLOW)) {
         printf("packet flow shouldn't be NULL\n");
         result = 0;
         goto end;
     }
-    f = p->flow;
     PACKET_RECYCLE(p);
 
     PacketCopyData(p, pkt1, sizeof(pkt1));
@@ -1585,11 +1583,11 @@ int DecodeIPV4DefragTest03(void)
         result = 0;
         goto end;
     }
-    if (tp->flow == NULL) {
+    if (!(tp->flags & PKT_WANTS_FLOW)) {
         result = 0;
         goto end;
     }
-    if (tp->flow != f) {
+    if (tp->flow_hash != p->flow_hash) {
         result = 0;
         goto end;
     }
index 2d493c6690fe3760b61bde9f9b2c33ef9f9da49e..f787933ebd9d0e512273100adb53020a44f5c68f 100644 (file)
@@ -73,8 +73,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u
         SCTP_GET_SRC_PORT(p), SCTP_GET_DST_PORT(p));
 #endif
 
-    /* Flow is an integral part of us */
-    FlowHandlePacket(tv, dtv, p);
+    FlowSetupPacket(p);
 
     return TM_ECODE_OK;
 }
index b3c73b5d8910e4b3cc8184e73519e281866e5b75..51dbfaaa26ac74edd7788ea48d3e249aea283ce6 100644 (file)
@@ -214,8 +214,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
         TCP_HAS_MSS(p) ? "MSS " : "");
 #endif
 
-    /* Flow is an integral part of us */
-    FlowHandlePacket(tv, dtv, p);
+    FlowSetupPacket(p);
 
     return TM_ECODE_OK;
 }
index 24f9207b8cbfa442296cbc3101ae2661a73b6dc6..d45d57f2548eda48fe2bb6c4115c1d55b0bc5cf7 100644 (file)
@@ -85,17 +85,11 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
     if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) {
         /* Here we have a Teredo packet and don't need to handle app
          * layer */
-        FlowHandlePacket(tv, dtv, p);
+        FlowSetupPacket(p);
         return TM_ECODE_OK;
     }
 
-    /* Flow is an integral part of us */
-    FlowHandlePacket(tv, dtv, p);
-
-    /* handle the app layer part of the UDP packet payload */
-    if (unlikely(p->flow != NULL)) {
-        AppLayerHandleUdp(tv, dtv->app_tctx, p, p->flow);
-    }
+    FlowSetupPacket(p);
 
     return TM_ECODE_OK;
 }
index 4c83d08300c8cd7c4a0c46457e6bcf102baeeb0c..a8641a23a0574964bf54b640847f1eae3ead8212 100644 (file)
@@ -399,6 +399,10 @@ typedef struct Packet_
 
     struct Flow_ *flow;
 
+    /* raw hash value for looking up the flow, will need to modulated to the
+     * hash size still */
+    uint32_t flow_hash;
+
     struct timeval ts;
 
     union {
@@ -1049,6 +1053,10 @@ int DecoderParseDataFromFile(char *filename, DecoderFunc Decoder);
 #define PKT_IS_INVALID                  (1<<20)
 #define PKT_PROFILE                     (1<<21)
 
+/** indication by decoder that it feels the packet should be handled by
+ *  flow engine: Packet::flow_hash will be set */
+#define PKT_WANTS_FLOW                  (1<<22)
+
 /** \brief return 1 if the packet is a pseudo packet */
 #define PKT_IS_PSEUDOPKT(p) ((p)->flags & PKT_PSEUDO_STREAM_END)
 
index 20f1d36c611db5b48070091f03f1c57e25b58b66..c7418aa2949138a4fe367079349fb0402a69d9b6 100644 (file)
@@ -53,12 +53,6 @@ SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
 
 static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
-static int handle_tcp_reuse = 1;
-
-void FlowDisableTcpReuseHandling(void)
-{
-    handle_tcp_reuse = 0;
-}
 
 /** \brief compare two raw ipv6 addrs
  *
@@ -297,6 +291,12 @@ static inline int FlowCompareICMPv4(Flow *f, const Packet *p)
     return 0;
 }
 
+void FlowSetupPacket(Packet *p)
+{
+    p->flags |= PKT_WANTS_FLOW;
+    p->flow_hash = FlowGetHash(p);
+}
+
 int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn);
 
 static inline int FlowCompare(Flow *f, const Packet *p)
@@ -312,17 +312,6 @@ static inline int FlowCompare(Flow *f, const Packet *p)
         if (f->flags & FLOW_TCP_REUSED)
             return 0;
 
-        if (handle_tcp_reuse == 1) {
-            /* lets see if we need to consider the existing session reuse */
-            if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
-                /* okay, we need to setup a new flow for this packet.
-                 * Flag the flow that it's been replaced by a new one */
-                f->flags |= FLOW_TCP_REUSED;
-                SCLogDebug("flow obsolete: TCP reuse will use a new flow "
-                        "starting with packet %"PRIu64, p->pcap_cnt);
-                return 0;
-            }
-        }
         return 1;
     } else {
         return CMP_FLOW(f, p);
@@ -418,120 +407,41 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
     return f;
 }
 
-Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest)
+static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
+                             FlowBucket *fb, Flow *old_f,
+                             const uint32_t hash, const Packet *p)
 {
-    Flow *f = NULL;
-
-    /* get the key to our bucket */
-    uint32_t hash = FlowGetHash(p);
-    /* get our hash bucket and lock it */
-    FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
-    FBLOCK_LOCK(fb);
+    /* tag flow as reused so future lookups won't find it */
+    old_f->flags |= FLOW_TCP_REUSED;
+    /* get some settings that we move over to the new flow */
+    FlowThreadId thread_id = old_f->thread_id;
+    int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
 
-    SCLogDebug("fb %p fb->head %p", fb, fb->head);
-
-    f = FlowGetNew(NULL, NULL, p);
-    if (f != NULL) {
-        /* flow is locked */
-        if (fb->head == NULL) {
-            fb->head = f;
-            fb->tail = f;
-        } else {
-            f->hprev = fb->tail;
-            fb->tail->hnext = f;
-            fb->tail = f;
-        }
-
-        /* got one, now lock, initialize and return */
-        FlowInit(f, p);
-        f->flow_hash = hash;
-        f->fb = fb;
-        /* update the last seen timestamp of this flow */
-        COPY_TIMESTAMP(&p->ts,&f->lastts);
-        FlowReference(dest, f);
+    /* since fb lock is still held this flow won't be found until we are done */
+    FLOWLOCK_UNLOCK(old_f);
 
-    }
-    FBLOCK_UNLOCK(fb);
-    return f;
-}
-
-/** \brief Lookup flow based on packet
- *
- *  Find the flow belonging to this packet. If not found, no new flow
- *  is set up.
- *
- *  \param p packet to lookup the flow for
- *
- *  \retval f flow or NULL if not found
- */
-Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest)
-{
-    Flow *f = NULL;
-
-    /* get the key to our bucket */
-    uint32_t hash = FlowGetHash(p);
-    /* get our hash bucket and lock it */
-    FlowBucket *fb = &flow_hash[hash  % flow_config.hash_size];
-    FBLOCK_LOCK(fb);
-
-    SCLogDebug("fb %p fb->head %p", fb, fb->head);
-
-    /* see if the bucket already has a flow */
-    if (fb->head == NULL) {
-        FBLOCK_UNLOCK(fb);
+    /* Get a new flow. It will be either a locked flow or NULL */
+    Flow *f = FlowGetNew(tv, dtv, p);
+    if (f == NULL) {
         return NULL;
     }
 
-    /* ok, we have a flow in the bucket. Let's find out if it is our flow */
-    f = fb->head;
-
-    /* see if this is the flow we are looking for */
-    if (FlowCompare(f, p) == 0) {
-        while (f) {
-            f = f->hnext;
-
-            if (f == NULL) {
-                FBLOCK_UNLOCK(fb);
-                return NULL;
-            }
+    /* flow is locked */
 
-            if (FlowCompare(f, p) != 0) {
-                /* we found our flow, lets put it on top of the
-                 * hash list -- this rewards active flows */
-                if (f->hnext) {
-                    f->hnext->hprev = f->hprev;
-                }
-                if (f->hprev) {
-                    f->hprev->hnext = f->hnext;
-                }
-                if (f == fb->tail) {
-                    fb->tail = f->hprev;
-                }
+    /* put at the start of the list */
+    f->hnext = fb->head;
+    fb->head->hprev = f;
+    fb->head = f;
 
-                f->hnext = fb->head;
-                f->hprev = NULL;
-                fb->head->hprev = f;
-                fb->head = f;
+    /* initialize and return */
+    FlowInit(f, p);
+    f->flow_hash = hash;
+    f->fb = fb;
 
-                /* found our flow, lock & return */
-                FLOWLOCK_WRLOCK(f);
-                /* update the last seen timestamp of this flow */
-                COPY_TIMESTAMP(&p->ts,&f->lastts);
-                FlowReference(dest, f);
-
-                FBLOCK_UNLOCK(fb);
-                return f;
-            }
-        }
+    f->thread_id = thread_id;
+    if (autofp_tmqh_flow_qid != -1) {
+        SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
     }
-
-    /* lock & return */
-    FLOWLOCK_WRLOCK(f);
-    /* update the last seen timestamp of this flow */
-    COPY_TIMESTAMP(&p->ts,&f->lastts);
-    FlowReference(dest, f);
-
-    FBLOCK_UNLOCK(fb);
     return f;
 }
 
@@ -556,10 +466,9 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
 {
     Flow *f = NULL;
 
-    /* get the key to our bucket */
-    uint32_t hash = FlowGetHash(p);
     /* get our hash bucket and lock it */
-    FlowBucket *fb = &flow_hash[hash  % flow_config.hash_size];
+    const uint32_t hash = p->flow_hash;
+    FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
     FBLOCK_LOCK(fb);
 
     SCLogDebug("fb %p fb->head %p", fb, fb->head);
@@ -645,6 +554,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
 
                 /* found our flow, lock & return */
                 FLOWLOCK_WRLOCK(f);
+                if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
+                    f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
+                    if (f == NULL) {
+                        FBLOCK_UNLOCK(fb);
+                        return NULL;
+                    }
+                }
+
                 /* update the last seen timestamp of this flow */
                 COPY_TIMESTAMP(&p->ts,&f->lastts);
                 FlowReference(dest, f);
@@ -657,6 +574,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
 
     /* lock & return */
     FLOWLOCK_WRLOCK(f);
+    if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
+        f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
+        if (f == NULL) {
+            FBLOCK_UNLOCK(fb);
+            return NULL;
+        }
+    }
+
     /* update the last seen timestamp of this flow */
     COPY_TIMESTAMP(&p->ts,&f->lastts);
     FlowReference(dest, f);
index 9c3973e65495f7d4ff12ac9943856f1a32c55d86..9c6b1653d6e30419f0017b638eea456dc5725d6e 100644 (file)
@@ -226,37 +226,6 @@ static inline int FlowUpdateSeenFlag(const Packet *p)
     return 1;
 }
 
-/**
- *
- *  Remove packet from flow. This assumes this happens *before* the packet
- *  is added to the stream engine and other higher state.
- *
- *  \todo we can't restore the lastts
- */
-void FlowHandlePacketUpdateRemove(Flow *f, Packet *p)
-{
-    if (p->flowflags & FLOW_PKT_TOSERVER) {
-        f->todstpktcnt--;
-        f->todstbytecnt -= GET_PKT_LEN(p);
-        p->flowflags &= ~(FLOW_PKT_TOSERVER|FLOW_PKT_TOSERVER_FIRST);
-    } else {
-        f->tosrcpktcnt--;
-        f->tosrcbytecnt -= GET_PKT_LEN(p);
-        p->flowflags &= ~(FLOW_PKT_TOCLIENT|FLOW_PKT_TOCLIENT_FIRST);
-    }
-    p->flowflags &= ~FLOW_PKT_ESTABLISHED;
-
-    /*set the detection bypass flags*/
-    if (f->flags & FLOW_NOPACKET_INSPECTION) {
-        SCLogDebug("unsetting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
-        DecodeUnsetNoPacketInspectionFlag(p);
-    }
-    if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {
-        SCLogDebug("unsetting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);
-        DecodeUnsetNoPayloadInspectionFlag(p);
-    }
-}
-
 /** \brief Update Packet and Flow
  *
  *  Updates packet and flow based on the new packet.
@@ -330,10 +299,6 @@ void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
     if (f == NULL)
         return;
 
-    FlowHandlePacketUpdate(f, p);
-
-    FLOWLOCK_UNLOCK(f);
-
     /* set the flow in the packet */
     p->flags |= PKT_HAS_FLOW;
     return;
index 2085f32242ed66927ff3f9d2d707c9b722709e30..f0fcc226ab98272d86652428c611956ee7a6756a 100644 (file)
@@ -432,6 +432,11 @@ typedef struct FlowProto_ {
     void (*Freefunc)(void *);
 } FlowProto;
 
+/** \brief prepare packet for a life with flow
+ *  Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup
+ *  and calc the hash value to be used in the lookup and autofp flow
+ *  balancing. */
+void FlowSetupPacket(Packet *p);
 void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
 void FlowInitConfig (char);
 void FlowPrintQueueInfo (void);
@@ -577,11 +582,7 @@ AppProto FlowGetAppProtocol(const Flow *f);
 void *FlowGetAppState(const Flow *f);
 uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags);
 
-void FlowHandlePacketUpdateRemove(Flow *f, Packet *p);
 void FlowHandlePacketUpdate(Flow *f, Packet *p);
 
-Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest);
-Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest);
-
 #endif /* __FLOW_H__ */
 
index f449a953a295db38f60b50bcc104c05cd6cff25d..e2ffde4b4f481f6ce2661694992fc53e7cd81d9f 100644 (file)
@@ -132,7 +132,6 @@ int RunModeErfFileAutoFp(void)
     int thread;
 
     RunModeInitialize();
-    RunmodeSetFlowStreamAsync();
 
     char *file = NULL;
     if (ConfGet("erf-file.file", &file) == 0) {
index 29a65eeebf54a359735315e34e4f4561f69f69af..cf99fac6fc612e1274eb4f3ad74548d4e78ef02c 100644 (file)
@@ -165,7 +165,6 @@ int RunModeFilePcapAutoFp(void)
     int thread;
 
     RunModeInitialize();
-    RunmodeSetFlowStreamAsync();
 
     char *file = NULL;
     if (ConfGet("pcap-file.file", &file) == 0) {
index 22cb446c8f4562b792b08085906e7431ebbf355c..eec197f9fd52785f63d9e8cb73267089a73fc6eb 100644 (file)
@@ -4863,145 +4863,14 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn
     return 0;
 }
 
-/** \brief Handle TCP reuse of tuple
- *
- *  Logic:
- *  1. see if packet could trigger a new session
- *  2. see if the flow/ssn is in a state where we want to support the reuse
- *  3. disconnect packet from the old flow
- *  -> at this point new packets can still find the old flow
- *  -> as the flow's reference count != 0, it can't disappear
- *  4. setup a new flow unconditionally
- *  5. attach packet to new flow
- *  6. tag old flow as FLOW_TCP_REUSED
- *  -> NEW packets won't find it
- *  -> existing packets in our queues may still reference it
- *  7. dereference the old flow (reference cnt *may* now be 0,
- *     if no other packets reference it)
- *
- *  The packets that still hold a reference to the old flow are updated
- *  by HandleFlowReuseApplyToPacket()
- */
-static void TcpSessionReuseHandle(Packet *p) {
-    if (likely(TcpSessionPacketIsStreamStarter(p) == 0))
-        return;
-
-    int reuse = 0;
-    FLOWLOCK_RDLOCK(p->flow);
-    reuse = TcpSessionReuseDoneEnough(p, p->flow, p->flow->protoctx);
-    if (!reuse) {
-        SCLogDebug("steam starter packet %"PRIu64", but state not "
-                   "ready to be reused", p->pcap_cnt);
-        FLOWLOCK_UNLOCK(p->flow);
-        return;
-    }
-
-    SCLogDebug("steam starter packet %"PRIu64", and state "
-            "ready to be reused", p->pcap_cnt);
-
-    /* ok, this packet needs a new flow */
-
-    /* first, get a reference to the old flow */
-    Flow *old_f = NULL;
-    FlowReference(&old_f, p->flow);
-
-    /* get some settings that we move over to the new flow */
-    FlowThreadId thread_id = old_f->thread_id;
-    int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
-
-    /* disconnect the packet from the old flow */
-    FlowHandlePacketUpdateRemove(p->flow, p);
-    FLOWLOCK_UNLOCK(p->flow);
-    FlowDeReference(&p->flow); // < can't disappear while usecnt >0
-
-    /* Can't tag flow as reused yet, would be a race condition:
-     * new packets will not get old flow because of FLOW_TCP_REUSED,
-     * so new flow may be created. This new flow could be handled in
-     * a different thread. */
-
-    /* Get a flow. It will be either a locked flow or NULL */
-    Flow *new_f = FlowGetFlowFromHashByPacket(p, &p->flow);
-    if (new_f == NULL) {
-        FlowDeReference(&old_f); // < can't disappear while usecnt >0
-        return;
-    }
-
-    /* update flow and packet */
-    FlowHandlePacketUpdate(new_f, p);
-    BUG_ON(new_f != p->flow);
-
-    /* copy flow balancing settings */
-    new_f->thread_id = thread_id;
-    SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
-
-    FLOWLOCK_UNLOCK(new_f);
-
-    /* tag original flow that it's now unused */
-    FLOWLOCK_WRLOCK(old_f);
-    SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt);
-    old_f->flags |= FLOW_TCP_REUSED;
-    FLOWLOCK_UNLOCK(old_f);
-    FlowDeReference(&old_f); // < can't disappear while usecnt >0
-
-    SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt);
-}
-
-/** \brief Handle packets that reference the wrong flow because of TCP reuse
- *
- *  In the case of TCP reuse we can have many packets that were assigned
- *  a flow by the capture/decode threads before the stream engine decided
- *  that a new flow was needed for these packets.
- *  When HandleFlowReuse creates a new flow, the packets already processed
- *  by the flow engine will still reference the old flow.
- *
- *  This function detects this case and replaces the flow for those packets.
- *  It's a fairly expensive operation, but it should be rare as it's only
- *  done for packets that were already in the engine when the TCP reuse
- *  case was handled. New packets are assigned the correct flow by the
- *  flow engine.
- */
-static void TcpSessionReuseHandleApplyToPacket(Packet *p)
+void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
 {
-    int need_flow_replace = 0;
-
-    FLOWLOCK_WRLOCK(p->flow);
-    if (p->flow->flags & FLOW_TCP_REUSED) {
-        SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt);
-        need_flow_replace = 1;
-    }
-
-    if (likely(need_flow_replace == 0)) {
-        /* Work around a race condition: if HandleFlowReuse has inserted a new flow,
-         * it will not have seen both sides of the session yet. The packet we have here
-         * may be the first that got the flow directly from the hash right after the
-         * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */
-        if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) {
-            p->flowflags |= FLOW_PKT_ESTABLISHED;
-            SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
-        } else {
-            SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
-        }
-        SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow);
-        FLOWLOCK_UNLOCK(p->flow);
-        return;
-    }
-
-    /* disconnect packet from old flow */
-    FlowHandlePacketUpdateRemove(p->flow, p);
-    FLOWLOCK_UNLOCK(p->flow);
-    FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+    FlowHandlePacketUpdate(p->flow, p);
 
-    /* find the new flow that does belong to this packet */
-    Flow *new_f = FlowLookupFlowFromHash(p, &p->flow);
-    if (new_f == NULL) {
-        // TODO reset packet flag wrt flow: direction, HAS_FLOW etc
-        p->flags &= ~PKT_HAS_FLOW;
-        return;
+    /* handle the app layer part of the UDP packet payload */
+    if (p->proto == IPPROTO_UDP) {
+        AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
     }
-    FlowHandlePacketUpdate(new_f, p);
-    BUG_ON(new_f != p->flow);
-    FLOWLOCK_UNLOCK(new_f);
-    SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow);
 }
 
 TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
@@ -5009,49 +4878,59 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
     StreamTcpThread *stt = (StreamTcpThread *)data;
     TmEcode ret = TM_ECODE_OK;
 
-    if (!(PKT_IS_TCP(p)))
+    SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);
+
+    if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) {
+        FLOWLOCK_WRLOCK(p->flow);
+        AppLayerProfilingReset(stt->ra_ctx->app_tctx);
+        (void)StreamTcpPacket(tv, p, stt, pq);
+        p->flags |= PKT_IGNORE_CHECKSUM;
+        stt->pkts++;
+        FLOWLOCK_UNLOCK(p->flow);
         return TM_ECODE_OK;
+    }
+
+    if (!(p->flags & PKT_WANTS_FLOW)) {
+        return TM_ECODE_OK;
+    }
+
+    FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
+    if (likely(p->flow != NULL)) {
+        FlowUpdate(tv, stt, p);
+    }
+
+    if (!(PKT_IS_TCP(p))) {
+        goto unlock;
+    }
 
     if (p->flow == NULL) {
         StatsIncr(tv, stt->counter_tcp_no_flow);
-        return TM_ECODE_OK;
+        goto unlock;
     }
 
+    /* only TCP packets with a flow from here */
+
     if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
         if (StreamTcpValidateChecksum(p) == 0) {
             StatsIncr(tv, stt->counter_tcp_invalid_checksum);
-            return TM_ECODE_OK;
+            goto unlock;
         }
     } else {
         p->flags |= PKT_IGNORE_CHECKSUM;
     }
-
-    if (stt->runmode_flow_stream_async) {
-        /* "autofp" handling of TCP session/flow reuse */
-        if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
-            /* apply previous reuses to this packet */
-            TcpSessionReuseHandleApplyToPacket(p);
-            if (p->flow == NULL)
-                return ret;
-
-            if (!(p->flowflags & FLOW_PKT_TOSERVER_FIRST)) {
-                /* after that, check for 'new' reuse */
-                TcpSessionReuseHandle(p);
-                if (p->flow == NULL)
-                    return ret;
-            }
-        }
-    }
     AppLayerProfilingReset(stt->ra_ctx->app_tctx);
 
-    FLOWLOCK_WRLOCK(p->flow);
     ret = StreamTcpPacket(tv, p, stt, pq);
-    FLOWLOCK_UNLOCK(p->flow);
 
     //if (ret)
       //  return TM_ECODE_FAILED;
 
     stt->pkts++;
+
+ unlock:
+    if (p->flow) {
+        FLOWLOCK_UNLOCK(p->flow);
+    }
     return ret;
 }
 
@@ -5114,11 +4993,6 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
     if (stt->ssn_pool_id < 0 || ssn_pool == NULL)
         SCReturnInt(TM_ECODE_FAILED);
 
-    /* see if need to enable the TCP reuse handling in the stream engine */
-    stt->runmode_flow_stream_async = RunmodeGetFlowStreamAsync();
-    SCLogDebug("Flow and Stream engine run %s",
-            stt->runmode_flow_stream_async ? "asynchronous" : "synchronous");
-
     SCReturnInt(TM_ECODE_OK);
 }
 
index 416cd5aa808f7c5ef7efd959bede4d10e9ca257f..e23d9eab71e7df36f94ba386b7a408e71ff0b48a 100644 (file)
@@ -76,10 +76,6 @@ typedef struct TcpStreamCnf_ {
 typedef struct StreamTcpThread_ {
     int ssn_pool_id;
 
-    /** if set to true, we activate the TCP tuple reuse code in the
-     *  stream engine. */
-    int runmode_flow_stream_async;
-
     uint64_t pkts;
 
     /** queue for pseudo packet(s) that were created in the stream
index 5a6b40c9f58f512fe466feb7623a647b33c111b8..881c5985cae8232b913212630ed7d5745ed6a577 100644 (file)
@@ -61,7 +61,9 @@ void TmqhFlowRegister(void)
         if (strcasecmp(scheduler, "round-robin") == 0) {
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
         } else if (strcasecmp(scheduler, "active-packets") == 0) {
-            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+            //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+            SCLogNotice("FIXME: using flow hash instead of active packets");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "hash") == 0) {
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "ippair") == 0) {
@@ -73,7 +75,8 @@ void TmqhFlowRegister(void)
             exit(EXIT_FAILURE);
         }
     } else {
-        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+        //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
     }
 
     return;
@@ -330,7 +333,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
  * \param tv thread vars.
  * \param p packet.
  */
-void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
+void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
 {
     int16_t qid = 0;
 
@@ -371,6 +374,31 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
     return;
 }
 
+void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
+{
+    int16_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    if (p->flags & PKT_WANTS_FLOW) {
+        uint32_t hash = p->flow_hash;
+        qid = hash % ctx->size;
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+
+    return;
+}
 /**
  * \brief select the queue to output based on IP address pair.
  *
index 13724e98fe010b4e14a0640c6b8e44c46a8c133c..8242c81724615d2c3d215f8ce5e1fabc4079b314 100644 (file)
 
 #include "flow-hash.h"
 
-/** set to true if flow engine and stream engine run in different
- *  threads. */
-static int runmode_flow_stream_async = 0;
-
-void RunmodeSetFlowStreamAsync(void)
-{
-    runmode_flow_stream_async = 1;
-    FlowDisableTcpReuseHandling();
-}
-
-int RunmodeGetFlowStreamAsync(void)
-{
-    return runmode_flow_stream_async;
-}
-
 /** \brief create a queue string for autofp to pass to
  *         the flow queue handler.
  *
@@ -121,8 +106,6 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser,
     if (thread_max < 1)
         thread_max = 1;
 
-    RunmodeSetFlowStreamAsync();
-
     queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
     if (queues == NULL) {
         SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");
@@ -497,8 +480,6 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser,
     if (thread_max < 1)
         thread_max = 1;
 
-    RunmodeSetFlowStreamAsync();
-
     queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
     if (queues == NULL) {
         SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");
index dd2c20dc9c08b59ca61e5300147b907f84632612..e58644a5308c56b46e3e32554cf4169eafc9e074 100644 (file)
@@ -854,8 +854,10 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir)
             p->dst.addr_data32[0] = i;
         }
         FlowHandlePacket(NULL, NULL, p);
-        if (p->flow != NULL)
+        if (p->flow != NULL) {
             SC_ATOMIC_RESET(p->flow->use_cnt);
+            FLOWLOCK_UNLOCK(p->flow);
+        }
 
         /* Now the queues shoul be updated */
         UTHFreePacket(p);