]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
tcp reuse: handle reuse in stream engine
authorVictor Julien <victor@inliniac.net>
Thu, 22 Jan 2015 16:30:10 +0000 (17:30 +0100)
committerVictor Julien <victor@inliniac.net>
Wed, 18 Feb 2015 08:18:43 +0000 (09:18 +0100)
For the autofp case, handling TCP reuse in the flow engine didn't work.

The problem is the mismatch between the moment the flow engine looks at
packets and the stream, and the moment the stream engine runs. Flow engine
is invoked in the packet capture thread(s), while the stream engine runs
as part of the stream/detect thread(s). Because of the queues between
those threads the flow manager may already inspect a new SYN while the
stream engine still has to process the previous session.

Moving the flow engine to the stream/detect thread(s) wasn't an option
as the 'autofp' load balancing depends on the flow already being
available in the packet.

The solution here is to add a check for this condition to the stream
engine. At this point the TCP state is up to date. If a TCP reuse case
is encountered, this is the global logic:

- detach packet for old flow
- get a new flow and attach it to the packet
- flag the old flow that it is now obsolete

Additional logic makes sure that the packets already in the queue
between the flow thread(s) and the stream thread are reassigned the
new flow.

Some special handling:

Apply previous 'reuse' before checking for a new reuse. Otherwise we're
tagging the wrong flow in some cases (multiple reuses in the same tuple).

When in a flow/ssn reuse condition, properly remove the packet from
the flow.

Don't 'reuse' if packet is a SYN retransmission.

The old flow is timed out normally by the flow manager.

src/flow-hash.c
src/flow.h
src/stream-tcp.c

index 1fea10a67d3963387c6435ffc6ece60f2c4d41ef..47e7711a231004463d7af37a096a94932a12c5c8 100644 (file)
@@ -507,6 +507,121 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
     return f;
 }
 
+Flow *FlowGetFlowFromHashByPacket(const Packet *p)
+{
+    Flow *f = NULL;
+
+    /* get the key to our bucket */
+    uint32_t key = FlowGetKey(p);
+    /* get our hash bucket and lock it */
+    FlowBucket *fb = &flow_hash[key];
+    FBLOCK_LOCK(fb);
+
+    SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+    f = FlowGetNew(NULL, NULL, p);
+    if (f != NULL) {
+        /* flow is locked */
+        if (fb->head == NULL) {
+            fb->head = f;
+            fb->tail = f;
+        } else {
+            f->hprev = fb->tail;
+            fb->tail->hnext = f;
+            fb->tail = f;
+        }
+
+        /* got one, now lock, initialize and return */
+        FlowInit(f, p);
+        f->fb = fb;
+        /* update the last seen timestamp of this flow */
+        COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+    }
+    FBLOCK_UNLOCK(fb);
+    return f;
+}
+
+/** \brief Lookup flow based on packet
+ *
+ *  Find the flow belonging to this packet. If not found, no new flow
+ *  is set up.
+ *
+ *  \param p packet to lookup the flow for
+ *
+ *  \retval f flow or NULL if not found
+ */
+Flow *FlowLookupFlowFromHash(const Packet *p)
+{
+    Flow *f = NULL;
+
+    /* get the key to our bucket */
+    uint32_t key = FlowGetKey(p);
+    /* get our hash bucket and lock it */
+    FlowBucket *fb = &flow_hash[key];
+    FBLOCK_LOCK(fb);
+
+    SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+    /* see if the bucket already has a flow */
+    if (fb->head == NULL) {
+        FBLOCK_UNLOCK(fb);
+        return NULL;
+    }
+
+    /* ok, we have a flow in the bucket. Let's find out if it is our flow */
+    f = fb->head;
+
+    /* see if this is the flow we are looking for */
+    if (FlowCompare(f, p) == 0) {
+        while (f) {
+            FlowHashCountIncr;
+
+            f = f->hnext;
+
+            if (f == NULL) {
+                FBLOCK_UNLOCK(fb);
+                return NULL;
+            }
+
+            if (FlowCompare(f, p) != 0) {
+                /* we found our flow, lets put it on top of the
+                 * hash list -- this rewards active flows */
+                if (f->hnext) {
+                    f->hnext->hprev = f->hprev;
+                }
+                if (f->hprev) {
+                    f->hprev->hnext = f->hnext;
+                }
+                if (f == fb->tail) {
+                    fb->tail = f->hprev;
+                }
+
+                f->hnext = fb->head;
+                f->hprev = NULL;
+                fb->head->hprev = f;
+                fb->head = f;
+
+                /* found our flow, lock & return */
+                FLOWLOCK_WRLOCK(f);
+                /* update the last seen timestamp of this flow */
+                COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+                FBLOCK_UNLOCK(fb);
+                return f;
+            }
+        }
+    }
+
+    /* lock & return */
+    FLOWLOCK_WRLOCK(f);
+    /* update the last seen timestamp of this flow */
+    COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+    FBLOCK_UNLOCK(fb);
+    return f;
+}
+
 /** \brief Get Flow for packet
  *
  * Hash retrieval function for flows. Looks up the hash bucket containing the
index 2ef95d724f1dfcf26a79a5b97e367a624d6035dd..66df3561f7e3070e9a9f35c6f5aeb86f645d5028 100644 (file)
@@ -575,9 +575,11 @@ int FlowClearMemory(Flow *,uint8_t );
 AppProto FlowGetAppProtocol(Flow *f);
 void *FlowGetAppState(Flow *f);
 
-
 void FlowHandlePacketUpdateRemove(Flow *f, Packet *p);
 void FlowHandlePacketUpdate(Flow *f, Packet *p);
 
+Flow *FlowGetFlowFromHashByPacket(const Packet *p);
+Flow *FlowLookupFlowFromHash(const Packet *p);
+
 #endif /* __FLOW_H__ */
 
index 6f339ae13ef1b4336badcdc3fb8382801ac3fba0..a24c86543fb8b18d58af024e84ea692b6870bc3c 100644 (file)
@@ -4687,6 +4687,211 @@ static inline int StreamTcpValidateChecksum(Packet *p)
     return ret;
 }
 
+/** \internal
+ *  \brief check if a packet is a valid stream started
+ *  \retval bool true/false */
+static int TcpSessionPacketIsStreamStarter(const Packet *p)
+{
+    uint8_t flag = TH_SYN;
+
+    //SCLogInfo("Want: %02x, have %02x", flag, p->tcph->th_flags);
+
+    if (p->tcph->th_flags == flag) {
+        SCLogDebug("packet %"PRIu64" is a stream starter: %02x", p->pcap_cnt, p->tcph->th_flags);
+        return 1;
+    }
+    return 0;
+}
+
+/** \internal
+ *  \brief Check if Flow and TCP SSN allow this flow/tuple to be reused
+ *  \retval bool true yes reuse, false no keep tracking old ssn */
+int TcpSessionReuseDoneEnough(Packet *p, const TcpSession *ssn)
+{
+    if (FlowGetPacketDirection(p->flow, p) == TOSERVER) {
+        if (ssn == NULL) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p null. No reuse.", p->pcap_cnt, ssn);
+            return 0;
+        }
+        if (SEQ_EQ(ssn->client.isn, TCP_GET_SEQ(p))) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p. Packet SEQ == Stream ISN. Retransmission. Don't reuse.", p->pcap_cnt, ssn);
+            return 0;
+        }
+        if (ssn->state >= TCP_LAST_ACK) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 1;
+        }
+        if (ssn->state == TCP_NONE) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 1;
+        }
+        if (ssn->state < TCP_LAST_ACK) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 0;
+        }
+
+    } else {
+        if (ssn == NULL) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p null. Reuse.", p->pcap_cnt, ssn);
+            return 1;
+        }
+        if (ssn->state >= TCP_LAST_ACK) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 1;
+        }
+        if (ssn->state == TCP_NONE) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 1;
+        }
+        if (ssn->state < TCP_LAST_ACK) {
+            SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state);
+            return 0;
+        }
+    }
+
+
+    SCLogDebug("default: how did we get here?");
+    return 0;
+}
+
+/** \brief Handle TCP reuse of tuple
+ *
+ *  Logic:
+ *  1. see if packet could trigger a new session
+ *  2. see if the flow/ssn is in a state where we want to support the reuse
+ *  3. disconnect packet from the old flow
+ *  -> at this point new packets can still find the old flow
+ *  -> as the flow's reference count != 0, it can't disappear
+ *  4. setup a new flow unconditionally
+ *  5. attach packet to new flow
+ *  6. tag old flow as FLOW_TCP_REUSED
+ *  -> NEW packets won't find it
+ *  -> existing packets in our queues may still reference it
+ *  7. dereference the old flow (reference cnt *may* now be 0,
+ *     if no other packets reference it)
+ *
+ *  The packets that still hold a reference to the old flow are updated
+ *  by HandleFlowReuseApplyToPacket()
+ */
+static void TcpSessionReuseHandle(Packet *p) {
+    if (likely(TcpSessionPacketIsStreamStarter(p) == 0))
+        return;
+
+    int reuse = 0;
+    FLOWLOCK_RDLOCK(p->flow);
+    reuse = TcpSessionReuseDoneEnough(p, p->flow->protoctx);
+    if (!reuse) {
+        SCLogDebug("steam starter packet %"PRIu64", but state not "
+                   "ready to be reused", p->pcap_cnt);
+        FLOWLOCK_UNLOCK(p->flow);
+        return;
+    }
+
+    /* ok, this packet needs a new flow */
+
+    /* first, get a reference to the old flow */
+    Flow *old_f = NULL;
+    FlowReference(&old_f, p->flow);
+
+    /* get some settings that we move over to the new flow */
+    FlowThreadId thread_id = old_f->thread_id;
+    int autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
+
+    /* disconnect the packet from the old flow */
+    FlowHandlePacketUpdateRemove(p->flow, p);
+    FLOWLOCK_UNLOCK(p->flow);
+    FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+
+    /* Can't tag flow as reused yet, would be a race condition:
+     * new packets will not get old flow because of FLOW_TCP_REUSED,
+     * so new flow may be created. This new flow could be handled in
+     * a different thread. */
+
+    /* Get a flow. It will be either a locked flow or NULL */
+    Flow *new_f = FlowGetFlowFromHashByPacket(p);
+    if (new_f == NULL) {
+        FlowDeReference(&old_f); // < can't disappear while usecnt >0
+        return;
+    }
+
+    /* update flow and packet */
+    FlowHandlePacketUpdate(new_f, p);
+    BUG_ON(new_f != p->flow);
+
+    /* copy flow balancing settings */
+    new_f->thread_id = thread_id;
+    SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
+
+    FLOWLOCK_UNLOCK(new_f);
+
+    /* tag original flow that it's now unused */
+    FLOWLOCK_WRLOCK(old_f);
+    SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt);
+    old_f->flags |= FLOW_TCP_REUSED;
+    FLOWLOCK_UNLOCK(old_f);
+    FlowDeReference(&old_f); // < can't disappear while usecnt >0
+
+    SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt);
+}
+
+/** \brief Handle packets that reference the wrong flow because of TCP reuse
+ *
+ *  In the case of TCP reuse we can have many packets that were assigned
+ *  a flow by the capture/decode threads before the stream engine decided
+ *  that a new flow was needed for these packets.
+ *  When HandleFlowReuse creates a new flow, the packets already processed
+ *  by the flow engine will still reference the old flow.
+ *
+ *  This function detects this case and replaces the flow for those packets.
+ *  It's a fairly expensive operation, but it should be rare as it's only
+ *  done for packets that were already in the engine when the TCP reuse
+ *  case was handled. New packets are assigned the correct flow by the
+ *  flow engine.
+ */
+static void TcpSessionReuseHandleApplyToPacket(Packet *p)
+{
+    int need_flow_replace = 0;
+
+    FLOWLOCK_WRLOCK(p->flow);
+    if (p->flow->flags & FLOW_TCP_REUSED) {
+        SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt);
+        need_flow_replace = 1;
+    }
+
+    if (likely(need_flow_replace == 0)) {
+        /* Work around a race condition: if HandleFlowReuse has inserted a new flow,
+         * it will not have seen both sides of the session yet. The packet we have here
+         * may be the first that got the flow directly from the hash right after the
+         * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */
+        if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) {
+            p->flowflags |= FLOW_PKT_ESTABLISHED;
+            SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
+        } else {
+            SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
+        }
+        SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow);
+        FLOWLOCK_UNLOCK(p->flow);
+        return;
+    }
+
+    /* disconnect packet from old flow */
+    FlowHandlePacketUpdateRemove(p->flow, p);
+    FLOWLOCK_UNLOCK(p->flow);
+    FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+
+    /* find the new flow that does belong to this packet */
+    Flow *new_f = FlowLookupFlowFromHash(p);
+    if (new_f == NULL) {
+        // TODO reset packet flag wrt flow: direction, HAS_FLOW etc
+        p->flags &= ~PKT_HAS_FLOW;
+        return;
+    }
+    FlowHandlePacketUpdate(new_f, p);
+    BUG_ON(new_f != p->flow);
+    FLOWLOCK_UNLOCK(new_f);
+    SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow);
+}
+
 TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
 {
     StreamTcpThread *stt = (StreamTcpThread *)data;
@@ -4709,6 +4914,20 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe
         p->flags |= PKT_IGNORE_CHECKSUM;
     }
 
+// TODO autofp only somehow
+    /* "autofp" handling of TCP session/flow reuse */
+    if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
+        /* apply previous reuses to this packet */
+        TcpSessionReuseHandleApplyToPacket(p);
+        if (p->flow == NULL)
+            return ret;
+
+        /* after that, check for 'new' reuse */
+        TcpSessionReuseHandle(p);
+        if (p->flow == NULL)
+            return ret;
+    }
+
     AppLayerProfilingReset(stt->ra_ctx->app_tctx);
 
     FLOWLOCK_WRLOCK(p->flow);