From: Umang Sharma (umasharm) Date: Fri, 21 Mar 2025 23:31:28 +0000 (+0000) Subject: Pull request #4664: Multiprocess DataBus Framework X-Git-Tag: 3.7.2.0~9 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=5167e1e6033b929404338c5c10da8034b1534b76;p=thirdparty%2Fsnort3.git Pull request #4664: Multiprocess DataBus Framework Merge in SNORT/snort3 from ~UMASHARM/snort3:mpubsub_dbus to master Squashed commit of the following: commit e5e650f62e17bb9529b5c7d05cfd27234261613d Author: Umang Sharma Date: Thu Mar 13 08:25:05 2025 -0400 mp_data_bus: basic framework with skeleton APIs --- diff --git a/src/framework/CMakeLists.txt b/src/framework/CMakeLists.txt index 79ee3a9b1..109b20d35 100644 --- a/src/framework/CMakeLists.txt +++ b/src/framework/CMakeLists.txt @@ -14,6 +14,7 @@ set (FRAMEWORK_INCLUDES ips_option.h logger.h module.h + mp_data_bus.h mpse.h mpse_batch.h parameter.h @@ -40,6 +41,7 @@ add_library ( framework OBJECT parameter.cc pig_pen.cc module.cc + mp_data_bus.cc mpse.cc mpse_batch.cc range.cc diff --git a/src/framework/mp_data_bus.cc b/src/framework/mp_data_bus.cc new file mode 100644 index 000000000..ef8606471 --- /dev/null +++ b/src/framework/mp_data_bus.cc @@ -0,0 +1,126 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2014-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// mp_data_bus.cc author Umang Sharma + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "mp_data_bus.h" + +#include +#include + +#include "main/snort_config.h" +#include "protocols/packet.h" +#include "pub_sub/intrinsic_event_ids.h" +#include "utils/stats.h" +#include "main/snort_types.h" + +using namespace snort; + +static std::unordered_map mp_pub_ids; + +//-------------------------------------------------------------------------- +// public methods +//------------------------------------------------------------------------- + +MPDataBus::MPDataBus() = default; + +MPDataBus::~MPDataBus() +{ + // Clean up mp_pub_sub + for (auto& sublist : mp_pub_sub) + { + for (auto* handler : sublist) + { + if (handler->cloned) + handler->cloned = false; + else + delete handler; + } + sublist.clear(); + } + mp_pub_sub.clear(); +} + +unsigned MPDataBus::init(int max_procs) +{ + UNUSED(max_procs); + return 0; +} + +void MPDataBus::clone(MPDataBus& from, const char* exclude_name) +{ + UNUSED(from); + UNUSED(exclude_name); +} + +// module subscribes to an event from a peer snort process +void MPDataBus::subscribe(const PubKey& key, unsigned eid, DataHandler* h) +{ + UNUSED(key); + UNUSED(eid); + UNUSED(h); +} + +// publish event to all peer snort processes subscribed to the event +bool MPDataBus::publish(unsigned pub_id, unsigned evt_id, DataEvent& e, Flow* f) +{ + // Publish implementation + UNUSED(pub_id); + UNUSED(evt_id); + UNUSED(e); + UNUSED(f); + return true; +} + +// register event helpers for serialization and deserialization of msg events +void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc* mp_serializer_helper, MPDeserializeFunc* mp_deserializer_helper) +{ + UNUSED(key); + UNUSED(evt_id); + UNUSED(mp_serializer_helper); + UNUSED(mp_deserializer_helper); +} + +// API for receiving the DataEvent and Event type from transport layer +void MPDataBus::receive_message(const MPEventInfo& event_info) +{ + UNUSED(event_info); +} + +//-------------------------------------------------------------------------- +// private methods +//-------------------------------------------------------------------------- + +void MPDataBus::_subscribe(unsigned pid, unsigned eid, DataHandler* h) +{ + UNUSED(pid); + UNUSED(eid); + UNUSED(h); +} + +void MPDataBus::_publish(unsigned int pid, unsigned int eid, DataEvent& e, Flow* f) +{ + UNUSED(pid); + UNUSED(eid); + UNUSED(e); + UNUSED(f); +} + diff --git a/src/framework/mp_data_bus.h b/src/framework/mp_data_bus.h new file mode 100644 index 000000000..0fb93aee7 --- /dev/null +++ b/src/framework/mp_data_bus.h @@ -0,0 +1,113 @@ +//-------------------------------------------------------------------------- +// Copyright (C) 2014-2025 Cisco and/or its affiliates. All rights reserved. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License Version 2 as published +// by the Free Software Foundation. You may not use, modify or distribute +// this program under any other version of the GNU General Public License. +// +// This program is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Public License for more details. +// +// You should have received a copy of the GNU General Public License along +// with this program; if not, write to the Free Software Foundation, Inc., +// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +//-------------------------------------------------------------------------- +// mp_data_bus.h author Umang Sharma + +#ifndef MP_DATA_BUS_H +#define MP_DATA_BUS_H + +// The MPDataBus class is a multiprocess version of the DataBus class. +// It is used to publish and subscribe to DataEvents in a multiprocess environment +// and to synchronize between processes. When a Snort instance generates an event that needs +// to be synchronized with other Snort processes, it publishes the event to the MPDataBus. +// The MPDataBus then notifies all other Snort instances that have subscribed to the event +// with a transport channel. +// DataEvents are generated to synchronize between processes in a multiprocess environment. +// They can be used to notify peer Snort processes that are subscribed to relevant events. +// By using DataEvents with a publish-subscribe mechanism, modules can subscribe to events +// from peer Snort processes to communicate with each other in a multiprocess environment. + +#include +#include +#include +#include +#include + +#include "main/snort_types.h" +#include "data_bus.h" +#include + +namespace snort +{ +class Flow; +struct Packet; +struct SnortConfig; + +typedef bool (*MPSerializeFunc)(const DataEvent& event, char** buffer, size_t* length); +typedef bool (*MPDeserializeFunc)(const char* buffer, size_t length, DataEvent* event); + +// Similar to the DataBus class, the MPDataBus class uses uses a combination of PubKey and event ID +// for event subscriptions and publishing. New MP-specific event type enums should be added to the +// appropriate header files in the pub_sub directory. For example, an MPEventIds enum might +// be created in parallel to a pre-existing EventIds enum. The same pub_key can be reused. +// New MP-specific DataEvent structures should similarly be populated in the pub_sub directory in a +// manner analogous to the approach used for intra-snort pub_sub. +typedef unsigned MPEventType; + +struct MPEventInfo +{ + MPEventType type; + unsigned pub_id; + DataEvent event; + MPEventInfo(const DataEvent& e, MPEventType t, unsigned id = 0) + : type(t), pub_id(id), event(e) {} +}; + +struct MPHelperFunctions { + MPSerializeFunc* serializer; + MPDeserializeFunc* deserializer; + + MPHelperFunctions(MPSerializeFunc* s, MPDeserializeFunc* d) + : serializer(s), deserializer(d) {} +}; + +class SO_PUBLIC MPDataBus +{ +public: + MPDataBus(); + ~MPDataBus(); + + static unsigned init(int); + void clone(MPDataBus& from, const char* exclude_name = nullptr); + + unsigned get_id(const PubKey& key) + { return DataBus::get_id(key); } + + bool valid(unsigned pub_id) + { return pub_id != 0; } + + void subscribe(const PubKey& key, unsigned id, DataHandler* handler); + + bool publish(unsigned pub_id, unsigned evt_id, DataEvent& e, Flow* f = nullptr); + + void register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc* mp_serializer_helper, MPDeserializeFunc* mp_deserializer_helper); + + // API for receiving the DataEvent and Event type from transport layer using EventInfo + void receive_message(const MPEventInfo& event_info); + +private: + void _subscribe(unsigned pid, unsigned eid, DataHandler* h); + void _publish(unsigned pid, unsigned eid, DataEvent& e, Flow* f); + +private: + typedef std::vector SubList; + std::vector mp_pub_sub; +}; +} + +#endif + diff --git a/src/main/policy.h b/src/main/policy.h index 1a1ecc4b1..719a1b3bd 100644 --- a/src/main/policy.h +++ b/src/main/policy.h @@ -40,6 +40,7 @@ typedef unsigned char uuid_t[16]; #include #include "framework/data_bus.h" +#include "framework/mp_data_bus.h" namespace snort { diff --git a/src/main/snort.cc b/src/main/snort.cc index d2f86f008..c7968ad08 100644 --- a/src/main/snort.cc +++ b/src/main/snort.cc @@ -568,6 +568,8 @@ SnortConfig* Snort::get_updated_policy( SnortConfig* sc = new SnortConfig(other_conf, iname); sc->global_dbus->clone(*other_conf->global_dbus, iname); + if (sc->max_procs > 1) + sc->mp_dbus->clone(*other_conf->mp_dbus, iname); if ( fname ) { diff --git a/src/main/snort_config.cc b/src/main/snort_config.cc index f603f3faf..cf2d1628d 100644 --- a/src/main/snort_config.cc +++ b/src/main/snort_config.cc @@ -197,6 +197,8 @@ void SnortConfig::init(const SnortConfig* const other_conf, ProtocolReference* p policy_map = new PolicyMap; thread_config = new ThreadConfig(); global_dbus = new DataBus(); + if (max_procs > 1) + mp_dbus = new MPDataBus(); proto_ref = new ProtocolReference(protocol_reference); so_rules = new SoRules; @@ -258,6 +260,8 @@ SnortConfig::~SnortConfig() if ( cloned ) { delete global_dbus; + if (max_procs > 1) + delete mp_dbus; policy_map->set_cloned(true); delete policy_map; return; @@ -317,6 +321,8 @@ SnortConfig::~SnortConfig() delete overlay_trace_config; delete ha_config; delete global_dbus; + if (max_procs > 1) + delete mp_dbus; delete profiler; delete latency; @@ -386,6 +392,9 @@ void SnortConfig::clone(const SnortConfig* const conf) { *this = *conf; global_dbus = new DataBus(); + if (max_procs > 1) + mp_dbus = new MPDataBus(); + if (conf->homenet.get_family() != 0) memcpy(&homenet, &conf->homenet, sizeof(homenet)); diff --git a/src/main/snort_config.h b/src/main/snort_config.h index 01dc09868..c35260cad 100644 --- a/src/main/snort_config.h +++ b/src/main/snort_config.h @@ -381,6 +381,7 @@ public: std::string tweaks; DataBus* global_dbus = nullptr; + MPDataBus* mp_dbus = nullptr; uint16_t tunnel_mask = 0;