#include <sys/un.h>
#include <sys/stat.h>
#include <unistd.h>
+#include <shared_mutex>
#include "framework/mp_data_bus.h"
#include "log/messages.h"
#include "main/snort_config.h"
static std::mutex _receive_mutex;
-static std::mutex _send_mutex;
-static std::mutex _read_mutex;
+static std::shared_mutex _connection_update_mutex;
#define UNIX_SOCKET_NAME_PREFIX "/snort_unix_connector_"
#define MP_TRANSPORT_LOG_LABEL "MPUnixTransportDbg"
MPEventInfo event(std::shared_ptr<DataEvent> (internal_event), transport_message_header->event_id, transport_message_header->pub_id);
(transport_receive_handler)(event);
- transport_stats.received_events++;
- transport_stats.received_bytes += sizeof(MPTransportMessageHeader) + transport_message_header->data_length;
+ dynamic_transport_stats.received_events++;
+ dynamic_transport_stats.received_bytes += sizeof(MPTransportMessageHeader) + transport_message_header->data_length;
}
delete msg;
}
assert(connector);
assert(cfg);
- std::lock_guard<std::mutex> guard_send(_send_mutex);
- std::lock_guard<std::mutex> guard_read(_read_mutex);
+ std::lock_guard<std::shared_mutex> guard(_connection_update_mutex);
if(!this->is_running.load())
return;
- transport_stats.successful_connections++;
+ dynamic_transport_stats.successful_connections++;
auto side_channel = new SideChannel(ScMsgFormat::BINARY);
side_channel->connector_receive = connector;
side_channel->connector_transmit = side_channel->connector_receive;
side_channel->register_receive_handler(std::bind(&MPUnixDomainTransport::side_channel_receive_handler, this, std::placeholders::_1));
connector->set_message_received_handler(std::bind(&MPUnixDomainTransport::notify_process_thread, this));
- this->side_channels.push_back(new SideChannelHandle(side_channel, cfg, channel_id));
+ this->side_channels.push_back(new SideChannelHandle(side_channel, cfg, channel_id+this->side_channels.size()-mp_current_process_id+1));
connector->set_update_handler(std::bind(&MPUnixDomainTransport::connector_update_handler, this, std::placeholders::_1, std::placeholders::_2, side_channel));
connector->start_receive_thread();
}
if (!serialize_func)
{
- transport_stats.send_errors++;
+ dynamic_transport_stats.send_errors++;
MPTransportLog("No serialize function found for event %d\n", event.type);
return false;
}
(serialize_func)(event.event.get(), transport_message.data, &transport_message.header.data_length);
{
- std::lock_guard<std::mutex> guard(_send_mutex);
+ std::shared_lock<std::shared_mutex> guard(_connection_update_mutex);
for (auto &&sc_handler : this->side_channels)
{
if (!send_result)
{
MPTransportLog("Failed to send message to side channel\n");
- transport_stats.send_errors++;
+ dynamic_transport_stats.send_errors++;
}
else
{
- transport_stats.sent_events++;
- transport_stats.sent_bytes += sizeof(MPTransportMessageHeader) + transport_message.header.data_length;
+ dynamic_transport_stats.sent_events++;
+ dynamic_transport_stats.sent_bytes += sizeof(MPTransportMessageHeader) + transport_message.header.data_length;
}
}
}
}
{
- std::lock_guard<std::mutex> guard(_read_mutex);
+ std::shared_lock<std::shared_mutex> guard(_connection_update_mutex);
bool messages_left;
do
void MPUnixDomainTransport::connector_update_handler(UnixDomainConnector *connector, bool is_reconecting, SideChannel *side_channel)
{
- std::lock_guard<std::mutex> guard_send(_send_mutex);
- std::lock_guard<std::mutex> guard_read(_read_mutex);
-
- if(!this->is_running.load())
+ if (this->is_running == false)
return;
-
+ std::lock_guard<std::shared_mutex> guard(_connection_update_mutex);
+
if (side_channel->connector_receive)
{
delete side_channel->connector_receive;
connector->set_message_received_handler(std::bind(&MPUnixDomainTransport::notify_process_thread, this));
side_channel->connector_receive = side_channel->connector_transmit = connector;
connector->start_receive_thread();
- this->transport_stats.successful_connections++;
+ this->dynamic_transport_stats.successful_connections++;
}
else
{
break;
}
}
- this->transport_stats.closed_connections++;
+ this->dynamic_transport_stats.closed_connections++;
}
else
{
- this->transport_stats.connection_retries++;
+ this->dynamic_transport_stats.connection_retries++;
}
}
}
auto helper_it = this->event_helpers.find(pub_id);
if (helper_it == this->event_helpers.end())
{
- MPTransportLog("%s: No available helper functions is registered for %d\n", pub_id);
+ MPTransportLog("No available helper functions is registered for %d\n", pub_id);
return nullptr;
}
auto helper_functions = helper_it->second.get_function_set(event_id);
if (!helper_functions)
{
- MPTransportLog("%s: No serialize function found for event %d\n", event_id);
+ MPTransportLog("No serialize function found for event %d\n", event_id);
return nullptr;
}
return helper_functions->serializer;
return;
auto instance_id = mp_current_process_id = Snort::get_process_id();//Snort instance id
- auto max_processes = config->max_processes;
this->is_running = true;
}
}
- for (unsigned short i = instance_id; i < max_processes; i++)
+ if (instance_id < config->max_processes)
{
- auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(i);
+ auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(instance_id);
auto unix_listener = new UnixDomainConnectorListener(listen_path.c_str());
}
unix_config->paths.push_back(listen_path);
- unix_listener->start_accepting_connections( std::bind(&MPUnixDomainTransport::handle_new_connection, this, std::placeholders::_1, std::placeholders::_2, instance_id + i), unix_config);
-
+ unix_listener->start_accepting_connections( std::bind(&MPUnixDomainTransport::handle_new_connection, this, std::placeholders::_1, std::placeholders::_2, instance_id+1), unix_config);
+
auto unix_listener_handle = new UnixAcceptorHandle();
unix_listener_handle->connector_config = unix_config;
unix_listener_handle->listener = unix_listener;
void MPUnixDomainTransport::cleanup_side_channels()
{
- std::lock_guard<std::mutex> guard_send(_send_mutex);
- std::lock_guard<std::mutex> guard_read(_read_mutex);
+ std::lock_guard<std::shared_mutex> guard(_connection_update_mutex);
for (uint32_t i = 0; i < this->side_channels.size(); i++)
{
return this->is_logging_enabled_flag;
}
+void MPUnixDomainTransport::sum_stats()
+{
+ std::lock_guard<std::shared_mutex> _guard(_connection_update_mutex);
+ this->transport_stats = this->dynamic_transport_stats;
+}
+
+void MPUnixDomainTransport::reset_stats()
+{
+ std::lock_guard<std::shared_mutex> _guard(_connection_update_mutex);
+ this->dynamic_transport_stats = MPUnixTransportStats();
+ this->transport_stats = MPUnixTransportStats();
+}
+
MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(unsigned& size)
{
- std::lock_guard<std::mutex> guard_send(_send_mutex);
- std::lock_guard<std::mutex> guard_read(_read_mutex);
- if (this->side_channels.size() == 0)
- {
- size = 0;
- return nullptr;
- }
- MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->side_channels.size()];
+ std::shared_lock<std::shared_mutex> _guard(_connection_update_mutex);
+ MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->config->max_processes-1];
- size = this->side_channels.size();
- unsigned int it = 0;
+ size = this->config->max_processes-1;
for (auto &&sc_handler : this->side_channels)
{
- result[it].id = sc_handler->channel_id;
- result[it].status = sc_handler->side_channel->connector_receive ? MPTransportChannelStatus::CONNECTED : MPTransportChannelStatus::CONNECTING;
- result[it].name = "Snort connection to " + std::to_string(sc_handler->channel_id) + " instance";
- it++;
+ unsigned short idx = sc_handler->channel_id > mp_current_process_id ? sc_handler->channel_id-2 : sc_handler->channel_id-1;
+ result[idx].id = sc_handler->channel_id;
+ result[idx].status = sc_handler->side_channel->connector_receive ? MPTransportChannelStatus::CONNECTED : MPTransportChannelStatus::CONNECTING;
+ result[idx].name = "Snort connection to instance " + std::to_string(sc_handler->channel_id);
+ }
+
+ for(uint16_t it = 0; it < this->config->max_processes-1; it++)
+ {
+ if(result[it].id != 0)
+ continue;
+ result[it].id = it + 2 > mp_current_process_id ? it + 2 : it + 1;
+ result[it].status = result[it].id > mp_current_process_id ? MPTransportChannelStatus::ACCEPTING : MPTransportChannelStatus::DISCONNECTED;
+ result[it].name = "Snort connection to instance " + std::to_string(result[it].id);
}
return result;
closed_connections(0),
connection_retries(0)
{ }
+
+ MPUnixTransportStats(const MPUnixTransportStats& other) :
+ sent_events(other.sent_events),
+ sent_bytes(other.sent_bytes),
+ received_events(other.received_events),
+ received_bytes(other.received_bytes),
+ send_errors(other.send_errors),
+ successful_connections(other.successful_connections),
+ closed_connections(other.closed_connections),
+ connection_retries(other.connection_retries)
+ { }
+
+ MPUnixTransportStats& operator=(const MPUnixTransportStats& other)
+ {
+ sent_events = other.sent_events;
+ sent_bytes = other.sent_bytes;
+ received_events = other.received_events;
+ received_bytes = other.received_bytes;
+ send_errors = other.send_errors;
+ successful_connections = other.successful_connections;
+ closed_connections = other.closed_connections;
+ connection_retries = other.connection_retries;
+
+ return *this;
+ }
PegCount sent_events;
PegCount sent_bytes;
{ return config; }
+ void sum_stats();
+ void reset_stats();
+
private:
void init_side_channels();
MPSerializeFunc get_event_serialization_function(unsigned pub_id, unsigned event_id);
MPDeserializeFunc get_event_deserialization_function(unsigned pub_id, unsigned event_id);
- uint32_t mp_current_process_id = 0;
+ uint16_t mp_current_process_id = 0;
TransportReceiveEventHandler transport_receive_handler = nullptr;
MPUnixDomainTransportConfig* config = nullptr;
std::thread* consume_thread = nullptr;
std::condition_variable consume_thread_cv;
+ MPUnixTransportStats dynamic_transport_stats;
MPUnixTransportStats& transport_stats;
};