From: Oleksandr Stepanov -X (ostepano - SOFTSERVE INC at Cisco) Date: Wed, 7 May 2025 13:31:54 +0000 (+0000) Subject: Pull request #4725: mp_data_bus: standartize data types X-Git-Tag: 3.7.4.0~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9e44b21830aaa06a0156123e5777f1f9f37e04f6;p=thirdparty%2Fsnort3.git Pull request #4725: mp_data_bus: standartize data types Merge in SNORT/snort3 from ~OSTEPANO/snort3:mp_transport_types to master Squashed commit of the following: commit f8c03a985161f9c8b3963064d136fd364936e74e Author: Oleksandr Stepanov Date: Tue May 6 10:19:37 2025 -0400 mp_data_bus: standartize data types --- diff --git a/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc b/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc index 269e54782..3fd572388 100644 --- a/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc +++ b/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc @@ -149,7 +149,7 @@ int bind (int, const struct sockaddr*, socklen_t) { return s_bind_return; } int listen (int, int) { return s_listen_return; } #endif static bool use_test_accept_counter = false; -static uint test_accept_counter = 0; +static uint32_t test_accept_counter = 0; int accept (int, struct sockaddr*, socklen_t*) { if ( use_test_accept_counter ) diff --git a/src/connectors/unixdomain_connector/unixdomain_connector.cc b/src/connectors/unixdomain_connector/unixdomain_connector.cc index 461df406c..ddfc6f036 100644 --- a/src/connectors/unixdomain_connector/unixdomain_connector.cc +++ b/src/connectors/unixdomain_connector/unixdomain_connector.cc @@ -584,7 +584,7 @@ void UnixDomainConnectorListener::start_accepting_connections(UnixDomainConnecto return; } - ushort error_count = 0; + unsigned short error_count = 0; while (should_accept) { if(error_count > 10) diff --git a/src/framework/mp_data_bus.cc b/src/framework/mp_data_bus.cc index 71418dc5c..0b9f2f37f 100644 --- a/src/framework/mp_data_bus.cc +++ b/src/framework/mp_data_bus.cc @@ -43,8 +43,6 @@ using namespace snort; void MPDataBusLog(const char* msg, ...); -std::condition_variable MPDataBus::queue_cv; -std::mutex MPDataBus::queue_mutex; uint32_t MPDataBus::mp_max_eventq_size = DEFAULT_MAX_EVENTQ_SIZE; std::string MPDataBus::transport = DEFAULT_TRANSPORT; bool MPDataBus::enable_debug = false; @@ -56,7 +54,7 @@ bool MPDataBus::hold_events = false; static std::unordered_map mp_pub_ids; static std::mutex mp_stats_mutex; -static uint mp_current_process_id = 0; +static uint32_t mp_current_process_id = 0; void MPDataBusLog(const char* msg, ...) { @@ -76,7 +74,10 @@ void MPDataBusLog(const char* msg, ...) // public methods //------------------------------------------------------------------------- -MPDataBus::MPDataBus() : run_thread(true) +MPDataBus::MPDataBus() : + run_thread(true), + worker_thread(nullptr), + mp_event_queue(nullptr) { mp_event_queue = new Ring>(mp_max_eventq_size); start_worker_thread(); @@ -201,13 +202,12 @@ bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptrmp_dbus->mp_event_queue != nullptr and !sc->mp_dbus->mp_event_queue->full() and !sc->mp_dbus->mp_event_queue->put(event_info)) { + if (!sc->mp_dbus->_enqueue_event(std::move(event_info))) + { ErrorMessage("MPDataBus: Failed to enqueue event for publisher ID %u and event ID %u\n", pub_id, evt_id); return false; } - queue_cv.notify_one(); - MPDataBusLog("Event published for publisher ID %u and event ID %u\n", pub_id, evt_id); return true; @@ -268,9 +268,9 @@ void MPDataBus::process_event_queue() std::unique_lock u_lock(queue_mutex); - queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP), [this]() { - return mp_event_queue != nullptr && !mp_event_queue->empty(); - }); + if( (std::cv_status::timeout == queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP))) and + mp_event_queue->empty() ) + return; while (!mp_event_queue->empty()) { std::shared_ptr event_info = mp_event_queue->get(nullptr); @@ -403,7 +403,7 @@ void MPDataBus::dump_stats(ControlConn *ctrlconn, const char *module_name) auto transport_pegs = transport_module->get_pegs(); if(transport_pegs) { - uint size = 0; + uint32_t size = 0; while(transport_pegs[size].type != CountType::END) { size++; @@ -418,7 +418,7 @@ void MPDataBus::dump_stats(ControlConn *ctrlconn, const char *module_name) void MPDataBus::dump_events(ControlConn *ctrlconn, const char *module_name) { int current_read_idx = 0; - uint ring_items = mp_event_queue->count(); + uint32_t ring_items = mp_event_queue->count(); if(ring_items == 0) { if (ctrlconn) @@ -442,7 +442,7 @@ void MPDataBus::dump_events(ControlConn *ctrlconn, const char *module_name) current_read_idx--; } - for (uint i = current_read_idx; i <= ring_items; i++) + for (uint32_t i = current_read_idx; i <= ring_items; i++) { if(i >= mp_max_eventq_size) { @@ -478,7 +478,7 @@ void snort::MPDataBus::show_channel_status(ControlConn *ctrlconn) return; } - uint size = 0; + unsigned int size = 0; auto transport_status = transport_layer->get_channel_status(size); if (size == 0) { @@ -486,7 +486,7 @@ void snort::MPDataBus::show_channel_status(ControlConn *ctrlconn) return; } std::string response; - for (uint i = 0; i < size; i++) + for (unsigned int i = 0; i < size; i++) { const auto& channel = transport_status[i]; response += "Channel ID: " + std::to_string(channel.id) + ", Name: " + channel.name + ", Status: " + channel.get_status_string() + "\n"; @@ -533,3 +533,9 @@ bool MPDataBus::_publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f) return true; } +bool snort::MPDataBus::_enqueue_event(std::shared_ptr ev_info) +{ + bool res = mp_event_queue != nullptr and !mp_event_queue->full() and mp_event_queue->put(std::move(ev_info)); + if(res) queue_cv.notify_one(); + return res; +} diff --git a/src/framework/mp_data_bus.h b/src/framework/mp_data_bus.h index 6638b6431..bdfcfb68f 100644 --- a/src/framework/mp_data_bus.h +++ b/src/framework/mp_data_bus.h @@ -183,6 +183,7 @@ private: void _subscribe(const PubKey& key, unsigned eid, DataHandler* h); bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f); + bool _enqueue_event(std::shared_ptr ev_info); private: typedef std::vector SubList; @@ -194,8 +195,8 @@ private: Ring>* mp_event_queue; - static std::condition_variable queue_cv; - static std::mutex queue_mutex; + std::condition_variable queue_cv; + std::mutex queue_mutex; std::unordered_map mp_pub_stats; diff --git a/src/framework/mp_transport.h b/src/framework/mp_transport.h index 882fb7fd5..5e9cfb941 100644 --- a/src/framework/mp_transport.h +++ b/src/framework/mp_transport.h @@ -81,7 +81,7 @@ class MPTransport virtual void enable_logging() = 0; virtual void disable_logging() = 0; virtual bool is_logging_enabled() = 0; - virtual MPTransportChannelStatusHandle* get_channel_status(uint& size) = 0; + virtual MPTransportChannelStatusHandle* get_channel_status(unsigned& size) = 0; }; diff --git a/src/framework/test/mp_data_bus_test.cc b/src/framework/test/mp_data_bus_test.cc index 0dad5aae1..cb5fd40e6 100644 --- a/src/framework/test/mp_data_bus_test.cc +++ b/src/framework/test/mp_data_bus_test.cc @@ -160,7 +160,7 @@ public: return true; } - MPTransportChannelStatusHandle* get_channel_status(uint& size) override + MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override { size = 0; return nullptr; diff --git a/src/managers/test/mp_transport_manager_test.cc b/src/managers/test/mp_transport_manager_test.cc index d4900d3ca..01c74a218 100644 --- a/src/managers/test/mp_transport_manager_test.cc +++ b/src/managers/test/mp_transport_manager_test.cc @@ -56,7 +56,7 @@ class MockTransport : public MPTransport { } virtual bool is_logging_enabled() override { return false; } - MPTransportChannelStatusHandle* get_channel_status(uint& size) override + MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override { size = 0; return nullptr; diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport.cc b/src/mp_transport/mp_unix_transport/mp_unix_transport.cc index 783c69aa4..ca550658e 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport.cc +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport.cc @@ -25,11 +25,11 @@ #include #include -#include #include #include #include #include +#include #include #include "framework/mp_data_bus.h" @@ -105,7 +105,7 @@ void MPUnixDomainTransport::side_channel_receive_handler(SCMessage* msg) delete msg; } -void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id) +void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id) { assert(connector); assert(cfg); @@ -374,18 +374,17 @@ void MPUnixDomainTransport::init_side_channels() this->is_running = true; - if ( std::filesystem::is_directory(config->unix_domain_socket_path) == false ) + struct stat st; + if (::stat(config->unix_domain_socket_path.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) { - std::error_code ec; - std::filesystem::create_directories(config->unix_domain_socket_path, ec); - if (ec) + if (mkdir(config->unix_domain_socket_path.c_str(), 0755) != 0) { MPTransportLog("Failed to create directory %s\n", config->unix_domain_socket_path.c_str()); return; } } - for (ushort i = instance_id; i < max_processes; i++) + for (unsigned short i = instance_id; i < max_processes; i++) { auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(i); @@ -418,7 +417,7 @@ void MPUnixDomainTransport::init_side_channels() this->accept_handlers.push_back(unix_listener_handle); } - for (ushort i = 1; i < instance_id; i++) + for (unsigned short i = 1; i < instance_id; i++) { auto side_channel = new SideChannel(ScMsgFormat::BINARY); side_channel->register_receive_handler([this](SCMessage* msg) { this->side_channel_receive_handler(msg); }); @@ -447,7 +446,7 @@ void MPUnixDomainTransport::cleanup_side_channels() std::lock_guard guard_send(_send_mutex); std::lock_guard guard_read(_read_mutex); - for (uint i = 0; i < this->side_channels.size(); i++) + for (uint32_t i = 0; i < this->side_channels.size(); i++) { delete this->side_channels[i]; } @@ -484,7 +483,7 @@ bool MPUnixDomainTransport::is_logging_enabled() return this->is_logging_enabled_flag; } -MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(uint &size) +MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(unsigned& size) { std::lock_guard guard_send(_send_mutex); std::lock_guard guard_read(_read_mutex); @@ -496,7 +495,7 @@ MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(uint & MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->side_channels.size()]; size = this->side_channels.size(); - uint it = 0; + unsigned int it = 0; for (auto &&sc_handler : this->side_channels) { diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport.h b/src/mp_transport/mp_unix_transport/mp_unix_transport.h index 0c23eb087..f3c687d09 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport.h +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport.h @@ -83,7 +83,7 @@ struct SerializeFunctionHandle struct SideChannelHandle { - SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const ushort& channel_id) : + SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const unsigned short& channel_id) : side_channel(sc), connector_config(cc), channel_id(channel_id) { } @@ -91,7 +91,7 @@ struct SideChannelHandle SideChannel* side_channel; UnixDomainConnectorConfig* connector_config; - ushort channel_id; + unsigned short channel_id; }; struct UnixAcceptorHandle @@ -119,7 +119,7 @@ class MPUnixDomainTransport : public MPTransport void disable_logging() override; bool is_logging_enabled() override; void cleanup(); - MPTransportChannelStatusHandle* get_channel_status(uint& size) override; + MPTransportChannelStatusHandle* get_channel_status(unsigned& size) override; MPUnixDomainTransportConfig* get_config() { return config; } @@ -130,7 +130,7 @@ class MPUnixDomainTransport : public MPTransport void init_side_channels(); void cleanup_side_channels(); void side_channel_receive_handler(SCMessage* msg); - void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id); + void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id); void process_messages_from_side_channels(); void notify_process_thread(); void connector_update_handler(UnixDomainConnector* connector, bool is_recconecting, SideChannel* side_channel); @@ -139,7 +139,7 @@ class MPUnixDomainTransport : public MPTransport MPSerializeFunc get_event_serialization_function(unsigned pub_id, unsigned event_id); MPDeserializeFunc get_event_deserialization_function(unsigned pub_id, unsigned event_id); - uint mp_current_process_id = 0; + uint32_t mp_current_process_id = 0; TransportReceiveEventHandler transport_receive_handler = nullptr; MPUnixDomainTransportConfig* config = nullptr;