#include "flow/flow_cache.h"
+#include <numeric>
#include <sstream>
#include "control/control.h"
static const unsigned WDT_MASK = 7; // kick watchdog once for every 8 flows deleted
-uint8_t DumpFlows::dump_code = 0;
-
-#ifndef REG_TEST
-DumpFlows::DumpFlows(unsigned count, ControlConn* conn)
-#else
-DumpFlows::DumpFlows(unsigned count, ControlConn* conn, int resume)
-#endif
- : snort::AnalyzerCommand(conn), dump_count(count)
-#ifdef REG_TEST
- , resume(resume)
-#endif
-{
- next.resize(ThreadConfig::get_instance_max());
- ++dump_code;
-}
+DumpFlowsBase::DumpFlowsBase(ControlConn *conn)
+ : snort::AnalyzerCommand(conn)
+{}
-bool DumpFlows::open_files(const std::string& base_name)
-{
- dump_stream.resize(ThreadConfig::get_instance_max());
- for (unsigned i = 0; i < ThreadConfig::get_instance_max(); ++i)
- {
- std::string file_name = base_name + std::to_string(i + 1);
- dump_stream[i].open(file_name, std::fstream::out | std::fstream::trunc);
- if (0 != (dump_stream[i].rdstate() & std::fstream::failbit))
- {
- LogRespond(ctrlcon, "Dump flows failed to open %s\n", file_name.c_str());
- return false;
- }
- }
- return true;
-}
-
-void DumpFlows::cidr2mask(const uint32_t cidr, uint32_t* mask) const
+void DumpFlowsBase::cidr2mask(const uint32_t cidr, uint32_t* mask) const
{
size_t i;
}
}
-bool DumpFlows::set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const
+bool DumpFlowsBase::set_ip(std::string filter_ip, snort::SfIp &ip, snort::SfIp &subnet) const
{
size_t slash_pos = filter_ip.find('/');
if (slash_pos != std::string::npos)
return false;
}
+uint8_t DumpFlows::dump_code = 0;
+
+#ifndef REG_TEST
+DumpFlows::DumpFlows(unsigned count, ControlConn* conn)
+#else
+DumpFlows::DumpFlows(unsigned count, ControlConn* conn, int resume)
+#endif
+ : DumpFlowsBase(conn), dump_count(count)
+#ifdef REG_TEST
+ , resume(resume)
+#endif
+{
+ next.resize(ThreadConfig::get_instance_max());
+ ++dump_code;
+}
+
+bool DumpFlows::open_files(const std::string& base_name)
+{
+ dump_stream.resize(ThreadConfig::get_instance_max());
+ for (unsigned i = 0; i < ThreadConfig::get_instance_max(); ++i)
+ {
+ std::string file_name = base_name + std::to_string(i + 1);
+ dump_stream[i].open(file_name, std::fstream::out | std::fstream::trunc);
+ if (0 != (dump_stream[i].rdstate() & std::fstream::failbit))
+ {
+ LogRespond(ctrlcon, "Dump flows failed to open %s\n", file_name.c_str());
+ return false;
+ }
+ }
+ return true;
+}
+
bool DumpFlows::execute(Analyzer&, void**)
{
if (!flow_con)
return flow_con->dump_flows(dump_stream[id], dump_count, ffc, first, dump_code);
}
+DumpFlowsSummary::DumpFlowsSummary(ControlConn* conn)
+ : DumpFlowsBase(conn)
+{
+ flows_summaries.resize(ThreadConfig::get_instance_max());
+}
+
+DumpFlowsSummary::~DumpFlowsSummary()
+{
+ FlowsTypeSummary type_summary{};
+ FlowsStateSummary state_summary{};
+ unsigned total_pkts = 0;
+
+ for (const auto& flows_sum : flows_summaries)
+ {
+ for (unsigned i = 0; i < type_summary.size(); ++i)
+ {
+ type_summary[i] += flows_sum.type_summary[i];
+ total_pkts += flows_sum.type_summary[i];
+ }
+ for (unsigned i = 0; i < state_summary.size(); ++i)
+ {
+ state_summary[i] += flows_sum.state_summary[i];
+ }
+ }
+
+ LogRespond(ctrlcon, "Total: %u\n", total_pkts);
+ for (unsigned i = 0; i < type_summary.size(); ++i)
+ {
+ PktType proto = static_cast<PktType>(i);
+
+ switch (proto)
+ {
+ case PktType::IP:
+ LogRespond(ctrlcon, "IP: %u\n", type_summary[i]);
+ break;
+ case PktType::ICMP:
+ LogRespond(ctrlcon, "ICMP: %u\n", type_summary[i]);
+ break;
+ case PktType::TCP:
+ LogRespond(ctrlcon, "TCP: %u\n", type_summary[i]);
+ break;
+ case PktType::UDP:
+ LogRespond(ctrlcon, "UDP: %u\n", type_summary[i]);
+ break;
+ default:
+ break;
+ }
+ }
+
+ unsigned pending = 0;
+ for (unsigned i = 0; i < state_summary.size(); ++i)
+ {
+ snort::Flow::FlowState state = static_cast<snort::Flow::FlowState>(i);
+
+ switch (state)
+ {
+ case snort::Flow::FlowState::ALLOW:
+ LogRespond(ctrlcon, "Allowed: %u\n", state_summary[i]);
+ break;
+ case snort::Flow::FlowState::BLOCK:
+ LogRespond(ctrlcon, "Blocked: %u\n", state_summary[i]);
+ break;
+ default:
+ pending += state_summary[i];
+ break;
+ }
+ }
+ LogRespond(ctrlcon, "Pending: %u\n", pending);
+}
+
+bool DumpFlowsSummary::execute(Analyzer &, void **)
+{
+ if (!flow_con)
+ return true;
+ unsigned id = get_instance_id();
+
+ return flow_con->dump_flows_summary(flows_summaries[id], ffc);
+}
+
//-------------------------------------------------------------------------
// FlowCache stuff
//-------------------------------------------------------------------------
return !has_more_flows;
}
+bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria& ffc) const
+{
+ Flow* walk_flow = nullptr;
+
+ for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id)
+ {
+ if ( proto_id == to_utype(PktType::USER) or
+ proto_id == to_utype(PktType::FILE) or
+ proto_id == to_utype(PktType::PDU) )
+ continue;
+
+
+ walk_flow = static_cast<Flow*>(hash_table->get_walk_user_data(proto_id));
+
+ while ( walk_flow )
+ {
+ if( filter_flows(*walk_flow, ffc) )
+ {
+ flows_summary.type_summary[to_utype(walk_flow->key->pkt_type)]++;
+ flows_summary.state_summary[to_utype(walk_flow->flow_state)]++;
+ }
+ walk_flow = static_cast<Flow*>(hash_table->get_next_walk_user_data(proto_id));
+ }
+ }
+
+ return true;
+}
size_t FlowCache::uni_flows_size() const
{
return uni_flows ? uni_flows->get_count() : 0;
return count;
}
#endif
+
+
// there is a FlowCache instance for each protocol.
// Flows are stored in a ZHash instance by FlowKey.
+#include <array>
#include <ctime>
#include <fstream>
#include <mutex>
#include <vector>
#include <memory>
+#include "filter_flow_critera.h"
#include "framework/counts.h"
#include "flow_config.h"
+#include "flow.h"
#include "main/analyzer_command.h"
#include "prune_stats.h"
-#include "filter_flow_critera.h"
constexpr uint8_t max_protocols = static_cast<uint8_t>(to_utype(PktType::MAX));
constexpr uint8_t allowlist_lru_index = max_protocols;
constexpr uint64_t all_lru_mask = (1ULL << max_protocols) - 1;
constexpr uint8_t first_proto = to_utype(PktType::NONE) + 1;
+typedef std::array<unsigned, to_utype(PktType::MAX)> FlowsTypeSummary;
+typedef std::array<unsigned, to_utype(snort::Flow::FlowState::ALLOW) + 1> FlowsStateSummary;
+
+struct FlowsSummary
+{
+ FlowsTypeSummary type_summary{};
+ FlowsStateSummary state_summary{};
+};
+
namespace snort
{
class Flow;
struct FlowKey;
}
-class DumpFlows : public snort::AnalyzerCommand
+class DumpFlowsBase : public snort::AnalyzerCommand
+{
+public:
+ DumpFlowsBase(ControlConn*);
+ virtual ~DumpFlowsBase() override= default;
+ void cidr2mask(const uint32_t cidr, uint32_t* mask) const;
+ bool set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const;
+ bool execute(Analyzer&, void**) override = 0;
+ const char* stringify() override = 0;
+ void set_filter_criteria(const FilterFlowCriteria& filter_criteria)
+ {ffc = filter_criteria;}
+
+protected:
+ FilterFlowCriteria ffc;
+};
+
+class DumpFlows : public DumpFlowsBase
{
public:
#ifndef REG_TEST
#endif
~DumpFlows() override = default;
bool open_files(const std::string& base_name);
- void cidr2mask(const uint32_t cidr, uint32_t* mask) const;
- bool set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const;
bool execute(Analyzer&, void**) override;
const char* stringify() override
{ return "DumpFlows"; }
- void set_filter_criteria(const FilterFlowCriteria& filter_criteria)
- {ffc = filter_criteria;}
private:
//dump_code is to track if the flow is dumped only once per dump_flow command.
std::vector<std::fstream> dump_stream;
std::vector<unsigned> next;
unsigned dump_count;
- FilterFlowCriteria ffc;
#ifdef REG_TEST
int resume = -1;
#endif
};
+class DumpFlowsSummary : public DumpFlowsBase
+{
+public:
+ DumpFlowsSummary(ControlConn*);
+
+ ~DumpFlowsSummary() override;
+ bool execute(Analyzer&, void**) override;
+ const char* stringify() override
+ { return "DumpFlowsSummary"; }
+
+private:
+ std::vector<FlowsSummary> flows_summaries;
+};
class FlowUniList;
unsigned delete_flows(unsigned num_to_delete);
unsigned prune_multiple(PruneReason, bool do_cleanup);
bool dump_flows(std::fstream&, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const;
+ bool dump_flows_summary(FlowsSummary&, const FilterFlowCriteria& ffc) const;
+
unsigned purge();
unsigned get_count();
bool FlowControl::dump_flows(std::fstream& stream, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const
{ return cache->dump_flows(stream, count, ffc, first, code); }
+bool FlowControl::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria &ffc) const
+{ return cache->dump_flows_summary(flows_summary, ffc); }
+
void FlowControl::timeout_flows(unsigned max, time_t cur_time)
{
cache->timeout(max, cur_time);
bool move_to_allowlist(snort::Flow*);
bool dump_flows(std::fstream&, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const;
+ bool dump_flows_summary(FlowsSummary&, const FilterFlowCriteria& ffc) const;
+
int add_expected_ignore(
const snort::Packet* ctrlPkt, PktType, IpProtocol,
bool filter_flows(const Flow& flow, const FilterFlowCriteria& ffc) const override { (void)flow; (void)ffc; return true; };
};
+class DummyCacheWithFilter : public FlowCache
+{
+ public:
+ DummyCacheWithFilter(const FlowCacheConfig& cfg) : FlowCache(cfg) {}
+ ~DummyCacheWithFilter() = default;
+ void output_flow(std::fstream& stream, const Flow& flow, const struct timeval& now) const override { (void)stream, (void)flow, (void)now; };
+};
+
TEST_GROUP(flow_prune) { };
// No flows in the flow cache, pruning should not happen
delete cache;
}
+TEST_GROUP(dump_flows_summary) { };
+
+TEST(dump_flows_summary, dump_flows_summary_with_all_empty_caches)
+{
+ FlowCacheConfig fcg;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCache *cache = new DummyCache(fcg);
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+ CHECK(cache->get_flows_allocated() == 0);
+
+ FlowsTypeSummary expected_type{};
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ CHECK(expected_state == flows_summary.state_summary);
+
+ delete cache;
+}
+
+TEST(dump_flows_summary, dump_flows_summary_with_one_tcp_flow)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 5;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCache *cache = new DummyCache(fcg);
+
+ FlowKey flow_key;
+ flow_key.port_l = 1;
+ flow_key.pkt_type = PktType::TCP;
+ cache->allocate(&flow_key);
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+ CHECK(cache->get_count() == 1);
+
+ FlowsTypeSummary expected_type{};
+ expected_type[to_utype(PktType::TCP)] = 1;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 1;
+ CHECK(expected_state == flows_summary.state_summary);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+
+TEST(dump_flows_summary, dump_flows_summary_with_5_of_each_flow)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 50;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCache *cache = new DummyCache(fcg);
+ int port = 1;
+
+ std::vector<PktType> types = {PktType::IP, PktType::ICMP, PktType::TCP, PktType::UDP};
+
+ for (const auto& type : types)
+ {
+ for (unsigned i = 0; i < 5; i++)
+ {
+ FlowKey flow_key;
+ flow_key.port_l = port++;
+ flow_key.pkt_type = type;
+ cache->allocate(&flow_key);
+ }
+ }
+ CHECK (cache->get_count() == 5 * types.size());
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ FlowsTypeSummary expected_type{};
+ for (const auto& type : types)
+ expected_type[to_utype(type)] = 5;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 5 * types.size();
+ CHECK(expected_state == flows_summary.state_summary);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ CHECK (cache->get_count() == 0);
+ delete cache;
+}
+
+TEST(dump_flows_summary, dump_flows_summary_with_different_flow_states)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 50;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCache *cache = new DummyCache(fcg);
+ int port = 1;
+ unsigned flows_number = 5;
+
+ std::vector<snort::Flow::FlowState> types = {snort::Flow::FlowState::BLOCK, snort::Flow::FlowState::ALLOW, snort::Flow::FlowState::SETUP};
+
+ for (const auto& type : types)
+ {
+ for (unsigned i = 0; i < 5; i++)
+ {
+ FlowKey flow_key;
+ flow_key.port_l = port++;
+ flow_key.pkt_type = PktType::TCP;
+ cache->allocate(&flow_key);
+ Flow* flow = cache->find(&flow_key);
+ flow->flow_state = type;
+ }
+ }
+
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+ CHECK(cache->get_count() == flows_number * types.size());
+
+ FlowsTypeSummary expected_type{};
+ expected_type[to_utype(PktType::TCP)] = flows_number * types.size();
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ for (const auto& type : types)
+ {
+ expected_state[to_utype(type)] = flows_number;
+ }
+ CHECK(expected_state == flows_summary.state_summary);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+TEST(dump_flows_summary, dump_flows_summary_with_allowlist)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 50;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary{};
+ DummyCache* cache = new DummyCache(fcg);
+ int port = 1;
+ FlowKey flow_key[10];
+
+ // Add TCP flows and mark some as allow listed
+ for (unsigned i = 0; i < 10; ++i)
+ {
+ flow_key[i].port_l = port++;
+ flow_key[i].pkt_type = PktType::TCP;
+ Flow* flow = cache->allocate(&flow_key[i]);
+ // Mark the first 5 flows as allow listed
+ if (i < 5)
+ {
+ CHECK(cache->move_to_allowlist(flow) == true);
+ }
+ }
+
+ CHECK(cache->get_count() == 10);
+
+ //check flows are properly moved to allow list
+ CHECK(cache->count_flows_in_lru(to_utype(PktType::TCP)) == 5); // Check 5 TCP flows
+ CHECK(cache->count_flows_in_lru(allowlist_lru_index) == 5); // Check 5 allow listed flows
+
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ FlowsTypeSummary expected_type{};
+ expected_type[to_utype(PktType::TCP)] = 10;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 10;
+ CHECK(expected_state == flows_summary.state_summary);
+
+ // Verify that allow listed flows exist and are correctly handled
+ for (unsigned i = 0; i < 5; ++i)
+ {
+ flow_key[i].port_l = i + 1; // allow listed flow ports
+ flow_key[i].pkt_type = PktType::TCP;
+ Flow* flow = cache->find(&flow_key[i]);
+ CHECK(flow != nullptr); // Ensure the flow is found
+ CHECK(flow->flags.in_allowlist == 1); // Ensure the flow is allow listed
+ }
+
+ // Ensure cache cleanup and correct flow counts
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ CHECK(cache->get_count() == 0);
+ delete cache;
+}
+
+TEST(dump_flows_summary, dump_flows_summary_with_filter)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 50;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCacheWithFilter *cache = new DummyCacheWithFilter(fcg);
+ unsigned flows_number = 5;
+
+ std::vector<PktType> types = {PktType::IP, PktType::ICMP, PktType::TCP, PktType::UDP};
+
+ for (const auto& type : types)
+ {
+ int port = 1;
+ for (unsigned i = 0; i < 5; i++)
+ {
+ FlowKey flow_key;
+ flow_key.port_l = port++;
+ flow_key.port_h = 80;
+ flow_key.pkt_type = type;
+ cache->allocate(&flow_key);
+
+ Flow* flow = cache->find(&flow_key);
+ flow->pkt_type = type;
+ }
+ }
+ CHECK(cache->get_count() == flows_number * types.size());
+
+ // check proto filter
+ ffc.pkt_type = PktType::TCP;
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ FlowsTypeSummary expected_type{};
+ expected_type[to_utype(PktType::TCP)] = flows_number;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ FlowsStateSummary expected_state{};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = flows_number;
+ CHECK(expected_state == flows_summary.state_summary);
+
+ //check port filter
+ ffc.pkt_type = PktType::NONE;
+ ffc.source_port = 1;
+ flows_summary = {};
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ expected_type = {};
+ for (const auto& type : types)
+ expected_type[to_utype(type)] = 1;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ expected_state = {};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = types.size();
+ CHECK(expected_state == flows_summary.state_summary);
+
+ // check combined filter
+ ffc.pkt_type = PktType::UDP;
+ ffc.source_port = 1;
+ ffc.destination_port = 80;
+ flows_summary = {};
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ expected_type = {};
+ expected_type[to_utype(PktType::UDP)] = 1;
+ CHECK(expected_type == flows_summary.type_summary);
+
+ expected_state = {};
+ expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 1;
+ CHECK(expected_state == flows_summary.state_summary);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+
+
TEST_GROUP(flow_cache_lrus)
{
FlowCacheConfig fcg;
const SnortConfig* SnortConfig::get_conf() { return nullptr; }
void FlowCache::unlink_uni(Flow*) { }
bool FlowCache::dump_flows(std::fstream&, unsigned, const FilterFlowCriteria&, bool, uint8_t) const { return false; }
+bool FlowCache::dump_flows_summary(FlowsSummary&, const FilterFlowCriteria&) const { return false; }
void FlowCache::output_flow(std::fstream&, const Flow&, const struct timeval& ) const { }
bool FlowCache::filter_flows(const Flow&, const FilterFlowCriteria&) const { return true; };
void Flow::set_client_initiate(Packet*) { }
return 0;
}
+static int dump_flows_summary(lua_State* L)
+{
+ ControlConn* ctrlcon = ControlConn::query_from_lua(L);
+ PktType proto_type = PktType::NONE;
+ Inspector* inspector = InspectorManager::get_inspector("stream", Module::GLOBAL, IT_STREAM);
+ if (!inspector)
+ {
+ LogRespond(ctrlcon, "Dump flows requires stream to be configured\n");
+ return -1;
+ }
+ const char* protocol = luaL_optstring(L, 1, nullptr);
+ if (!protocol)
+ {
+ LogRespond(ctrlcon, "protocol must be a string or convertible to a string\n");
+ return -1;
+ }
+
+ if (protocol[0] != '\0')
+ {
+ auto proto_it = protocol_to_type.find(protocol);
+ if (proto_it == protocol_to_type.end())
+ {
+ LogRespond(ctrlcon, "valid protocols are IP/TCP/UDP/ICMP\n");
+ return -1;
+ }
+ else
+ proto_type = proto_it->second;
+ }
+
+ std::string source_ip = luaL_optstring(L, 2, nullptr);
+ if (!source_ip.c_str())
+ {
+ LogRespond(ctrlcon, "source_ip must be a string or convertible to a string\n");
+ return -1;
+ }
+ std::string destination_ip= luaL_optstring(L, 3, nullptr);
+ if (!destination_ip.c_str())
+ {
+ LogRespond(ctrlcon, "destination_ip must be a string or convertible to a string\n");
+ return -1;
+ }
+ int source_port = luaL_optint(L, 4, -1);
+ if ( source_port<0 || source_port>65535 )
+ {
+ LogRespond(ctrlcon, "source_port must be between 0-65535\n");
+ return -1;
+ }
+ int destination_port = luaL_optint(L, 5, -1);
+ if ( destination_port<0 || destination_port>65535)
+ {
+ LogRespond(ctrlcon, "destination_port must be between 0-65535\n");
+ return -1;
+ }
+
+ DumpFlowsSummary* dfs = new DumpFlowsSummary(ctrlcon);
+
+ SfIp src_ip,src_subnet;
+ if (!dfs->set_ip(source_ip, src_ip, src_subnet))
+ {
+ LogRespond(ctrlcon, "Invalid source ip\n");
+ delete dfs;
+ return -1;
+ }
+ SfIp dst_ip,dst_subnet;
+ if (!dfs->set_ip(destination_ip, dst_ip, dst_subnet))
+ {
+ LogRespond(ctrlcon, "Invalid destination ip\n");
+ delete dfs;
+ return -1;
+ }
+
+ FilterFlowCriteria ffc;
+ ffc.pkt_type = proto_type;
+ ffc.source_port = static_cast<uint16_t>(source_port);
+ ffc.destination_port = static_cast<uint16_t>(destination_port);
+ ffc.source_sfip=src_ip;
+ ffc.destination_sfip=dst_ip;
+ ffc.source_subnet_sfip=src_subnet;
+ ffc.destination_subnet_sfip=dst_subnet;
+ dfs->set_filter_criteria(ffc);
+
+
+ LogRespond(ctrlcon, "== dumping connection summaries\n");
+ main_broadcast_command(dfs, ctrlcon);
+ return 0;
+}
+
static const Command stream_cmds[] =
{
{ "dump_flows", dump_flows, nullptr, "dump the flow table" },
+ { "dump_flows_summary", dump_flows_summary, nullptr, "dump the flow summaries" },
{ nullptr, nullptr, nullptr, nullptr }
};