]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: change flow state logic
authorVictor Julien <victor@inliniac.net>
Sat, 24 Jan 2015 12:18:51 +0000 (13:18 +0100)
committerVictor Julien <victor@inliniac.net>
Wed, 4 Feb 2015 10:28:11 +0000 (11:28 +0100)
A flow has 3 states: NEW, ESTABLISHED and CLOSED.

For all protocols except TCP, a flow is in state NEW as long as just one
side of the conversation has been seen. When both sides have been
observed the state is moved to ESTABLISHED.

TCP has a different logic, controlled by the stream engine. Here the TCP
state is leading.

Until now, when parts of the engine needed to know the flow state, it
would invoke a per protocol callback 'GetProtoState'. For TCP this would
return the state based on the TcpSession.

This patch changes this logic. It introduces an atomic variable in the
flow 'flow_state'. It defaults to NEW and is set to ESTABLISHED for non-
TCP protocols when we've seen both sides of the conversation.

For TCP, the state is updated from the TCP engine directly.

The goal is to allow for access to the state without holding the Flow's
main mutex lock. This will later allow the Flow Manager(s) to evaluate
the Flow w/o interupting it.

src/flow-hash.c
src/flow-manager.c
src/flow-private.h
src/flow-util.h
src/flow.c
src/flow.h
src/stream-tcp.c

index 698b5bd7078ee079ff1e6ce82a4cb5c1c4918f79..03e1df6b701d361ebec1d6a63ed6a73d9663c6f5 100644 (file)
@@ -666,7 +666,7 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         f->fb = NULL;
         FBLOCK_UNLOCK(fb);
 
-        int state = FlowGetFlowState(f);
+        int state = SC_ATOMIC_GET(f->flow_state);
         if (state == FLOW_STATE_NEW)
             f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW;
         else if (state == FLOW_STATE_ESTABLISHED)
index 5aa975f7c4882d2db4fa7184350115de37ccf784..72d6ce535f080d05ab1c58b846c57acb487da67d 100644 (file)
@@ -280,7 +280,7 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
 
         Flow *next_flow = f->hprev;
 
-        int state = FlowGetFlowState(f);
+        int state = SC_ATOMIC_GET(f->flow_state);
 
         /* timeout logic goes here */
         if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
@@ -413,7 +413,7 @@ static uint32_t FlowManagerHashRowCleanup(Flow *f)
 
         Flow *next_flow = f->hprev;
 
-        int state = FlowGetFlowState(f);
+        int state = SC_ATOMIC_GET(f->flow_state);
 
         /* remove from the hash */
         if (f->hprev != NULL)
@@ -1034,7 +1034,7 @@ static int FlowMgrTest01 (void)
 
     f.proto = IPPROTO_TCP;
 
-    int state = FlowGetFlowState(&f);
+    int state = SC_ATOMIC_GET(f.flow_state);
     if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
@@ -1093,7 +1093,7 @@ static int FlowMgrTest02 (void)
     f.fb = &fb;
     f.proto = IPPROTO_TCP;
 
-    int state = FlowGetFlowState(&f);
+    int state = SC_ATOMIC_GET(f.flow_state);
     if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
@@ -1140,7 +1140,7 @@ static int FlowMgrTest03 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;
 
-    int state = FlowGetFlowState(&f);
+    int state = SC_ATOMIC_GET(f.flow_state);
     if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
@@ -1200,7 +1200,7 @@ static int FlowMgrTest04 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;
 
-    int state = FlowGetFlowState(&f);
+    int state = SC_ATOMIC_GET(f.flow_state);
     if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
index 2c7b8f6dbdd7c319032fd9618bea866fe3263264..bd25960b2bf0c755e5d01c6ee9739da2a259655a 100644 (file)
@@ -96,24 +96,5 @@ uint32_t flowbits_removed;
 SCMutex flowbits_mutex;
 #endif /* FLOWBITS_STATS */
 
-/** \internal
- *  \brief Get the flow's state
- *
- *  \param f flow
- *
- *  \retval state either FLOW_STATE_NEW, FLOW_STATE_ESTABLISHED or FLOW_STATE_CLOSED
- */
-static inline int FlowGetFlowState(Flow *f)
-{
-    if (flow_proto[f->protomap].GetProtoState != NULL) {
-        return flow_proto[f->protomap].GetProtoState(f->protoctx);
-    } else {
-        if ((f->flags & FLOW_TO_SRC_SEEN) && (f->flags & FLOW_TO_DST_SEEN))
-            return FLOW_STATE_ESTABLISHED;
-        else
-            return FLOW_STATE_NEW;
-    }
-}
-
 #endif /* __FLOW_PRIVATE_H__ */
 
index 8ed6fd88040f856d1d1af374d0847d6b0fb26058..152b409836c80371be950d10225808620ca30b1e 100644 (file)
@@ -40,6 +40,7 @@
         (f)->sp = 0; \
         (f)->dp = 0; \
         (f)->proto = 0; \
+        SC_ATOMIC_INIT((f)->flow_state); \
         SC_ATOMIC_INIT((f)->use_cnt); \
         (f)->probing_parser_toserver_alproto_masks = 0; \
         (f)->probing_parser_toclient_alproto_masks = 0; \
@@ -82,6 +83,7 @@
         (f)->sp = 0; \
         (f)->dp = 0; \
         (f)->proto = 0; \
+        SC_ATOMIC_RESET((f)->flow_state); \
         SC_ATOMIC_RESET((f)->use_cnt); \
         (f)->probing_parser_toserver_alproto_masks = 0; \
         (f)->probing_parser_toclient_alproto_masks = 0; \
 
 #define FLOW_DESTROY(f) do { \
         FlowCleanupAppLayer((f)); \
+        SC_ATOMIC_DESTROY((f)->flow_state); \
         SC_ATOMIC_DESTROY((f)->use_cnt); \
         \
         FLOWLOCK_DESTROY((f)); \
index 516e684e946f1400a16d0d1751dea0980e49fc8b..2546e5289e8473daf4339c36dbf2df519d0f9c71 100644 (file)
@@ -87,7 +87,6 @@ void FlowInitFlowProto();
 int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
-int FlowSetFlowStateFunc(uint8_t , int (*GetProtoState)(void *));
 
 /* Run mode selected at suricata.c */
 extern int run_mode;
@@ -270,6 +269,10 @@ void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
     if ((f->flags & FLOW_TO_DST_SEEN) && (f->flags & FLOW_TO_SRC_SEEN)) {
         SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
         p->flowflags |= FLOW_PKT_ESTABLISHED;
+
+        if (f->proto != IPPROTO_TCP) {
+            SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
+        }
     }
 
     /*set the detection bypass flags*/
@@ -497,7 +500,6 @@ void FlowInitFlowProto(void)
     flow_proto[FLOW_PROTO_DEFAULT].emerg_closed_timeout =
         FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
     flow_proto[FLOW_PROTO_DEFAULT].Freefunc = NULL;
-    flow_proto[FLOW_PROTO_DEFAULT].GetProtoState = NULL;
     /*TCP*/
     flow_proto[FLOW_PROTO_TCP].new_timeout = FLOW_IPPROTO_TCP_NEW_TIMEOUT;
     flow_proto[FLOW_PROTO_TCP].est_timeout = FLOW_IPPROTO_TCP_EST_TIMEOUT;
@@ -509,7 +511,6 @@ void FlowInitFlowProto(void)
     flow_proto[FLOW_PROTO_TCP].emerg_closed_timeout =
         FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
     flow_proto[FLOW_PROTO_TCP].Freefunc = NULL;
-    flow_proto[FLOW_PROTO_TCP].GetProtoState = NULL;
     /*UDP*/
     flow_proto[FLOW_PROTO_UDP].new_timeout = FLOW_IPPROTO_UDP_NEW_TIMEOUT;
     flow_proto[FLOW_PROTO_UDP].est_timeout = FLOW_IPPROTO_UDP_EST_TIMEOUT;
@@ -521,7 +522,6 @@ void FlowInitFlowProto(void)
     flow_proto[FLOW_PROTO_UDP].emerg_closed_timeout =
         FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
     flow_proto[FLOW_PROTO_UDP].Freefunc = NULL;
-    flow_proto[FLOW_PROTO_UDP].GetProtoState = NULL;
     /*ICMP*/
     flow_proto[FLOW_PROTO_ICMP].new_timeout = FLOW_IPPROTO_ICMP_NEW_TIMEOUT;
     flow_proto[FLOW_PROTO_ICMP].est_timeout = FLOW_IPPROTO_ICMP_EST_TIMEOUT;
@@ -533,7 +533,6 @@ void FlowInitFlowProto(void)
     flow_proto[FLOW_PROTO_ICMP].emerg_closed_timeout =
         FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
     flow_proto[FLOW_PROTO_ICMP].Freefunc = NULL;
-    flow_proto[FLOW_PROTO_ICMP].GetProtoState = NULL;
 
     /* Let's see if we have custom timeouts defined from config */
     const char *new = NULL;
@@ -764,22 +763,6 @@ int FlowSetProtoFreeFunc (uint8_t proto, void (*Free)(void *))
     return 1;
 }
 
-/**
- *  \brief  Function to set the function to get protocol specific flow state.
- *
- *  \param   proto            protocol of which function is needed to be set.
- *  \param   GetFlowState     Function pointer which will be called to get state.
- */
-
-int FlowSetFlowStateFunc (uint8_t proto, int (*GetProtoState)(void *))
-{
-    uint8_t proto_map;
-    proto_map = FlowGetProtoMapping(proto);
-
-    flow_proto[proto_map].GetProtoState = GetProtoState;
-    return 1;
-}
-
 /**
  *  \brief   Function to set the timeout values for the specified protocol.
  *
index 12cda835aa9ce5867e75c8666d27099a2397d5ec..6f95401aa048a49e9c64ac3b6843f6b9cad16f91 100644 (file)
@@ -272,6 +272,13 @@ typedef unsigned int FlowRefCount;
 typedef unsigned short FlowRefCount;
 #endif
 
+#ifdef __tile__
+/* Atomic Ints performance better on Tile. */
+typedef unsigned int FlowStateType;
+#else
+typedef unsigned short FlowStateType;
+#endif
+
 /** Local Thread ID */
 typedef uint16_t FlowThreadId;
 
@@ -312,6 +319,8 @@ typedef struct Flow_
 
     /* end of flow "header" */
 
+    SC_ATOMIC_DECLARE(FlowStateType, flow_state);
+
     /** how many pkts and stream msgs are using the flow *right now*. This
      *  variable is atomic so not protected by the Flow mutex "m".
      *
@@ -414,7 +423,6 @@ typedef struct FlowProto_ {
     uint32_t emerg_est_timeout;
     uint32_t emerg_closed_timeout;
     void (*Freefunc)(void *);
-    int (*GetProtoState)(void *);
 } FlowProto;
 
 void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
@@ -428,7 +436,6 @@ void FlowRegisterTests (void);
 int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *));
-int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *));
 void FlowUpdateQueue(Flow *);
 
 struct FlowQueue_;
index 2d075797f8e2cee9446869b99ffa2c306bcd42c5..dcad82c9ee8d830501faf6a34ea72d6d7f373870 100644 (file)
@@ -596,7 +596,6 @@ void StreamTcpInitConfig(char quiet)
     /* set the default free function and flow state function
      * values. */
     FlowSetProtoFreeFunc(IPPROTO_TCP, StreamTcpSessionClear);
-    FlowSetFlowStateFunc(IPPROTO_TCP, StreamTcpGetFlowState);
 
 #ifdef UNITTESTS
     if (RunmodeIsUnittests()) {
@@ -679,6 +678,22 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
         return;
 
     ssn->state = state;
+
+    /* update the flow state */
+    switch(ssn->state) {
+        case TCP_ESTABLISHED:
+        case TCP_FIN_WAIT1:
+        case TCP_FIN_WAIT2:
+        case TCP_CLOSING:
+        case TCP_CLOSE_WAIT:
+            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
+            break;
+        case TCP_LAST_ACK:
+        case TCP_TIME_WAIT:
+        case TCP_CLOSED:
+            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
+            break;
+    }
 }
 
 /**
@@ -5038,45 +5053,6 @@ static int StreamTcpValidateRst(TcpSession *ssn, Packet *p)
     return 0;
 }
 
-/**
- *  \brief  Function to return the FLOW state depending upon the TCP session state.
- *
- *  \param   s      TCP session of which the state has to be returned
- *  \retval  state  The FLOW_STATE_ depends upon the TCP sesison state, default is
- *                  FLOW_STATE_CLOSED
- */
-
-int StreamTcpGetFlowState(void *s)
-{
-    SCEnter();
-
-    TcpSession *ssn = (TcpSession *)s;
-    if (unlikely(ssn == NULL)) {
-        SCReturnInt(FLOW_STATE_CLOSED);
-    }
-
-    /* sorted most likely to least likely */
-    switch(ssn->state) {
-        case TCP_ESTABLISHED:
-        case TCP_FIN_WAIT1:
-        case TCP_FIN_WAIT2:
-        case TCP_CLOSING:
-        case TCP_CLOSE_WAIT:
-            SCReturnInt(FLOW_STATE_ESTABLISHED);
-        case TCP_NONE:
-        case TCP_SYN_SENT:
-        case TCP_SYN_RECV:
-        case TCP_LISTEN:
-            SCReturnInt(FLOW_STATE_NEW);
-        case TCP_LAST_ACK:
-        case TCP_TIME_WAIT:
-        case TCP_CLOSED:
-            SCReturnInt(FLOW_STATE_CLOSED);
-    }
-
-    SCReturnInt(FLOW_STATE_CLOSED);
-}
-
 /**
  *  \brief Function to check the validity of the received timestamp based on
  *         the target OS of the given stream.