]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
stream: reduce pool locking overhead
authorVictor Julien <vjulien@oisf.net>
Fri, 23 Sep 2022 20:54:52 +0000 (22:54 +0200)
committerVictor Julien <vjulien@oisf.net>
Sat, 1 Oct 2022 18:27:38 +0000 (20:27 +0200)
Add thread local cache to avoid locking overhead for ssns and segments.

A thread will return segments/ssns to a local cache first, and if that
is full, to a return queue where the actual return to the pool returns
a batch, to amortize locking overhead.

Adds segment and session pool/cache counters to see where how effective
the cache is.

12 files changed:
src/Makefile.am
src/flow-manager.c
src/stream-tcp-cache.c [new file with mode: 0644]
src/stream-tcp-cache.h [new file with mode: 0644]
src/stream-tcp-private.h
src/stream-tcp-reassemble.c
src/stream-tcp-reassemble.h
src/stream-tcp.c
src/stream-tcp.h
src/tests/stream-tcp.c
src/util-pool-thread.c
src/util-pool-thread.h

index f348e89f2264cc9fc53a7aca85853e24a550c5bb..799a8bd972808a4d38275ff4218f3410c9eb2fd2 100755 (executable)
@@ -477,6 +477,7 @@ noinst_HEADERS = \
        source-windivert-prototypes.h \
        stream.h \
        stream-tcp.h \
+       stream-tcp-cache.h \
        stream-tcp-inline.h \
        stream-tcp-list.h \
        stream-tcp-private.h \
@@ -1076,6 +1077,7 @@ libsuricata_c_a_SOURCES = \
        source-windivert.c \
        stream.c \
        stream-tcp.c \
+       stream-tcp-cache.c \
        stream-tcp-inline.c \
        stream-tcp-list.c \
        stream-tcp-reassemble.c \
index 23ab9e801a412242259d0c028f462c8ebff03c90..b26542706b74b3cc5d92c0204d9fdaba1a3c18cb 100644 (file)
@@ -715,6 +715,7 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void *
 
 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
 {
+    StreamTcpThreadCacheCleanup();
     PacketPoolDestroy();
     SCFree(data);
     return TM_ECODE_OK;
@@ -1022,6 +1023,8 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void
 
 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
 {
+    StreamTcpThreadCacheCleanup();
+
     FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
     if (ftd->output_thread_data != NULL)
         OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
diff --git a/src/stream-tcp-cache.c b/src/stream-tcp-cache.c
new file mode 100644 (file)
index 0000000..b74d033
--- /dev/null
@@ -0,0 +1,196 @@
+/* Copyright (C) 2007-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ */
+
+#include "suricata-common.h"
+#include "stream-tcp-private.h"
+#include "stream-tcp-cache.h"
+
+typedef struct TcpPoolCache {
+    bool cache_enabled; /**< cache should only be enabled for worker threads */
+    TcpSegment *segs_cache[64];
+    uint32_t segs_cache_idx;
+    uint32_t segs_returns_idx;
+    TcpSegment *segs_returns[64];
+
+    TcpSession *ssns_cache[64];
+    uint32_t ssns_cache_idx;
+    uint32_t ssns_returns_idx;
+    TcpSession *ssns_returns[64];
+} TcpPoolCache;
+
+static thread_local TcpPoolCache tcp_pool_cache;
+extern PoolThread *ssn_pool;
+extern PoolThread *segment_thread_pool;
+
+/** \brief enable segment cache. Should only be done for worker threads */
+void StreamTcpThreadCacheEnable(void)
+{
+    tcp_pool_cache.cache_enabled = true;
+}
+
+void StreamTcpThreadCacheReturnSegment(TcpSegment *seg)
+{
+    SCEnter();
+#ifdef UNITTESTS
+    if (RunmodeIsUnittests()) {
+        PoolThreadReturn(segment_thread_pool, seg);
+        SCReturn;
+    }
+#endif
+
+    /* cache can have segs from any pool id */
+    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.segs_cache_idx < 64) {
+        tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx++] = seg;
+    } else {
+        /* segs_returns should only have a single pool id. If ours is different,
+         * flush it. */
+        bool flush = false;
+        if (tcp_pool_cache.segs_returns_idx &&
+                tcp_pool_cache.segs_returns[0]->pool_id != seg->pool_id) {
+            flush = true;
+        }
+        if (tcp_pool_cache.segs_returns_idx == 64) {
+            flush = true;
+        }
+
+        if (flush) {
+            PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
+            PoolThreadLock(segment_thread_pool, pool_id);
+            for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
+                TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
+                PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
+            }
+            PoolThreadUnlock(segment_thread_pool, pool_id);
+            tcp_pool_cache.segs_returns_idx = 0;
+        }
+
+        tcp_pool_cache.segs_returns[tcp_pool_cache.segs_returns_idx++] = seg;
+    }
+}
+
+void StreamTcpThreadCacheReturnSession(TcpSession *ssn)
+{
+    SCEnter();
+#ifdef UNITTESTS
+    if (RunmodeIsUnittests()) {
+        PoolThreadReturn(ssn_pool, ssn);
+        SCReturn;
+    }
+#endif
+
+    /* cache can have ssns from any pool id */
+    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.ssns_cache_idx < 64) {
+        tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx++] = ssn;
+    } else {
+        /* ssns_returns should only have a single pool id. If ours is different,
+         * flush it. */
+        bool flush = false;
+        if (tcp_pool_cache.ssns_returns_idx &&
+                tcp_pool_cache.ssns_returns[0]->pool_id != ssn->pool_id) {
+            flush = true;
+        }
+        if (tcp_pool_cache.ssns_returns_idx == 64) {
+            flush = true;
+        }
+
+        if (flush) {
+            PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
+            PoolThreadLock(ssn_pool, pool_id);
+            for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
+                TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
+                PoolThreadReturnRaw(ssn_pool, pool_id, ret_ssn);
+            }
+            PoolThreadUnlock(ssn_pool, pool_id);
+            tcp_pool_cache.ssns_returns_idx = 0;
+        }
+
+        tcp_pool_cache.ssns_returns[tcp_pool_cache.ssns_returns_idx++] = ssn;
+    }
+    SCReturn;
+}
+
+void StreamTcpThreadCacheCleanup(void)
+{
+    SCEnter();
+
+    /* segments */
+    SCLogDebug("tcp_pool_cache.segs_cache_idx %u", tcp_pool_cache.segs_cache_idx);
+    for (uint32_t i = 0; i < tcp_pool_cache.segs_cache_idx; i++) {
+        PoolThreadReturn(segment_thread_pool, tcp_pool_cache.segs_cache[i]);
+    }
+    tcp_pool_cache.segs_cache_idx = 0;
+
+    SCLogDebug("tcp_pool_cache.segs_returns_idx %u", tcp_pool_cache.segs_returns_idx);
+    if (tcp_pool_cache.segs_returns_idx) {
+        PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
+        PoolThreadLock(segment_thread_pool, pool_id);
+        for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
+            TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
+            PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
+        }
+        PoolThreadUnlock(segment_thread_pool, pool_id);
+        tcp_pool_cache.segs_returns_idx = 0;
+    }
+
+    /* sessions */
+    SCLogDebug("tcp_pool_cache.ssns_cache_idx %u", tcp_pool_cache.ssns_cache_idx);
+    for (uint32_t i = 0; i < tcp_pool_cache.ssns_cache_idx; i++) {
+        PoolThreadReturn(segment_thread_pool, tcp_pool_cache.ssns_cache[i]);
+    }
+    tcp_pool_cache.ssns_cache_idx = 0;
+
+    SCLogDebug("tcp_pool_cache.ssns_returns_idx %u", tcp_pool_cache.ssns_returns_idx);
+    if (tcp_pool_cache.ssns_returns_idx) {
+        PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
+        PoolThreadLock(segment_thread_pool, pool_id);
+        for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
+            TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
+            PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_ssn);
+        }
+        PoolThreadUnlock(segment_thread_pool, pool_id);
+        tcp_pool_cache.ssns_returns_idx = 0;
+    }
+
+    SCReturn;
+}
+
+TcpSegment *StreamTcpThreadCacheGetSegment(void)
+{
+    if (tcp_pool_cache.segs_cache_idx) {
+        TcpSegment *seg = tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx - 1];
+        tcp_pool_cache.segs_cache_idx--;
+        memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+        return seg;
+    }
+    return NULL;
+}
+
+TcpSession *StreamTcpThreadCacheGetSession(void)
+{
+    if (tcp_pool_cache.ssns_cache_idx) {
+        TcpSession *ssn = tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx - 1];
+        tcp_pool_cache.ssns_cache_idx--;
+        return ssn;
+    }
+    return NULL;
+}
diff --git a/src/stream-tcp-cache.h b/src/stream-tcp-cache.h
new file mode 100644 (file)
index 0000000..1a61532
--- /dev/null
@@ -0,0 +1,39 @@
+/* Copyright (C) 2007-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ */
+
+#ifndef __STREAM_TCP_CACHE_H__
+#define __STREAM_TCP_CACHE_H__
+
+#include "suricata.h"
+#include "flow.h"
+#include "stream-tcp-private.h"
+
+void StreamTcpThreadCacheEnable(void);
+void StreamTcpThreadCacheReturnSegment(TcpSegment *seg);
+void StreamTcpThreadCacheReturnSession(TcpSession *ssn);
+void StreamTcpThreadCacheCleanup(void);
+
+TcpSegment *StreamTcpThreadCacheGetSegment(void);
+TcpSession *StreamTcpThreadCacheGetSession(void);
+
+#endif /* __STREAM_TCP_CACHE_H__ */
index cf7f882eeb6c69edde5293cfd480cbde97ffcb27..e817533d7a1d22f2eedaec93c8c14fe4b40ca8d7 100644 (file)
@@ -70,7 +70,7 @@ typedef struct TcpSegmentPcapHdrStorage_ {
 } TcpSegmentPcapHdrStorage;
 
 typedef struct TcpSegment {
-    PoolThreadReserved res;
+    PoolThreadId pool_id;
     uint16_t payload_len;       /**< actual size of the payload */
     uint32_t seq;
     RB_ENTRY(TcpSegment) __attribute__((__packed__)) rb;
@@ -269,7 +269,7 @@ enum TcpState {
 }
 
 typedef struct TcpSession_ {
-    PoolThreadReserved res;
+    PoolThreadId pool_id;
     uint8_t state:4;                        /**< tcp state from state enum */
     uint8_t pstate:4;                       /**< previous state */
     uint8_t queue_len;                      /**< length of queue list below */
index 2a26ae6568ef99071114ffcc1c30111467f85c5d..5f4032cc30c4cd7011576bfe17933408f4a756a0 100644 (file)
@@ -48,6 +48,7 @@
 
 #include "stream-tcp.h"
 #include "stream-tcp-private.h"
+#include "stream-tcp-cache.h"
 #include "stream-tcp-reassemble.h"
 #include "stream-tcp-inline.h"
 #include "stream-tcp-list.h"
@@ -76,7 +77,7 @@ static uint64_t segment_pool_memcnt = 0;
 
 thread_local uint64_t t_pcapcnt = UINT64_MAX;
 
-static PoolThread *segment_thread_pool = NULL;
+PoolThread *segment_thread_pool = NULL;
 /* init only, protect initializing and growing pool */
 static SCMutex segment_thread_pool_mutex = SCMUTEX_INITIALIZER;
 
@@ -374,7 +375,7 @@ void StreamTcpSegmentReturntoPool(TcpSegment *seg)
         seg->pcap_hdr_storage->pktlen = 0;
     }
 
-    PoolThreadReturn(segment_thread_pool, seg);
+    StreamTcpThreadCacheReturnSegment(seg);
 }
 
 /**
@@ -565,6 +566,8 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(ThreadVars *tv)
 void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx)
 {
     SCEnter();
+    StreamTcpThreadCacheCleanup();
+
     if (ra_ctx) {
         AppLayerDestroyCtxThread(ra_ctx->app_tctx);
         SCFree(ra_ctx);
@@ -2036,7 +2039,14 @@ int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_
  */
 TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
 {
-    TcpSegment *seg = (TcpSegment *)PoolThreadGetById(
+    TcpSegment *seg = StreamTcpThreadCacheGetSegment();
+    if (seg) {
+        StatsIncr(tv, ra_ctx->counter_tcp_segment_from_cache);
+        memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+        return seg;
+    }
+
+    seg = (TcpSegment *)PoolThreadGetById(
             segment_thread_pool, (uint16_t)ra_ctx->segment_thread_pool_id);
     SCLogDebug("seg we return is %p", seg);
     if (seg == NULL) {
@@ -2045,6 +2055,7 @@ TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
         StatsIncr(tv, ra_ctx->counter_tcp_segment_memcap);
     } else {
         memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+        StatsIncr(tv, ra_ctx->counter_tcp_segment_from_pool);
     }
 
     return seg;
index 11bdd63851f59e5557655649a9ee9a4742001384..4135db039999e69f9d77eca3d9f496ca684f45ee 100644 (file)
@@ -63,6 +63,10 @@ typedef struct TcpReassemblyThreadCtx_ {
 
     /** TCP segments which are not being reassembled due to memcap was reached */
     uint16_t counter_tcp_segment_memcap;
+
+    uint16_t counter_tcp_segment_from_cache;
+    uint16_t counter_tcp_segment_from_pool;
+
     /** number of streams that stop reassembly because their depth is reached */
     uint16_t counter_tcp_stream_depth;
     /** count number of streams with a unrecoverable stream gap (missing pkts) */
index e487d6796ac9a58ad74e5f7f3d17b959f6f81013..1430e2f9e46143f484a1d9496e6717eb407d6545 100644 (file)
 #include "util-device.h"
 
 #include "stream-tcp-private.h"
-#include "stream-tcp-reassemble.h"
 #include "stream-tcp.h"
+#include "stream-tcp-cache.h"
 #include "stream-tcp-inline.h"
+#include "stream-tcp-reassemble.h"
 #include "stream-tcp-sack.h"
 #include "stream-tcp-util.h"
 #include "stream.h"
@@ -104,7 +105,7 @@ static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
 extern thread_local uint64_t t_pcapcnt;
 extern int g_detect_disabled;
 
-static PoolThread *ssn_pool = NULL;
+PoolThread *ssn_pool = NULL;
 static SCMutex ssn_pool_mutex = SCMUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
 #ifdef DEBUG
 static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
@@ -250,11 +251,11 @@ void StreamTcpSessionClear(void *ssnptr)
     StreamTcpSessionCleanup(ssn);
 
     /* HACK: don't loose track of thread id */
-    PoolThreadReserved a = ssn->res;
+    PoolThreadId pool_id = ssn->pool_id;
     memset(ssn, 0, sizeof(TcpSession));
-    ssn->res = a;
+    ssn->pool_id = pool_id;
 
-    PoolThreadReturn(ssn_pool, ssn);
+    StreamTcpThreadCacheReturnSession(ssn);
 #ifdef DEBUG
     SCMutexLock(&ssn_pool_mutex);
     ssn_pool_cnt--;
@@ -688,13 +689,26 @@ void StreamTcpFreeConfig(bool quiet)
  *
  *  \retval ssn new TCP session.
  */
-static TcpSession *StreamTcpNewSession (Packet *p, int id)
+static TcpSession *StreamTcpNewSession(ThreadVars *tv, StreamTcpThread *stt, Packet *p, int id)
 {
     TcpSession *ssn = (TcpSession *)p->flow->protoctx;
 
     if (ssn == NULL) {
         DEBUG_VALIDATE_BUG_ON(id < 0 || id > UINT16_MAX);
-        p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
+        p->flow->protoctx = StreamTcpThreadCacheGetSession();
+        if (p->flow->protoctx != NULL) {
+#ifdef UNITTESTS
+            if (tv)
+#endif
+                StatsIncr(tv, stt->counter_tcp_ssn_from_cache);
+        } else {
+            p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
+            if (p->flow->protoctx != NULL)
+#ifdef UNITTESTS
+                if (tv)
+#endif
+                    StatsIncr(tv, stt->counter_tcp_ssn_from_pool);
+        }
 #ifdef DEBUG
         SCMutexLock(&ssn_pool_mutex);
         if (p->flow->protoctx != NULL)
@@ -940,7 +954,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
         SCLogDebug("midstream picked up");
 
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+            ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
             if (ssn == NULL) {
                 StatsIncr(tv, stt->counter_tcp_ssn_memcap);
                 return -1;
@@ -1033,7 +1047,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
 
     } else if (p->tcph->th_flags & TH_SYN) {
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+            ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
             if (ssn == NULL) {
                 StatsIncr(tv, stt->counter_tcp_ssn_memcap);
                 return -1;
@@ -1113,7 +1127,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
         SCLogDebug("midstream picked up");
 
         if (ssn == NULL) {
-            ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+            ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
             if (ssn == NULL) {
                 StatsIncr(tv, stt->counter_tcp_ssn_memcap);
                 return -1;
@@ -5394,12 +5408,15 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
         SCReturnInt(TM_ECODE_FAILED);
     memset(stt, 0, sizeof(StreamTcpThread));
     stt->ssn_pool_id = -1;
+    StreamTcpThreadCacheEnable();
 
     *data = (void *)stt;
 
     stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
     stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
     stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
+    stt->counter_tcp_ssn_from_cache = StatsRegisterCounter("tcp.ssn_from_cache", tv);
+    stt->counter_tcp_ssn_from_pool = StatsRegisterCounter("tcp.ssn_from_pool", tv);
     stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);
     stt->counter_tcp_pseudo_failed = StatsRegisterCounter("tcp.pseudo_failed", tv);
     stt->counter_tcp_invalid_checksum = StatsRegisterCounter("tcp.invalid_checksum", tv);
@@ -5416,6 +5433,9 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
         SCReturnInt(TM_ECODE_FAILED);
 
     stt->ra_ctx->counter_tcp_segment_memcap = StatsRegisterCounter("tcp.segment_memcap_drop", tv);
+    stt->ra_ctx->counter_tcp_segment_from_cache =
+            StatsRegisterCounter("tcp.segment_from_cache", tv);
+    stt->ra_ctx->counter_tcp_segment_from_pool = StatsRegisterCounter("tcp.segment_from_pool", tv);
     stt->ra_ctx->counter_tcp_stream_depth = StatsRegisterCounter("tcp.stream_depth_reached", tv);
     stt->ra_ctx->counter_tcp_reass_gap = StatsRegisterCounter("tcp.reassembly_gap", tv);
     stt->ra_ctx->counter_tcp_reass_overlap = StatsRegisterCounter("tcp.overlap", tv);
index a2d857ee1fc54019c5cd29d99050b596b74e284a..3a4b0e9e57fc4927e7ebeb4c59d14a3e14d3ca2e 100644 (file)
@@ -84,6 +84,8 @@ typedef struct StreamTcpThread_ {
     uint16_t counter_tcp_sessions;
     /** sessions not picked up because memcap was reached */
     uint16_t counter_tcp_ssn_memcap;
+    uint16_t counter_tcp_ssn_from_cache;
+    uint16_t counter_tcp_ssn_from_pool;
     /** pseudo packets processed */
     uint16_t counter_tcp_pseudo;
     /** pseudo packets failed to setup */
@@ -211,5 +213,8 @@ void StreamTcpUpdateAppLayerProgress(TcpSession *ssn, char direction,
 uint64_t StreamTcpGetAcked(const TcpStream *stream);
 uint64_t StreamTcpGetUsable(const TcpStream *stream, const bool eof);
 
+void StreamTcpThreadCacheEnable(void);
+void StreamTcpThreadCacheCleanup(void);
+
 #endif /* __STREAM_TCP_H__ */
 
index c6eec9b902f3acfcc7727b08fe2968145ea9c14b..f3c0ff0f8817c99290fa9cb868b08e73a4bee2e6 100644 (file)
@@ -47,7 +47,7 @@ static int StreamTcpTest01(void)
     FLOW_INITIALIZE(&f);
     p->flow = &f;
     StreamTcpUTInit(&stt.ra_ctx);
-    TcpSession *ssn = StreamTcpNewSession(p, 0);
+    TcpSession *ssn = StreamTcpNewSession(NULL, &stt, p, 0);
     FAIL_IF_NULL(ssn);
     f.protoctx = ssn;
     FAIL_IF_NOT_NULL(f.alparser);
index 8ed982bb236f52393ef7a3800eac3a6707330e97..d80d29570b3aaf03b310701a7ee50c7413dff43f 100644 (file)
@@ -177,7 +177,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id)
     data = PoolGet(e->pool);
     SCMutexUnlock(&e->lock);
     if (data) {
-        PoolThreadReserved *did = data;
+        PoolThreadId *did = data;
         *did = id;
     }
 
@@ -186,7 +186,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id)
 
 void PoolThreadReturn(PoolThread *pt, void *data)
 {
-    PoolThreadReserved *id = data;
+    PoolThreadId *id = data;
 
     if (pt == NULL || *id >= pt->size)
         return;
@@ -199,9 +199,30 @@ void PoolThreadReturn(PoolThread *pt, void *data)
     SCMutexUnlock(&e->lock);
 }
 
+void PoolThreadLock(PoolThread *pt, PoolThreadId id)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    SCMutexLock(&e->lock);
+}
+
+void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    PoolReturn(e->pool, data);
+}
+
+void PoolThreadUnlock(PoolThread *pt, PoolThreadId id)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    SCMutexUnlock(&e->lock);
+}
+
 #ifdef UNITTESTS
 struct PoolThreadTestData {
-    PoolThreadReserved res;
+    PoolThreadId res;
     int abc;
 };
 
index 47e343b2c4e4863e4901cc67792ba836b786a862..44b76e75e2924954d74423cd68a625068d48d697 100644 (file)
@@ -57,7 +57,7 @@ typedef struct PoolThread_ {
 
 /** per data item reserved data containing the
  *  thread pool id */
-typedef uint16_t PoolThreadReserved;
+typedef uint16_t PoolThreadId;
 
 void PoolThreadRegisterTests(void);
 
@@ -91,6 +91,10 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id);
  *  \param data memory block to return, with PoolThreadReserved as it's first member */
 void PoolThreadReturn(PoolThread *pt, void *data);
 
+void PoolThreadLock(PoolThread *pt, PoolThreadId id);
+void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data);
+void PoolThreadUnlock(PoolThread *pt, PoolThreadId id);
+
 /** \brief get size of PoolThread (number of 'threads', so array elements)
  *  \param pt thread pool
  *  \retval size or -1 on error */