]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4725: mp_data_bus: standartize data types
authorOleksandr Stepanov -X (ostepano - SOFTSERVE INC at Cisco) <ostepano@cisco.com>
Wed, 7 May 2025 13:31:54 +0000 (13:31 +0000)
committerPriyanka Bangalore Gurudev (prbg) <prbg@cisco.com>
Wed, 7 May 2025 13:31:54 +0000 (13:31 +0000)
Merge in SNORT/snort3 from ~OSTEPANO/snort3:mp_transport_types to master

Squashed commit of the following:

commit f8c03a985161f9c8b3963064d136fd364936e74e
Author: Oleksandr Stepanov <ostepano@cisco.com>
Date:   Tue May 6 10:19:37 2025 -0400

    mp_data_bus: standartize data types

src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc
src/connectors/unixdomain_connector/unixdomain_connector.cc
src/framework/mp_data_bus.cc
src/framework/mp_data_bus.h
src/framework/mp_transport.h
src/framework/test/mp_data_bus_test.cc
src/managers/test/mp_transport_manager_test.cc
src/mp_transport/mp_unix_transport/mp_unix_transport.cc
src/mp_transport/mp_unix_transport/mp_unix_transport.h

index 269e54782f67492a67369f9ba7dc1f5a4612f885..3fd5723887d4ddbd910d984564b52fe99fb42180 100644 (file)
@@ -149,7 +149,7 @@ int bind (int, const struct sockaddr*, socklen_t) { return s_bind_return; }
 int listen (int, int) { return s_listen_return; }
 #endif
 static bool use_test_accept_counter = false;
-static uint test_accept_counter = 0;
+static uint32_t test_accept_counter = 0;
 int accept (int, struct sockaddr*, socklen_t*)
 {
     if ( use_test_accept_counter )
index 461df406cdccc0d1ce884b573031dd47579fef0a..ddfc6f036ee4e12605eeaaedb67396accc030d17 100644 (file)
@@ -584,7 +584,7 @@ void UnixDomainConnectorListener::start_accepting_connections(UnixDomainConnecto
             return;
         }
 
-        ushort error_count = 0;
+        unsigned short error_count = 0;
 
         while (should_accept) {
             if(error_count > 10)
index 71418dc5ca9b88a0937a04b03eae8462b91e5ef6..0b9f2f37ff035ae9a3e507a21eb58e056d397436 100644 (file)
@@ -43,8 +43,6 @@ using namespace snort;
 
 void MPDataBusLog(const char* msg, ...);
 
-std::condition_variable MPDataBus::queue_cv;
-std::mutex MPDataBus::queue_mutex;
 uint32_t MPDataBus::mp_max_eventq_size = DEFAULT_MAX_EVENTQ_SIZE;
 std::string MPDataBus::transport = DEFAULT_TRANSPORT;
 bool MPDataBus::enable_debug = false;
@@ -56,7 +54,7 @@ bool MPDataBus::hold_events = false;
 
 static std::unordered_map<std::string, unsigned> mp_pub_ids;
 static std::mutex mp_stats_mutex;
-static uint mp_current_process_id = 0;
+static uint32_t mp_current_process_id = 0;
 
 void MPDataBusLog(const char* msg, ...)
 {
@@ -76,7 +74,10 @@ void MPDataBusLog(const char* msg, ...)
 // public methods
 //-------------------------------------------------------------------------
 
-MPDataBus::MPDataBus() : run_thread(true)
+MPDataBus::MPDataBus() :
+    run_thread(true),
+    worker_thread(nullptr),
+    mp_event_queue(nullptr)
 {
     mp_event_queue = new Ring<std::shared_ptr<MPEventInfo>>(mp_max_eventq_size);
     start_worker_thread();
@@ -201,13 +202,12 @@ bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptr<DataEv
         return false;
     }
 
-    if (sc->mp_dbus->mp_event_queue != nullptr and !sc->mp_dbus->mp_event_queue->full() and !sc->mp_dbus->mp_event_queue->put(event_info)) {
+    if (!sc->mp_dbus->_enqueue_event(std::move(event_info)))
+    {
         ErrorMessage("MPDataBus: Failed to enqueue event for publisher ID %u and event ID %u\n", pub_id, evt_id);
         return false;
     }
 
-    queue_cv.notify_one();
-
     MPDataBusLog("Event published for publisher ID %u and event ID %u\n", pub_id, evt_id);
 
     return true;
@@ -268,9 +268,9 @@ void MPDataBus::process_event_queue()
 
     std::unique_lock<std::mutex> u_lock(queue_mutex);
 
-    queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP), [this]() {
-        return mp_event_queue != nullptr && !mp_event_queue->empty();
-    });
+    if( (std::cv_status::timeout == queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP))) and
+        mp_event_queue->empty() )
+        return;
 
     while (!mp_event_queue->empty()) {
         std::shared_ptr<MPEventInfo> event_info = mp_event_queue->get(nullptr);
@@ -403,7 +403,7 @@ void MPDataBus::dump_stats(ControlConn *ctrlconn, const char *module_name)
             auto transport_pegs = transport_module->get_pegs();
             if(transport_pegs)
             {
-                uint size = 0;
+                uint32_t size = 0;
                 while(transport_pegs[size].type != CountType::END)
                 {
                     size++;
@@ -418,7 +418,7 @@ void MPDataBus::dump_stats(ControlConn *ctrlconn, const char *module_name)
 void MPDataBus::dump_events(ControlConn *ctrlconn, const char *module_name)
 {
     int current_read_idx = 0;
-    uint ring_items = mp_event_queue->count();
+    uint32_t ring_items = mp_event_queue->count();
     if(ring_items == 0)
     {
         if (ctrlconn)
@@ -442,7 +442,7 @@ void MPDataBus::dump_events(ControlConn *ctrlconn, const char *module_name)
         current_read_idx--;
     }
 
-    for (uint i = current_read_idx; i <= ring_items; i++)
+    for (uint32_t i = current_read_idx; i <= ring_items; i++)
     {
         if(i >= mp_max_eventq_size)
         {
@@ -478,7 +478,7 @@ void snort::MPDataBus::show_channel_status(ControlConn *ctrlconn)
         return;
     }
 
-    uint size = 0;
+    unsigned int size = 0;
     auto transport_status = transport_layer->get_channel_status(size);
     if (size == 0)
     {
@@ -486,7 +486,7 @@ void snort::MPDataBus::show_channel_status(ControlConn *ctrlconn)
         return;
     }
     std::string response;
-    for (uint i = 0; i < size; i++)
+    for (unsigned int i = 0; i < size; i++)
     {
         const auto& channel = transport_status[i];
         response += "Channel ID: " + std::to_string(channel.id) + ", Name: " + channel.name + ", Status: " + channel.get_status_string() + "\n";
@@ -533,3 +533,9 @@ bool MPDataBus::_publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f)
     return true;
 }
 
+bool snort::MPDataBus::_enqueue_event(std::shared_ptr<MPEventInfo> ev_info)
+{
+    bool res = mp_event_queue != nullptr and !mp_event_queue->full() and mp_event_queue->put(std::move(ev_info));
+    if(res) queue_cv.notify_one();
+    return res;
+}
index 6638b643156512d38f2ff2b1d8e13a53ffc760db..bdfcfb68faa409401b0b4cc6fb30d08636e3b3a7 100644 (file)
@@ -183,6 +183,7 @@ private:
     void _subscribe(const PubKey& key, unsigned eid, DataHandler* h);
 
     bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f);
+    bool _enqueue_event(std::shared_ptr<MPEventInfo> ev_info);
 
 private:
     typedef std::vector<DataHandler*> SubList;
@@ -194,8 +195,8 @@ private:
 
     Ring<std::shared_ptr<MPEventInfo>>* mp_event_queue;
 
-    static std::condition_variable queue_cv;
-    static std::mutex queue_mutex;
+    std::condition_variable queue_cv;
+    std::mutex queue_mutex;
 
     std::unordered_map<unsigned, MPDataBusStats> mp_pub_stats;
 
index 882fb7fd5b942dd15103d8268ee56d8f26efbc96..5e9cfb941e65fdb6805000df3050ec37d224880a 100644 (file)
@@ -81,7 +81,7 @@ class MPTransport
     virtual void enable_logging() = 0;
     virtual void disable_logging() = 0;
     virtual bool is_logging_enabled() = 0;
-    virtual MPTransportChannelStatusHandle* get_channel_status(uint& size) = 0;
+    virtual MPTransportChannelStatusHandle* get_channel_status(unsigned& size) = 0;
 };
 
 
index 0dad5aae1ef0d6ab54fedf5f113ca26ebc462181..cb5fd40e6795794e5ddd33d57b67d365d1dd85d3 100644 (file)
@@ -160,7 +160,7 @@ public:
         return true;
     }
 
-    MPTransportChannelStatusHandle* get_channel_status(uint& size) override
+    MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override
     {
         size = 0;
         return nullptr;
index d4900d3ca3c2479b525a691c5726aaa4d5f0e5f3..01c74a218762aa38f37f607e8a22c9988fa55b42 100644 (file)
@@ -56,7 +56,7 @@ class MockTransport : public MPTransport
     { }
     virtual bool is_logging_enabled() override
     { return false; }
-    MPTransportChannelStatusHandle* get_channel_status(uint& size) override
+    MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override
     {
         size = 0;
         return nullptr;
index 783c69aa47a3ec1b073d249249efc3d716260716..ca550658ef25185c919e761215c8cfc860169d37 100644 (file)
 
 #include <cstring>
 #include <fcntl.h>
-#include <filesystem>
 #include <iostream>
 #include <poll.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <sys/stat.h>
 #include <unistd.h>
 
 #include "framework/mp_data_bus.h"
@@ -105,7 +105,7 @@ void MPUnixDomainTransport::side_channel_receive_handler(SCMessage* msg)
     delete msg;
 }
 
-void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id)
+void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id)
 {
     assert(connector);
     assert(cfg);
@@ -374,18 +374,17 @@ void MPUnixDomainTransport::init_side_channels()
 
     this->is_running = true;
 
-    if ( std::filesystem::is_directory(config->unix_domain_socket_path) == false )
+    struct stat st;
+    if (::stat(config->unix_domain_socket_path.c_str(), &st) != 0 || !S_ISDIR(st.st_mode))
     {
-        std::error_code ec;
-        std::filesystem::create_directories(config->unix_domain_socket_path, ec);
-        if (ec)
+        if (mkdir(config->unix_domain_socket_path.c_str(), 0755) != 0)
         {
             MPTransportLog("Failed to create directory %s\n", config->unix_domain_socket_path.c_str());
             return;
         }
     }
 
-    for (ushort i = instance_id; i < max_processes; i++)
+    for (unsigned short i = instance_id; i < max_processes; i++)
     {
         auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(i);
 
@@ -418,7 +417,7 @@ void MPUnixDomainTransport::init_side_channels()
         this->accept_handlers.push_back(unix_listener_handle);
     }
 
-    for (ushort i = 1; i < instance_id; i++)
+    for (unsigned short i = 1; i < instance_id; i++)
     {
         auto side_channel = new SideChannel(ScMsgFormat::BINARY);
         side_channel->register_receive_handler([this](SCMessage* msg) { this->side_channel_receive_handler(msg); });
@@ -447,7 +446,7 @@ void MPUnixDomainTransport::cleanup_side_channels()
     std::lock_guard<std::mutex> guard_send(_send_mutex);
     std::lock_guard<std::mutex> guard_read(_read_mutex);
 
-    for (uint i = 0; i < this->side_channels.size(); i++)
+    for (uint32_t i = 0; i < this->side_channels.size(); i++)
     {
         delete this->side_channels[i];
     }
@@ -484,7 +483,7 @@ bool MPUnixDomainTransport::is_logging_enabled()
     return this->is_logging_enabled_flag;
 }
 
-MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(uint &size)
+MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(unsigned& size)
 {
     std::lock_guard<std::mutex> guard_send(_send_mutex);
     std::lock_guard<std::mutex> guard_read(_read_mutex);
@@ -496,7 +495,7 @@ MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(uint &
     MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->side_channels.size()];
 
     size = this->side_channels.size();
-    uint it = 0;
+    unsigned int it = 0;
 
     for (auto &&sc_handler : this->side_channels)
     {
index 0c23eb087314d0b993b1876a2b7d45ddbcea481a..f3c687d090e274ceb8b324945d02583bda01b305 100644 (file)
@@ -83,7 +83,7 @@ struct SerializeFunctionHandle
 
 struct SideChannelHandle
 {
-    SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const ushort& channel_id) :
+    SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const unsigned short& channel_id) :
         side_channel(sc), connector_config(cc), channel_id(channel_id)
     { }
 
@@ -91,7 +91,7 @@ struct SideChannelHandle
 
     SideChannel* side_channel;
     UnixDomainConnectorConfig* connector_config;
-    ushort channel_id;
+    unsigned short channel_id;
 };
 
 struct UnixAcceptorHandle
@@ -119,7 +119,7 @@ class MPUnixDomainTransport : public MPTransport
     void disable_logging() override;
     bool is_logging_enabled() override;
     void cleanup();
-    MPTransportChannelStatusHandle* get_channel_status(uint& size) override;
+    MPTransportChannelStatusHandle* get_channel_status(unsigned& size) override;
 
     MPUnixDomainTransportConfig* get_config()
     { return config; }
@@ -130,7 +130,7 @@ class MPUnixDomainTransport : public MPTransport
     void init_side_channels();
     void cleanup_side_channels();
     void side_channel_receive_handler(SCMessage* msg);
-    void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id);
+    void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id);
     void process_messages_from_side_channels();
     void notify_process_thread();
     void connector_update_handler(UnixDomainConnector* connector, bool is_recconecting, SideChannel* side_channel);
@@ -139,7 +139,7 @@ class MPUnixDomainTransport : public MPTransport
     MPSerializeFunc get_event_serialization_function(unsigned pub_id, unsigned event_id);
     MPDeserializeFunc get_event_deserialization_function(unsigned pub_id, unsigned event_id);
 
-    uint mp_current_process_id = 0;
+    uint32_t mp_current_process_id = 0;
 
     TransportReceiveEventHandler transport_receive_handler = nullptr;
     MPUnixDomainTransportConfig* config = nullptr;