#include "tm-threads.h"
#include "util-pool.h"
+#include "util-pool-thread.h"
#include "util-checksum.h"
#include "util-unittest.h"
#include "util-print.h"
//#define DEBUG
-#define STREAMTCP_DEFAULT_SESSIONS 262144
-#define STREAMTCP_DEFAULT_PREALLOC 32768
+#define STREAMTCP_DEFAULT_PREALLOC 2048
#define STREAMTCP_DEFAULT_MEMCAP (32 * 1024 * 1024) /* 32mb */
#define STREAMTCP_DEFAULT_REASSEMBLY_MEMCAP (64 * 1024 * 1024) /* 64mb */
#define STREAMTCP_DEFAULT_TOSERVER_CHUNK_SIZE 2560
static int StreamTcpValidateRst(TcpSession * , Packet *);
static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *);
-static Pool *ssn_pool = NULL;
-static SCMutex ssn_pool_mutex;
+static PoolThread *ssn_pool = NULL;
+static SCMutex ssn_pool_mutex = PTHREAD_MUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
#ifdef DEBUG
static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
#endif
ssn->toclient_smsg_head = NULL;
memset(ssn, 0, sizeof(TcpSession));
- SCMutexLock(&ssn_pool_mutex);
- PoolReturn(ssn_pool, ssn);
+ PoolThreadReturn(ssn_pool, ssn);
#ifdef DEBUG
+ SCMutexLock(&ssn_pool_mutex);
ssn_pool_cnt--;
-#endif
SCMutexUnlock(&ssn_pool_mutex);
+#endif
SCReturn;
}
memset(&stream_config, 0, sizeof(stream_config));
- /** set config defaults */
if ((ConfGetInt("stream.max-sessions", &value)) == 1) {
- stream_config.max_sessions = (uint32_t)value;
- } else {
- if (RunmodeIsUnittests())
- stream_config.max_sessions = 1024;
- else
- stream_config.max_sessions = STREAMTCP_DEFAULT_SESSIONS;
- }
- if (!quiet) {
- SCLogInfo("stream \"max-sessions\": %"PRIu32"", stream_config.max_sessions);
+ SCLogWarning(SC_WARN_OPTION_OBSOLETE, "max-sessions is obsolete. "
+ "Number of concurrent sessions is now only limited by Flow and "
+ "TCP stream engine memcaps.");
}
if ((ConfGetInt("stream.prealloc-sessions", &value)) == 1) {
stream_config.prealloc_sessions = STREAMTCP_DEFAULT_PREALLOC;
}
if (!quiet) {
- SCLogInfo("stream \"prealloc-sessions\": %"PRIu32"", stream_config.prealloc_sessions);
+ SCLogInfo("stream \"prealloc-sessions\": %"PRIu32" (per thread)",
+ stream_config.prealloc_sessions);
}
char *temp_stream_memcap_str;
/* init the memcap/use tracking */
SC_ATOMIC_INIT(st_memuse);
- SCMutexInit(&ssn_pool_mutex, NULL);
- SCMutexLock(&ssn_pool_mutex);
- ssn_pool = PoolInit(stream_config.max_sessions,
- stream_config.prealloc_sessions,
- sizeof(TcpSession),
- StreamTcpSessionPoolAlloc,
- StreamTcpSessionPoolInit, NULL,
- StreamTcpSessionPoolCleanup, NULL);
- if (ssn_pool == NULL) {
- SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized");
- SCMutexUnlock(&ssn_pool_mutex);
- exit(EXIT_FAILURE);
- }
- SCMutexUnlock(&ssn_pool_mutex);
-
StreamTcpReassembleInit(quiet);
/* set the default free function and flow state function
SCMutexLock(&ssn_pool_mutex);
if (ssn_pool != NULL) {
- PoolFree(ssn_pool);
+ PoolThreadFree(ssn_pool);
ssn_pool = NULL;
}
SCMutexUnlock(&ssn_pool_mutex);
*
* \retval TcpSession A new TCP session with field initilaized to 0/NULL.
*/
-TcpSession *StreamTcpNewSession (Packet *p)
+TcpSession *StreamTcpNewSession (Packet *p, int id)
{
TcpSession *ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) {
- SCMutexLock(&ssn_pool_mutex);
- p->flow->protoctx = PoolGet(ssn_pool);
+ p->flow->protoctx = PoolThreadGetById(ssn_pool, id);
#ifdef DEBUG
+ SCMutexLock(&ssn_pool_mutex);
if (p->flow->protoctx != NULL)
ssn_pool_cnt++;
-#endif
SCMutexUnlock(&ssn_pool_mutex);
+#endif
ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) {
return 0;
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p);
+ ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1;
} else if (p->tcph->th_flags & TH_SYN) {
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p);
+ ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1;
return 0;
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p);
+ ssn = StreamTcpNewSession(p, tv->id);
if (ssn == NULL) {
SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca);
return -1;
SCLogDebug("StreamTcp thread specific ctx online at %p, reassembly ctx %p",
stt, stt->ra_ctx);
+
+ int r = 0;
+ SCMutexLock(&ssn_pool_mutex);
+ if (ssn_pool == NULL)
+ ssn_pool = PoolThreadInit(1, /* thread */
+ 0, /* unlimited */
+ stream_config.prealloc_sessions,
+ sizeof(TcpSession),
+ StreamTcpSessionPoolAlloc,
+ StreamTcpSessionPoolInit, NULL,
+ StreamTcpSessionPoolCleanup, NULL);
+ else {
+ /* grow ssn_pool until we have a element for our thread id */
+ do {
+ r = PoolThreadGrow(ssn_pool,
+ 0, /* unlimited */
+ stream_config.prealloc_sessions,
+ sizeof(TcpSession),
+ StreamTcpSessionPoolAlloc,
+ StreamTcpSessionPoolInit, NULL,
+ StreamTcpSessionPoolCleanup, NULL);
+ } while (r != -1 && r < tv->id);
+ SCLogDebug("pool size %d, thread %d", PoolThreadSize(ssn_pool), tv->id);
+ }
+ SCMutexUnlock(&ssn_pool_mutex);
+ if (r < 0 || ssn_pool == NULL)
+ SCReturnInt(TM_ECODE_FAILED);
+
SCReturnInt(TM_ECODE_OK);
}
StreamTcpInitConfig(TRUE);
- TcpSession *ssn = StreamTcpNewSession(p);
+ TcpSession *ssn = StreamTcpNewSession(p, 0);
if (ssn == NULL) {
printf("Session can not be allocated: ");
goto end;