]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4962: mp_data_bus: unsubscribe
authorUmang Sharma (umasharm) <umasharm@cisco.com>
Fri, 7 Nov 2025 19:34:52 +0000 (19:34 +0000)
committerChris Sherwin (chsherwi) <chsherwi@cisco.com>
Fri, 7 Nov 2025 19:34:52 +0000 (19:34 +0000)
Merge in SNORT/snort3 from ~UMASHARM/snort3:mpdbus_unsubscribe to master

Squashed commit of the following:

commit 5e4ec40feb8fa04b35ef621bc1449f730b8ed5ce
Author: Umang Sharma <umasharm@cisco.com>
Date:   Mon Nov 3 08:11:49 2025 -0500

    mp_data_bus: unsubscribe API

src/framework/mp_data_bus.cc
src/framework/mp_data_bus.h
src/framework/test/mp_data_bus_test.cc

index dcae9b53aa90bb32959bfe16b192b90590a05503..d88ae10ed2f585ab98794f06f166ca268ca02436 100644 (file)
@@ -177,26 +177,45 @@ const char* MPDataBus::get_name_from_id(unsigned id)
     return nullptr;
 }
 
+bool MPDataBus::is_ready()
+{
+    const SnortConfig* sc = SnortConfig::get_conf();
+    return sc && sc->mp_dbus;
+}
+
 void MPDataBus::subscribe(const PubKey& key, unsigned eid, DataHandler* h)
 {
-    if(! SnortConfig::get_conf()->mp_dbus)
+    const SnortConfig* sc = SnortConfig::get_conf();
+    if (!sc or !sc->mp_dbus)
     {
         ErrorMessage("MPDataBus: MPDataBus not initialized\n");
         return;
     }
 
-    SnortConfig::get_conf()->mp_dbus->_subscribe(key, eid, h);
+    sc->mp_dbus->_subscribe(key, eid, h);
     MPDataBusLog("Subscribed to event ID %u\n", eid);
 }
 
+void MPDataBus::unsubscribe(const PubKey& key, unsigned eid, DataHandler* h)
+{
+    const SnortConfig* sc = SnortConfig::get_conf();
+    if (!sc or !sc->mp_dbus)
+    {
+        ErrorMessage("MPDataBus: MPDataBus not initialized\n");
+        return;
+    }
+
+    sc->mp_dbus->_unsubscribe(key, eid, h);
+    MPDataBusLog("Unsubscribed from event ID %u\n", eid);
+}
+
 bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptr<DataEvent> e, Flow*)
 {
     std::shared_ptr<MPEventInfo> event_info = 
                 std::make_shared<MPEventInfo>(std::move(e), MPEventType(evt_id), pub_id);
 
     const SnortConfig *sc = SnortConfig::get_conf();
-
-    if (sc->mp_dbus == nullptr)
+    if (!sc or !sc->mp_dbus)
     {
         ErrorMessage("MPDataBus: MPDataBus not initialized\n");
         return false;
@@ -215,7 +234,8 @@ bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, std::shared_ptr<DataEv
 
 void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc& mp_serializer_helper, MPDeserializeFunc& mp_deserializer_helper)
 {
-    if (!SnortConfig::get_conf()->mp_dbus or !SnortConfig::get_conf()->mp_dbus->transport_layer)
+    const SnortConfig* sc = SnortConfig::get_conf();
+    if (!sc or !sc->mp_dbus or !sc->mp_dbus->transport_layer)
     {
         ErrorMessage("MPDataBus: MPDataBus or transport layer not initialized\n");
         return;
@@ -225,7 +245,7 @@ void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSer
 
     MPHelperFunctions helpers(mp_serializer_helper, mp_deserializer_helper);
     
-    SnortConfig::get_conf()->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers);
+    sc->mp_dbus->transport_layer->register_event_helpers(pub_id, evt_id, helpers);
     MPDataBusLog("Registered event helpers for event ID %u\n", evt_id);
 }
 
@@ -550,6 +570,38 @@ void MPDataBus::_subscribe(const PubKey& key, unsigned eid, DataHandler* h)
     _subscribe(pid, eid, h);
 }
 
+void MPDataBus::_unsubscribe(unsigned pid, unsigned eid, DataHandler* h)
+{
+    std::pair<unsigned, unsigned> key = {pid, eid};
+
+    auto it = mp_pub_sub.find(key);
+    if (it == mp_pub_sub.end())
+    {
+        MPDataBusLog("No subscribers found for publisher ID %u and event ID %u\n", pid, eid);
+        return;
+    }
+
+    SubList& subs = it->second;
+    auto handler_it = std::find(subs.begin(), subs.end(), h);
+    if (handler_it != subs.end())
+    {
+        subs.erase(handler_it);
+
+        delete h;
+        MPDataBusLog("Handler unsubscribed and deleted for publisher ID %u and event ID %u\n", pid, eid);
+        
+        if (subs.empty())
+        {
+            mp_pub_sub.erase(it);
+        }
+    }
+}
+
+void MPDataBus::_unsubscribe(const PubKey& key, unsigned eid, DataHandler* h)
+{
+    unsigned pid = get_id(key);
+    _unsubscribe(pid, eid, h);
+}
 
 bool MPDataBus::_publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f)
 {
index 50d3a3b909b2037a6320828f6e45f848406331e2..233f3126c3a8df33a5d74426058588d926f726a9 100644 (file)
@@ -151,8 +151,16 @@ public:
     static bool valid(unsigned pub_id)
     { return pub_id != 0; }
 
+    static bool is_ready();
+
     static void subscribe(const PubKey& key, unsigned id, DataHandler* handler); 
 
+    // Optional API for unsubscribing from DataEvents in a multiprocess environment
+    // In general, this should not be needed as subscriptions are typically maintained
+    // for the lifetime of the process, but can be helpful in certain cases
+    // Note: This will delete the handler when unsubscribing
+    static void unsubscribe(const PubKey& key, unsigned id, DataHandler* handler);
+
     // API for publishing the DataEvent to the peer Snort processes
     // The user needs to pass a shared_ptr to the DataEvent object as the third argument
     // This is to ensure that the DataEvent object is not deleted before it is published
@@ -184,6 +192,8 @@ public:
 private: 
     void _subscribe(unsigned pid, unsigned eid, DataHandler* h);
     void _subscribe(const PubKey& key, unsigned eid, DataHandler* h);
+    void _unsubscribe(unsigned pid, unsigned eid, DataHandler* h);
+    void _unsubscribe(const PubKey& key, unsigned eid, DataHandler* h);
 
     bool _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f);
     bool _enqueue_event(std::shared_ptr<MPEventInfo> ev_info);
index cb5fd40e6795794e5ddd33d57b67d365d1dd85d3..e981a579abe8e14118a2509c1039a8080d4c2b05 100644 (file)
@@ -452,6 +452,38 @@ TEST(mp_data_bus, two_subscribers_same_event_and_receive)
     CHECK_EQUAL(1, h2->evt_msg);
 }
 
+TEST(mp_data_bus, unsubscribe_single_handler)
+{
+    UTestHandler1* h1 = new UTestHandler1();
+    MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1);
+
+    std::shared_ptr<UTestEvent> event1 = std::make_shared<UTestEvent>(100);
+    MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1);
+    SnortConfig::get_conf()->mp_dbus->receive_message(event_info1);
+    CHECK_EQUAL(100, h1->evt_msg);
+
+    MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1);
+}
+
+TEST(mp_data_bus, unsubscribe_nonexistent_handler)
+{
+    UTestHandler1* h1 = new UTestHandler1();
+    UTestHandler2* h2 = new UTestHandler2();
+
+    MPDataBus::subscribe(pub_key1, DbUtIds::EVENT1, h1);
+    MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h2);
+
+    std::shared_ptr<UTestEvent> event1 = std::make_shared<UTestEvent>(100);
+    MPEventInfo event_info1(event1, MPEventType(DbUtIds::EVENT1), pub_id1);
+    SnortConfig::get_conf()->mp_dbus->receive_message(event_info1);
+    CHECK_EQUAL(100, h1->evt_msg);
+    CHECK_EQUAL(1, h2->evt_msg);
+
+    MPDataBus::unsubscribe(pub_key1, DbUtIds::EVENT1, h1);
+
+    delete h2;
+}
+
 TEST_GROUP(mp_data_bus_clone)
 {
     unsigned pub_id1 = 0, pub_id2 = 0;  // cppcheck-suppress variableScope