From: Victor Julien Date: Fri, 2 Mar 2012 07:39:09 +0000 (+0100) Subject: flow: Refactor how FlowPrune deals with forced timeouts, improving locking logic. X-Git-Tag: suricata-1.3beta1~141 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bfb3f1b7cf1303d11e9fd71bfe8ba4591d178382;p=thirdparty%2Fsuricata.git flow: Refactor how FlowPrune deals with forced timeouts, improving locking logic. --- diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 74a84aa67f..00b0f31f06 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -247,24 +247,19 @@ static inline Packet *FlowForceReassemblyPseudoPacketGet(int direction, return FlowForceReassemblyPseudoPacketSetup(p, direction, f, ssn, dummy); } - /** - * \internal - * \brief Forces reassembly for flow if it needs it. + * \brief Check if a flow needs forced reassembly * - * The function requires flow to be locked beforehand. + * \param f *LOCKED* flow + * \param server ptr to int that should be set to 1 or 2 if we return 1 + * \param client ptr to int that should be set to 1 or 2 if we return 1 * - * \param f Pointer to the flow. - * - * \retval 0 This flow doesn't need any reassembly processing; 1 otherwise. + * \retval 0 no + * \retval 1 yes */ -int FlowForceReassemblyForFlowV2(Flow *f) -{ +int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client) { TcpSession *ssn; - int client_ok = 0; - int server_ok = 0; - /* looks like we have no flows in this queue */ if (f == NULL || f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) { return 0; @@ -272,32 +267,57 @@ int FlowForceReassemblyForFlowV2(Flow *f) /* Get the tcp session for the flow */ ssn = (TcpSession *)f->protoctx; - /* \todo Also skip flows that shouldn't be inspected */ if (ssn == NULL) { return 0; } - client_ok = StreamHasUnprocessedSegments(ssn, 0); - server_ok = StreamHasUnprocessedSegments(ssn, 1); + *client = StreamHasUnprocessedSegments(ssn, 0); + *server = StreamHasUnprocessedSegments(ssn, 1); /* if state is not fully closed we assume that we haven't fully * inspected the app layer state yet */ if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED) { - if (client_ok != 1) - client_ok = 2; - if (server_ok != 1) - server_ok = 2; + if (*client != 1) + *client = 2; + if (*server != 1) + *server = 2; } /* nothing to do */ - if (client_ok == 0 && server_ok == 0) { + if (*client == 0 && *server == 0) { return 0; } - /* move this unlock after the stream reassemble call */ - SCSpinUnlock(&f->fb->s); + return 1; +} +/** + * \internal + * \brief Forces reassembly for flow if it needs it. + * + * The function requires flow to be locked beforehand. + * + * \param f Pointer to the flow. + * \param server action required for server: 1 or 2 + * \param client action required for client: 1 or 2 + * + * \retval 0 This flow doesn't need any reassembly processing; 1 otherwise. + */ +int FlowForceReassemblyForFlowV2(Flow *f, int server, int client) +{ Packet *p1 = NULL, *p2 = NULL, *p3 = NULL; + TcpSession *ssn; + + /* looks like we have no flows in this queue */ + if (f == NULL) { + return 0; + } + + /* Get the tcp session for the flow */ + ssn = (TcpSession *)f->protoctx; + if (ssn == NULL) { + return 0; + } /* The packets we use are based on what segments in what direction are * unprocessed. @@ -308,13 +328,13 @@ int FlowForceReassemblyForFlowV2(Flow *f) * toclient which is now dummy since all we need it for is detection */ /* insert a pseudo packet in the toserver direction */ - if (client_ok == 1) { + if (client == 1) { p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 0); if (p1 == NULL) { return 1; } - if (server_ok == 1) { + if (server == 1) { p2 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0); if (p2 == NULL) { TmqhOutputPacketpool(NULL,p1); @@ -335,8 +355,8 @@ int FlowForceReassemblyForFlowV2(Flow *f) } } - } else if (client_ok == 2) { - if (server_ok == 1) { + } else if (client == 2) { + if (server == 1) { p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0); if (p1 == NULL) { return 1; @@ -353,7 +373,7 @@ int FlowForceReassemblyForFlowV2(Flow *f) return 1; } - if (server_ok == 2) { + if (server == 2) { p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1); if (p2 == NULL) { TmqhOutputPacketpool(NULL, p1); @@ -363,7 +383,7 @@ int FlowForceReassemblyForFlowV2(Flow *f) } } else { - if (server_ok == 1) { + if (server == 1) { p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0); if (p1 == NULL) { return 1; @@ -374,7 +394,7 @@ int FlowForceReassemblyForFlowV2(Flow *f) TmqhOutputPacketpool(NULL, p1); return 1; } - } else if (server_ok == 2) { + } else if (server == 2) { p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1); if (p1 == NULL) { return 1; diff --git a/src/flow-timeout.h b/src/flow-timeout.h index 875a3bd3f7..51ea7be879 100644 --- a/src/flow-timeout.h +++ b/src/flow-timeout.h @@ -24,7 +24,9 @@ #ifndef __FLOW_TIMEOUT_H__ #define __FLOW_TIMEOUT_H__ -int FlowForceReassemblyForFlowV2(Flow *); +int FlowForceReassemblyForFlowV2(Flow *f, int server, int client); +int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client); +//int FlowForceReassemblyForFlowV2(Flow *); void FlowForceReassembly(void); void FlowForceReassemblySetup(void); diff --git a/src/flow.c b/src/flow.c index 2d02e80cb6..2cfc6a9f60 100644 --- a/src/flow.c +++ b/src/flow.c @@ -170,6 +170,62 @@ static uint64_t prune_no_timeout = 0; static uint64_t prune_usecnt = 0; #endif +/** \internal + * \brief get timeout for flow + * + * \param f flow + * \param emergency bool indicating emergency mode 1 yes, 0 no + * + * \retval timeout timeout in seconds + */ +static inline uint32_t FlowPruneGetFlowTimeout(Flow *f, int emergency) { + uint32_t timeout; + + if (emergency) { + if (flow_proto[f->protomap].GetProtoState != NULL) { + switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) { + default: + case FLOW_STATE_NEW: + timeout = flow_proto[f->protomap].emerg_new_timeout; + break; + case FLOW_STATE_ESTABLISHED: + timeout = flow_proto[f->protomap].emerg_est_timeout; + break; + case FLOW_STATE_CLOSED: + timeout = flow_proto[f->protomap].emerg_closed_timeout; + break; + } + } else { + if (f->flags & FLOW_EST_LIST) + timeout = flow_proto[f->protomap].emerg_est_timeout; + else + timeout = flow_proto[f->protomap].emerg_new_timeout; + } + } else { /* implies no emergency */ + if (flow_proto[f->protomap].GetProtoState != NULL) { + switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) { + default: + case FLOW_STATE_NEW: + timeout = flow_proto[f->protomap].new_timeout; + break; + case FLOW_STATE_ESTABLISHED: + timeout = flow_proto[f->protomap].est_timeout; + break; + case FLOW_STATE_CLOSED: + timeout = flow_proto[f->protomap].closed_timeout; + break; + } + } else { + if (f->flags & FLOW_EST_LIST) + timeout = flow_proto[f->protomap].est_timeout; + else + timeout = flow_proto[f->protomap].new_timeout; + } + } + + return timeout; +} + /** FlowPrune * * Inspect top (last recently used) flow from the queue and see if @@ -179,11 +235,10 @@ static uint64_t prune_usecnt = 0; * * \param q Flow queue to prune * \param ts Current time - * \param timeout Timeout to enforce * \param try_cnt Tries to prune the first try_cnt no of flows in the q * * \retval 0 on error, failed block, nothing to prune - * \retval 1 on successfully pruned one + * \retval cnt on successfully pruned, cnt flows were pruned */ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt) { @@ -248,47 +303,7 @@ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt) /*set the timeout value according to the flow operating mode, flow's state and protocol.*/ - uint32_t timeout = 0; - - if (flow_flags & FLOW_EMERGENCY) { - if (flow_proto[f->protomap].GetProtoState != NULL) { - switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) { - case FLOW_STATE_NEW: - timeout = flow_proto[f->protomap].emerg_new_timeout; - break; - case FLOW_STATE_ESTABLISHED: - timeout = flow_proto[f->protomap].emerg_est_timeout; - break; - case FLOW_STATE_CLOSED: - timeout = flow_proto[f->protomap].emerg_closed_timeout; - break; - } - } else { - if (f->flags & FLOW_EST_LIST) - timeout = flow_proto[f->protomap].emerg_est_timeout; - else - timeout = flow_proto[f->protomap].emerg_new_timeout; - } - } else { /* implies no emergency */ - if (flow_proto[f->protomap].GetProtoState != NULL) { - switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) { - case FLOW_STATE_NEW: - timeout = flow_proto[f->protomap].new_timeout; - break; - case FLOW_STATE_ESTABLISHED: - timeout = flow_proto[f->protomap].est_timeout; - break; - case FLOW_STATE_CLOSED: - timeout = flow_proto[f->protomap].closed_timeout; - break; - } - } else { - if (f->flags & FLOW_EST_LIST) - timeout = flow_proto[f->protomap].est_timeout; - else - timeout = flow_proto[f->protomap].new_timeout; - } - } + uint32_t timeout = FlowPruneGetFlowTimeout(f, flow_flags & FLOW_EMERGENCY ? 1 : 0); SCLogDebug("got lock, now check: %" PRIdMAX "+%" PRIu32 "=(%" PRIdMAX ") < " "%" PRIdMAX "", (intmax_t)f->lastts_sec, @@ -317,18 +332,23 @@ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt) #ifdef FLOW_PRUNE_DEBUG prune_usecnt++; #endif - Flow *prev_f = f; + SCSpinUnlock(&f->fb->s); + SCMutexUnlock(&f->m); f = f->lnext; - SCSpinUnlock(&prev_f->fb->s); - SCMutexUnlock(&prev_f->m); continue; } - if (FlowForceReassemblyForFlowV2(f) == 1) { - Flow *prev_f = f; + int server = 0, client = 0; + if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) { + /* we no longer need the fb lock. We know this flow won't be timed + * out just yet. So an incoming pkt is allowed to pick up this + * flow. */ + SCSpinUnlock(&f->fb->s); + + FlowForceReassemblyForFlowV2(f, server, client); + SCMutexUnlock(&f->m); + f = f->lnext; - SCSpinUnlock(&prev_f->fb->s); - SCMutexUnlock(&prev_f->m); continue; } #ifdef DEBUG diff --git a/src/flow.h b/src/flow.h index 1dd42f79ff..7d4d0c8442 100644 --- a/src/flow.h +++ b/src/flow.h @@ -302,12 +302,12 @@ typedef struct Flow_ SCMutex de_state_m; /**< mutex lock for the de_state object */ - /* list flow ptrs - * NOTE!!! These are NOT protected by the - * above mutex, but by the FlowQ's */ + /** hash list pointers, protected by fb->s */ struct Flow_ *hnext; /* hash list */ struct Flow_ *hprev; struct FlowBucket_ *fb; + + /** queue list pointers, protected by queue mutex */ struct Flow_ *lnext; /* list */ struct Flow_ *lprev;