From: Victor Julien Date: Thu, 22 Jan 2015 16:30:10 +0000 (+0100) Subject: tcp reuse: handle reuse in stream engine X-Git-Tag: suricata-2.1beta4~204 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=29f70bad34dbf1bd55bd149158d23e9143c86ff5;p=thirdparty%2Fsuricata.git tcp reuse: handle reuse in stream engine For the autofp case, handling TCP reuse in the flow engine didn't work. The problem is the mismatch between the moment the flow engine looks at packets and the stream, and the moment the stream engine runs. Flow engine is invoked in the packet capture thread(s), while the stream engine runs as part of the stream/detect thread(s). Because of the queues between those threads the flow manager may already inspect a new SYN while the stream engine still has to process the previous session. Moving the flow engine to the stream/detect thread(s) wasn't an option as the 'autofp' load balancing depends on the flow already being available in the packet. The solution here is to add a check for this condition to the stream engine. At this point the TCP state is up to date. If a TCP reuse case is encountered, this is the global logic: - detach packet for old flow - get a new flow and attach it to the packet - flag the old flow that it is now obsolete Additional logic makes sure that the packets already in the queue between the flow thread(s) and the stream thread are reassigned the new flow. Some special handling: Apply previous 'reuse' before checking for a new reuse. Otherwise we're tagging the wrong flow in some cases (multiple reuses in the same tuple). When in a flow/ssn reuse condition, properly remove the packet from the flow. Don't 'reuse' if packet is a SYN retransmission. The old flow is timed out normally by the flow manager. --- diff --git a/src/flow-hash.c b/src/flow-hash.c index 1fea10a67d..47e7711a23 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -507,6 +507,121 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) return f; } +Flow *FlowGetFlowFromHashByPacket(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + f = FlowGetNew(NULL, NULL, p); + if (f != NULL) { + /* flow is locked */ + if (fb->head == NULL) { + fb->head = f; + fb->tail = f; + } else { + f->hprev = fb->tail; + fb->tail->hnext = f; + fb->tail = f; + } + + /* got one, now lock, initialize and return */ + FlowInit(f, p); + f->fb = fb; + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + } + FBLOCK_UNLOCK(fb); + return f; +} + +/** \brief Lookup flow based on packet + * + * Find the flow belonging to this packet. If not found, no new flow + * is set up. + * + * \param p packet to lookup the flow for + * + * \retval f flow or NULL if not found + */ +Flow *FlowLookupFlowFromHash(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + /* see if the bucket already has a flow */ + if (fb->head == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + /* ok, we have a flow in the bucket. Let's find out if it is our flow */ + f = fb->head; + + /* see if this is the flow we are looking for */ + if (FlowCompare(f, p) == 0) { + while (f) { + FlowHashCountIncr; + + f = f->hnext; + + if (f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + if (FlowCompare(f, p) != 0) { + /* we found our flow, lets put it on top of the + * hash list -- this rewards active flows */ + if (f->hnext) { + f->hnext->hprev = f->hprev; + } + if (f->hprev) { + f->hprev->hnext = f->hnext; + } + if (f == fb->tail) { + fb->tail = f->hprev; + } + + f->hnext = fb->head; + f->hprev = NULL; + fb->head->hprev = f; + fb->head = f; + + /* found our flow, lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; + } + } + } + + /* lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; +} + /** \brief Get Flow for packet * * Hash retrieval function for flows. Looks up the hash bucket containing the diff --git a/src/flow.h b/src/flow.h index 2ef95d724f..66df3561f7 100644 --- a/src/flow.h +++ b/src/flow.h @@ -575,9 +575,11 @@ int FlowClearMemory(Flow *,uint8_t ); AppProto FlowGetAppProtocol(Flow *f); void *FlowGetAppState(Flow *f); - void FlowHandlePacketUpdateRemove(Flow *f, Packet *p); void FlowHandlePacketUpdate(Flow *f, Packet *p); +Flow *FlowGetFlowFromHashByPacket(const Packet *p); +Flow *FlowLookupFlowFromHash(const Packet *p); + #endif /* __FLOW_H__ */ diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 6f339ae13e..a24c86543f 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -4687,6 +4687,211 @@ static inline int StreamTcpValidateChecksum(Packet *p) return ret; } +/** \internal + * \brief check if a packet is a valid stream started + * \retval bool true/false */ +static int TcpSessionPacketIsStreamStarter(const Packet *p) +{ + uint8_t flag = TH_SYN; + + //SCLogInfo("Want: %02x, have %02x", flag, p->tcph->th_flags); + + if (p->tcph->th_flags == flag) { + SCLogDebug("packet %"PRIu64" is a stream starter: %02x", p->pcap_cnt, p->tcph->th_flags); + return 1; + } + return 0; +} + +/** \internal + * \brief Check if Flow and TCP SSN allow this flow/tuple to be reused + * \retval bool true yes reuse, false no keep tracking old ssn */ +int TcpSessionReuseDoneEnough(Packet *p, const TcpSession *ssn) +{ + if (FlowGetPacketDirection(p->flow, p) == TOSERVER) { + if (ssn == NULL) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p null. No reuse.", p->pcap_cnt, ssn); + return 0; + } + if (SEQ_EQ(ssn->client.isn, TCP_GET_SEQ(p))) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p. Packet SEQ == Stream ISN. Retransmission. Don't reuse.", p->pcap_cnt, ssn); + return 0; + } + if (ssn->state >= TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state == TCP_NONE) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state < TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state); + return 0; + } + + } else { + if (ssn == NULL) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p null. Reuse.", p->pcap_cnt, ssn); + return 1; + } + if (ssn->state >= TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state == TCP_NONE) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state); + return 1; + } + if (ssn->state < TCP_LAST_ACK) { + SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state); + return 0; + } + } + + + SCLogDebug("default: how did we get here?"); + return 0; +} + +/** \brief Handle TCP reuse of tuple + * + * Logic: + * 1. see if packet could trigger a new session + * 2. see if the flow/ssn is in a state where we want to support the reuse + * 3. disconnect packet from the old flow + * -> at this point new packets can still find the old flow + * -> as the flow's reference count != 0, it can't disappear + * 4. setup a new flow unconditionally + * 5. attach packet to new flow + * 6. tag old flow as FLOW_TCP_REUSED + * -> NEW packets won't find it + * -> existing packets in our queues may still reference it + * 7. dereference the old flow (reference cnt *may* now be 0, + * if no other packets reference it) + * + * The packets that still hold a reference to the old flow are updated + * by HandleFlowReuseApplyToPacket() + */ +static void TcpSessionReuseHandle(Packet *p) { + if (likely(TcpSessionPacketIsStreamStarter(p) == 0)) + return; + + int reuse = 0; + FLOWLOCK_RDLOCK(p->flow); + reuse = TcpSessionReuseDoneEnough(p, p->flow->protoctx); + if (!reuse) { + SCLogDebug("steam starter packet %"PRIu64", but state not " + "ready to be reused", p->pcap_cnt); + FLOWLOCK_UNLOCK(p->flow); + return; + } + + /* ok, this packet needs a new flow */ + + /* first, get a reference to the old flow */ + Flow *old_f = NULL; + FlowReference(&old_f, p->flow); + + /* get some settings that we move over to the new flow */ + FlowThreadId thread_id = old_f->thread_id; + int autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); + + /* disconnect the packet from the old flow */ + FlowHandlePacketUpdateRemove(p->flow, p); + FLOWLOCK_UNLOCK(p->flow); + FlowDeReference(&p->flow); // < can't disappear while usecnt >0 + + /* Can't tag flow as reused yet, would be a race condition: + * new packets will not get old flow because of FLOW_TCP_REUSED, + * so new flow may be created. This new flow could be handled in + * a different thread. */ + + /* Get a flow. It will be either a locked flow or NULL */ + Flow *new_f = FlowGetFlowFromHashByPacket(p); + if (new_f == NULL) { + FlowDeReference(&old_f); // < can't disappear while usecnt >0 + return; + } + + /* update flow and packet */ + FlowHandlePacketUpdate(new_f, p); + BUG_ON(new_f != p->flow); + + /* copy flow balancing settings */ + new_f->thread_id = thread_id; + SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); + + FLOWLOCK_UNLOCK(new_f); + + /* tag original flow that it's now unused */ + FLOWLOCK_WRLOCK(old_f); + SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt); + old_f->flags |= FLOW_TCP_REUSED; + FLOWLOCK_UNLOCK(old_f); + FlowDeReference(&old_f); // < can't disappear while usecnt >0 + + SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt); +} + +/** \brief Handle packets that reference the wrong flow because of TCP reuse + * + * In the case of TCP reuse we can have many packets that were assigned + * a flow by the capture/decode threads before the stream engine decided + * that a new flow was needed for these packets. + * When HandleFlowReuse creates a new flow, the packets already processed + * by the flow engine will still reference the old flow. + * + * This function detects this case and replaces the flow for those packets. + * It's a fairly expensive operation, but it should be rare as it's only + * done for packets that were already in the engine when the TCP reuse + * case was handled. New packets are assigned the correct flow by the + * flow engine. + */ +static void TcpSessionReuseHandleApplyToPacket(Packet *p) +{ + int need_flow_replace = 0; + + FLOWLOCK_WRLOCK(p->flow); + if (p->flow->flags & FLOW_TCP_REUSED) { + SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt); + need_flow_replace = 1; + } + + if (likely(need_flow_replace == 0)) { + /* Work around a race condition: if HandleFlowReuse has inserted a new flow, + * it will not have seen both sides of the session yet. The packet we have here + * may be the first that got the flow directly from the hash right after the + * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */ + if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) { + p->flowflags |= FLOW_PKT_ESTABLISHED; + SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); + } else { + SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); + } + SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow); + FLOWLOCK_UNLOCK(p->flow); + return; + } + + /* disconnect packet from old flow */ + FlowHandlePacketUpdateRemove(p->flow, p); + FLOWLOCK_UNLOCK(p->flow); + FlowDeReference(&p->flow); // < can't disappear while usecnt >0 + + /* find the new flow that does belong to this packet */ + Flow *new_f = FlowLookupFlowFromHash(p); + if (new_f == NULL) { + // TODO reset packet flag wrt flow: direction, HAS_FLOW etc + p->flags &= ~PKT_HAS_FLOW; + return; + } + FlowHandlePacketUpdate(new_f, p); + BUG_ON(new_f != p->flow); + FLOWLOCK_UNLOCK(new_f); + SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow); +} + TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { StreamTcpThread *stt = (StreamTcpThread *)data; @@ -4709,6 +4914,20 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe p->flags |= PKT_IGNORE_CHECKSUM; } +// TODO autofp only somehow + /* "autofp" handling of TCP session/flow reuse */ + if (!(p->flags & PKT_PSEUDO_STREAM_END)) { + /* apply previous reuses to this packet */ + TcpSessionReuseHandleApplyToPacket(p); + if (p->flow == NULL) + return ret; + + /* after that, check for 'new' reuse */ + TcpSessionReuseHandle(p); + if (p->flow == NULL) + return ret; + } + AppLayerProfilingReset(stt->ra_ctx->app_tctx); FLOWLOCK_WRLOCK(p->flow);