Add thread local cache to avoid locking overhead for ssns and segments.
A thread will return segments/ssns to a local cache first, and if that
is full, to a return queue from which they are returned to the pool in
a batch, to amortize the locking overhead.
Adds segment and session pool/cache counters to show how effective
the cache is.
source-windivert-prototypes.h \
stream.h \
stream-tcp.h \
+ stream-tcp-cache.h \
stream-tcp-inline.h \
stream-tcp-list.h \
stream-tcp-private.h \
source-windivert.c \
stream.c \
stream-tcp.c \
+ stream-tcp-cache.c \
stream-tcp-inline.c \
stream-tcp-list.c \
stream-tcp-reassemble.c \
static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
+ StreamTcpThreadCacheCleanup();
PacketPoolDestroy();
SCFree(data);
return TM_ECODE_OK;
static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
+ StreamTcpThreadCacheCleanup();
+
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
if (ftd->output_thread_data != NULL)
OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
--- /dev/null
+/* Copyright (C) 2007-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ */
+
+#include "suricata-common.h"
+#include "stream-tcp-private.h"
+#include "stream-tcp-cache.h"
+
+/** \brief per-thread cache/return-queue for TCP segments and sessions.
+ *
+ *  Two tiers per object type:
+ *  - *_cache: LIFO reuse cache, only filled when `cache_enabled` is set
+ *    (worker threads); entries may come from any pool id.
+ *  - *_returns: batch of objects pending return to the pool; all entries
+ *    must share a single pool id so they can be returned under one lock.
+ */
+typedef struct TcpPoolCache {
+    bool cache_enabled; /**< cache should only be enabled for worker threads */
+    TcpSegment *segs_cache[64];  /**< LIFO reuse cache, mixed pool ids allowed */
+    uint32_t segs_cache_idx;     /**< number of entries in segs_cache */
+    uint32_t segs_returns_idx;   /**< number of entries in segs_returns */
+    TcpSegment *segs_returns[64]; /**< pending batched returns, single pool id */
+
+    TcpSession *ssns_cache[64];  /**< LIFO reuse cache, mixed pool ids allowed */
+    uint32_t ssns_cache_idx;     /**< number of entries in ssns_cache */
+    uint32_t ssns_returns_idx;   /**< number of entries in ssns_returns */
+    TcpSession *ssns_returns[64]; /**< pending batched returns, single pool id */
+} TcpPoolCache;
+
+static thread_local TcpPoolCache tcp_pool_cache;
+extern PoolThread *ssn_pool;
+extern PoolThread *segment_thread_pool;
+
+/** \brief enable segment cache. Should only be done for worker threads.
+ *
+ *  Until enabled, returned segments/sessions bypass the local reuse cache
+ *  and go straight to the batched return queue. Acts on the calling
+ *  thread's thread_local state only.
+ */
+void StreamTcpThreadCacheEnable(void)
+{
+    tcp_pool_cache.cache_enabled = true;
+}
+
+/** \brief return a segment to the thread-local cache or batched return queue.
+ *
+ *  If the reuse cache is enabled (worker threads) and not full, store the
+ *  segment there; the cache may hold segments from any pool id. Otherwise
+ *  the segment is queued in segs_returns, which must only hold a single
+ *  pool id: on a pool-id mismatch or a full queue the batch is flushed to
+ *  the pool under a single lock, amortizing the locking overhead.
+ *
+ *  \param seg segment to return; ownership is taken
+ */
+void StreamTcpThreadCacheReturnSegment(TcpSegment *seg)
+{
+    SCEnter();
+#ifdef UNITTESTS
+    /* unittests run single threaded without per-thread setup: return
+     * directly to the pool */
+    if (RunmodeIsUnittests()) {
+        PoolThreadReturn(segment_thread_pool, seg);
+        SCReturn;
+    }
+#endif
+
+    /* cache can have segs from any pool id */
+    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.segs_cache_idx < 64) {
+        tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx++] = seg;
+    } else {
+        /* segs_returns should only have a single pool id. If ours is different,
+         * flush it. */
+        bool flush = false;
+        if (tcp_pool_cache.segs_returns_idx &&
+                tcp_pool_cache.segs_returns[0]->pool_id != seg->pool_id) {
+            flush = true;
+        }
+        if (tcp_pool_cache.segs_returns_idx == 64) {
+            flush = true;
+        }
+
+        if (flush) {
+            /* flush only runs when segs_returns_idx > 0, so [0] is valid */
+            PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
+            PoolThreadLock(segment_thread_pool, pool_id);
+            for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
+                TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
+                PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
+            }
+            PoolThreadUnlock(segment_thread_pool, pool_id);
+            tcp_pool_cache.segs_returns_idx = 0;
+        }
+
+        /* after a possible flush there is always room for the new segment */
+        tcp_pool_cache.segs_returns[tcp_pool_cache.segs_returns_idx++] = seg;
+    }
+}
+
+/** \brief return a session to the thread-local cache or batched return queue.
+ *
+ *  Mirrors StreamTcpThreadCacheReturnSegment: reuse cache first (any pool
+ *  id, worker threads only), otherwise the single-pool-id ssns_returns
+ *  batch, which is flushed to ssn_pool under one lock when full or when
+ *  the pool id of \a ssn differs from the queued entries.
+ *
+ *  \param ssn session to return; ownership is taken
+ */
+void StreamTcpThreadCacheReturnSession(TcpSession *ssn)
+{
+    SCEnter();
+#ifdef UNITTESTS
+    /* unittests run single threaded without per-thread setup: return
+     * directly to the pool */
+    if (RunmodeIsUnittests()) {
+        PoolThreadReturn(ssn_pool, ssn);
+        SCReturn;
+    }
+#endif
+
+    /* cache can have ssns from any pool id */
+    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.ssns_cache_idx < 64) {
+        tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx++] = ssn;
+    } else {
+        /* ssns_returns should only have a single pool id. If ours is different,
+         * flush it. */
+        bool flush = false;
+        if (tcp_pool_cache.ssns_returns_idx &&
+                tcp_pool_cache.ssns_returns[0]->pool_id != ssn->pool_id) {
+            flush = true;
+        }
+        if (tcp_pool_cache.ssns_returns_idx == 64) {
+            flush = true;
+        }
+
+        if (flush) {
+            /* flush only runs when ssns_returns_idx > 0, so [0] is valid */
+            PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
+            PoolThreadLock(ssn_pool, pool_id);
+            for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
+                TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
+                PoolThreadReturnRaw(ssn_pool, pool_id, ret_ssn);
+            }
+            PoolThreadUnlock(ssn_pool, pool_id);
+            tcp_pool_cache.ssns_returns_idx = 0;
+        }
+
+        /* after a possible flush there is always room for the new session */
+        tcp_pool_cache.ssns_returns[tcp_pool_cache.ssns_returns_idx++] = ssn;
+    }
+    SCReturn;
+}
+
+/** \brief flush this thread's caches and return queues back to the pools.
+ *
+ *  Called at thread deinit. Drains the segment reuse cache and batched
+ *  segment returns into segment_thread_pool, then the session reuse cache
+ *  and batched session returns into ssn_pool. Each batch is returned under
+ *  a single pool lock; the reuse caches may hold mixed pool ids so they go
+ *  through the regular (per-item locking) PoolThreadReturn.
+ */
+void StreamTcpThreadCacheCleanup(void)
+{
+    SCEnter();
+
+    /* segments: drain the reuse cache (mixed pool ids) */
+    SCLogDebug("tcp_pool_cache.segs_cache_idx %u", tcp_pool_cache.segs_cache_idx);
+    for (uint32_t i = 0; i < tcp_pool_cache.segs_cache_idx; i++) {
+        PoolThreadReturn(segment_thread_pool, tcp_pool_cache.segs_cache[i]);
+    }
+    tcp_pool_cache.segs_cache_idx = 0;
+
+    /* segments: flush the batched return queue (single pool id) */
+    SCLogDebug("tcp_pool_cache.segs_returns_idx %u", tcp_pool_cache.segs_returns_idx);
+    if (tcp_pool_cache.segs_returns_idx) {
+        PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
+        PoolThreadLock(segment_thread_pool, pool_id);
+        for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
+            TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
+            PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
+        }
+        PoolThreadUnlock(segment_thread_pool, pool_id);
+        tcp_pool_cache.segs_returns_idx = 0;
+    }
+
+    /* sessions: drain the reuse cache (mixed pool ids). BUGFIX: sessions
+     * must go back to ssn_pool, not segment_thread_pool. */
+    SCLogDebug("tcp_pool_cache.ssns_cache_idx %u", tcp_pool_cache.ssns_cache_idx);
+    for (uint32_t i = 0; i < tcp_pool_cache.ssns_cache_idx; i++) {
+        PoolThreadReturn(ssn_pool, tcp_pool_cache.ssns_cache[i]);
+    }
+    tcp_pool_cache.ssns_cache_idx = 0;
+
+    /* sessions: flush the batched return queue (single pool id) into ssn_pool */
+    SCLogDebug("tcp_pool_cache.ssns_returns_idx %u", tcp_pool_cache.ssns_returns_idx);
+    if (tcp_pool_cache.ssns_returns_idx) {
+        PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
+        PoolThreadLock(ssn_pool, pool_id);
+        for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
+            TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
+            PoolThreadReturnRaw(ssn_pool, pool_id, ret_ssn);
+        }
+        PoolThreadUnlock(ssn_pool, pool_id);
+        tcp_pool_cache.ssns_returns_idx = 0;
+    }
+
+    SCReturn;
+}
+
+/** \brief take a segment from the thread-local reuse cache, if available.
+ *
+ *  LIFO pop from segs_cache. Zeroes seg->sbseg so the recycled segment
+ *  carries no stale stream-buffer reference (matches the reset done on
+ *  the pool-get path in StreamTcpGetSegment).
+ *
+ *  \retval seg recycled segment, or NULL if the cache is empty
+ */
+TcpSegment *StreamTcpThreadCacheGetSegment(void)
+{
+    if (tcp_pool_cache.segs_cache_idx) {
+        TcpSegment *seg = tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx - 1];
+        tcp_pool_cache.segs_cache_idx--;
+        memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+        return seg;
+    }
+    return NULL;
+}
+
+/** \brief take a session from the thread-local reuse cache, if available.
+ *
+ *  LIFO pop from ssns_cache. NOTE(review): unlike the segment getter no
+ *  field reset happens here; assumes the caller/return path left the
+ *  session in a reusable state — confirm against StreamTcpSessionCleanup.
+ *
+ *  \retval ssn recycled session, or NULL if the cache is empty
+ */
+TcpSession *StreamTcpThreadCacheGetSession(void)
+{
+    if (tcp_pool_cache.ssns_cache_idx) {
+        TcpSession *ssn = tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx - 1];
+        tcp_pool_cache.ssns_cache_idx--;
+        return ssn;
+    }
+    return NULL;
+}
--- /dev/null
+/* Copyright (C) 2007-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ */
+
+#ifndef __STREAM_TCP_CACHE_H__
+#define __STREAM_TCP_CACHE_H__
+
+#include "suricata.h"
+#include "flow.h"
+#include "stream-tcp-private.h"
+
+void StreamTcpThreadCacheEnable(void);
+void StreamTcpThreadCacheReturnSegment(TcpSegment *seg);
+void StreamTcpThreadCacheReturnSession(TcpSession *ssn);
+void StreamTcpThreadCacheCleanup(void);
+
+TcpSegment *StreamTcpThreadCacheGetSegment(void);
+TcpSession *StreamTcpThreadCacheGetSession(void);
+
+#endif /* __STREAM_TCP_CACHE_H__ */
} TcpSegmentPcapHdrStorage;
typedef struct TcpSegment {
- PoolThreadReserved res;
+ PoolThreadId pool_id;
uint16_t payload_len; /**< actual size of the payload */
uint32_t seq;
RB_ENTRY(TcpSegment) __attribute__((__packed__)) rb;
}
typedef struct TcpSession_ {
- PoolThreadReserved res;
+ PoolThreadId pool_id;
uint8_t state:4; /**< tcp state from state enum */
uint8_t pstate:4; /**< previous state */
uint8_t queue_len; /**< length of queue list below */
#include "stream-tcp.h"
#include "stream-tcp-private.h"
+#include "stream-tcp-cache.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp-inline.h"
#include "stream-tcp-list.h"
thread_local uint64_t t_pcapcnt = UINT64_MAX;
-static PoolThread *segment_thread_pool = NULL;
+PoolThread *segment_thread_pool = NULL;
/* init only, protect initializing and growing pool */
static SCMutex segment_thread_pool_mutex = SCMUTEX_INITIALIZER;
seg->pcap_hdr_storage->pktlen = 0;
}
- PoolThreadReturn(segment_thread_pool, seg);
+ StreamTcpThreadCacheReturnSegment(seg);
}
/**
void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx)
{
SCEnter();
+ StreamTcpThreadCacheCleanup();
+
if (ra_ctx) {
AppLayerDestroyCtxThread(ra_ctx->app_tctx);
SCFree(ra_ctx);
*/
TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
{
- TcpSegment *seg = (TcpSegment *)PoolThreadGetById(
+ TcpSegment *seg = StreamTcpThreadCacheGetSegment();
+ if (seg) {
+ StatsIncr(tv, ra_ctx->counter_tcp_segment_from_cache);
+ memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+ return seg;
+ }
+
+ seg = (TcpSegment *)PoolThreadGetById(
segment_thread_pool, (uint16_t)ra_ctx->segment_thread_pool_id);
SCLogDebug("seg we return is %p", seg);
if (seg == NULL) {
StatsIncr(tv, ra_ctx->counter_tcp_segment_memcap);
} else {
memset(&seg->sbseg, 0, sizeof(seg->sbseg));
+ StatsIncr(tv, ra_ctx->counter_tcp_segment_from_pool);
}
return seg;
/** TCP segments which are not being reassembled due to memcap was reached */
uint16_t counter_tcp_segment_memcap;
+
+ uint16_t counter_tcp_segment_from_cache;
+ uint16_t counter_tcp_segment_from_pool;
+
/** number of streams that stop reassembly because their depth is reached */
uint16_t counter_tcp_stream_depth;
/** count number of streams with a unrecoverable stream gap (missing pkts) */
#include "util-device.h"
#include "stream-tcp-private.h"
-#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"
+#include "stream-tcp-cache.h"
#include "stream-tcp-inline.h"
+#include "stream-tcp-reassemble.h"
#include "stream-tcp-sack.h"
#include "stream-tcp-util.h"
#include "stream.h"
extern thread_local uint64_t t_pcapcnt;
extern int g_detect_disabled;
-static PoolThread *ssn_pool = NULL;
+PoolThread *ssn_pool = NULL;
static SCMutex ssn_pool_mutex = SCMUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
#ifdef DEBUG
static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
StreamTcpSessionCleanup(ssn);
/* HACK: don't loose track of thread id */
- PoolThreadReserved a = ssn->res;
+ PoolThreadId pool_id = ssn->pool_id;
memset(ssn, 0, sizeof(TcpSession));
- ssn->res = a;
+ ssn->pool_id = pool_id;
- PoolThreadReturn(ssn_pool, ssn);
+ StreamTcpThreadCacheReturnSession(ssn);
#ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
ssn_pool_cnt--;
*
* \retval ssn new TCP session.
*/
-static TcpSession *StreamTcpNewSession (Packet *p, int id)
+static TcpSession *StreamTcpNewSession(ThreadVars *tv, StreamTcpThread *stt, Packet *p, int id)
{
TcpSession *ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) {
DEBUG_VALIDATE_BUG_ON(id < 0 || id > UINT16_MAX);
- p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
+ p->flow->protoctx = StreamTcpThreadCacheGetSession();
+ if (p->flow->protoctx != NULL) {
+#ifdef UNITTESTS
+ if (tv)
+#endif
+ StatsIncr(tv, stt->counter_tcp_ssn_from_cache);
+ } else {
+ p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
+ if (p->flow->protoctx != NULL)
+#ifdef UNITTESTS
+ if (tv)
+#endif
+ StatsIncr(tv, stt->counter_tcp_ssn_from_pool);
+ }
#ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
if (p->flow->protoctx != NULL)
SCLogDebug("midstream picked up");
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+ ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
} else if (p->tcph->th_flags & TH_SYN) {
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+ ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
SCLogDebug("midstream picked up");
if (ssn == NULL) {
- ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
+ ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
SCReturnInt(TM_ECODE_FAILED);
memset(stt, 0, sizeof(StreamTcpThread));
stt->ssn_pool_id = -1;
+ StreamTcpThreadCacheEnable();
*data = (void *)stt;
stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
+ stt->counter_tcp_ssn_from_cache = StatsRegisterCounter("tcp.ssn_from_cache", tv);
+ stt->counter_tcp_ssn_from_pool = StatsRegisterCounter("tcp.ssn_from_pool", tv);
stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);
stt->counter_tcp_pseudo_failed = StatsRegisterCounter("tcp.pseudo_failed", tv);
stt->counter_tcp_invalid_checksum = StatsRegisterCounter("tcp.invalid_checksum", tv);
SCReturnInt(TM_ECODE_FAILED);
stt->ra_ctx->counter_tcp_segment_memcap = StatsRegisterCounter("tcp.segment_memcap_drop", tv);
+ stt->ra_ctx->counter_tcp_segment_from_cache =
+ StatsRegisterCounter("tcp.segment_from_cache", tv);
+ stt->ra_ctx->counter_tcp_segment_from_pool = StatsRegisterCounter("tcp.segment_from_pool", tv);
stt->ra_ctx->counter_tcp_stream_depth = StatsRegisterCounter("tcp.stream_depth_reached", tv);
stt->ra_ctx->counter_tcp_reass_gap = StatsRegisterCounter("tcp.reassembly_gap", tv);
stt->ra_ctx->counter_tcp_reass_overlap = StatsRegisterCounter("tcp.overlap", tv);
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;
+ uint16_t counter_tcp_ssn_from_cache;
+ uint16_t counter_tcp_ssn_from_pool;
/** pseudo packets processed */
uint16_t counter_tcp_pseudo;
/** pseudo packets failed to setup */
uint64_t StreamTcpGetAcked(const TcpStream *stream);
uint64_t StreamTcpGetUsable(const TcpStream *stream, const bool eof);
+void StreamTcpThreadCacheEnable(void);
+void StreamTcpThreadCacheCleanup(void);
+
#endif /* __STREAM_TCP_H__ */
FLOW_INITIALIZE(&f);
p->flow = &f;
StreamTcpUTInit(&stt.ra_ctx);
- TcpSession *ssn = StreamTcpNewSession(p, 0);
+ TcpSession *ssn = StreamTcpNewSession(NULL, &stt, p, 0);
FAIL_IF_NULL(ssn);
f.protoctx = ssn;
FAIL_IF_NOT_NULL(f.alparser);
data = PoolGet(e->pool);
SCMutexUnlock(&e->lock);
if (data) {
- PoolThreadReserved *did = data;
+ PoolThreadId *did = data;
*did = id;
}
void PoolThreadReturn(PoolThread *pt, void *data)
{
- PoolThreadReserved *id = data;
+ PoolThreadId *id = data;
if (pt == NULL || *id >= pt->size)
return;
SCMutexUnlock(&e->lock);
}
+/** \brief lock the pool element identified by \a id, for use with
+ *         PoolThreadReturnRaw batched returns.
+ *  \param pt thread pool; must be non-NULL
+ *  \param id pool id; must be < pt->size (BUG_ON otherwise)
+ */
+void PoolThreadLock(PoolThread *pt, PoolThreadId id)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    SCMutexLock(&e->lock);
+}
+
+/** \brief return \a data to pool \a id WITHOUT taking the element lock.
+ *
+ *  Caller must hold the lock via PoolThreadLock; used to return a batch
+ *  of items under a single lock/unlock pair.
+ *  \param pt thread pool; must be non-NULL
+ *  \param id pool id; must be < pt->size (BUG_ON otherwise)
+ *  \param data item to return; ownership passes to the pool
+ */
+void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    PoolReturn(e->pool, data);
+}
+
+/** \brief unlock the pool element locked by PoolThreadLock.
+ *  \param pt thread pool; must be non-NULL
+ *  \param id pool id; must be < pt->size (BUG_ON otherwise)
+ */
+void PoolThreadUnlock(PoolThread *pt, PoolThreadId id)
+{
+    BUG_ON(pt == NULL || id >= pt->size);
+    PoolThreadElement *e = &pt->array[id];
+    SCMutexUnlock(&e->lock);
+}
+
#ifdef UNITTESTS
struct PoolThreadTestData {
- PoolThreadReserved res;
+ PoolThreadId res;
int abc;
};
/** per data item reserved data containing the
* thread pool id */
-typedef uint16_t PoolThreadReserved;
+typedef uint16_t PoolThreadId;
void PoolThreadRegisterTests(void);
* \param data memory block to return, with PoolThreadReserved as it's first member */
void PoolThreadReturn(PoolThread *pt, void *data);
+void PoolThreadLock(PoolThread *pt, PoolThreadId id);
+void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data);
+void PoolThreadUnlock(PoolThread *pt, PoolThreadId id);
+
/** \brief get size of PoolThread (number of 'threads', so array elements)
* \param pt thread pool
* \retval size or -1 on error */