]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4801: flow: watchdog kick in dump flow summary
authorRaza Shafiq (rshafiq) <rshafiq@cisco.com>
Wed, 16 Jul 2025 15:20:58 +0000 (15:20 +0000)
committerSteven Baigal (sbaigal) <sbaigal@cisco.com>
Wed, 16 Jul 2025 15:20:58 +0000 (15:20 +0000)
Merge in SNORT/snort3 from ~RSHAFIQ/snort3:flow_summary to master

Squashed commit of the following:

commit 31cf48a6f448cf5f2dd0644ef1eda0d68e1a8fdc
Author: rshafiq <rshafiq@cisco.com>
Date:   Wed Jul 2 10:46:36 2025 -0400

    flow: watchdog kick in dump flow summary

src/flow/flow_cache.cc
src/flow/test/flow_cache_test.cc

index cf7bd202e49cb1faf44b8581b2ef9ff7876dbe55..5464027019d06142f5404d78bb092918541a94bc 100644 (file)
@@ -980,15 +980,41 @@ bool FlowCache::is_ip_match(const SfIp& flow_sfip, const SfIp& filter_sfip, cons
 
 bool FlowCache::filter_flows(const Flow& flow, const FilterFlowCriteria& ffc) const
 {
-    const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip;
-    const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip;
+    if ( ffc.pkt_type != PktType::NONE and flow.pkt_type != ffc.pkt_type )
+        return false;
+
+    if ( ffc.source_port != 0 )
+    {
+        uint16_t flow_src_port = flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l;
+        if (ffc.source_port != flow_src_port)
+            return false;
+    }
+
+    if ( ffc.destination_port != 0 )
+    {
+        uint16_t flow_dst_port = flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h;
+        if (ffc.destination_port != flow_dst_port)
+            return false;
+    }
+
+    if ( !ffc.source_sfip.is_set() and !ffc.destination_sfip.is_set() )
+        return true;
+
+    if ( ffc.source_sfip.is_set() )
+    {
+        const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip;
+        if ( !is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip) )
+            return false;
+    }
 
-    return ((ffc.pkt_type == PktType::NONE || flow.pkt_type == ffc.pkt_type)
-            && is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip)
-            && is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip)
-            && (!ffc.source_port || ffc.source_port == (flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l))
-            && (!ffc.destination_port || ffc.destination_port == (flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h)));
+    if ( ffc.destination_sfip.is_set() )
+    {
+        const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip;
+        if ( !is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip) )
+            return false;
+    }
 
+    return true;
 }
 
 void FlowCache::output_flow(std::fstream& stream, const Flow& flow, const struct timeval& now) const
@@ -1076,6 +1102,13 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo
     Flow* walk_flow = nullptr;
     bool has_more_flows = false;
 
+    // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls
+    bool all_flows = ( ffc.pkt_type == PktType::NONE and 
+                     !ffc.source_sfip.is_set() and
+                     !ffc.destination_sfip.is_set() and 
+                     ffc.source_port == 0 and
+                     ffc.destination_port == 0 );
+
     for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id)
     {
         if ( proto_id == to_utype(PktType::USER) or
@@ -1095,7 +1128,7 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo
             if  ( walk_flow->dump_code != code )
             {
                 walk_flow->dump_code = code;
-                if( filter_flows(*walk_flow, ffc) )
+                if( all_flows or filter_flows(*walk_flow, ffc) )
                     output_flow(stream, *walk_flow, now);
                 ++i;
             }
@@ -1113,7 +1146,15 @@ bool FlowCache::dump_flows(std::fstream& stream, unsigned count, const FilterFlo
 bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria& ffc) const
 {
     Flow* walk_flow = nullptr;
-
+    uint32_t processed_count = 0;
+
+    // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls
+    bool all_flows = ( ffc.pkt_type == PktType::NONE and 
+                     !ffc.source_sfip.is_set() and
+                     !ffc.destination_sfip.is_set() and 
+                     ffc.source_port == 0 and
+                     ffc.destination_port == 0 );
+    
     for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id)
     {
         if ( proto_id == to_utype(PktType::USER) or
@@ -1126,13 +1167,16 @@ bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlow
 
         while ( walk_flow )
         {
-            if( filter_flows(*walk_flow, ffc) )
+            if( all_flows or filter_flows(*walk_flow, ffc) )
             {
                 flows_summary.type_summary[to_utype(walk_flow->key->pkt_type)]++;
                 flows_summary.state_summary[to_utype(walk_flow->flow_state)]++;
             }
 
             walk_flow = static_cast<Flow*>(hash_table->get_next_walk_user_data(proto_id));
+
+            if ( (++processed_count & WDT_MASK) == 0 )
+                ThreadConfig::preemptive_kick();
         }
     }
 
index f6057b839f82e061f524de5940e94e9c9f7d9d22..679f7ab6e5a199380ef44d548ce68a59e62bd3d1 100644 (file)
@@ -77,9 +77,16 @@ bool HighAvailabilityManager::in_standby(Flow*) { return false; }
 uint8_t TraceApi::get_constraints_generation() { return 0; }
 void TraceApi::filter(const Packet&) {}
 
-void ThreadConfig::preemptive_kick() {}
+// Mock counter for preemptive_kick calls
+static unsigned preemptive_kick_count = 0;
+
+void ThreadConfig::preemptive_kick() { preemptive_kick_count++; }
 unsigned ThreadConfig::get_instance_max() { return 0; }
 
+// Helper function to reset and get kick count
+unsigned get_preemptive_kick_count() { return preemptive_kick_count; }
+void reset_preemptive_kick_count() { preemptive_kick_count = 0; }
+
 SfIpRet SfIp::set(void const*, int) { return SFIP_SUCCESS; }
 SfIpRet SfIp::set(void const*) { return SFIP_SUCCESS; }
 SfIpRet SfIp::pton(const int, const char* ) { return SFIP_SUCCESS; }
@@ -1071,6 +1078,50 @@ TEST(dump_flows_summary, dump_flows_summary_with_filter)
     delete cache;
 }
 
+TEST(dump_flows_summary, watchdog_kick_functionality)
+{
+    FlowCacheConfig fcg;
+    fcg.max_flows = 1000;
+    FilterFlowCriteria ffc;
+    FlowsSummary flows_summary;
+    DummyCache *cache = new DummyCache(fcg);
+    int port = 1;
+
+    // Reset kick counter before test
+    reset_preemptive_kick_count();
+
+    // Add flows that will trigger watchdog kicks
+    // watch dog mask = 7, so kick happens every 8 flows (when count & 7 == 0)
+    // Add 64 flows to trigger multiple kicks
+    for (unsigned i = 0; i < 64; i++)
+    {
+        FlowKey flow_key;
+        flow_key.port_l = port++;
+        flow_key.pkt_type = PktType::TCP;
+        cache->allocate(&flow_key);
+    }
+
+    CHECK(cache->get_count() == 64);
+
+    // Call dump_flows_summary which should trigger watchdog kicks
+    CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+    // Check that watchdog was kicked the expected number of times
+    // With 64 flows and watch dog mask = 7, kicks should happen at:
+    // flow 8 (8 & 7 = 0), flow 16 (16 & 7 = 0), flow 24, 32, 40, 48, 56, 64
+    // That's 8 kicks total
+    unsigned kick_count = get_preemptive_kick_count();
+    CHECK_EQUAL(8, kick_count);
+
+    // Verify all flows were processed correctly
+    CHECK_EQUAL(64, flows_summary.type_summary[to_utype(PktType::TCP)]);
+    CHECK_EQUAL(64, flows_summary.state_summary[to_utype(snort::Flow::FlowState::SETUP)]);
+
+    cache->purge();
+    CHECK(cache->get_flows_allocated() == 0);
+    delete cache;
+}
+
 
 
 TEST_GROUP(flow_cache_lrus)