From: Oleksii Shumeiko -X (oshumeik - SOFTSERVE INC at Cisco) Date: Mon, 23 Jun 2025 13:16:19 +0000 (+0000) Subject: Pull request #4767: Extractor Buffered Printout X-Git-Tag: 3.9.1.0~5 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=c35cbd8434bcec8158037d3c972410c0c625e18e;p=thirdparty%2Fsnort3.git Pull request #4767: Extractor Buffered Printout Merge in SNORT/snort3 from ~OSHUMEIK/snort3:ring2 to master Squashed commit of the following: commit 180fa2a60a25000ed386dafd98db053c018a1630 Author: Oleksii Shumeiko Date: Wed Jun 18 12:13:52 2025 +0300 connectors: set affinity for flusher thread commit e8ab7c14455dd9678fc5cce5e4f1a43e544a3604 Author: Oleksii Shumeiko Date: Mon Jun 16 12:44:27 2025 +0300 connectors: give name to flusher thread commit ba153a5662ae767d68c06d98a2b4a870965d4758 Author: Oleksii Shumeiko Date: Tue May 27 18:23:44 2025 +0300 extractor: add benchmark tests commit 1b990e23946fd36b21035f92c8c7d8c16562102e Author: Oleksii Shumeiko Date: Wed May 14 16:50:37 2025 +0300 connectors: add redirect option to print to a file commit 9860640b438d9741480382958d6ed2c2207ab271 Author: Oleksii Shumeiko Date: Tue May 13 17:41:49 2025 +0300 connectors: rename text log field commit bd4f4cd4d5a6b34b1eff92d53f5238bde30b494e Author: Oleksii Shumeiko Date: Wed May 7 18:01:49 2025 +0300 connectors: rebuild readers as they might be outdated at exit commit 74b8a422ba86c8b76c6a83ef396558a497d3fe7e Author: Oleksii Shumeiko Date: Wed May 7 17:26:17 2025 +0300 connectors: guarantee writes for std connector commit d268bc8b55171a6d7dfd3cd9499f84cb0aff8caa Author: Oleksii Shumeiko Date: Fri Apr 25 18:01:56 2025 +0300 connectors: add buffered output to std_connector New buffer_size option in std_connector. commit 86e30b13424263c4c29c98e5bce06c0c0cc1c3a0 Author: Oleksii Shumeiko Date: Mon Apr 14 10:45:01 2025 +0300 helpers: add 1-reader-1-writer ring buffer Thread safe. Supports variable record size. Overflow on write drops the new record. Benchmark tests added. --- diff --git a/src/connectors/std_connector/CMakeLists.txt b/src/connectors/std_connector/CMakeLists.txt index 4c81ef8d1..64d7f04ed 100644 --- a/src/connectors/std_connector/CMakeLists.txt +++ b/src/connectors/std_connector/CMakeLists.txt @@ -1,5 +1,7 @@ add_library( std_connector OBJECT + std_connector_buffer.cc + std_connector_buffer.h std_connector.cc std_connector.h ) diff --git a/src/connectors/std_connector/dev_notes.txt b/src/connectors/std_connector/dev_notes.txt index 8f7b83722..4c920216c 100644 --- a/src/connectors/std_connector/dev_notes.txt +++ b/src/connectors/std_connector/dev_notes.txt @@ -18,4 +18,31 @@ of Connectors. std_connector pre-configures 3 default connectors: * stdout: default transmit connector * stdin: default receive connector -* stdio: default duplex connector \ No newline at end of file +* stdio: default duplex connector + +--- + +The connector is able to synchronize printouts from multiple threads. It +leverages buffers. `Ring2` buffer guarantees asynchronous reading and writing +operations to the buffer, implementing the following scheme for data path +"Writer->Buffer->Readers[]". + +`StdConnectorBuffer` instance is expected to be unique per Snort configuration. +It ensures that the 1-1-? scheme above is strictly followed: + +* Public part grants `Writer` objects only ++ +1. No way for client code to break the scheme. +2. Connector instance can acquire a buffer. +3. Connector writes data to the buffer via Writer. +4. Connector should abandon Writers before Snort configuration removes `StdConnectorBuffer` instance. + +* Private part keeps `Ring2` and `Reader` ++ +1. Two last pieces complete the scheme. +2. Resource management is encapsulated. +3. All readers are available in a single place (for the printing thread). +4. The printing thread builds the list of readers from the actual pool of buffers. + +Once connector instance gets a writer, its output becomes buffered and +synchronized with other instances via `StdConnectorBuffer` instance. diff --git a/src/connectors/std_connector/std_connector.cc b/src/connectors/std_connector/std_connector.cc index d30f256ac..616efa3a7 100644 --- a/src/connectors/std_connector/std_connector.cc +++ b/src/connectors/std_connector/std_connector.cc @@ -39,6 +39,7 @@ struct StdConnectorStats { PegCount messages_received; PegCount messages_transmitted; + PegCount tx_messages_stalled; }; THREAD_LOCAL StdConnectorStats std_connector_stats; @@ -56,6 +57,12 @@ static const Parameter std_connector_params[] = { "direction", Parameter::PT_ENUM, "receive | transmit | duplex", nullptr, "usage" }, + { "buffer_size", Parameter::PT_INT, "0:max32", "0", + "per-instance buffer size in bytes (0 no buffering, otherwise buffered and synchronized across threads)" }, + + { "redirect", Parameter::PT_STRING, nullptr, nullptr, + "output file name where printout is redirected" }, + { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr } }; @@ -63,6 +70,7 @@ static const PegInfo std_connector_pegs[] = { { CountType::SUM, "messages_received", "total number of messages received" }, { CountType::SUM, "messages_transmitted", "total number of messages transmitted" }, + { CountType::SUM, "messages_stalled", "total number of messages attempted for transmission but overflowed" }, { CountType::END, nullptr, nullptr } }; @@ -77,24 +85,26 @@ bool StdConnectorModule::begin(const char* mod, int idx, SnortConfig*) { if (idx == 0 and strcmp(mod, "std_connector") == 0) { - auto out_conn = std::make_unique(); + auto out_conn = std::make_unique(); out_conn->direction = Connector::CONN_TRANSMIT; out_conn->connector_name = "stdout"; + out_conn->buffer_size = 0; config_set.push_back(std::move(out_conn)); - auto in_conn = std::make_unique(); + auto in_conn = std::make_unique(); in_conn->direction = Connector::CONN_RECEIVE; in_conn->connector_name = "stdin"; config_set.push_back(std::move(in_conn)); - auto io_conn = std::make_unique(); + auto io_conn = std::make_unique(); io_conn->direction = Connector::CONN_DUPLEX; io_conn->connector_name = "stdio"; + io_conn->buffer_size = 0; config_set.push_back(std::move(io_conn)); } if ( !config ) - config = std::make_unique(); + config = std::make_unique(); return true; } @@ -121,6 +131,11 @@ bool StdConnectorModule::set(const char*, Value& v, SnortConfig*) return false; } } + else if ( v.is("buffer_size") ) + config->buffer_size = v.get_uint32(); + else if ( v.is("redirect") ) + config->output = v.get_string(); + return true; } @@ -145,20 +160,70 @@ PegCount* StdConnectorModule::get_counts() const // connector stuff //------------------------------------------------------------------------- -StdConnector::StdConnector(const snort::ConnectorConfig& conf) : - snort::Connector(conf), extr_std_log(TextLog_Init("stdout")) -{ } +static StdConnectorBuffer fail_safe_buffer(nullptr); +static Ring2 fail_safe_ring(0); + +StdConnectorCommon::StdConnectorCommon(snort::ConnectorConfig::ConfigSet&& c_s) + : ConnectorCommon(std::move(c_s)) +{ + for (auto& config : config_set) + { + StdConnectorConfig& std_config = (StdConnectorConfig&)*config; + + if (std_config.buffer_size > 0) + std_config.buffer = &buffers.emplace_back(std_config.output.c_str()); + } +} + +StdConnector::StdConnector(const StdConnectorConfig& conf) : + snort::Connector(conf), + buffered(conf.buffer and conf.buffer_size != 0), + text_log(TextLog_Init(conf.output.c_str(), false)), + writer(buffered ? conf.buffer->acquire(conf.buffer_size) : fail_safe_ring.writer()), + buffer(conf.buffer ? *conf.buffer : fail_safe_buffer) +{ + buffer.start(); +} StdConnector::~StdConnector() -{ TextLog_Term(extr_std_log); } +{ + TextLog_Term(text_log); + + if (buffered) + buffer.release(writer); +} bool StdConnector::internal_transmit_message(const ConnectorMsg& msg) { if ( !msg.get_data() or msg.get_length() == 0 ) return false; - return TextLog_Print(extr_std_log, "%.*s\n", msg.get_length(), msg.get_data()) && - ++(std_connector_stats.messages_transmitted); + if (!buffered) + return TextLog_Print(text_log, "%.*s\n", msg.get_length(), msg.get_data()) + and ++std_connector_stats.messages_transmitted; + +#ifdef REG_TEST + const bool safe = false; +#else + const bool safe = true; +#endif + + bool success = false; + + do + { + writer.retry(); + success = writer.write(msg.get_data(), msg.get_length()); + } while (!success and !safe); + + writer.push(); + + if (success) + ++std_connector_stats.messages_transmitted; + else + ++std_connector_stats.tx_messages_stalled; + + return success; } bool StdConnector::transmit_message(const ConnectorMsg& msg, const ID&) @@ -168,7 +233,14 @@ bool StdConnector::transmit_message(const ConnectorMsg&& msg, const ID&) { return internal_transmit_message(msg); } bool StdConnector::flush() -{ return TextLog_Flush(extr_std_log); } +{ + if (!buffered) + return TextLog_Flush(text_log); + + writer.push(); + + return true; +} ConnectorMsg StdConnector::receive_message(bool) { @@ -194,7 +266,11 @@ static void mod_dtor(Module* m) { delete m; } static Connector* std_connector_tinit(const ConnectorConfig& config) -{ return new StdConnector(config); } +{ + const StdConnectorConfig& conf = (const StdConnectorConfig&)config; + + return new StdConnector(conf); +} static void std_connector_tterm(Connector* connector) { delete connector; } @@ -202,7 +278,8 @@ static void std_connector_tterm(Connector* connector) static ConnectorCommon* std_connector_ctor(Module* m) { StdConnectorModule* mod = (StdConnectorModule*)m; - ConnectorCommon* std_connector_common = new ConnectorCommon(mod->get_and_clear_config()); + ConnectorCommon* std_connector_common = new StdConnectorCommon(mod->get_and_clear_config()); + return std_connector_common; } diff --git a/src/connectors/std_connector/std_connector.h b/src/connectors/std_connector/std_connector.h index f1f37432b..124259ab8 100644 --- a/src/connectors/std_connector/std_connector.h +++ b/src/connectors/std_connector/std_connector.h @@ -21,14 +21,32 @@ #ifndef STD_CONNECTOR_H #define STD_CONNECTOR_H +#include +#include + #include "framework/connector.h" #include "framework/module.h" +#include "helpers/ring2.h" + +#include "std_connector_buffer.h" #define S_NAME "std_connector" #define S_HELP "implement the stdout/stdin based connector" struct TextLog; +//------------------------------------------------------------------------- +// config +//------------------------------------------------------------------------- + +class StdConnectorConfig : public snort::ConnectorConfig +{ +public: + uint32_t buffer_size = 0; + StdConnectorBuffer* buffer = nullptr; + std::string output = "stdout"; +}; + //------------------------------------------------------------------------- // module stuff //------------------------------------------------------------------------- @@ -54,17 +72,26 @@ public: private: snort::ConnectorConfig::ConfigSet config_set; - std::unique_ptr config; + std::unique_ptr config; }; //------------------------------------------------------------------------- // connector stuff //------------------------------------------------------------------------- +class StdConnectorCommon : public snort::ConnectorCommon +{ +public: + StdConnectorCommon(snort::ConnectorConfig::ConfigSet&&); + +private: + std::list buffers; +}; + class StdConnector : public snort::Connector { public: - StdConnector(const snort::ConnectorConfig& conf); + StdConnector(const StdConnectorConfig& conf); ~StdConnector() override; bool transmit_message(const snort::ConnectorMsg&, const ID& = null) override; @@ -76,7 +103,10 @@ public: private: bool internal_transmit_message(const snort::ConnectorMsg&); - TextLog* extr_std_log; + bool buffered; + TextLog* text_log; + Ring2::Writer writer; + StdConnectorBuffer& buffer; }; #endif diff --git a/src/connectors/std_connector/std_connector_buffer.cc b/src/connectors/std_connector/std_connector_buffer.cc new file mode 100644 index 000000000..9bfd9b943 --- /dev/null +++ b/src/connectors/std_connector/std_connector_buffer.cc @@ -0,0 +1,204 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- + +// std_connector_buffer.cc author Cisco + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "std_connector_buffer.h" + +#include +#include + +#include "log/text_log.h" +#include "main/snort.h" +#include "main/snort_config.h" +#include "main/thread_config.h" +#include "utils/util.h" + +using namespace snort; +using namespace std; + +#define FLUSHER_THREAD_NAME "std_connector.flusher" + +static void flusher(const SnortConfig* initial_config, const char* output, + mutex& rings_mutex, list& rings, atomic_flag& latest, atomic_flag& run) +{ + ThreadConfig *thread_config = initial_config->thread_config; + thread_config->implement_named_thread_affinity(FLUSHER_THREAD_NAME); + SET_THREAD_NAME(pthread_self(), FLUSHER_THREAD_NAME); + + SnortConfig local_config; + local_config.merge(initial_config); + SnortConfig::set_conf(&local_config); + + TextLog* out = TextLog_Init(output, false); + list readers; + + latest.clear(); + + while (run.test_and_set()) + { + if (!rings_mutex.try_lock()) + continue; + + if (!latest.test_and_set()) + { + readers.clear(); + for (auto& ring : rings) + readers.emplace_back(ring.reader()); + } + + for (auto& reader : readers) + { + reader.retry(); + + size_t data_len = 0; + void* data = nullptr; + + while ((data = reader.read(data_len)) and data_len > 0) + TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data); + + reader.pop(); + } + + rings_mutex.unlock(); + + TextLog_Flush(out); + + this_thread::yield(); + } + + { + lock_guard lock(rings_mutex); + + if (!latest.test_and_set()) + { + readers.clear(); + for (auto& ring : rings) + readers.emplace_back(ring.reader()); + } + + for (auto& reader : readers) + { + reader.retry(); + + size_t data_len = 0; + void* data = nullptr; + + while ((data = reader.read(data_len)) and data_len > 0) + TextLog_Print(out, "%.*s\n", (int)data_len, (char*)data); + + reader.pop(); + } + } + + TextLog_Flush(out); + TextLog_Term(out); + SnortConfig::set_conf(nullptr); +} + +StdConnectorBuffer::StdConnectorBuffer(const char* output) +{ + if (!output) + return; + + destination = output; +} + +StdConnectorBuffer::~StdConnectorBuffer() +{ + sink_run.clear(); + + if (sink) + sink->join(); + + delete sink; +} + +void StdConnectorBuffer::start() +{ + scoped_lock lock(start_mutex, rings_mutex); + + if (sink) + return; + + if (destination.empty()) + return; + + auto sc = SnortConfig::get_conf(); + + if (!sc) + return; + + sink_latest.test_and_set(); + sink_run.test_and_set(); + // coverity[missing_lock] + sink = new thread(flusher, sc, destination.c_str(), ref(rings_mutex), ref(rings), ref(sink_latest), ref(sink_run)); + + while (sink_latest.test_and_set()); +} + +Ring2::Writer StdConnectorBuffer::acquire(size_t buffer_size) +{ + lock_guard lock(rings_mutex); + + rings.emplace_back(buffer_size); + + sink_latest.clear(); + + return rings.back().writer(); +} + +bool StdConnectorBuffer::release(const Ring2::Writer& writer) +{ + lock_guard lock(rings_mutex); + + // check for removed rings if they can be deleted + bool updated = false; + auto ring_removed = rings_removed.begin(); + + while (ring_removed != rings_removed.end()) + { + if ((*ring_removed)->empty()) + { + auto ring = *ring_removed; + rings.erase(ring); + ring_removed = rings_removed.erase(ring_removed); + updated = true; + } + else + ring_removed++; + } + + if (updated) + sink_latest.clear(); + + // mark the ring for deletion + auto ring = find_if(rings.begin(), rings.end(), + [&](const Ring2& r) { return r.native(writer); }); + + if (ring == rings.end()) + return false; + + rings_removed.push_back(ring); + + return true; +} diff --git a/src/connectors/std_connector/std_connector_buffer.h b/src/connectors/std_connector/std_connector_buffer.h new file mode 100644 index 000000000..f755c26bf --- /dev/null +++ b/src/connectors/std_connector/std_connector_buffer.h @@ -0,0 +1,57 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- + +// std_connector_buffer.h author Cisco + +#ifndef STD_CONNECTOR_BUFFER_H +#define STD_CONNECTOR_BUFFER_H + +#include +#include +#include +#include +#include + +#include "helpers/ring2.h" + +struct TextLog; + +class StdConnectorBuffer +{ +public: + StdConnectorBuffer(const char* output); + ~StdConnectorBuffer(); + + void start(); + + Ring2::Writer acquire(size_t buffer_size); + bool release(const Ring2::Writer&); + +private: + std::string destination; + std::mutex start_mutex; + + std::mutex rings_mutex; + std::list rings; + std::list::iterator> rings_removed; + std::thread* sink{nullptr}; + std::atomic_flag sink_latest{false}; + std::atomic_flag sink_run{false}; +}; + +#endif diff --git a/src/connectors/std_connector/test/std_connector_test.cc b/src/connectors/std_connector/test/std_connector_test.cc index edccaf1b9..405fdfeb4 100644 --- a/src/connectors/std_connector/test/std_connector_test.cc +++ b/src/connectors/std_connector/test/std_connector_test.cc @@ -31,14 +31,19 @@ #include #include -using namespace snort; +StdConnectorBuffer::StdConnectorBuffer(const char*) {} +StdConnectorBuffer::~StdConnectorBuffer() {} +void StdConnectorBuffer::start() {} +Ring2::Writer StdConnectorBuffer::acquire(unsigned long) { return Ring2(0).writer(); } +bool StdConnectorBuffer::release(Ring2::Writer const&) { return false; } +using namespace snort; extern const BaseApi* std_connector; const ConnectorApi* stdc_api = nullptr; -ConnectorConfig connector_transmit_config; -ConnectorConfig connector_receive_config; +StdConnectorConfig connector_transmit_config; +StdConnectorConfig connector_receive_config; Module* mod; diff --git a/src/helpers/CMakeLists.txt b/src/helpers/CMakeLists.txt index a94f72d2c..7d1a32d1f 100644 --- a/src/helpers/CMakeLists.txt +++ b/src/helpers/CMakeLists.txt @@ -23,6 +23,7 @@ set (HELPERS_INCLUDES json_stream.h literal_search.h memcap_allocator.h + ring2.h scratch_allocator.h sigsafe.h utf.h diff --git a/src/helpers/ring2.h b/src/helpers/ring2.h new file mode 100644 index 000000000..234ab7ae7 --- /dev/null +++ b/src/helpers/ring2.h @@ -0,0 +1,286 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// ring2.h author Cisco + +#ifndef RING2_H +#define RING2_H + +#include +#include +#include + +// Ring buffer implementation with the following requirements: +// - stored objects are contiguous in memory +// - stored record is length-value pair, where length is private +// - 1 reader 1 writer are supported +// - reader and writer can be in different threads, but their ops are still thread-local +// - in case of overflow the new object is dropped (no overwrites) +// - FIFO +// - read operation grants direct access (no copy) +// - container may contain gaps (not reaching full capacity) + +class Ring2 +{ +public: + struct Writer + { + Writer(Writer&& w) : ptr(w.ptr), end(w.end), store(w.store), no_turns(w.no_turns) {} + + void retry(); // reinitialize writer + bool write(const void*, size_t); // adds another record + void push(); // publishes new records + + private: + friend Ring2; + + Writer(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool no_wrapping) + : ptr(cursor), end(guard), store(owner), no_turns(no_wrapping) {} + + Writer& operator=(Writer&&); + + uint8_t* ptr; + uint8_t* end; + Ring2& store; + bool no_turns; + }; + + struct Reader + { + Reader(Reader&& r ) : ptr(r.ptr), end(r.end), store(r.store), more_turns(r.more_turns) {} + + void retry(); // reinitialize reader + void* read(size_t&); // gets next record + void pop(); // releases read records + + private: + friend Ring2; + + Reader(Ring2& owner, uint8_t* cursor, uint8_t* guard, bool wrapping) + : ptr(cursor), end(guard), store(owner), more_turns(wrapping) {} + + Reader& operator=(Reader&&); + + uint8_t* ptr; + uint8_t* end; + Ring2& store; + bool more_turns; + }; + + Ring2(size_t); + ~Ring2(); + + Ring2(const Ring2&) = delete; + Ring2& operator =(const Ring2&) = delete; + + size_t capacity() const; + bool empty() const; + void clear(); // invalidates Reader and Writer unless they retry + + Writer writer(); + Reader reader(); + + bool native(const Writer& w) const + { return this == &w.store; } + + bool native(const Reader& r) const + { return this == &r.store; } + +private: + using LEN_TYPE = uint32_t; + static constexpr auto LEN_SIZE = sizeof(LEN_TYPE); + + uint8_t* const store; + uint8_t* const store_end; + std::atomic write_ptr; + std::atomic read_ptr; +}; + +inline Ring2::Ring2(size_t buffer_size) + : store(new uint8_t[buffer_size]), store_end(store + buffer_size) +{ + write_ptr.store(store, std::memory_order_relaxed); + read_ptr.store(store, std::memory_order_relaxed); +} + +inline Ring2::~Ring2() +{ + delete[] store; +} + +inline size_t Ring2::capacity() const +{ + return store_end - store; +} + +inline bool Ring2::empty() const +{ + auto w = write_ptr.load(std::memory_order_acquire); + auto r = read_ptr.load(std::memory_order_acquire); + + return w == r; +} + +inline void Ring2::clear() +{ + write_ptr.store(store, std::memory_order_relaxed); + read_ptr.store(store, std::memory_order_relaxed); +} + +inline Ring2::Writer Ring2::writer() +{ + auto w = write_ptr.load(std::memory_order_acquire); + auto r = read_ptr.load(std::memory_order_acquire); + + auto no_wrapping = w < r; + auto guard = no_wrapping ? r : store_end; + + return {*this, w, guard, no_wrapping}; +} + +inline Ring2::Reader Ring2::reader() +{ + auto w = write_ptr.load(std::memory_order_acquire); + auto r = read_ptr.load(std::memory_order_acquire); + + auto wrapping = r > w; + auto guard = wrapping ? store_end : w; + + return {*this, r, guard, wrapping}; +} + +inline bool Ring2::Writer::write(const void* const data, const size_t data_len) +{ + // empty records are not allowed + if (data_len == 0) + return false; + + // a guarding byte, writer should never move to the very end + // (but reader can, which will empty the store) + if (ptr + LEN_SIZE + data_len < end) + { + // normal case + *(LEN_TYPE*)ptr = data_len; + memcpy(ptr + LEN_SIZE, data, data_len); + ptr += LEN_SIZE + data_len; + return true; + } + else if (no_turns) + { + // overflow + return false; + } + + // wrapping case + const auto zero_record = (ptr + LEN_SIZE <= end); + const auto ptr_orig = ptr; + const auto end_orig = end; + + no_turns = true; + ptr = store.store; + end = store.read_ptr.load(std::memory_order_acquire); + + auto res = ptr != end and write(data, data_len); + + if (res) + { + if (zero_record) + *(LEN_TYPE*)ptr_orig = 0; + } + else + { + no_turns = false; + ptr = ptr_orig; + end = end_orig; + } + + return res; +} + +inline void Ring2::Writer::retry() +{ + *this = store.writer(); +} + +inline void Ring2::Writer::push() +{ + store.write_ptr.store(ptr, std::memory_order_release); +} + +// cppcheck-suppress operatorEqVarError +inline Ring2::Writer& Ring2::Writer::operator =(Ring2::Writer&& other) +{ + ptr = other.ptr; + end = other.end; + no_turns = other.no_turns; + + return *this; +} + +inline void* Ring2::Reader::read(size_t& data_len) +{ + data_len = 0; + + // next record size + if (ptr + LEN_SIZE <= end) + data_len = *(LEN_TYPE*)ptr; + + if (data_len != 0 and ptr + LEN_SIZE + data_len <= end) + { + // normal case + ptr += LEN_SIZE; + auto data = ptr; + ptr += data_len; + + return data; + } + else if (more_turns) + { + // wrapping case + more_turns = false; + ptr = store.store; + end = store.write_ptr.load(std::memory_order_acquire); + + return read(data_len); + } + + // underflow + assert(data_len == 0); + return nullptr; +} + +inline void Ring2::Reader::retry() +{ + *this = store.reader(); +} + +inline void Ring2::Reader::pop() +{ + store.read_ptr.store(ptr, std::memory_order_release); +} + +// cppcheck-suppress operatorEqVarError +inline Ring2::Reader& Ring2::Reader::operator =(Ring2::Reader&& other) +{ + ptr = other.ptr; + end = other.end; + more_turns = other.more_turns; + + return *this; +} + +#endif diff --git a/src/helpers/test/CMakeLists.txt b/src/helpers/test/CMakeLists.txt index 1e41ac818..c8bf69f3d 100644 --- a/src/helpers/test/CMakeLists.txt +++ b/src/helpers/test/CMakeLists.txt @@ -40,3 +40,16 @@ add_catch_test( streambuf_test ../streambuf.cc ) +add_catch_test( ring2_test + SOURCES + ../ring2.h +) + +if (ENABLE_BENCHMARK_TESTS) + + add_catch_test( ring2_benchmark + SOURCES + ../ring2.h + ) + +endif(ENABLE_BENCHMARK_TESTS) diff --git a/src/helpers/test/ring2_benchmark.cc b/src/helpers/test/ring2_benchmark.cc new file mode 100644 index 000000000..beb0f0629 --- /dev/null +++ b/src/helpers/test/ring2_benchmark.cc @@ -0,0 +1,284 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// ring2_benchmark.cc author Cisco + +#ifdef BENCHMARK_TEST + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "catch/catch.hpp" + +#include + +#include "helpers/ring2.h" + +#define WRITE_FULL \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + \ + BENCHMARK("full write") \ + { \ + while(writer.write(data, sizeof(data))); \ + writer.retry(); \ + \ + return true; \ + }; \ + } while (0) + +#define READ_FULL \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + while (writer.write(data, sizeof(data))); \ + writer.push(); \ + \ + BENCHMARK("full read") \ + { \ + while(reader.read(len)); \ + reader.retry(); \ + \ + return true; \ + }; \ + } while (0) + +#define WRITE_1 \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + \ + BENCHMARK("1 write") \ + { \ + auto accepted = writer.write(data, sizeof(data)); \ + \ + if (!accepted) \ + writer.retry(); \ + \ + return true; \ + }; \ + } while (0) + +#define READ_1 \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + while (writer.write(data, sizeof(data))); \ + writer.push(); \ + \ + BENCHMARK("1 read") \ + { \ + auto read = reader.read(len); \ + \ + if (!read) \ + reader.retry(); \ + \ + return true; \ + }; \ + } while (0) + +#define WRITE_1_READ_1 \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + BENCHMARK("1 write 1 read") \ + { \ + auto r1 = writer.write(data, sizeof(data)); \ + writer.push(); \ + \ + auto r2 = reader.read(len); \ + reader.pop(); \ + \ + return r1 and r2 and len; \ + }; \ + } while (0) + + +#define WRITE_4_READ_4x \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + BENCHMARK("4 writes 4x reads") \ + { \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.pop(); \ + \ + return true; \ + }; \ + } while (0) + +#define WRITE_4x_READ_4x \ + do { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + BENCHMARK("4x writes 4x reads") \ + { \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.pop(); \ + \ + return true; \ + }; \ + } while (0) + +#define WRITE_8x_READ_8x \ + { \ + buffer.clear(); \ + auto writer = buffer.writer(); \ + auto reader = buffer.reader(); \ + size_t len = 0; \ + \ + BENCHMARK("8x writes 8x reads") \ + { \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.write(data, sizeof(data)); \ + writer.push(); \ + \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.read(len); \ + reader.pop(); \ + \ + return true; \ + }; \ + } while (0) + +#define SEQUENCE \ + do { \ + WRITE_FULL; \ + READ_FULL; \ + WRITE_1; \ + READ_1; \ + WRITE_1_READ_1; \ + WRITE_4_READ_4x; \ + WRITE_4x_READ_4x; \ + WRITE_8x_READ_8x; \ + } while (0) + +TEST_CASE("Linear: buffer 8K, record size 32", "[Ring2]") +{ + Ring2 buffer(8192); + char data[28] = {}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 16K, record size 32", "[Ring2]") +{ + Ring2 buffer(16384); + char data[28] = {}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 64K, record size 32", "[Ring2]") +{ + Ring2 buffer(65536); + char data[28] = {}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 8K, record size 256", "[Ring2]") +{ + Ring2 buffer(8192); + char data[252] = {0}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 16K, record size 256", "[Ring2]") +{ + Ring2 buffer(16384); + char data[252] = {0}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 64K, record size 256", "[Ring2]") +{ + Ring2 buffer(65536); + char data[252] = {0}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 64K, odd record size 239", "[Ring2]") +{ + Ring2 buffer(65536); + char data[239 - 4] = {0}; + + SEQUENCE; +} + +TEST_CASE("Linear: buffer 64K, odd record size 479", "[Ring2]") +{ + Ring2 buffer(65536); + char data[479 - 4] = {0}; + + SEQUENCE; +} + +#endif diff --git a/src/helpers/test/ring2_test.cc b/src/helpers/test/ring2_test.cc new file mode 100644 index 000000000..384dc89ac --- /dev/null +++ b/src/helpers/test/ring2_test.cc @@ -0,0 +1,830 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// ring2_test.cc author Cisco + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "catch/catch.hpp" + +#include + +#include "helpers/ring2.h" + +using namespace std; + +using Data = string; + +template +static void write(T& writer, Args&& ... records) +{ + const vector items{std::forward(records)...}; + + for (const auto& item : items) + { + bool res = writer.write(item.data(), item.size()); + CHECK(true == res); + } +} + +template +static void read(T& reader, Args&& ... args) +{ + const vector expected_items{std::forward(args)...}; + + for (const auto& expected : expected_items) + { + size_t data_len = 0; + const char* data = (const char*)reader.read(data_len); + Data actual(data, data_len); + CHECK(expected == actual); + } +} + +TEST_CASE("Basic", "[Ring2]") +{ + Ring2 buffer(1024); + + SECTION("no data") + { + REQUIRE(1024 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + write(writer); + read(reader); + read(reader, ""); + read(reader, "", "", ""); + + CHECK(1024 == buffer.capacity()); + CHECK(true == buffer.empty()); + + writer.push(); + reader.pop(); + + CHECK(1024 == buffer.capacity()); + CHECK(true == buffer.empty()); + } + + SECTION("1 element") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + + write(writer, "hello"); + CHECK(true == buffer.empty()); + + writer.push(); + CHECK(false == buffer.empty()); + + auto reader = buffer.reader(); + + read(reader, "hello"); + CHECK(false == buffer.empty()); + + reader.pop(); + CHECK(true == buffer.empty()); + } +} + +TEST_CASE("Visibility", "[Ring2]") +{ + Ring2 buffer(1024); + + SECTION("caching") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + write(writer, "0", "1", "2", "3", "4"); + reader.retry(); + read(reader, ""); + + write(writer, "5", "6", "7", "8", "9"); + reader.retry(); + read(reader, ""); + + write(writer, "a", "b", "c", "d", "e"); + reader.retry(); + read(reader, ""); + + write(writer, "f"); + reader.retry(); + read(reader, ""); + + writer.push(); + reader.retry(); + read(reader, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"); + } + + SECTION("reader") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + write(writer, "hello", " ", "world", "!"); + writer.push(); + + read(reader, ""); + CHECK(false == buffer.empty()); + + reader.retry(); + read(reader, "hello", " ", "world", "!"); + + CHECK(false == buffer.empty()); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("reader incremental") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + read(reader, ""); + reader.retry(); + read(reader, ""); + + write(writer, "foo"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "foo", ""); + + write(writer, "bar"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "foo", "bar", ""); + + write(writer, "baz"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "foo", "bar", "baz", ""); + reader.pop(); + + CHECK(true == buffer.empty()); + } + + SECTION("reader incremental with pop") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + read(reader, ""); + reader.retry(); + read(reader, ""); + + write(writer, "foo"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "foo", ""); + reader.pop(); + + write(writer, "bar"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "bar", ""); + reader.pop(); + + write(writer, "baz"); + writer.push(); + + read(reader, ""); + reader.retry(); + read(reader, "baz", ""); + reader.pop(); + + CHECK(true == buffer.empty()); + } + + SECTION("writer incremental") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "foo"); + writer.push(); + + auto r1 = buffer.reader(); + read(r1, "foo", ""); + + writer.retry(); + write(writer, "bar"); + + auto r2 = buffer.reader(); + read(r2, "foo", ""); + + writer.retry(); + write(writer, "baz"); + + auto r3 = buffer.reader(); + read(r3, "foo", ""); + + writer.retry(); + write(writer, "foo"); + writer.push(); + + auto r4 = buffer.reader(); + read(r4, "foo", "foo", ""); + r4.pop(); + + CHECK(true == buffer.empty()); + } + + SECTION("writer incremental with push") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + + writer.retry(); + write(writer, "foo"); + writer.push(); + + auto r1 = buffer.reader(); + read(r1, "foo", ""); + + writer.retry(); + write(writer, "bar"); + writer.push(); + + auto r2 = buffer.reader(); + read(r2, "foo", "bar", ""); + + writer.retry(); + write(writer, "baz"); + writer.push(); + + auto r3 = buffer.reader(); + read(r3, "foo", "bar", "baz", ""); + r3.pop(); + + CHECK(true == buffer.empty()); + } + + SECTION("clear") + { + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + auto reader = buffer.reader(); + + write(writer, "Alpha", "Bravo", "Charlie", "Delta", "Echo"); + writer.push(); + + reader.retry(); + read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo"); + CHECK(false == buffer.empty()); + + reader.retry(); + read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo"); + CHECK(false == buffer.empty()); + + reader.retry(); + read(reader, "Alpha", "Bravo", "Charlie", "Delta", "Echo"); + CHECK(false == buffer.empty()); + + reader.pop(); + CHECK(true == buffer.empty()); + + writer.retry(); + write(writer, "Foxtrot", "Golf", "Hotel", "India", "Juliette"); + writer.push(); + + CHECK(false == buffer.empty()); + buffer.clear(); + CHECK(true == buffer.empty()); + + reader.retry(); + read(reader, ""); + } +} + +TEST_CASE("Wrapping", "[Ring2]") +{ + Ring2 buffer(21); + + SECTION("[LEN][DATA]#(no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding"); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding", "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DATA#](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding"); + + CHECK(false == writer.write("123456", 6)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DATA#](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding "); + + CHECK(false == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding "); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DA#TA](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding "); + + CHECK(false == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding "); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN]#[DATA](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding 2345"); + + CHECK(false == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding 2345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LE#N][DATA](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding 123456"); + + CHECK(false == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "padding 123456"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("#[LEN][DATA](no room)") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "01234567890abcde"); + + CHECK(false == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "01234567890abcde"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DATA]#") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding"); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding"); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DATA#]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding"); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding"); + eraser.pop(); + + CHECK(true == writer.write("123456", 6)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "123456"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DATA#]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding "); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding "); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN][DA#TA]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding "); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding "); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LEN]#[DATA]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding 2345"); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding 2345"); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("[LE#N][DATA]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "padding 123456"); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "padding 123456"); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("#[LEN][DATA]") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "01234567890abcde"); + writer.push(); + auto eraser = buffer.reader(); + read(eraser, "01234567890abcde"); + eraser.pop(); + + CHECK(true == writer.write("12345", 5)); + writer.push(); + + auto reader = buffer.reader(); + read(reader, "12345"); + reader.pop(); + CHECK(true == buffer.empty()); + } + + SECTION("extended") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto w1 = buffer.writer(); + auto r1 = buffer.reader(); + + write(w1, "0", "1"); // 10 bytes + w1.push(); + + r1.retry(); + read(r1, "0", "1", ""); + + write(w1, "2", "3"); // 20 bytes + w1.push(); + + r1.retry(); + read(r1, "0", "1", "2", "3"); + r1.pop(); + + // wrapping 1 + auto w2 = buffer.writer(); + auto r2 = buffer.reader(); + + write(w2, "4", "5"); // +10 bytes + w2.push(); + + r2.retry(); + read(r2, "4", "5", ""); + + // cannot write two records now, as w2 thinks r2 still stays near the end + write(w2, "6"); // +15 bytes + CHECK(false == w2.write("7", 1)); + w2.push(); + + // The next two lines: + // - make it visible the reader passed the end + // - writer gets updated that the reader moved away from the end + // - this sequence unblocks the case when reader and writer find their pointers close to the end + r2.pop(); + w2.retry(); + + write(w2, "7"); // +20 bytes + w2.push(); + + r2.retry(); + read(r2, "6", "7"); + r2.pop(); + + // wrapping 2 + auto w3 = buffer.writer(); + auto r3 = buffer.reader(); + + write(w3, "a", "b"); // ++10 bytes + w3.push(); + + r3.retry(); + read(r3, "a", "b", ""); + + // cannot write two records now, as w3 thinks r3 still stays near the end + write(w3, "c"); // ++15 bytes + CHECK(false == w3.write("d", 1)); + w3.push(); + + // unblock + r3.pop(); + w3.retry(); + + write(w3, "d"); // ++20 bytes + w3.push(); + + r3.retry(); + read(r3, "c", "d"); + r3.pop(); + } + + SECTION("data fits") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "0123456789", "ab"); // 14 bytes + 6 bytes more + writer.push(); + + auto reader = buffer.reader(); + read(reader, "0123456789", "ab"); + reader.pop(); + + writer.retry(); // as the read pointer is on 21 byte, effective capacity is decreased by 1 byte + CHECK(false == writer.write("0123456789abcdef", 16)); + CHECK(true == writer.write("0123456789abcde", 15)); + writer.push(); + + reader.retry(); + read(reader, "0123456789abcde"); + reader.pop(); + } + + SECTION("data doesn't fit: 1st half, 2nd half") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "0123456789"); // 14 bytes + CHECK(false == writer.write("abc", 3)); // no place for 7 bytes, otherwise it'd fill entire storage + writer.push(); + + auto reader = buffer.reader(); + read(reader, "0123456789"); + reader.pop(); + + CHECK(true == buffer.empty()); // now as the storage is empty + writer.retry(); // but split into two parts of 14 + 7 bytes + CHECK(false == writer.write("0123456789a", 11)); // there is not room for records bigger than that + CHECK(false == writer.write("0123456789", 10)); // moreover, a guarding byte won't let the same record to go in + + CHECK(true == writer.write("012345678", 9)); // writer is forced to a smaller record by reader position + writer.push(); // consuming 13 bytes this time + reader.retry(); + read(reader, "012345678"); + reader.pop(); + + writer.retry(); + CHECK(true == writer.write("abc", 3)); // no, we have more place for the same record of 7 bytes + writer.push(); + + reader.retry(); + read(reader, "abc"); + reader.pop(); + } + + SECTION("Getting writer while it's behind reader") + { + REQUIRE(21 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + write(writer, "0123456789", "ab"); // 14 bytes + 6 bytes more + writer.push(); + + auto reader = buffer.reader(); + read(reader, "0123456789", "ab"); + reader.pop(); + + write(writer, "1", "2"); // start from beginning + writer.push(); + + { + auto co_writer = buffer.writer(); + write(co_writer, "3"); + co_writer.push(); + } + + reader.retry(); + read(reader, "1", "2", "3"); + reader.pop(); + } +} + +TEST_CASE("Empty buffer", "[Ring2]") +{ + Ring2 buffer(0); + + SECTION("writing") + { + REQUIRE(0 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto writer = buffer.writer(); + + CHECK(false == writer.write("", 0)); + CHECK(false == writer.write("a", 1)); + CHECK(false == writer.write("foo", 3)); + writer.push(); + + CHECK(false == writer.write("", 0)); + CHECK(false == writer.write("a", 1)); + CHECK(false == writer.write("foo", 3)); + writer.push(); + + writer.retry(); + + CHECK(false == writer.write("", 0)); + CHECK(false == writer.write("a", 1)); + CHECK(false == writer.write("foo", 3)); + writer.push(); + + CHECK(0 == buffer.capacity()); + CHECK(true == buffer.empty()); + } + + SECTION("reading") + { + REQUIRE(0 == buffer.capacity()); + REQUIRE(true == buffer.empty()); + + auto reader = buffer.reader(); + + read(reader, ""); + read(reader, ""); + read(reader, ""); + reader.pop(); + + read(reader, ""); + read(reader, ""); + read(reader, ""); + reader.pop(); + + reader.retry(); + + read(reader, ""); + read(reader, ""); + read(reader, ""); + reader.pop(); + + CHECK(0 == buffer.capacity()); + CHECK(true == buffer.empty()); + } +} diff --git a/src/managers/connector_manager.cc b/src/managers/connector_manager.cc index e0fc0bcb9..44ad4b877 100644 --- a/src/managers/connector_manager.cc +++ b/src/managers/connector_manager.cc @@ -126,7 +126,7 @@ void ConnectorManager::update_thread_connector(const std::string& connector_name if (connector != connector_ptr->second.thread_connectors[instance_id]) sc.api->tterm(connector_ptr->second.thread_connectors[instance_id]); } - + connector_ptr->second.thread_connectors[instance_id] = connector; break; } diff --git a/src/network_inspectors/extractor/CMakeLists.txt b/src/network_inspectors/extractor/CMakeLists.txt index 9cfd0b271..46f8dc36b 100644 --- a/src/network_inspectors/extractor/CMakeLists.txt +++ b/src/network_inspectors/extractor/CMakeLists.txt @@ -32,3 +32,5 @@ set( FILE_LIST ) add_library(extractor OBJECT ${FILE_LIST}) + +add_subdirectory(test) diff --git a/src/network_inspectors/extractor/test/CMakeLists.txt b/src/network_inspectors/extractor/test/CMakeLists.txt new file mode 100644 index 000000000..ff75fd1f9 --- /dev/null +++ b/src/network_inspectors/extractor/test/CMakeLists.txt @@ -0,0 +1,13 @@ +if (ENABLE_BENCHMARK_TESTS) + + add_catch_test( extractor_benchmark + SOURCES + ../extractor_csv_logger.cc + ../extractor_json_logger.cc + ${CMAKE_SOURCE_DIR}/src/helpers/json_stream.cc + ${CMAKE_SOURCE_DIR}/src/sfip/sf_ip.cc + ${CMAKE_SOURCE_DIR}/src/utils/util_cstring.cc + ${CMAKE_SOURCE_DIR}/src/utils/util.cc + ) + +endif(ENABLE_BENCHMARK_TESTS) diff --git a/src/network_inspectors/extractor/test/extractor_benchmark.cc b/src/network_inspectors/extractor/test/extractor_benchmark.cc new file mode 100644 index 000000000..0a44a3739 --- /dev/null +++ b/src/network_inspectors/extractor/test/extractor_benchmark.cc @@ -0,0 +1,228 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2025-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// extractor_benchmark.cc author Cisco + +#ifdef BENCHMARK_TEST + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "catch/catch.hpp" + +#include + +#include "network_inspectors/extractor/extractor_csv_logger.h" +#include "network_inspectors/extractor/extractor_json_logger.h" +#include "network_inspectors/extractor/extractor_null_conn.h" +#include "main/snort_config.h" + +namespace snort +{ +void ErrorMessage(const char*, ...) {} +SnortConfig::SnortConfig(snort::SnortConfig const*, char const*) { thiszone = 0; } +SnortConfig::~SnortConfig() { } +const SnortConfig* SnortConfig::get_conf() { static SnortConfig s_config; return &s_config; } +} + +using namespace snort; +using namespace std; + +#define FIELD_STRING \ + do \ + { \ + BENCHMARK("string") \ + { \ + logger.open_record(); \ + logger.add_field(field, str); \ + return true; \ + }; \ + } while (0) + +#define FIELD_SUB_STRING \ + do \ + { \ + BENCHMARK("sub-string") \ + { \ + logger.open_record(); \ + logger.add_field(field, str, 16); \ + return true; \ + }; \ + } while (0) + +#define FIELD_NUMBER \ + do \ + { \ + BENCHMARK("number") \ + { \ + logger.open_record(); \ + logger.add_field(field, num); \ + return true; \ + }; \ + } while (0) + +#define FIELD_TIMESTAMP \ + do \ + { \ + BENCHMARK("timestamp") \ + { \ + logger.open_record(); \ + logger.add_field(field, ts); \ + return true; \ + }; \ + } while (0) + +#define FIELD_SFIP \ + do \ + { \ + BENCHMARK("IP") \ + { \ + logger.open_record(); \ + logger.add_field(field, ip); \ + return true; \ + }; \ + } while (0) + +#define FIELD_BOOLEAN \ + do \ + { \ + BENCHMARK("boolean") \ + { \ + logger.open_record(); \ + logger.add_field(field, flag); \ + return true; \ + }; \ + } while (0) + +#define FIELD_STRING_VECTOR \ + do \ + { \ + BENCHMARK("string vector (x4)") \ + { \ + logger.open_record(); \ + logger.add_field(field, v4_str); \ + return true; \ + }; \ + } while (0) + +#define FIELD_NUMBER_VECTOR \ + do \ + { \ + BENCHMARK("number vector (x4)") \ + { \ + logger.open_record(); \ + logger.add_field(field, v4_num); \ + return true; \ + }; \ + } while (0) + +#define FIELD_BOOLEAN_VECTOR \ + do \ + { \ + BENCHMARK("boolean vector (x4)") \ + { \ + logger.open_record(); \ + logger.add_field(field, v4_flag); \ + return true; \ + }; \ + } while (0) + +#define FIELD_ALL \ + do \ + { \ + BENCHMARK("record (all fields, no vectors)") \ + { \ + logger.open_record(); \ + logger.add_field(field, str); \ + logger.add_field(field, str, 16); \ + logger.add_field(field, num); \ + logger.add_field(field, ts); \ + logger.add_field(field, ip); \ + logger.add_field(field, flag); \ + logger.close_record(id); \ + return true; \ + }; \ + } while (0) + +#define FIELD_NONE \ + do \ + { \ + BENCHMARK("empty record (no fields)") \ + { \ + logger.open_record(); \ + logger.close_record(id); \ + return true; \ + }; \ + } while (0) + +#define SEQUENCE \ + do { \ + const char* str = "0123456789abcdef"; \ + uint64_t num = 0x12345678abcdef00; \ + struct timeval ts{}; \ + SfIp ip{}; \ + bool flag = false; \ + vector v4_str = {"0123456789abcdef", "0123456789abcdef", "0123456789abcdef", "0123456789abcdef"}; \ + vector v4_num = {0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00, 0x12345678abcdef00}; \ + vector v4_flag = {false, true, false, true}; \ + \ + FIELD_STRING; \ + FIELD_SUB_STRING; \ + FIELD_NUMBER; \ + FIELD_TIMESTAMP; \ + FIELD_SFIP; \ + FIELD_BOOLEAN; \ + FIELD_STRING_VECTOR; \ + FIELD_NUMBER_VECTOR; \ + FIELD_BOOLEAN_VECTOR; \ + FIELD_ALL; \ + FIELD_NONE; \ + } while (0) + + +TEST_CASE("CSV", "[Extractor]") +{ + ExtractorNullConnector nil; + CsvExtractorLogger logger(&nil, TimeType()); + const Connector::ID& id = logger.get_id(""); + const char* field = "test"; + + SEQUENCE; +} + +TEST_CASE("TSV", "[Extractor]") +{ + ExtractorNullConnector nil; + CsvExtractorLogger logger(&nil, TimeType(), '\t'); + const Connector::ID& id = logger.get_id(""); + const char* field = "test"; + + SEQUENCE; +} + +TEST_CASE("JSON", "[Extractor]") +{ + ExtractorNullConnector nil; + JsonExtractorLogger logger(&nil, TimeType()); + const Connector::ID& id = logger.get_id(""); + const char* field = "test"; + + SEQUENCE; +} + +#endif