int listen (int, int) { return s_listen_return; }
#endif
static bool use_test_accept_counter = false;
-static uint test_accept_counter = 0;
+static uint32_t test_accept_counter = 0;
int accept (int, struct sockaddr*, socklen_t*)
{
if ( use_test_accept_counter )
return;
}
- ushort error_count = 0;
+ unsigned short error_count = 0;
while (should_accept) {
if(error_count > 10)
void MPDataBusLog(const char* msg, ...);
-std::condition_variable MPDataBus::queue_cv;
-std::mutex MPDataBus::queue_mutex;
uint32_t MPDataBus::mp_max_eventq_size = DEFAULT_MAX_EVENTQ_SIZE;
std::string MPDataBus::transport = DEFAULT_TRANSPORT;
bool MPDataBus::enable_debug = false;
static std::unordered_map<std::string, unsigned> mp_pub_ids;
static std::mutex mp_stats_mutex;
-static uint mp_current_process_id = 0;
+static uint32_t mp_current_process_id = 0;
void MPDataBusLog(const char* msg, ...)
{
// public methods
//-------------------------------------------------------------------------
-MPDataBus::MPDataBus() : run_thread(true)
+MPDataBus::MPDataBus() :
+ run_thread(true),
+ worker_thread(nullptr),
+ mp_event_queue(nullptr)
{
mp_event_queue = new Ring<std::shared_ptr<MPEventInfo>>(mp_max_eventq_size);
start_worker_thread();
return false;
}
- if (sc->mp_dbus->mp_event_queue != nullptr and !sc->mp_dbus->mp_event_queue->full() and !sc->mp_dbus->mp_event_queue->put(event_info)) {
+ if (!sc->mp_dbus->_enqueue_event(std::move(event_info)))
+ {
ErrorMessage("MPDataBus: Failed to enqueue event for publisher ID %u and event ID %u\n", pub_id, evt_id);
return false;
}
- queue_cv.notify_one();
-
MPDataBusLog("Event published for publisher ID %u and event ID %u\n", pub_id, evt_id);
return true;
std::unique_lock<std::mutex> u_lock(queue_mutex);
- queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP), [this]() {
- return mp_event_queue != nullptr && !mp_event_queue->empty();
- });
+ if( (std::cv_status::timeout == queue_cv.wait_for(u_lock, std::chrono::milliseconds(WORKER_THREAD_SLEEP))) and
+ mp_event_queue->empty() )
+ return;
while (!mp_event_queue->empty()) {
std::shared_ptr<MPEventInfo> event_info = mp_event_queue->get(nullptr);
auto transport_pegs = transport_module->get_pegs();
if(transport_pegs)
{
- uint size = 0;
+ uint32_t size = 0;
while(transport_pegs[size].type != CountType::END)
{
size++;
void MPDataBus::dump_events(ControlConn *ctrlconn, const char *module_name)
{
int current_read_idx = 0;
- uint ring_items = mp_event_queue->count();
+ uint32_t ring_items = mp_event_queue->count();
if(ring_items == 0)
{
if (ctrlconn)
current_read_idx--;
}
- for (uint i = current_read_idx; i <= ring_items; i++)
+ for (uint32_t i = current_read_idx; i <= ring_items; i++)
{
if(i >= mp_max_eventq_size)
{
return;
}
- uint size = 0;
+ unsigned int size = 0;
auto transport_status = transport_layer->get_channel_status(size);
if (size == 0)
{
return;
}
std::string response;
- for (uint i = 0; i < size; i++)
+ for (unsigned int i = 0; i < size; i++)
{
const auto& channel = transport_status[i];
response += "Channel ID: " + std::to_string(channel.id) + ", Name: " + channel.name + ", Status: " + channel.get_status_string() + "\n";
return true;
}
+bool snort::MPDataBus::_enqueue_event(std::shared_ptr<MPEventInfo> ev_info)
+{
+ bool res = mp_event_queue != nullptr and !mp_event_queue->full() and mp_event_queue->put(std::move(ev_info));
+ if(res) queue_cv.notify_one();
+ return res;
+}
void _subscribe(const PubKey& key, unsigned eid, DataHandler* h);
bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f);
+ bool _enqueue_event(std::shared_ptr<MPEventInfo> ev_info);
private:
typedef std::vector<DataHandler*> SubList;
Ring<std::shared_ptr<MPEventInfo>>* mp_event_queue;
- static std::condition_variable queue_cv;
- static std::mutex queue_mutex;
+ std::condition_variable queue_cv;
+ std::mutex queue_mutex;
std::unordered_map<unsigned, MPDataBusStats> mp_pub_stats;
virtual void enable_logging() = 0;
virtual void disable_logging() = 0;
virtual bool is_logging_enabled() = 0;
- virtual MPTransportChannelStatusHandle* get_channel_status(uint& size) = 0;
+ virtual MPTransportChannelStatusHandle* get_channel_status(unsigned& size) = 0;
};
return true;
}
- MPTransportChannelStatusHandle* get_channel_status(uint& size) override
+ MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override
{
size = 0;
return nullptr;
{ }
virtual bool is_logging_enabled() override
{ return false; }
- MPTransportChannelStatusHandle* get_channel_status(uint& size) override
+ MPTransportChannelStatusHandle* get_channel_status(unsigned int& size) override
{
size = 0;
return nullptr;
#include <cstring>
#include <fcntl.h>
-#include <filesystem>
#include <iostream>
#include <poll.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/stat.h>
#include <unistd.h>
#include "framework/mp_data_bus.h"
delete msg;
}
-void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id)
+void MPUnixDomainTransport::handle_new_connection(UnixDomainConnector *connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id)
{
assert(connector);
assert(cfg);
this->is_running = true;
- if ( std::filesystem::is_directory(config->unix_domain_socket_path) == false )
+ struct stat st;
+ if (::stat(config->unix_domain_socket_path.c_str(), &st) != 0 || !S_ISDIR(st.st_mode))
{
- std::error_code ec;
- std::filesystem::create_directories(config->unix_domain_socket_path, ec);
- if (ec)
+ if (mkdir(config->unix_domain_socket_path.c_str(), 0755) != 0)
{
MPTransportLog("Failed to create directory %s\n", config->unix_domain_socket_path.c_str());
return;
}
}
- for (ushort i = instance_id; i < max_processes; i++)
+ for (unsigned short i = instance_id; i < max_processes; i++)
{
auto listen_path = config->unix_domain_socket_path + UNIX_SOCKET_NAME_PREFIX + std::to_string(i);
this->accept_handlers.push_back(unix_listener_handle);
}
- for (ushort i = 1; i < instance_id; i++)
+ for (unsigned short i = 1; i < instance_id; i++)
{
auto side_channel = new SideChannel(ScMsgFormat::BINARY);
side_channel->register_receive_handler([this](SCMessage* msg) { this->side_channel_receive_handler(msg); });
std::lock_guard<std::mutex> guard_send(_send_mutex);
std::lock_guard<std::mutex> guard_read(_read_mutex);
- for (uint i = 0; i < this->side_channels.size(); i++)
+ for (uint32_t i = 0; i < this->side_channels.size(); i++)
{
delete this->side_channels[i];
}
return this->is_logging_enabled_flag;
}
-MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(uint &size)
+MPTransportChannelStatusHandle *MPUnixDomainTransport::get_channel_status(unsigned& size)
{
std::lock_guard<std::mutex> guard_send(_send_mutex);
std::lock_guard<std::mutex> guard_read(_read_mutex);
MPTransportChannelStatusHandle* result = new MPTransportChannelStatusHandle[this->side_channels.size()];
size = this->side_channels.size();
- uint it = 0;
+ unsigned int it = 0;
for (auto &&sc_handler : this->side_channels)
{
struct SideChannelHandle
{
- SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const ushort& channel_id) :
+ SideChannelHandle(SideChannel* sc, UnixDomainConnectorConfig* cc, const unsigned short& channel_id) :
side_channel(sc), connector_config(cc), channel_id(channel_id)
{ }
SideChannel* side_channel;
UnixDomainConnectorConfig* connector_config;
- ushort channel_id;
+ unsigned short channel_id;
};
struct UnixAcceptorHandle
void disable_logging() override;
bool is_logging_enabled() override;
void cleanup();
- MPTransportChannelStatusHandle* get_channel_status(uint& size) override;
+ MPTransportChannelStatusHandle* get_channel_status(unsigned& size) override;
MPUnixDomainTransportConfig* get_config()
{ return config; }
void init_side_channels();
void cleanup_side_channels();
void side_channel_receive_handler(SCMessage* msg);
- void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const ushort& channel_id);
+ void handle_new_connection(UnixDomainConnector* connector, UnixDomainConnectorConfig* cfg, const unsigned short& channel_id);
void process_messages_from_side_channels();
void notify_process_thread();
void connector_update_handler(UnixDomainConnector* connector, bool is_recconecting, SideChannel* side_channel);
MPSerializeFunc get_event_serialization_function(unsigned pub_id, unsigned event_id);
MPDeserializeFunc get_event_deserialization_function(unsigned pub_id, unsigned event_id);
- uint mp_current_process_id = 0;
+ uint32_t mp_current_process_id = 0;
TransportReceiveEventHandler transport_receive_handler = nullptr;
MPUnixDomainTransportConfig* config = nullptr;