void DataBus::publish(const char*, const uint8_t*, unsigned, Flow*) {}
void DataBus::publish(const char*, Packet*, Flow*) {}
-void DataBus::publish(const char*, void*, int, const uint8_t*) {}
void DataBus::_subscribe(const char* key, DataHandler* h)
{
publish(key, e, f);
}
-void DataBus::publish(const char* key, void* user, int type, const uint8_t* data)
-{
- DaqMetaEvent e(user, type, data);
- publish(key, e, nullptr);
-}
-
//--------------------------------------------------------------------------
// private methods
//--------------------------------------------------------------------------
// convenience methods
static void publish(const char* key, const uint8_t*, unsigned, Flow* = nullptr);
static void publish(const char* key, Packet*, Flow* = nullptr);
- static void publish(const char* key, void* user, int type, const uint8_t* data);
private:
void _subscribe(const char* key, DataHandler*);
DataMap map;
DataModule mapped_module;
};
-
-class SO_PUBLIC DaqMetaEvent : public DataEvent
-{
-public:
- DaqMetaEvent(void* user, int type, const uint8_t *data) :
- user(user), type(type), data(data)
- { }
-
- void* get_user_data()
- { return user; }
-
- int get_type()
- { return type; }
-
- const uint8_t* get_data() override
- { return data; }
-
-private:
- void* user;
- int type;
- const uint8_t* data;
-};
}
//
//
#define PACKET_EVENT "detection.packet"
-#define DAQ_META_EVENT "daq.metapacket"
#define FLOW_STATE_EVENT "flow.state_change"
#define THREAD_IDLE_EVENT "thread.idle"
#define THREAD_ROTATE_EVENT "thread.rotate"
#include "framework/module.h"
#include "log/text_log.h"
#include "protocols/packet.h"
+#include "pub_sub/daq_message_event.h"
using namespace snort;
using namespace std;
static THREAD_LOCAL unsigned s_pkt_num = 0;
-class DaqMetaEventHandler : public DataHandler
+class DaqMessageEventHandler : public DataHandler
{
public:
- DaqMetaEventHandler() : DataHandler(S_NAME) { }
+ DaqMessageEventHandler() : DataHandler(S_NAME) { }
void handle(DataEvent&, Flow*) override;
};
-void DaqMetaEventHandler::handle(DataEvent& event, Flow*)
+void DaqMessageEventHandler::handle(DataEvent& event, Flow*)
{
- if (!hext_log) return;
+ if (!hext_log)
+ return;
- DaqMetaEvent* ev = (DaqMetaEvent*)&event;
+ DaqMessageEvent* dme = (DaqMessageEvent*) &event;
const char* cmd;
- switch (ev->get_type()) {
- case DAQ_MSG_TYPE_SOF: cmd = "sof"; break;
- case DAQ_MSG_TYPE_EOF: cmd = "eof"; break;
- default: return;
+ switch (dme->get_type()) {
+ case DAQ_MSG_TYPE_SOF:
+ cmd = "sof";
+ break;
+ case DAQ_MSG_TYPE_EOF:
+ cmd = "eof";
+ break;
+ default:
+ return;
}
- const Flow_Stats_t* fs = (const Flow_Stats_t*)ev->get_data();
+ const Flow_Stats_t* fs = (const Flow_Stats_t*) dme->get_header();
SfIp src, dst;
char shost[INET6_ADDRSTRLEN];
limit = m->limit;
width = m->width;
raw = m->raw;
- DataBus::subscribe(DAQ_META_EVENT, new DaqMetaEventHandler());
+ DataBus::subscribe(DAQ_SOF_MSG_EVENT, new DaqMessageEventHandler());
+ DataBus::subscribe(DAQ_EOF_MSG_EVENT, new DaqMessageEventHandler());
}
void HextLogger::open()
#include "packet_io/sfdaq_module.h"
#include "packet_tracer/packet_tracer.h"
#include "profiler/profiler.h"
+#include "pub_sub/daq_message_event.h"
#include "pub_sub/finalize_packet_event.h"
-#include "pub_sub/other_message_event.h"
#include "side_channel/side_channel.h"
#include "stream/stream.h"
#include "time/packet_time.h"
// message processing
//-------------------------------------------------------------------------
-static void process_daq_sof_eof_msg(DAQ_Msg_h msg)
+static void process_daq_sof_eof_msg(DAQ_Msg_h msg, DAQ_Verdict& verdict)
{
const Flow_Stats_t *stats = (const Flow_Stats_t *) daq_msg_get_hdr(msg);
+ const char* key;
- if (msg->type == DAQ_MSG_TYPE_EOF)
+ if (daq_msg_get_type(msg) == DAQ_MSG_TYPE_EOF)
+ {
packet_time_update(&stats->eof_timestamp);
+ daq_stats.eof_messages++;
+ key = DAQ_EOF_MSG_EVENT;
+ }
else
+ {
packet_time_update(&stats->sof_timestamp);
+ daq_stats.sof_messages++;
+ key = DAQ_SOF_MSG_EVENT;
+ }
- DataBus::publish(DAQ_META_EVENT, nullptr, daq_msg_get_type(msg), (const uint8_t*) stats);
+ DaqMessageEvent event(msg, verdict);
+ DataBus::publish(key, event);
}
static bool process_packet(Packet* p)
return;
case DAQ_MSG_TYPE_SOF:
case DAQ_MSG_TYPE_EOF:
- process_daq_sof_eof_msg(msg);
+ process_daq_sof_eof_msg(msg, verdict);
break;
default:
{
- OtherMessageEvent event(msg, verdict);
daq_stats.other_messages++;
- // the verdict can be updated by event handler
- DataBus::publish(OTHER_MESSAGE_EVENT, event);
+ DaqMessageEvent event(msg, verdict);
+ DataBus::publish(DAQ_OTHER_MSG_EVENT, event);
}
break;
}
{ CountType::SUM, "retries_dropped", "messages dropped when overrunning the retry queue" },
{ CountType::SUM, "retries_processed", "messages processed from the retry queue" },
{ CountType::SUM, "retries_discarded", "messages discarded when purging the retry queue" },
+ { CountType::SUM, "sof_messages", "start of flow messages received from DAQ" },
+ { CountType::SUM, "eof_messages", "end of flow messages received from DAQ" },
{ CountType::SUM, "other_messages", "messages received from DAQ with unrecognized message type" },
{ CountType::END, nullptr, nullptr }
};
PegCount retries_dropped;
PegCount retries_processed;
PegCount retries_discarded;
+ PegCount sof_messages;
+ PegCount eof_messages;
PegCount other_messages;
};
set (PUB_SUB_INCLUDES
appid_events.h
+ daq_message_event.h
expect_events.h
finalize_packet_event.h
http_events.h
- other_message_event.h
sip_events.h
)
// with this program; if not, write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//--------------------------------------------------------------------------
-// other_message_event.h author Steven Baigal <sbaigal@cisco.com>
+// daq_message_event.h author Michael Altizer <mialtize@cisco.com>
-#ifndef OTHER_MESSAGE_EVENT_H
-#define OTHER_MESSAGE_EVENT_H
+#ifndef DAQ_MESSAGE_EVENT_H
+#define DAQ_MESSAGE_EVENT_H
-#include <daq_common.h>
+#include <daq.h>
#include "framework/data_bus.h"
-#define OTHER_MESSAGE_EVENT "daq.other.message"
+#define DAQ_SOF_MSG_EVENT "daq.message.sof"
+#define DAQ_EOF_MSG_EVENT "daq.message.eof"
+#define DAQ_OTHER_MSG_EVENT "daq.message.other"
namespace snort
{
-
-class SO_PUBLIC OtherMessageEvent : public snort::DataEvent
+class SO_PUBLIC DaqMessageEvent : public snort::DataEvent
{
public:
- OtherMessageEvent(DAQ_Msg_h msg, DAQ_Verdict& v) :
- daq_msg(msg), verdict(v)
- {
- }
+ DaqMessageEvent(DAQ_Msg_h msg, DAQ_Verdict& v) : msg(msg), verdict(v) { }
+
+ DAQ_Msg_h get_message()
+ { return msg; }
+
+ DAQ_MsgType get_type() const
+ { return daq_msg_get_type(msg); }
+
+ size_t get_header_length() const
+ { return daq_msg_get_hdr_len(msg); }
- DAQ_Msg_h get_daq_msg()
- { return daq_msg; }
+ const void* get_header() const
+ { return daq_msg_get_hdr(msg); }
- DAQ_Verdict& get_verdict()
+ uint32_t get_data_length() const
+ { return daq_msg_get_data_len(msg); }
+
+ const uint8_t* get_data() override
+ { return daq_msg_get_data(msg); }
+
+ DAQ_Verdict get_verdict()
{ return verdict; }
+ void set_verdict(DAQ_Verdict v)
+ { verdict = v; }
+
private:
- DAQ_Msg_h daq_msg;
+ DAQ_Msg_h msg;
DAQ_Verdict& verdict;
};
-
}
#endif