bool FlowCache::filter_flows(const Flow& flow, const FilterFlowCriteria& ffc) const
{
- const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip;
- const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip;
+ if ( ffc.pkt_type != PktType::NONE and flow.pkt_type != ffc.pkt_type )
+ return false;
+
+ if ( ffc.source_port != 0 )
+ {
+ uint16_t flow_src_port = flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l;
+ if (ffc.source_port != flow_src_port)
+ return false;
+ }
+
+ if ( ffc.destination_port != 0 )
+ {
+ uint16_t flow_dst_port = flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h;
+ if (ffc.destination_port != flow_dst_port)
+ return false;
+ }
+
+ if ( !ffc.source_sfip.is_set() and !ffc.destination_sfip.is_set() )
+ return true;
+
+ if ( ffc.source_sfip.is_set() )
+ {
+ const SfIp& flow_srcip = flow.flags.client_initiated ? flow.client_ip : flow.server_ip;
+ if ( !is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip) )
+ return false;
+ }
- return ((ffc.pkt_type == PktType::NONE || flow.pkt_type == ffc.pkt_type)
- && is_ip_match(flow_srcip, ffc.source_sfip, ffc.source_subnet_sfip)
- && is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip)
- && (!ffc.source_port || ffc.source_port == (flow.flags.key_is_reversed ? flow.key->port_h : flow.key->port_l))
- && (!ffc.destination_port || ffc.destination_port == (flow.flags.key_is_reversed ? flow.key->port_l : flow.key->port_h)));
+ if ( ffc.destination_sfip.is_set() )
+ {
+ const SfIp& flow_dstip = flow.flags.client_initiated ? flow.server_ip : flow.client_ip;
+ if ( !is_ip_match(flow_dstip, ffc.destination_sfip, ffc.destination_subnet_sfip) )
+ return false;
+ }
+ return true;
}
void FlowCache::output_flow(std::fstream& stream, const Flow& flow, const struct timeval& now) const
Flow* walk_flow = nullptr;
bool has_more_flows = false;
+ // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls
+ bool all_flows = ( ffc.pkt_type == PktType::NONE and
+ !ffc.source_sfip.is_set() and
+ !ffc.destination_sfip.is_set() and
+ ffc.source_port == 0 and
+ ffc.destination_port == 0 );
+
for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id)
{
if ( proto_id == to_utype(PktType::USER) or
if ( walk_flow->dump_code != code )
{
walk_flow->dump_code = code;
- if( filter_flows(*walk_flow, ffc) )
+ if( all_flows or filter_flows(*walk_flow, ffc) )
output_flow(stream, *walk_flow, now);
++i;
}
bool FlowCache::dump_flows_summary(FlowsSummary& flows_summary, const FilterFlowCriteria& ffc) const
{
Flow* walk_flow = nullptr;
-
+ uint32_t processed_count = 0;
+
+ // Fast path: check if all filter fields are empty to avoid expensive filter_flows calls
+ bool all_flows = ( ffc.pkt_type == PktType::NONE and
+ !ffc.source_sfip.is_set() and
+ !ffc.destination_sfip.is_set() and
+ ffc.source_port == 0 and
+ ffc.destination_port == 0 );
+
for (uint8_t proto_id = to_utype(PktType::NONE) + 1; proto_id < total_lru_count; ++proto_id)
{
if ( proto_id == to_utype(PktType::USER) or
while ( walk_flow )
{
- if( filter_flows(*walk_flow, ffc) )
+ if( all_flows or filter_flows(*walk_flow, ffc) )
{
flows_summary.type_summary[to_utype(walk_flow->key->pkt_type)]++;
flows_summary.state_summary[to_utype(walk_flow->flow_state)]++;
}
walk_flow = static_cast<Flow*>(hash_table->get_next_walk_user_data(proto_id));
+
+ if ( (++processed_count & WDT_MASK) == 0 )
+ ThreadConfig::preemptive_kick();
}
}
uint8_t TraceApi::get_constraints_generation() { return 0; }
void TraceApi::filter(const Packet&) {}
-void ThreadConfig::preemptive_kick() {}
+// Mock counter for preemptive_kick calls
+static unsigned preemptive_kick_count = 0;
+
+void ThreadConfig::preemptive_kick() { preemptive_kick_count++; }
unsigned ThreadConfig::get_instance_max() { return 0; }
+// Helper function to reset and get kick count
+unsigned get_preemptive_kick_count() { return preemptive_kick_count; }
+void reset_preemptive_kick_count() { preemptive_kick_count = 0; }
+
SfIpRet SfIp::set(void const*, int) { return SFIP_SUCCESS; }
SfIpRet SfIp::set(void const*) { return SFIP_SUCCESS; }
SfIpRet SfIp::pton(const int, const char* ) { return SFIP_SUCCESS; }
delete cache;
}
+TEST(dump_flows_summary, watchdog_kick_functionality)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 1000;
+ FilterFlowCriteria ffc;
+ FlowsSummary flows_summary;
+ DummyCache *cache = new DummyCache(fcg);
+ int port = 1;
+
+ // Reset kick counter before test
+ reset_preemptive_kick_count();
+
+ // Add flows that will trigger watchdog kicks
+ // watch dog mask = 7, so kick happens every 8 flows (when count & 7 == 0)
+ // Add 64 flows to trigger multiple kicks
+ for (unsigned i = 0; i < 64; i++)
+ {
+ FlowKey flow_key;
+ flow_key.port_l = port++;
+ flow_key.pkt_type = PktType::TCP;
+ cache->allocate(&flow_key);
+ }
+
+ CHECK(cache->get_count() == 64);
+
+ // Call dump_flows_summary which should trigger watchdog kicks
+ CHECK(cache->dump_flows_summary(flows_summary, ffc) == true);
+
+ // Check that watchdog was kicked the expected number of times
+ // With 64 flows and watch dog mask = 7, kicks should happen at:
+ // flow 8 (8 & 7 = 0), flow 16 (16 & 7 = 0), flow 24, 32, 40, 48, 56, 64
+ // That's 8 kicks total
+ unsigned kick_count = get_preemptive_kick_count();
+ CHECK_EQUAL(8, kick_count);
+
+ // Verify all flows were processed correctly
+ CHECK_EQUAL(64, flows_summary.type_summary[to_utype(PktType::TCP)]);
+ CHECK_EQUAL(64, flows_summary.state_summary[to_utype(snort::Flow::FlowState::SETUP)]);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
TEST_GROUP(flow_cache_lrus)