A flow has 3 states: NEW, ESTABLISHED and CLOSED.
For all protocols except TCP, a flow is in state NEW as long as just one
side of the conversation has been seen. When both sides have been
observed the state is moved to ESTABLISHED.
TCP has a different logic, controlled by the stream engine. Here the TCP
state is leading.
Until now, when parts of the engine needed to know the flow state, it
would invoke a per protocol callback 'GetProtoState'. For TCP this would
return the state based on the TcpSession.
This patch changes this logic. It introduces an atomic variable in the
flow 'flow_state'. It defaults to NEW and is set to ESTABLISHED for non-
TCP protocols when we've seen both sides of the conversation.
For TCP, the state is updated from the TCP engine directly.
The goal is to allow for access to the state without holding the Flow's
main mutex lock. This will later allow the Flow Manager(s) to evaluate
the Flow w/o interupting it.
f->fb = NULL;
FBLOCK_UNLOCK(fb);
- int state = FlowGetFlowState(f);
+ int state = SC_ATOMIC_GET(f->flow_state);
if (state == FLOW_STATE_NEW)
f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW;
else if (state == FLOW_STATE_ESTABLISHED)
Flow *next_flow = f->hprev;
- int state = FlowGetFlowState(f);
+ int state = SC_ATOMIC_GET(f->flow_state);
/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
Flow *next_flow = f->hprev;
- int state = FlowGetFlowState(f);
+ int state = SC_ATOMIC_GET(f->flow_state);
/* remove from the hash */
if (f->hprev != NULL)
f.proto = IPPROTO_TCP;
- int state = FlowGetFlowState(&f);
+ int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
f.fb = &fb;
f.proto = IPPROTO_TCP;
- int state = FlowGetFlowState(&f);
+ int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
- int state = FlowGetFlowState(&f);
+ int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
- int state = FlowGetFlowState(&f);
+ int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
SCMutex flowbits_mutex;
#endif /* FLOWBITS_STATS */
-/** \internal
- * \brief Get the flow's state
- *
- * \param f flow
- *
- * \retval state either FLOW_STATE_NEW, FLOW_STATE_ESTABLISHED or FLOW_STATE_CLOSED
- */
-static inline int FlowGetFlowState(Flow *f)
-{
- if (flow_proto[f->protomap].GetProtoState != NULL) {
- return flow_proto[f->protomap].GetProtoState(f->protoctx);
- } else {
- if ((f->flags & FLOW_TO_SRC_SEEN) && (f->flags & FLOW_TO_DST_SEEN))
- return FLOW_STATE_ESTABLISHED;
- else
- return FLOW_STATE_NEW;
- }
-}
-
#endif /* __FLOW_PRIVATE_H__ */
(f)->sp = 0; \
(f)->dp = 0; \
(f)->proto = 0; \
+ SC_ATOMIC_INIT((f)->flow_state); \
SC_ATOMIC_INIT((f)->use_cnt); \
(f)->probing_parser_toserver_alproto_masks = 0; \
(f)->probing_parser_toclient_alproto_masks = 0; \
(f)->sp = 0; \
(f)->dp = 0; \
(f)->proto = 0; \
+ SC_ATOMIC_RESET((f)->flow_state); \
SC_ATOMIC_RESET((f)->use_cnt); \
(f)->probing_parser_toserver_alproto_masks = 0; \
(f)->probing_parser_toclient_alproto_masks = 0; \
#define FLOW_DESTROY(f) do { \
FlowCleanupAppLayer((f)); \
+ SC_ATOMIC_DESTROY((f)->flow_state); \
SC_ATOMIC_DESTROY((f)->use_cnt); \
\
FLOWLOCK_DESTROY((f)); \
int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
-int FlowSetFlowStateFunc(uint8_t , int (*GetProtoState)(void *));
/* Run mode selected at suricata.c */
extern int run_mode;
if ((f->flags & FLOW_TO_DST_SEEN) && (f->flags & FLOW_TO_SRC_SEEN)) {
SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
p->flowflags |= FLOW_PKT_ESTABLISHED;
+
+ if (f->proto != IPPROTO_TCP) {
+ SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
+ }
}
/*set the detection bypass flags*/
flow_proto[FLOW_PROTO_DEFAULT].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_DEFAULT].Freefunc = NULL;
- flow_proto[FLOW_PROTO_DEFAULT].GetProtoState = NULL;
/*TCP*/
flow_proto[FLOW_PROTO_TCP].new_timeout = FLOW_IPPROTO_TCP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_TCP].est_timeout = FLOW_IPPROTO_TCP_EST_TIMEOUT;
flow_proto[FLOW_PROTO_TCP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_TCP].Freefunc = NULL;
- flow_proto[FLOW_PROTO_TCP].GetProtoState = NULL;
/*UDP*/
flow_proto[FLOW_PROTO_UDP].new_timeout = FLOW_IPPROTO_UDP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_UDP].est_timeout = FLOW_IPPROTO_UDP_EST_TIMEOUT;
flow_proto[FLOW_PROTO_UDP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_UDP].Freefunc = NULL;
- flow_proto[FLOW_PROTO_UDP].GetProtoState = NULL;
/*ICMP*/
flow_proto[FLOW_PROTO_ICMP].new_timeout = FLOW_IPPROTO_ICMP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_ICMP].est_timeout = FLOW_IPPROTO_ICMP_EST_TIMEOUT;
flow_proto[FLOW_PROTO_ICMP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_ICMP].Freefunc = NULL;
- flow_proto[FLOW_PROTO_ICMP].GetProtoState = NULL;
/* Let's see if we have custom timeouts defined from config */
const char *new = NULL;
return 1;
}
-/**
- * \brief Function to set the function to get protocol specific flow state.
- *
- * \param proto protocol of which function is needed to be set.
- * \param GetFlowState Function pointer which will be called to get state.
- */
-
-int FlowSetFlowStateFunc (uint8_t proto, int (*GetProtoState)(void *))
-{
- uint8_t proto_map;
- proto_map = FlowGetProtoMapping(proto);
-
- flow_proto[proto_map].GetProtoState = GetProtoState;
- return 1;
-}
-
/**
* \brief Function to set the timeout values for the specified protocol.
*
typedef unsigned short FlowRefCount;
#endif
+#ifdef __tile__
+/* Atomic Ints performance better on Tile. */
+typedef unsigned int FlowStateType;
+#else
+typedef unsigned short FlowStateType;
+#endif
+
/** Local Thread ID */
typedef uint16_t FlowThreadId;
/* end of flow "header" */
+ SC_ATOMIC_DECLARE(FlowStateType, flow_state);
+
/** how many pkts and stream msgs are using the flow *right now*. This
* variable is atomic so not protected by the Flow mutex "m".
*
uint32_t emerg_est_timeout;
uint32_t emerg_closed_timeout;
void (*Freefunc)(void *);
- int (*GetProtoState)(void *);
} FlowProto;
void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *));
-int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *));
void FlowUpdateQueue(Flow *);
struct FlowQueue_;
/* set the default free function and flow state function
* values. */
FlowSetProtoFreeFunc(IPPROTO_TCP, StreamTcpSessionClear);
- FlowSetFlowStateFunc(IPPROTO_TCP, StreamTcpGetFlowState);
#ifdef UNITTESTS
if (RunmodeIsUnittests()) {
return;
ssn->state = state;
+
+ /* update the flow state */
+ switch(ssn->state) {
+ case TCP_ESTABLISHED:
+ case TCP_FIN_WAIT1:
+ case TCP_FIN_WAIT2:
+ case TCP_CLOSING:
+ case TCP_CLOSE_WAIT:
+ SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
+ break;
+ case TCP_LAST_ACK:
+ case TCP_TIME_WAIT:
+ case TCP_CLOSED:
+ SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
+ break;
+ }
}
/**
return 0;
}
-/**
- * \brief Function to return the FLOW state depending upon the TCP session state.
- *
- * \param s TCP session of which the state has to be returned
- * \retval state The FLOW_STATE_ depends upon the TCP sesison state, default is
- * FLOW_STATE_CLOSED
- */
-
-int StreamTcpGetFlowState(void *s)
-{
- SCEnter();
-
- TcpSession *ssn = (TcpSession *)s;
- if (unlikely(ssn == NULL)) {
- SCReturnInt(FLOW_STATE_CLOSED);
- }
-
- /* sorted most likely to least likely */
- switch(ssn->state) {
- case TCP_ESTABLISHED:
- case TCP_FIN_WAIT1:
- case TCP_FIN_WAIT2:
- case TCP_CLOSING:
- case TCP_CLOSE_WAIT:
- SCReturnInt(FLOW_STATE_ESTABLISHED);
- case TCP_NONE:
- case TCP_SYN_SENT:
- case TCP_SYN_RECV:
- case TCP_LISTEN:
- SCReturnInt(FLOW_STATE_NEW);
- case TCP_LAST_ACK:
- case TCP_TIME_WAIT:
- case TCP_CLOSED:
- SCReturnInt(FLOW_STATE_CLOSED);
- }
-
- SCReturnInt(FLOW_STATE_CLOSED);
-}
-
/**
* \brief Function to check the validity of the received timestamp based on
* the target OS of the given stream.