]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow engine: improve scalability
authorVictor Julien <victor@inliniac.net>
Fri, 9 Mar 2012 17:31:46 +0000 (18:31 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 9 Mar 2012 17:31:46 +0000 (18:31 +0100)
Major redesign of the flow engine. Remove the flow queues that turned
out to be major choke points when using many threads. Flow manager now
walks the hash table directly. Simplify the way we get a new flow in
case of emergency.

18 files changed:
src/flow-hash.c
src/flow-hash.h
src/flow-manager.c
src/flow-manager.h
src/flow-private.h
src/flow-queue.c
src/flow-queue.h
src/flow-timeout.c
src/flow-timeout.h
src/flow-util.c
src/flow-util.h
src/flow.c
src/flow.h
src/source-pcap-file.c
src/stream-tcp-reassemble.c
src/stream-tcp.c
src/stream.c
src/suricata.c

index f381e80e5eada2a0fea815a12d888ebcbd8b1b78..0b6fcb2e7dcd3e851c319cfcde4b8ea0f0948903 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 
 #define FLOW_DEFAULT_FLOW_PRUNE 5
 
+SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
+SC_ATOMIC_EXTERN(unsigned char, flow_flags);
+
+static Flow *FlowGetUsedFlow(void);
+
 #ifdef FLOW_DEBUG_STATS
 #define FLOW_DEBUG_STATS_PROTO_ALL      0
 #define FLOW_DEBUG_STATS_PROTO_TCP      1
@@ -300,56 +305,33 @@ static Flow *FlowGetNew(Packet *p) {
         return NULL;
     }
 
-    /* no, so get a new one */
+    /* get a flow from the spare queue */
     f = FlowDequeue(&flow_spare_q);
     if (f == NULL) {
-        /* If we reached the max memcap, try to clean some flows:
-         * 1- first by normal timeouts
-         * 2- by emergency mode timeouts
-         * 3- by last time seen
-         */
+        /* If we reached the max memcap, we get a used flow */
         if ((SC_ATOMIC_GET(flow_memuse) + sizeof(Flow)) > flow_config.memcap) {
-            uint32_t not_released = 0;
-
-            SCLogDebug("We need to prune some flows(1)");
-
-            /* Ok, then try to release flow_try_release flows */
-            not_released = FlowPruneFlowsCnt(&p->ts, flow_config.flow_try_release);
-            if (not_released == (uint32_t)flow_config.flow_try_release) {
-                /* This means that none of the flows was released, so try again
-                 * with more agressive timeout values (emergency mode) */
-
-                if ( !(flow_flags & FLOW_EMERGENCY)) {
-                    SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine "
-                            "running with FLOW_EMERGENCY bit set "
-                            "(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")",
-                            (uintmax_t)p->ts.tv_sec, (uintmax_t)p->ts.tv_usec);
-                    flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */
-                    FlowWakeupFlowManagerThread();
-                }
-                SCLogDebug("We need to prune some flows with emerg bit (2)");
-
-                not_released = FlowPruneFlowsCnt(&p->ts, FLOW_DEFAULT_FLOW_PRUNE);
-                if (not_released == (uint32_t)flow_config.flow_try_release) {
-                    /* Here the engine is on a real stress situation
-                     * Try to kill the last time seen "flow_try_release" flows
-                     * directly, ignoring timeouts */
-                    SCLogDebug("We need to KILL some flows (3)");
-                    not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE);
-                    if (not_released == (uint32_t)flow_config.flow_try_release) {
-                        return NULL;
-                    }
-                }
+            /* declare state of emergency */
+            if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
+                SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
+
+                /* under high load, waking up the flow mgr each time leads
+                 * to high cpu usage. Flows are not timed out much faster if
+                 * we check a 1000 times a second. */
+                FlowWakeupFlowManagerThread();
             }
-        }
 
-        /* now see if we can alloc a new flow */
-        f = FlowAlloc();
-        if (f == NULL) {
-            return NULL;
-        }
+            f = FlowGetUsedFlow();
 
-        /* flow is initialized but *unlocked* */
+            /* freed a flow, but it's unlocked */
+        } else {
+            /* now see if we can alloc a new flow */
+            f = FlowAlloc();
+            if (f == NULL) {
+                return NULL;
+            }
+
+            /* flow is initialized but *unlocked* */
+        }
     } else {
         /* flow has been recycled before it went into the spare queue */
 
@@ -383,37 +365,36 @@ Flow *FlowGetFlowFromHash (Packet *p)
     uint32_t key = FlowGetKey(p);
     /* get our hash bucket and lock it */
     FlowBucket *fb = &flow_hash[key];
-    SCSpinLock(&fb->s);
+    FBLOCK_LOCK(fb);
 
-    SCLogDebug("fb %p fb->f %p", fb, fb->f);
+    SCLogDebug("fb %p fb->head %p", fb, fb->head);
 
     FlowHashCountIncr;
 
     /* see if the bucket already has a flow */
-    if (fb->f == NULL) {
-        f = fb->f = FlowGetNew(p);
+    if (fb->head == NULL) {
+        f = FlowGetNew(p);
         if (f == NULL) {
-            SCSpinUnlock(&fb->s);
+            FBLOCK_UNLOCK(fb);
             FlowHashCountUpdate;
             return NULL;
         }
 
         /* flow is locked */
+        fb->head = f;
+        fb->tail = f;
 
         /* got one, now lock, initialize and return */
         FlowInit(f,p);
-        f->flags |= FLOW_NEW_LIST;
         f->fb = fb;
 
-        FlowEnqueue(&flow_new_q[f->protomap], f);
-
-        SCSpinUnlock(&fb->s);
+        FBLOCK_UNLOCK(fb);
         FlowHashCountUpdate;
         return f;
     }
 
     /* ok, we have a flow in the bucket. Let's find out if it is our flow */
-    f = fb->f;
+    f = fb->head;
 
     /* see if this is the flow we are looking for */
     if (FlowCompare(f, p) == 0) {
@@ -428,10 +409,11 @@ Flow *FlowGetFlowFromHash (Packet *p)
             if (f == NULL) {
                 f = pf->hnext = FlowGetNew(p);
                 if (f == NULL) {
-                    SCSpinUnlock(&fb->s);
+                    FBLOCK_UNLOCK(fb);
                     FlowHashCountUpdate;
                     return NULL;
                 }
+                fb->tail = f;
 
                 /* flow is locked */
 
@@ -439,13 +421,9 @@ Flow *FlowGetFlowFromHash (Packet *p)
 
                 /* initialize and return */
                 FlowInit(f,p);
-
-                f->flags |= FLOW_NEW_LIST;
                 f->fb = fb;
 
-                FlowEnqueue(&flow_new_q[f->protomap], f);
-
-                SCSpinUnlock(&fb->s);
+                FBLOCK_UNLOCK(fb);
                 FlowHashCountUpdate;
                 return f;
             }
@@ -453,18 +431,25 @@ Flow *FlowGetFlowFromHash (Packet *p)
             if (FlowCompare(f, p) != 0) {
                 /* we found our flow, lets put it on top of the
                  * hash list -- this rewards active flows */
-                if (f->hnext) f->hnext->hprev = f->hprev;
-                if (f->hprev) f->hprev->hnext = f->hnext;
+                if (f->hnext) {
+                    f->hnext->hprev = f->hprev;
+                }
+                if (f->hprev) {
+                    f->hprev->hnext = f->hnext;
+                }
+                if (f == fb->tail) {
+                    fb->tail = f->hprev;
+                }
 
-                f->hnext = fb->f;
+                f->hnext = fb->head;
                 f->hprev = NULL;
-                fb->f->hprev = f;
-                fb->f = f;
+                fb->head->hprev = f;
+                fb->head = f;
 
                 /* found our flow, lock & return */
                 FlowIncrUsecnt(f);
                 SCMutexLock(&f->m);
-                SCSpinUnlock(&fb->s);
+                FBLOCK_UNLOCK(fb);
                 FlowHashCountUpdate;
                 return f;
             }
@@ -474,8 +459,79 @@ Flow *FlowGetFlowFromHash (Packet *p)
     /* lock & return */
     FlowIncrUsecnt(f);
     SCMutexLock(&f->m);
-    SCSpinUnlock(&fb->s);
+    FBLOCK_UNLOCK(fb);
     FlowHashCountUpdate;
     return f;
 }
 
+/** \internal
+ *  \brief Get a flow from the hash directly.
+ *
+ *  Called in conditions where the spare queue is empty and memcap is reached.
+ *
+ *  Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
+ *  is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
+ *  top each time since that would clear the top of the hash leading to longer
+ *  and longer search times under high pressure (observed).
+ *
+ *  \retval f flow or NULL
+ */
+static Flow *FlowGetUsedFlow(void) {
+    uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
+    uint32_t cnt = flow_config.hash_size;
+
+    while (cnt--) {
+        if (idx++ >= flow_config.hash_size)
+            idx = 0;
+
+        FlowBucket *fb = &flow_hash[idx];
+        if (fb == NULL)
+            continue;
+
+        if (FBLOCK_TRYLOCK(fb) != 0)
+            continue;
+
+        Flow *f = fb->tail;
+        if (f == NULL) {
+            FBLOCK_UNLOCK(fb);
+            continue;
+        }
+
+        if (SCMutexTrylock(&f->m) != 0) {
+            FBLOCK_UNLOCK(fb);
+            continue;
+        }
+
+        /** never prune a flow that is used by a packet or stream msg
+         *  we are currently processing in one of the threads */
+        if (SC_ATOMIC_GET(f->use_cnt) > 0) {
+            FBLOCK_UNLOCK(fb);
+            SCMutexUnlock(&f->m);
+            continue;
+        }
+
+        /* remove from the hash */
+        if (f->hprev != NULL)
+            f->hprev->hnext = f->hnext;
+        if (f->hnext != NULL)
+            f->hnext->hprev = f->hprev;
+        if (fb->head == f)
+            fb->head = f->hnext;
+        if (fb->tail == f)
+            fb->tail = f->hprev;
+
+        f->hnext = NULL;
+        f->hprev = NULL;
+        f->fb = NULL;
+        FBLOCK_UNLOCK(fb);
+
+        FlowClearMemory (f, f->protomap);
+
+        SCMutexUnlock(&f->m);
+
+        SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
+        return f;
+    }
+
+    return NULL;
+}
index 8ae5a5e4022d1e112d6aca01dde85c8ff6919c0a..171c527f06b7690c271feb11c3ab202d6d2df8e5 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 #ifndef __FLOW_HASH_H__
 #define __FLOW_HASH_H__
 
+/** Spinlocks or Mutex for the flow buckets. */
+//#define FBLOCK_SPIN
+#define FBLOCK_MUTEX
+
+#ifdef FBLOCK_SPIN
+    #ifdef FBLOCK_MUTEX
+        #error Cannot enable both FBLOCK_SPIN and FBLOCK_MUTEX
+    #endif
+#endif
+
 /* flow hash bucket -- the hash is basically an array of these buckets.
  * Each bucket contains a flow or list of flows. All these flows have
  * the same hashkey (the hash is a chained hash). When doing modifications
  * to the list, the entire bucket is locked. */
 typedef struct FlowBucket_ {
-    Flow *f;
-//    SCMutex m;
+    Flow *head;
+    Flow *tail;
+#ifdef FBLOCK_MUTEX
+    SCMutex m;
+#elif defined FBLOCK_SPIN
     SCSpinlock s;
+#else
+    #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
+#endif
 } FlowBucket;
 
+#ifdef FBLOCK_SPIN
+    #define FBLOCK_INIT(fb) SCSpinInit(&(fb)->s, 0)
+    #define FBLOCK_DESTROY(fb) SCSpinDestroy(&(fb)->s)
+    #define FBLOCK_LOCK(fb) SCSpinLock(&(fb)->s)
+    #define FBLOCK_TRYLOCK(fb) SCSpinTrylock(&(fb)->s)
+    #define FBLOCK_UNLOCK(fb) SCSpinUnlock(&(fb)->s)
+#elif defined FBLOCK_MUTEX
+    #define FBLOCK_INIT(fb) SCMutexInit(&(fb)->m, NULL)
+    #define FBLOCK_DESTROY(fb) SCMutexDestroy(&(fb)->m)
+    #define FBLOCK_LOCK(fb) SCMutexLock(&(fb)->m)
+    #define FBLOCK_TRYLOCK(fb) SCMutexTrylock(&(fb)->m)
+    #define FBLOCK_UNLOCK(fb) SCMutexUnlock(&(fb)->m)
+#else
+    #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
+#endif
+
 /* prototypes */
 
 Flow *FlowGetFlowFromHash(Packet *);
index 188487de0a58564182e5dbd35aed0c19a4746d09..7a970755b782c8222109e0aa656e7ac7db566a3f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -19,6 +19,7 @@
  * \file
  *
  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
+ * \author Victor Julien <victor@inliniac.net>
  */
 
 #include "suricata-common.h"
 /* Run mode selected at suricata.c */
 extern int run_mode;
 
-/* 0.4 seconds */
+SC_ATOMIC_EXTERN(unsigned char, flow_flags);
+
+/* 1 seconds */
 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
-/* 0.01 seconds */
+/* 0.1 seconds */
 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
-#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 10000000
+#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
 #define NEW_FLOW_COUNT_COND 10
 
+typedef struct FlowTimeoutCounters_ {
+    uint32_t new;
+    uint32_t est;
+    uint32_t clo;
+} FlowTimeoutCounters;
+
 /**
  * \brief Used to kill flow manager thread(s).
  *
@@ -112,24 +121,257 @@ void FlowKillFlowManagerThread(void)
     return;
 }
 
-/** \brief Thread that manages the various queue's and removes timed out flows.
- *  \param td ThreadVars casted to void ptr
+/** \internal
+ *  \brief Get the flow's state
+ *
+ *  \param f flow
+ *
+ *  \retval state either FLOW_STATE_NEW, FLOW_STATE_ESTABLISHED or FLOW_STATE_CLOSED
+ */
+static inline int FlowGetFlowState(Flow *f) {
+    if (flow_proto[f->protomap].GetProtoState != NULL) {
+        return flow_proto[f->protomap].GetProtoState(f->protoctx);
+    } else {
+        if (f->flags & FLOW_TO_SRC_SEEN && f->flags & FLOW_TO_DST_SEEN)
+            return FLOW_STATE_ESTABLISHED;
+        else
+            return FLOW_STATE_NEW;
+    }
+}
+
+/** \internal
+ *  \brief get timeout for flow
+ *
+ *  \param f flow
+ *  \param state flow state
+ *  \param emergency bool indicating emergency mode 1 yes, 0 no
+ *
+ *  \retval timeout timeout in seconds
+ */
+static inline uint32_t FlowGetFlowTimeout(Flow *f, int state, int emergency) {
+    uint32_t timeout;
+
+    if (emergency) {
+        switch(state) {
+            default:
+            case FLOW_STATE_NEW:
+                timeout = flow_proto[f->protomap].emerg_new_timeout;
+                break;
+            case FLOW_STATE_ESTABLISHED:
+                timeout = flow_proto[f->protomap].emerg_est_timeout;
+                break;
+            case FLOW_STATE_CLOSED:
+                timeout = flow_proto[f->protomap].emerg_closed_timeout;
+                break;
+        }
+    } else { /* implies no emergency */
+        switch(state) {
+            default:
+            case FLOW_STATE_NEW:
+                timeout = flow_proto[f->protomap].new_timeout;
+                break;
+            case FLOW_STATE_ESTABLISHED:
+                timeout = flow_proto[f->protomap].est_timeout;
+                break;
+            case FLOW_STATE_CLOSED:
+                timeout = flow_proto[f->protomap].closed_timeout;
+                break;
+        }
+    }
+
+    return timeout;
+}
+
+/** \internal
+ *  \brief check if a flow is timed out
+ *
+ *  \param f flow
+ *  \param ts timestamp
+ *  \param emergency bool indicating emergency mode
+ *
+ *  \retval 0 not timed out
+ *  \retval 1 timed out
+ */
+static int FlowManagerFlowTimeout(Flow *f, int state, struct timeval *ts, int emergency) {
+    /* set the timeout value according to the flow operating mode,
+     * flow's state and protocol.*/
+    uint32_t timeout = FlowGetFlowTimeout(f, state, emergency);
+
+    /* do the timeout check */
+    if ((int32_t)(f->lastts_sec + timeout) >= ts->tv_sec) {
+        return 0;
+    }
+
+    return 1;
+}
+
+/** \internal
+ *  \brief See if we can really discard this flow. Check use_cnt reference
+ *         counter and force reassembly if necessary.
+ *
+ *  \param f flow
+ *  \param ts timestamp
+ *  \param emergency bool indicating emergency mode
+ *
+ *  \retval 0 not timed out just yet
+ *  \retval 1 fully timed out, lets kill it
+ */
+static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts) {
+    /** never prune a flow that is used by a packet or stream msg
+     *  we are currently processing in one of the threads */
+    if (SC_ATOMIC_GET(f->use_cnt) > 0) {
+        return 0;
+    }
+
+    int server = 0, client = 0;
+    if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) {
+        FlowForceReassemblyForFlowV2(f, server, client);
+        return 0;
+    }
+#ifdef DEBUG
+    /* this should not be possible */
+    BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
+#endif
+
+    return 1;
+}
+
+/**
+ *  \internal
  *
- * IDEAS/TODO
- * Create a 'emergency mode' in which flow handling threads can indicate
- * we are/seem to be under attack..... maybe this thread should check
- * key indicators for that like:
- * - number of flows created in the last x time
- * - avg number of pkts per flow (how?)
- * - avg flow age
+ *  \brief check all flows in a hash row for timing out
  *
- * Keep an eye on the spare list, alloc flows if needed...
+ *  \param f last flow in the hash row
+ *  \param ts timestamp
+ *  \param emergency bool indicating emergency mode
+ *  \param counters ptr to FlowTimeoutCounters structure
+ *
+ *  \retval cnt timed out flows
+ */
+static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
+        int emergency, FlowTimeoutCounters *counters)
+{
+    uint32_t cnt = 0;
+
+    do {
+        if (SCMutexTrylock(&f->m) != 0) {
+            f = f->hprev;
+            continue;
+        }
+
+        Flow *next_flow = f->hprev;
+
+        int state = FlowGetFlowState(f);
+
+        /* timeout logic goes here */
+        if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
+            SCMutexUnlock(&f->m);
+            f = f->hprev;
+            continue;
+        }
+
+        /* check if the flow is fully timed out and
+         * ready to be discarded. */
+        if (FlowManagerFlowTimedOut(f, ts) == 1) {
+            /* remove from the hash */
+            if (f->hprev != NULL)
+                f->hprev->hnext = f->hnext;
+            if (f->hnext != NULL)
+                f->hnext->hprev = f->hprev;
+            if (f->fb->head == f)
+                f->fb->head = f->hnext;
+            if (f->fb->tail == f)
+                f->fb->tail = f->hprev;
+
+            f->hnext = NULL;
+            f->hprev = NULL;
+
+            FlowClearMemory (f, f->protomap);
+
+            /* no one is referring to this flow, use_cnt 0, removed from hash
+             * so we can unlock it and move it back to the spare queue. */
+            SCMutexUnlock(&f->m);
+
+            /* move to spare list */
+            FlowMoveToSpare(f);
+
+            cnt++;
+
+            switch (state) {
+                case FLOW_STATE_NEW:
+                default:
+                    counters->new++;
+                    break;
+                case FLOW_STATE_ESTABLISHED:
+                    counters->est++;
+                    break;
+                case FLOW_STATE_CLOSED:
+                    counters->clo++;
+                    break;
+            }
+        } else {
+            SCMutexUnlock(&f->m);
+        }
+
+        f = next_flow;
+    } while (f != NULL);
+
+    return cnt;
+}
+
+/**
+ *  \brief time out flows from the hash
+ *
+ *  \param ts timestamp
+ *  \param try_cnt number of flows to time out max (0 is unlimited)
+ *  \param counters ptr to FlowTimeoutCounters structure
+ *
+ *  \retval cnt number of timed out flow
+ */
+uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) {
+    uint32_t idx = 0;
+    uint32_t cnt = 0;
+    int emergency = 0;
+
+    if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
+        emergency = 1;
+
+    for (idx = 0; idx < flow_config.hash_size; idx++) {
+        FlowBucket *fb = &flow_hash[idx];
+        if (fb == NULL)
+            continue;
+        if (FBLOCK_TRYLOCK(fb) != 0)
+            continue;
+
+        /* flow hash bucket is now locked */
+
+        if (fb->tail == NULL)
+            goto next;
+
+        /* we have a flow, or more than one */
+        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+
+next:
+        FBLOCK_UNLOCK(fb);
+
+        if (try_cnt > 0 && cnt >= try_cnt)
+            break;
+    }
+
+    return cnt;
+}
+
+/** \brief Thread that manages the flow table and times out flows.
+ *
+ *  \param td ThreadVars casted to void ptr
+ *
+ *  Keeps an eye on the spare list, alloc flows if needed...
  */
 void *FlowManagerThread(void *td)
 {
     ThreadVars *th_v = (ThreadVars *)td;
     struct timeval ts;
-    uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0, nowcnt;
+    uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
     int emerg = FALSE;
     int prev_emerg = FALSE;
     uint32_t last_sec = 0;
@@ -137,18 +379,21 @@ void *FlowManagerThread(void *td)
     int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
     int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
 
-    uint16_t flow_mgr_closing_cnt = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
+    uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
             SC_PERF_TYPE_UINT64,
             "NULL");
-    uint16_t flow_mgr_new_cnt = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
+    uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
             SC_PERF_TYPE_UINT64,
             "NULL");
-    uint16_t flow_mgr_established_cnt = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
+    uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
             SC_PERF_TYPE_UINT64,
             "NULL");
     uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v,
             SC_PERF_TYPE_Q_NORMAL,
             "NULL");
+    uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v,
+            SC_PERF_TYPE_Q_NORMAL,
+            "NULL");
     uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v,
             SC_PERF_TYPE_UINT64,
             "NULL");
@@ -178,7 +423,7 @@ void *FlowManagerThread(void *td)
     {
         TmThreadTestThreadUnPaused(th_v);
 
-        if (flow_flags & FLOW_EMERGENCY) {
+        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
             emerg = TRUE;
 
             if (emerg == TRUE && prev_emerg == FALSE) {
@@ -203,58 +448,36 @@ void *FlowManagerThread(void *td)
         /* see if we still have enough spare flows */
         FlowUpdateSpareFlows();
 
-        int i;
-        closing_cnt = 0;
-        new_cnt = 0;
-        established_cnt = 0;
-        for (i = 0; i < FLOW_PROTO_MAX; i++) {
-            /* prune closing list */
-            nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts);
-            if (nowcnt) {
-                SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt);
-                closing_cnt += nowcnt;
-            }
-
-            /* prune new list */
-            nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts);
-            if (nowcnt) {
-                SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt);
-                new_cnt += nowcnt;
-            }
+        /* try to time out flows */
+        FlowTimeoutCounters counters = { 0, 0, 0, };
+        FlowTimeoutHash(&ts, 0 /* check all */, &counters);
 
-            /* prune established list */
-            nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts);
-            if (nowcnt) {
-                SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt);
-                established_cnt += nowcnt;
-            }
-        }
-        SCPerfCounterAddUI64(flow_mgr_closing_cnt, th_v->sc_perf_pca, (uint64_t)closing_cnt);
-        SCPerfCounterAddUI64(flow_mgr_new_cnt, th_v->sc_perf_pca, (uint64_t)new_cnt);
-        SCPerfCounterAddUI64(flow_mgr_established_cnt, th_v->sc_perf_pca, (uint64_t)established_cnt);
+        SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
+        SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
+        SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
         long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
         SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
 
+        uint32_t len = 0;
+        FQLOCK_LOCK(&flow_spare_q);
+        len = flow_spare_q.len;
+        FQLOCK_UNLOCK(&flow_spare_q);
+        SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
+
         /* Don't fear, FlowManagerThread is here...
          * clear emergency bit if we have at least xx flows pruned. */
         if (emerg == TRUE) {
-            uint32_t len = 0;
-
-            SCMutexLock(&flow_spare_q.mutex_q);
-
-            len = flow_spare_q.len;
-
-            SCMutexUnlock(&flow_spare_q.mutex_q);
-
             SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
                        "flow_spare_q status: %"PRIu32"%% flows at the queue",
                        len, flow_config.prealloc, len * 100 / flow_config.prealloc);
             /* only if we have pruned this "emergency_recovery" percentage
              * of flows, we will unset the emergency bit */
             if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
-                flow_flags &= ~FLOW_EMERGENCY;
+                SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
+
                 emerg = FALSE;
                 prev_emerg = FALSE;
+
                 flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
                 flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
                 SCLogInfo("Flow emergency mode over, back to normal... unsetting"
@@ -281,6 +504,8 @@ void *FlowManagerThread(void *td)
         SCCondTimedwait(&flow_manager_cond, &flow_manager_mutex, &cond_time);
         SCMutexUnlock(&flow_manager_mutex);
 
+        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
+
         SCPerfSyncCountersIfSignalled(th_v, 0);
     }
 
@@ -292,15 +517,6 @@ void *FlowManagerThread(void *td)
               "timed out, %"PRIu32" flows in closed state", new_cnt,
               established_cnt, closing_cnt);
 
-#ifdef FLOW_PRUNE_DEBUG
-    SCLogInfo("prune_queue_lock %"PRIu64, prune_queue_lock);
-    SCLogInfo("prune_queue_empty %"PRIu64, prune_queue_empty);
-    SCLogInfo("prune_flow_lock %"PRIu64, prune_flow_lock);
-    SCLogInfo("prune_bucket_lock %"PRIu64, prune_bucket_lock);
-    SCLogInfo("prune_no_timeout %"PRIu64, prune_no_timeout);
-    SCLogInfo("prune_usecnt %"PRIu64, prune_usecnt);
-#endif
-
     TmThreadsSetFlag(th_v, THV_CLOSED);
     pthread_exit((void *) 0);
 }
@@ -326,3 +542,279 @@ void FlowManagerThreadSpawn()
 
     return;
 }
+
+#ifdef UNITTESTS
+
+/**
+ *  \test   Test the timing out of a flow with a fresh TcpSession
+ *          (just initialized, no data segments) in normal mode.
+ *
+ *  \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest01 (void) {
+    TcpSession ssn;
+    Flow f;
+    FlowBucket fb;
+    struct timeval ts;
+
+    FlowQueueInit(&flow_spare_q);
+
+    memset(&ssn, 0, sizeof(TcpSession));
+    memset(&f, 0, sizeof(Flow));
+    memset(&ts, 0, sizeof(ts));
+    memset(&fb, 0, sizeof(FlowBucket));
+
+    FBLOCK_INIT(&fb);
+
+    FLOW_INITIALIZE(&f);
+    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+    TimeGet(&ts);
+    f.lastts_sec = ts.tv_sec - 5000;
+    f.protoctx = &ssn;
+    f.fb = &fb;
+
+    f.proto = IPPROTO_TCP;
+
+    int state = FlowGetFlowState(&f);
+    if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+        FBLOCK_DESTROY(&fb);
+        FLOW_DESTROY(&f);
+        FlowQueueDestroy(&flow_spare_q);
+        return 0;
+    }
+
+    FBLOCK_DESTROY(&fb);
+    FLOW_DESTROY(&f);
+
+    FlowQueueDestroy(&flow_spare_q);
+    return 1;
+}
+
+/**
+ *  \test   Test the timing out of a flow with a TcpSession
+ *          (with data segments) in normal mode.
+ *
+ *  \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest02 (void) {
+    TcpSession ssn;
+    Flow f;
+    FlowBucket fb;
+    struct timeval ts;
+    TcpSegment seg;
+    TcpStream client;
+    uint8_t payload[3] = {0x41, 0x41, 0x41};
+
+    FlowQueueInit(&flow_spare_q);
+
+    memset(&ssn, 0, sizeof(TcpSession));
+    memset(&f, 0, sizeof(Flow));
+    memset(&fb, 0, sizeof(FlowBucket));
+    memset(&ts, 0, sizeof(ts));
+    memset(&seg, 0, sizeof(TcpSegment));
+    memset(&client, 0, sizeof(TcpSegment));
+
+    FBLOCK_INIT(&fb);
+    FLOW_INITIALIZE(&f);
+    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+    TimeGet(&ts);
+    seg.payload = payload;
+    seg.payload_len = 3;
+    seg.next = NULL;
+    seg.prev = NULL;
+    client.seg_list = &seg;
+    ssn.client = client;
+    ssn.server = client;
+    ssn.state = TCP_ESTABLISHED;
+    f.lastts_sec = ts.tv_sec - 5000;
+    f.protoctx = &ssn;
+    f.fb = &fb;
+    f.proto = IPPROTO_TCP;
+
+    int state = FlowGetFlowState(&f);
+    if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+        FBLOCK_DESTROY(&fb);
+        FLOW_DESTROY(&f);
+        FlowQueueDestroy(&flow_spare_q);
+        return 0;
+    }
+    FBLOCK_DESTROY(&fb);
+    FLOW_DESTROY(&f);
+    FlowQueueDestroy(&flow_spare_q);
+    return 1;
+
+}
+
+/**
+ *  \test   Test the timing out of a flow with a fresh TcpSession
+ *          (just initialized, no data segments) in emergency mode.
+ *
+ *  \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest03 (void) {
+    TcpSession ssn;
+    Flow f;
+    FlowBucket fb;
+    struct timeval ts;
+
+    FlowQueueInit(&flow_spare_q);
+
+    memset(&ssn, 0, sizeof(TcpSession));
+    memset(&f, 0, sizeof(Flow));
+    memset(&ts, 0, sizeof(ts));
+    memset(&fb, 0, sizeof(FlowBucket));
+
+    FBLOCK_INIT(&fb);
+    FLOW_INITIALIZE(&f);
+    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+    TimeGet(&ts);
+    ssn.state = TCP_SYN_SENT;
+    f.lastts_sec = ts.tv_sec - 300;
+    f.protoctx = &ssn;
+    f.fb = &fb;
+    f.proto = IPPROTO_TCP;
+    f.flags |= FLOW_EMERGENCY;
+
+    int state = FlowGetFlowState(&f);
+    if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+        FBLOCK_DESTROY(&fb);
+        FLOW_DESTROY(&f);
+        FlowQueueDestroy(&flow_spare_q);
+        return 0;
+    }
+
+    FBLOCK_DESTROY(&fb);
+    FLOW_DESTROY(&f);
+    FlowQueueDestroy(&flow_spare_q);
+    return 1;
+}
+
+/**
+ *  \test   Test the timing out of a flow with a TcpSession
+ *          (with data segments) in emergency mode.
+ *
+ *  \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest04 (void) {
+
+    TcpSession ssn;
+    Flow f;
+    FlowBucket fb;
+    struct timeval ts;
+    TcpSegment seg;
+    TcpStream client;
+    uint8_t payload[3] = {0x41, 0x41, 0x41};
+
+    FlowQueueInit(&flow_spare_q);
+
+    memset(&ssn, 0, sizeof(TcpSession));
+    memset(&f, 0, sizeof(Flow));
+    memset(&fb, 0, sizeof(FlowBucket));
+    memset(&ts, 0, sizeof(ts));
+    memset(&seg, 0, sizeof(TcpSegment));
+    memset(&client, 0, sizeof(TcpSegment));
+
+    FBLOCK_INIT(&fb);
+    FLOW_INITIALIZE(&f);
+    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+    TimeGet(&ts);
+    seg.payload = payload;
+    seg.payload_len = 3;
+    seg.next = NULL;
+    seg.prev = NULL;
+    client.seg_list = &seg;
+    ssn.client = client;
+    ssn.server = client;
+    ssn.state = TCP_ESTABLISHED;
+    f.lastts_sec = ts.tv_sec - 5000;
+    f.protoctx = &ssn;
+    f.fb = &fb;
+    f.proto = IPPROTO_TCP;
+    f.flags |= FLOW_EMERGENCY;
+
+    int state = FlowGetFlowState(&f);
+    if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+        FBLOCK_DESTROY(&fb);
+        FLOW_DESTROY(&f);
+        FlowQueueDestroy(&flow_spare_q);
+        return 0;
+    }
+
+    FBLOCK_DESTROY(&fb);
+    FLOW_DESTROY(&f);
+    FlowQueueDestroy(&flow_spare_q);
+    return 1;
+}
+
+/**
+ *  \test   Test flow allocations when it reach memcap
+ *
+ *
+ *  \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest05 (void) {
+    int result = 0;
+
+    FlowInitConfig(FLOW_QUIET);
+    FlowConfig backup;
+    memcpy(&backup, &flow_config, sizeof(FlowConfig));
+
+    uint32_t ini = 0;
+    uint32_t end = flow_spare_q.len;
+    flow_config.memcap = 10000;
+    flow_config.prealloc = 100;
+
+    /* Let's get the flow_spare_q empty */
+    UTHBuildPacketOfFlows(ini, end, 0);
+
+    /* And now let's try to reach the memcap val */
+    while (SC_ATOMIC_GET(flow_memuse) + sizeof(Flow) < flow_config.memcap) {
+        ini = end + 1;
+        end = end + 2;
+        UTHBuildPacketOfFlows(ini, end, 0);
+    }
+
+    /* should time out normal */
+    TimeSetIncrementTime(2000);
+    ini = end + 1;
+    end = end + 2;;
+    UTHBuildPacketOfFlows(ini, end, 0);
+
+    struct timeval ts;
+    TimeGet(&ts);
+    /* try to time out flows */
+    FlowTimeoutCounters counters = { 0, 0, 0, };
+    FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+
+    if (flow_spare_q.len > 0) {
+        result = 1;
+    }
+
+    memcpy(&flow_config, &backup, sizeof(FlowConfig));
+    FlowShutdown();
+
+    return result;
+}
+#endif /* UNITTESTS */
+
+/**
+ *  \brief   Function to register the Flow Unitests.
+ */
+void FlowMgrRegisterTests (void) {
+#ifdef UNITTESTS
+    UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession", FlowMgrTest01, 1);
+    UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments", FlowMgrTest02, 1);
+    UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession", FlowMgrTest03, 1);
+    UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments", FlowMgrTest04, 1);
+    UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap", FlowMgrTest05, 1);
+#endif /* UNITTESTS */
+}
index 85aa1801628b6e0908dab9aa5eb2406b48ec6ead..6bb64b8771ca9458ee72a0adfad0be2b47b85596 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 #ifndef __FLOW_MANAGER_H__
 #define __FLOW_MANAGER_H__
 
+/** flow manager scheduling condition */
 SCCondT flow_manager_cond;
 SCMutex flow_manager_mutex;
-
 #define FlowWakeupFlowManagerThread() SCCondSignal(&flow_manager_cond)
 
 void FlowManagerThreadSpawn(void);
 void FlowKillFlowManagerThread(void);
+void FlowMgrRegisterTests (void);
 
 #endif /* __FLOW_MANAGER_H__ */
index dac216e8f51186e7b9a67e34db4057ef1ab1fdae..97eb25a29b03ff4302e347201ce6ae6e2334699f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -78,26 +78,9 @@ FlowProto flow_proto[FLOW_PROTO_MAX];
 /** spare/unused/prealloced flows live here */
 FlowQueue flow_spare_q;
 
-/** Flows in the new/unreplied state live here */
-FlowQueue flow_new_q[FLOW_PROTO_MAX];
-
-/** All "established" flows live here, the top holds the
- *  last recently used (lru) flow, so we can remove
- *  that in case of memory problems and check it for
- *  timeouts. */
-FlowQueue flow_est_q[FLOW_PROTO_MAX];
-
-/** All "closing" flows live here, the top holds the
- *  last recently used (lru) flow, so we can remove
- *  that in case of memory problems and check it for
- *  timeouts. */
-FlowQueue flow_close_q[FLOW_PROTO_MAX];
-
 FlowBucket *flow_hash;
 FlowConfig flow_config;
 
-uint8_t flow_flags;
-
 /** flow memuse counter (atomic), for enforcing memcap limit */
 SC_ATOMIC_DECLARE(long long unsigned int, flow_memuse);
 
index bfd2365f2e7448edb25abbb7f0f2683779e4204c..eb6d120959aa8a714d16e169aef50750565e7fd4 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -32,7 +32,6 @@
 #include "util-error.h"
 #include "util-debug.h"
 #include "util-print.h"
-#include <string.h>
 
 FlowQueue *FlowQueueNew() {
     FlowQueue *q = (FlowQueue *)SCMalloc(sizeof(FlowQueue));
@@ -47,8 +46,7 @@ FlowQueue *FlowQueueNew() {
 FlowQueue *FlowQueueInit (FlowQueue *q) {
     if (q != NULL) {
         memset(q, 0, sizeof(FlowQueue));
-        SCMutexInit(&q->mutex_q, NULL);
-        SCCondInit(&q->cond_q, NULL);
+        FQLOCK_INIT(q);
     }
     return q;
 }
@@ -59,8 +57,7 @@ FlowQueue *FlowQueueInit (FlowQueue *q) {
  *  \param q the flow queue to destroy
  */
 void FlowQueueDestroy (FlowQueue *q) {
-    SCMutexDestroy(&q->mutex_q);
-    SCCondDestroy(&q->cond_q);
+    FQLOCK_DESTROY(q);
 }
 
 /**
@@ -74,7 +71,8 @@ void FlowEnqueue (FlowQueue *q, Flow *f) {
     BUG_ON(q == NULL || f == NULL);
 #endif
 
-    SCMutexLock(&q->mutex_q);
+    FQLOCK_LOCK(q);
+
     /* more flows in queue */
     if (q->top != NULL) {
         f->lnext = q->top;
@@ -90,7 +88,7 @@ void FlowEnqueue (FlowQueue *q, Flow *f) {
     if (q->len > q->dbg_maxlen)
         q->dbg_maxlen = q->len;
 #endif /* DBG_PERF */
-    SCMutexUnlock(&q->mutex_q);
+    FQLOCK_UNLOCK(q);
 }
 
 /**
@@ -101,11 +99,11 @@ void FlowEnqueue (FlowQueue *q, Flow *f) {
  *  \retval f flow or NULL if empty list.
  */
 Flow *FlowDequeue (FlowQueue *q) {
-    SCMutexLock(&q->mutex_q);
+    FQLOCK_LOCK(q);
 
     Flow *f = q->bot;
     if (f == NULL) {
-        SCMutexUnlock(&q->mutex_q);
+        FQLOCK_UNLOCK(q);
         return NULL;
     }
 
@@ -128,108 +126,10 @@ Flow *FlowDequeue (FlowQueue *q) {
     f->lnext = NULL;
     f->lprev = NULL;
 
-    SCMutexUnlock(&q->mutex_q);
+    FQLOCK_UNLOCK(q);
     return f;
 }
 
-/**
- *  \brief Transfer a flow from one queue to another
- *
- *  \param f the flow to be transfered
- *  \param srcq the source queue, where the flow will be removed.
- *  \param dstq the dest queue where the flow will be placed
- *
- *  \note srcq and dstq must be different queues.
- */
-void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq)
-{
-#ifdef DEBUG
-    BUG_ON(srcq == NULL || dstq == NULL || srcq == dstq);
-#endif /* DEBUG */
-
-    SCMutexLock(&srcq->mutex_q);
-
-    /* remove from old queue */
-    if (srcq->top == f)
-        srcq->top = f->lnext;       /* remove from queue top */
-    if (srcq->bot == f)
-        srcq->bot = f->lprev;       /* remove from queue bot */
-    if (f->lprev != NULL)
-        f->lprev->lnext = f->lnext; /* remove from flow prev */
-    if (f->lnext != NULL)
-        f->lnext->lprev = f->lprev; /* remove from flow next */
-
-#ifdef DEBUG
-    BUG_ON(srcq->len == 0);
-#endif
-    if (srcq->len > 0)
-        srcq->len--; /* adjust len */
-
-    f->lnext = NULL;
-    f->lprev = NULL;
-
-    SCMutexUnlock(&srcq->mutex_q);
-
-    SCMutexLock(&dstq->mutex_q);
-
-    /* add to new queue (append) */
-    f->lprev = dstq->bot;
-    if (f->lprev != NULL)
-        f->lprev->lnext = f;
-    f->lnext = NULL;
-    dstq->bot = f;
-    if (dstq->top == NULL)
-        dstq->top = f;
-
-    dstq->len++;
-#ifdef DBG_PERF
-    if (dstq->len > dstq->dbg_maxlen)
-        dstq->dbg_maxlen = dstq->len;
-#endif /* DBG_PERF */
-
-    SCMutexUnlock(&dstq->mutex_q);
-}
-
-/**
- *  \brief Move flow to bottom of queue
- *
- *  \param f the flow to be transfered
- *  \param q the queue
- */
-void FlowRequeueMoveToBot(Flow *f, FlowQueue *q)
-{
-#ifdef DEBUG
-    BUG_ON(q == NULL || f == NULL);
-#endif /* DEBUG */
-
-    SCMutexLock(&q->mutex_q);
-
-    /* remove from the queue */
-    if (q->top == f)
-        q->top = f->lnext;       /* remove from queue top */
-    if (q->bot == f)
-        q->bot = f->lprev;       /* remove from queue bot */
-    if (f->lprev != NULL)
-        f->lprev->lnext = f->lnext; /* remove from flow prev */
-    if (f->lnext != NULL)
-        f->lnext->lprev = f->lprev; /* remove from flow next */
-
-    /* readd to the queue (append) */
-    f->lprev = q->bot;
-
-    if (f->lprev != NULL)
-        f->lprev->lnext = f;
-
-    f->lnext = NULL;
-
-    q->bot = f;
-
-    if (q->top == NULL)
-        q->top = f;
-
-    SCMutexUnlock(&q->mutex_q);
-}
-
 /**
  *  \brief Transfer a flow from a queue to the spare queue
  *
@@ -238,32 +138,10 @@ void FlowRequeueMoveToBot(Flow *f, FlowQueue *q)
  *
  *  \note spare queue needs locking
  */
-void FlowRequeueMoveToSpare(Flow *f, FlowQueue *q)
+void FlowMoveToSpare(Flow *f)
 {
-#ifdef DEBUG
-    BUG_ON(q == NULL || f == NULL);
-#endif /* DEBUG */
-
-    /* remove from old queue */
-    if (q->top == f)
-        q->top = f->lnext;       /* remove from queue top */
-    if (q->bot == f)
-        q->bot = f->lprev;       /* remove from queue bot */
-    if (f->lprev != NULL)
-        f->lprev->lnext = f->lnext; /* remove from flow prev */
-    if (f->lnext != NULL)
-        f->lnext->lprev = f->lprev; /* remove from flow next */
-#ifdef DEBUG
-    BUG_ON(q->len == 0);
-#endif
-    if (q->len > 0)
-        q->len--; /* adjust len */
-
-    f->lnext = NULL;
-    f->lprev = NULL;
-
     /* now put it in spare */
-    SCMutexLock(&flow_spare_q.mutex_q);
+    FQLOCK_LOCK(&flow_spare_q);
 
     /* add to new queue (append) */
     f->lprev = flow_spare_q.bot;
@@ -280,6 +158,6 @@ void FlowRequeueMoveToSpare(Flow *f, FlowQueue *q)
         flow_spare_q.dbg_maxlen = flow_spare_q.len;
 #endif /* DBG_PERF */
 
-    SCMutexUnlock(&flow_spare_q.mutex_q);
+    FQLOCK_UNLOCK(&flow_spare_q);
 }
 
index 57ca310a079ad8d0d8485df17210560488fdafc9..b370cd22268e28917f7a337405952bc05b111757 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 #include "suricata-common.h"
 #include "flow.h"
 
+/** Spinlocks or Mutex for the flow queues. */
+//#define FQLOCK_SPIN
+#define FQLOCK_MUTEX
+
+#ifdef FQLOCK_SPIN
+    #ifdef FQLOCK_MUTEX
+        #error Cannot enable both FQLOCK_SPIN and FQLOCK_MUTEX
+    #endif
+#endif
+
 /* Define a queue for storing flows */
 typedef struct FlowQueue_
 {
@@ -36,10 +46,31 @@ typedef struct FlowQueue_
 #ifdef DBG_PERF
     uint32_t dbg_maxlen;
 #endif /* DBG_PERF */
-    SCMutex mutex_q;
-    SCCondT cond_q;
+#ifdef FQLOCK_MUTEX
+    SCMutex m;
+#elif defined FQLOCK_SPIN
+    SCSpinlock s;
+#else
+    #error Enable FQLOCK_SPIN or FQLOCK_MUTEX
+#endif
 } FlowQueue;
 
+#ifdef FQLOCK_SPIN
+    #define FQLOCK_INIT(q) SCSpinInit(&(q)->s, 0)
+    #define FQLOCK_DESTROY(q) SCSpinDestroy(&(q)->s)
+    #define FQLOCK_LOCK(q) SCSpinLock(&(q)->s)
+    #define FQLOCK_TRYLOCK(q) SCSpinTrylock(&(q)->s)
+    #define FQLOCK_UNLOCK(q) SCSpinUnlock(&(q)->s)
+#elif defined FQLOCK_MUTEX
+    #define FQLOCK_INIT(q) SCMutexInit(&(q)->m, NULL)
+    #define FQLOCK_DESTROY(q) SCMutexDestroy(&(q)->m)
+    #define FQLOCK_LOCK(q) SCMutexLock(&(q)->m)
+    #define FQLOCK_TRYLOCK(q) SCMutexTrylock(&(q)->m)
+    #define FQLOCK_UNLOCK(q) SCMutexUnlock(&(q)->m)
+#else
+    #error Enable FQLOCK_SPIN or FQLOCK_MUTEX
+#endif
+
 /* prototypes */
 FlowQueue *FlowQueueNew();
 FlowQueue *FlowQueueInit(FlowQueue *);
@@ -48,9 +79,7 @@ void FlowQueueDestroy (FlowQueue *);
 void FlowEnqueue (FlowQueue *, Flow *);
 Flow *FlowDequeue (FlowQueue *);
 
-void FlowRequeue(Flow *, FlowQueue *, FlowQueue *);
-void FlowRequeueMoveToBot(Flow *, FlowQueue *);
-void FlowRequeueMoveToSpare(Flow *, FlowQueue *);
+void FlowMoveToSpare(Flow *);
 
 #endif /* __FLOW_QUEUE_H__ */
 
index 09745baa129ca8ebf011a60b757bf482653bed03..c68d6548a55de6a48990bfe3aef09899b4b062ed 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -436,7 +436,7 @@ int FlowForceReassemblyForFlowV2(Flow *f, int server, int client)
  *
  * \param q The queue to process flows from.
  */
-static inline void FlowForceReassemblyForQ(FlowQueue *q)
+static inline void FlowForceReassemblyForHash(void)
 {
     Flow *f;
     TcpSession *ssn;
@@ -444,128 +444,137 @@ static inline void FlowForceReassemblyForQ(FlowQueue *q)
     int server_ok;
     int tcp_needs_inspection;
 
-    /* get the topmost flow from the QUEUE */
-    f = q->top;
+    uint32_t idx = 0;
 
     /* We use this packet just for reassembly purpose */
     Packet *reassemble_p = PacketGetFromAlloc();
     if (reassemble_p == NULL)
         return;
 
-    /* we need to loop through all the flows in the queue */
-    while (f != NULL) {
-        PACKET_RECYCLE(reassemble_p);
+    for (idx = 0; idx < flow_config.hash_size; idx++) {
+        FlowBucket *fb = &flow_hash[idx];
+        if (fb == NULL)
+            continue;
+        FBLOCK_LOCK(fb);
 
-        SCMutexLock(&f->m);
+        /* get the topmost flow from the QUEUE */
+        f = fb->head;
 
-        /* Get the tcp session for the flow */
-        ssn = (TcpSession *)f->protoctx;
+        /* we need to loop through all the flows in the queue */
+        while (f != NULL) {
+            PACKET_RECYCLE(reassemble_p);
 
-        /* \todo Also skip flows that shouldn't be inspected */
-        if (ssn == NULL) {
-            SCMutexUnlock(&f->m);
-            f = f->lnext;
-            continue;
-        }
-
-        /* ah ah!  We have some unattended toserver segments */
-        if ((client_ok = StreamHasUnprocessedSegments(ssn, 0)) == 1) {
-            StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+            SCMutexLock(&f->m);
 
-            ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
-                                    ssn->client.seg_list_tail->payload_len);
+            /* Get the tcp session for the flow */
+            ssn = (TcpSession *)f->protoctx;
 
-            FlowForceReassemblyPseudoPacketSetup(reassemble_p, 1, f, ssn, 1);
-            StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
-                                             stt->ra_ctx, ssn, &ssn->server,
-                                             reassemble_p, NULL);
-            StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
-        }
-        /* oh oh!  We have some unattended toclient segments */
-        if ((server_ok = StreamHasUnprocessedSegments(ssn, 1)) == 1) {
-            StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+            /* \todo Also skip flows that shouldn't be inspected */
+            if (ssn == NULL) {
+                SCMutexUnlock(&f->m);
+                f = f->hnext;
+                continue;
+            }
 
-            ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
-                                    ssn->server.seg_list_tail->payload_len);
+            /* ah ah!  We have some unattended toserver segments */
+            if ((client_ok = StreamHasUnprocessedSegments(ssn, 0)) == 1) {
+                StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
 
-            FlowForceReassemblyPseudoPacketSetup(reassemble_p, 0, f, ssn, 1);
-            StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
-                                             stt->ra_ctx, ssn, &ssn->client,
-                                             reassemble_p, NULL);
-            StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
-        }
+                ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
+                        ssn->client.seg_list_tail->payload_len);
 
-        if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED)
-            tcp_needs_inspection = 1;
-        else
-            tcp_needs_inspection = 0;
+                FlowForceReassemblyPseudoPacketSetup(reassemble_p, 1, f, ssn, 1);
+                StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
+                        stt->ra_ctx, ssn, &ssn->server,
+                        reassemble_p, NULL);
+                StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
+            }
+            /* oh oh!  We have some unattended toclient segments */
+            if ((server_ok = StreamHasUnprocessedSegments(ssn, 1)) == 1) {
+                StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+
+                ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
+                        ssn->server.seg_list_tail->payload_len);
+
+                FlowForceReassemblyPseudoPacketSetup(reassemble_p, 0, f, ssn, 1);
+                StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
+                        stt->ra_ctx, ssn, &ssn->client,
+                        reassemble_p, NULL);
+                StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
+            }
 
-        SCMutexUnlock(&f->m);
+            if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED)
+                tcp_needs_inspection = 1;
+            else
+                tcp_needs_inspection = 0;
 
-        /* insert a pseudo packet in the toserver direction */
-        if (client_ok || tcp_needs_inspection)
-        {
-            SCMutexLock(&f->m);
-            Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 1);
             SCMutexUnlock(&f->m);
 
-            if (p == NULL) {
-                TmqhOutputPacketpool(NULL, reassemble_p);
-                return;
-            }
+            /* insert a pseudo packet in the toserver direction */
+            if (client_ok || tcp_needs_inspection)
+            {
+                SCMutexLock(&f->m);
+                Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 1);
+                SCMutexUnlock(&f->m);
 
-            if (stream_pseudo_pkt_detect_prev_TV != NULL) {
-                stream_pseudo_pkt_detect_prev_TV->
-                    tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
-            } else {
-                TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
-                while (s != NULL) {
-                    s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
-                                &s->slot_post_pq);
-                    s = s->slot_next;
+                if (p == NULL) {
+                    TmqhOutputPacketpool(NULL, reassemble_p);
+                    return;
                 }
 
-                if (stream_pseudo_pkt_detect_TV != NULL) {
-                    stream_pseudo_pkt_detect_TV->
-                        tmqh_out(stream_pseudo_pkt_detect_TV, p);
-                }
-            }
-        } /* if (ssn->client.seg_list != NULL) */
-        if (server_ok || tcp_needs_inspection)
-        {
-            SCMutexLock(&f->m);
-            Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
-            SCMutexUnlock(&f->m);
+                if (stream_pseudo_pkt_detect_prev_TV != NULL) {
+                    stream_pseudo_pkt_detect_prev_TV->
+                        tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
+                } else {
+                    TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
+                    while (s != NULL) {
+                        s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
+                                &s->slot_post_pq);
+                        s = s->slot_next;
+                    }
 
-            if (p == NULL) {
-                TmqhOutputPacketpool(NULL, reassemble_p);
-                return;
-            }
+                    if (stream_pseudo_pkt_detect_TV != NULL) {
+                        stream_pseudo_pkt_detect_TV->
+                            tmqh_out(stream_pseudo_pkt_detect_TV, p);
+                    }
+                }
+            } /* if (ssn->client.seg_list != NULL) */
+            if (server_ok || tcp_needs_inspection)
+            {
+                SCMutexLock(&f->m);
+                Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
+                SCMutexUnlock(&f->m);
+
+                if (p == NULL) {
+                    TmqhOutputPacketpool(NULL, reassemble_p);
+                    return;
+                }
 
-            if (stream_pseudo_pkt_detect_prev_TV != NULL) {
-                stream_pseudo_pkt_detect_prev_TV->
-                    tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
-            } else {
-                TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
-                while (s != NULL) {
-                    s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
+                if (stream_pseudo_pkt_detect_prev_TV != NULL) {
+                    stream_pseudo_pkt_detect_prev_TV->
+                        tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
+                } else {
+                    TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
+                    while (s != NULL) {
+                        s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
                                 &s->slot_post_pq);
-                    s = s->slot_next;
-                }
+                        s = s->slot_next;
+                    }
 
-                if (stream_pseudo_pkt_detect_TV != NULL) {
-                    stream_pseudo_pkt_detect_TV->
-                        tmqh_out(stream_pseudo_pkt_detect_TV, p);
+                    if (stream_pseudo_pkt_detect_TV != NULL) {
+                        stream_pseudo_pkt_detect_TV->
+                            tmqh_out(stream_pseudo_pkt_detect_TV, p);
+                    }
                 }
-            }
-        } /* if (ssn->server.seg_list != NULL) */
+            } /* if (ssn->server.seg_list != NULL) */
 
-        /* next flow in the queue */
-        f = f->lnext;
-    } /* while (f != NULL) */
+            /* next flow in the queue */
+            f = f->hnext;
+        } /* while (f != NULL) */
+        FBLOCK_UNLOCK(fb);
+    }
 
     TmqhOutputPacketpool(NULL, reassemble_p);
-
     return;
 }
 
@@ -610,9 +619,7 @@ void FlowForceReassembly(void)
 
     /** ----- Part 3 ----- **/
     /* Carry out flow reassembly for unattended flows */
-    FlowForceReassemblyForQ(&flow_new_q[FLOW_PROTO_TCP]);
-    FlowForceReassemblyForQ(&flow_est_q[FLOW_PROTO_TCP]);
-    FlowForceReassemblyForQ(&flow_close_q[FLOW_PROTO_TCP]);
+    FlowForceReassemblyForHash();
 
     return;
 }
index 51ea7be87973ced5cad41b813a6a9fad486e0b67..c36a4fecf392329c481767ce4a1f93ff86117bea 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -26,7 +26,6 @@
 
 int FlowForceReassemblyForFlowV2(Flow *f, int server, int client);
 int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client);
-//int FlowForceReassemblyForFlowV2(Flow *);
 void FlowForceReassembly(void);
 void FlowForceReassemblySetup(void);
 
index 206bd8f3ef9d4179d15ae7f2a7a1178770542e69..71016c997a6d0f59043751937476d2a74a09d107 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
index c517540099be434a432f3bf443f52012fbe4a5e3..103642afdcb69472a9154478cd39dd3b0e03918e 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
index 2cfc6a9f6080c36ded0c86346c61869409cfefeb..c11517b587f5777c2b6cdd293755afd376bafbaa 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -21,9 +21,6 @@
  *  \author Victor Julien <victor@inliniac.net>
  *
  *  Flow implementation.
- *
- *  \todo Maybe place the flow that we get a packet for on top of the
- *    list in the bucket. This rewards active flows.
  */
 
 #include "suricata-common.h"
 
 #define FLOW_DEFAULT_PREALLOC    10000
 
+/** atomic int that is used when freeing a flow from the hash. In this
+ *  case we walk the hash to find a flow to free. This var records where
+ *  we left off in the hash. Without this only the top rows of the hash
+ *  are freed. This isn't just about fairness. Under severe presure, the
+ *  hash rows on top would be all freed and the time to find a flow to
+ *  free increased with every run. */
+SC_ATOMIC_DECLARE(unsigned int, flow_prune_idx);
+
+/** atomic flags */
+SC_ATOMIC_DECLARE(unsigned char, flow_flags);
+
 void FlowRegisterTests(void);
 void FlowInitFlowProto();
 int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
-static int FlowClearMemory(Flow *,uint8_t );
 int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
 int FlowSetFlowStateFunc(uint8_t , int (*GetProtoState)(void *));
-static uint32_t FlowPruneFlowQueueCnt(FlowQueue *, struct timeval *, int);
-int FlowKill(FlowQueue *);
 
 /* Run mode selected at suricata.c */
 extern int run_mode;
@@ -96,520 +101,6 @@ void FlowCleanupAppLayer(Flow *f)
     return;
 }
 
-/** \brief Update the flows position in the queue's
- *  \param f Flow to requeue.
- *  \todo if we have a flow state func rely on that soly
- *
- *  In-use flows are in the flow_new_q, flow_est_q lists or flow_close_q lists.
- */
-void FlowUpdateQueue(Flow *f)
-{
-    if (f->flags & FLOW_NEW_LIST) {
-        /* in the new list -- we consider a flow no longer
-         * new if we have seen at least 2 pkts in both ways. */
-        if (f->flags & FLOW_TO_DST_SEEN && f->flags & FLOW_TO_SRC_SEEN) {
-            FlowRequeue(f, &flow_new_q[f->protomap], &flow_est_q[f->protomap]);
-
-            f->flags |= FLOW_EST_LIST; /* transition */
-            f->flags &= ~FLOW_NEW_LIST;
-        } else {
-            FlowRequeueMoveToBot(f, &flow_new_q[f->protomap]);
-        }
-    } else if (f->flags & FLOW_EST_LIST) {
-        if (flow_proto[f->protomap].GetProtoState != NULL) {
-            uint8_t state = flow_proto[f->protomap].GetProtoState(f->protoctx);
-            if (state == FLOW_STATE_CLOSED) {
-                f->flags |= FLOW_CLOSED_LIST; /* transition */
-                f->flags &=~ FLOW_EST_LIST;
-
-                SCLogDebug("flow %p was put into closing queue ts %"PRIuMAX"", f, (uintmax_t)f->lastts_sec);
-                FlowRequeue(f, &flow_est_q[f->protomap], &flow_close_q[f->protomap]);
-            } else {
-                /* Pull and put back -- this way the flows on
-                 * top of the list are least recently used. */
-                FlowRequeueMoveToBot(f, &flow_est_q[f->protomap]);
-            }
-        } else {
-            /* Pull and put back -- this way the flows on
-             * top of the list are least recently used. */
-            FlowRequeueMoveToBot(f, &flow_est_q[f->protomap]);
-        }
-    } else if (f->flags & FLOW_CLOSED_LIST){
-        /* for the case of ssn reuse in TCP sessions we need to be able to pull
-         * a flow back from the dungeon and inject adrenalin straight into it's
-         * heart. */
-        if (flow_proto[f->protomap].GetProtoState != NULL) {
-            uint8_t state = flow_proto[f->protomap].GetProtoState(f->protoctx);
-            if (state == FLOW_STATE_NEW) {
-                f->flags |= FLOW_NEW_LIST; /* transition */
-                f->flags &=~ FLOW_CLOSED_LIST;
-
-                SCLogDebug("flow %p was put into new queue ts %"PRIuMAX"", f, (uintmax_t)f->lastts_sec);
-                FlowRequeue(f, &flow_close_q[f->protomap], &flow_new_q[f->protomap]);
-            } else {
-                /* Pull and put back -- this way the flows on
-                 * top of the list are least recently used. */
-                FlowRequeueMoveToBot(f, &flow_close_q[f->protomap]);
-            }
-        } else {
-            /* Pull and put back -- this way the flows on
-             * top of the list are least recently used. */
-            FlowRequeueMoveToBot(f, &flow_close_q[f->protomap]);
-        }
-    }
-
-    return;
-}
-
-#ifdef FLOW_PRUNE_DEBUG
-static uint64_t prune_queue_lock = 0;
-static uint64_t prune_queue_empty = 0;
-static uint64_t prune_flow_lock = 0;
-static uint64_t prune_bucket_lock = 0;
-static uint64_t prune_no_timeout = 0;
-static uint64_t prune_usecnt = 0;
-#endif
-
-/** \internal
- *  \brief get timeout for flow
- *
- *  \param f flow
- *  \param emergency bool indicating emergency mode 1 yes, 0 no
- *
- *  \retval timeout timeout in seconds
- */
-static inline uint32_t FlowPruneGetFlowTimeout(Flow *f, int emergency) {
-    uint32_t timeout;
-
-    if (emergency) {
-        if (flow_proto[f->protomap].GetProtoState != NULL) {
-            switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
-                default:
-                case FLOW_STATE_NEW:
-                    timeout = flow_proto[f->protomap].emerg_new_timeout;
-                    break;
-                case FLOW_STATE_ESTABLISHED:
-                    timeout = flow_proto[f->protomap].emerg_est_timeout;
-                    break;
-                case FLOW_STATE_CLOSED:
-                    timeout = flow_proto[f->protomap].emerg_closed_timeout;
-                    break;
-            }
-        } else {
-            if (f->flags & FLOW_EST_LIST)
-                timeout = flow_proto[f->protomap].emerg_est_timeout;
-            else
-                timeout = flow_proto[f->protomap].emerg_new_timeout;
-        }
-    } else { /* implies no emergency */
-        if (flow_proto[f->protomap].GetProtoState != NULL) {
-            switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
-                default:
-                case FLOW_STATE_NEW:
-                    timeout = flow_proto[f->protomap].new_timeout;
-                    break;
-                case FLOW_STATE_ESTABLISHED:
-                    timeout = flow_proto[f->protomap].est_timeout;
-                    break;
-                case FLOW_STATE_CLOSED:
-                    timeout = flow_proto[f->protomap].closed_timeout;
-                    break;
-            }
-        } else {
-            if (f->flags & FLOW_EST_LIST)
-                timeout = flow_proto[f->protomap].est_timeout;
-            else
-                timeout = flow_proto[f->protomap].new_timeout;
-        }
-    }
-
-    return timeout;
-}
-
-/** FlowPrune
- *
- * Inspect top (last recently used) flow from the queue and see if
- * we need to prune it.
- *
- * Use trylock here so prevent us from blocking the packet handling.
- *
- * \param q       Flow queue to prune
- * \param ts      Current time
- * \param try_cnt Tries to prune the first try_cnt no of flows in the q
- *
- * \retval 0 on error, failed block, nothing to prune
- * \retval cnt on successfully pruned, cnt flows were pruned
- */
-static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
-{
-    SCEnter();
-    int cnt = 0;
-    int try_cnt_temp = 0;
-
-    int mr = SCMutexTrylock(&q->mutex_q);
-    if (mr != 0) {
-        SCLogDebug("trylock failed");
-        if (mr == EBUSY)
-            SCLogDebug("was locked");
-        if (mr == EINVAL)
-            SCLogDebug("bad mutex value");
-
-#ifdef FLOW_PRUNE_DEBUG
-        prune_queue_lock++;
-#endif
-        return 0;
-    }
-
-    Flow *f = q->top;
-
-    /* label */
-    while (f != NULL) {
-        if (try_cnt != 0 && try_cnt_temp == try_cnt) {
-            SCMutexUnlock(&q->mutex_q);
-            return cnt;
-        }
-        try_cnt_temp++;
-
-        if (f == NULL) {
-            SCMutexUnlock(&q->mutex_q);
-            SCLogDebug("top is null");
-
-#ifdef FLOW_PRUNE_DEBUG
-            prune_queue_empty++;
-#endif
-            return cnt;
-        }
-
-        if (SCMutexTrylock(&f->m) != 0) {
-            SCLogDebug("cant lock 1");
-
-#ifdef FLOW_PRUNE_DEBUG
-            prune_flow_lock++;
-#endif
-            f = f->lnext;
-            continue;
-        }
-
-        if (SCSpinTrylock(&f->fb->s) != 0) {
-            SCMutexUnlock(&f->m);
-            SCLogDebug("cant lock 2");
-
-#ifdef FLOW_PRUNE_DEBUG
-            prune_bucket_lock++;
-#endif
-            f = f->lnext;
-            continue;
-        }
-
-        /*set the timeout value according to the flow operating mode, flow's state
-          and protocol.*/
-        uint32_t timeout = FlowPruneGetFlowTimeout(f, flow_flags & FLOW_EMERGENCY ? 1 : 0);
-
-        SCLogDebug("got lock, now check: %" PRIdMAX "+%" PRIu32 "=(%" PRIdMAX ") < "
-                   "%" PRIdMAX "", (intmax_t)f->lastts_sec,
-                   timeout, (intmax_t)f->lastts_sec + timeout,
-                   (intmax_t)ts->tv_sec);
-
-        /* do the timeout check */
-        if ((int32_t)(f->lastts_sec + timeout) >= ts->tv_sec) {
-            SCSpinUnlock(&f->fb->s);
-            SCMutexUnlock(&f->m);
-            SCMutexUnlock(&q->mutex_q);
-            SCLogDebug("timeout check failed");
-
-#ifdef FLOW_PRUNE_DEBUG
-            prune_no_timeout++;
-#endif
-            return cnt;
-        }
-
-        /** never prune a flow that is used by a packet or stream msg
-         *  we are currently processing in one of the threads */
-        if (SC_ATOMIC_GET(f->use_cnt) > 0) {
-            SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", SC_ATOMIC_GET(f->use_cnt), f, f->proto);
-            SCLogDebug("it is in one of the threads");
-
-#ifdef FLOW_PRUNE_DEBUG
-            prune_usecnt++;
-#endif
-            SCSpinUnlock(&f->fb->s);
-            SCMutexUnlock(&f->m);
-            f = f->lnext;
-            continue;
-        }
-
-        int server = 0, client = 0;
-        if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) {
-            /* we no longer need the fb lock. We know this flow won't be timed
-             * out just yet. So an incoming pkt is allowed to pick up this
-             * flow. */
-            SCSpinUnlock(&f->fb->s);
-
-            FlowForceReassemblyForFlowV2(f, server, client);
-            SCMutexUnlock(&f->m);
-
-            f = f->lnext;
-            continue;
-        }
-#ifdef DEBUG
-        /* this should not be possible */
-        BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
-#endif
-        /* remove from the hash */
-        if (f->hprev != NULL)
-            f->hprev->hnext = f->hnext;
-        if (f->hnext != NULL)
-            f->hnext->hprev = f->hprev;
-        if (f->fb->f == f)
-            f->fb->f = f->hnext;
-
-        f->hnext = NULL;
-        f->hprev = NULL;
-
-        SCSpinUnlock(&f->fb->s);
-        f->fb = NULL;
-
-        FlowClearMemory (f, f->protomap);
-        Flow *next_flow = f->lnext;
-
-        /* no one is referring to this flow, use_cnt 0, removed from hash
-         * so we can unlock it and move it back to the spare queue. */
-        SCMutexUnlock(&f->m);
-
-        /* move to spare list */
-        FlowRequeueMoveToSpare(f, q);
-
-        f = next_flow;
-        cnt++;
-    }
-
-    SCMutexUnlock(&q->mutex_q);
-    return cnt;
-}
-
-/** \brief Time out flows.
- *  \param q flow queue to time out flows from
- *  \param ts current time
- *  \param timeout timeout to consider
- *  \retval cnt number of flows that are timed out
- */
-uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts)
-{
-    SCEnter();
-    return FlowPrune(q, ts, 0);
-}
-
-/** \brief Time out flows on new/estabhlished/close queues for each proto until
- *         we release cnt flows as max
- *         Called from the FlowManager
- *  \param ts current time
- *  \retval cnt number of flows that are timed out
- */
-uint32_t FlowPruneFlowsCnt(struct timeval *ts, int cnt)
-{
-    SCEnter();
-    uint32_t nowcnt = 0;
-    int i = 0;
-
-    for (; i < FLOW_PROTO_MAX; i++) {
-        /* prune closing list */
-        nowcnt = FlowPruneFlowQueueCnt(&flow_close_q[i], ts, cnt);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-
-        /* prune new list */
-        nowcnt = FlowPruneFlowQueueCnt(&flow_new_q[i], ts, cnt);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-
-        /* prune established list */
-        nowcnt = FlowPruneFlowQueueCnt(&flow_est_q[i], ts, cnt);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-    }
-
-    return cnt;
-}
-
-/** \brief FlowKillFlowQueueCnt It will try to kill try_cnt count of flows
- *         It will return the number of flows released, and can be 0 or more.
- *  \param q flow queue to time out flows from
- *  \param try_cnt try to prune this number of flows
- *  \retval cnt number of flows that are timed out
- */
-static uint32_t FlowKillFlowQueueCnt(FlowQueue *q, int try_cnt, uint8_t mode)
-{
-    SCEnter();
-    uint32_t cnt = 0;
-    while (try_cnt--) {
-        cnt += FlowKill(q);
-    }
-    SCLogDebug("EMERGENCY mode, Flows killed: %"PRIu32, cnt);
-
-    return cnt;
-}
-
-/** FlowKill
- *
- * Inspect the top flows (last recently used) from the queue
- * and see if we can prune any it (this is if it's not in use).
- *
- * Use trylock here so prevent us from blocking the packet handling.
- *
- * \param q flow queue to prune
- *
- * \retval 0 on error, failed block, nothing to prune
- * \retval 1 on successfully pruned one
- */
-int FlowKill (FlowQueue *q)
-{
-    SCEnter();
-    int mr = SCMutexTrylock(&q->mutex_q);
-
-    if (mr != 0) {
-        SCLogDebug("trylock failed");
-        if (mr == EBUSY)
-            SCLogDebug("was locked");
-        if (mr == EINVAL)
-            SCLogDebug("bad mutex value");
-        return 0;
-    }
-
-    Flow *f = q->top;
-
-    /* This means that the queue is empty */
-    if (f == NULL) {
-        SCMutexUnlock(&q->mutex_q);
-        SCLogDebug("top is null");
-        return 0;
-    }
-
-    do {
-        if (SCMutexTrylock(&f->m) != 0) {
-            f = f->lnext;
-            /* Skip to the next */
-            continue;
-        }
-
-        if (SCSpinTrylock(&f->fb->s) != 0) {
-            SCMutexUnlock(&f->m);
-            f = f->lnext;
-            continue;
-        }
-
-        /** never prune a flow that is used by a packet or stream msg
-         *  we are currently processing in one of the threads */
-        if (SC_ATOMIC_GET(f->use_cnt) > 0) {
-            SCSpinUnlock(&f->fb->s);
-            SCMutexUnlock(&f->m);
-            f = f->lnext;
-            continue;
-        }
-
-        /* remove from the hash */
-        if (f->hprev)
-            f->hprev->hnext = f->hnext;
-        if (f->hnext)
-            f->hnext->hprev = f->hprev;
-        if (f->fb->f == f)
-            f->fb->f = f->hnext;
-
-        f->hnext = NULL;
-        f->hprev = NULL;
-
-        SCSpinUnlock(&f->fb->s);
-        f->fb = NULL;
-
-        FlowClearMemory (f, f->protomap);
-
-        /* no one is referring to this flow, use_cnt 0, removed from hash
-         * so we can unlock it and move it back to the spare queue. */
-        SCMutexUnlock(&f->m);
-
-        /* move to spare list */
-        FlowRequeueMoveToSpare(f, q);
-
-        /* so.. we did it */
-        /* unlock queue */
-        SCMutexUnlock(&q->mutex_q);
-        return 1;
-    } while (f != NULL);
-
-    /* If we reach this point, then we didn't prune any */
-    /* unlock list */
-    SCMutexUnlock(&q->mutex_q);
-
-    return 0;
-}
-
-
-/** \brief Try to kill cnt flows by last recently seen activity on new/estabhlished/close queues for each proto until
- *         we release cnt flows as max. Called only on emergency mode.
- * \param cnt number of flows to release
- * \retval cnt number of flows that are not killed (so 0 if we prune all of them)
- */
-uint32_t FlowKillFlowsCnt(int cnt)
-{
-    SCEnter();
-    uint32_t nowcnt = 0;
-    int i = 0;
-
-    /* Inspect the top of each protocol to select the last recently used */
-    for (; i < FLOW_PROTO_MAX; i++) {
-        /* prune closing list */
-        nowcnt = FlowKillFlowQueueCnt(&flow_close_q[i], cnt, 0);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-
-        /* prune new list */
-        nowcnt = FlowKillFlowQueueCnt(&flow_new_q[i], cnt, 0);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-
-        /* prune established list */
-        nowcnt = FlowKillFlowQueueCnt(&flow_est_q[i], cnt, 0);
-        if (nowcnt) {
-            cnt -= nowcnt;
-        }
-        if (cnt <= 0)
-            break;
-    }
-
-    return cnt;
-}
-
-/** \brief Time out flows will try to prune try_cnt count of flows
- *         It will return the number of flows released, and can be 0 or more.
- *         A more agressive aproach is calling this function with the emergency
- *         bit set (and there will be another even more agressive, killing
- *         flows without the criteria of time outs)
- *  \param q flow queue to time out flows from
- *  \param ts current time
- *  \param timeout timeout to consider
- *  \param try_cnt try to prune this number of flows if they are timed out
- *  \retval cnt number of flows that are timed out
- */
-static uint32_t FlowPruneFlowQueueCnt(FlowQueue *q, struct timeval *ts, int try_cnt)
-{
-    SCEnter();
-    return FlowPrune(q, ts, try_cnt);
-}
-
 /** \brief Make sure we have enough spare flows. 
  *
  *  Enforce the prealloc parameter, so keep at least prealloc flows in the
@@ -623,11 +114,9 @@ int FlowUpdateSpareFlows(void)
     SCEnter();
     uint32_t toalloc = 0, tofree = 0, len;
 
-    SCMutexLock(&flow_spare_q.mutex_q);
-
+    FQLOCK_LOCK(&flow_spare_q);
     len = flow_spare_q.len;
-
-    SCMutexUnlock(&flow_spare_q.mutex_q);
+    FQLOCK_UNLOCK(&flow_spare_q);
 
     if (len < flow_config.prealloc) {
         toalloc = flow_config.prealloc - len;
@@ -809,9 +298,6 @@ void FlowHandlePacket (ThreadVars *tv, Packet *p)
         p->flowflags |= FLOW_PKT_ESTABLISHED;
     }
 
-    /* update queue positions */
-    FlowUpdateQueue(f);
-
     /*set the detection bypass flags*/
     if (f->flags & FLOW_NOPACKET_INSPECTION) {
         SCLogDebug("setting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
@@ -837,15 +323,10 @@ void FlowInitConfig(char quiet)
     SCLogDebug("initializing flow engine...");
 
     memset(&flow_config,  0, sizeof(flow_config));
+    SC_ATOMIC_INIT(flow_flags);
     SC_ATOMIC_INIT(flow_memuse);
-
-    int ifq = 0;
+    SC_ATOMIC_INIT(flow_prune_idx);
     FlowQueueInit(&flow_spare_q);
-    for (ifq = 0; ifq < FLOW_PROTO_MAX; ifq++) {
-        FlowQueueInit(&flow_new_q[ifq]);
-        FlowQueueInit(&flow_est_q[ifq]);
-        FlowQueueInit(&flow_close_q[ifq]);
-    }
 
     unsigned int seed = RandomTimePreseed();
     /* set defaults */
@@ -915,11 +396,11 @@ void FlowInitConfig(char quiet)
         SCLogError(SC_ERR_FATAL, "Fatal error encountered in FlowInitConfig. Exiting...");
         exit(EXIT_FAILURE);
     }
-    uint32_t i = 0;
-
     memset(flow_hash, 0, flow_config.hash_size * sizeof(FlowBucket));
+
+    uint32_t i = 0;
     for (i = 0; i < flow_config.hash_size; i++) {
-        SCSpinInit(&flow_hash[i].s, 0);
+        FBLOCK_INIT(&flow_hash[i]);
     }
     SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
 
@@ -959,51 +440,12 @@ void FlowInitConfig(char quiet)
 
 /** \brief print some flow stats
  *  \warning Not thread safe */
-void FlowPrintQueueInfo (void)
+static void FlowPrintStats (void)
 {
-    int i;
-    SCLogDebug("flow queue info:");
-    SCLogDebug("spare flow queue %" PRIu32 "", flow_spare_q.len);
-#ifdef DBG_PERF
-    SCLogDebug("flow_spare_q.dbg_maxlen %" PRIu32 "", flow_spare_q.dbg_maxlen);
-#endif
-    for (i = 0; i < FLOW_PROTO_MAX; i++) {
-        SCLogDebug("proto [%"PRId32"] new flow queue %" PRIu32 " "
-#ifdef DBG_PERF
-                  " - flow_new_q.dbg_maxlen %" PRIu32 ""
-#endif
-                  ,i,flow_new_q[i].len
-#ifdef DBG_PERF
-                  ,flow_new_q[i].dbg_maxlen
-#endif
-                  );
-
-        SCLogDebug("proto [%"PRId32"] establised flow queue %" PRIu32 " "
-#ifdef DBG_PERF
-                  " - flow_est_q.dbg_maxlen %" PRIu32 ""
-#endif
-                  ,i,flow_est_q[i].len
-#ifdef DBG_PERF
-                  ,flow_est_q[i].dbg_maxlen
-#endif
-                  );
-
-        SCLogDebug("proto [%"PRId32"] closing flow queue %" PRIu32 " "
-#ifdef DBG_PERF
-                  " - flow_closing_q.dbg_maxlen %" PRIu32 ""
-#endif
-                  ,i,flow_close_q[i].len
-#ifdef DBG_PERF
-                  ,flow_close_q[i].dbg_maxlen
-#endif
-                  );
-
-    }
 #ifdef FLOWBITS_STATS
     SCLogInfo("flowbits added: %" PRIu32 ", removed: %" PRIu32 ", max memory usage: %" PRIu32 "",
         flowbits_added, flowbits_removed, flowbits_memuse_max);
 #endif /* FLOWBITS_STATS */
-
     return;
 }
 
@@ -1012,48 +454,39 @@ void FlowPrintQueueInfo (void)
 void FlowShutdown(void)
 {
     Flow *f;
-    int i;
     uint32_t u;
 
+    FlowPrintStats();
+
+    /* free spare queue */
     while((f = FlowDequeue(&flow_spare_q))) {
         FlowFree(f);
     }
-    for (i = 0; i < FLOW_PROTO_MAX; i++) {
-        while((f = FlowDequeue(&flow_new_q[i]))) {
-            uint8_t proto_map = FlowGetProtoMapping(f->proto);
-            FlowClearMemory(f, proto_map);
-            FlowFree(f);
-        }
-        while((f = FlowDequeue(&flow_est_q[i]))) {
-            uint8_t proto_map = FlowGetProtoMapping(f->proto);
-            FlowClearMemory(f, proto_map);
-            FlowFree(f);
-        }
-        while((f = FlowDequeue(&flow_close_q[i]))) {
-            uint8_t proto_map = FlowGetProtoMapping(f->proto);
-            FlowClearMemory(f, proto_map);
-            FlowFree(f);
-        }
-    }
 
+    /* clear and free the hash */
     if (flow_hash != NULL) {
         /* clean up flow mutexes */
         for (u = 0; u < flow_config.hash_size; u++) {
-            SCSpinDestroy(&flow_hash[u].s);
+            Flow *f = flow_hash[u].head;
+            while (f) {
+                Flow *n = f->hnext;
+                uint8_t proto_map = FlowGetProtoMapping(f->proto);
+                FlowClearMemory(f, proto_map);
+                FlowFree(f);
+                f = n;
+            }
+
+            FBLOCK_DESTROY(&flow_hash[u]);
         }
         SCFree(flow_hash);
         flow_hash = NULL;
     }
     SC_ATOMIC_SUB(flow_memuse, flow_config.hash_size * sizeof(FlowBucket));
-
-    int ifq = 0;
     FlowQueueDestroy(&flow_spare_q);
-    for (ifq = 0; ifq < FLOW_PROTO_MAX; ifq++) {
-        FlowQueueDestroy(&flow_new_q[ifq]);
-        FlowQueueDestroy(&flow_est_q[ifq]);
-        FlowQueueDestroy(&flow_close_q[ifq]);
-    }
 
+    SC_ATOMIC_DESTROY(flow_prune_idx);
+    SC_ATOMIC_DESTROY(flow_memuse);
+    SC_ATOMIC_DESTROY(flow_flags);
     return;
 }
 
@@ -1310,7 +743,7 @@ void FlowInitFlowProto(void)
  *  \param  proto_map   mapped value of the protocol to FLOW_PROTO's.
  */
 
-static int FlowClearMemory(Flow* f, uint8_t proto_map)
+int FlowClearMemory(Flow* f, uint8_t proto_map)
 {
     SCEnter();
 
@@ -1490,260 +923,6 @@ static int FlowTest02 (void) {
     return 1;
 }
 
-/**
- *  \brief   Function to test the prunning of the flow in different flow modes.
- *
- *  \param   f    Pointer to the flow to be prunned
- *  \param   ts   time value against which the flow will be checked
- *
- *  \retval on success returns 1 and on failure 0
- */
-
-static int FlowTestPrune(Flow *f, struct timeval *ts) {
-
-    FlowQueue *q = FlowQueueNew();
-    if (q == NULL) {
-        goto error;
-    }
-
-    q->top = NULL;
-
-    FlowEnqueue(q, f);
-    if (q->len != 1) {
-        printf("Failed in enqueue the flow in flowqueue: ");
-        goto error;
-    }
-
-    SCLogDebug("calling FlowPrune");
-    FlowPrune(q, ts, 0);
-    if (q->len != 0) {
-        printf("Failed in prunning the flow: ");
-        goto error;
-    }
-
-    if (f->protoctx != NULL){
-        printf("Failed in freeing the TcpSession: ");
-        goto error;
-    }
-
-    return 1;
-
-error:
-    if (q != NULL) {
-        FlowQueueDestroy(q);
-    }
-    return 0;
-}
-
-/**
- *  \test   Test the timing out of a flow with a fresh TcpSession
- *          (just initialized, no data segments) in normal mode.
- *
- *  \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest03 (void) {
-
-    TcpSession ssn;
-    Flow f;
-    FlowBucket fb;
-    struct timeval ts;
-
-    FlowQueueInit(&flow_spare_q);
-
-    memset(&ssn, 0, sizeof(TcpSession));
-    memset(&f, 0, sizeof(Flow));
-    memset(&ts, 0, sizeof(ts));
-    memset(&fb, 0, sizeof(FlowBucket));
-
-    SCSpinInit(&fb.s, 0);
-
-    FLOW_INITIALIZE(&f);
-    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
-    TimeGet(&ts);
-    f.lastts_sec = ts.tv_sec - 5000;
-    f.protoctx = &ssn;
-    f.fb = &fb;
-
-    f.proto = IPPROTO_TCP;
-
-    if (FlowTestPrune(&f, &ts) != 1) {
-        SCSpinDestroy(&fb.s);
-        FLOW_DESTROY(&f);
-        FlowQueueDestroy(&flow_spare_q);
-        return 0;
-    }
-
-    SCSpinDestroy(&fb.s);
-    FLOW_DESTROY(&f);
-
-    FlowQueueDestroy(&flow_spare_q);
-    return 1;
-}
-
-/**
- *  \test   Test the timing out of a flow with a TcpSession
- *          (with data segments) in normal mode.
- *
- *  \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest04 (void) {
-
-    TcpSession ssn;
-    Flow f;
-    FlowBucket fb;
-    struct timeval ts;
-    TcpSegment seg;
-    TcpStream client;
-    uint8_t payload[3] = {0x41, 0x41, 0x41};
-
-    FlowQueueInit(&flow_spare_q);
-
-    memset(&ssn, 0, sizeof(TcpSession));
-    memset(&f, 0, sizeof(Flow));
-    memset(&fb, 0, sizeof(FlowBucket));
-    memset(&ts, 0, sizeof(ts));
-    memset(&seg, 0, sizeof(TcpSegment));
-    memset(&client, 0, sizeof(TcpSegment));
-
-    SCSpinInit(&fb.s, 0);
-    FLOW_INITIALIZE(&f);
-    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
-    TimeGet(&ts);
-    seg.payload = payload;
-    seg.payload_len = 3;
-    seg.next = NULL;
-    seg.prev = NULL;
-    client.seg_list = &seg;
-    ssn.client = client;
-    ssn.server = client;
-    ssn.state = TCP_ESTABLISHED;
-    f.lastts_sec = ts.tv_sec - 5000;
-    f.protoctx = &ssn;
-    f.fb = &fb;
-    f.proto = IPPROTO_TCP;
-
-    if (FlowTestPrune(&f, &ts) != 1) {
-        SCSpinDestroy(&fb.s);
-        FLOW_DESTROY(&f);
-        FlowQueueDestroy(&flow_spare_q);
-        return 0;
-    }
-    SCSpinDestroy(&fb.s);
-    FLOW_DESTROY(&f);
-    FlowQueueDestroy(&flow_spare_q);
-    return 1;
-
-}
-
-/**
- *  \test   Test the timing out of a flow with a fresh TcpSession
- *          (just initialized, no data segments) in emergency mode.
- *
- *  \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest05 (void) {
-
-    TcpSession ssn;
-    Flow f;
-    FlowBucket fb;
-    struct timeval ts;
-
-    FlowQueueInit(&flow_spare_q);
-
-    memset(&ssn, 0, sizeof(TcpSession));
-    memset(&f, 0, sizeof(Flow));
-    memset(&ts, 0, sizeof(ts));
-    memset(&fb, 0, sizeof(FlowBucket));
-
-    SCSpinInit(&fb.s, 0);
-    FLOW_INITIALIZE(&f);
-    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
-    TimeGet(&ts);
-    ssn.state = TCP_SYN_SENT;
-    f.lastts_sec = ts.tv_sec - 300;
-    f.protoctx = &ssn;
-    f.fb = &fb;
-    f.proto = IPPROTO_TCP;
-    f.flags |= FLOW_EMERGENCY;
-
-    if (FlowTestPrune(&f, &ts) != 1) {
-        SCSpinDestroy(&fb.s);
-        FLOW_DESTROY(&f);
-        FlowQueueDestroy(&flow_spare_q);
-        return 0;
-    }
-
-    SCSpinDestroy(&fb.s);
-    FLOW_DESTROY(&f);
-    FlowQueueDestroy(&flow_spare_q);
-    return 1;
-}
-
-/**
- *  \test   Test the timing out of a flow with a TcpSession
- *          (with data segments) in emergency mode.
- *
- *  \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest06 (void) {
-
-    TcpSession ssn;
-    Flow f;
-    FlowBucket fb;
-    struct timeval ts;
-    TcpSegment seg;
-    TcpStream client;
-    uint8_t payload[3] = {0x41, 0x41, 0x41};
-
-    FlowQueueInit(&flow_spare_q);
-
-    memset(&ssn, 0, sizeof(TcpSession));
-    memset(&f, 0, sizeof(Flow));
-    memset(&fb, 0, sizeof(FlowBucket));
-    memset(&ts, 0, sizeof(ts));
-    memset(&seg, 0, sizeof(TcpSegment));
-    memset(&client, 0, sizeof(TcpSegment));
-
-    SCSpinInit(&fb.s, 0);
-    FLOW_INITIALIZE(&f);
-    f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
-    TimeGet(&ts);
-    seg.payload = payload;
-    seg.payload_len = 3;
-    seg.next = NULL;
-    seg.prev = NULL;
-    client.seg_list = &seg;
-    ssn.client = client;
-    ssn.server = client;
-    ssn.state = TCP_ESTABLISHED;
-    f.lastts_sec = ts.tv_sec - 5000;
-    f.protoctx = &ssn;
-    f.fb = &fb;
-    f.proto = IPPROTO_TCP;
-    f.flags |= FLOW_EMERGENCY;
-
-    if (FlowTestPrune(&f, &ts) != 1) {
-        SCSpinDestroy(&fb.s);
-        FLOW_DESTROY(&f);
-        FlowQueueDestroy(&flow_spare_q);
-        return 0;
-    }
-
-    SCSpinDestroy(&fb.s);
-    FLOW_DESTROY(&f);
-    FlowQueueDestroy(&flow_spare_q);
-    return 1;
-
-}
-
 /**
  *  \test   Test flow allocations when it reach memcap
  *
@@ -1780,8 +959,9 @@ static int FlowTest07 (void) {
     end = end + 2;;
     UTHBuildPacketOfFlows(ini, end, 0);
 
-    /* This means that the engine released 5 flows by normal timeout */
-    if (flow_spare_q.len == 5)
+    /* This means that the engine entered emerg mode: should happen as easy
+     * with flow mgr activated */
+    if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
         result = 1;
 
     memcpy(&flow_config, &backup, sizeof(FlowConfig));
@@ -1828,7 +1008,7 @@ static int FlowTest08 (void) {
     UTHBuildPacketOfFlows(ini, end, 0);
 
     /* This means that the engine released 5 flows by emergency timeout */
-    if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY))
+    if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
         result = 1;
 
     memcpy(&flow_config, &backup, sizeof(FlowConfig));
@@ -1873,8 +1053,8 @@ static int FlowTest09 (void) {
     end = end + 2;
     UTHBuildPacketOfFlows(ini, end, 0);
 
-    /* This means that the engine release 5 flows by killing them */
-    if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY))
+    /* engine in emerg mode */
+    if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
         result = 1;
 
     memcpy(&flow_config, &backup, sizeof(FlowConfig));
@@ -1882,6 +1062,7 @@ static int FlowTest09 (void) {
 
     return result;
 }
+
 #endif /* UNITTESTS */
 
 /**
@@ -1891,12 +1072,10 @@ void FlowRegisterTests (void) {
 #ifdef UNITTESTS
     UtRegisterTest("FlowTest01 -- Protocol Specific Timeouts", FlowTest01, 1);
     UtRegisterTest("FlowTest02 -- Setting Protocol Specific Free Function", FlowTest02, 1);
-    UtRegisterTest("FlowTest03 -- Timeout a flow having fresh TcpSession", FlowTest03, 1);
-    UtRegisterTest("FlowTest04 -- Timeout a flow having TcpSession with segments", FlowTest04, 1);
-    UtRegisterTest("FlowTest05 -- Timeout a flow in emergency having fresh TcpSession", FlowTest05, 1);
-    UtRegisterTest("FlowTest06 -- Timeout a flow in emergency having TcpSession with segments", FlowTest06, 1);
     UtRegisterTest("FlowTest07 -- Test flow Allocations when it reach memcap", FlowTest07, 1);
     UtRegisterTest("FlowTest08 -- Test flow Allocations when it reach memcap", FlowTest08, 1);
     UtRegisterTest("FlowTest09 -- Test flow Allocations when it reach memcap", FlowTest09, 1);
+
+    FlowMgrRegisterTests();
 #endif /* UNITTESTS */
 }
index 7d4d0c844200b9a9c07d9ca95ee417bb61ca33b3..e5d62312c308f92944b9846c16063b309fda66f9 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 /** At least on packet from the destination address was seen */
 #define FLOW_TO_DST_SEEN                  0x00000002
 
-/** Flow lives in the flow-state-NEW list */
-#define FLOW_NEW_LIST                     0x00000004
-/** Flow lives in the flow-state-EST (established) list */
-#define FLOW_EST_LIST                     0x00000008
-/** Flow lives in the flow-state-CLOSED list */
-#define FLOW_CLOSED_LIST                  0x00000010
+// vacany 3x
 
 /** Flow was inspected against IP-Only sigs in the toserver direction */
 #define FLOW_TOSERVER_IPONLY_SET          0x00000020
@@ -310,7 +305,6 @@ typedef struct Flow_
     /** queue list pointers, protected by queue mutex */
     struct Flow_ *lnext; /* list */
     struct Flow_ *lprev;
-
     struct timeval startts;
 #ifdef DEBUG
     uint32_t todstpktcnt;
@@ -346,9 +340,6 @@ void FlowSetIPOnlyFlagNoLock(Flow *, char);
 void FlowIncrUsecnt(Flow *);
 void FlowDecrUsecnt(Flow *);
 
-uint32_t FlowPruneFlowsCnt(struct timeval *, int);
-uint32_t FlowKillFlowsCnt(int);
-
 void FlowRegisterTests (void);
 int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
 int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
@@ -359,7 +350,6 @@ void FlowUpdateQueue(Flow *);
 struct FlowQueue_;
 
 int FlowUpdateSpareFlows(void);
-uint32_t FlowPruneFlowQueue(struct FlowQueue_ *, struct timeval *);
 
 static inline void FlowLockSetNoPacketInspectionFlag(Flow *);
 static inline void FlowSetNoPacketInspectionFlag(Flow *);
@@ -437,6 +427,7 @@ static inline void FlowSetSessionNoApplayerInspectionFlag(Flow *f) {
     f->flags |= FLOW_NO_APPLAYER_INSPECTION;
 }
 
+int FlowClearMemory(Flow *,uint8_t );
 
 #endif /* __FLOW_H__ */
 
index 2b5eeef3395518f2213bb53f076779feb70996e4..e0144be67c4cd23f963646974a2e2aa5c30e41da 100644 (file)
@@ -305,7 +305,7 @@ TmEcode DecodePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, P
     SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, GET_PKT_LEN(p));
 
     double curr_ts = p->ts.tv_sec + p->ts.tv_usec / 1000.0;
-    if (curr_ts < prev_signaled_ts || (curr_ts - prev_signaled_ts) > 2.0) {
+    if (curr_ts < prev_signaled_ts || (curr_ts - prev_signaled_ts) > 60.0) {
         prev_signaled_ts = curr_ts;
         FlowWakeupFlowManagerThread();
     }
index d068a7343ff04b2cea85ce3227a5ed1135e2565c..cc29ecd2364dcc76db4b6f84c041e0d2003187df 100644 (file)
@@ -300,12 +300,14 @@ int StreamTcpReassembleInit(char quiet)
     uint16_t u16 = 0;
     for (u16 = 0; u16 < segment_pool_num; u16++)
     {
+        SCMutexInit(&segment_pool_mutex[u16], NULL);
+        SCMutexLock(&segment_pool_mutex[u16]);
         segment_pool[u16] = PoolInit(segment_pool_poolsizes[u16],
                                      segment_pool_poolsizes_prealloc[u16],
                                      TcpSegmentPoolAlloc, (void *) &
                                      segment_pool_pktsizes[u16],
                                      TcpSegmentPoolFree);
-        SCMutexInit(&segment_pool_mutex[u16], NULL);
+        SCMutexUnlock(&segment_pool_mutex[u16]);
     }
 
     uint16_t idx = 0;
@@ -339,6 +341,7 @@ void StreamTcpReassembleFree(char quiet)
 {
     uint16_t u16 = 0;
     for (u16 = 0; u16 < segment_pool_num; u16++) {
+        SCMutexLock(&segment_pool_mutex[u16]);
         PoolPrintSaturation(segment_pool[u16]);
 
         if (quiet == FALSE) {
@@ -351,6 +354,7 @@ void StreamTcpReassembleFree(char quiet)
         }
         PoolFree(segment_pool[u16]);
 
+        SCMutexUnlock(&segment_pool_mutex[u16]);
         SCMutexDestroy(&segment_pool_mutex[u16]);
     }
 
index e8fb9b1c2f81ab5aa86a6f5716fd4a69927f0481..ab5090f8279da920472b034291de7abe611f0ac6 100644 (file)
@@ -502,16 +502,18 @@ void StreamTcpInitConfig(char quiet)
     stream_memuse_max = 0;
     SCSpinUnlock(&stream_memuse_spinlock);
 
+    SCMutexInit(&ssn_pool_mutex, NULL);
+    SCMutexLock(&ssn_pool_mutex);
     ssn_pool = PoolInit(stream_config.max_sessions,
                         stream_config.prealloc_sessions,
                         StreamTcpSessionPoolAlloc, NULL,
                         StreamTcpSessionPoolFree);
     if (ssn_pool == NULL) {
         SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized");
+        SCMutexUnlock(&ssn_pool_mutex);
         exit(EXIT_FAILURE);
     }
-
-    SCMutexInit(&ssn_pool_mutex, NULL);
+    SCMutexUnlock(&ssn_pool_mutex);
 
     StreamTcpReassembleInit(quiet);
 
@@ -525,13 +527,14 @@ void StreamTcpFreeConfig(char quiet)
 {
     StreamTcpReassembleFree(quiet);
 
+    SCMutexLock(&ssn_pool_mutex);
     if (ssn_pool != NULL) {
         PoolFree(ssn_pool);
         ssn_pool = NULL;
-    } else {
-        SCLogError(SC_ERR_POOL_EMPTY, "ssn_pool is NULL");
-        exit(EXIT_FAILURE);
     }
+    SCMutexUnlock(&ssn_pool_mutex);
+    SCMutexDestroy(&ssn_pool_mutex);
+
     SCLogDebug("ssn_pool_cnt %"PRIu64"", ssn_pool_cnt);
 
     if (!quiet) {
@@ -540,8 +543,6 @@ void StreamTcpFreeConfig(char quiet)
             stream_memuse_max, stream_memuse);
         SCSpinUnlock(&stream_memuse_spinlock);
     }
-    SCMutexDestroy(&ssn_pool_mutex);
-
     SCSpinDestroy(&stream_memuse_spinlock);
 }
 
@@ -584,8 +585,6 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
         return;
 
     ssn->state = state;
-
-    FlowUpdateQueue(p->flow);
 }
 
 /**
index 66b733d729aef5593250c81b33fa512b44dbcbdb..d7f01f7a7cf72a468f0c1fa6d99e30330dc320ac 100644 (file)
@@ -153,13 +153,18 @@ void StreamMsgPutInQueue(StreamMsgQueue *q, StreamMsg *s)
 void StreamMsgQueuesInit(void) {
     SCMutexInit(&stream_pool_memuse_mutex, NULL);
 
+    SCMutexLock(&stream_msg_pool_mutex);
     stream_msg_pool = PoolInit(0,250,StreamMsgAlloc,NULL,StreamMsgFree);
     if (stream_msg_pool == NULL)
         exit(EXIT_FAILURE); /* XXX */
+    SCMutexUnlock(&stream_msg_pool_mutex);
 }
 
 void StreamMsgQueuesDeinit(char quiet) {
+    SCMutexLock(&stream_msg_pool_mutex);
     PoolFree(stream_msg_pool);
+    SCMutexUnlock(&stream_msg_pool_mutex);
+
     SCMutexDestroy(&stream_pool_memuse_mutex);
 
     if (quiet == FALSE)
index c814388394fb533de1afb56f87530019ba626f9f..7e3c934c17e8564177deef0f0dd2bdc520eef00d 100644 (file)
@@ -1794,7 +1794,6 @@ int main(int argc, char **argv)
     TmThreadKillThreads();
     SCPerfReleaseResources();
     FlowShutdown();
-    FlowPrintQueueInfo();
     StreamTcpFreeConfig(STREAM_VERBOSE);
     HTPFreeConfig();
     HTPAtExitPrintStats();