add_library( std_connector OBJECT
+ std_connector_buffer.cc
+ std_connector_buffer.h
std_connector.cc
std_connector.h
)
std_connector pre-configures 3 default connectors:
* stdout: default transmit connector
* stdin: default receive connector
-* stdio: default duplex connector
\ No newline at end of file
+* stdio: default duplex connector
+
+---
+
+The connector is able to synchronize printouts from multiple threads. It
+leverages buffers. `Ring2` buffer guarantees asynchronous reading and writing
+operations to the buffer, implementing the following scheme for data path
+"Writer->Buffer->Readers[]".
+
+`StdConnectorBuffer` instance is expected to be unique per Snort configuration.
+It ensures that the 1-1-? scheme above is strictly followed:
+
+* Public part grants `Writer` objects only
++
+1. No way for client code to break the scheme.
+2. Connector instance can acquire a buffer.
+3. Connector writes data to the buffer via Writer.
+4. Connector should abandon Writers before Snort configuration removes `StdConnectorBuffer` instance.
+
+* Private part keeps `Ring2` and `Reader`
++
+1. Two last pieces complete the scheme.
+2. Resource management is encapsulated.
+3. All readers are available in a single place (for the printing thread).
+4. The printing thread builds the list of readers from the actual pool of buffers.
+
+Once connector instance gets a writer, its output becomes buffered and
+synchronized with other instances via `StdConnectorBuffer` instance.
{
PegCount messages_received;
PegCount messages_transmitted;
+ PegCount tx_messages_stalled;
};
THREAD_LOCAL StdConnectorStats std_connector_stats;
{ "direction", Parameter::PT_ENUM, "receive | transmit | duplex", nullptr,
"usage" },
+ { "buffer_size", Parameter::PT_INT, "0:max32", "0",
+ "per-instance buffer size in bytes (0 no buffering, otherwise buffered and synchronized across threads)" },
+
+ { "redirect", Parameter::PT_STRING, nullptr, nullptr,
+ "output file name where printout is redirected" },
+
{ nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
};
{
{ CountType::SUM, "messages_received", "total number of messages received" },
{ CountType::SUM, "messages_transmitted", "total number of messages transmitted" },
+ { CountType::SUM, "messages_stalled", "total number of messages attempted for transmission but overflowed" },
{ CountType::END, nullptr, nullptr }
};
{
if (idx == 0 and strcmp(mod, "std_connector") == 0)
{
- auto out_conn = std::make_unique<snort::ConnectorConfig>();
+ auto out_conn = std::make_unique<StdConnectorConfig>();
out_conn->direction = Connector::CONN_TRANSMIT;
out_conn->connector_name = "stdout";
+ out_conn->buffer_size = 0;
config_set.push_back(std::move(out_conn));
- auto in_conn = std::make_unique<snort::ConnectorConfig>();
+ auto in_conn = std::make_unique<StdConnectorConfig>();
in_conn->direction = Connector::CONN_RECEIVE;
in_conn->connector_name = "stdin";
config_set.push_back(std::move(in_conn));
- auto io_conn = std::make_unique<snort::ConnectorConfig>();
+ auto io_conn = std::make_unique<StdConnectorConfig>();
io_conn->direction = Connector::CONN_DUPLEX;
io_conn->connector_name = "stdio";
+ io_conn->buffer_size = 0;
config_set.push_back(std::move(io_conn));
}
if ( !config )
- config = std::make_unique<snort::ConnectorConfig>();
+ config = std::make_unique<StdConnectorConfig>();
return true;
}
return false;
}
}
+ else if ( v.is("buffer_size") )
+ config->buffer_size = v.get_uint32();
+ else if ( v.is("redirect") )
+ config->output = v.get_string();
+
return true;
}
// connector stuff
//-------------------------------------------------------------------------
-StdConnector::StdConnector(const snort::ConnectorConfig& conf) :
- snort::Connector(conf), extr_std_log(TextLog_Init("stdout"))
-{ }
+static StdConnectorBuffer fail_safe_buffer(nullptr);
+static Ring2 fail_safe_ring(0);
+
+StdConnectorCommon::StdConnectorCommon(snort::ConnectorConfig::ConfigSet&& c_s)
+ : ConnectorCommon(std::move(c_s))
+{
+ for (auto& config : config_set)
+ {
+ StdConnectorConfig& std_config = (StdConnectorConfig&)*config;
+
+ if (std_config.buffer_size > 0)
+ std_config.buffer = &buffers.emplace_back(std_config.output.c_str());
+ }
+}
+
+StdConnector::StdConnector(const StdConnectorConfig& conf) :
+ snort::Connector(conf),
+ buffered(conf.buffer and conf.buffer_size != 0),
+ text_log(TextLog_Init(conf.output.c_str(), false)),
+ writer(buffered ? conf.buffer->acquire(conf.buffer_size) : fail_safe_ring.writer()),
+ buffer(conf.buffer ? *conf.buffer : fail_safe_buffer)
+{
+ buffer.start();
+}
StdConnector::~StdConnector()
-{ TextLog_Term(extr_std_log); }
+{
+ TextLog_Term(text_log);
+
+ if (buffered)
+ buffer.release(writer);
+}
bool StdConnector::internal_transmit_message(const ConnectorMsg& msg)
{
if ( !msg.get_data() or msg.get_length() == 0 )
return false;
- return TextLog_Print(extr_std_log, "%.*s\n", msg.get_length(), msg.get_data()) &&
- ++(std_connector_stats.messages_transmitted);
+ if (!buffered)
+ return TextLog_Print(text_log, "%.*s\n", msg.get_length(), msg.get_data())
+ and ++std_connector_stats.messages_transmitted;
+
+#ifdef REG_TEST
+ const bool safe = false;
+#else
+ const bool safe = true;
+#endif
+
+ bool success = false;
+
+ do
+ {
+ writer.retry();
+ success = writer.write(msg.get_data(), msg.get_length());
+ } while (!success and !safe);
+
+ writer.push();
+
+ if (success)
+ ++std_connector_stats.messages_transmitted;
+ else
+ ++std_connector_stats.tx_messages_stalled;
+
+ return success;
}
bool StdConnector::transmit_message(const ConnectorMsg& msg, const ID&)
{ return internal_transmit_message(msg); }
bool StdConnector::flush()
-{ return TextLog_Flush(extr_std_log); }
+{
+ if (!buffered)
+ return TextLog_Flush(text_log);
+
+ writer.push();
+
+ return true;
+}
ConnectorMsg StdConnector::receive_message(bool)
{
{ delete m; }
static Connector* std_connector_tinit(const ConnectorConfig& config)
-{ return new StdConnector(config); }
+{
+ const StdConnectorConfig& conf = (const StdConnectorConfig&)config;
+
+ return new StdConnector(conf);
+}
static void std_connector_tterm(Connector* connector)
{ delete connector; }
static ConnectorCommon* std_connector_ctor(Module* m)
{
StdConnectorModule* mod = (StdConnectorModule*)m;
- ConnectorCommon* std_connector_common = new ConnectorCommon(mod->get_and_clear_config());
+ ConnectorCommon* std_connector_common = new StdConnectorCommon(mod->get_and_clear_config());
+
return std_connector_common;
}
#ifndef STD_CONNECTOR_H
#define STD_CONNECTOR_H
+#include <list>
+#include <string>
+
#include "framework/connector.h"
#include "framework/module.h"
+#include "helpers/ring2.h"
+
+#include "std_connector_buffer.h"
#define S_NAME "std_connector"
#define S_HELP "implement the stdout/stdin based connector"
struct TextLog;
+//-------------------------------------------------------------------------
+// config
+//-------------------------------------------------------------------------
+
+class StdConnectorConfig : public snort::ConnectorConfig
+{
+public:
+ uint32_t buffer_size = 0;
+ StdConnectorBuffer* buffer = nullptr;
+ std::string output = "stdout";
+};
+
//-------------------------------------------------------------------------
// module stuff
//-------------------------------------------------------------------------
private:
snort::ConnectorConfig::ConfigSet config_set;
- std::unique_ptr<snort::ConnectorConfig> config;
+ std::unique_ptr<StdConnectorConfig> config;
};
//-------------------------------------------------------------------------
// connector stuff
//-------------------------------------------------------------------------
+class StdConnectorCommon : public snort::ConnectorCommon
+{
+public:
+ StdConnectorCommon(snort::ConnectorConfig::ConfigSet&&);
+
+private:
+ std::list<StdConnectorBuffer> buffers;
+};
+
class StdConnector : public snort::Connector
{
public:
- StdConnector(const snort::ConnectorConfig& conf);
+ StdConnector(const StdConnectorConfig& conf);
~StdConnector() override;
bool transmit_message(const snort::ConnectorMsg&, const ID& = null) override;
private:
bool internal_transmit_message(const snort::ConnectorMsg&);
- TextLog* extr_std_log;
+ bool buffered;
+ TextLog* text_log;
+ Ring2::Writer writer;
+ StdConnectorBuffer& buffer;
};
#endif
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// std_connector_buffer.cc author Cisco
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "std_connector_buffer.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "log/text_log.h"
+#include "main/snort.h"
+#include "main/snort_config.h"
+#include "main/thread_config.h"
+#include "utils/util.h"
+
+using namespace snort;
+using namespace std;
+
+#define FLUSHER_THREAD_NAME "std_connector.flusher"
+
+static void flusher(const SnortConfig* initial_config, const char* output,
+ mutex& rings_mutex, list<Ring2>& rings, atomic_flag& latest, atomic_flag& run)
+{
+ ThreadConfig *thread_config = initial_config->thread_config;
+ thread_config->implement_named_thread_affinity(FLUSHER_THREAD_NAME);
+ SET_THREAD_NAME(pthread_self(), FLUSHER_THREAD_NAME);
+
+ SnortConfig local_config;
+ local_config.merge(initial_config);
+ SnortConfig::set_conf(&local_config);
+
+ TextLog* out = TextLog_Init(output, false);
+ list<Ring2::Reader> readers;
+
+ latest.clear();
+
+ while (run.test_and_set())
+ {
+ if (!rings_mutex.try_lock())
+ continue;
+
+ if (!latest.test_and_set())
+ {
+ readers.clear();
+ for (auto& ring : rings)
+ readers.emplace_back(ring.reader());
+ }
+
+ for (auto& reader : readers)
+ {
+ reader.retry();
+
+ size_t data_len = 0;
+ void* data = nullptr;
+
+ while ((data = reader.read(data_len)) and data_len > 0)
+ TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data);
+
+ reader.pop();
+ }
+
+ rings_mutex.unlock();
+
+ TextLog_Flush(out);
+
+ this_thread::yield();
+ }
+
+ {
+ lock_guard<mutex> lock(rings_mutex);
+
+ if (!latest.test_and_set())
+ {
+ readers.clear();
+ for (auto& ring : rings)
+ readers.emplace_back(ring.reader());
+ }
+
+ for (auto& reader : readers)
+ {
+ reader.retry();
+
+ size_t data_len = 0;
+ void* data = nullptr;
+
+ while ((data = reader.read(data_len)) and data_len > 0)
+ TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data);
+
+ reader.pop();
+ }
+ }
+
+ TextLog_Flush(out);
+ TextLog_Term(out);
+ SnortConfig::set_conf(nullptr);
+}
+
+StdConnectorBuffer::StdConnectorBuffer(const char* output)
+{
+ if (!output)
+ return;
+
+ destination = output;
+}
+
+StdConnectorBuffer::~StdConnectorBuffer()
+{
+ sink_run.clear();
+
+ if (sink)
+ sink->join();
+
+ delete sink;
+}
+
+void StdConnectorBuffer::start()
+{
+ scoped_lock<mutex, mutex> lock(start_mutex, rings_mutex);
+
+ if (sink)
+ return;
+
+ if (destination.empty())
+ return;
+
+ auto sc = SnortConfig::get_conf();
+
+ if (!sc)
+ return;
+
+ sink_latest.test_and_set();
+ sink_run.test_and_set();
+ // coverity[missing_lock]
+ sink = new thread(flusher, sc, destination.c_str(), ref(rings_mutex), ref(rings), ref(sink_latest), ref(sink_run));
+
+ while (sink_latest.test_and_set());
+}
+
+Ring2::Writer StdConnectorBuffer::acquire(size_t buffer_size)
+{
+ lock_guard<mutex> lock(rings_mutex);
+
+ rings.emplace_back(buffer_size);
+
+ sink_latest.clear();
+
+ return rings.back().writer();
+}
+
+bool StdConnectorBuffer::release(const Ring2::Writer& writer)
+{
+ lock_guard<mutex> lock(rings_mutex);
+
+ // check for removed rings if they can be deleted
+ bool updated = false;
+ auto ring_removed = rings_removed.begin();
+
+ while (ring_removed != rings_removed.end())
+ {
+ if ((*ring_removed)->empty())
+ {
+ auto ring = *ring_removed;
+ rings.erase(ring);
+ ring_removed = rings_removed.erase(ring_removed);
+ updated = true;
+ }
+ else
+ ring_removed++;
+ }
+
+ if (updated)
+ sink_latest.clear();
+
+ // mark the ring for deletion
+ auto ring = find_if(rings.begin(), rings.end(),
+ [&](const Ring2& r) { return r.native(writer); });
+
+ if (ring == rings.end())
+ return false;
+
+ rings_removed.push_back(ring);
+
+ return true;
+}
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// std_connector_buffer.h author Cisco
+
+#ifndef STD_CONNECTOR_BUFFER_H
+#define STD_CONNECTOR_BUFFER_H
+
+#include <atomic>
+#include <list>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "helpers/ring2.h"
+
+struct TextLog;
+
+class StdConnectorBuffer
+{
+public:
+ StdConnectorBuffer(const char* output);
+ ~StdConnectorBuffer();
+
+ void start();
+
+ Ring2::Writer acquire(size_t buffer_size);
+ bool release(const Ring2::Writer&);
+
+private:
+ std::string destination;
+ std::mutex start_mutex;
+
+ std::mutex rings_mutex;
+ std::list<Ring2> rings;
+ std::list<std::list<Ring2>::iterator> rings_removed;
+ std::thread* sink{nullptr};
+ std::atomic_flag sink_latest{false};
+ std::atomic_flag sink_run{false};
+};
+
+#endif
#include <CppUTest/CommandLineTestRunner.h>
#include <CppUTest/TestHarness.h>
-using namespace snort;
+StdConnectorBuffer::StdConnectorBuffer(const char*) {}
+StdConnectorBuffer::~StdConnectorBuffer() {}
+void StdConnectorBuffer::start() {}
+Ring2::Writer StdConnectorBuffer::acquire(unsigned long) { return Ring2(0).writer(); }
+bool StdConnectorBuffer::release(Ring2::Writer const&) { return false; }
+using namespace snort;
extern const BaseApi* std_connector;
const ConnectorApi* stdc_api = nullptr;
-ConnectorConfig connector_transmit_config;
-ConnectorConfig connector_receive_config;
+StdConnectorConfig connector_transmit_config;
+StdConnectorConfig connector_receive_config;
Module* mod;
json_stream.h
literal_search.h
memcap_allocator.h
+ ring2.h
scratch_allocator.h
sigsafe.h
utf.h
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2.h author Cisco
+
+#ifndef RING2_H
+#define RING2_H
+
+#include <atomic>
+#include <cassert>
+#include <cstring>
+
+// Ring buffer implementation with the following requirements:
+// - stored objects are contiguous in memory
+// - stored record is length-value pair, where length is private
+// - 1 reader 1 writer are supported
+// - reader and writer can be in different threads, but their ops are still thread-local
+// - in case of overflow the new object is dropped (no overwrites)
+// - FIFO
+// - read operation grants direct access (no copy)
+// - container may contain gaps (not reaching full capacity)
+
+class Ring2
+{
+public:
+ struct Writer
+ {
+ Writer(Writer&& w) : ptr(w.ptr), end(w.end), store(w.store), no_turns(w.no_turns) {}
+
+ void retry(); // reinitialize writer
+ bool write(const void*, size_t); // adds another record
+ void push(); // publishes new records
+
+ private:
+ friend Ring2;
+
+ Writer(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool no_wrapping)
+ : ptr(cursor), end(guard), store(owner), no_turns(no_wrapping) {}
+
+ Writer& operator=(Writer&&);
+
+ uint8_t* ptr;
+ uint8_t* end;
+ Ring2& store;
+ bool no_turns;
+ };
+
+ struct Reader
+ {
+ Reader(Reader&& r ) : ptr(r.ptr), end(r.end), store(r.store), more_turns(r.more_turns) {}
+
+ void retry(); // reinitialize reader
+ void* read(size_t&); // gets next record
+ void pop(); // releases read records
+
+ private:
+ friend Ring2;
+
+ Reader(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool wrapping)
+ : ptr(cursor), end(guard), store(owner), more_turns(wrapping) {}
+
+ Reader& operator=(Reader&&);
+
+ uint8_t* ptr;
+ uint8_t* end;
+ Ring2& store;
+ bool more_turns;
+ };
+
+ Ring2(size_t);
+ ~Ring2();
+
+ Ring2(const Ring2&) = delete;
+ Ring2& operator =(const Ring2&) = delete;
+
+ size_t capacity() const;
+ bool empty() const;
+ void clear(); // invalidates Reader and Writer unless they retry
+
+ Writer writer();
+ Reader reader();
+
+ bool native(const Writer& w) const
+ { return this == &w.store; }
+
+ bool native(const Reader& r) const
+ { return this == &r.store; }
+
+private:
+ using LEN_TYPE = uint32_t;
+ static constexpr auto LEN_SIZE = sizeof(LEN_TYPE);
+
+ uint8_t* const store;
+ uint8_t* const store_end;
+ std::atomic<uint8_t*> write_ptr;
+ std::atomic<uint8_t*> read_ptr;
+};
+
+inline Ring2::Ring2(size_t buffer_size)
+ : store(new uint8_t[buffer_size]), store_end(store + buffer_size)
+{
+ write_ptr.store(store, std::memory_order_relaxed);
+ read_ptr.store(store, std::memory_order_relaxed);
+}
+
+inline Ring2::~Ring2()
+{
+ delete[] store;
+}
+
+inline size_t Ring2::capacity() const
+{
+ return store_end - store;
+}
+
+inline bool Ring2::empty() const
+{
+ auto w = write_ptr.load(std::memory_order_acquire);
+ auto r = read_ptr.load(std::memory_order_acquire);
+
+ return w == r;
+}
+
+inline void Ring2::clear()
+{
+ write_ptr.store(store, std::memory_order_relaxed);
+ read_ptr.store(store, std::memory_order_relaxed);
+}
+
+inline Ring2::Writer Ring2::writer()
+{
+ auto w = write_ptr.load(std::memory_order_acquire);
+ auto r = read_ptr.load(std::memory_order_acquire);
+
+ auto no_wrapping = w < r;
+ auto guard = no_wrapping ? r : store_end;
+
+ return {*this, w, guard, no_wrapping};
+}
+
+inline Ring2::Reader Ring2::reader()
+{
+ auto w = write_ptr.load(std::memory_order_acquire);
+ auto r = read_ptr.load(std::memory_order_acquire);
+
+ auto wrapping = r > w;
+ auto guard = wrapping ? store_end : w;
+
+ return {*this, r, guard, wrapping};
+}
+
+inline bool Ring2::Writer::write(const void* const data, const size_t data_len)
+{
+ // empty records are not allowed
+ if (data_len == 0)
+ return false;
+
+ // a guarding byte, writer should never move to the very end
+ // (but reader can, which will empty the store)
+ if (ptr + LEN_SIZE + data_len < end)
+ {
+ // normal case
+ *(LEN_TYPE*)ptr = data_len;
+ memcpy(ptr + LEN_SIZE, data, data_len);
+ ptr += LEN_SIZE + data_len;
+ return true;
+ }
+ else if (no_turns)
+ {
+ // overflow
+ return false;
+ }
+
+ // wrapping case
+ const auto zero_record = (ptr + LEN_SIZE <= end);
+ const auto ptr_orig = ptr;
+ const auto end_orig = end;
+
+ no_turns = true;
+ ptr = store.store;
+ end = store.read_ptr.load(std::memory_order_acquire);
+
+ auto res = ptr != end and write(data, data_len);
+
+ if (res)
+ {
+ if (zero_record)
+ *(LEN_TYPE*)ptr_orig = 0;
+ }
+ else
+ {
+ no_turns = false;
+ ptr = ptr_orig;
+ end = end_orig;
+ }
+
+ return res;
+}
+
+inline void Ring2::Writer::retry()
+{
+ *this = store.writer();
+}
+
+inline void Ring2::Writer::push()
+{
+ store.write_ptr.store(ptr, std::memory_order_release);
+}
+
+// cppcheck-suppress operatorEqVarError
+inline Ring2::Writer& Ring2::Writer::operator =(Ring2::Writer&& other)
+{
+ ptr = other.ptr;
+ end = other.end;
+ no_turns = other.no_turns;
+
+ return *this;
+}
+
+inline void* Ring2::Reader::read(size_t& data_len)
+{
+ data_len = 0;
+
+ // next record size
+ if (ptr + LEN_SIZE <= end)
+ data_len = *(LEN_TYPE*)ptr;
+
+ if (data_len != 0 and ptr + LEN_SIZE + data_len <= end)
+ {
+ // normal case
+ ptr += LEN_SIZE;
+ auto data = ptr;
+ ptr += data_len;
+
+ return data;
+ }
+ else if (more_turns)
+ {
+ // wrapping case
+ more_turns = false;
+ ptr = store.store;
+ end = store.write_ptr.load(std::memory_order_acquire);
+
+ return read(data_len);
+ }
+
+ // underflow
+ assert(data_len == 0);
+ return nullptr;
+}
+
+inline void Ring2::Reader::retry()
+{
+ *this = store.reader();
+}
+
+inline void Ring2::Reader::pop()
+{
+ store.read_ptr.store(ptr, std::memory_order_release);
+}
+
+// cppcheck-suppress operatorEqVarError
+inline Ring2::Reader& Ring2::Reader::operator =(Ring2::Reader&& other)
+{
+ ptr = other.ptr;
+ end = other.end;
+ more_turns = other.more_turns;
+
+ return *this;
+}
+
+#endif
../streambuf.cc
)
+add_catch_test( ring2_test
+ SOURCES
+ ../ring2.h
+)
+
+if (ENABLE_BENCHMARK_TESTS)
+
+ add_catch_test( ring2_benchmark
+ SOURCES
+ ../ring2.h
+ )
+
+endif(ENABLE_BENCHMARK_TESTS)
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2_benchmark.cc author Cisco
+
+#ifdef BENCHMARK_TEST
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "helpers/ring2.h"
+
+#define WRITE_FULL \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ \
+ BENCHMARK("full write") \
+ { \
+ while(writer.write(data, sizeof(data))); \
+ writer.retry(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define READ_FULL \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ while (writer.write(data, sizeof(data))); \
+ writer.push(); \
+ \
+ BENCHMARK("full read") \
+ { \
+ while(reader.read(len)); \
+ reader.retry(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define WRITE_1 \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ \
+ BENCHMARK("1 write") \
+ { \
+ auto accepted = writer.write(data, sizeof(data)); \
+ \
+ if (!accepted) \
+ writer.retry(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define READ_1 \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ while (writer.write(data, sizeof(data))); \
+ writer.push(); \
+ \
+ BENCHMARK("1 read") \
+ { \
+ auto read = reader.read(len); \
+ \
+ if (!read) \
+ reader.retry(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define WRITE_1_READ_1 \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ BENCHMARK("1 write 1 read") \
+ { \
+ auto r1 = writer.write(data, sizeof(data)); \
+ writer.push(); \
+ \
+ auto r2 = reader.read(len); \
+ reader.pop(); \
+ \
+ return r1 and r2 and len; \
+ }; \
+ } while (0)
+
+
+#define WRITE_4_READ_4x \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ BENCHMARK("4 writes 4x reads") \
+ { \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.pop(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define WRITE_4x_READ_4x \
+ do { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ BENCHMARK("4x writes 4x reads") \
+ { \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.pop(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define WRITE_8x_READ_8x \
+ { \
+ buffer.clear(); \
+ auto writer = buffer.writer(); \
+ auto reader = buffer.reader(); \
+ size_t len = 0; \
+ \
+ BENCHMARK("8x writes 8x reads") \
+ { \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.write(data, sizeof(data)); \
+ writer.push(); \
+ \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.read(len); \
+ reader.pop(); \
+ \
+ return true; \
+ }; \
+ } while (0)
+
+#define SEQUENCE \
+ do { \
+ WRITE_FULL; \
+ READ_FULL; \
+ WRITE_1; \
+ READ_1; \
+ WRITE_1_READ_1; \
+ WRITE_4_READ_4x; \
+ WRITE_4x_READ_4x; \
+ WRITE_8x_READ_8x; \
+ } while (0)
+
+TEST_CASE("Linear: buffer 8K, record size 32", "[Ring2]")
+{
+ Ring2 buffer(8192);
+ char data[28] = {};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 16K, record size 32", "[Ring2]")
+{
+ Ring2 buffer(16384);
+ char data[28] = {};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, record size 32", "[Ring2]")
+{
+ Ring2 buffer(65536);
+ char data[28] = {};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 8K, record size 256", "[Ring2]")
+{
+ Ring2 buffer(8192);
+ char data[252] = {0};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 16K, record size 256", "[Ring2]")
+{
+ Ring2 buffer(16384);
+ char data[252] = {0};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, record size 256", "[Ring2]")
+{
+ Ring2 buffer(65536);
+ char data[252] = {0};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, odd record size 239", "[Ring2]")
+{
+ Ring2 buffer(65536);
+ char data[239 - 4] = {0};
+
+ SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, odd record size 479", "[Ring2]")
+{
+ Ring2 buffer(65536);
+ char data[479 - 4] = {0};
+
+ SEQUENCE;
+}
+
+#endif
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2_test.cc author Cisco
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "helpers/ring2.h"
+
+using namespace std;
+
+using Data = string;
+
+template <class T, class... Args>
+static void write(T& writer, Args&& ... records)
+{
+ const vector<Data> items{std::forward<Args>(records)...};
+
+ for (const auto& item : items)
+ {
+ bool res = writer.write(item.data(), item.size());
+ CHECK(true == res);
+ }
+}
+
+template <class T, class... Args>
+static void read(T& reader, Args&& ... args)
+{
+ const vector<Data> expected_items{std::forward<Args>(args)...};
+
+ for (const auto& expected : expected_items)
+ {
+ size_t data_len = 0;
+ const char* data = (const char*)reader.read(data_len);
+ Data actual(data, data_len);
+ CHECK(expected == actual);
+ }
+}
+
+TEST_CASE("Basic", "[Ring2]")
+{
+ Ring2 buffer(1024);
+
+ SECTION("no data")
+ {
+ REQUIRE(1024 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ write(writer);
+ read(reader);
+ read(reader, "");
+ read(reader, "", "", "");
+
+ CHECK(1024 == buffer.capacity());
+ CHECK(true == buffer.empty());
+
+ writer.push();
+ reader.pop();
+
+ CHECK(1024 == buffer.capacity());
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("1 element")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+
+ write(writer, "hello");
+ CHECK(true == buffer.empty());
+
+ writer.push();
+ CHECK(false == buffer.empty());
+
+ auto reader = buffer.reader();
+
+ read(reader, "hello");
+ CHECK(false == buffer.empty());
+
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+}
+
+TEST_CASE("Visibility", "[Ring2]")
+{
+ Ring2 buffer(1024);
+
+ SECTION("caching")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ write(writer, "0", "1", "2", "3", "4");
+ reader.retry();
+ read(reader, "");
+
+ write(writer, "5", "6", "7", "8", "9");
+ reader.retry();
+ read(reader, "");
+
+ write(writer, "a", "b", "c", "d", "e");
+ reader.retry();
+ read(reader, "");
+
+ write(writer, "f");
+ reader.retry();
+ read(reader, "");
+
+ writer.push();
+ reader.retry();
+ read(reader, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f");
+ }
+
+ SECTION("reader")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ write(writer, "hello", " ", "world", "!");
+ writer.push();
+
+ read(reader, "");
+ CHECK(false == buffer.empty());
+
+ reader.retry();
+ read(reader, "hello", " ", "world", "!");
+
+ CHECK(false == buffer.empty());
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("reader incremental")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "");
+
+ write(writer, "foo");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "foo", "");
+
+ write(writer, "bar");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "foo", "bar", "");
+
+ write(writer, "baz");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "foo", "bar", "baz", "");
+ reader.pop();
+
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("reader incremental with pop")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "");
+
+ write(writer, "foo");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "foo", "");
+ reader.pop();
+
+ write(writer, "bar");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "bar", "");
+ reader.pop();
+
+ write(writer, "baz");
+ writer.push();
+
+ read(reader, "");
+ reader.retry();
+ read(reader, "baz", "");
+ reader.pop();
+
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("writer incremental")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "foo");
+ writer.push();
+
+ auto r1 = buffer.reader();
+ read(r1, "foo", "");
+
+ writer.retry();
+ write(writer, "bar");
+
+ auto r2 = buffer.reader();
+ read(r2, "foo", "");
+
+ writer.retry();
+ write(writer, "baz");
+
+ auto r3 = buffer.reader();
+ read(r3, "foo", "");
+
+ writer.retry();
+ write(writer, "foo");
+ writer.push();
+
+ auto r4 = buffer.reader();
+ read(r4, "foo", "foo", "");
+ r4.pop();
+
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("writer incremental with push")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+
+ writer.retry();
+ write(writer, "foo");
+ writer.push();
+
+ auto r1 = buffer.reader();
+ read(r1, "foo", "");
+
+ writer.retry();
+ write(writer, "bar");
+ writer.push();
+
+ auto r2 = buffer.reader();
+ read(r2, "foo", "bar", "");
+
+ writer.retry();
+ write(writer, "baz");
+ writer.push();
+
+ auto r3 = buffer.reader();
+ read(r3, "foo", "bar", "baz", "");
+ r3.pop();
+
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("clear")
+ {
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ auto reader = buffer.reader();
+
+ write(writer, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+ writer.push();
+
+ reader.retry();
+ read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+ CHECK(false == buffer.empty());
+
+ reader.retry();
+ read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+ CHECK(false == buffer.empty());
+
+ reader.retry();
+ read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+ CHECK(false == buffer.empty());
+
+ reader.pop();
+ CHECK(true == buffer.empty());
+
+ writer.retry();
+ write(writer, "Foxtrot", "Golf", "Hotel", "India", "Juliette");
+ writer.push();
+
+ CHECK(false == buffer.empty());
+ buffer.clear();
+ CHECK(true == buffer.empty());
+
+ reader.retry();
+ read(reader, "");
+ }
+}
+
+TEST_CASE("Wrapping", "[Ring2]")
+{
+ Ring2 buffer(21);
+
+ SECTION("[LEN][DATA]#(no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding");
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding", "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DATA#](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding");
+
+ CHECK(false == writer.write("123456", 6));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DATA#](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding ");
+
+ CHECK(false == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding ");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DA#TA](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding ");
+
+ CHECK(false == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding ");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN]#[DATA](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding 2345");
+
+ CHECK(false == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding 2345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LE#N][DATA](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding 123456");
+
+ CHECK(false == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "padding 123456");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("#[LEN][DATA](no room)")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "01234567890abcde");
+
+ CHECK(false == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "01234567890abcde");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DATA]#")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DATA#]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding");
+ eraser.pop();
+
+ CHECK(true == writer.write("123456", 6));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "123456");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DATA#]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding ");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding ");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN][DA#TA]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding ");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding ");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LEN]#[DATA]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding 2345");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding 2345");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("[LE#N][DATA]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "padding 123456");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "padding 123456");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("#[LEN][DATA]")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "01234567890abcde");
+ writer.push();
+ auto eraser = buffer.reader();
+ read(eraser, "01234567890abcde");
+ eraser.pop();
+
+ CHECK(true == writer.write("12345", 5));
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "12345");
+ reader.pop();
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("extended")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto w1 = buffer.writer();
+ auto r1 = buffer.reader();
+
+ write(w1, "0", "1"); // 10 bytes
+ w1.push();
+
+ r1.retry();
+ read(r1, "0", "1", "");
+
+ write(w1, "2", "3"); // 20 bytes
+ w1.push();
+
+ r1.retry();
+ read(r1, "0", "1", "2", "3");
+ r1.pop();
+
+ // wrapping 1
+ auto w2 = buffer.writer();
+ auto r2 = buffer.reader();
+
+ write(w2, "4", "5"); // +10 bytes
+ w2.push();
+
+ r2.retry();
+ read(r2, "4", "5", "");
+
+ // cannot write two records now, as w2 thinks r2 still stays near the end
+ write(w2, "6"); // +15 bytes
+ CHECK(false == w2.write("7", 1));
+ w2.push();
+
+ // The next two lines:
+ // - make it visible the reader passed the end
+ // - writer gets updated that the reader moved away from the end
+ // - this sequence unblocks the case when reader and writer find their pointers close to the end
+ r2.pop();
+ w2.retry();
+
+ write(w2, "7"); // +20 bytes
+ w2.push();
+
+ r2.retry();
+ read(r2, "6", "7");
+ r2.pop();
+
+ // wrapping 2
+ auto w3 = buffer.writer();
+ auto r3 = buffer.reader();
+
+ write(w3, "a", "b"); // ++10 bytes
+ w3.push();
+
+ r3.retry();
+ read(r3, "a", "b", "");
+
+ // cannot write two records now, as w3 thinks r3 still stays near the end
+ write(w3, "c"); // ++15 bytes
+ CHECK(false == w3.write("d", 1));
+ w3.push();
+
+ // unblock
+ r3.pop();
+ w3.retry();
+
+ write(w3, "d"); // ++20 bytes
+ w3.push();
+
+ r3.retry();
+ read(r3, "c", "d");
+ r3.pop();
+ }
+
+ SECTION("data fits")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "0123456789", "ab"); // 14 bytes + 6 bytes more
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "0123456789", "ab");
+ reader.pop();
+
+ writer.retry(); // as the read pointer is on 21 byte, effective capacity is decreased by 1 byte
+ CHECK(false == writer.write("0123456789abcdef", 16));
+ CHECK(true == writer.write("0123456789abcde", 15));
+ writer.push();
+
+ reader.retry();
+ read(reader, "0123456789abcde");
+ reader.pop();
+ }
+
+ SECTION("data doesn't fit: 1st half, 2nd half")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "0123456789"); // 14 bytes
+ CHECK(false == writer.write("abc", 3)); // no place for 7 bytes, otherwise it'd fill entire storage
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "0123456789");
+ reader.pop();
+
+ CHECK(true == buffer.empty()); // now as the storage is empty
+ writer.retry(); // but split into two parts of 14 + 7 bytes
+ CHECK(false == writer.write("0123456789a", 11)); // there is not room for records bigger than that
+ CHECK(false == writer.write("0123456789", 10)); // moreover, a guarding byte won't let the same record to go in
+
+ CHECK(true == writer.write("012345678", 9)); // writer is forced to a smaller record by reader position
+ writer.push(); // consuming 13 bytes this time
+ reader.retry();
+ read(reader, "012345678");
+ reader.pop();
+
+ writer.retry();
+ CHECK(true == writer.write("abc", 3)); // no, we have more place for the same record of 7 bytes
+ writer.push();
+
+ reader.retry();
+ read(reader, "abc");
+ reader.pop();
+ }
+
+ SECTION("Getting writer while it's behind reader")
+ {
+ REQUIRE(21 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+ write(writer, "0123456789", "ab"); // 14 bytes + 6 bytes more
+ writer.push();
+
+ auto reader = buffer.reader();
+ read(reader, "0123456789", "ab");
+ reader.pop();
+
+ write(writer, "1", "2"); // start from beginning
+ writer.push();
+
+ {
+ auto co_writer = buffer.writer();
+ write(co_writer, "3");
+ co_writer.push();
+ }
+
+ reader.retry();
+ read(reader, "1", "2", "3");
+ reader.pop();
+ }
+}
+
+TEST_CASE("Empty buffer", "[Ring2]")
+{
+ Ring2 buffer(0);
+
+ SECTION("writing")
+ {
+ REQUIRE(0 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto writer = buffer.writer();
+
+ CHECK(false == writer.write("", 0));
+ CHECK(false == writer.write("a", 1));
+ CHECK(false == writer.write("foo", 3));
+ writer.push();
+
+ CHECK(false == writer.write("", 0));
+ CHECK(false == writer.write("a", 1));
+ CHECK(false == writer.write("foo", 3));
+ writer.push();
+
+ writer.retry();
+
+ CHECK(false == writer.write("", 0));
+ CHECK(false == writer.write("a", 1));
+ CHECK(false == writer.write("foo", 3));
+ writer.push();
+
+ CHECK(0 == buffer.capacity());
+ CHECK(true == buffer.empty());
+ }
+
+ SECTION("reading")
+ {
+ REQUIRE(0 == buffer.capacity());
+ REQUIRE(true == buffer.empty());
+
+ auto reader = buffer.reader();
+
+ read(reader, "");
+ read(reader, "");
+ read(reader, "");
+ reader.pop();
+
+ read(reader, "");
+ read(reader, "");
+ read(reader, "");
+ reader.pop();
+
+ reader.retry();
+
+ read(reader, "");
+ read(reader, "");
+ read(reader, "");
+ reader.pop();
+
+ CHECK(0 == buffer.capacity());
+ CHECK(true == buffer.empty());
+ }
+}
if (connector != connector_ptr->second.thread_connectors[instance_id])
sc.api->tterm(connector_ptr->second.thread_connectors[instance_id]);
}
-
+
connector_ptr->second.thread_connectors[instance_id] = connector;
break;
}
)
add_library(extractor OBJECT ${FILE_LIST})
+
+add_subdirectory(test)
--- /dev/null
+if (ENABLE_BENCHMARK_TESTS)
+
+ add_catch_test( extractor_benchmark
+ SOURCES
+ ../extractor_csv_logger.cc
+ ../extractor_json_logger.cc
+ ${CMAKE_SOURCE_DIR}/src/helpers/json_stream.cc
+ ${CMAKE_SOURCE_DIR}/src/sfip/sf_ip.cc
+ ${CMAKE_SOURCE_DIR}/src/utils/util_cstring.cc
+ ${CMAKE_SOURCE_DIR}/src/utils/util.cc
+ )
+
+endif(ENABLE_BENCHMARK_TESTS)
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+// extractor_benchmark.cc author Cisco
+
+#ifdef BENCHMARK_TEST
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "network_inspectors/extractor/extractor_csv_logger.h"
+#include "network_inspectors/extractor/extractor_json_logger.h"
+#include "network_inspectors/extractor/extractor_null_conn.h"
+#include "main/snort_config.h"
+
+namespace snort
+{
+void ErrorMessage(const char*, ...) {}
+SnortConfig::SnortConfig(snort::SnortConfig const*, char const*) { thiszone = 0; }
+SnortConfig::~SnortConfig() { }
+const SnortConfig* SnortConfig::get_conf() { static SnortConfig s_config; return &s_config; }
+}
+
+using namespace snort;
+using namespace std;
+
+#define FIELD_STRING \
+ do \
+ { \
+ BENCHMARK("string") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, str); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_SUB_STRING \
+ do \
+ { \
+ BENCHMARK("sub-string") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, str, 16); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_NUMBER \
+ do \
+ { \
+ BENCHMARK("number") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, num); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_TIMESTAMP \
+ do \
+ { \
+ BENCHMARK("timestamp") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, ts); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_SFIP \
+ do \
+ { \
+ BENCHMARK("IP") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, ip); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_BOOLEAN \
+ do \
+ { \
+ BENCHMARK("boolean") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, flag); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_STRING_VECTOR \
+ do \
+ { \
+ BENCHMARK("string vector (x4)") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, v4_str); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_NUMBER_VECTOR \
+ do \
+ { \
+ BENCHMARK("number vector (x4)") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, v4_num); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_BOOLEAN_VECTOR \
+ do \
+ { \
+ BENCHMARK("boolean vector (x4)") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, v4_flag); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_ALL \
+ do \
+ { \
+ BENCHMARK("record (all fields, no vectors)") \
+ { \
+ logger.open_record(); \
+ logger.add_field(field, str); \
+ logger.add_field(field, str, 16); \
+ logger.add_field(field, num); \
+ logger.add_field(field, ts); \
+ logger.add_field(field, ip); \
+ logger.add_field(field, flag); \
+ logger.close_record(id); \
+ return true; \
+ }; \
+ } while (0)
+
+#define FIELD_NONE \
+ do \
+ { \
+ BENCHMARK("empty record (no fields)") \
+ { \
+ logger.open_record(); \
+ logger.close_record(id); \
+ return true; \
+ }; \
+ } while (0)
+
+#define SEQUENCE \
+ do { \
+ const char* str = "0123456789abcdef"; \
+ uint64_t num = 0x12345678abcdef00; \
+ struct timeval ts{}; \
+ SfIp ip{}; \
+ bool flag = false; \
+ vector<const char*> v4_str = {"0123456789abcdef", "0123456789abcdef", "0123456789abcdef", "0123456789abcdef"}; \
+ vector<uint64_t> v4_num = {0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00}; \
+ vector<bool> v4_flag = {false, true, false, true}; \
+ \
+ FIELD_STRING; \
+ FIELD_SUB_STRING; \
+ FIELD_NUMBER; \
+ FIELD_TIMESTAMP; \
+ FIELD_SFIP; \
+ FIELD_BOOLEAN; \
+ FIELD_STRING_VECTOR; \
+ FIELD_NUMBER_VECTOR; \
+ FIELD_BOOLEAN_VECTOR; \
+ FIELD_ALL; \
+ FIELD_NONE; \
+ } while (0)
+
+
+TEST_CASE("CSV", "[Extractor]")
+{
+ ExtractorNullConnector nil;
+ CsvExtractorLogger logger(&nil, TimeType());
+ const Connector::ID& id = logger.get_id("");
+ const char* field = "test";
+
+ SEQUENCE;
+}
+
+TEST_CASE("TSV", "[Extractor]")
+{
+ ExtractorNullConnector nil;
+ CsvExtractorLogger logger(&nil, TimeType(), '\t');
+ const Connector::ID& id = logger.get_id("");
+ const char* field = "test";
+
+ SEQUENCE;
+}
+
+TEST_CASE("JSON", "[Extractor]")
+{
+ ExtractorNullConnector nil;
+ JsonExtractorLogger logger(&nil, TimeType());
+ const Connector::ID& id = logger.get_id("");
+ const char* field = "test";
+
+ SEQUENCE;
+}
+
+#endif