From: Umang Sharma (umasharm) Date: Fri, 7 Nov 2025 19:34:52 +0000 (+0000) Subject: Pull request #4962: mp_data_bus: unsubscribe X-Git-Tag: 3.10.0.0~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=dbb067711a9c2f1dda73174a8b8b09089212b1b3;p=thirdparty%2Fsnort3.git Pull request #4962: mp_data_bus: unsubscribe Merge in SNORT/snort3 from ~UMASHARM/snort3:mpdbus_unsubscribe to master Squashed commit of the following: commit 5e4ec40feb8fa04b35ef621bc1449f730b8ed5ce Author: Umang Sharma Date: Mon Nov 3 08:11:49 2025 -0500 mp_data_bus: unsubscribe API --- diff --git a/src/framework/mp_data_bus.cc b/src/framework/mp_data_bus.cc index dcae9b53a..d88ae10ed 100644 --- a/src/framework/mp_data_bus.cc +++ b/src/framework/mp_data_bus.cc @@ -177,26 +177,45 @@ const char* MPDataBus::get_name_from_id(unsigned id) return nullptr; } +bool MPDataBus::is_ready() +{ + const SnortConfig* sc = SnortConfig::get_conf(); + return sc && sc->mp_dbus; +} + void MPDataBus::subscribe(const PubKey& key, unsigned eid, DataHandler* h) { - if(! SnortConfig::get_conf()->mp_dbus) + const SnortConfig* sc = SnortConfig::get_conf(); + if (!sc or !sc->mp_dbus) { ErrorMessage("MPDataBus: MPDataBus not initialized\n"); return; } - SnortConfig::get_conf()->mp_dbus->_subscribe(key, eid, h); + sc->mp_dbus->_subscribe(key, eid, h); MPDataBusLog("Subscribed to event ID %u\n", eid); } +void MPDataBus::unsubscribe(const PubKey& key, unsigned eid, DataHandler* h) +{ + const SnortConfig* sc = SnortConfig::get_conf(); + if (!sc or !sc->mp_dbus) + { + ErrorMessage("MPDataBus: MPDataBus not initialized\n"); + return; + } + + sc->mp_dbus->_unsubscribe(key, eid, h); + MPDataBusLog("Unsubscribed from event ID %u\n", eid); +} + bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptr e, Flow*) { std::shared_ptr event_info = std::make_shared(std::move(e), MPEventType(evt_id), pub_id); const SnortConfig *sc = SnortConfig::get_conf(); - - if (sc->mp_dbus == nullptr) + if (!sc or !sc->mp_dbus) { ErrorMessage("MPDataBus: MPDataBus not initialized\n"); return false; @@ -215,7 +234,8 @@ bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptrmp_dbus or !SnortConfig::get_conf()->mp_dbus->transport_layer) + const SnortConfig* sc = SnortConfig::get_conf(); + if (!sc or !sc->mp_dbus or !sc->mp_dbus->transport_layer) { ErrorMessage("MPDataBus: MPDataBus or transport layer not initialized\n"); return; @@ -225,7 +245,7 @@ void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSer MPHelperFunctions helpers(mp_serializer_helper, mp_deserializer_helper); - SnortConfig::get_conf()->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers); + sc->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers); MPDataBusLog("Registered event helpers for event ID %u\n", evt_id); } @@ -550,6 +570,38 @@ void MPDataBus::_subscribe(const PubKey& key, unsigned eid, DataHandler* h) _subscribe(pid, eid, h); } +void MPDataBus::_unsubscribe(unsigned pid, unsigned eid, DataHandler* h) +{ + std::pair key = {pid, eid}; + + auto it = mp_pub_sub.find(key); + if (it == mp_pub_sub.end()) + { + MPDataBusLog("No subscribers found for publisher ID %u and event ID %u\n", pid, eid); + return; + } + + SubList& subs = it->second; + auto handler_it = std::find(subs.begin(), subs.end(), h); + if (handler_it != subs.end()) + { + subs.erase(handler_it); + + delete h; + MPDataBusLog("Handler unsubscribed and deleted for publisher ID %u and event ID %u\n", pid, eid); + + if (subs.empty()) + { + mp_pub_sub.erase(it); + } + } +} + +void MPDataBus::_unsubscribe(const PubKey& key, unsigned eid, DataHandler* h) +{ + unsigned pid = get_id(key); + _unsubscribe(pid, eid, h); +} bool MPDataBus::_publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f) { diff --git a/src/framework/mp_data_bus.h b/src/framework/mp_data_bus.h index 50d3a3b90..233f3126c 100644 --- a/src/framework/mp_data_bus.h +++ b/src/framework/mp_data_bus.h @@ -151,8 +151,16 @@ public: static bool valid(unsigned pub_id) { return pub_id != 0; } + static bool is_ready(); + static void subscribe(const PubKey& key, unsigned id, DataHandler* handler); + // Optional API for unsubscribing from DataEvents in a multiprocess environment + // In general, this should not be needed as subscriptions are typically maintained + // for the lifetime of the process, but can be helpful in certain cases + // Note: This will delete the handler when unsubscribing + static void unsubscribe(const PubKey& key, unsigned id, DataHandler* handler); + // API for publishing the DataEvent to the peer Snort processes // The user needs to pass a shared_ptr to the DataEvent object as the third argument // This is to ensure that the DataEvent object is not deleted before it is published @@ -184,6 +192,8 @@ public: private: void _subscribe(unsigned pid, unsigned eid, DataHandler* h); void _subscribe(const PubKey& key, unsigned eid, DataHandler* h); + void _unsubscribe(unsigned pid, unsigned eid, DataHandler* h); + void _unsubscribe(const PubKey& key, unsigned eid, DataHandler* h); bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f); bool _enqueue_event(std::shared_ptr ev_info); diff --git a/src/framework/test/mp_data_bus_test.cc b/src/framework/test/mp_data_bus_test.cc index cb5fd40e6..e981a579a 100644 --- a/src/framework/test/mp_data_bus_test.cc +++ b/src/framework/test/mp_data_bus_test.cc @@ -452,6 +452,38 @@ TEST(mp_data_bus, two_subscribers_same_event_and_receive) CHECK_EQUAL(1, h2->evt_msg); } +TEST(mp_data_bus, unsubscribe_single_handler) +{ + UTestHandler1* h1 = new UTestHandler1(); + MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1); + + std::shared_ptr event1 = std::make_shared(100); + MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1); + SnortConfig::get_conf()->mp_dbus->receive_message(event_info1); + CHECK_EQUAL(100, h1->evt_msg); + + MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1); +} + +TEST(mp_data_bus, unsubscribe_nonexistent_handler) +{ + UTestHandler1* h1 = new UTestHandler1(); + UTestHandler2* h2 = new UTestHandler2(); + + MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1); + MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h2); + + std::shared_ptr event1 = std::make_shared(100); + MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1); + SnortConfig::get_conf()->mp_dbus->receive_message(event_info1); + CHECK_EQUAL(100, h1->evt_msg); + CHECK_EQUAL(1, h2->evt_msg); + + MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1); + + delete h2; +} + TEST_GROUP(mp_data_bus_clone) { unsigned pub_id1 = 0, pub_id2 = 0; // cppcheck-suppress variableScope