]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Stream: use per thread ssn pool
authorVictor Julien <victor@inliniac.net>
Wed, 15 May 2013 09:13:36 +0000 (11:13 +0200)
committerVictor Julien <victor@inliniac.net>
Fri, 28 Jun 2013 09:16:08 +0000 (11:16 +0200)
Use per thread pools to store and retrieve SSN's from. Uses PoolThread
API.

Remove max-sessions setting. Pools are set to unlimited, but TCP memcap
limits the amount of sessions.

The prealloc_session settings now applies to each thread, so lowered the
default from 32k to 2k.

src/stream-tcp-private.h
src/stream-tcp.c
src/stream-tcp.h
src/util-error.c
src/util-error.h
suricata.yaml.in

index 2933e3f700596526cea8c1c4d92ddb5ee0c2a40a..dc60c742ebae2c59edccd1d723b7421883149592 100644 (file)
@@ -25,6 +25,8 @@
 #define __STREAM_TCP_PRIVATE_H__
 
 #include "decode.h"
+#include "util-pool.h"
+#include "util-pool-thread.h"
 
 #define STREAMTCP_QUEUE_FLAG_TS     0x01
 #define STREAMTCP_QUEUE_FLAG_WS     0x02
@@ -200,6 +202,7 @@ enum
 }
 
 typedef struct TcpSession_ {
+    PoolThreadReserved res;
     uint8_t state;
     uint8_t queue_len;                      /**< length of queue list below */
     uint16_t flags;
index 1aee65c480f290970fe5654986b4d0a900f89547..0c433e76acdcba1640c36d63a545c3dcc37c049d 100644 (file)
@@ -44,6 +44,7 @@
 #include "tm-threads.h"
 
 #include "util-pool.h"
+#include "util-pool-thread.h"
 #include "util-checksum.h"
 #include "util-unittest.h"
 #include "util-print.h"
@@ -71,8 +72,7 @@
 
 //#define DEBUG
 
-#define STREAMTCP_DEFAULT_SESSIONS              262144
-#define STREAMTCP_DEFAULT_PREALLOC              32768
+#define STREAMTCP_DEFAULT_PREALLOC              2048
 #define STREAMTCP_DEFAULT_MEMCAP                (32 * 1024 * 1024) /* 32mb */
 #define STREAMTCP_DEFAULT_REASSEMBLY_MEMCAP     (64 * 1024 * 1024) /* 64mb */
 #define STREAMTCP_DEFAULT_TOSERVER_CHUNK_SIZE   2560
@@ -104,8 +104,8 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *);
 static int StreamTcpValidateRst(TcpSession * , Packet *);
 static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *);
 
-static Pool *ssn_pool = NULL;
-static SCMutex ssn_pool_mutex;
+static PoolThread *ssn_pool = NULL;
+static SCMutex ssn_pool_mutex = PTHREAD_MUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
 #ifdef DEBUG
 static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
 #endif
@@ -210,12 +210,12 @@ void StreamTcpSessionClear(void *ssnptr)
     ssn->toclient_smsg_head = NULL;
 
     memset(ssn, 0, sizeof(TcpSession));
-    SCMutexLock(&ssn_pool_mutex);
-    PoolReturn(ssn_pool, ssn);
+    PoolThreadReturn(ssn_pool, ssn);
 #ifdef DEBUG
+    SCMutexLock(&ssn_pool_mutex);
     ssn_pool_cnt--;
-#endif
     SCMutexUnlock(&ssn_pool_mutex);
+#endif
 
     SCReturn;
 }
@@ -337,17 +337,10 @@ void StreamTcpInitConfig(char quiet)
 
     memset(&stream_config,  0, sizeof(stream_config));
 
-    /** set config defaults */
     if ((ConfGetInt("stream.max-sessions", &value)) == 1) {
-        stream_config.max_sessions = (uint32_t)value;
-    } else {
-        if (RunmodeIsUnittests())
-            stream_config.max_sessions = 1024;
-        else
-            stream_config.max_sessions = STREAMTCP_DEFAULT_SESSIONS;
-    }
-    if (!quiet) {
-        SCLogInfo("stream \"max-sessions\": %"PRIu32"", stream_config.max_sessions);
+        SCLogWarning(SC_WARN_OPTION_OBSOLETE, "max-sessions is obsolete. "
+            "Number of concurrent sessions is now only limited by Flow and "
+            "TCP stream engine memcaps.");
     }
 
     if ((ConfGetInt("stream.prealloc-sessions", &value)) == 1) {
@@ -359,7 +352,8 @@ void StreamTcpInitConfig(char quiet)
             stream_config.prealloc_sessions = STREAMTCP_DEFAULT_PREALLOC;
     }
     if (!quiet) {
-        SCLogInfo("stream \"prealloc-sessions\": %"PRIu32"", stream_config.prealloc_sessions);
+        SCLogInfo("stream \"prealloc-sessions\": %"PRIu32" (per thread)",
+                stream_config.prealloc_sessions);
     }
 
     char *temp_stream_memcap_str;
@@ -566,21 +560,6 @@ void StreamTcpInitConfig(char quiet)
     /* init the memcap/use tracking */
     SC_ATOMIC_INIT(st_memuse);
 
-    SCMutexInit(&ssn_pool_mutex, NULL);
-    SCMutexLock(&ssn_pool_mutex);
-    ssn_pool = PoolInit(stream_config.max_sessions,
-                        stream_config.prealloc_sessions,
-                        sizeof(TcpSession),
-                        StreamTcpSessionPoolAlloc,
-                        StreamTcpSessionPoolInit, NULL,
-                        StreamTcpSessionPoolCleanup, NULL);
-    if (ssn_pool == NULL) {
-        SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized");
-        SCMutexUnlock(&ssn_pool_mutex);
-        exit(EXIT_FAILURE);
-    }
-    SCMutexUnlock(&ssn_pool_mutex);
-
     StreamTcpReassembleInit(quiet);
 
     /* set the default free function and flow state function
@@ -595,7 +574,7 @@ void StreamTcpFreeConfig(char quiet)
 
     SCMutexLock(&ssn_pool_mutex);
     if (ssn_pool != NULL) {
-        PoolFree(ssn_pool);
+        PoolThreadFree(ssn_pool);
         ssn_pool = NULL;
     }
     SCMutexUnlock(&ssn_pool_mutex);
@@ -611,18 +590,18 @@ void StreamTcpFreeConfig(char quiet)
  *
  *  \retval TcpSession A new TCP session with field initilaized to 0/NULL.
  */
-TcpSession *StreamTcpNewSession (Packet *p)
+TcpSession *StreamTcpNewSession (Packet *p, int id)
 {
     TcpSession *ssn = (TcpSession *)p->flow->protoctx;
 
     if (ssn == NULL) {
-        SCMutexLock(&ssn_pool_mutex);
-        p->flow->protoctx = PoolGet(ssn_pool);
+        p->flow->protoctx = PoolThreadGetById(ssn_pool, id);
 #ifdef DEBUG
+        SCMutexLock(&ssn_pool_mutex);
         if (p->flow->protoctx != NULL)
             ssn_pool_cnt++;
-#endif
         SCMutexUnlock(&ssn_pool_mutex);
+#endif
 
         ssn = (TcpSession *)p->flow->protoctx;
         if (ssn == NULL) {
@@ -770,7 +749,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
             return 0;
 
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p);
+            ssn = StreamTcpNewSession(p, tv->id);
             if (ssn == NULL) {
                 SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
                 return -1;
@@ -844,7 +823,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
 
     } else if (p->tcph->th_flags & TH_SYN) {
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p);
+            ssn = StreamTcpNewSession(p, tv->id);
             if (ssn == NULL) {
                 SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
                 return -1;
@@ -897,7 +876,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
             return 0;
 
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p);
+            ssn = StreamTcpNewSession(p, tv->id);
             if (ssn == NULL) {
                 SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
                 return -1;
@@ -4521,6 +4500,34 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     SCLogDebug("StreamTcp thread specific ctx online at %p, reassembly ctx %p",
                 stt, stt->ra_ctx);
+
+    int r = 0;
+    SCMutexLock(&ssn_pool_mutex);
+    if (ssn_pool == NULL)
+        ssn_pool = PoolThreadInit(1, /* thread */
+                0, /* unlimited */
+                stream_config.prealloc_sessions,
+                sizeof(TcpSession),
+                StreamTcpSessionPoolAlloc,
+                StreamTcpSessionPoolInit, NULL,
+                StreamTcpSessionPoolCleanup, NULL);
+    else {
+        /* grow ssn_pool until we have a element for our thread id */
+        do {
+            r = PoolThreadGrow(ssn_pool,
+                    0, /* unlimited */
+                    stream_config.prealloc_sessions,
+                    sizeof(TcpSession),
+                    StreamTcpSessionPoolAlloc,
+                    StreamTcpSessionPoolInit, NULL,
+                    StreamTcpSessionPoolCleanup, NULL);
+        } while (r != -1 && r < tv->id);
+        SCLogDebug("pool size %d, thread %d", PoolThreadSize(ssn_pool), tv->id);
+    }
+    SCMutexUnlock(&ssn_pool_mutex);
+    if (r < 0 || ssn_pool == NULL)
+        SCReturnInt(TM_ECODE_FAILED);
+
     SCReturnInt(TM_ECODE_OK);
 }
 
@@ -5384,7 +5391,7 @@ static int StreamTcpTest01 (void) {
 
     StreamTcpInitConfig(TRUE);
 
-    TcpSession *ssn = StreamTcpNewSession(p);
+    TcpSession *ssn = StreamTcpNewSession(p, 0);
     if (ssn == NULL) {
         printf("Session can not be allocated: ");
         goto end;
index 31b3c467b8f3fc498888394765fab4c003a5a506..095461ff8532f3ae1c016c3a9a80c95de989b761 100644 (file)
@@ -48,8 +48,7 @@ typedef struct TcpStreamCnf_ {
     uint64_t memcap;
     uint64_t reassembly_memcap; /**< max memory usage for stream reassembly */
 
-    uint32_t max_sessions;
-    uint32_t prealloc_sessions;
+    uint32_t prealloc_sessions; /**< ssns to prealloc per stream thread */
     int midstream;
     int async_oneside;
     uint32_t reassembly_depth;  /**< Depth until when we reassemble the stream */
index d500f74c1b3fe8bb29e29dc448fafcee22d50dd3..9990474ab2dd2caa67bffa21b11886dffb7d380f 100644 (file)
@@ -273,6 +273,7 @@ const char * SCErrorToString(SCError err)
         CASE_CODE (SC_ERR_MAGIC_LOAD);
         CASE_CODE (SC_ERR_CUDA_BUFFER_ERROR);
         CASE_CODE (SC_ERR_DNS_LOG_GENERIC);
+        CASE_CODE (SC_WARN_OPTION_OBSOLETE);
     }
 
     return "UNKNOWN_ERROR";
index d91adf096645632acbd78e6244f9db8e45292492..411ee3e970ee83eb29e1f2cb9df0921b5051fc13 100644 (file)
@@ -262,6 +262,7 @@ typedef enum {
     SC_WARN_UNCOMMON,
     SC_ERR_CUDA_BUFFER_ERROR,
     SC_ERR_DNS_LOG_GENERIC,
+    SC_WARN_OPTION_OBSOLETE,
 } SCError;
 
 const char *SCErrorToString(SCError);
index 1931fd4777908ea9f2d210c3164fbe672b92d0fc..77891008b38b3147044aecfc643f2987fb442bcd 100644 (file)
@@ -587,8 +587,7 @@ flow-timeouts:
 #                               # of checksum. You can control the handling of checksum
 #                               # on a per-interface basis via the 'checksum-checks'
 #                               # option
-#   max-sessions: 262144        # 256k concurrent sessions
-#   prealloc-sessions: 32768    # 32k sessions prealloc'd
+#   prealloc-sessions: 2k       # 2k sessions prealloc'd per stream thread
 #   midstream: false            # don't allow midstream session pickups
 #   async-oneside: false        # don't enable async stream handling
 #   inline: no                  # stream inline mode