-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
#define FLOW_DEFAULT_FLOW_PRUNE 5
+SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
+SC_ATOMIC_EXTERN(unsigned char, flow_flags);
+
+static Flow *FlowGetUsedFlow(void);
+
#ifdef FLOW_DEBUG_STATS
#define FLOW_DEBUG_STATS_PROTO_ALL 0
#define FLOW_DEBUG_STATS_PROTO_TCP 1
return NULL;
}
- /* no, so get a new one */
+ /* get a flow from the spare queue */
f = FlowDequeue(&flow_spare_q);
if (f == NULL) {
- /* If we reached the max memcap, try to clean some flows:
- * 1- first by normal timeouts
- * 2- by emergency mode timeouts
- * 3- by last time seen
- */
+ /* If we reached the max memcap, we get a used flow */
if ((SC_ATOMIC_GET(flow_memuse) + sizeof(Flow)) > flow_config.memcap) {
- uint32_t not_released = 0;
-
- SCLogDebug("We need to prune some flows(1)");
-
- /* Ok, then try to release flow_try_release flows */
- not_released = FlowPruneFlowsCnt(&p->ts, flow_config.flow_try_release);
- if (not_released == (uint32_t)flow_config.flow_try_release) {
- /* This means that none of the flows was released, so try again
- * with more agressive timeout values (emergency mode) */
-
- if ( !(flow_flags & FLOW_EMERGENCY)) {
- SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine "
- "running with FLOW_EMERGENCY bit set "
- "(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")",
- (uintmax_t)p->ts.tv_sec, (uintmax_t)p->ts.tv_usec);
- flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */
- FlowWakeupFlowManagerThread();
- }
- SCLogDebug("We need to prune some flows with emerg bit (2)");
-
- not_released = FlowPruneFlowsCnt(&p->ts, FLOW_DEFAULT_FLOW_PRUNE);
- if (not_released == (uint32_t)flow_config.flow_try_release) {
- /* Here the engine is on a real stress situation
- * Try to kill the last time seen "flow_try_release" flows
- * directly, ignoring timeouts */
- SCLogDebug("We need to KILL some flows (3)");
- not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE);
- if (not_released == (uint32_t)flow_config.flow_try_release) {
- return NULL;
- }
- }
+ /* declare state of emergency */
+ if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
+ SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
+
+ /* under high load, waking up the flow mgr each time leads
+ * to high cpu usage. Flows are not timed out much faster if
+ * we check a 1000 times a second. */
+ FlowWakeupFlowManagerThread();
}
- }
- /* now see if we can alloc a new flow */
- f = FlowAlloc();
- if (f == NULL) {
- return NULL;
- }
+ f = FlowGetUsedFlow();
- /* flow is initialized but *unlocked* */
+ /* freed a flow, but it's unlocked */
+ } else {
+ /* now see if we can alloc a new flow */
+ f = FlowAlloc();
+ if (f == NULL) {
+ return NULL;
+ }
+
+ /* flow is initialized but *unlocked* */
+ }
} else {
/* flow has been recycled before it went into the spare queue */
uint32_t key = FlowGetKey(p);
/* get our hash bucket and lock it */
FlowBucket *fb = &flow_hash[key];
- SCSpinLock(&fb->s);
+ FBLOCK_LOCK(fb);
- SCLogDebug("fb %p fb->f %p", fb, fb->f);
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
FlowHashCountIncr;
/* see if the bucket already has a flow */
- if (fb->f == NULL) {
- f = fb->f = FlowGetNew(p);
+ if (fb->head == NULL) {
+ f = FlowGetNew(p);
if (f == NULL) {
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return NULL;
}
/* flow is locked */
+ fb->head = f;
+ fb->tail = f;
/* got one, now lock, initialize and return */
FlowInit(f,p);
- f->flags |= FLOW_NEW_LIST;
f->fb = fb;
- FlowEnqueue(&flow_new_q[f->protomap], f);
-
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return f;
}
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
- f = fb->f;
+ f = fb->head;
/* see if this is the flow we are looking for */
if (FlowCompare(f, p) == 0) {
if (f == NULL) {
f = pf->hnext = FlowGetNew(p);
if (f == NULL) {
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return NULL;
}
+ fb->tail = f;
/* flow is locked */
/* initialize and return */
FlowInit(f,p);
-
- f->flags |= FLOW_NEW_LIST;
f->fb = fb;
- FlowEnqueue(&flow_new_q[f->protomap], f);
-
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return f;
}
if (FlowCompare(f, p) != 0) {
/* we found our flow, lets put it on top of the
* hash list -- this rewards active flows */
- if (f->hnext) f->hnext->hprev = f->hprev;
- if (f->hprev) f->hprev->hnext = f->hnext;
+ if (f->hnext) {
+ f->hnext->hprev = f->hprev;
+ }
+ if (f->hprev) {
+ f->hprev->hnext = f->hnext;
+ }
+ if (f == fb->tail) {
+ fb->tail = f->hprev;
+ }
- f->hnext = fb->f;
+ f->hnext = fb->head;
f->hprev = NULL;
- fb->f->hprev = f;
- fb->f = f;
+ fb->head->hprev = f;
+ fb->head = f;
/* found our flow, lock & return */
FlowIncrUsecnt(f);
SCMutexLock(&f->m);
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return f;
}
/* lock & return */
FlowIncrUsecnt(f);
SCMutexLock(&f->m);
- SCSpinUnlock(&fb->s);
+ FBLOCK_UNLOCK(fb);
FlowHashCountUpdate;
return f;
}
+/** \internal
+ * \brief Get a flow from the hash directly.
+ *
+ * Called in conditions where the spare queue is empty and memcap is reached.
+ *
+ * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
+ * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
+ * top each time since that would clear the top of the hash leading to longer
+ * and longer search times under high pressure (observed).
+ *
+ * \retval f flow or NULL
+ */
+static Flow *FlowGetUsedFlow(void) {
+ uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
+ uint32_t cnt = flow_config.hash_size;
+
+ while (cnt--) {
+ if (idx++ >= flow_config.hash_size)
+ idx = 0;
+
+ FlowBucket *fb = &flow_hash[idx];
+ if (fb == NULL)
+ continue;
+
+ if (FBLOCK_TRYLOCK(fb) != 0)
+ continue;
+
+ Flow *f = fb->tail;
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ continue;
+ }
+
+ if (SCMutexTrylock(&f->m) != 0) {
+ FBLOCK_UNLOCK(fb);
+ continue;
+ }
+
+ /** never prune a flow that is used by a packet or stream msg
+ * we are currently processing in one of the threads */
+ if (SC_ATOMIC_GET(f->use_cnt) > 0) {
+ FBLOCK_UNLOCK(fb);
+ SCMutexUnlock(&f->m);
+ continue;
+ }
+
+ /* remove from the hash */
+ if (f->hprev != NULL)
+ f->hprev->hnext = f->hnext;
+ if (f->hnext != NULL)
+ f->hnext->hprev = f->hprev;
+ if (fb->head == f)
+ fb->head = f->hnext;
+ if (fb->tail == f)
+ fb->tail = f->hprev;
+
+ f->hnext = NULL;
+ f->hprev = NULL;
+ f->fb = NULL;
+ FBLOCK_UNLOCK(fb);
+
+ FlowClearMemory (f, f->protomap);
+
+ SCMutexUnlock(&f->m);
+
+ SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
+ return f;
+ }
+
+ return NULL;
+}
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
#ifndef __FLOW_HASH_H__
#define __FLOW_HASH_H__
+/** Spinlocks or Mutex for the flow buckets. */
+//#define FBLOCK_SPIN
+#define FBLOCK_MUTEX
+
+#ifdef FBLOCK_SPIN
+ #ifdef FBLOCK_MUTEX
+ #error Cannot enable both FBLOCK_SPIN and FBLOCK_MUTEX
+ #endif
+#endif
+
/* flow hash bucket -- the hash is basically an array of these buckets.
* Each bucket contains a flow or list of flows. All these flows have
* the same hashkey (the hash is a chained hash). When doing modifications
* to the list, the entire bucket is locked. */
typedef struct FlowBucket_ {
- Flow *f;
-// SCMutex m;
+ Flow *head;
+ Flow *tail;
+#ifdef FBLOCK_MUTEX
+ SCMutex m;
+#elif defined FBLOCK_SPIN
SCSpinlock s;
+#else
+ #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
+#endif
} FlowBucket;
+#ifdef FBLOCK_SPIN
+ #define FBLOCK_INIT(fb) SCSpinInit(&(fb)->s, 0)
+ #define FBLOCK_DESTROY(fb) SCSpinDestroy(&(fb)->s)
+ #define FBLOCK_LOCK(fb) SCSpinLock(&(fb)->s)
+ #define FBLOCK_TRYLOCK(fb) SCSpinTrylock(&(fb)->s)
+ #define FBLOCK_UNLOCK(fb) SCSpinUnlock(&(fb)->s)
+#elif defined FBLOCK_MUTEX
+ #define FBLOCK_INIT(fb) SCMutexInit(&(fb)->m, NULL)
+ #define FBLOCK_DESTROY(fb) SCMutexDestroy(&(fb)->m)
+ #define FBLOCK_LOCK(fb) SCMutexLock(&(fb)->m)
+ #define FBLOCK_TRYLOCK(fb) SCMutexTrylock(&(fb)->m)
+ #define FBLOCK_UNLOCK(fb) SCMutexUnlock(&(fb)->m)
+#else
+ #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
+#endif
+
/* prototypes */
Flow *FlowGetFlowFromHash(Packet *);
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* \file
*
* \author Anoop Saldanha <anoopsaldanha@gmail.com>
+ * \author Victor Julien <victor@inliniac.net>
*/
#include "suricata-common.h"
/* Run mode selected at suricata.c */
extern int run_mode;
-/* 0.4 seconds */
+SC_ATOMIC_EXTERN(unsigned char, flow_flags);
+
+/* 1 seconds */
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
-/* 0.01 seconds */
+/* 0.1 seconds */
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
-#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 10000000
+#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
#define NEW_FLOW_COUNT_COND 10
+typedef struct FlowTimeoutCounters_ {
+ uint32_t new;
+ uint32_t est;
+ uint32_t clo;
+} FlowTimeoutCounters;
+
/**
* \brief Used to kill flow manager thread(s).
*
return;
}
-/** \brief Thread that manages the various queue's and removes timed out flows.
- * \param td ThreadVars casted to void ptr
+/** \internal
+ * \brief Get the flow's state
+ *
+ * \param f flow
+ *
+ * \retval state either FLOW_STATE_NEW, FLOW_STATE_ESTABLISHED or FLOW_STATE_CLOSED
+ */
+static inline int FlowGetFlowState(Flow *f) {
+ if (flow_proto[f->protomap].GetProtoState != NULL) {
+ return flow_proto[f->protomap].GetProtoState(f->protoctx);
+ } else {
+ if (f->flags & FLOW_TO_SRC_SEEN && f->flags & FLOW_TO_DST_SEEN)
+ return FLOW_STATE_ESTABLISHED;
+ else
+ return FLOW_STATE_NEW;
+ }
+}
+
+/** \internal
+ * \brief get timeout for flow
+ *
+ * \param f flow
+ * \param state flow state
+ * \param emergency bool indicating emergency mode 1 yes, 0 no
+ *
+ * \retval timeout timeout in seconds
+ */
+static inline uint32_t FlowGetFlowTimeout(Flow *f, int state, int emergency) {
+ uint32_t timeout;
+
+ if (emergency) {
+ switch(state) {
+ default:
+ case FLOW_STATE_NEW:
+ timeout = flow_proto[f->protomap].emerg_new_timeout;
+ break;
+ case FLOW_STATE_ESTABLISHED:
+ timeout = flow_proto[f->protomap].emerg_est_timeout;
+ break;
+ case FLOW_STATE_CLOSED:
+ timeout = flow_proto[f->protomap].emerg_closed_timeout;
+ break;
+ }
+ } else { /* implies no emergency */
+ switch(state) {
+ default:
+ case FLOW_STATE_NEW:
+ timeout = flow_proto[f->protomap].new_timeout;
+ break;
+ case FLOW_STATE_ESTABLISHED:
+ timeout = flow_proto[f->protomap].est_timeout;
+ break;
+ case FLOW_STATE_CLOSED:
+ timeout = flow_proto[f->protomap].closed_timeout;
+ break;
+ }
+ }
+
+ return timeout;
+}
+
+/** \internal
+ * \brief check if a flow is timed out
+ *
+ * \param f flow
+ * \param ts timestamp
+ * \param emergency bool indicating emergency mode
+ *
+ * \retval 0 not timed out
+ * \retval 1 timed out
+ */
+static int FlowManagerFlowTimeout(Flow *f, int state, struct timeval *ts, int emergency) {
+ /* set the timeout value according to the flow operating mode,
+ * flow's state and protocol.*/
+ uint32_t timeout = FlowGetFlowTimeout(f, state, emergency);
+
+ /* do the timeout check */
+ if ((int32_t)(f->lastts_sec + timeout) >= ts->tv_sec) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/** \internal
+ * \brief See if we can really discard this flow. Check use_cnt reference
+ * counter and force reassembly if necessary.
+ *
+ * \param f flow
+ * \param ts timestamp
+ * \param emergency bool indicating emergency mode
+ *
+ * \retval 0 not timed out just yet
+ * \retval 1 fully timed out, lets kill it
+ */
+static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts) {
+ /** never prune a flow that is used by a packet or stream msg
+ * we are currently processing in one of the threads */
+ if (SC_ATOMIC_GET(f->use_cnt) > 0) {
+ return 0;
+ }
+
+ int server = 0, client = 0;
+ if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) {
+ FlowForceReassemblyForFlowV2(f, server, client);
+ return 0;
+ }
+#ifdef DEBUG
+ /* this should not be possible */
+ BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
+#endif
+
+ return 1;
+}
+
+/**
+ * \internal
*
- * IDEAS/TODO
- * Create a 'emergency mode' in which flow handling threads can indicate
- * we are/seem to be under attack..... maybe this thread should check
- * key indicators for that like:
- * - number of flows created in the last x time
- * - avg number of pkts per flow (how?)
- * - avg flow age
+ * \brief check all flows in a hash row for timing out
*
- * Keep an eye on the spare list, alloc flows if needed...
+ * \param f last flow in the hash row
+ * \param ts timestamp
+ * \param emergency bool indicating emergency mode
+ * \param counters ptr to FlowTimeoutCounters structure
+ *
+ * \retval cnt timed out flows
+ */
+static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
+ int emergency, FlowTimeoutCounters *counters)
+{
+ uint32_t cnt = 0;
+
+ do {
+ if (SCMutexTrylock(&f->m) != 0) {
+ f = f->hprev;
+ continue;
+ }
+
+ Flow *next_flow = f->hprev;
+
+ int state = FlowGetFlowState(f);
+
+ /* timeout logic goes here */
+ if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
+ SCMutexUnlock(&f->m);
+ f = f->hprev;
+ continue;
+ }
+
+ /* check if the flow is fully timed out and
+ * ready to be discarded. */
+ if (FlowManagerFlowTimedOut(f, ts) == 1) {
+ /* remove from the hash */
+ if (f->hprev != NULL)
+ f->hprev->hnext = f->hnext;
+ if (f->hnext != NULL)
+ f->hnext->hprev = f->hprev;
+ if (f->fb->head == f)
+ f->fb->head = f->hnext;
+ if (f->fb->tail == f)
+ f->fb->tail = f->hprev;
+
+ f->hnext = NULL;
+ f->hprev = NULL;
+
+ FlowClearMemory (f, f->protomap);
+
+ /* no one is referring to this flow, use_cnt 0, removed from hash
+ * so we can unlock it and move it back to the spare queue. */
+ SCMutexUnlock(&f->m);
+
+ /* move to spare list */
+ FlowMoveToSpare(f);
+
+ cnt++;
+
+ switch (state) {
+ case FLOW_STATE_NEW:
+ default:
+ counters->new++;
+ break;
+ case FLOW_STATE_ESTABLISHED:
+ counters->est++;
+ break;
+ case FLOW_STATE_CLOSED:
+ counters->clo++;
+ break;
+ }
+ } else {
+ SCMutexUnlock(&f->m);
+ }
+
+ f = next_flow;
+ } while (f != NULL);
+
+ return cnt;
+}
+
+/**
+ * \brief time out flows from the hash
+ *
+ * \param ts timestamp
+ * \param try_cnt number of flows to time out max (0 is unlimited)
+ * \param counters ptr to FlowTimeoutCounters structure
+ *
+ * \retval cnt number of timed out flow
+ */
+uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) {
+ uint32_t idx = 0;
+ uint32_t cnt = 0;
+ int emergency = 0;
+
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
+ emergency = 1;
+
+ for (idx = 0; idx < flow_config.hash_size; idx++) {
+ FlowBucket *fb = &flow_hash[idx];
+ if (fb == NULL)
+ continue;
+ if (FBLOCK_TRYLOCK(fb) != 0)
+ continue;
+
+ /* flow hash bucket is now locked */
+
+ if (fb->tail == NULL)
+ goto next;
+
+ /* we have a flow, or more than one */
+ cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+
+next:
+ FBLOCK_UNLOCK(fb);
+
+ if (try_cnt > 0 && cnt >= try_cnt)
+ break;
+ }
+
+ return cnt;
+}
+
+/** \brief Thread that manages the flow table and times out flows.
+ *
+ * \param td ThreadVars casted to void ptr
+ *
+ * Keeps an eye on the spare list, alloc flows if needed...
*/
void *FlowManagerThread(void *td)
{
ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts;
- uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0, nowcnt;
+ uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
int emerg = FALSE;
int prev_emerg = FALSE;
uint32_t last_sec = 0;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
- uint16_t flow_mgr_closing_cnt = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
+ uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
- uint16_t flow_mgr_new_cnt = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
+ uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
- uint16_t flow_mgr_established_cnt = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
+ uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v,
SC_PERF_TYPE_Q_NORMAL,
"NULL");
+ uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v,
+ SC_PERF_TYPE_Q_NORMAL,
+ "NULL");
uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
{
TmThreadTestThreadUnPaused(th_v);
- if (flow_flags & FLOW_EMERGENCY) {
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
emerg = TRUE;
if (emerg == TRUE && prev_emerg == FALSE) {
/* see if we still have enough spare flows */
FlowUpdateSpareFlows();
- int i;
- closing_cnt = 0;
- new_cnt = 0;
- established_cnt = 0;
- for (i = 0; i < FLOW_PROTO_MAX; i++) {
- /* prune closing list */
- nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts);
- if (nowcnt) {
- SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt);
- closing_cnt += nowcnt;
- }
-
- /* prune new list */
- nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts);
- if (nowcnt) {
- SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt);
- new_cnt += nowcnt;
- }
+ /* try to time out flows */
+ FlowTimeoutCounters counters = { 0, 0, 0, };
+ FlowTimeoutHash(&ts, 0 /* check all */, &counters);
- /* prune established list */
- nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts);
- if (nowcnt) {
- SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt);
- established_cnt += nowcnt;
- }
- }
- SCPerfCounterAddUI64(flow_mgr_closing_cnt, th_v->sc_perf_pca, (uint64_t)closing_cnt);
- SCPerfCounterAddUI64(flow_mgr_new_cnt, th_v->sc_perf_pca, (uint64_t)new_cnt);
- SCPerfCounterAddUI64(flow_mgr_established_cnt, th_v->sc_perf_pca, (uint64_t)established_cnt);
+ SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
+ SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
+ SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
+ uint32_t len = 0;
+ FQLOCK_LOCK(&flow_spare_q);
+ len = flow_spare_q.len;
+ FQLOCK_UNLOCK(&flow_spare_q);
+ SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
+
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
if (emerg == TRUE) {
- uint32_t len = 0;
-
- SCMutexLock(&flow_spare_q.mutex_q);
-
- len = flow_spare_q.len;
-
- SCMutexUnlock(&flow_spare_q.mutex_q);
-
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
- flow_flags &= ~FLOW_EMERGENCY;
+ SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
+
emerg = FALSE;
prev_emerg = FALSE;
+
flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
SCLogInfo("Flow emergency mode over, back to normal... unsetting"
SCCondTimedwait(&flow_manager_cond, &flow_manager_mutex, &cond_time);
SCMutexUnlock(&flow_manager_mutex);
+ SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
+
SCPerfSyncCountersIfSignalled(th_v, 0);
}
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
-#ifdef FLOW_PRUNE_DEBUG
- SCLogInfo("prune_queue_lock %"PRIu64, prune_queue_lock);
- SCLogInfo("prune_queue_empty %"PRIu64, prune_queue_empty);
- SCLogInfo("prune_flow_lock %"PRIu64, prune_flow_lock);
- SCLogInfo("prune_bucket_lock %"PRIu64, prune_bucket_lock);
- SCLogInfo("prune_no_timeout %"PRIu64, prune_no_timeout);
- SCLogInfo("prune_usecnt %"PRIu64, prune_usecnt);
-#endif
-
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
}
return;
}
+
+#ifdef UNITTESTS
+
+/**
+ * \test Test the timing out of a flow with a fresh TcpSession
+ * (just initialized, no data segments) in normal mode.
+ *
+ * \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest01 (void) {
+ TcpSession ssn;
+ Flow f;
+ FlowBucket fb;
+ struct timeval ts;
+
+ FlowQueueInit(&flow_spare_q);
+
+ memset(&ssn, 0, sizeof(TcpSession));
+ memset(&f, 0, sizeof(Flow));
+ memset(&ts, 0, sizeof(ts));
+ memset(&fb, 0, sizeof(FlowBucket));
+
+ FBLOCK_INIT(&fb);
+
+ FLOW_INITIALIZE(&f);
+ f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+ TimeGet(&ts);
+ f.lastts_sec = ts.tv_sec - 5000;
+ f.protoctx = &ssn;
+ f.fb = &fb;
+
+ f.proto = IPPROTO_TCP;
+
+ int state = FlowGetFlowState(&f);
+ if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 0;
+ }
+
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+
+ FlowQueueDestroy(&flow_spare_q);
+ return 1;
+}
+
+/**
+ * \test Test the timing out of a flow with a TcpSession
+ * (with data segments) in normal mode.
+ *
+ * \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest02 (void) {
+ TcpSession ssn;
+ Flow f;
+ FlowBucket fb;
+ struct timeval ts;
+ TcpSegment seg;
+ TcpStream client;
+ uint8_t payload[3] = {0x41, 0x41, 0x41};
+
+ FlowQueueInit(&flow_spare_q);
+
+ memset(&ssn, 0, sizeof(TcpSession));
+ memset(&f, 0, sizeof(Flow));
+ memset(&fb, 0, sizeof(FlowBucket));
+ memset(&ts, 0, sizeof(ts));
+ memset(&seg, 0, sizeof(TcpSegment));
+ memset(&client, 0, sizeof(TcpSegment));
+
+ FBLOCK_INIT(&fb);
+ FLOW_INITIALIZE(&f);
+ f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+ TimeGet(&ts);
+ seg.payload = payload;
+ seg.payload_len = 3;
+ seg.next = NULL;
+ seg.prev = NULL;
+ client.seg_list = &seg;
+ ssn.client = client;
+ ssn.server = client;
+ ssn.state = TCP_ESTABLISHED;
+ f.lastts_sec = ts.tv_sec - 5000;
+ f.protoctx = &ssn;
+ f.fb = &fb;
+ f.proto = IPPROTO_TCP;
+
+ int state = FlowGetFlowState(&f);
+ if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 0;
+ }
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 1;
+
+}
+
+/**
+ * \test Test the timing out of a flow with a fresh TcpSession
+ * (just initialized, no data segments) in emergency mode.
+ *
+ * \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest03 (void) {
+ TcpSession ssn;
+ Flow f;
+ FlowBucket fb;
+ struct timeval ts;
+
+ FlowQueueInit(&flow_spare_q);
+
+ memset(&ssn, 0, sizeof(TcpSession));
+ memset(&f, 0, sizeof(Flow));
+ memset(&ts, 0, sizeof(ts));
+ memset(&fb, 0, sizeof(FlowBucket));
+
+ FBLOCK_INIT(&fb);
+ FLOW_INITIALIZE(&f);
+ f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+ TimeGet(&ts);
+ ssn.state = TCP_SYN_SENT;
+ f.lastts_sec = ts.tv_sec - 300;
+ f.protoctx = &ssn;
+ f.fb = &fb;
+ f.proto = IPPROTO_TCP;
+ f.flags |= FLOW_EMERGENCY;
+
+ int state = FlowGetFlowState(&f);
+ if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 0;
+ }
+
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 1;
+}
+
+/**
+ * \test Test the timing out of a flow with a TcpSession
+ * (with data segments) in emergency mode.
+ *
+ * \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest04 (void) {
+
+ TcpSession ssn;
+ Flow f;
+ FlowBucket fb;
+ struct timeval ts;
+ TcpSegment seg;
+ TcpStream client;
+ uint8_t payload[3] = {0x41, 0x41, 0x41};
+
+ FlowQueueInit(&flow_spare_q);
+
+ memset(&ssn, 0, sizeof(TcpSession));
+ memset(&f, 0, sizeof(Flow));
+ memset(&fb, 0, sizeof(FlowBucket));
+ memset(&ts, 0, sizeof(ts));
+ memset(&seg, 0, sizeof(TcpSegment));
+ memset(&client, 0, sizeof(TcpSegment));
+
+ FBLOCK_INIT(&fb);
+ FLOW_INITIALIZE(&f);
+ f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+ TimeGet(&ts);
+ seg.payload = payload;
+ seg.payload_len = 3;
+ seg.next = NULL;
+ seg.prev = NULL;
+ client.seg_list = &seg;
+ ssn.client = client;
+ ssn.server = client;
+ ssn.state = TCP_ESTABLISHED;
+ f.lastts_sec = ts.tv_sec - 5000;
+ f.protoctx = &ssn;
+ f.fb = &fb;
+ f.proto = IPPROTO_TCP;
+ f.flags |= FLOW_EMERGENCY;
+
+ int state = FlowGetFlowState(&f);
+ if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 0;
+ }
+
+ FBLOCK_DESTROY(&fb);
+ FLOW_DESTROY(&f);
+ FlowQueueDestroy(&flow_spare_q);
+ return 1;
+}
+
+/**
+ * \test Test flow allocations when it reach memcap
+ *
+ *
+ * \retval On success it returns 1 and on failure 0.
+ */
+
+static int FlowMgrTest05 (void) {
+ int result = 0;
+
+ FlowInitConfig(FLOW_QUIET);
+ FlowConfig backup;
+ memcpy(&backup, &flow_config, sizeof(FlowConfig));
+
+ uint32_t ini = 0;
+ uint32_t end = flow_spare_q.len;
+ flow_config.memcap = 10000;
+ flow_config.prealloc = 100;
+
+ /* Let's get the flow_spare_q empty */
+ UTHBuildPacketOfFlows(ini, end, 0);
+
+ /* And now let's try to reach the memcap val */
+ while (SC_ATOMIC_GET(flow_memuse) + sizeof(Flow) < flow_config.memcap) {
+ ini = end + 1;
+ end = end + 2;
+ UTHBuildPacketOfFlows(ini, end, 0);
+ }
+
+ /* should time out normal */
+ TimeSetIncrementTime(2000);
+ ini = end + 1;
+ end = end + 2;;
+ UTHBuildPacketOfFlows(ini, end, 0);
+
+ struct timeval ts;
+ TimeGet(&ts);
+ /* try to time out flows */
+ FlowTimeoutCounters counters = { 0, 0, 0, };
+ FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+
+ if (flow_spare_q.len > 0) {
+ result = 1;
+ }
+
+ memcpy(&flow_config, &backup, sizeof(FlowConfig));
+ FlowShutdown();
+
+ return result;
+}
+#endif /* UNITTESTS */
+
+/**
+ * \brief Function to register the Flow Unitests.
+ */
+void FlowMgrRegisterTests (void) {
+#ifdef UNITTESTS
+ UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession", FlowMgrTest01, 1);
+ UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments", FlowMgrTest02, 1);
+ UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession", FlowMgrTest03, 1);
+ UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments", FlowMgrTest04, 1);
+ UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap", FlowMgrTest05, 1);
+#endif /* UNITTESTS */
+}
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
#ifndef __FLOW_MANAGER_H__
#define __FLOW_MANAGER_H__
+/** flow manager scheduling condition */
SCCondT flow_manager_cond;
SCMutex flow_manager_mutex;
-
#define FlowWakeupFlowManagerThread() SCCondSignal(&flow_manager_cond)
void FlowManagerThreadSpawn(void);
void FlowKillFlowManagerThread(void);
+void FlowMgrRegisterTests (void);
#endif /* __FLOW_MANAGER_H__ */
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
/** spare/unused/prealloced flows live here */
FlowQueue flow_spare_q;
-/** Flows in the new/unreplied state live here */
-FlowQueue flow_new_q[FLOW_PROTO_MAX];
-
-/** All "established" flows live here, the top holds the
- * last recently used (lru) flow, so we can remove
- * that in case of memory problems and check it for
- * timeouts. */
-FlowQueue flow_est_q[FLOW_PROTO_MAX];
-
-/** All "closing" flows live here, the top holds the
- * last recently used (lru) flow, so we can remove
- * that in case of memory problems and check it for
- * timeouts. */
-FlowQueue flow_close_q[FLOW_PROTO_MAX];
-
FlowBucket *flow_hash;
FlowConfig flow_config;
-uint8_t flow_flags;
-
/** flow memuse counter (atomic), for enforcing memcap limit */
SC_ATOMIC_DECLARE(long long unsigned int, flow_memuse);
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
#include "util-error.h"
#include "util-debug.h"
#include "util-print.h"
-#include <string.h>
FlowQueue *FlowQueueNew() {
FlowQueue *q = (FlowQueue *)SCMalloc(sizeof(FlowQueue));
FlowQueue *FlowQueueInit (FlowQueue *q) {
if (q != NULL) {
memset(q, 0, sizeof(FlowQueue));
- SCMutexInit(&q->mutex_q, NULL);
- SCCondInit(&q->cond_q, NULL);
+ FQLOCK_INIT(q);
}
return q;
}
* \param q the flow queue to destroy
*/
void FlowQueueDestroy (FlowQueue *q) {
- SCMutexDestroy(&q->mutex_q);
- SCCondDestroy(&q->cond_q);
+ FQLOCK_DESTROY(q);
}
/**
BUG_ON(q == NULL || f == NULL);
#endif
- SCMutexLock(&q->mutex_q);
+ FQLOCK_LOCK(q);
+
/* more flows in queue */
if (q->top != NULL) {
f->lnext = q->top;
if (q->len > q->dbg_maxlen)
q->dbg_maxlen = q->len;
#endif /* DBG_PERF */
- SCMutexUnlock(&q->mutex_q);
+ FQLOCK_UNLOCK(q);
}
/**
* \retval f flow or NULL if empty list.
*/
Flow *FlowDequeue (FlowQueue *q) {
- SCMutexLock(&q->mutex_q);
+ FQLOCK_LOCK(q);
Flow *f = q->bot;
if (f == NULL) {
- SCMutexUnlock(&q->mutex_q);
+ FQLOCK_UNLOCK(q);
return NULL;
}
f->lnext = NULL;
f->lprev = NULL;
- SCMutexUnlock(&q->mutex_q);
+ FQLOCK_UNLOCK(q);
return f;
}
-/**
- * \brief Transfer a flow from one queue to another
- *
- * \param f the flow to be transfered
- * \param srcq the source queue, where the flow will be removed.
- * \param dstq the dest queue where the flow will be placed
- *
- * \note srcq and dstq must be different queues.
- */
-void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq)
-{
-#ifdef DEBUG
- BUG_ON(srcq == NULL || dstq == NULL || srcq == dstq);
-#endif /* DEBUG */
-
- SCMutexLock(&srcq->mutex_q);
-
- /* remove from old queue */
- if (srcq->top == f)
- srcq->top = f->lnext; /* remove from queue top */
- if (srcq->bot == f)
- srcq->bot = f->lprev; /* remove from queue bot */
- if (f->lprev != NULL)
- f->lprev->lnext = f->lnext; /* remove from flow prev */
- if (f->lnext != NULL)
- f->lnext->lprev = f->lprev; /* remove from flow next */
-
-#ifdef DEBUG
- BUG_ON(srcq->len == 0);
-#endif
- if (srcq->len > 0)
- srcq->len--; /* adjust len */
-
- f->lnext = NULL;
- f->lprev = NULL;
-
- SCMutexUnlock(&srcq->mutex_q);
-
- SCMutexLock(&dstq->mutex_q);
-
- /* add to new queue (append) */
- f->lprev = dstq->bot;
- if (f->lprev != NULL)
- f->lprev->lnext = f;
- f->lnext = NULL;
- dstq->bot = f;
- if (dstq->top == NULL)
- dstq->top = f;
-
- dstq->len++;
-#ifdef DBG_PERF
- if (dstq->len > dstq->dbg_maxlen)
- dstq->dbg_maxlen = dstq->len;
-#endif /* DBG_PERF */
-
- SCMutexUnlock(&dstq->mutex_q);
-}
-
-/**
- * \brief Move flow to bottom of queue
- *
- * \param f the flow to be transfered
- * \param q the queue
- */
-void FlowRequeueMoveToBot(Flow *f, FlowQueue *q)
-{
-#ifdef DEBUG
- BUG_ON(q == NULL || f == NULL);
-#endif /* DEBUG */
-
- SCMutexLock(&q->mutex_q);
-
- /* remove from the queue */
- if (q->top == f)
- q->top = f->lnext; /* remove from queue top */
- if (q->bot == f)
- q->bot = f->lprev; /* remove from queue bot */
- if (f->lprev != NULL)
- f->lprev->lnext = f->lnext; /* remove from flow prev */
- if (f->lnext != NULL)
- f->lnext->lprev = f->lprev; /* remove from flow next */
-
- /* readd to the queue (append) */
- f->lprev = q->bot;
-
- if (f->lprev != NULL)
- f->lprev->lnext = f;
-
- f->lnext = NULL;
-
- q->bot = f;
-
- if (q->top == NULL)
- q->top = f;
-
- SCMutexUnlock(&q->mutex_q);
-}
-
/**
* \brief Transfer a flow from a queue to the spare queue
*
*
* \note spare queue needs locking
*/
-void FlowRequeueMoveToSpare(Flow *f, FlowQueue *q)
+void FlowMoveToSpare(Flow *f)
{
-#ifdef DEBUG
- BUG_ON(q == NULL || f == NULL);
-#endif /* DEBUG */
-
- /* remove from old queue */
- if (q->top == f)
- q->top = f->lnext; /* remove from queue top */
- if (q->bot == f)
- q->bot = f->lprev; /* remove from queue bot */
- if (f->lprev != NULL)
- f->lprev->lnext = f->lnext; /* remove from flow prev */
- if (f->lnext != NULL)
- f->lnext->lprev = f->lprev; /* remove from flow next */
-#ifdef DEBUG
- BUG_ON(q->len == 0);
-#endif
- if (q->len > 0)
- q->len--; /* adjust len */
-
- f->lnext = NULL;
- f->lprev = NULL;
-
/* now put it in spare */
- SCMutexLock(&flow_spare_q.mutex_q);
+ FQLOCK_LOCK(&flow_spare_q);
/* add to new queue (append) */
f->lprev = flow_spare_q.bot;
flow_spare_q.dbg_maxlen = flow_spare_q.len;
#endif /* DBG_PERF */
- SCMutexUnlock(&flow_spare_q.mutex_q);
+ FQLOCK_UNLOCK(&flow_spare_q);
}
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
#include "suricata-common.h"
#include "flow.h"
+/** Spinlocks or Mutex for the flow queues. */
+//#define FQLOCK_SPIN
+#define FQLOCK_MUTEX
+
+#ifdef FQLOCK_SPIN
+ #ifdef FQLOCK_MUTEX
+ #error Cannot enable both FQLOCK_SPIN and FQLOCK_MUTEX
+ #endif
+#endif
+
/* Define a queue for storing flows */
typedef struct FlowQueue_
{
#ifdef DBG_PERF
uint32_t dbg_maxlen;
#endif /* DBG_PERF */
- SCMutex mutex_q;
- SCCondT cond_q;
+#ifdef FQLOCK_MUTEX
+ SCMutex m;
+#elif defined FQLOCK_SPIN
+ SCSpinlock s;
+#else
+ #error Enable FQLOCK_SPIN or FQLOCK_MUTEX
+#endif
} FlowQueue;
+#ifdef FQLOCK_SPIN
+ #define FQLOCK_INIT(q) SCSpinInit(&(q)->s, 0)
+ #define FQLOCK_DESTROY(q) SCSpinDestroy(&(q)->s)
+ #define FQLOCK_LOCK(q) SCSpinLock(&(q)->s)
+ #define FQLOCK_TRYLOCK(q) SCSpinTrylock(&(q)->s)
+ #define FQLOCK_UNLOCK(q) SCSpinUnlock(&(q)->s)
+#elif defined FQLOCK_MUTEX
+ #define FQLOCK_INIT(q) SCMutexInit(&(q)->m, NULL)
+ #define FQLOCK_DESTROY(q) SCMutexDestroy(&(q)->m)
+ #define FQLOCK_LOCK(q) SCMutexLock(&(q)->m)
+ #define FQLOCK_TRYLOCK(q) SCMutexTrylock(&(q)->m)
+ #define FQLOCK_UNLOCK(q) SCMutexUnlock(&(q)->m)
+#else
+ #error Enable FQLOCK_SPIN or FQLOCK_MUTEX
+#endif
+
/* prototypes */
FlowQueue *FlowQueueNew();
FlowQueue *FlowQueueInit(FlowQueue *);
void FlowEnqueue (FlowQueue *, Flow *);
Flow *FlowDequeue (FlowQueue *);
-void FlowRequeue(Flow *, FlowQueue *, FlowQueue *);
-void FlowRequeueMoveToBot(Flow *, FlowQueue *);
-void FlowRequeueMoveToSpare(Flow *, FlowQueue *);
+void FlowMoveToSpare(Flow *);
#endif /* __FLOW_QUEUE_H__ */
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
*
* \param q The queue to process flows from.
*/
-static inline void FlowForceReassemblyForQ(FlowQueue *q)
+static inline void FlowForceReassemblyForHash(void)
{
Flow *f;
TcpSession *ssn;
int server_ok;
int tcp_needs_inspection;
- /* get the topmost flow from the QUEUE */
- f = q->top;
+ uint32_t idx = 0;
/* We use this packet just for reassembly purpose */
Packet *reassemble_p = PacketGetFromAlloc();
if (reassemble_p == NULL)
return;
- /* we need to loop through all the flows in the queue */
- while (f != NULL) {
- PACKET_RECYCLE(reassemble_p);
+ for (idx = 0; idx < flow_config.hash_size; idx++) {
+ FlowBucket *fb = &flow_hash[idx];
+ if (fb == NULL)
+ continue;
+ FBLOCK_LOCK(fb);
- SCMutexLock(&f->m);
+ /* get the topmost flow from the QUEUE */
+ f = fb->head;
- /* Get the tcp session for the flow */
- ssn = (TcpSession *)f->protoctx;
+ /* we need to loop through all the flows in the queue */
+ while (f != NULL) {
+ PACKET_RECYCLE(reassemble_p);
- /* \todo Also skip flows that shouldn't be inspected */
- if (ssn == NULL) {
- SCMutexUnlock(&f->m);
- f = f->lnext;
- continue;
- }
-
- /* ah ah! We have some unattended toserver segments */
- if ((client_ok = StreamHasUnprocessedSegments(ssn, 0)) == 1) {
- StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+ SCMutexLock(&f->m);
- ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
- ssn->client.seg_list_tail->payload_len);
+ /* Get the tcp session for the flow */
+ ssn = (TcpSession *)f->protoctx;
- FlowForceReassemblyPseudoPacketSetup(reassemble_p, 1, f, ssn, 1);
- StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
- stt->ra_ctx, ssn, &ssn->server,
- reassemble_p, NULL);
- StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
- }
- /* oh oh! We have some unattended toclient segments */
- if ((server_ok = StreamHasUnprocessedSegments(ssn, 1)) == 1) {
- StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+ /* \todo Also skip flows that shouldn't be inspected */
+ if (ssn == NULL) {
+ SCMutexUnlock(&f->m);
+ f = f->hnext;
+ continue;
+ }
- ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
- ssn->server.seg_list_tail->payload_len);
+ /* ah ah! We have some unattended toserver segments */
+ if ((client_ok = StreamHasUnprocessedSegments(ssn, 0)) == 1) {
+ StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
- FlowForceReassemblyPseudoPacketSetup(reassemble_p, 0, f, ssn, 1);
- StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
- stt->ra_ctx, ssn, &ssn->client,
- reassemble_p, NULL);
- StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
- }
+ ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
+ ssn->client.seg_list_tail->payload_len);
- if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED)
- tcp_needs_inspection = 1;
- else
- tcp_needs_inspection = 0;
+ FlowForceReassemblyPseudoPacketSetup(reassemble_p, 1, f, ssn, 1);
+ StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
+ stt->ra_ctx, ssn, &ssn->server,
+ reassemble_p, NULL);
+ StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
+ }
+ /* oh oh! We have some unattended toclient segments */
+ if ((server_ok = StreamHasUnprocessedSegments(ssn, 1)) == 1) {
+ StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
+
+ ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
+ ssn->server.seg_list_tail->payload_len);
+
+ FlowForceReassemblyPseudoPacketSetup(reassemble_p, 0, f, ssn, 1);
+ StreamTcpReassembleHandleSegment(stream_pseudo_pkt_stream_TV,
+ stt->ra_ctx, ssn, &ssn->client,
+ reassemble_p, NULL);
+ StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
+ }
- SCMutexUnlock(&f->m);
+ if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED)
+ tcp_needs_inspection = 1;
+ else
+ tcp_needs_inspection = 0;
- /* insert a pseudo packet in the toserver direction */
- if (client_ok || tcp_needs_inspection)
- {
- SCMutexLock(&f->m);
- Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 1);
SCMutexUnlock(&f->m);
- if (p == NULL) {
- TmqhOutputPacketpool(NULL, reassemble_p);
- return;
- }
+ /* insert a pseudo packet in the toserver direction */
+ if (client_ok || tcp_needs_inspection)
+ {
+ SCMutexLock(&f->m);
+ Packet *p = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 1);
+ SCMutexUnlock(&f->m);
- if (stream_pseudo_pkt_detect_prev_TV != NULL) {
- stream_pseudo_pkt_detect_prev_TV->
- tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
- } else {
- TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
- while (s != NULL) {
- s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
- &s->slot_post_pq);
- s = s->slot_next;
+ if (p == NULL) {
+ TmqhOutputPacketpool(NULL, reassemble_p);
+ return;
}
- if (stream_pseudo_pkt_detect_TV != NULL) {
- stream_pseudo_pkt_detect_TV->
- tmqh_out(stream_pseudo_pkt_detect_TV, p);
- }
- }
- } /* if (ssn->client.seg_list != NULL) */
- if (server_ok || tcp_needs_inspection)
- {
- SCMutexLock(&f->m);
- Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
- SCMutexUnlock(&f->m);
+ if (stream_pseudo_pkt_detect_prev_TV != NULL) {
+ stream_pseudo_pkt_detect_prev_TV->
+ tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
+ } else {
+ TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
+ while (s != NULL) {
+ s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
+ &s->slot_post_pq);
+ s = s->slot_next;
+ }
- if (p == NULL) {
- TmqhOutputPacketpool(NULL, reassemble_p);
- return;
- }
+ if (stream_pseudo_pkt_detect_TV != NULL) {
+ stream_pseudo_pkt_detect_TV->
+ tmqh_out(stream_pseudo_pkt_detect_TV, p);
+ }
+ }
+ } /* if (ssn->client.seg_list != NULL) */
+ if (server_ok || tcp_needs_inspection)
+ {
+ SCMutexLock(&f->m);
+ Packet *p = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
+ SCMutexUnlock(&f->m);
+
+ if (p == NULL) {
+ TmqhOutputPacketpool(NULL, reassemble_p);
+ return;
+ }
- if (stream_pseudo_pkt_detect_prev_TV != NULL) {
- stream_pseudo_pkt_detect_prev_TV->
- tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
- } else {
- TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
- while (s != NULL) {
- s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
+ if (stream_pseudo_pkt_detect_prev_TV != NULL) {
+ stream_pseudo_pkt_detect_prev_TV->
+ tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
+ } else {
+ TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
+ while (s != NULL) {
+ s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
&s->slot_post_pq);
- s = s->slot_next;
- }
+ s = s->slot_next;
+ }
- if (stream_pseudo_pkt_detect_TV != NULL) {
- stream_pseudo_pkt_detect_TV->
- tmqh_out(stream_pseudo_pkt_detect_TV, p);
+ if (stream_pseudo_pkt_detect_TV != NULL) {
+ stream_pseudo_pkt_detect_TV->
+ tmqh_out(stream_pseudo_pkt_detect_TV, p);
+ }
}
- }
- } /* if (ssn->server.seg_list != NULL) */
+ } /* if (ssn->server.seg_list != NULL) */
- /* next flow in the queue */
- f = f->lnext;
- } /* while (f != NULL) */
+ /* next flow in the queue */
+ f = f->hnext;
+ } /* while (f != NULL) */
+ FBLOCK_UNLOCK(fb);
+ }
TmqhOutputPacketpool(NULL, reassemble_p);
-
return;
}
/** ----- Part 3 ----- **/
/* Carry out flow reassembly for unattended flows */
- FlowForceReassemblyForQ(&flow_new_q[FLOW_PROTO_TCP]);
- FlowForceReassemblyForQ(&flow_est_q[FLOW_PROTO_TCP]);
- FlowForceReassemblyForQ(&flow_close_q[FLOW_PROTO_TCP]);
+ FlowForceReassemblyForHash();
return;
}
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
int FlowForceReassemblyForFlowV2(Flow *f, int server, int client);
int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client);
-//int FlowForceReassemblyForFlowV2(Flow *);
void FlowForceReassembly(void);
void FlowForceReassemblySetup(void);
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* \author Victor Julien <victor@inliniac.net>
*
* Flow implementation.
- *
- * \todo Maybe place the flow that we get a packet for on top of the
- * list in the bucket. This rewards active flows.
*/
#include "suricata-common.h"
#define FLOW_DEFAULT_PREALLOC 10000
+/** atomic int that is used when freeing a flow from the hash. In this
+ * case we walk the hash to find a flow to free. This var records where
+ * we left off in the hash. Without this only the top rows of the hash
+ * are freed. This isn't just about fairness. Under severe presure, the
+ * hash rows on top would be all freed and the time to find a flow to
+ * free increased with every run. */
+SC_ATOMIC_DECLARE(unsigned int, flow_prune_idx);
+
+/** atomic flags */
+SC_ATOMIC_DECLARE(unsigned char, flow_flags);
+
void FlowRegisterTests(void);
void FlowInitFlowProto();
int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
-static int FlowClearMemory(Flow *,uint8_t );
int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
int FlowSetFlowStateFunc(uint8_t , int (*GetProtoState)(void *));
-static uint32_t FlowPruneFlowQueueCnt(FlowQueue *, struct timeval *, int);
-int FlowKill(FlowQueue *);
/* Run mode selected at suricata.c */
extern int run_mode;
return;
}
-/** \brief Update the flows position in the queue's
- * \param f Flow to requeue.
- * \todo if we have a flow state func rely on that soly
- *
- * In-use flows are in the flow_new_q, flow_est_q lists or flow_close_q lists.
- */
-void FlowUpdateQueue(Flow *f)
-{
- if (f->flags & FLOW_NEW_LIST) {
- /* in the new list -- we consider a flow no longer
- * new if we have seen at least 2 pkts in both ways. */
- if (f->flags & FLOW_TO_DST_SEEN && f->flags & FLOW_TO_SRC_SEEN) {
- FlowRequeue(f, &flow_new_q[f->protomap], &flow_est_q[f->protomap]);
-
- f->flags |= FLOW_EST_LIST; /* transition */
- f->flags &= ~FLOW_NEW_LIST;
- } else {
- FlowRequeueMoveToBot(f, &flow_new_q[f->protomap]);
- }
- } else if (f->flags & FLOW_EST_LIST) {
- if (flow_proto[f->protomap].GetProtoState != NULL) {
- uint8_t state = flow_proto[f->protomap].GetProtoState(f->protoctx);
- if (state == FLOW_STATE_CLOSED) {
- f->flags |= FLOW_CLOSED_LIST; /* transition */
- f->flags &=~ FLOW_EST_LIST;
-
- SCLogDebug("flow %p was put into closing queue ts %"PRIuMAX"", f, (uintmax_t)f->lastts_sec);
- FlowRequeue(f, &flow_est_q[f->protomap], &flow_close_q[f->protomap]);
- } else {
- /* Pull and put back -- this way the flows on
- * top of the list are least recently used. */
- FlowRequeueMoveToBot(f, &flow_est_q[f->protomap]);
- }
- } else {
- /* Pull and put back -- this way the flows on
- * top of the list are least recently used. */
- FlowRequeueMoveToBot(f, &flow_est_q[f->protomap]);
- }
- } else if (f->flags & FLOW_CLOSED_LIST){
- /* for the case of ssn reuse in TCP sessions we need to be able to pull
- * a flow back from the dungeon and inject adrenalin straight into it's
- * heart. */
- if (flow_proto[f->protomap].GetProtoState != NULL) {
- uint8_t state = flow_proto[f->protomap].GetProtoState(f->protoctx);
- if (state == FLOW_STATE_NEW) {
- f->flags |= FLOW_NEW_LIST; /* transition */
- f->flags &=~ FLOW_CLOSED_LIST;
-
- SCLogDebug("flow %p was put into new queue ts %"PRIuMAX"", f, (uintmax_t)f->lastts_sec);
- FlowRequeue(f, &flow_close_q[f->protomap], &flow_new_q[f->protomap]);
- } else {
- /* Pull and put back -- this way the flows on
- * top of the list are least recently used. */
- FlowRequeueMoveToBot(f, &flow_close_q[f->protomap]);
- }
- } else {
- /* Pull and put back -- this way the flows on
- * top of the list are least recently used. */
- FlowRequeueMoveToBot(f, &flow_close_q[f->protomap]);
- }
- }
-
- return;
-}
-
-#ifdef FLOW_PRUNE_DEBUG
-static uint64_t prune_queue_lock = 0;
-static uint64_t prune_queue_empty = 0;
-static uint64_t prune_flow_lock = 0;
-static uint64_t prune_bucket_lock = 0;
-static uint64_t prune_no_timeout = 0;
-static uint64_t prune_usecnt = 0;
-#endif
-
-/** \internal
- * \brief get timeout for flow
- *
- * \param f flow
- * \param emergency bool indicating emergency mode 1 yes, 0 no
- *
- * \retval timeout timeout in seconds
- */
-static inline uint32_t FlowPruneGetFlowTimeout(Flow *f, int emergency) {
- uint32_t timeout;
-
- if (emergency) {
- if (flow_proto[f->protomap].GetProtoState != NULL) {
- switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
- default:
- case FLOW_STATE_NEW:
- timeout = flow_proto[f->protomap].emerg_new_timeout;
- break;
- case FLOW_STATE_ESTABLISHED:
- timeout = flow_proto[f->protomap].emerg_est_timeout;
- break;
- case FLOW_STATE_CLOSED:
- timeout = flow_proto[f->protomap].emerg_closed_timeout;
- break;
- }
- } else {
- if (f->flags & FLOW_EST_LIST)
- timeout = flow_proto[f->protomap].emerg_est_timeout;
- else
- timeout = flow_proto[f->protomap].emerg_new_timeout;
- }
- } else { /* implies no emergency */
- if (flow_proto[f->protomap].GetProtoState != NULL) {
- switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
- default:
- case FLOW_STATE_NEW:
- timeout = flow_proto[f->protomap].new_timeout;
- break;
- case FLOW_STATE_ESTABLISHED:
- timeout = flow_proto[f->protomap].est_timeout;
- break;
- case FLOW_STATE_CLOSED:
- timeout = flow_proto[f->protomap].closed_timeout;
- break;
- }
- } else {
- if (f->flags & FLOW_EST_LIST)
- timeout = flow_proto[f->protomap].est_timeout;
- else
- timeout = flow_proto[f->protomap].new_timeout;
- }
- }
-
- return timeout;
-}
-
-/** FlowPrune
- *
- * Inspect top (last recently used) flow from the queue and see if
- * we need to prune it.
- *
- * Use trylock here so prevent us from blocking the packet handling.
- *
- * \param q Flow queue to prune
- * \param ts Current time
- * \param try_cnt Tries to prune the first try_cnt no of flows in the q
- *
- * \retval 0 on error, failed block, nothing to prune
- * \retval cnt on successfully pruned, cnt flows were pruned
- */
-static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
-{
- SCEnter();
- int cnt = 0;
- int try_cnt_temp = 0;
-
- int mr = SCMutexTrylock(&q->mutex_q);
- if (mr != 0) {
- SCLogDebug("trylock failed");
- if (mr == EBUSY)
- SCLogDebug("was locked");
- if (mr == EINVAL)
- SCLogDebug("bad mutex value");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_queue_lock++;
-#endif
- return 0;
- }
-
- Flow *f = q->top;
-
- /* label */
- while (f != NULL) {
- if (try_cnt != 0 && try_cnt_temp == try_cnt) {
- SCMutexUnlock(&q->mutex_q);
- return cnt;
- }
- try_cnt_temp++;
-
- if (f == NULL) {
- SCMutexUnlock(&q->mutex_q);
- SCLogDebug("top is null");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_queue_empty++;
-#endif
- return cnt;
- }
-
- if (SCMutexTrylock(&f->m) != 0) {
- SCLogDebug("cant lock 1");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_flow_lock++;
-#endif
- f = f->lnext;
- continue;
- }
-
- if (SCSpinTrylock(&f->fb->s) != 0) {
- SCMutexUnlock(&f->m);
- SCLogDebug("cant lock 2");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_bucket_lock++;
-#endif
- f = f->lnext;
- continue;
- }
-
- /*set the timeout value according to the flow operating mode, flow's state
- and protocol.*/
- uint32_t timeout = FlowPruneGetFlowTimeout(f, flow_flags & FLOW_EMERGENCY ? 1 : 0);
-
- SCLogDebug("got lock, now check: %" PRIdMAX "+%" PRIu32 "=(%" PRIdMAX ") < "
- "%" PRIdMAX "", (intmax_t)f->lastts_sec,
- timeout, (intmax_t)f->lastts_sec + timeout,
- (intmax_t)ts->tv_sec);
-
- /* do the timeout check */
- if ((int32_t)(f->lastts_sec + timeout) >= ts->tv_sec) {
- SCSpinUnlock(&f->fb->s);
- SCMutexUnlock(&f->m);
- SCMutexUnlock(&q->mutex_q);
- SCLogDebug("timeout check failed");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_no_timeout++;
-#endif
- return cnt;
- }
-
- /** never prune a flow that is used by a packet or stream msg
- * we are currently processing in one of the threads */
- if (SC_ATOMIC_GET(f->use_cnt) > 0) {
- SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", SC_ATOMIC_GET(f->use_cnt), f, f->proto);
- SCLogDebug("it is in one of the threads");
-
-#ifdef FLOW_PRUNE_DEBUG
- prune_usecnt++;
-#endif
- SCSpinUnlock(&f->fb->s);
- SCMutexUnlock(&f->m);
- f = f->lnext;
- continue;
- }
-
- int server = 0, client = 0;
- if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) {
- /* we no longer need the fb lock. We know this flow won't be timed
- * out just yet. So an incoming pkt is allowed to pick up this
- * flow. */
- SCSpinUnlock(&f->fb->s);
-
- FlowForceReassemblyForFlowV2(f, server, client);
- SCMutexUnlock(&f->m);
-
- f = f->lnext;
- continue;
- }
-#ifdef DEBUG
- /* this should not be possible */
- BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
-#endif
- /* remove from the hash */
- if (f->hprev != NULL)
- f->hprev->hnext = f->hnext;
- if (f->hnext != NULL)
- f->hnext->hprev = f->hprev;
- if (f->fb->f == f)
- f->fb->f = f->hnext;
-
- f->hnext = NULL;
- f->hprev = NULL;
-
- SCSpinUnlock(&f->fb->s);
- f->fb = NULL;
-
- FlowClearMemory (f, f->protomap);
- Flow *next_flow = f->lnext;
-
- /* no one is referring to this flow, use_cnt 0, removed from hash
- * so we can unlock it and move it back to the spare queue. */
- SCMutexUnlock(&f->m);
-
- /* move to spare list */
- FlowRequeueMoveToSpare(f, q);
-
- f = next_flow;
- cnt++;
- }
-
- SCMutexUnlock(&q->mutex_q);
- return cnt;
-}
-
-/** \brief Time out flows.
- * \param q flow queue to time out flows from
- * \param ts current time
- * \param timeout timeout to consider
- * \retval cnt number of flows that are timed out
- */
-uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts)
-{
- SCEnter();
- return FlowPrune(q, ts, 0);
-}
-
-/** \brief Time out flows on new/estabhlished/close queues for each proto until
- * we release cnt flows as max
- * Called from the FlowManager
- * \param ts current time
- * \retval cnt number of flows that are timed out
- */
-uint32_t FlowPruneFlowsCnt(struct timeval *ts, int cnt)
-{
- SCEnter();
- uint32_t nowcnt = 0;
- int i = 0;
-
- for (; i < FLOW_PROTO_MAX; i++) {
- /* prune closing list */
- nowcnt = FlowPruneFlowQueueCnt(&flow_close_q[i], ts, cnt);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
-
- /* prune new list */
- nowcnt = FlowPruneFlowQueueCnt(&flow_new_q[i], ts, cnt);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
-
- /* prune established list */
- nowcnt = FlowPruneFlowQueueCnt(&flow_est_q[i], ts, cnt);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
- }
-
- return cnt;
-}
-
-/** \brief FlowKillFlowQueueCnt It will try to kill try_cnt count of flows
- * It will return the number of flows released, and can be 0 or more.
- * \param q flow queue to time out flows from
- * \param try_cnt try to prune this number of flows
- * \retval cnt number of flows that are timed out
- */
-static uint32_t FlowKillFlowQueueCnt(FlowQueue *q, int try_cnt, uint8_t mode)
-{
- SCEnter();
- uint32_t cnt = 0;
- while (try_cnt--) {
- cnt += FlowKill(q);
- }
- SCLogDebug("EMERGENCY mode, Flows killed: %"PRIu32, cnt);
-
- return cnt;
-}
-
-/** FlowKill
- *
- * Inspect the top flows (last recently used) from the queue
- * and see if we can prune any it (this is if it's not in use).
- *
- * Use trylock here so prevent us from blocking the packet handling.
- *
- * \param q flow queue to prune
- *
- * \retval 0 on error, failed block, nothing to prune
- * \retval 1 on successfully pruned one
- */
-int FlowKill (FlowQueue *q)
-{
- SCEnter();
- int mr = SCMutexTrylock(&q->mutex_q);
-
- if (mr != 0) {
- SCLogDebug("trylock failed");
- if (mr == EBUSY)
- SCLogDebug("was locked");
- if (mr == EINVAL)
- SCLogDebug("bad mutex value");
- return 0;
- }
-
- Flow *f = q->top;
-
- /* This means that the queue is empty */
- if (f == NULL) {
- SCMutexUnlock(&q->mutex_q);
- SCLogDebug("top is null");
- return 0;
- }
-
- do {
- if (SCMutexTrylock(&f->m) != 0) {
- f = f->lnext;
- /* Skip to the next */
- continue;
- }
-
- if (SCSpinTrylock(&f->fb->s) != 0) {
- SCMutexUnlock(&f->m);
- f = f->lnext;
- continue;
- }
-
- /** never prune a flow that is used by a packet or stream msg
- * we are currently processing in one of the threads */
- if (SC_ATOMIC_GET(f->use_cnt) > 0) {
- SCSpinUnlock(&f->fb->s);
- SCMutexUnlock(&f->m);
- f = f->lnext;
- continue;
- }
-
- /* remove from the hash */
- if (f->hprev)
- f->hprev->hnext = f->hnext;
- if (f->hnext)
- f->hnext->hprev = f->hprev;
- if (f->fb->f == f)
- f->fb->f = f->hnext;
-
- f->hnext = NULL;
- f->hprev = NULL;
-
- SCSpinUnlock(&f->fb->s);
- f->fb = NULL;
-
- FlowClearMemory (f, f->protomap);
-
- /* no one is referring to this flow, use_cnt 0, removed from hash
- * so we can unlock it and move it back to the spare queue. */
- SCMutexUnlock(&f->m);
-
- /* move to spare list */
- FlowRequeueMoveToSpare(f, q);
-
- /* so.. we did it */
- /* unlock queue */
- SCMutexUnlock(&q->mutex_q);
- return 1;
- } while (f != NULL);
-
- /* If we reach this point, then we didn't prune any */
- /* unlock list */
- SCMutexUnlock(&q->mutex_q);
-
- return 0;
-}
-
-
-/** \brief Try to kill cnt flows by last recently seen activity on new/estabhlished/close queues for each proto until
- * we release cnt flows as max. Called only on emergency mode.
- * \param cnt number of flows to release
- * \retval cnt number of flows that are not killed (so 0 if we prune all of them)
- */
-uint32_t FlowKillFlowsCnt(int cnt)
-{
- SCEnter();
- uint32_t nowcnt = 0;
- int i = 0;
-
- /* Inspect the top of each protocol to select the last recently used */
- for (; i < FLOW_PROTO_MAX; i++) {
- /* prune closing list */
- nowcnt = FlowKillFlowQueueCnt(&flow_close_q[i], cnt, 0);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
-
- /* prune new list */
- nowcnt = FlowKillFlowQueueCnt(&flow_new_q[i], cnt, 0);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
-
- /* prune established list */
- nowcnt = FlowKillFlowQueueCnt(&flow_est_q[i], cnt, 0);
- if (nowcnt) {
- cnt -= nowcnt;
- }
- if (cnt <= 0)
- break;
- }
-
- return cnt;
-}
-
-/** \brief Time out flows will try to prune try_cnt count of flows
- * It will return the number of flows released, and can be 0 or more.
- * A more agressive aproach is calling this function with the emergency
- * bit set (and there will be another even more agressive, killing
- * flows without the criteria of time outs)
- * \param q flow queue to time out flows from
- * \param ts current time
- * \param timeout timeout to consider
- * \param try_cnt try to prune this number of flows if they are timed out
- * \retval cnt number of flows that are timed out
- */
-static uint32_t FlowPruneFlowQueueCnt(FlowQueue *q, struct timeval *ts, int try_cnt)
-{
- SCEnter();
- return FlowPrune(q, ts, try_cnt);
-}
-
/** \brief Make sure we have enough spare flows.
*
* Enforce the prealloc parameter, so keep at least prealloc flows in the
SCEnter();
uint32_t toalloc = 0, tofree = 0, len;
- SCMutexLock(&flow_spare_q.mutex_q);
-
+ FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
-
- SCMutexUnlock(&flow_spare_q.mutex_q);
+ FQLOCK_UNLOCK(&flow_spare_q);
if (len < flow_config.prealloc) {
toalloc = flow_config.prealloc - len;
p->flowflags |= FLOW_PKT_ESTABLISHED;
}
- /* update queue positions */
- FlowUpdateQueue(f);
-
/*set the detection bypass flags*/
if (f->flags & FLOW_NOPACKET_INSPECTION) {
SCLogDebug("setting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
SCLogDebug("initializing flow engine...");
memset(&flow_config, 0, sizeof(flow_config));
+ SC_ATOMIC_INIT(flow_flags);
SC_ATOMIC_INIT(flow_memuse);
-
- int ifq = 0;
+ SC_ATOMIC_INIT(flow_prune_idx);
FlowQueueInit(&flow_spare_q);
- for (ifq = 0; ifq < FLOW_PROTO_MAX; ifq++) {
- FlowQueueInit(&flow_new_q[ifq]);
- FlowQueueInit(&flow_est_q[ifq]);
- FlowQueueInit(&flow_close_q[ifq]);
- }
unsigned int seed = RandomTimePreseed();
/* set defaults */
SCLogError(SC_ERR_FATAL, "Fatal error encountered in FlowInitConfig. Exiting...");
exit(EXIT_FAILURE);
}
- uint32_t i = 0;
-
memset(flow_hash, 0, flow_config.hash_size * sizeof(FlowBucket));
+
+ uint32_t i = 0;
for (i = 0; i < flow_config.hash_size; i++) {
- SCSpinInit(&flow_hash[i].s, 0);
+ FBLOCK_INIT(&flow_hash[i]);
}
SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
/** \brief print some flow stats
* \warning Not thread safe */
-void FlowPrintQueueInfo (void)
+static void FlowPrintStats (void)
{
- int i;
- SCLogDebug("flow queue info:");
- SCLogDebug("spare flow queue %" PRIu32 "", flow_spare_q.len);
-#ifdef DBG_PERF
- SCLogDebug("flow_spare_q.dbg_maxlen %" PRIu32 "", flow_spare_q.dbg_maxlen);
-#endif
- for (i = 0; i < FLOW_PROTO_MAX; i++) {
- SCLogDebug("proto [%"PRId32"] new flow queue %" PRIu32 " "
-#ifdef DBG_PERF
- " - flow_new_q.dbg_maxlen %" PRIu32 ""
-#endif
- ,i,flow_new_q[i].len
-#ifdef DBG_PERF
- ,flow_new_q[i].dbg_maxlen
-#endif
- );
-
- SCLogDebug("proto [%"PRId32"] establised flow queue %" PRIu32 " "
-#ifdef DBG_PERF
- " - flow_est_q.dbg_maxlen %" PRIu32 ""
-#endif
- ,i,flow_est_q[i].len
-#ifdef DBG_PERF
- ,flow_est_q[i].dbg_maxlen
-#endif
- );
-
- SCLogDebug("proto [%"PRId32"] closing flow queue %" PRIu32 " "
-#ifdef DBG_PERF
- " - flow_closing_q.dbg_maxlen %" PRIu32 ""
-#endif
- ,i,flow_close_q[i].len
-#ifdef DBG_PERF
- ,flow_close_q[i].dbg_maxlen
-#endif
- );
-
- }
#ifdef FLOWBITS_STATS
SCLogInfo("flowbits added: %" PRIu32 ", removed: %" PRIu32 ", max memory usage: %" PRIu32 "",
flowbits_added, flowbits_removed, flowbits_memuse_max);
#endif /* FLOWBITS_STATS */
-
return;
}
void FlowShutdown(void)
{
Flow *f;
- int i;
uint32_t u;
+ FlowPrintStats();
+
+ /* free spare queue */
while((f = FlowDequeue(&flow_spare_q))) {
FlowFree(f);
}
- for (i = 0; i < FLOW_PROTO_MAX; i++) {
- while((f = FlowDequeue(&flow_new_q[i]))) {
- uint8_t proto_map = FlowGetProtoMapping(f->proto);
- FlowClearMemory(f, proto_map);
- FlowFree(f);
- }
- while((f = FlowDequeue(&flow_est_q[i]))) {
- uint8_t proto_map = FlowGetProtoMapping(f->proto);
- FlowClearMemory(f, proto_map);
- FlowFree(f);
- }
- while((f = FlowDequeue(&flow_close_q[i]))) {
- uint8_t proto_map = FlowGetProtoMapping(f->proto);
- FlowClearMemory(f, proto_map);
- FlowFree(f);
- }
- }
+ /* clear and free the hash */
if (flow_hash != NULL) {
/* clean up flow mutexes */
for (u = 0; u < flow_config.hash_size; u++) {
- SCSpinDestroy(&flow_hash[u].s);
+ Flow *f = flow_hash[u].head;
+ while (f) {
+ Flow *n = f->hnext;
+ uint8_t proto_map = FlowGetProtoMapping(f->proto);
+ FlowClearMemory(f, proto_map);
+ FlowFree(f);
+ f = n;
+ }
+
+ FBLOCK_DESTROY(&flow_hash[u]);
}
SCFree(flow_hash);
flow_hash = NULL;
}
SC_ATOMIC_SUB(flow_memuse, flow_config.hash_size * sizeof(FlowBucket));
-
- int ifq = 0;
FlowQueueDestroy(&flow_spare_q);
- for (ifq = 0; ifq < FLOW_PROTO_MAX; ifq++) {
- FlowQueueDestroy(&flow_new_q[ifq]);
- FlowQueueDestroy(&flow_est_q[ifq]);
- FlowQueueDestroy(&flow_close_q[ifq]);
- }
+ SC_ATOMIC_DESTROY(flow_prune_idx);
+ SC_ATOMIC_DESTROY(flow_memuse);
+ SC_ATOMIC_DESTROY(flow_flags);
return;
}
* \param proto_map mapped value of the protocol to FLOW_PROTO's.
*/
-static int FlowClearMemory(Flow* f, uint8_t proto_map)
+int FlowClearMemory(Flow* f, uint8_t proto_map)
{
SCEnter();
return 1;
}
-/**
- * \brief Function to test the prunning of the flow in different flow modes.
- *
- * \param f Pointer to the flow to be prunned
- * \param ts time value against which the flow will be checked
- *
- * \retval on success returns 1 and on failure 0
- */
-
-static int FlowTestPrune(Flow *f, struct timeval *ts) {
-
- FlowQueue *q = FlowQueueNew();
- if (q == NULL) {
- goto error;
- }
-
- q->top = NULL;
-
- FlowEnqueue(q, f);
- if (q->len != 1) {
- printf("Failed in enqueue the flow in flowqueue: ");
- goto error;
- }
-
- SCLogDebug("calling FlowPrune");
- FlowPrune(q, ts, 0);
- if (q->len != 0) {
- printf("Failed in prunning the flow: ");
- goto error;
- }
-
- if (f->protoctx != NULL){
- printf("Failed in freeing the TcpSession: ");
- goto error;
- }
-
- return 1;
-
-error:
- if (q != NULL) {
- FlowQueueDestroy(q);
- }
- return 0;
-}
-
-/**
- * \test Test the timing out of a flow with a fresh TcpSession
- * (just initialized, no data segments) in normal mode.
- *
- * \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest03 (void) {
-
- TcpSession ssn;
- Flow f;
- FlowBucket fb;
- struct timeval ts;
-
- FlowQueueInit(&flow_spare_q);
-
- memset(&ssn, 0, sizeof(TcpSession));
- memset(&f, 0, sizeof(Flow));
- memset(&ts, 0, sizeof(ts));
- memset(&fb, 0, sizeof(FlowBucket));
-
- SCSpinInit(&fb.s, 0);
-
- FLOW_INITIALIZE(&f);
- f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
- TimeGet(&ts);
- f.lastts_sec = ts.tv_sec - 5000;
- f.protoctx = &ssn;
- f.fb = &fb;
-
- f.proto = IPPROTO_TCP;
-
- if (FlowTestPrune(&f, &ts) != 1) {
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 0;
- }
-
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
-
- FlowQueueDestroy(&flow_spare_q);
- return 1;
-}
-
-/**
- * \test Test the timing out of a flow with a TcpSession
- * (with data segments) in normal mode.
- *
- * \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest04 (void) {
-
- TcpSession ssn;
- Flow f;
- FlowBucket fb;
- struct timeval ts;
- TcpSegment seg;
- TcpStream client;
- uint8_t payload[3] = {0x41, 0x41, 0x41};
-
- FlowQueueInit(&flow_spare_q);
-
- memset(&ssn, 0, sizeof(TcpSession));
- memset(&f, 0, sizeof(Flow));
- memset(&fb, 0, sizeof(FlowBucket));
- memset(&ts, 0, sizeof(ts));
- memset(&seg, 0, sizeof(TcpSegment));
- memset(&client, 0, sizeof(TcpSegment));
-
- SCSpinInit(&fb.s, 0);
- FLOW_INITIALIZE(&f);
- f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
- TimeGet(&ts);
- seg.payload = payload;
- seg.payload_len = 3;
- seg.next = NULL;
- seg.prev = NULL;
- client.seg_list = &seg;
- ssn.client = client;
- ssn.server = client;
- ssn.state = TCP_ESTABLISHED;
- f.lastts_sec = ts.tv_sec - 5000;
- f.protoctx = &ssn;
- f.fb = &fb;
- f.proto = IPPROTO_TCP;
-
- if (FlowTestPrune(&f, &ts) != 1) {
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 0;
- }
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 1;
-
-}
-
-/**
- * \test Test the timing out of a flow with a fresh TcpSession
- * (just initialized, no data segments) in emergency mode.
- *
- * \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest05 (void) {
-
- TcpSession ssn;
- Flow f;
- FlowBucket fb;
- struct timeval ts;
-
- FlowQueueInit(&flow_spare_q);
-
- memset(&ssn, 0, sizeof(TcpSession));
- memset(&f, 0, sizeof(Flow));
- memset(&ts, 0, sizeof(ts));
- memset(&fb, 0, sizeof(FlowBucket));
-
- SCSpinInit(&fb.s, 0);
- FLOW_INITIALIZE(&f);
- f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
- TimeGet(&ts);
- ssn.state = TCP_SYN_SENT;
- f.lastts_sec = ts.tv_sec - 300;
- f.protoctx = &ssn;
- f.fb = &fb;
- f.proto = IPPROTO_TCP;
- f.flags |= FLOW_EMERGENCY;
-
- if (FlowTestPrune(&f, &ts) != 1) {
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 0;
- }
-
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 1;
-}
-
-/**
- * \test Test the timing out of a flow with a TcpSession
- * (with data segments) in emergency mode.
- *
- * \retval On success it returns 1 and on failure 0.
- */
-
-static int FlowTest06 (void) {
-
- TcpSession ssn;
- Flow f;
- FlowBucket fb;
- struct timeval ts;
- TcpSegment seg;
- TcpStream client;
- uint8_t payload[3] = {0x41, 0x41, 0x41};
-
- FlowQueueInit(&flow_spare_q);
-
- memset(&ssn, 0, sizeof(TcpSession));
- memset(&f, 0, sizeof(Flow));
- memset(&fb, 0, sizeof(FlowBucket));
- memset(&ts, 0, sizeof(ts));
- memset(&seg, 0, sizeof(TcpSegment));
- memset(&client, 0, sizeof(TcpSegment));
-
- SCSpinInit(&fb.s, 0);
- FLOW_INITIALIZE(&f);
- f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
-
- TimeGet(&ts);
- seg.payload = payload;
- seg.payload_len = 3;
- seg.next = NULL;
- seg.prev = NULL;
- client.seg_list = &seg;
- ssn.client = client;
- ssn.server = client;
- ssn.state = TCP_ESTABLISHED;
- f.lastts_sec = ts.tv_sec - 5000;
- f.protoctx = &ssn;
- f.fb = &fb;
- f.proto = IPPROTO_TCP;
- f.flags |= FLOW_EMERGENCY;
-
- if (FlowTestPrune(&f, &ts) != 1) {
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 0;
- }
-
- SCSpinDestroy(&fb.s);
- FLOW_DESTROY(&f);
- FlowQueueDestroy(&flow_spare_q);
- return 1;
-
-}
-
/**
* \test Test flow allocations when it reach memcap
*
end = end + 2;;
UTHBuildPacketOfFlows(ini, end, 0);
- /* This means that the engine released 5 flows by normal timeout */
- if (flow_spare_q.len == 5)
+ /* This means that the engine entered emerg mode: should happen as easy
+ * with flow mgr activated */
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
result = 1;
memcpy(&flow_config, &backup, sizeof(FlowConfig));
UTHBuildPacketOfFlows(ini, end, 0);
/* This means that the engine released 5 flows by emergency timeout */
- if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY))
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
result = 1;
memcpy(&flow_config, &backup, sizeof(FlowConfig));
end = end + 2;
UTHBuildPacketOfFlows(ini, end, 0);
- /* This means that the engine release 5 flows by killing them */
- if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY))
+ /* engine in emerg mode */
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
result = 1;
memcpy(&flow_config, &backup, sizeof(FlowConfig));
return result;
}
+
#endif /* UNITTESTS */
/**
#ifdef UNITTESTS
UtRegisterTest("FlowTest01 -- Protocol Specific Timeouts", FlowTest01, 1);
UtRegisterTest("FlowTest02 -- Setting Protocol Specific Free Function", FlowTest02, 1);
- UtRegisterTest("FlowTest03 -- Timeout a flow having fresh TcpSession", FlowTest03, 1);
- UtRegisterTest("FlowTest04 -- Timeout a flow having TcpSession with segments", FlowTest04, 1);
- UtRegisterTest("FlowTest05 -- Timeout a flow in emergency having fresh TcpSession", FlowTest05, 1);
- UtRegisterTest("FlowTest06 -- Timeout a flow in emergency having TcpSession with segments", FlowTest06, 1);
UtRegisterTest("FlowTest07 -- Test flow Allocations when it reach memcap", FlowTest07, 1);
UtRegisterTest("FlowTest08 -- Test flow Allocations when it reach memcap", FlowTest08, 1);
UtRegisterTest("FlowTest09 -- Test flow Allocations when it reach memcap", FlowTest09, 1);
+
+ FlowMgrRegisterTests();
#endif /* UNITTESTS */
}
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
/** At least on packet from the destination address was seen */
#define FLOW_TO_DST_SEEN 0x00000002
-/** Flow lives in the flow-state-NEW list */
-#define FLOW_NEW_LIST 0x00000004
-/** Flow lives in the flow-state-EST (established) list */
-#define FLOW_EST_LIST 0x00000008
-/** Flow lives in the flow-state-CLOSED list */
-#define FLOW_CLOSED_LIST 0x00000010
+// vacany 3x
/** Flow was inspected against IP-Only sigs in the toserver direction */
#define FLOW_TOSERVER_IPONLY_SET 0x00000020
/** queue list pointers, protected by queue mutex */
struct Flow_ *lnext; /* list */
struct Flow_ *lprev;
-
struct timeval startts;
#ifdef DEBUG
uint32_t todstpktcnt;
void FlowIncrUsecnt(Flow *);
void FlowDecrUsecnt(Flow *);
-uint32_t FlowPruneFlowsCnt(struct timeval *, int);
-uint32_t FlowKillFlowsCnt(int);
-
void FlowRegisterTests (void);
int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
struct FlowQueue_;
int FlowUpdateSpareFlows(void);
-uint32_t FlowPruneFlowQueue(struct FlowQueue_ *, struct timeval *);
static inline void FlowLockSetNoPacketInspectionFlag(Flow *);
static inline void FlowSetNoPacketInspectionFlag(Flow *);
f->flags |= FLOW_NO_APPLAYER_INSPECTION;
}
+int FlowClearMemory(Flow *,uint8_t );
#endif /* __FLOW_H__ */
SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, GET_PKT_LEN(p));
double curr_ts = p->ts.tv_sec + p->ts.tv_usec / 1000.0;
- if (curr_ts < prev_signaled_ts || (curr_ts - prev_signaled_ts) > 2.0) {
+ if (curr_ts < prev_signaled_ts || (curr_ts - prev_signaled_ts) > 60.0) {
prev_signaled_ts = curr_ts;
FlowWakeupFlowManagerThread();
}
uint16_t u16 = 0;
for (u16 = 0; u16 < segment_pool_num; u16++)
{
+ SCMutexInit(&segment_pool_mutex[u16], NULL);
+ SCMutexLock(&segment_pool_mutex[u16]);
segment_pool[u16] = PoolInit(segment_pool_poolsizes[u16],
segment_pool_poolsizes_prealloc[u16],
TcpSegmentPoolAlloc, (void *) &
segment_pool_pktsizes[u16],
TcpSegmentPoolFree);
- SCMutexInit(&segment_pool_mutex[u16], NULL);
+ SCMutexUnlock(&segment_pool_mutex[u16]);
}
uint16_t idx = 0;
{
uint16_t u16 = 0;
for (u16 = 0; u16 < segment_pool_num; u16++) {
+ SCMutexLock(&segment_pool_mutex[u16]);
PoolPrintSaturation(segment_pool[u16]);
if (quiet == FALSE) {
}
PoolFree(segment_pool[u16]);
+ SCMutexUnlock(&segment_pool_mutex[u16]);
SCMutexDestroy(&segment_pool_mutex[u16]);
}
stream_memuse_max = 0;
SCSpinUnlock(&stream_memuse_spinlock);
+ SCMutexInit(&ssn_pool_mutex, NULL);
+ SCMutexLock(&ssn_pool_mutex);
ssn_pool = PoolInit(stream_config.max_sessions,
stream_config.prealloc_sessions,
StreamTcpSessionPoolAlloc, NULL,
StreamTcpSessionPoolFree);
if (ssn_pool == NULL) {
SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized");
+ SCMutexUnlock(&ssn_pool_mutex);
exit(EXIT_FAILURE);
}
-
- SCMutexInit(&ssn_pool_mutex, NULL);
+ SCMutexUnlock(&ssn_pool_mutex);
StreamTcpReassembleInit(quiet);
{
StreamTcpReassembleFree(quiet);
+ SCMutexLock(&ssn_pool_mutex);
if (ssn_pool != NULL) {
PoolFree(ssn_pool);
ssn_pool = NULL;
- } else {
- SCLogError(SC_ERR_POOL_EMPTY, "ssn_pool is NULL");
- exit(EXIT_FAILURE);
}
+ SCMutexUnlock(&ssn_pool_mutex);
+ SCMutexDestroy(&ssn_pool_mutex);
+
SCLogDebug("ssn_pool_cnt %"PRIu64"", ssn_pool_cnt);
if (!quiet) {
stream_memuse_max, stream_memuse);
SCSpinUnlock(&stream_memuse_spinlock);
}
- SCMutexDestroy(&ssn_pool_mutex);
-
SCSpinDestroy(&stream_memuse_spinlock);
}
return;
ssn->state = state;
-
- FlowUpdateQueue(p->flow);
}
/**
void StreamMsgQueuesInit(void) {
SCMutexInit(&stream_pool_memuse_mutex, NULL);
+ SCMutexLock(&stream_msg_pool_mutex);
stream_msg_pool = PoolInit(0,250,StreamMsgAlloc,NULL,StreamMsgFree);
if (stream_msg_pool == NULL)
exit(EXIT_FAILURE); /* XXX */
+ SCMutexUnlock(&stream_msg_pool_mutex);
}
void StreamMsgQueuesDeinit(char quiet) {
+ SCMutexLock(&stream_msg_pool_mutex);
PoolFree(stream_msg_pool);
+ SCMutexUnlock(&stream_msg_pool_mutex);
+
SCMutexDestroy(&stream_pool_memuse_mutex);
if (quiet == FALSE)
TmThreadKillThreads();
SCPerfReleaseResources();
FlowShutdown();
- FlowPrintQueueInfo();
StreamTcpFreeConfig(STREAM_VERBOSE);
HTPFreeConfig();
HTPAtExitPrintStats();