return nullptr;
}
+bool MPDataBus::is_ready()
+{
+ const SnortConfig* sc = SnortConfig::get_conf();
+ return sc && sc->mp_dbus;
+}
+
void MPDataBus::subscribe(const PubKey& key, unsigned eid, DataHandler* h)
{
- if(! SnortConfig::get_conf()->mp_dbus)
+ const SnortConfig* sc = SnortConfig::get_conf();
+ if (!sc or !sc->mp_dbus)
{
ErrorMessage("MPDataBus: MPDataBus not initialized\n");
return;
}
- SnortConfig::get_conf()->mp_dbus->_subscribe(key, eid, h);
+ sc->mp_dbus->_subscribe(key, eid, h);
MPDataBusLog("Subscribed to event ID %u\n", eid);
}
+void MPDataBus::unsubscribe(const PubKey& key, unsigned eid, DataHandler* h)
+{
+ const SnortConfig* sc = SnortConfig::get_conf();
+ if (!sc or !sc->mp_dbus)
+ {
+ ErrorMessage("MPDataBus: MPDataBus not initialized\n");
+ return;
+ }
+
+ sc->mp_dbus->_unsubscribe(key, eid, h);
+ MPDataBusLog("Unsubscribed from event ID %u\n", eid);
+}
+
bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptr<DataEvent> e, Flow*)
{
std::shared_ptr<MPEventInfo> event_info =
std::make_shared<MPEventInfo>(std::move(e), MPEventType(evt_id), pub_id);
const SnortConfig *sc = SnortConfig::get_conf();
-
- if (sc->mp_dbus == nullptr)
+ if (!sc or !sc->mp_dbus)
{
ErrorMessage("MPDataBus: MPDataBus not initialized\n");
return false;
void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc& mp_serializer_helper, MPDeserializeFunc& mp_deserializer_helper)
{
- if (!SnortConfig::get_conf()->mp_dbus or !SnortConfig::get_conf()->mp_dbus->transport_layer)
+ const SnortConfig* sc = SnortConfig::get_conf();
+ if (!sc or !sc->mp_dbus or !sc->mp_dbus->transport_layer)
{
ErrorMessage("MPDataBus: MPDataBus or transport layer not initialized\n");
return;
MPHelperFunctions helpers(mp_serializer_helper, mp_deserializer_helper);
- SnortConfig::get_conf()->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers);
+ sc->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers);
MPDataBusLog("Registered event helpers for event ID %u\n", evt_id);
}
_subscribe(pid, eid, h);
}
+void MPDataBus::_unsubscribe(unsigned pid, unsigned eid, DataHandler* h)
+{
+ std::pair<unsigned, unsigned> key = {pid, eid};
+
+ auto it = mp_pub_sub.find(key);
+ if (it == mp_pub_sub.end())
+ {
+ MPDataBusLog("No subscribers found for publisher ID %u and event ID %u\n", pid, eid);
+ return;
+ }
+
+ SubList& subs = it->second;
+ auto handler_it = std::find(subs.begin(), subs.end(), h);
+ if (handler_it != subs.end())
+ {
+ subs.erase(handler_it);
+
+ delete h;
+ MPDataBusLog("Handler unsubscribed and deleted for publisher ID %u and event ID %u\n", pid, eid);
+
+ if (subs.empty())
+ {
+ mp_pub_sub.erase(it);
+ }
+ }
+}
+
+void MPDataBus::_unsubscribe(const PubKey& key, unsigned eid, DataHandler* h)
+{
+ unsigned pid = get_id(key);
+ _unsubscribe(pid, eid, h);
+}
bool MPDataBus::_publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f)
{
static bool valid(unsigned pub_id)
{ return pub_id != 0; }
+ static bool is_ready();
+
static void subscribe(const PubKey& key, unsigned id, DataHandler* handler);
+ // Optional API for unsubscribing from DataEvents in a multiprocess environment
+ // In general, this should not be needed as subscriptions are typically maintained
+ // for the lifetime of the process, but can be helpful in certain cases
+ // Note: This will delete the handler when unsubscribing
+ static void unsubscribe(const PubKey& key, unsigned id, DataHandler* handler);
+
// API for publishing the DataEvent to the peer Snort processes
// The user needs to pass a shared_ptr to the DataEvent object as the third argument
// This is to ensure that the DataEvent object is not deleted before it is published
private:
void _subscribe(unsigned pid, unsigned eid, DataHandler* h);
void _subscribe(const PubKey& key, unsigned eid, DataHandler* h);
+ void _unsubscribe(unsigned pid, unsigned eid, DataHandler* h);
+ void _unsubscribe(const PubKey& key, unsigned eid, DataHandler* h);
bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f);
bool _enqueue_event(std::shared_ptr<MPEventInfo> ev_info);
CHECK_EQUAL(1, h2->evt_msg);
}
+TEST(mp_data_bus, unsubscribe_single_handler)
+{
+ UTestHandler1* h1 = new UTestHandler1();
+ MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1);
+
+ std::shared_ptr<UTestEvent> event1 = std::make_shared<UTestEvent>(100);
+ MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1);
+ SnortConfig::get_conf()->mp_dbus->receive_message(event_info1);
+ CHECK_EQUAL(100, h1->evt_msg);
+
+ MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1);
+}
+
+TEST(mp_data_bus, unsubscribe_nonexistent_handler)
+{
+ UTestHandler1* h1 = new UTestHandler1();
+ UTestHandler2* h2 = new UTestHandler2();
+
+ MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1);
+ MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h2);
+
+ std::shared_ptr<UTestEvent> event1 = std::make_shared<UTestEvent>(100);
+ MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1);
+ SnortConfig::get_conf()->mp_dbus->receive_message(event_info1);
+ CHECK_EQUAL(100, h1->evt_msg);
+ CHECK_EQUAL(1, h2->evt_msg);
+
+ MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1);
+
+ delete h2;
+}
+
TEST_GROUP(mp_data_bus_clone)
{
unsigned pub_id1 = 0, pub_id2 = 0; // cppcheck-suppress variableScope