if (transport_layer == nullptr)
{
ErrorMessage("MPDataBus: Failed to get transport layer\n");
- return 0;
+ return 1;
}
transport_layer->register_receive_handler(std::bind(&MPDataBus::receive_message, this, std::placeholders::_1));
void MPDataBus::register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc& mp_serializer_helper, MPDeserializeFunc& mp_deserializer_helper)
{
- if (!SnortConfig::get_conf()->mp_dbus && !SnortConfig::get_conf()->mp_dbus->transport_layer)
+ if (!SnortConfig::get_conf()->mp_dbus or !SnortConfig::get_conf()->mp_dbus->transport_layer)
{
ErrorMessage("MPDataBus: MPDataBus or transport layer not initialized\n");
return;
MPDataBusLog("Processing event for publisher ID %u \n",
event_info->pub_id);
+ if (!transport_layer){
+ run_thread.store(false);
+ ErrorMessage("MPDataBus: Transport layer not initialized\n");
+ return;
+ }
+
auto send_res = transport_layer->send_to_transport(*event_info);
{
// and the shared_ptr will handle the memory management by reference counting
static bool publish(unsigned pub_id, unsigned evt_id, std::shared_ptr<DataEvent> e, Flow* f = nullptr);
+ // The user needs to pass the MPSerializeFunc and MPDeserializeFunc function pointers
+ // to the register_event_helpers function, which will be used to serialize and deserialize
+ // before publishing any events to the MPDataBus
static void register_event_helpers(const PubKey& key, unsigned evt_id, MPSerializeFunc& mp_serializer_helper, MPDeserializeFunc& mp_deserializer_helper);
-
// API for receiving the DataEvent and Event type from transport layer using EventInfo
void receive_message(const MPEventInfo& event_info);
tp_appid_module_api.h
tp_appid_session_api.h
tp_appid_types.h
+ ../../framework/mp_data_bus.h
+ ../../framework/mp_data_bus.cc
)
set ( APPID_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR} )
// do not reload third party on reload_config()
if (!tp_appid_ctxt)
+ {
tp_appid_ctxt = TPLibHandler::create_tp_appid_ctxt(config, *odp_ctxt);
+ if (sc->max_procs > 1)
+ TPLibHandler::tp_mp_init(*tp_appid_ctxt);
+ }
once = true;
}
else
#include "appid_peg_counts.h"
#include "service_state.h"
#include "appid_cpu_profile_table.h"
+#include "tp_lib_handler.h"
using namespace snort;
using namespace std;
delete tp_ctxt;
AppIdContext& ctxt = inspector.get_ctxt();
ctxt.create_tp_appid_ctxt();
+ if (SnortConfig::get_conf()->max_procs > 1)
+ TPLibHandler::tp_mp_init(*ctxt.get_tp_appid_ctxt());
main_broadcast_command(new ACThirdPartyAppIdContextSwap(inspector, ctrlcon), ctrlcon);
log_message("== unload old third-party complete\n");
ReloadTracker::update(ctrlcon, "unload old third-party complete, start swapping to new configuration.");
ThirdPartyAppIdContext* tp_appid_ctxt = TPLibHandler::create_tp_appid_ctxt(config, ctxt.get_odp_ctxt());
CHECK_TRUE(tp_appid_ctxt != nullptr);
+ TPLibHandler::tp_mp_init(*tp_appid_ctxt);
+
TpAppIdCreateSession asf = tph->tpsession_factory();
ThirdPartyAppIdSession* tpsession = asf(*tp_appid_ctxt);
TPLibHandler::pfini();
}
+TEST(tp_lib_handler, tp_mp_init)
+{
+ config.tp_appid_path="./libtp_mock.so";
+ config.tp_appid_config="./tp.config";
+
+ tph = TPLibHandler::get();
+ ThirdPartyAppIdContext* tp_appid_ctxt = TPLibHandler::create_tp_appid_ctxt(config, ctxt.get_odp_ctxt());
+
+ TPLibHandler::tp_mp_init(*tp_appid_ctxt);
+ CHECK_TRUE(tp_appid_ctxt != nullptr);
+
+ delete tp_appid_ctxt;
+ TPLibHandler::pfini();
+}
+
TEST(tp_lib_handler, load_error)
{
// Trigger load error:
int tinit() override {return 0;}
bool tfini(bool) override {return false;}
+ int tp_mp_init(ThirdPartyAppIdContext *tp_appid_ctxt) {return true;}
const string& get_user_config() const override { return user_config; }
private:
return new ThirdPartyAppIdSessionImpl(ctxt);
}
+ SO_PUBLIC int tp_mp_init(ThirdPartyAppIdContext& ctxt)
+ {
+ return 0;
+ }
+
SO_PUBLIC int tp_appid_pfini()
{
return 0;
{
return 0;
}
+
+ SO_PUBLIC int tp_appid_mp_init(ThirdPartyAppIdContext& ctxt)
+ {
+ return 0;
+ }
}
{
{ "tp_appid_create_ctxt", (dummyFunc*)&tp_appid_create_ctxt },
{ "tp_appid_create_session", (dummyFunc*)&tp_appid_create_session },
+ { "tp_appid_mp_init", (dummyFunc*)&tp_appid_mp_init },
{ "tp_appid_pfini", (dummyFunc*)&tp_appid_pfini },
{ "tp_appid_tfini", (dummyFunc*)&tp_appid_tfini },
{ nullptr, nullptr }
return tp_appid_ctxt;
}
+void TPLibHandler::tp_mp_init(ThirdPartyAppIdContext& tp_appid_ctxt)
+{
+ assert(self != nullptr);
+ int ret = 0;
+
+ if (self->tp_appid_mp_init)
+ {
+ ret = self->tp_appid_mp_init(tp_appid_ctxt);
+ }
+
+ if (ret != 0)
+ {
+ APPID_LOG(nullptr, TRACE_ERROR_LEVEL, "Could not subscribe to the appid tp syncevent\n", ret);
+ return;
+ }
+
+ return;
+}
+
void TPLibHandler::tfini()
{
assert(self != nullptr);
// Must return null if it fails to create the object.
typedef ThirdPartyAppIdContext* (* TpAppIdCreateCtxt)(ThirdPartyConfig& );
typedef ThirdPartyAppIdSession* (* TpAppIdCreateSession)(ThirdPartyAppIdContext& ctxt);
+typedef int (* TpMPInit)(ThirdPartyAppIdContext& ctxt);
typedef int (* TpAppIdPfini)();
typedef int (* TpAppIdTfini)();
const OdpContext& odp_ctxt);
static void tfini();
static void pfini();
+ static void tp_mp_init(ThirdPartyAppIdContext& tp_appid_ctxt);
TpAppIdCreateSession tpsession_factory() const
{
TpAppIdCreateSession tp_appid_create_session = nullptr;
TpAppIdPfini tp_appid_pfini = nullptr;
TpAppIdTfini tp_appid_tfini = nullptr;
+ TpMPInit tp_appid_mp_init = nullptr;
bool load_callback(const char* path);
};