]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4767: Extractor Buffered Printout
authorOleksii Shumeiko -X (oshumeik - SOFTSERVE INC at Cisco) <oshumeik@cisco.com>
Mon, 23 Jun 2025 13:16:19 +0000 (13:16 +0000)
committerOleksii Shumeiko -X (oshumeik - SOFTSERVE INC at Cisco) <oshumeik@cisco.com>
Mon, 23 Jun 2025 13:16:19 +0000 (13:16 +0000)
Merge in SNORT/snort3 from ~OSHUMEIK/snort3:ring2 to master

Squashed commit of the following:

commit 180fa2a60a25000ed386dafd98db053c018a1630
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Wed Jun 18 12:13:52 2025 +0300

    connectors: set affinity for flusher thread

commit e8ab7c14455dd9678fc5cce5e4f1a43e544a3604
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Mon Jun 16 12:44:27 2025 +0300

    connectors: give name to flusher thread

commit ba153a5662ae767d68c06d98a2b4a870965d4758
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Tue May 27 18:23:44 2025 +0300

    extractor: add benchmark tests

commit 1b990e23946fd36b21035f92c8c7d8c16562102e
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Wed May 14 16:50:37 2025 +0300

    connectors: add redirect option to print to a file

commit 9860640b438d9741480382958d6ed2c2207ab271
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Tue May 13 17:41:49 2025 +0300

    connectors: rename text log field

commit bd4f4cd4d5a6b34b1eff92d53f5238bde30b494e
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Wed May 7 18:01:49 2025 +0300

    connectors: rebuild readers as they might be outdated at exit

commit 74b8a422ba86c8b76c6a83ef396558a497d3fe7e
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Wed May 7 17:26:17 2025 +0300

    connectors: guarantee writes for std connector

commit d268bc8b55171a6d7dfd3cd9499f84cb0aff8caa
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Fri Apr 25 18:01:56 2025 +0300

    connectors: add buffered output to std_connector

    New buffer_size option in std_connector.

commit 86e30b13424263c4c29c98e5bce06c0c0cc1c3a0
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Mon Apr 14 10:45:01 2025 +0300

    helpers: add 1-reader-1-writer ring buffer

    Thread safe.
    Supports variable record size.
    Overflow on write drops the new record.
    Benchmark tests added.

16 files changed:
src/connectors/std_connector/CMakeLists.txt
src/connectors/std_connector/dev_notes.txt
src/connectors/std_connector/std_connector.cc
src/connectors/std_connector/std_connector.h
src/connectors/std_connector/std_connector_buffer.cc [new file with mode: 0644]
src/connectors/std_connector/std_connector_buffer.h [new file with mode: 0644]
src/connectors/std_connector/test/std_connector_test.cc
src/helpers/CMakeLists.txt
src/helpers/ring2.h [new file with mode: 0644]
src/helpers/test/CMakeLists.txt
src/helpers/test/ring2_benchmark.cc [new file with mode: 0644]
src/helpers/test/ring2_test.cc [new file with mode: 0644]
src/managers/connector_manager.cc
src/network_inspectors/extractor/CMakeLists.txt
src/network_inspectors/extractor/test/CMakeLists.txt [new file with mode: 0644]
src/network_inspectors/extractor/test/extractor_benchmark.cc [new file with mode: 0644]

index 4c81ef8d18b2630190cb456f128fa72d0a6edd09..64d7f04ed030530f329bcba29a5ddf7e503cd743 100644 (file)
@@ -1,5 +1,7 @@
 
 add_library( std_connector OBJECT
+    std_connector_buffer.cc
+    std_connector_buffer.h
     std_connector.cc
     std_connector.h
 )
index 8f7b83722b406994a549b729b303698120d0477c..4c920216cec260ba52e61201f9b21427f83d0b29 100644 (file)
@@ -18,4 +18,31 @@ of Connectors.
 std_connector pre-configures 3 default connectors:
 * stdout: default transmit connector
 * stdin: default receive connector
-* stdio: default duplex connector
\ No newline at end of file
+* stdio: default duplex connector
+
+---
+
+The connector is able to synchronize printouts from multiple threads. It
+leverages buffers. `Ring2` buffer guarantees asynchronous reading and writing
+operations to the buffer, implementing the following scheme for data path
+"Writer->Buffer->Readers[]".
+
+`StdConnectorBuffer` instance is expected to be unique per Snort configuration.
+It ensures that the 1-1-? scheme above is strictly followed:
+
+* Public part grants `Writer` objects only
++
+1. No way for client code to break the scheme.
+2. Connector instance can acquire a buffer.
+3. Connector writes data to the buffer via Writer.
+4. Connector should abandon Writers before Snort configuration removes `StdConnectorBuffer` instance.
+
+* Private part keeps `Ring2` and `Reader`
++
+1. Two last pieces complete the scheme.
+2. Resource management is encapsulated.
+3. All readers are available in a single place (for the printing thread).
+4. The printing thread builds the list of readers from the actual pool of buffers.
+
+Once connector instance gets a writer, its output becomes buffered and
+synchronized with other instances via `StdConnectorBuffer` instance.
index d30f256acdb2f101d2e97d8064b6b6f31c743ed5..616efa3a7ece34fd6378b8466071f884e0b4b292 100644 (file)
@@ -39,6 +39,7 @@ struct StdConnectorStats
 {
     PegCount messages_received;
     PegCount messages_transmitted;
+    PegCount tx_messages_stalled;
 };
 
 THREAD_LOCAL StdConnectorStats std_connector_stats;
@@ -56,6 +57,12 @@ static const Parameter std_connector_params[] =
     { "direction", Parameter::PT_ENUM, "receive | transmit | duplex", nullptr,
       "usage" },
 
+    { "buffer_size", Parameter::PT_INT, "0:max32", "0",
+     "per-instance buffer size in bytes (0 no buffering, otherwise buffered and synchronized across threads)" },
+
+    { "redirect", Parameter::PT_STRING, nullptr, nullptr,
+     "output file name where printout is redirected" },
+
     { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
 };
 
@@ -63,6 +70,7 @@ static const PegInfo std_connector_pegs[] =
 {
     { CountType::SUM, "messages_received", "total number of messages received" },
     { CountType::SUM, "messages_transmitted", "total number of messages transmitted" },
+    { CountType::SUM, "messages_stalled", "total number of messages attempted for transmission but overflowed" },
     { CountType::END, nullptr, nullptr }
 };
 
@@ -77,24 +85,26 @@ bool StdConnectorModule::begin(const char* mod, int idx, SnortConfig*)
 {
     if (idx == 0 and strcmp(mod, "std_connector") == 0)
     {
-        auto out_conn = std::make_unique<snort::ConnectorConfig>();
+        auto out_conn = std::make_unique<StdConnectorConfig>();
         out_conn->direction = Connector::CONN_TRANSMIT;
         out_conn->connector_name = "stdout";
+        out_conn->buffer_size = 0;
         config_set.push_back(std::move(out_conn));
 
-        auto in_conn = std::make_unique<snort::ConnectorConfig>();
+        auto in_conn = std::make_unique<StdConnectorConfig>();
         in_conn->direction = Connector::CONN_RECEIVE;
         in_conn->connector_name = "stdin";
         config_set.push_back(std::move(in_conn));
 
-        auto io_conn = std::make_unique<snort::ConnectorConfig>();
+        auto io_conn = std::make_unique<StdConnectorConfig>();
         io_conn->direction = Connector::CONN_DUPLEX;
         io_conn->connector_name = "stdio";
+        io_conn->buffer_size = 0;
         config_set.push_back(std::move(io_conn));
     }
 
     if ( !config )
-        config = std::make_unique<snort::ConnectorConfig>();
+        config = std::make_unique<StdConnectorConfig>();
 
     return true;
 }
@@ -121,6 +131,11 @@ bool StdConnectorModule::set(const char*, Value& v, SnortConfig*)
             return false;
         }
     }
+    else if ( v.is("buffer_size") )
+        config->buffer_size = v.get_uint32();
+    else if ( v.is("redirect") )
+        config->output = v.get_string();
+
     return true;
 }
 
@@ -145,20 +160,70 @@ PegCount* StdConnectorModule::get_counts() const
 // connector stuff
 //-------------------------------------------------------------------------
 
-StdConnector::StdConnector(const snort::ConnectorConfig& conf) :
-    snort::Connector(conf), extr_std_log(TextLog_Init("stdout"))
-{ }
+static StdConnectorBuffer fail_safe_buffer(nullptr);
+static Ring2 fail_safe_ring(0);
+
+StdConnectorCommon::StdConnectorCommon(snort::ConnectorConfig::ConfigSet&& c_s)
+    : ConnectorCommon(std::move(c_s))
+{
+    for (auto& config : config_set)
+    {
+        StdConnectorConfig& std_config = (StdConnectorConfig&)*config;
+
+        if (std_config.buffer_size > 0)
+            std_config.buffer = &buffers.emplace_back(std_config.output.c_str());
+    }
+}
+
+StdConnector::StdConnector(const StdConnectorConfig& conf) :
+    snort::Connector(conf),
+    buffered(conf.buffer and conf.buffer_size != 0),
+    text_log(TextLog_Init(conf.output.c_str(), false)),
+    writer(buffered ? conf.buffer->acquire(conf.buffer_size) : fail_safe_ring.writer()),
+    buffer(conf.buffer ? *conf.buffer : fail_safe_buffer)
+{
+    buffer.start();
+}
 
 StdConnector::~StdConnector()
-{ TextLog_Term(extr_std_log); }
+{
+    TextLog_Term(text_log);
+
+    if (buffered)
+        buffer.release(writer);
+}
 
 bool StdConnector::internal_transmit_message(const ConnectorMsg& msg)
 {
     if ( !msg.get_data() or msg.get_length() == 0 )
         return false;
 
-    return TextLog_Print(extr_std_log, "%.*s\n", msg.get_length(), msg.get_data()) &&
-        ++(std_connector_stats.messages_transmitted);
+    if (!buffered)
+        return TextLog_Print(text_log, "%.*s\n", msg.get_length(), msg.get_data())
+            and ++std_connector_stats.messages_transmitted;
+
+#ifdef REG_TEST
+    const bool safe = false;
+#else
+    const bool safe = true;
+#endif
+
+    bool success = false;
+
+    do
+    {
+        writer.retry();
+        success = writer.write(msg.get_data(), msg.get_length());
+    } while (!success and !safe);
+
+    writer.push();
+
+    if (success)
+        ++std_connector_stats.messages_transmitted;
+    else
+        ++std_connector_stats.tx_messages_stalled;
+
+    return success;
 }
 
 bool StdConnector::transmit_message(const ConnectorMsg& msg, const ID&)
@@ -168,7 +233,14 @@ bool StdConnector::transmit_message(const ConnectorMsg&& msg, const ID&)
 { return internal_transmit_message(msg); }
 
 bool StdConnector::flush()
-{ return TextLog_Flush(extr_std_log); }
+{
+    if (!buffered)
+        return TextLog_Flush(text_log);
+
+    writer.push();
+
+    return true;
+}
 
 ConnectorMsg StdConnector::receive_message(bool)
 {
@@ -194,7 +266,11 @@ static void mod_dtor(Module* m)
 { delete m; }
 
 static Connector* std_connector_tinit(const ConnectorConfig& config)
-{ return new StdConnector(config); }
+{
+    const StdConnectorConfig& conf = (const StdConnectorConfig&)config;
+
+    return new StdConnector(conf);
+}
 
 static void std_connector_tterm(Connector* connector)
 { delete connector; }
@@ -202,7 +278,8 @@ static void std_connector_tterm(Connector* connector)
 static ConnectorCommon* std_connector_ctor(Module* m)
 {
     StdConnectorModule* mod = (StdConnectorModule*)m;
-    ConnectorCommon* std_connector_common = new ConnectorCommon(mod->get_and_clear_config());
+    ConnectorCommon* std_connector_common = new StdConnectorCommon(mod->get_and_clear_config());
+
     return std_connector_common;
 }
 
index f1f37432bb13e3f9fcab88b8c0d046c03b7a3458..124259ab88818508c3a4a54c5b7a958173e7b058 100644 (file)
 #ifndef STD_CONNECTOR_H
 #define STD_CONNECTOR_H
 
+#include <list>
+#include <string>
+
 #include "framework/connector.h"
 #include "framework/module.h"
+#include "helpers/ring2.h"
+
+#include "std_connector_buffer.h"
 
 #define S_NAME "std_connector"
 #define S_HELP "implement the stdout/stdin based connector"
 
 struct TextLog;
 
+//-------------------------------------------------------------------------
+// config
+//-------------------------------------------------------------------------
+
+class StdConnectorConfig : public snort::ConnectorConfig
+{
+public:
+    uint32_t buffer_size = 0;
+    StdConnectorBuffer* buffer = nullptr;
+    std::string output = "stdout";
+};
+
 //-------------------------------------------------------------------------
 // module stuff
 //-------------------------------------------------------------------------
@@ -54,17 +72,26 @@ public:
 
 private:
     snort::ConnectorConfig::ConfigSet config_set;
-    std::unique_ptr<snort::ConnectorConfig> config;
+    std::unique_ptr<StdConnectorConfig> config;
 };
 
 //-------------------------------------------------------------------------
 // connector stuff
 //-------------------------------------------------------------------------
 
+class StdConnectorCommon : public snort::ConnectorCommon
+{
+public:
+    StdConnectorCommon(snort::ConnectorConfig::ConfigSet&&);
+
+private:
+    std::list<StdConnectorBuffer> buffers;
+};
+
 class StdConnector : public snort::Connector
 {
 public:
-    StdConnector(const snort::ConnectorConfig& conf);
+    StdConnector(const StdConnectorConfig& conf);
     ~StdConnector() override;
 
     bool transmit_message(const snort::ConnectorMsg&, const ID& = null) override;
@@ -76,7 +103,10 @@ public:
 private:
     bool internal_transmit_message(const snort::ConnectorMsg&);
 
-    TextLog* extr_std_log;
+    bool buffered;
+    TextLog* text_log;
+    Ring2::Writer writer;
+    StdConnectorBuffer& buffer;
 };
 
 #endif
diff --git a/src/connectors/std_connector/std_connector_buffer.cc b/src/connectors/std_connector/std_connector_buffer.cc
new file mode 100644 (file)
index 0000000..9bfd9b9
--- /dev/null
@@ -0,0 +1,204 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// std_connector_buffer.cc author Cisco
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "std_connector_buffer.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "log/text_log.h"
+#include "main/snort.h"
+#include "main/snort_config.h"
+#include "main/thread_config.h"
+#include "utils/util.h"
+
+using namespace snort;
+using namespace std;
+
+#define FLUSHER_THREAD_NAME "std_connector.flusher"
+
+static void flusher(const SnortConfig* initial_config, const char* output,
+    mutex& rings_mutex, list<Ring2>& rings, atomic_flag& latest, atomic_flag& run)
+{
+    ThreadConfig *thread_config = initial_config->thread_config;
+    thread_config->implement_named_thread_affinity(FLUSHER_THREAD_NAME);
+    SET_THREAD_NAME(pthread_self(), FLUSHER_THREAD_NAME);
+
+    SnortConfig local_config;
+    local_config.merge(initial_config);
+    SnortConfig::set_conf(&local_config);
+
+    TextLog* out = TextLog_Init(output, false);
+    list<Ring2::Reader> readers;
+
+    latest.clear();
+
+    while (run.test_and_set())
+    {
+        if (!rings_mutex.try_lock())
+            continue;
+
+        if (!latest.test_and_set())
+        {
+            readers.clear();
+            for (auto& ring : rings)
+                readers.emplace_back(ring.reader());
+        }
+
+        for (auto& reader : readers)
+        {
+            reader.retry();
+
+            size_t data_len = 0;
+            void* data = nullptr;
+
+            while ((data = reader.read(data_len)) and data_len > 0)
+                TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data);
+
+            reader.pop();
+        }
+
+        rings_mutex.unlock();
+
+        TextLog_Flush(out);
+
+        this_thread::yield();
+    }
+
+    {
+        lock_guard<mutex> lock(rings_mutex);
+
+        if (!latest.test_and_set())
+        {
+            readers.clear();
+            for (auto& ring : rings)
+                readers.emplace_back(ring.reader());
+        }
+
+        for (auto& reader : readers)
+        {
+            reader.retry();
+
+            size_t data_len = 0;
+            void* data = nullptr;
+
+            while ((data = reader.read(data_len)) and data_len > 0)
+                TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data);
+
+            reader.pop();
+        }
+    }
+
+    TextLog_Flush(out);
+    TextLog_Term(out);
+    SnortConfig::set_conf(nullptr);
+}
+
+StdConnectorBuffer::StdConnectorBuffer(const char* output)
+{
+    if (!output)
+        return;
+
+    destination = output;
+}
+
+StdConnectorBuffer::~StdConnectorBuffer()
+{
+    sink_run.clear();
+
+    if (sink)
+        sink->join();
+
+    delete sink;
+}
+
+void StdConnectorBuffer::start()
+{
+    scoped_lock<mutex, mutex> lock(start_mutex, rings_mutex);
+
+    if (sink)
+        return;
+
+    if (destination.empty())
+        return;
+
+    auto sc = SnortConfig::get_conf();
+
+    if (!sc)
+        return;
+
+    sink_latest.test_and_set();
+    sink_run.test_and_set();
+    // coverity[missing_lock]
+    sink = new thread(flusher, sc, destination.c_str(), ref(rings_mutex), ref(rings), ref(sink_latest), ref(sink_run));
+
+    while (sink_latest.test_and_set());
+}
+
+Ring2::Writer StdConnectorBuffer::acquire(size_t buffer_size)
+{
+    lock_guard<mutex> lock(rings_mutex);
+
+    rings.emplace_back(buffer_size);
+
+    sink_latest.clear();
+
+    return rings.back().writer();
+}
+
+bool StdConnectorBuffer::release(const Ring2::Writer& writer)
+{
+    lock_guard<mutex> lock(rings_mutex);
+
+    // check for removed rings if they can be deleted
+    bool updated = false;
+    auto ring_removed = rings_removed.begin();
+
+    while (ring_removed != rings_removed.end())
+    {
+        if ((*ring_removed)->empty())
+        {
+            auto ring = *ring_removed;
+            rings.erase(ring);
+            ring_removed = rings_removed.erase(ring_removed);
+            updated = true;
+        }
+        else
+            ring_removed++;
+    }
+
+    if (updated)
+        sink_latest.clear();
+
+    // mark the ring for deletion
+    auto ring = find_if(rings.begin(), rings.end(),
+        [&](const Ring2& r) { return r.native(writer); });
+
+    if (ring == rings.end())
+        return false;
+
+    rings_removed.push_back(ring);
+
+    return true;
+}
diff --git a/src/connectors/std_connector/std_connector_buffer.h b/src/connectors/std_connector/std_connector_buffer.h
new file mode 100644 (file)
index 0000000..f755c26
--- /dev/null
@@ -0,0 +1,57 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// std_connector_buffer.h author Cisco
+
+#ifndef STD_CONNECTOR_BUFFER_H
+#define STD_CONNECTOR_BUFFER_H
+
+#include <atomic>
+#include <list>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "helpers/ring2.h"
+
+struct TextLog;
+
+class StdConnectorBuffer
+{
+public:
+    StdConnectorBuffer(const char* output);
+    ~StdConnectorBuffer();
+
+    void start();
+
+    Ring2::Writer acquire(size_t buffer_size);
+    bool release(const Ring2::Writer&);
+
+private:
+    std::string destination;
+    std::mutex start_mutex;
+
+    std::mutex rings_mutex;
+    std::list<Ring2> rings;
+    std::list<std::list<Ring2>::iterator> rings_removed;
+    std::thread* sink{nullptr};
+    std::atomic_flag sink_latest{false};
+    std::atomic_flag sink_run{false};
+};
+
+#endif
index edccaf1b958c2433317d91f8cf27ef046686c37c..405fdfeb442bfe201a096040ed3ea7514148565d 100644 (file)
 #include <CppUTest/CommandLineTestRunner.h>
 #include <CppUTest/TestHarness.h>
 
-using namespace snort;
+StdConnectorBuffer::StdConnectorBuffer(const char*) {}
+StdConnectorBuffer::~StdConnectorBuffer() {}
+void StdConnectorBuffer::start() {}
+Ring2::Writer StdConnectorBuffer::acquire(unsigned long) { return Ring2(0).writer(); }
+bool StdConnectorBuffer::release(Ring2::Writer const&) { return false; }
 
+using namespace snort;
 
 extern const BaseApi* std_connector;
 const ConnectorApi* stdc_api = nullptr;
 
-ConnectorConfig connector_transmit_config;
-ConnectorConfig connector_receive_config;
+StdConnectorConfig connector_transmit_config;
+StdConnectorConfig connector_receive_config;
 
 Module* mod;
 
index a94f72d2c214a41d25f582591bbbfc50e4df5d5c..7d1a32d1fa3133747f133f3949dbe63042f2bc8b 100644 (file)
@@ -23,6 +23,7 @@ set (HELPERS_INCLUDES
     json_stream.h
     literal_search.h
     memcap_allocator.h
+    ring2.h
     scratch_allocator.h
     sigsafe.h
     utf.h
diff --git a/src/helpers/ring2.h b/src/helpers/ring2.h
new file mode 100644 (file)
index 0000000..234ab7a
--- /dev/null
@@ -0,0 +1,286 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2.h author Cisco
+
+#ifndef RING2_H
+#define RING2_H
+
+#include <atomic>
+#include <cassert>
+#include <cstring>
+
+// Ring buffer implementation with the following requirements:
+//     - stored objects are contiguous in memory
+//     - stored record is length-value pair, where length is private
+//     - 1 reader 1 writer are supported
+//     - reader and writer can be in different threads, but their ops are still thread-local
+//     - in case of overflow the new object is dropped (no overwrites)
+//     - FIFO
+//     - read operation grants direct access (no copy)
+//     - container may contain gaps (not reaching full capacity)
+
+class Ring2
+{
+public:
+    struct Writer
+    {
+        Writer(Writer&& w) : ptr(w.ptr), end(w.end), store(w.store), no_turns(w.no_turns) {}
+
+        void retry();                       // reinitialize writer
+        bool write(const void*, size_t);    // adds another record
+        void push();                        // publishes new records
+
+    private:
+        friend Ring2;
+
+        Writer(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool no_wrapping)
+            : ptr(cursor), end(guard), store(owner), no_turns(no_wrapping) {}
+
+        Writer& operator=(Writer&&);
+
+        uint8_t* ptr;
+        uint8_t* end;
+        Ring2& store;
+        bool no_turns;
+    };
+
+    struct Reader
+    {
+        Reader(Reader&& r ) : ptr(r.ptr), end(r.end), store(r.store), more_turns(r.more_turns) {}
+
+        void retry();                       // reinitialize reader
+        void* read(size_t&);                // gets next record
+        void pop();                         // releases read records
+
+    private:
+        friend Ring2;
+
+        Reader(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool wrapping)
+            : ptr(cursor), end(guard), store(owner), more_turns(wrapping) {}
+
+        Reader& operator=(Reader&&);
+
+        uint8_t* ptr;
+        uint8_t* end;
+        Ring2& store;
+        bool more_turns;
+    };
+
+    Ring2(size_t);
+    ~Ring2();
+
+    Ring2(const Ring2&) = delete;
+    Ring2& operator =(const Ring2&) = delete;
+
+    size_t capacity() const;
+    bool empty() const;
+    void clear();                           // invalidates Reader and Writer unless they retry
+
+    Writer writer();
+    Reader reader();
+
+    bool native(const Writer& w) const
+    { return this == &w.store; }
+
+    bool native(const Reader& r) const
+    { return this == &r.store; }
+
+private:
+    using LEN_TYPE = uint32_t;
+    static constexpr auto LEN_SIZE = sizeof(LEN_TYPE);
+
+    uint8_t* const store;
+    uint8_t* const store_end;
+    std::atomic<uint8_t*> write_ptr;
+    std::atomic<uint8_t*> read_ptr;
+};
+
+inline Ring2::Ring2(size_t buffer_size)
+    : store(new uint8_t[buffer_size]), store_end(store + buffer_size)
+{
+    write_ptr.store(store, std::memory_order_relaxed);
+    read_ptr.store(store, std::memory_order_relaxed);
+}
+
+inline Ring2::~Ring2()
+{
+    delete[] store;
+}
+
+inline size_t Ring2::capacity() const
+{
+    return store_end - store;
+}
+
+inline bool Ring2::empty() const
+{
+    auto w = write_ptr.load(std::memory_order_acquire);
+    auto r = read_ptr.load(std::memory_order_acquire);
+
+    return w == r;
+}
+
+inline void Ring2::clear()
+{
+    write_ptr.store(store, std::memory_order_relaxed);
+    read_ptr.store(store, std::memory_order_relaxed);
+}
+
+inline Ring2::Writer Ring2::writer()
+{
+    auto w = write_ptr.load(std::memory_order_acquire);
+    auto r = read_ptr.load(std::memory_order_acquire);
+
+    auto no_wrapping = w < r;
+    auto guard = no_wrapping ? r : store_end;
+
+    return {*this, w, guard, no_wrapping};
+}
+
+inline Ring2::Reader Ring2::reader()
+{
+    auto w = write_ptr.load(std::memory_order_acquire);
+    auto r = read_ptr.load(std::memory_order_acquire);
+
+    auto wrapping = r > w;
+    auto guard = wrapping ? store_end : w;
+
+    return {*this, r, guard, wrapping};
+}
+
+inline bool Ring2::Writer::write(const void* const data, const size_t data_len)
+{
+    // empty records are not allowed
+    if (data_len == 0)
+        return false;
+
+    // a guarding byte, writer should never move to the very end
+    // (but reader can, which will empty the store)
+    if (ptr + LEN_SIZE + data_len < end)
+    {
+        // normal case
+        *(LEN_TYPE*)ptr = data_len;
+        memcpy(ptr + LEN_SIZE, data, data_len);
+        ptr += LEN_SIZE + data_len;
+        return true;
+    }
+    else if (no_turns)
+    {
+        // overflow
+        return false;
+    }
+
+    // wrapping case
+    const auto zero_record = (ptr + LEN_SIZE <= end);
+    const auto ptr_orig = ptr;
+    const auto end_orig = end;
+
+    no_turns = true;
+    ptr = store.store;
+    end = store.read_ptr.load(std::memory_order_acquire);
+
+    auto res = ptr != end and write(data, data_len);
+
+    if (res)
+    {
+        if (zero_record)
+            *(LEN_TYPE*)ptr_orig = 0;
+    }
+    else
+    {
+        no_turns = false;
+        ptr = ptr_orig;
+        end = end_orig;
+    }
+
+    return res;
+}
+
+inline void Ring2::Writer::retry()
+{
+    *this = store.writer();
+}
+
+inline void Ring2::Writer::push()
+{
+    store.write_ptr.store(ptr, std::memory_order_release);
+}
+
+// cppcheck-suppress operatorEqVarError
+inline Ring2::Writer& Ring2::Writer::operator =(Ring2::Writer&& other)
+{
+    ptr = other.ptr;
+    end = other.end;
+    no_turns = other.no_turns;
+
+    return *this;
+}
+
+inline void* Ring2::Reader::read(size_t& data_len)
+{
+    data_len = 0;
+
+    // next record size
+    if (ptr + LEN_SIZE <= end)
+        data_len = *(LEN_TYPE*)ptr;
+
+    if (data_len != 0 and ptr + LEN_SIZE + data_len <= end)
+    {
+        // normal case
+        ptr += LEN_SIZE;
+        auto data = ptr;
+        ptr += data_len;
+
+        return data;
+    }
+    else if (more_turns)
+    {
+        // wrapping case
+        more_turns = false;
+        ptr = store.store;
+        end = store.write_ptr.load(std::memory_order_acquire);
+
+        return read(data_len);
+    }
+
+    // underflow
+    assert(data_len == 0);
+    return nullptr;
+}
+
+inline void Ring2::Reader::retry()
+{
+    *this = store.reader();
+}
+
+inline void Ring2::Reader::pop()
+{
+    store.read_ptr.store(ptr, std::memory_order_release);
+}
+
+// cppcheck-suppress operatorEqVarError
+inline Ring2::Reader& Ring2::Reader::operator =(Ring2::Reader&& other)
+{
+    ptr = other.ptr;
+    end = other.end;
+    more_turns = other.more_turns;
+
+    return *this;
+}
+
+#endif
index 1e41ac818f63d9e2348af59c36cbe517879c5e93..c8bf69f3dc8845a70ada942b542cd73a33067066 100644 (file)
@@ -40,3 +40,16 @@ add_catch_test( streambuf_test
         ../streambuf.cc
 )
 
+add_catch_test( ring2_test
+    SOURCES
+        ../ring2.h
+)
+
+if (ENABLE_BENCHMARK_TESTS)
+
+    add_catch_test( ring2_benchmark
+        SOURCES
+            ../ring2.h
+    )
+
+endif(ENABLE_BENCHMARK_TESTS)
diff --git a/src/helpers/test/ring2_benchmark.cc b/src/helpers/test/ring2_benchmark.cc
new file mode 100644 (file)
index 0000000..beb0f06
--- /dev/null
@@ -0,0 +1,284 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2_benchmark.cc author Cisco
+
+#ifdef BENCHMARK_TEST
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "helpers/ring2.h"
+
+#define WRITE_FULL                                      \
+    do {                                                \
+        buffer.clear();                                 \
+        auto writer = buffer.writer();                  \
+                                                        \
+        BENCHMARK("full write")                         \
+        {                                               \
+            while(writer.write(data, sizeof(data)));    \
+            writer.retry();                             \
+                                                        \
+            return true;                                \
+        };                                              \
+    } while (0)
+
+#define READ_FULL                                   \
+    do {                                            \
+        buffer.clear();                             \
+        auto writer = buffer.writer();              \
+        auto reader = buffer.reader();              \
+        size_t len = 0;                             \
+                                                    \
+        while (writer.write(data, sizeof(data)));   \
+        writer.push();                              \
+                                                    \
+        BENCHMARK("full read")                      \
+        {                                           \
+            while(reader.read(len));                \
+            reader.retry();                         \
+                                                    \
+            return true;                            \
+        };                                          \
+    } while (0)
+
+#define WRITE_1                                                 \
+    do {                                                        \
+        buffer.clear();                                         \
+        auto writer = buffer.writer();                          \
+                                                                \
+        BENCHMARK("1 write")                                    \
+        {                                                       \
+            auto accepted = writer.write(data, sizeof(data));   \
+                                                                \
+            if (!accepted)                                      \
+                writer.retry();                                 \
+                                                                \
+            return true;                                        \
+        };                                                      \
+    } while (0)
+
+#define READ_1                                      \
+    do {                                            \
+        buffer.clear();                             \
+        auto writer = buffer.writer();              \
+        auto reader = buffer.reader();              \
+        size_t len = 0;                             \
+                                                    \
+        while (writer.write(data, sizeof(data)));   \
+        writer.push();                              \
+                                                    \
+        BENCHMARK("1 read")                         \
+        {                                           \
+            auto read = reader.read(len);           \
+                                                    \
+            if (!read)                              \
+                reader.retry();                     \
+                                                    \
+            return true;                            \
+        };                                          \
+    } while (0)
+
+#define WRITE_1_READ_1                                  \
+    do {                                                \
+        buffer.clear();                                 \
+        auto writer = buffer.writer();                  \
+        auto reader = buffer.reader();                  \
+        size_t len = 0;                                 \
+                                                        \
+        BENCHMARK("1 write 1 read")                     \
+        {                                               \
+            auto r1 = writer.write(data, sizeof(data)); \
+            writer.push();                              \
+                                                        \
+            auto r2 = reader.read(len);                 \
+            reader.pop();                               \
+                                                        \
+            return r1 and r2 and len;                   \
+        };                                              \
+    } while (0)
+
+
+#define WRITE_4_READ_4x                         \
+    do {                                        \
+        buffer.clear();                         \
+        auto writer = buffer.writer();          \
+        auto reader = buffer.reader();          \
+        size_t len = 0;                         \
+                                                \
+        BENCHMARK("4 writes 4x reads")          \
+        {                                       \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+                                                \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.pop();                       \
+                                                \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define WRITE_4x_READ_4x                        \
+    do {                                        \
+        buffer.clear();                         \
+        auto writer = buffer.writer();          \
+        auto reader = buffer.reader();          \
+        size_t len = 0;                         \
+                                                \
+        BENCHMARK("4x writes 4x reads")         \
+        {                                       \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+                                                \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.pop();                       \
+                                                \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define WRITE_8x_READ_8x                        \
+    {                                           \
+        buffer.clear();                         \
+        auto writer = buffer.writer();          \
+        auto reader = buffer.reader();          \
+        size_t len = 0;                         \
+                                                \
+        BENCHMARK("8x writes 8x reads")         \
+        {                                       \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.write(data, sizeof(data));   \
+            writer.push();                      \
+                                                \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.read(len);                   \
+            reader.pop();                       \
+                                                \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define SEQUENCE                                \
+    do {                                        \
+        WRITE_FULL;                             \
+        READ_FULL;                              \
+        WRITE_1;                                \
+        READ_1;                                 \
+        WRITE_1_READ_1;                         \
+        WRITE_4_READ_4x;                        \
+        WRITE_4x_READ_4x;                       \
+        WRITE_8x_READ_8x;                       \
+    } while (0)
+
+TEST_CASE("Linear: buffer 8K, record size 32", "[Ring2]")
+{
+    Ring2 buffer(8192);
+    char data[28] = {};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 16K, record size 32", "[Ring2]")
+{
+    Ring2 buffer(16384);
+    char data[28] = {};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, record size 32", "[Ring2]")
+{
+    Ring2 buffer(65536);
+    char data[28] = {};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 8K, record size 256", "[Ring2]")
+{
+    Ring2 buffer(8192);
+    char data[252] = {0};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 16K, record size 256", "[Ring2]")
+{
+    Ring2 buffer(16384);
+    char data[252] = {0};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, record size 256", "[Ring2]")
+{
+    Ring2 buffer(65536);
+    char data[252] = {0};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, odd record size 239", "[Ring2]")
+{
+    Ring2 buffer(65536);
+    char data[239 - 4] = {0};
+
+    SEQUENCE;
+}
+
+TEST_CASE("Linear: buffer 64K, odd record size 479", "[Ring2]")
+{
+    Ring2 buffer(65536);
+    char data[479 - 4] = {0};
+
+    SEQUENCE;
+}
+
+#endif
diff --git a/src/helpers/test/ring2_test.cc b/src/helpers/test/ring2_test.cc
new file mode 100644 (file)
index 0000000..384dc89
--- /dev/null
@@ -0,0 +1,830 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+// ring2_test.cc author Cisco
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "helpers/ring2.h"
+
+using namespace std;
+
+using Data = string;
+
+template <class T, class... Args>
+static void write(T& writer, Args&& ... records)
+{
+    const vector<Data> items{std::forward<Args>(records)...};
+
+    for (const auto& item : items)
+    {
+        bool res = writer.write(item.data(), item.size());
+        CHECK(true == res);
+    }
+}
+
+template <class T, class... Args>
+static void read(T& reader, Args&& ... args)
+{
+    const vector<Data> expected_items{std::forward<Args>(args)...};
+
+    for (const auto& expected : expected_items)
+    {
+        size_t data_len = 0;
+        const char* data = (const char*)reader.read(data_len);
+        Data actual(data, data_len);
+        CHECK(expected == actual);
+    }
+}
+
+TEST_CASE("Basic", "[Ring2]")
+{
+    Ring2 buffer(1024);
+
+    SECTION("no data")
+    {
+        REQUIRE(1024 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        write(writer);
+        read(reader);
+        read(reader, "");
+        read(reader, "", "", "");
+
+        CHECK(1024 == buffer.capacity());
+        CHECK(true == buffer.empty());
+
+        writer.push();
+        reader.pop();
+
+        CHECK(1024 == buffer.capacity());
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("1 element")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+
+        write(writer, "hello");
+        CHECK(true == buffer.empty());
+
+        writer.push();
+        CHECK(false == buffer.empty());
+
+        auto reader = buffer.reader();
+
+        read(reader, "hello");
+        CHECK(false == buffer.empty());
+
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+}
+
+TEST_CASE("Visibility", "[Ring2]")
+{
+    Ring2 buffer(1024);
+
+    SECTION("caching")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        write(writer, "0", "1", "2", "3", "4");
+        reader.retry();
+        read(reader, "");
+
+        write(writer, "5", "6", "7", "8", "9");
+        reader.retry();
+        read(reader, "");
+
+        write(writer, "a", "b", "c", "d", "e");
+        reader.retry();
+        read(reader, "");
+
+        write(writer, "f");
+        reader.retry();
+        read(reader, "");
+
+        writer.push();
+        reader.retry();
+        read(reader, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f");
+    }
+
+    SECTION("reader")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        write(writer, "hello", " ", "world", "!");
+        writer.push();
+
+        read(reader, "");
+        CHECK(false == buffer.empty());
+
+        reader.retry();
+        read(reader, "hello", " ", "world", "!");
+
+        CHECK(false == buffer.empty());
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("reader incremental")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "");
+
+        write(writer, "foo");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "foo", "");
+
+        write(writer, "bar");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "foo", "bar", "");
+
+        write(writer, "baz");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "foo", "bar", "baz", "");
+        reader.pop();
+
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("reader incremental with pop")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "");
+
+        write(writer, "foo");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "foo", "");
+        reader.pop();
+
+        write(writer, "bar");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "bar", "");
+        reader.pop();
+
+        write(writer, "baz");
+        writer.push();
+
+        read(reader, "");
+        reader.retry();
+        read(reader, "baz", "");
+        reader.pop();
+
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("writer incremental")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "foo");
+        writer.push();
+
+        auto r1 = buffer.reader();
+        read(r1, "foo", "");
+
+        writer.retry();
+        write(writer, "bar");
+
+        auto r2 = buffer.reader();
+        read(r2, "foo", "");
+
+        writer.retry();
+        write(writer, "baz");
+
+        auto r3 = buffer.reader();
+        read(r3, "foo", "");
+
+        writer.retry();
+        write(writer, "foo");
+        writer.push();
+
+        auto r4 = buffer.reader();
+        read(r4, "foo", "foo", "");
+        r4.pop();
+
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("writer incremental with push")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+
+        writer.retry();
+        write(writer, "foo");
+        writer.push();
+
+        auto r1 = buffer.reader();
+        read(r1, "foo", "");
+
+        writer.retry();
+        write(writer, "bar");
+        writer.push();
+
+        auto r2 = buffer.reader();
+        read(r2, "foo", "bar", "");
+
+        writer.retry();
+        write(writer, "baz");
+        writer.push();
+
+        auto r3 = buffer.reader();
+        read(r3, "foo", "bar", "baz", "");
+        r3.pop();
+
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("clear")
+    {
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        auto reader = buffer.reader();
+
+        write(writer, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+        writer.push();
+
+        reader.retry();
+        read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+        CHECK(false == buffer.empty());
+
+        reader.retry();
+        read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+        CHECK(false == buffer.empty());
+
+        reader.retry();
+        read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo");
+        CHECK(false == buffer.empty());
+
+        reader.pop();
+        CHECK(true == buffer.empty());
+
+        writer.retry();
+        write(writer, "Foxtrot", "Golf", "Hotel", "India", "Juliette");
+        writer.push();
+
+        CHECK(false == buffer.empty());
+        buffer.clear();
+        CHECK(true == buffer.empty());
+
+        reader.retry();
+        read(reader, "");
+    }
+}
+
+TEST_CASE("Wrapping", "[Ring2]")
+{
+    Ring2 buffer(21);
+
+    SECTION("[LEN][DATA]#(no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding");
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding", "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DATA#](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding");
+
+        CHECK(false == writer.write("123456", 6));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DATA#](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding ");
+
+        CHECK(false == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding ");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DA#TA](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding   ");
+
+        CHECK(false == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding   ");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN]#[DATA](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding 2345");
+
+        CHECK(false == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding 2345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LE#N][DATA](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding 123456");
+
+        CHECK(false == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "padding 123456");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("#[LEN][DATA](no room)")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "01234567890abcde");
+
+        CHECK(false == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "01234567890abcde");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DATA]#")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DATA#]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding");
+        eraser.pop();
+
+        CHECK(true == writer.write("123456", 6));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "123456");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DATA#]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding ");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding ");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN][DA#TA]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding   ");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding   ");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LEN]#[DATA]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding 2345");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding 2345");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("[LE#N][DATA]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "padding 123456");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "padding 123456");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("#[LEN][DATA]")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "01234567890abcde");
+        writer.push();
+        auto eraser = buffer.reader();
+        read(eraser, "01234567890abcde");
+        eraser.pop();
+
+        CHECK(true == writer.write("12345", 5));
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "12345");
+        reader.pop();
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("extended")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto w1 = buffer.writer();
+        auto r1 = buffer.reader();
+
+        write(w1, "0", "1");                // 10 bytes
+        w1.push();
+
+        r1.retry();
+        read(r1, "0", "1", "");
+
+        write(w1, "2", "3");                // 20 bytes
+        w1.push();
+
+        r1.retry();
+        read(r1, "0", "1", "2", "3");
+        r1.pop();
+
+        // wrapping 1
+        auto w2 = buffer.writer();
+        auto r2 = buffer.reader();
+
+        write(w2, "4", "5");                // +10 bytes
+        w2.push();
+
+        r2.retry();
+        read(r2, "4", "5", "");
+
+        // cannot write two records now, as w2 thinks r2 still stays near the end
+        write(w2, "6");                     // +15 bytes
+        CHECK(false == w2.write("7", 1));
+        w2.push();
+
+        // The next two lines:
+        //     - make it visible the reader passed the end
+        //     - writer gets updated that the reader moved away from the end
+        //     - this sequence unblocks the case when reader and writer find their pointers close to the end
+        r2.pop();
+        w2.retry();
+
+        write(w2, "7");                     // +20 bytes
+        w2.push();
+
+        r2.retry();
+        read(r2, "6", "7");
+        r2.pop();
+
+        // wrapping 2
+        auto w3 = buffer.writer();
+        auto r3 = buffer.reader();
+
+        write(w3, "a", "b");                // ++10 bytes
+        w3.push();
+
+        r3.retry();
+        read(r3, "a", "b", "");
+
+        // cannot write two records now, as w3 thinks r3 still stays near the end
+        write(w3, "c");                     // ++15 bytes
+        CHECK(false == w3.write("d", 1));
+        w3.push();
+
+        // unblock
+        r3.pop();
+        w3.retry();
+
+        write(w3, "d");                     // ++20 bytes
+        w3.push();
+
+        r3.retry();
+        read(r3, "c", "d");
+        r3.pop();
+    }
+
+    SECTION("data fits")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "0123456789", "ab");  // 14 bytes + 6 bytes more
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "0123456789", "ab");
+        reader.pop();
+
+        writer.retry(); // as the read pointer is on 21 byte, effective capacity is decreased by 1 byte
+        CHECK(false == writer.write("0123456789abcdef", 16));
+        CHECK(true == writer.write("0123456789abcde", 15));
+        writer.push();
+
+        reader.retry();
+        read(reader, "0123456789abcde");
+        reader.pop();
+    }
+
+    SECTION("data doesn't fit: 1st half, 2nd half")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "0123456789");            // 14 bytes
+        CHECK(false == writer.write("abc", 3)); // no place for 7 bytes, otherwise it'd fill entire storage
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "0123456789");
+        reader.pop();
+
+        CHECK(true == buffer.empty());                 // now as the storage is empty
+        writer.retry();                                // but split into two parts of 14 + 7 bytes
+        CHECK(false == writer.write("0123456789a", 11)); // there is not room for records bigger than that
+        CHECK(false == writer.write("0123456789", 10));  // moreover, a guarding byte won't let the same record to go in
+
+        CHECK(true == writer.write("012345678", 9));   // writer is forced to a smaller record by reader position
+        writer.push();                                 // consuming 13 bytes this time
+        reader.retry();
+        read(reader, "012345678");
+        reader.pop();
+
+        writer.retry();
+        CHECK(true == writer.write("abc", 3));         // no, we have more place for the same record of 7 bytes
+        writer.push();
+
+        reader.retry();
+        read(reader, "abc");
+        reader.pop();
+    }
+
+    SECTION("Getting writer while it's behind reader")
+    {
+        REQUIRE(21 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+        write(writer, "0123456789", "ab");  // 14 bytes + 6 bytes more
+        writer.push();
+
+        auto reader = buffer.reader();
+        read(reader, "0123456789", "ab");
+        reader.pop();
+
+        write(writer, "1", "2");  // start from beginning
+        writer.push();
+
+        {
+            auto co_writer = buffer.writer();
+            write(co_writer, "3");
+            co_writer.push();
+        }
+
+        reader.retry();
+        read(reader, "1", "2", "3");
+        reader.pop();
+    }
+}
+
+TEST_CASE("Empty buffer", "[Ring2]")
+{
+    Ring2 buffer(0);
+
+    SECTION("writing")
+    {
+        REQUIRE(0 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto writer = buffer.writer();
+
+        CHECK(false == writer.write("", 0));
+        CHECK(false == writer.write("a", 1));
+        CHECK(false == writer.write("foo", 3));
+        writer.push();
+
+        CHECK(false == writer.write("", 0));
+        CHECK(false == writer.write("a", 1));
+        CHECK(false == writer.write("foo", 3));
+        writer.push();
+
+        writer.retry();
+
+        CHECK(false == writer.write("", 0));
+        CHECK(false == writer.write("a", 1));
+        CHECK(false == writer.write("foo", 3));
+        writer.push();
+
+        CHECK(0 == buffer.capacity());
+        CHECK(true == buffer.empty());
+    }
+
+    SECTION("reading")
+    {
+        REQUIRE(0 == buffer.capacity());
+        REQUIRE(true == buffer.empty());
+
+        auto reader = buffer.reader();
+
+        read(reader, "");
+        read(reader, "");
+        read(reader, "");
+        reader.pop();
+
+        read(reader, "");
+        read(reader, "");
+        read(reader, "");
+        reader.pop();
+
+        reader.retry();
+
+        read(reader, "");
+        read(reader, "");
+        read(reader, "");
+        reader.pop();
+
+        CHECK(0 == buffer.capacity());
+        CHECK(true == buffer.empty());
+    }
+}
index e0fc0bcb90cb7f5d88ed7488b534c9a4539d2eb4..44ad4b87790695b4775a8504343d7746cc4f4436 100644 (file)
@@ -126,7 +126,7 @@ void ConnectorManager::update_thread_connector(const std::string& connector_name
                 if (connector != connector_ptr->second.thread_connectors[instance_id])
                     sc.api->tterm(connector_ptr->second.thread_connectors[instance_id]);
             }
-            
+
             connector_ptr->second.thread_connectors[instance_id] = connector;
             break;
         }
index 9cfd0b2711b86c91e034b73b678b39c2367c7b08..46f8dc36b5611e8f448dc4d8fe6f86749b24645f 100644 (file)
@@ -32,3 +32,5 @@ set( FILE_LIST
 )
 
 add_library(extractor OBJECT ${FILE_LIST})
+
+add_subdirectory(test)
diff --git a/src/network_inspectors/extractor/test/CMakeLists.txt b/src/network_inspectors/extractor/test/CMakeLists.txt
new file mode 100644 (file)
index 0000000..ff75fd1
--- /dev/null
@@ -0,0 +1,13 @@
+if (ENABLE_BENCHMARK_TESTS)
+
+    add_catch_test( extractor_benchmark
+        SOURCES
+            ../extractor_csv_logger.cc
+            ../extractor_json_logger.cc
+            ${CMAKE_SOURCE_DIR}/src/helpers/json_stream.cc
+            ${CMAKE_SOURCE_DIR}/src/sfip/sf_ip.cc
+            ${CMAKE_SOURCE_DIR}/src/utils/util_cstring.cc
+            ${CMAKE_SOURCE_DIR}/src/utils/util.cc
+    )
+
+endif(ENABLE_BENCHMARK_TESTS)
diff --git a/src/network_inspectors/extractor/test/extractor_benchmark.cc b/src/network_inspectors/extractor/test/extractor_benchmark.cc
new file mode 100644 (file)
index 0000000..0a44a37
--- /dev/null
@@ -0,0 +1,228 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+// extractor_benchmark.cc author Cisco
+
+#ifdef BENCHMARK_TEST
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "catch/catch.hpp"
+
+#include <vector>
+
+#include "network_inspectors/extractor/extractor_csv_logger.h"
+#include "network_inspectors/extractor/extractor_json_logger.h"
+#include "network_inspectors/extractor/extractor_null_conn.h"
+#include "main/snort_config.h"
+
+namespace snort
+{
+void ErrorMessage(const char*, ...) {}
+SnortConfig::SnortConfig(snort::SnortConfig const*, char const*) { thiszone = 0; }
+SnortConfig::~SnortConfig() { }
+const SnortConfig* SnortConfig::get_conf() { static SnortConfig s_config; return &s_config; }
+}
+
+using namespace snort;
+using namespace std;
+
+#define FIELD_STRING                            \
+    do                                          \
+    {                                           \
+        BENCHMARK("string")                     \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, str);       \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_SUB_STRING                        \
+    do                                          \
+    {                                           \
+        BENCHMARK("sub-string")                 \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, str, 16);   \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_NUMBER                            \
+    do                                          \
+    {                                           \
+        BENCHMARK("number")                     \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, num);       \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_TIMESTAMP                         \
+    do                                          \
+    {                                           \
+        BENCHMARK("timestamp")                  \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, ts);        \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_SFIP                              \
+    do                                          \
+    {                                           \
+        BENCHMARK("IP")                         \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, ip);        \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_BOOLEAN                           \
+    do                                          \
+    {                                           \
+        BENCHMARK("boolean")                    \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, flag);      \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_STRING_VECTOR                     \
+    do                                          \
+    {                                           \
+        BENCHMARK("string vector (x4)")         \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, v4_str);    \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_NUMBER_VECTOR                     \
+    do                                          \
+    {                                           \
+        BENCHMARK("number vector (x4)")         \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, v4_num);    \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_BOOLEAN_VECTOR                    \
+    do                                          \
+    {                                           \
+        BENCHMARK("boolean vector (x4)")        \
+        {                                       \
+            logger.open_record();               \
+            logger.add_field(field, v4_flag);   \
+            return true;                        \
+        };                                      \
+    } while (0)
+
+#define FIELD_ALL                                       \
+    do                                                  \
+    {                                                   \
+        BENCHMARK("record (all fields, no vectors)")    \
+        {                                               \
+            logger.open_record();                       \
+            logger.add_field(field, str);               \
+            logger.add_field(field, str, 16);           \
+            logger.add_field(field, num);               \
+            logger.add_field(field, ts);                \
+            logger.add_field(field, ip);                \
+            logger.add_field(field, flag);              \
+            logger.close_record(id);                    \
+            return true;                                \
+        };                                              \
+    } while (0)
+
+#define FIELD_NONE                                      \
+    do                                                  \
+    {                                                   \
+        BENCHMARK("empty record (no fields)")           \
+        {                                               \
+            logger.open_record();                       \
+            logger.close_record(id);                    \
+            return true;                                \
+        };                                              \
+    } while (0)
+
+#define SEQUENCE                                                        \
+    do {                                                                \
+        const char* str = "0123456789abcdef";                           \
+        uint64_t num = 0x12345678abcdef00;                              \
+        struct timeval ts{};                                            \
+        SfIp ip{};                                                      \
+        bool flag = false;                                              \
+        vector<const char*> v4_str = {"0123456789abcdef", "0123456789abcdef", "0123456789abcdef", "0123456789abcdef"}; \
+        vector<uint64_t> v4_num = {0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00}; \
+        vector<bool> v4_flag = {false, true, false, true};              \
+                                                                        \
+        FIELD_STRING;                                                   \
+        FIELD_SUB_STRING;                                               \
+        FIELD_NUMBER;                                                   \
+        FIELD_TIMESTAMP;                                                \
+        FIELD_SFIP;                                                     \
+        FIELD_BOOLEAN;                                                  \
+        FIELD_STRING_VECTOR;                                            \
+        FIELD_NUMBER_VECTOR;                                            \
+        FIELD_BOOLEAN_VECTOR;                                           \
+        FIELD_ALL;                                                      \
+        FIELD_NONE;                                                     \
+    } while (0)
+
+
+TEST_CASE("CSV", "[Extractor]")
+{
+    ExtractorNullConnector nil;
+    CsvExtractorLogger logger(&nil, TimeType());
+    const Connector::ID& id = logger.get_id("");
+    const char* field = "test";
+
+    SEQUENCE;
+}
+
+TEST_CASE("TSV", "[Extractor]")
+{
+    ExtractorNullConnector nil;
+    CsvExtractorLogger logger(&nil, TimeType(), '\t');
+    const Connector::ID& id = logger.get_id("");
+    const char* field = "test";
+
+    SEQUENCE;
+}
+
+TEST_CASE("JSON", "[Extractor]")
+{
+    ExtractorNullConnector nil;
+    JsonExtractorLogger logger(&nil, TimeType());
+    const Connector::ID& id = logger.get_id("");
+    const char* field = "test";
+
+    SEQUENCE;
+}
+
+#endif