From: Raza Shafiq (rshafiq) Date: Wed, 16 Jul 2025 15:20:58 +0000 (+0000) Subject: Pull request #4801: flow: watchdog kick in dump flow summary X-Git-Tag: 3.9.2.0~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4559206be052b546ff071e5c534f09f38a24d94a;p=thirdparty%2Fsnort3.git Pull request #4801: flow: watchdog kick in dump flow summary Merge in SNORT/snort3 from ~RSHAFIQ/snort3:flow_summary to master Squashed commit of the following: commit 31cf48a6f448cf5f2dd0644ef1eda0d68e1a8fdc Author: rshafiq Date: Wed Jul 2 10:46:36 2025 -0400 flow: watchdog kick in dump flow summary --- diff --git a/src/flow/flow_cache.cc b/src/flow/flow_cache.cc index cf7bd202e..546402701 100644 --- a/src/flow/flow_cache.cc +++ b/src/flow/flow_cache.cc @@ -980,15 +980,41 @@ bool FlowCache::is_ip_match(const SfIp& flow_sfip, const SfIp& filter_sfip, cons bool FlowCache::filter_flows(const Flow& flow, const FilterFlowCriteria& ffc) const { - const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip; - const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip; + if ( ffc.pkt_type != PktType::NONE and flow.pkt_type != ffc.pkt_type ) + return false; + + if ( ffc.source_port != 0 ) + { + uint16_t flow_src_port = flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l; + if (ffc.source_port != flow_src_port) + return false; + } + + if ( ffc.destination_port != 0 ) + { + uint16_t flow_dst_port = flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h; + if (ffc.destination_port != flow_dst_port) + return false; + } + + if ( !ffc.source_sfip.is_set() and !ffc.destination_sfip.is_set() ) + return true; + + if ( ffc.source_sfip.is_set() ) + { + const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip; + if ( !is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip) ) + return false; + } - return ((ffc.pkt_type == PktType::NONE || flow.pkt_type == ffc.pkt_type) - && is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip) - && is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip) - && (!ffc.source_port || ffc.source_port == (flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l)) - && (!ffc.destination_port || ffc.destination_port == (flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h))); + if ( ffc.destination_sfip.is_set() ) + { + const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip; + if ( !is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip) ) + return false; + } + return true; } void FlowCache::output_flow(std::fstream& stream, const Flow& flow, const struct timeval& now) const @@ -1076,6 +1102,13 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo Flow* walk_flow = nullptr; bool has_more_flows = false; + // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls + bool all_flows = ( ffc.pkt_type == PktType::NONE and + !ffc.source_sfip.is_set() and + !ffc.destination_sfip.is_set() and + ffc.source_port == 0 and + ffc.destination_port == 0 ); + for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id) { if ( proto_id == to_utype(PktType::USER) or @@ -1095,7 +1128,7 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo if ( walk_flow->dump_code != code ) { walk_flow->dump_code = code; - if( filter_flows(*walk_flow, ffc) ) + if( all_flows or filter_flows(*walk_flow, ffc) ) output_flow(stream, *walk_flow, now); ++i; } @@ -1113,7 +1146,15 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria& ffc) const { Flow* walk_flow = nullptr; - + uint32_t processed_count = 0; + + // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls + bool all_flows = ( ffc.pkt_type == PktType::NONE and + !ffc.source_sfip.is_set() and + !ffc.destination_sfip.is_set() and + ffc.source_port == 0 and + ffc.destination_port == 0 ); + for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id) { if ( proto_id == to_utype(PktType::USER) or @@ -1126,13 +1167,16 @@ bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlow while ( walk_flow ) { - if( filter_flows(*walk_flow, ffc) ) + if( all_flows or filter_flows(*walk_flow, ffc) ) { flows_summary.type_summary[to_utype(walk_flow->key->pkt_type)]++; flows_summary.state_summary[to_utype(walk_flow->flow_state)]++; } walk_flow = static_cast(hash_table->get_next_walk_user_data(proto_id)); + + if ( (++processed_count & WDT_MASK) == 0 ) + ThreadConfig::preemptive_kick(); } } diff --git a/src/flow/test/flow_cache_test.cc b/src/flow/test/flow_cache_test.cc index f6057b839..679f7ab6e 100644 --- a/src/flow/test/flow_cache_test.cc +++ b/src/flow/test/flow_cache_test.cc @@ -77,9 +77,16 @@ bool HighAvailabilityManager::in_standby(Flow*) { return false; } uint8_t TraceApi::get_constraints_generation() { return 0; } void TraceApi::filter(const Packet&) {} -void ThreadConfig::preemptive_kick() {} +// Mock counter for preemptive_kick calls +static unsigned preemptive_kick_count = 0; + +void ThreadConfig::preemptive_kick() { preemptive_kick_count++; } unsigned ThreadConfig::get_instance_max() { return 0; } +// Helper function to reset and get kick count +unsigned get_preemptive_kick_count() { return preemptive_kick_count; } +void reset_preemptive_kick_count() { preemptive_kick_count = 0; } + SfIpRet SfIp::set(void const*, int) { return SFIP_SUCCESS; } SfIpRet SfIp::set(void const*) { return SFIP_SUCCESS; } SfIpRet SfIp::pton(const int, const char* ) { return SFIP_SUCCESS; } @@ -1071,6 +1078,50 @@ TEST(dump_flows_summary, dump_flows_summary_with_filter) delete cache; } +TEST(dump_flows_summary, watchdog_kick_functionality) +{ + FlowCacheConfig fcg; + fcg.max_flows = 1000; + FilterFlowCriteria ffc; + FlowsSummary flows_summary; + DummyCache *cache = new DummyCache(fcg); + int port = 1; + + // Reset kick counter before test + reset_preemptive_kick_count(); + + // Add flows that will trigger watchdog kicks + // watch dog mask = 7, so kick happens every 8 flows (when count & 7 == 0) + // Add 64 flows to trigger multiple kicks + for (unsigned i = 0; i < 64; i++) + { + FlowKey flow_key; + flow_key.port_l = port++; + flow_key.pkt_type = PktType::TCP; + cache->allocate(&flow_key); + } + + CHECK(cache->get_count() == 64); + + // Call dump_flows_summary which should trigger watchdog kicks + CHECK(cache->dump_flows_summary(flows_summary, ffc) == true); + + // Check that watchdog was kicked the expected number of times + // With 64 flows and watch dog mask = 7, kicks should happen at: + // flow 8 (8 & 7 = 0), flow 16 (16 & 7 = 0), flow 24, 32, 40, 48, 56, 64 + // That's 8 kicks total + unsigned kick_count = get_preemptive_kick_count(); + CHECK_EQUAL(8, kick_count); + + // Verify all flows were processed correctly + CHECK_EQUAL(64, flows_summary.type_summary[to_utype(PktType::TCP)]); + CHECK_EQUAL(64, flows_summary.state_summary[to_utype(snort::Flow::FlowState::SETUP)]); + + cache->purge(); + CHECK(cache->get_flows_allocated() == 0); + delete cache; +} + TEST_GROUP(flow_cache_lrus)