return f;
}
+Flow *FlowGetFlowFromHashByPacket(const Packet *p)
+{
+ Flow *f = NULL;
+
+ /* get the key to our bucket */
+ uint32_t key = FlowGetKey(p);
+ /* get our hash bucket and lock it */
+ FlowBucket *fb = &flow_hash[key];
+ FBLOCK_LOCK(fb);
+
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+ f = FlowGetNew(NULL, NULL, p);
+ if (f != NULL) {
+ /* flow is locked */
+ if (fb->head == NULL) {
+ fb->head = f;
+ fb->tail = f;
+ } else {
+ f->hprev = fb->tail;
+ fb->tail->hnext = f;
+ fb->tail = f;
+ }
+
+ /* got one, now lock, initialize and return */
+ FlowInit(f, p);
+ f->fb = fb;
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ }
+ FBLOCK_UNLOCK(fb);
+ return f;
+}
+
+/** \brief Lookup flow based on packet
+ *
+ * Find the flow belonging to this packet. If not found, no new flow
+ * is set up.
+ *
+ * \param p packet to lookup the flow for
+ *
+ * \retval f flow or NULL if not found
+ */
+Flow *FlowLookupFlowFromHash(const Packet *p)
+{
+ Flow *f = NULL;
+
+ /* get the key to our bucket */
+ uint32_t key = FlowGetKey(p);
+ /* get our hash bucket and lock it */
+ FlowBucket *fb = &flow_hash[key];
+ FBLOCK_LOCK(fb);
+
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+ /* see if the bucket already has a flow */
+ if (fb->head == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+
+ /* ok, we have a flow in the bucket. Let's find out if it is our flow */
+ f = fb->head;
+
+ /* see if this is the flow we are looking for */
+ if (FlowCompare(f, p) == 0) {
+ while (f) {
+ FlowHashCountIncr;
+
+ f = f->hnext;
+
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+
+ if (FlowCompare(f, p) != 0) {
+ /* we found our flow, lets put it on top of the
+ * hash list -- this rewards active flows */
+ if (f->hnext) {
+ f->hnext->hprev = f->hprev;
+ }
+ if (f->hprev) {
+ f->hprev->hnext = f->hnext;
+ }
+ if (f == fb->tail) {
+ fb->tail = f->hprev;
+ }
+
+ f->hnext = fb->head;
+ f->hprev = NULL;
+ fb->head->hprev = f;
+ fb->head = f;
+
+ /* found our flow, lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ return f;
+ }
+ }
+ }
+
+ /* lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ return f;
+}
+
/** \brief Get Flow for packet
*
* Hash retrieval function for flows. Looks up the hash bucket containing the
return ret;
}
+/** \internal
+ * \brief check if a packet is a valid stream started
+ * \retval bool true/false */
+static int TcpSessionPacketIsStreamStarter(const Packet *p)
+{
+ uint8_t flag = TH_SYN;
+
+ //SCLogInfo("Want: %02x, have %02x", flag, p->tcph->th_flags);
+
+ if (p->tcph->th_flags == flag) {
+ SCLogDebug("packet %"PRIu64" is a stream starter: %02x", p->pcap_cnt, p->tcph->th_flags);
+ return 1;
+ }
+ return 0;
+}
+
+/** \internal
+ * \brief Check if Flow and TCP SSN allow this flow/tuple to be reused
+ * \retval bool true yes reuse, false no keep tracking old ssn */
+int TcpSessionReuseDoneEnough(Packet *p, const TcpSession *ssn)
+{
+ if (FlowGetPacketDirection(p->flow, p) == TOSERVER) {
+ if (ssn == NULL) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p null. No reuse.", p->pcap_cnt, ssn);
+ return 0;
+ }
+ if (SEQ_EQ(ssn->client.isn, TCP_GET_SEQ(p))) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p. Packet SEQ == Stream ISN. Retransmission. Don't reuse.", p->pcap_cnt, ssn);
+ return 0;
+ }
+ if (ssn->state >= TCP_LAST_ACK) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 1;
+ }
+ if (ssn->state == TCP_NONE) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 1;
+ }
+ if (ssn->state < TCP_LAST_ACK) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 0;
+ }
+
+ } else {
+ if (ssn == NULL) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p null. Reuse.", p->pcap_cnt, ssn);
+ return 1;
+ }
+ if (ssn->state >= TCP_LAST_ACK) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state >= TCP_LAST_ACK (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 1;
+ }
+ if (ssn->state == TCP_NONE) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state == TCP_NONE (%u). Reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 1;
+ }
+ if (ssn->state < TCP_LAST_ACK) {
+ SCLogDebug("steam starter packet %"PRIu64", ssn %p state < TCP_LAST_ACK (%u). Don't reuse.", p->pcap_cnt, ssn, ssn->state);
+ return 0;
+ }
+ }
+
+
+ SCLogDebug("default: how did we get here?");
+ return 0;
+}
+
+/** \brief Handle TCP reuse of tuple
+ *
+ * Logic:
+ * 1. see if packet could trigger a new session
+ * 2. see if the flow/ssn is in a state where we want to support the reuse
+ * 3. disconnect packet from the old flow
+ * -> at this point new packets can still find the old flow
+ * -> as the flow's reference count != 0, it can't disappear
+ * 4. setup a new flow unconditionally
+ * 5. attach packet to new flow
+ * 6. tag old flow as FLOW_TCP_REUSED
+ * -> NEW packets won't find it
+ * -> existing packets in our queues may still reference it
+ * 7. dereference the old flow (reference cnt *may* now be 0,
+ * if no other packets reference it)
+ *
+ * The packets that still hold a reference to the old flow are updated
+ * by HandleFlowReuseApplyToPacket()
+ */
+static void TcpSessionReuseHandle(Packet *p) {
+ if (likely(TcpSessionPacketIsStreamStarter(p) == 0))
+ return;
+
+ int reuse = 0;
+ FLOWLOCK_RDLOCK(p->flow);
+ reuse = TcpSessionReuseDoneEnough(p, p->flow->protoctx);
+ if (!reuse) {
+ SCLogDebug("steam starter packet %"PRIu64", but state not "
+ "ready to be reused", p->pcap_cnt);
+ FLOWLOCK_UNLOCK(p->flow);
+ return;
+ }
+
+ /* ok, this packet needs a new flow */
+
+ /* first, get a reference to the old flow */
+ Flow *old_f = NULL;
+ FlowReference(&old_f, p->flow);
+
+ /* get some settings that we move over to the new flow */
+ FlowThreadId thread_id = old_f->thread_id;
+ int autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
+
+ /* disconnect the packet from the old flow */
+ FlowHandlePacketUpdateRemove(p->flow, p);
+ FLOWLOCK_UNLOCK(p->flow);
+ FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+
+ /* Can't tag flow as reused yet, would be a race condition:
+ * new packets will not get old flow because of FLOW_TCP_REUSED,
+ * so new flow may be created. This new flow could be handled in
+ * a different thread. */
+
+ /* Get a flow. It will be either a locked flow or NULL */
+ Flow *new_f = FlowGetFlowFromHashByPacket(p);
+ if (new_f == NULL) {
+ FlowDeReference(&old_f); // < can't disappear while usecnt >0
+ return;
+ }
+
+ /* update flow and packet */
+ FlowHandlePacketUpdate(new_f, p);
+ BUG_ON(new_f != p->flow);
+
+ /* copy flow balancing settings */
+ new_f->thread_id = thread_id;
+ SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
+
+ FLOWLOCK_UNLOCK(new_f);
+
+ /* tag original flow that it's now unused */
+ FLOWLOCK_WRLOCK(old_f);
+ SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt);
+ old_f->flags |= FLOW_TCP_REUSED;
+ FLOWLOCK_UNLOCK(old_f);
+ FlowDeReference(&old_f); // < can't disappear while usecnt >0
+
+ SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt);
+}
+
+/** \brief Handle packets that reference the wrong flow because of TCP reuse
+ *
+ * In the case of TCP reuse we can have many packets that were assigned
+ * a flow by the capture/decode threads before the stream engine decided
+ * that a new flow was needed for these packets.
+ * When HandleFlowReuse creates a new flow, the packets already processed
+ * by the flow engine will still reference the old flow.
+ *
+ * This function detects this case and replaces the flow for those packets.
+ * It's a fairly expensive operation, but it should be rare as it's only
+ * done for packets that were already in the engine when the TCP reuse
+ * case was handled. New packets are assigned the correct flow by the
+ * flow engine.
+ */
+static void TcpSessionReuseHandleApplyToPacket(Packet *p)
+{
+ int need_flow_replace = 0;
+
+ FLOWLOCK_WRLOCK(p->flow);
+ if (p->flow->flags & FLOW_TCP_REUSED) {
+ SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt);
+ need_flow_replace = 1;
+ }
+
+ if (likely(need_flow_replace == 0)) {
+ /* Work around a race condition: if HandleFlowReuse has inserted a new flow,
+ * it will not have seen both sides of the session yet. The packet we have here
+ * may be the first that got the flow directly from the hash right after the
+ * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */
+ if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) {
+ p->flowflags |= FLOW_PKT_ESTABLISHED;
+ SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
+ } else {
+ SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
+ }
+ SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow);
+ FLOWLOCK_UNLOCK(p->flow);
+ return;
+ }
+
+ /* disconnect packet from old flow */
+ FlowHandlePacketUpdateRemove(p->flow, p);
+ FLOWLOCK_UNLOCK(p->flow);
+ FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+
+ /* find the new flow that does belong to this packet */
+ Flow *new_f = FlowLookupFlowFromHash(p);
+ if (new_f == NULL) {
+ // TODO reset packet flag wrt flow: direction, HAS_FLOW etc
+ p->flags &= ~PKT_HAS_FLOW;
+ return;
+ }
+ FlowHandlePacketUpdate(new_f, p);
+ BUG_ON(new_f != p->flow);
+ FLOWLOCK_UNLOCK(new_f);
+ SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow);
+}
+
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
StreamTcpThread *stt = (StreamTcpThread *)data;
p->flags |= PKT_IGNORE_CHECKSUM;
}
+// TODO autofp only somehow
+ /* "autofp" handling of TCP session/flow reuse */
+ if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
+ /* apply previous reuses to this packet */
+ TcpSessionReuseHandleApplyToPacket(p);
+ if (p->flow == NULL)
+ return ret;
+
+ /* after that, check for 'new' reuse */
+ TcpSessionReuseHandle(p);
+ if (p->flow == NULL)
+ return ret;
+ }
+
AppLayerProfilingReset(stt->ra_ctx->app_tctx);
FLOWLOCK_WRLOCK(p->flow);