From 563019254fecad2d56db2137b5f47430df534a31 Mon Sep 17 00:00:00 2001 From: "Oleksandr Stepanov -X (ostepano - SOFTSERVE INC at Cisco)" Date: Tue, 3 Jun 2025 20:49:54 +0000 Subject: [PATCH] Pull request #4757: mp_unix_transport: added reset stats handling Merge in SNORT/snort3 from ~OSTEPANO/snort3:transport_opt to master Squashed commit of the following: commit 85abeddb909fee7f7107f6ff049004c5713840d6 Author: Oleksandr Stepanov Date: Mon May 12 05:50:44 2025 -0400 mp_unix_transport: use shared mutex in message processing --- .../test/unixdomain_connector_test.cc | 4 +- .../unixdomain_connector.cc | 9 +- src/framework/mp_data_bus.cc | 26 +++++ src/framework/mp_data_bus.h | 1 + src/framework/mp_transport.h | 2 + src/main.cc | 1 + src/main/analyzer_command.h | 4 +- src/main/modules.cc | 7 ++ .../mp_unix_transport/mp_unix_transport.cc | 100 ++++++++++-------- .../mp_unix_transport/mp_unix_transport.h | 31 +++++- .../mp_unix_transport_module.cc | 17 ++- .../mp_unix_transport_module.h | 6 +- .../test/unix_transport_module_test.cc | 8 ++ 13 files changed, 165 insertions(+), 51 deletions(-) diff --git a/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc b/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc index f9d5c2d17..81f466349 100644 --- a/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc +++ b/src/connectors/unixdomain_connector/test/unixdomain_connector_test.cc @@ -888,7 +888,9 @@ TEST(unixdomain_connector_reconnect_helper, connect_then_reconnect_call) auto tmp_test_connector = test_reconnect_connector; //trigger the reconnect - test_reconnect_connector->process_receive(); + test_reconnect_connector->start_receive_thread(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); //collapse the reconnect_helper joining reconnect thread delete reconnect_helper; diff --git a/src/connectors/unixdomain_connector/unixdomain_connector.cc b/src/connectors/unixdomain_connector/unixdomain_connector.cc index 97bb769d2..ab2769059 100644 --- a/src/connectors/unixdomain_connector/unixdomain_connector.cc +++ b/src/connectors/unixdomain_connector/unixdomain_connector.cc @@ -295,6 +295,13 @@ void UnixDomainConnector::process_receive() { } else if ((pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)) != 0) { + if (run_thread.load() == false) + { + close(sock_fd); + sock_fd = -1; + return; + } + ErrorMessage("UnixDomainC Input Thread: Undesirable return event while polling on socket %d: 0x%x\n", pfds[0].fd, pfds[0].revents); @@ -349,8 +356,8 @@ void UnixDomainConnector::start_receive_thread() { void UnixDomainConnector::stop_receive_thread() { + run_thread.store(false, std::memory_order_relaxed); if (receive_thread != nullptr) { - run_thread.store(false, std::memory_order_relaxed); if (receive_thread->joinable()) { receive_thread->join(); } diff --git a/src/framework/mp_data_bus.cc b/src/framework/mp_data_bus.cc index 0b9f2f37f..ed907dd0f 100644 --- a/src/framework/mp_data_bus.cc +++ b/src/framework/mp_data_bus.cc @@ -375,6 +375,32 @@ void MPDataBus::sum_stats() } } +void MPDataBus::reset_stats() +{ + std::lock_guard lock(mp_stats_mutex); + + for(auto& [_, pub_stats] : mp_pub_stats) + { + pub_stats.total_messages_sent = 0; + pub_stats.total_messages_received = 0; + pub_stats.total_messages_dropped = 0; + pub_stats.total_messages_published = 0; + pub_stats.total_messages_delivered = 0; + } + + mp_global_stats.total_messages_sent = 0; + mp_global_stats.total_messages_received = 0; + mp_global_stats.total_messages_dropped = 0; + mp_global_stats.total_messages_published = 0; + mp_global_stats.total_messages_delivered = 0; + + auto transport_module = ModuleManager::get_module(transport.c_str()); + if (transport_module) + { + transport_module->reset_stats(); + } +} + void MPDataBus::dump_stats(ControlConn *ctrlconn, const char *module_name) { set_log_conn(ctrlconn); diff --git a/src/framework/mp_data_bus.h b/src/framework/mp_data_bus.h index bdfcfb68f..d72a0101f 100644 --- a/src/framework/mp_data_bus.h +++ b/src/framework/mp_data_bus.h @@ -173,6 +173,7 @@ public: void set_debug_enabled(bool flag); void sum_stats(); + void reset_stats(); void dump_stats(ControlConn* ctrlconn, const char* module_name); void dump_events(ControlConn* ctrlconn, const char* module_name); diff --git a/src/framework/mp_transport.h b/src/framework/mp_transport.h index 5e9cfb941..0ff1baa43 100644 --- a/src/framework/mp_transport.h +++ b/src/framework/mp_transport.h @@ -40,6 +40,7 @@ typedef std::function TransportReceiveEven enum MPTransportChannelStatus { DISCONNECTED = 0, + ACCEPTING, CONNECTING, CONNECTED, MAX @@ -58,6 +59,7 @@ struct MPTransportChannelStatusHandle case DISCONNECTED: return "DISCONNECTED"; case CONNECTING: return "CONNECTING"; case CONNECTED: return "CONNECTED"; + case ACCEPTING: return "ACCEPTING"; default: return "UNKNOWN"; } } diff --git a/src/main.cc b/src/main.cc index 4e5e0e91c..252d563e7 100644 --- a/src/main.cc +++ b/src/main.cc @@ -96,6 +96,7 @@ static const std::map counter_name_to_id = {"file_id", clear_counter_type_t::TYPE_FILE_ID}, {"snort", clear_counter_type_t::TYPE_SNORT}, {"ha", clear_counter_type_t::TYPE_HA}, + {"messaging", clear_counter_type_t::TYPE_MESSAGING}, {"all", clear_counter_type_t::TYPE_ALL} }; diff --git a/src/main/analyzer_command.h b/src/main/analyzer_command.h index df5c7ab26..03e670e8f 100644 --- a/src/main/analyzer_command.h +++ b/src/main/analyzer_command.h @@ -85,6 +85,7 @@ typedef enum clear_counter_type TYPE_FILE_ID, TYPE_SNORT, TYPE_HA, + TYPE_MESSAGING, TYPE_ALL } clear_counter_type_t; @@ -98,7 +99,8 @@ static std::vector clear_counter_type_string_map "appid", "file_id", "snort", - "high_availability" + "high_availability", + "mp_data_bus" }; class ACResetStats : public snort::AnalyzerCommand diff --git a/src/main/modules.cc b/src/main/modules.cc index 767f6e21b..ed9da58e1 100644 --- a/src/main/modules.cc +++ b/src/main/modules.cc @@ -502,6 +502,7 @@ public: const Command* get_commands() const override; const PegInfo* get_pegs() const override; PegCount* get_counts() const override; + void reset_stats() override; Usage get_usage() const override { return GLOBAL; } @@ -561,6 +562,12 @@ PegCount* MPDataBusModule::get_counts() const SnortConfig::get_conf()->mp_dbus->sum_stats(); return (PegCount*)&MPDataBus::mp_global_stats; } + +void MPDataBusModule::reset_stats() +{ + if(SnortConfig::get_conf()->mp_dbus) + SnortConfig::get_conf()->mp_dbus->reset_stats(); +} //------------------------------------------------------------------------- // reference module //------------------------------------------------------------------------- diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport.cc b/src/mp_transport/mp_unix_transport/mp_unix_transport.cc index c9c8354ef..233904357 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport.cc +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport.cc @@ -31,6 +31,7 @@ #include #include #include +#include #include "framework/mp_data_bus.h" #include "log/messages.h" @@ -38,8 +39,7 @@ #include "main/snort_config.h" static std::mutex _receive_mutex; -static std::mutex _send_mutex; -static std::mutex _read_mutex; +static std::shared_mutex _connection_update_mutex; #define UNIX_SOCKET_NAME_PREFIX "/snort_unix_connector_" #define MP_TRANSPORT_LOG_LABEL "MPUnixTransportDbg" @@ -99,8 +99,8 @@ void MPUnixDomainTransport::side_channel_receive_handler(SCMessage* msg) MPEventInfo event(std::shared_ptr (internal_event), transport_message_header->event_id, transport_message_header->pub_id); (transport_receive_handler)(event); - transport_stats.received_events++; - transport_stats.received_bytes += sizeof(MPTransportMessageHeader) + transport_message_header->data_length; + dynamic_transport_stats.received_events++; + dynamic_transport_stats.received_bytes += sizeof(MPTransportMessageHeader) + transport_message_header->data_length; } delete msg; } @@ -110,20 +110,19 @@ void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector assert(connector); assert(cfg); - std::lock_guard guard_send(_send_mutex); - std::lock_guard guard_read(_read_mutex); + std::lock_guard guard(_connection_update_mutex); if(!this->is_running.load()) return; - transport_stats.successful_connections++; + dynamic_transport_stats.successful_connections++; auto side_channel = new SideChannel(ScMsgFormat::BINARY); side_channel->connector_receive = connector; side_channel->connector_transmit = side_channel->connector_receive; side_channel->register_receive_handler(std::bind(&MPUnixDomainTransport::side_channel_receive_handler, this, std::placeholders::_1)); connector->set_message_received_handler(std::bind(&MPUnixDomainTransport::notify_process_thread, this)); - this->side_channels.push_back(new SideChannelHandle(side_channel, cfg, channel_id)); + this->side_channels.push_back(new SideChannelHandle(side_channel, cfg, channel_id+this->side_channels.size()-mp_current_process_id+1)); connector->set_update_handler(std::bind(&MPUnixDomainTransport::connector_update_handler, this, std::placeholders::_1, std::placeholders::_2, side_channel)); connector->start_receive_thread(); } @@ -145,7 +144,7 @@ bool MPUnixDomainTransport::send_to_transport(MPEventInfo &event) if (!serialize_func) { - transport_stats.send_errors++; + dynamic_transport_stats.send_errors++; MPTransportLog("No serialize function found for event %d\n", event.type); return false; } @@ -158,7 +157,7 @@ bool MPUnixDomainTransport::send_to_transport(MPEventInfo &event) (serialize_func)(event.event.get(), transport_message.data, &transport_message.header.data_length); { - std::lock_guard guard(_send_mutex); + std::shared_lock guard(_connection_update_mutex); for (auto &&sc_handler : this->side_channels) { @@ -169,12 +168,12 @@ bool MPUnixDomainTransport::send_to_transport(MPEventInfo &event) if (!send_result) { MPTransportLog("Failed to send message to side channel\n"); - transport_stats.send_errors++; + dynamic_transport_stats.send_errors++; } else { - transport_stats.sent_events++; - transport_stats.sent_bytes += sizeof(MPTransportMessageHeader) + transport_message.header.data_length; + dynamic_transport_stats.sent_events++; + dynamic_transport_stats.sent_bytes += sizeof(MPTransportMessageHeader) + transport_message.header.data_length; } } } @@ -215,7 +214,7 @@ void MPUnixDomainTransport::process_messages_from_side_channels() } { - std::lock_guard guard(_read_mutex); + std::shared_lock guard(_connection_update_mutex); bool messages_left; do @@ -241,12 +240,10 @@ void MPUnixDomainTransport::notify_process_thread() void MPUnixDomainTransport::connector_update_handler(UnixDomainConnector *connector, bool is_reconecting, SideChannel *side_channel) { - std::lock_guard guard_send(_send_mutex); - std::lock_guard guard_read(_read_mutex); - - if(!this->is_running.load()) + if (this->is_running == false) return; - + std::lock_guard guard(_connection_update_mutex); + if (side_channel->connector_receive) { delete side_channel->connector_receive; @@ -258,7 +255,7 @@ void MPUnixDomainTransport::connector_update_handler(UnixDomainConnector *connec connector->set_message_received_handler(std::bind(&MPUnixDomainTransport::notify_process_thread, this)); side_channel->connector_receive = side_channel->connector_transmit = connector; connector->start_receive_thread(); - this->transport_stats.successful_connections++; + this->dynamic_transport_stats.successful_connections++; } else { @@ -274,11 +271,11 @@ void MPUnixDomainTransport::connector_update_handler(UnixDomainConnector *connec break; } } - this->transport_stats.closed_connections++; + this->dynamic_transport_stats.closed_connections++; } else { - this->transport_stats.connection_retries++; + this->dynamic_transport_stats.connection_retries++; } } } @@ -302,13 +299,13 @@ MPSerializeFunc MPUnixDomainTransport::get_event_serialization_function(unsigned auto helper_it = this->event_helpers.find(pub_id); if (helper_it == this->event_helpers.end()) { - MPTransportLog("%s: No available helper functions is registered for %d\n", pub_id); + MPTransportLog("No available helper functions is registered for %d\n", pub_id); return nullptr; } auto helper_functions = helper_it->second.get_function_set(event_id); if (!helper_functions) { - MPTransportLog("%s: No serialize function found for event %d\n", event_id); + MPTransportLog("No serialize function found for event %d\n", event_id); return nullptr; } return helper_functions->serializer; @@ -382,7 +379,6 @@ void MPUnixDomainTransport::init_side_channels() return; auto instance_id = mp_current_process_id = Snort::get_process_id();//Snort instance id - auto max_processes = config->max_processes; this->is_running = true; @@ -396,9 +392,9 @@ void MPUnixDomainTransport::init_side_channels() } } - for (unsigned short i = instance_id; i < max_processes; i++) + if (instance_id < config->max_processes) { - auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(i); + auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(instance_id); auto unix_listener = new UnixDomainConnectorListener(listen_path.c_str()); @@ -421,8 +417,8 @@ void MPUnixDomainTransport::init_side_channels() } unix_config->paths.push_back(listen_path); - unix_listener->start_accepting_connections( std::bind(&MPUnixDomainTransport::handle_new_connection, this, std::placeholders::_1, std::placeholders::_2, instance_id + i), unix_config); - + unix_listener->start_accepting_connections( std::bind(&MPUnixDomainTransport::handle_new_connection, this, std::placeholders::_1, std::placeholders::_2, instance_id+1), unix_config); + auto unix_listener_handle = new UnixAcceptorHandle(); unix_listener_handle->connector_config = unix_config; unix_listener_handle->listener = unix_listener; @@ -459,8 +455,7 @@ void MPUnixDomainTransport::init_side_channels() void MPUnixDomainTransport::cleanup_side_channels() { - std::lock_guard guard_send(_send_mutex); - std::lock_guard guard_read(_read_mutex); + std::lock_guard guard(_connection_update_mutex); for (uint32_t i = 0; i < this->side_channels.size(); i++) { @@ -505,26 +500,41 @@ bool MPUnixDomainTransport::is_logging_enabled() return this->is_logging_enabled_flag; } +void MPUnixDomainTransport::sum_stats() +{ + std::lock_guard _guard(_connection_update_mutex); + this->transport_stats = this->dynamic_transport_stats; +} + +void MPUnixDomainTransport::reset_stats() +{ + std::lock_guard _guard(_connection_update_mutex); + this->dynamic_transport_stats = MPUnixTransportStats(); + this->transport_stats = MPUnixTransportStats(); +} + MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(unsigned& size) { - std::lock_guard guard_send(_send_mutex); - std::lock_guard guard_read(_read_mutex); - if (this->side_channels.size() == 0) - { - size = 0; - return nullptr; - } - MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->side_channels.size()]; + std::shared_lock _guard(_connection_update_mutex); + MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->config->max_processes-1]; - size = this->side_channels.size(); - unsigned int it = 0; + size = this->config->max_processes-1; for (auto &&sc_handler : this->side_channels) { - result[it].id = sc_handler->channel_id; - result[it].status = sc_handler->side_channel->connector_receive ? MPTransportChannelStatus::CONNECTED : MPTransportChannelStatus::CONNECTING; - result[it].name = "Snort connection to " + std::to_string(sc_handler->channel_id) + " instance"; - it++; + unsigned short idx = sc_handler->channel_id > mp_current_process_id ? sc_handler->channel_id-2 : sc_handler->channel_id-1; + result[idx].id = sc_handler->channel_id; + result[idx].status = sc_handler->side_channel->connector_receive ? MPTransportChannelStatus::CONNECTED : MPTransportChannelStatus::CONNECTING; + result[idx].name = "Snort connection to instance " + std::to_string(sc_handler->channel_id); + } + + for(uint16_t it = 0; it < this->config->max_processes-1; it++) + { + if(result[it].id != 0) + continue; + result[it].id = it + 2 > mp_current_process_id ? it + 2 : it + 1; + result[it].status = result[it].id > mp_current_process_id ? MPTransportChannelStatus::ACCEPTING : MPTransportChannelStatus::DISCONNECTED; + result[it].name = "Snort connection to instance " + std::to_string(result[it].id); } return result; diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport.h b/src/mp_transport/mp_unix_transport/mp_unix_transport.h index 5d69dbe10..199fbf9fe 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport.h +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport.h @@ -44,6 +44,31 @@ struct MPUnixTransportStats closed_connections(0), connection_retries(0) { } + + MPUnixTransportStats(const MPUnixTransportStats& other) : + sent_events(other.sent_events), + sent_bytes(other.sent_bytes), + received_events(other.received_events), + received_bytes(other.received_bytes), + send_errors(other.send_errors), + successful_connections(other.successful_connections), + closed_connections(other.closed_connections), + connection_retries(other.connection_retries) + { } + + MPUnixTransportStats& operator=(const MPUnixTransportStats& other) + { + sent_events = other.sent_events; + sent_bytes = other.sent_bytes; + received_events = other.received_events; + received_bytes = other.received_bytes; + send_errors = other.send_errors; + successful_connections = other.successful_connections; + closed_connections = other.closed_connections; + connection_retries = other.connection_retries; + + return *this; + } PegCount sent_events; PegCount sent_bytes; @@ -127,6 +152,9 @@ class MPUnixDomainTransport : public MPTransport { return config; } + void sum_stats(); + void reset_stats(); + private: void init_side_channels(); @@ -141,7 +169,7 @@ class MPUnixDomainTransport : public MPTransport MPSerializeFunc get_event_serialization_function(unsigned pub_id, unsigned event_id); MPDeserializeFunc get_event_deserialization_function(unsigned pub_id, unsigned event_id); - uint32_t mp_current_process_id = 0; + uint16_t mp_current_process_id = 0; TransportReceiveEventHandler transport_receive_handler = nullptr; MPUnixDomainTransportConfig* config = nullptr; @@ -157,6 +185,7 @@ class MPUnixDomainTransport : public MPTransport std::thread* consume_thread = nullptr; std::condition_variable consume_thread_cv; + MPUnixTransportStats dynamic_transport_stats; MPUnixTransportStats& transport_stats; }; diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport_module.cc b/src/mp_transport/mp_unix_transport/mp_unix_transport_module.cc index f01109f20..454830dae 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport_module.cc +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport_module.cc @@ -65,7 +65,7 @@ bool MPUnixDomainTransportModule::begin(const char *, int, SnortConfig *sc) { assert(sc); assert(!config); - config = new MPUnixDomainTransportConfig; + config = new MPUnixDomainTransportConfig; config->max_processes = sc->max_procs; return true; } @@ -117,9 +117,24 @@ const PegInfo *MPUnixDomainTransportModule::get_pegs() const PegCount *MPUnixDomainTransportModule::get_counts() const { + if (transport_handle) + { + transport_handle->sum_stats(); + } + return (PegCount*)&unix_transport_stats; } +void MPUnixDomainTransportModule::reset_stats() +{ + unix_transport_stats = MPUnixTransportStats(); + if (transport_handle) + { + transport_handle->reset_stats(); + } + Module::reset_stats(); +} + static struct MPTransportApi mp_unixdomain_transport_api = { { diff --git a/src/mp_transport/mp_unix_transport/mp_unix_transport_module.h b/src/mp_transport/mp_unix_transport/mp_unix_transport_module.h index fae5bbc17..0c98ff7a3 100644 --- a/src/mp_transport/mp_unix_transport/mp_unix_transport_module.h +++ b/src/mp_transport/mp_unix_transport/mp_unix_transport_module.h @@ -44,12 +44,15 @@ class MPUnixDomainTransportModule : public Module const PegInfo* get_pegs() const override; PegCount* get_counts() const override; + bool global_stats() const override { return true; } + void reset_stats() override; Usage get_usage() const override { return GLOBAL; } MPUnixDomainTransportConfig* config; MPUnixTransportStats unix_transport_stats; + MPUnixDomainTransport* transport_handle = nullptr; }; static Module* mod_ctor() @@ -65,7 +68,8 @@ static void mod_dtor(Module* m) static MPTransport* mp_unixdomain_transport_ctor(Module* m) { auto unix_tr_mod = (MPUnixDomainTransportModule*)m; - return new MPUnixDomainTransport(unix_tr_mod->config, unix_tr_mod->unix_transport_stats); + unix_tr_mod->transport_handle = new MPUnixDomainTransport(unix_tr_mod->config, unix_tr_mod->unix_transport_stats); + return unix_tr_mod->transport_handle; } static void mp_unixdomain_transport_dtor(MPTransport* t) diff --git a/src/mp_transport/mp_unix_transport/test/unix_transport_module_test.cc b/src/mp_transport/mp_unix_transport/test/unix_transport_module_test.cc index 9f26f4d35..d069929eb 100644 --- a/src/mp_transport/mp_unix_transport/test/unix_transport_module_test.cc +++ b/src/mp_transport/mp_unix_transport/test/unix_transport_module_test.cc @@ -76,6 +76,14 @@ namespace snort size = 0; return nullptr; } + void MPUnixDomainTransport::reset_stats() + { + transport_stats = MPUnixTransportStats(); + } + void MPUnixDomainTransport::sum_stats() + { + + } char* snort_strdup(const char*) { -- 2.47.3