From: Denys Zikratyi -X (dzikraty - SOFTSERVE INC at Cisco) Date: Thu, 23 Jan 2025 16:47:33 +0000 (+0000) Subject: Pull request #4566: Add summary of flows X-Git-Tag: 3.6.3.0~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fd928e3a09c530680feac180154fec5157d63de3;p=thirdparty%2Fsnort3.git Pull request #4566: Add summary of flows Merge in SNORT/snort3 from ~DZIKRATY/snort3:add_summary_of_flows to master Squashed commit of the following: commit d03bc68913f2fc84b562a7ed97d89fea0e133afe Author: Denys Zikratyi Date: Tue Jan 7 06:10:58 2025 -0500 flow: add command that dumps only flow summaries --- diff --git a/src/flow/flow_cache.cc b/src/flow/flow_cache.cc index 255a021c8..bb4104126 100644 --- a/src/flow/flow_cache.cc +++ b/src/flow/flow_cache.cc @@ -24,6 +24,7 @@ #include "flow/flow_cache.h" +#include #include #include "control/control.h" @@ -61,39 +62,11 @@ static const unsigned ALL_FLOWS = 3; static const unsigned WDT_MASK = 7; // kick watchdog once for every 8 flows deleted -uint8_t DumpFlows::dump_code = 0; - -#ifndef REG_TEST -DumpFlows::DumpFlows(unsigned count, ControlConn* conn) -#else -DumpFlows::DumpFlows(unsigned count, ControlConn* conn, int resume) -#endif - : snort::AnalyzerCommand(conn), dump_count(count) -#ifdef REG_TEST - , resume(resume) -#endif -{ - next.resize(ThreadConfig::get_instance_max()); - ++dump_code; -} +DumpFlowsBase::DumpFlowsBase(ControlConn *conn) + : snort::AnalyzerCommand(conn) +{} -bool DumpFlows::open_files(const std::string& base_name) -{ - dump_stream.resize(ThreadConfig::get_instance_max()); - for (unsigned i = 0; i < ThreadConfig::get_instance_max(); ++i) - { - std::string file_name = base_name + std::to_string(i + 1); - dump_stream[i].open(file_name, std::fstream::out | std::fstream::trunc); - if (0 != (dump_stream[i].rdstate() & std::fstream::failbit)) - { - LogRespond(ctrlcon, "Dump flows failed to open %s\n", file_name.c_str()); - return false; - } - } - return true; -} - -void DumpFlows::cidr2mask(const uint32_t cidr, uint32_t* mask) const +void DumpFlowsBase::cidr2mask(const uint32_t cidr, uint32_t* mask) const { size_t i; @@ -104,7 +77,7 @@ void DumpFlows::cidr2mask(const uint32_t cidr, uint32_t* mask) const } } -bool DumpFlows::set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const +bool DumpFlowsBase::set_ip(std::string filter_ip, snort::SfIp &ip, snort::SfIp &subnet) const { size_t slash_pos = filter_ip.find('/'); if (slash_pos != std::string::npos) @@ -179,6 +152,38 @@ bool DumpFlows::set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subn return false; } +uint8_t DumpFlows::dump_code = 0; + +#ifndef REG_TEST +DumpFlows::DumpFlows(unsigned count, ControlConn* conn) +#else +DumpFlows::DumpFlows(unsigned count, ControlConn* conn, int resume) +#endif + : DumpFlowsBase(conn), dump_count(count) +#ifdef REG_TEST + , resume(resume) +#endif +{ + next.resize(ThreadConfig::get_instance_max()); + ++dump_code; +} + +bool DumpFlows::open_files(const std::string& base_name) +{ + dump_stream.resize(ThreadConfig::get_instance_max()); + for (unsigned i = 0; i < ThreadConfig::get_instance_max(); ++i) + { + std::string file_name = base_name + std::to_string(i + 1); + dump_stream[i].open(file_name, std::fstream::out | std::fstream::trunc); + if (0 != (dump_stream[i].rdstate() & std::fstream::failbit)) + { + LogRespond(ctrlcon, "Dump flows failed to open %s\n", file_name.c_str()); + return false; + } + } + return true; +} + bool DumpFlows::execute(Analyzer&, void**) { if (!flow_con) @@ -193,6 +198,85 @@ bool DumpFlows::execute(Analyzer&, void**) return flow_con->dump_flows(dump_stream[id], dump_count, ffc, first, dump_code); } +DumpFlowsSummary::DumpFlowsSummary(ControlConn* conn) + : DumpFlowsBase(conn) +{ + flows_summaries.resize(ThreadConfig::get_instance_max()); +} + +DumpFlowsSummary::~DumpFlowsSummary() +{ + FlowsTypeSummary type_summary{}; + FlowsStateSummary state_summary{}; + unsigned total_pkts = 0; + + for (const auto& flows_sum : flows_summaries) + { + for (unsigned i = 0; i < type_summary.size(); ++i) + { + type_summary[i] += flows_sum.type_summary[i]; + total_pkts += flows_sum.type_summary[i]; + } + for (unsigned i = 0; i < state_summary.size(); ++i) + { + state_summary[i] += flows_sum.state_summary[i]; + } + } + + LogRespond(ctrlcon, "Total: %u\n", total_pkts); + for (unsigned i = 0; i < type_summary.size(); ++i) + { + PktType proto = static_cast(i); + + switch (proto) + { + case PktType::IP: + LogRespond(ctrlcon, "IP: %u\n", type_summary[i]); + break; + case PktType::ICMP: + LogRespond(ctrlcon, "ICMP: %u\n", type_summary[i]); + break; + case PktType::TCP: + LogRespond(ctrlcon, "TCP: %u\n", type_summary[i]); + break; + case PktType::UDP: + LogRespond(ctrlcon, "UDP: %u\n", type_summary[i]); + break; + default: + break; + } + } + + unsigned pending = 0; + for (unsigned i = 0; i < state_summary.size(); ++i) + { + snort::Flow::FlowState state = static_cast(i); + + switch (state) + { + case snort::Flow::FlowState::ALLOW: + LogRespond(ctrlcon, "Allowed: %u\n", state_summary[i]); + break; + case snort::Flow::FlowState::BLOCK: + LogRespond(ctrlcon, "Blocked: %u\n", state_summary[i]); + break; + default: + pending += state_summary[i]; + break; + } + } + LogRespond(ctrlcon, "Pending: %u\n", pending); +} + +bool DumpFlowsSummary::execute(Analyzer &, void **) +{ + if (!flow_con) + return true; + unsigned id = get_instance_id(); + + return flow_con->dump_flows_summary(flows_summaries[id], ffc); +} + //------------------------------------------------------------------------- // FlowCache stuff //------------------------------------------------------------------------- @@ -961,7 +1045,34 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo return !has_more_flows; } +bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria& ffc) const +{ + Flow* walk_flow = nullptr; + + for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id) + { + if ( proto_id == to_utype(PktType::USER) or + proto_id == to_utype(PktType::FILE) or + proto_id == to_utype(PktType::PDU) ) + continue; + + + walk_flow = static_cast(hash_table->get_walk_user_data(proto_id)); + + while ( walk_flow ) + { + if( filter_flows(*walk_flow, ffc) ) + { + flows_summary.type_summary[to_utype(walk_flow->key->pkt_type)]++; + flows_summary.state_summary[to_utype(walk_flow->flow_state)]++; + } + walk_flow = static_cast(hash_table->get_next_walk_user_data(proto_id)); + } + } + + return true; +} size_t FlowCache::uni_flows_size() const { return uni_flows ? uni_flows->get_count() : 0; @@ -1005,3 +1116,5 @@ size_t FlowCache::count_flows_in_lru(uint8_t lru_index) const return count; } #endif + + diff --git a/src/flow/flow_cache.h b/src/flow/flow_cache.h index dd85c60c1..415f20b22 100644 --- a/src/flow/flow_cache.h +++ b/src/flow/flow_cache.h @@ -25,6 +25,7 @@ // there is a FlowCache instance for each protocol. // Flows are stored in a ZHash instance by FlowKey. +#include #include #include #include @@ -32,11 +33,12 @@ #include #include +#include "filter_flow_critera.h" #include "framework/counts.h" #include "flow_config.h" +#include "flow.h" #include "main/analyzer_command.h" #include "prune_stats.h" -#include "filter_flow_critera.h" constexpr uint8_t max_protocols = static_cast(to_utype(PktType::MAX)); constexpr uint8_t allowlist_lru_index = max_protocols; @@ -44,13 +46,38 @@ constexpr uint8_t total_lru_count = max_protocols + 1; constexpr uint64_t all_lru_mask = (1ULL << max_protocols) - 1; constexpr uint8_t first_proto = to_utype(PktType::NONE) + 1; +typedef std::array FlowsTypeSummary; +typedef std::array FlowsStateSummary; + +struct FlowsSummary +{ + FlowsTypeSummary type_summary{}; + FlowsStateSummary state_summary{}; +}; + namespace snort { class Flow; struct FlowKey; } -class DumpFlows : public snort::AnalyzerCommand +class DumpFlowsBase : public snort::AnalyzerCommand +{ +public: + DumpFlowsBase(ControlConn*); + virtual ~DumpFlowsBase() override= default; + void cidr2mask(const uint32_t cidr, uint32_t* mask) const; + bool set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const; + bool execute(Analyzer&, void**) override = 0; + const char* stringify() override = 0; + void set_filter_criteria(const FilterFlowCriteria& filter_criteria) + {ffc = filter_criteria;} + +protected: + FilterFlowCriteria ffc; +}; + +class DumpFlows : public DumpFlowsBase { public: #ifndef REG_TEST @@ -60,13 +87,9 @@ public: #endif ~DumpFlows() override = default; bool open_files(const std::string& base_name); - void cidr2mask(const uint32_t cidr, uint32_t* mask) const; - bool set_ip(std::string filter_ip, snort::SfIp& ip, snort::SfIp& subnet) const; bool execute(Analyzer&, void**) override; const char* stringify() override { return "DumpFlows"; } - void set_filter_criteria(const FilterFlowCriteria& filter_criteria) - {ffc = filter_criteria;} private: //dump_code is to track if the flow is dumped only once per dump_flow command. @@ -74,12 +97,24 @@ private: std::vector dump_stream; std::vector next; unsigned dump_count; - FilterFlowCriteria ffc; #ifdef REG_TEST int resume = -1; #endif }; +class DumpFlowsSummary : public DumpFlowsBase +{ +public: + DumpFlowsSummary(ControlConn*); + + ~DumpFlowsSummary() override; + bool execute(Analyzer&, void**) override; + const char* stringify() override + { return "DumpFlowsSummary"; } + +private: + std::vector flows_summaries; +}; class FlowUniList; @@ -104,6 +139,8 @@ public: unsigned delete_flows(unsigned num_to_delete); unsigned prune_multiple(PruneReason, bool do_cleanup); bool dump_flows(std::fstream&, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const; + bool dump_flows_summary(FlowsSummary&, const FilterFlowCriteria& ffc) const; + unsigned purge(); unsigned get_count(); diff --git a/src/flow/flow_control.cc b/src/flow/flow_control.cc index ea6b7fad1..3c7d6b934 100644 --- a/src/flow/flow_control.cc +++ b/src/flow/flow_control.cc @@ -147,6 +147,9 @@ unsigned FlowControl::prune_multiple(PruneReason reason, bool do_cleanup) bool FlowControl::dump_flows(std::fstream& stream, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const { return cache->dump_flows(stream, count, ffc, first, code); } +bool FlowControl::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria &ffc) const +{ return cache->dump_flows_summary(flows_summary, ffc); } + void FlowControl::timeout_flows(unsigned max, time_t cur_time) { cache->timeout(max, cur_time); diff --git a/src/flow/flow_control.h b/src/flow/flow_control.h index 2f3d62383..155aac669 100644 --- a/src/flow/flow_control.h +++ b/src/flow/flow_control.h @@ -75,6 +75,8 @@ public: bool move_to_allowlist(snort::Flow*); bool dump_flows(std::fstream&, unsigned count, const FilterFlowCriteria& ffc, bool first, uint8_t code) const; + bool dump_flows_summary(FlowsSummary&, const FilterFlowCriteria& ffc) const; + int add_expected_ignore( const snort::Packet* ctrlPkt, PktType, IpProtocol, diff --git a/src/flow/test/flow_cache_test.cc b/src/flow/test/flow_cache_test.cc index 936b1b118..8eeec0870 100644 --- a/src/flow/test/flow_cache_test.cc +++ b/src/flow/test/flow_cache_test.cc @@ -143,6 +143,14 @@ class DummyCache : public FlowCache bool filter_flows(const Flow& flow, const FilterFlowCriteria& ffc) const override { (void)flow; (void)ffc; return true; }; }; +class DummyCacheWithFilter : public FlowCache +{ + public: + DummyCacheWithFilter(const FlowCacheConfig& cfg) : FlowCache(cfg) {} + ~DummyCacheWithFilter() = default; + void output_flow(std::fstream& stream, const Flow& flow, const struct timeval& now) const override { (void)stream, (void)flow, (void)now; }; +}; + TEST_GROUP(flow_prune) { }; // No flows in the flow cache, pruning should not happen @@ -795,6 +803,271 @@ TEST(dump_flows, dump_flows_no_flows_to_dump) delete cache; } +TEST_GROUP(dump_flows_summary) { }; + +TEST(dump_flows_summary, dump_flows_summary_with_all_empty_caches) +{ + FlowCacheConfig fcg; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCache *cache = new DummyCache(fcg); + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + CHECK(cache->get_flows_allocated() == 0); + + FlowsTypeSummary expected_type{}; + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + CHECK(expected_state == flows_summary.state_summary); + + delete cache; +} + +TEST(dump_flows_summary, dump_flows_summary_with_one_tcp_flow) +{ + FlowCacheConfig fcg; + fcg.max_flows = 5; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCache *cache = new DummyCache(fcg); + + FlowKey flow_key; + flow_key.port_l = 1; + flow_key.pkt_type = PktType::TCP; + cache->allocate(&flow_key); + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + CHECK(cache->get_count() == 1); + + FlowsTypeSummary expected_type{}; + expected_type[to_utype(PktType::TCP)] = 1; + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 1; + CHECK(expected_state == flows_summary.state_summary); + + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + delete cache; +} + + +TEST(dump_flows_summary, dump_flows_summary_with_5_of_each_flow) +{ + FlowCacheConfig fcg; + fcg.max_flows = 50; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCache *cache = new DummyCache(fcg); + int port = 1; + + std::vector types = {PktType::IP, PktType::ICMP, PktType::TCP, PktType::UDP}; + + for (const auto& type : types) + { + for (unsigned i = 0; i < 5; i++) + { + FlowKey flow_key; + flow_key.port_l = port++; + flow_key.pkt_type = type; + cache->allocate(&flow_key); + } + } + CHECK (cache->get_count() == 5 * types.size()); + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + FlowsTypeSummary expected_type{}; + for (const auto& type : types) + expected_type[to_utype(type)] = 5; + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 5 * types.size(); + CHECK(expected_state == flows_summary.state_summary); + + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + CHECK (cache->get_count() == 0); + delete cache; +} + +TEST(dump_flows_summary, dump_flows_summary_with_different_flow_states) +{ + FlowCacheConfig fcg; + fcg.max_flows = 50; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCache *cache = new DummyCache(fcg); + int port = 1; + unsigned flows_number = 5; + + std::vector types = {snort::Flow::FlowState::BLOCK, snort::Flow::FlowState::ALLOW, snort::Flow::FlowState::SETUP}; + + for (const auto& type : types) + { + for (unsigned i = 0; i < 5; i++) + { + FlowKey flow_key; + flow_key.port_l = port++; + flow_key.pkt_type = PktType::TCP; + cache->allocate(&flow_key); + Flow* flow = cache->find(&flow_key); + flow->flow_state = type; + } + } + + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + CHECK(cache->get_count() == flows_number * types.size()); + + FlowsTypeSummary expected_type{}; + expected_type[to_utype(PktType::TCP)] = flows_number * types.size(); + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + for (const auto& type : types) + { + expected_state[to_utype(type)] = flows_number; + } + CHECK(expected_state == flows_summary.state_summary); + + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + delete cache; +} + +TEST(dump_flows_summary, dump_flows_summary_with_allowlist) +{ + FlowCacheConfig fcg; + fcg.max_flows = 50; + FilterFlowCriteria ffc; + FlowsSummary flows_summary{}; + DummyCache* cache = new DummyCache(fcg); + int port = 1; + FlowKey flow_key[10]; + + // Add TCP flows and mark some as allow listed + for (unsigned i = 0; i < 10; ++i) + { + flow_key[i].port_l = port++; + flow_key[i].pkt_type = PktType::TCP; + Flow* flow = cache->allocate(&flow_key[i]); + // Mark the first 5 flows as allow listed + if (i < 5) + { + CHECK(cache->move_to_allowlist(flow) == true); + } + } + + CHECK(cache->get_count() == 10); + + //check flows are properly moved to allow list + CHECK(cache->count_flows_in_lru(to_utype(PktType::TCP)) == 5); // Check 5 TCP flows + CHECK(cache->count_flows_in_lru(allowlist_lru_index) == 5); // Check 5 allow listed flows + + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + FlowsTypeSummary expected_type{}; + expected_type[to_utype(PktType::TCP)] = 10; + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 10; + CHECK(expected_state == flows_summary.state_summary); + + // Verify that allow listed flows exist and are correctly handled + for (unsigned i = 0; i < 5; ++i) + { + flow_key[i].port_l = i + 1; // allow listed flow ports + flow_key[i].pkt_type = PktType::TCP; + Flow* flow = cache->find(&flow_key[i]); + CHECK(flow != nullptr); // Ensure the flow is found + CHECK(flow->flags.in_allowlist == 1); // Ensure the flow is allow listed + } + + // Ensure cache cleanup and correct flow counts + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + CHECK(cache->get_count() == 0); + delete cache; +} + +TEST(dump_flows_summary, dump_flows_summary_with_filter) +{ + FlowCacheConfig fcg; + fcg.max_flows = 50; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCacheWithFilter *cache = new DummyCacheWithFilter(fcg); + unsigned flows_number = 5; + + std::vector types = {PktType::IP, PktType::ICMP, PktType::TCP, PktType::UDP}; + + for (const auto& type : types) + { + int port = 1; + for (unsigned i = 0; i < 5; i++) + { + FlowKey flow_key; + flow_key.port_l = port++; + flow_key.port_h = 80; + flow_key.pkt_type = type; + cache->allocate(&flow_key); + + Flow* flow = cache->find(&flow_key); + flow->pkt_type = type; + } + } + CHECK(cache->get_count() == flows_number * types.size()); + + // check proto filter + ffc.pkt_type = PktType::TCP; + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + FlowsTypeSummary expected_type{}; + expected_type[to_utype(PktType::TCP)] = flows_number; + CHECK(expected_type == flows_summary.type_summary); + + FlowsStateSummary expected_state{}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = flows_number; + CHECK(expected_state == flows_summary.state_summary); + + //check port filter + ffc.pkt_type = PktType::NONE; + ffc.source_port = 1; + flows_summary = {}; + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + expected_type = {}; + for (const auto& type : types) + expected_type[to_utype(type)] = 1; + CHECK(expected_type == flows_summary.type_summary); + + expected_state = {}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = types.size(); + CHECK(expected_state == flows_summary.state_summary); + + // check combined filter + ffc.pkt_type = PktType::UDP; + ffc.source_port = 1; + ffc.destination_port = 80; + flows_summary = {}; + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + expected_type = {}; + expected_type[to_utype(PktType::UDP)] = 1; + CHECK(expected_type == flows_summary.type_summary); + + expected_state = {}; + expected_state[to_utype(snort::Flow::FlowState::SETUP)] = 1; + CHECK(expected_state == flows_summary.state_summary); + + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + delete cache; +} + + + TEST_GROUP(flow_cache_lrus) { FlowCacheConfig fcg; diff --git a/src/flow/test/flow_control_test.cc b/src/flow/test/flow_control_test.cc index 5785eb84a..efbc354f1 100644 --- a/src/flow/test/flow_control_test.cc +++ b/src/flow/test/flow_control_test.cc @@ -75,6 +75,7 @@ void Flow::init(PktType) { } const SnortConfig* SnortConfig::get_conf() { return nullptr; } void FlowCache::unlink_uni(Flow*) { } bool FlowCache::dump_flows(std::fstream&, unsigned, const FilterFlowCriteria&, bool, uint8_t) const { return false; } +bool FlowCache::dump_flows_summary(FlowsSummary&, const FilterFlowCriteria&) const { return false; } void FlowCache::output_flow(std::fstream&, const Flow&, const struct timeval& ) const { } bool FlowCache::filter_flows(const Flow&, const FilterFlowCriteria&) const { return true; }; void Flow::set_client_initiate(Packet*) { } diff --git a/src/stream/base/stream_module.cc b/src/stream/base/stream_module.cc index 6a28b957e..106201d06 100644 --- a/src/stream/base/stream_module.cc +++ b/src/stream/base/stream_module.cc @@ -263,9 +263,97 @@ uncompleted queue*/ return 0; } +static int dump_flows_summary(lua_State* L) +{ + ControlConn* ctrlcon = ControlConn::query_from_lua(L); + PktType proto_type = PktType::NONE; + Inspector* inspector = InspectorManager::get_inspector("stream", Module::GLOBAL, IT_STREAM); + if (!inspector) + { + LogRespond(ctrlcon, "Dump flows requires stream to be configured\n"); + return -1; + } + const char* protocol = luaL_optstring(L, 1, nullptr); + if (!protocol) + { + LogRespond(ctrlcon, "protocol must be a string or convertible to a string\n"); + return -1; + } + + if (protocol[0] != '\0') + { + auto proto_it = protocol_to_type.find(protocol); + if (proto_it == protocol_to_type.end()) + { + LogRespond(ctrlcon, "valid protocols are IP/TCP/UDP/ICMP\n"); + return -1; + } + else + proto_type = proto_it->second; + } + + std::string source_ip = luaL_optstring(L, 2, nullptr); + if (!source_ip.c_str()) + { + LogRespond(ctrlcon, "source_ip must be a string or convertible to a string\n"); + return -1; + } + std::string destination_ip= luaL_optstring(L, 3, nullptr); + if (!destination_ip.c_str()) + { + LogRespond(ctrlcon, "destination_ip must be a string or convertible to a string\n"); + return -1; + } + int source_port = luaL_optint(L, 4, -1); + if ( source_port<0 || source_port>65535 ) + { + LogRespond(ctrlcon, "source_port must be between 0-65535\n"); + return -1; + } + int destination_port = luaL_optint(L, 5, -1); + if ( destination_port<0 || destination_port>65535) + { + LogRespond(ctrlcon, "destination_port must be between 0-65535\n"); + return -1; + } + + DumpFlowsSummary* dfs = new DumpFlowsSummary(ctrlcon); + + SfIp src_ip,src_subnet; + if (!dfs->set_ip(source_ip, src_ip, src_subnet)) + { + LogRespond(ctrlcon, "Invalid source ip\n"); + delete dfs; + return -1; + } + SfIp dst_ip,dst_subnet; + if (!dfs->set_ip(destination_ip, dst_ip, dst_subnet)) + { + LogRespond(ctrlcon, "Invalid destination ip\n"); + delete dfs; + return -1; + } + + FilterFlowCriteria ffc; + ffc.pkt_type = proto_type; + ffc.source_port = static_cast(source_port); + ffc.destination_port = static_cast(destination_port); + ffc.source_sfip=src_ip; + ffc.destination_sfip=dst_ip; + ffc.source_subnet_sfip=src_subnet; + ffc.destination_subnet_sfip=dst_subnet; + dfs->set_filter_criteria(ffc); + + + LogRespond(ctrlcon, "== dumping connection summaries\n"); + main_broadcast_command(dfs, ctrlcon); + return 0; +} + static const Command stream_cmds[] = { { "dump_flows", dump_flows, nullptr, "dump the flow table" }, + { "dump_flows_summary", dump_flows_summary, nullptr, "dump the flow summaries" }, { nullptr, nullptr, nullptr, nullptr } };