break;
node->clear(free_list);
- hash_table->remove();
+ hash_table->release();
++prunes;
}
}
{
if (node->head)
node->clear(free_list);
- hash_table->remove(&key);
+ hash_table->release(&key);
return nullptr;
}
/* Make sure the packet direction is correct */
lws->ssn_state.snort_protocol_id = node->snort_protocol_id;
if (!node->count)
- hash_table->remove(&key);
+ hash_table->release(&key);
return ignoring;
}
private:
class ZHash* hash_table;
ExpectNode* nodes;
- snort::ExpectFlow* pool, * free_list;
+ snort::ExpectFlow* pool;
+ snort::ExpectFlow* free_list;
unsigned long expects, realized;
unsigned long prunes, overflows;
#define SESSION_CACHE_FLAG_PURGING 0x01
+static const unsigned ALLOWED_FLOWS_ONLY = 1;
+static const unsigned OFFLOADED_FLOWS_TOO = 2;
+static const unsigned ALL_FLOWS = 3;
+
//-------------------------------------------------------------------------
// FlowCache stuff
//-------------------------------------------------------------------------
{
if ( flows_allocated < config.max_flows )
{
- Flow* new_flow = new Flow;
+ Flow* new_flow = new Flow();
push(new_flow);
}
else if ( !prune_stale(timestamp, nullptr) )
return flow;
}
-int FlowCache::release(Flow* flow, PruneReason reason, bool do_cleanup)
-{
-    flow->reset(do_cleanup);
-    prune_stats.update(reason);
-    return remove(flow);
-}
-
-int FlowCache::remove(Flow* flow)
+// Unlink 'flow' from any uni list and give its hash node back to the
+// ZHash free list (hash_table->release keeps the node/Flow for reuse;
+// the Flow object is not destroyed here). Return type is now void --
+// no caller used the old int result.
+void FlowCache::remove(Flow* flow)
{
if ( flow->next )
unlink_uni(flow);
- bool deleted = hash_table->remove(flow->key);
-
// FIXIT-M This check is added for offload case where both Flow::reset
// and Flow::retire try remove the flow from hash. Flow::reset should
// just mark the flow as pending instead of trying to remove it.
- if ( deleted )
+ if ( hash_table->release(flow->key) )
memory::MemoryCap::update_deallocations(config.proto[to_utype(flow->key->pkt_type)].cap_weight);
+}
- return deleted;
+// Reset the flow (optionally running cleanup), account the prune reason,
+// then return the flow to the cache via remove().
+void FlowCache::release(Flow* flow, PruneReason reason, bool do_cleanup)
+{
+ flow->reset(do_cleanup);
+ prune_stats.update(reason);
+ remove(flow);
}
-int FlowCache::retire(Flow* flow)
+// Fully shut the session down (reset with cleanup + term) before
+// returning the flow to the cache; counted as a PruneReason::NONE prune.
+void FlowCache::retire(Flow* flow)
{
flow->reset(true);
flow->term();
prune_stats.update(PruneReason::NONE);
- return remove(flow);
+ remove(flow);
}
unsigned FlowCache::prune_stale(uint32_t thetime, const Flow* save_me)
#else
// Reached the current flow. This *should* be the newest flow
if ( flow == save_me )
- {
- // assert( flow->last_data_seen + config.pruning_timeout >= thetime );
- // bool rv = hash_table->touch(); assert( !rv );
break;
- }
#endif
if ( flow->is_suspended() )
break;
bool FlowCache::prune_one(PruneReason reason, bool do_cleanup)
{
-
// so we don't prune the current flow (assume current == MRU)
if ( hash_table->get_count() <= 1 )
return false;
return retired;
}
+// Delete up to num_to_delete active (hash-resident) flows, honoring 'mode':
+// ALLOWED_FLOWS_ONLY skips blocked and offloaded flows, OFFLOADED_FLOWS_TOO
+// skips only blocked flows, ALL_FLOWS protects nothing. 'deleted' is
+// incremented per flow destroyed; returns the count still left to delete.
+unsigned FlowCache::delete_active_flows(unsigned mode, unsigned num_to_delete, unsigned &deleted)
+{
+ // bound the scan so flows skipped (and touched to MRU) aren't revisited
+ unsigned flows_to_check = hash_table->get_count();
+ while ( num_to_delete && flows_to_check-- )
+ {
+ auto flow = static_cast<Flow*>(hash_table->first());
+ assert(flow);
+ if ( (mode == ALLOWED_FLOWS_ONLY and (flow->was_blocked() || flow->is_suspended()))
+ or (mode == OFFLOADED_FLOWS_TOO and flow->was_blocked()) )
+ {
+ // flow is protected in this mode; touch() moves it to the MRU end
+ // so first() yields the next candidate -- if that fails, give up
+ if (!hash_table->touch())
+ break;
+
+ continue;
+ }
+
+ // we have a winner...
+ hash_table->remove(flow->key);
+ if ( flow->next )
+ unlink_uni(flow);
+
+ if ( flow->was_blocked() )
+ delete_stats.update(FlowDeleteState::BLOCKED);
+ else if ( flow->is_suspended() )
+ delete_stats.update(FlowDeleteState::OFFLOADED);
+ else
+ delete_stats.update(FlowDeleteState::ALLOWED);
+
+ // NOTE(review): unlike FlowCache::remove(), no MemoryCap deallocation
+ // update is made here -- confirm that is intended for reload deletes
+ delete flow;
+ --flows_allocated;
+ ++deleted;
+ --num_to_delete;
+ }
+
+ return num_to_delete;
+}
+
+// Delete num_to_delete Flow objects to shrink the cache after a reload:
+// first drain the ZHash free list (cheap, no session state to tear down),
+// then walk active flows in escalating modes -- allowed flows first, then
+// offloaded, finally blocked. Returns the number actually deleted.
+unsigned FlowCache::delete_flows(unsigned num_to_delete)
+{
+ // suspend active-response handling while flows are being torn down
+ ActiveSuspendContext act_susp;
+
+ unsigned deleted = 0;
+
+ // delete from the free list first...
+ while ( num_to_delete )
+ {
+ Flow* flow = (Flow*)hash_table->pop();
+ if ( !flow )
+ break;
+
+ delete flow;
+ delete_stats.update(FlowDeleteState::FREELIST);
+ --flows_allocated;
+ ++deleted;
+ --num_to_delete;
+ }
+
+ // still short? take active flows, least disruptive category first
+ unsigned mode = ALLOWED_FLOWS_ONLY;
+ while ( num_to_delete && mode <= ALL_FLOWS )
+ num_to_delete = delete_active_flows(mode++, num_to_delete, deleted);
+
+ return deleted;
+}
+
// Remove all flows from the hash table.
unsigned FlowCache::purge()
{
}
while ( Flow* flow = (Flow*)hash_table->pop() )
+ {
delete flow;
+ --flows_allocated;
+ }
return retired;
}
#include <ctime>
#include <type_traits>
+#include "framework/counts.h"
+
#include "flow_config.h"
#include "prune_stats.h"
snort::Flow* find(const snort::FlowKey*);
snort::Flow* allocate(const snort::FlowKey*);
- int release(snort::Flow*, PruneReason = PruneReason::NONE, bool do_cleanup = true);
+ void release(snort::Flow*, PruneReason = PruneReason::NONE, bool do_cleanup = true);
+
unsigned prune_stale(uint32_t thetime, const snort::Flow* save_me);
unsigned prune_excess(const snort::Flow* save_me);
bool prune_one(PruneReason, bool do_cleanup);
unsigned timeout(unsigned num_flows, time_t cur_time);
+ unsigned delete_flows(unsigned num_to_delete);
+
unsigned purge();
unsigned get_count();
PegCount get_prunes(PruneReason reason) const
{ return prune_stats.get(reason); }
+ PegCount get_total_deletes() const
+ { return delete_stats.get_total(); }
+
+ PegCount get_deletes(FlowDeleteState state) const
+ { return delete_stats.get(state); }
+
void reset_stats()
- { prune_stats = PruneStats(); }
+ {
+ prune_stats = PruneStats();
+ delete_stats = FlowDeleteStats();
+ }
void unlink_uni(snort::Flow*);
+ void set_flow_cache_config(FlowCacheConfig& cfg) { config = cfg; }
+
+ unsigned get_flows_allocated() const
+ { return flows_allocated; }
+
private:
void push(snort::Flow*);
void link_uni(snort::Flow*);
- int remove(snort::Flow*);
- int retire(snort::Flow*);
+ void remove(snort::Flow*);
+ void retire(snort::Flow*);
unsigned prune_unis(PktType);
+ unsigned delete_active_flows
+ (unsigned mode, unsigned num_to_delete, unsigned &deleted);
private:
static const unsigned cleanup_flows = 1;
- const FlowCacheConfig config;
+ FlowCacheConfig config;
uint32_t flags;
class ZHash* hash_table;
FlowUniList* uni_ip_flows;
PruneStats prune_stats;
+ FlowDeleteStats delete_stats;
};
#endif
//-------------------------------------------------------------------------
PegCount FlowControl::get_total_prunes() const
-{
- return cache->get_total_prunes();
-}
+{ return cache->get_total_prunes(); }
PegCount FlowControl::get_prunes(PruneReason reason) const
-{
- return cache->get_prunes(reason);
-}
+{ return cache->get_prunes(reason); }
+
+PegCount FlowControl::get_total_deletes() const
+{ return cache->get_total_deletes(); }
+
+PegCount FlowControl::get_deletes(FlowDeleteState state) const
+{ return cache->get_deletes(state); }
void FlowControl::clear_counts()
{
// cache foo
//-------------------------------------------------------------------------
+unsigned FlowControl::get_flows_allocated() const
+{ return cache->get_flows_allocated(); }
+
Flow* FlowControl::find_flow(const FlowKey* key)
{
return cache->find(key);
return cache->allocate(key);
}
-void FlowControl::delete_flow(const FlowKey* key)
+void FlowControl::release_flow(const FlowKey* key)
{
if ( auto flow = cache->find(key) )
cache->release(flow, PruneReason::HA);
}
-void FlowControl::delete_flow(Flow* flow, PruneReason reason)
+void FlowControl::release_flow(Flow* flow, PruneReason reason)
{
cache->release(flow, reason);
}
cache->purge();
}
+unsigned FlowControl::delete_flows(unsigned num_to_delete)
+{
+ return cache->delete_flows(num_to_delete);
+}
+
// hole for memory manager/prune handler
bool FlowControl::prune_one(PruneReason reason, bool do_cleanup)
{
return ignore;
}
+void FlowControl::update_flow_cache_cfg(FlowCacheConfig& cfg)
+{
+ cache->set_flow_cache_config(cfg);
+}
+
int FlowControl::add_expected(
const Packet* ctrlPkt, PktType type, IpProtocol ip_proto,
const SfIp *srcIP, uint16_t srcPort,
class FlowCache;
enum class PruneReason : uint8_t;
+enum class FlowDeleteState : uint8_t;
class FlowControl
{
snort::Flow* find_flow(const snort::FlowKey*);
snort::Flow* new_flow(const snort::FlowKey*);
+ unsigned get_flows_allocated() const;
void init_proto(PktType, snort::InspectSsnFunc);
void init_exp(uint32_t max);
- void delete_flow(const snort::FlowKey*);
- void delete_flow(snort::Flow*, PruneReason);
+ void release_flow(const snort::FlowKey*);
+ void release_flow(snort::Flow*, PruneReason);
void purge_flows();
+ unsigned delete_flows(unsigned num_to_delete);
bool prune_one(PruneReason, bool do_cleanup);
snort::Flow* stale_flow_cleanup(FlowCache*, snort::Flow*, snort::Packet*);
+ void update_flow_cache_cfg(FlowCacheConfig& cfg);
void timeout_flows(time_t cur_time);
-
bool expected_flow(snort::Flow*, snort::Packet*);
bool is_expected(snort::Packet*);
int add_expected(
const snort::Packet* ctrlPkt, PktType, IpProtocol,
const snort::SfIp *srcIP, uint16_t srcPort,
- const snort::SfIp *dstIP, uint16_t dstPort,
+ const snort::SfIp *dstIP, uint16_t dstPort,
char direction, snort::FlowData*);
int add_expected(
PegCount get_total_prunes() const;
PegCount get_prunes(PruneReason) const;
-
+ PegCount get_total_deletes() const;
+ PegCount get_deletes(FlowDeleteState state) const;
void clear_counts();
private:
PegCount prunes[static_cast<reason_t>(PruneReason::MAX)] { };
- PegCount get_total() const;
+ PegCount get_total() const
+ {
+ PegCount total = 0;
+ for ( reason_t i = 0; i < static_cast<reason_t>(PruneReason::NONE); ++i )
+ total += prunes[i];
+
+ return total;
+ }
PegCount& get(PruneReason reason)
{ return prunes[static_cast<reason_t>(reason)]; }
{ ++get(reason); }
};
-inline PegCount PruneStats::get_total() const
+// Category of a flow object deleted while downsizing the cache on reload.
+enum class FlowDeleteState : uint8_t
+{
+ FREELIST,
+ ALLOWED,
+ OFFLOADED,
+ BLOCKED,
+ MAX
+};
+
+// Per-state counters for flows deleted by config reloads.
+struct FlowDeleteStats
{
- PegCount total = 0;
- for ( reason_t i = 0; i < static_cast<reason_t>(PruneReason::NONE); ++i )
- total += prunes[i];
+ using state_t = std::underlying_type<FlowDeleteState>::type;
- return total;
-}
+ PegCount deletes[static_cast<state_t>(FlowDeleteState::MAX)] { };
+
+ // Sum all delete counters.
+ PegCount get_total() const
+ {
+ PegCount total = 0;
+ // FIX: the loop bound must come from FlowDeleteState::MAX (the size
+ // of deletes[]), not PruneReason::MAX -- using the wrong enum either
+ // under-counts or reads past the end of the array.
+ for ( state_t i = 0; i < static_cast<state_t>(FlowDeleteState::MAX); ++i )
+ total += deletes[i];
+
+ return total;
+ }
+ PegCount& get(FlowDeleteState state)
+ { return deletes[static_cast<state_t>(state)]; }
+
+ const PegCount& get(FlowDeleteState state) const
+ { return deletes[static_cast<state_t>(state)]; }
+
+ void update(FlowDeleteState state)
+ { ++get(state); }
+};
#endif
SOURCES ../flow_control.cc
)
+add_cpputest( flow_cache_test
+ SOURCES
+ ../flow_cache.cc
+ ../flow_control.cc
+ ../flow_key.cc
+ ../../hash/zhash.cc
+ ../../hash/hashfcn.cc
+ ../../hash/primetable.cc
+)
+
add_cpputest( session_test )
add_cpputest( flow_test
--- /dev/null
+
+// Copyright (C) 2019-2019 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// flow_control_test.cc author Shivakrishna Mulka <smulka@cisco.com>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <daq_common.h>
+
+#include "flow/flow_control.h"
+
+#include "detection/detection_engine.h"
+#include "main/snort_config.h"
+#include "managers/inspector_manager.h"
+#include "memory/memory_cap.h"
+#include "packet_io/active.h"
+#include "packet_tracer/packet_tracer.h"
+#include "protocols/icmp4.h"
+#include "protocols/packet.h"
+#include "protocols/tcp.h"
+#include "protocols/udp.h"
+#include "protocols/vlan.h"
+#include "stream/stream.h"
+#include "utils/util.h"
+#include "flow/expect_cache.h"
+#include "flow/flow_cache.h"
+#include "flow/ha.h"
+#include "flow/session.h"
+
+#include <CppUTest/CommandLineTestRunner.h>
+#include <CppUTest/TestHarness.h>
+
+using namespace snort;
+
+// ---- Stubs: minimal definitions of snort framework symbols so the flow
+// cache/control units link in isolation. Return values matter only where
+// they gate code paths (e.g. over_threshold() returning true). ----
+THREAD_LOCAL bool Active::s_suspend = false;
+
+THREAD_LOCAL PacketTracer* snort::s_pkt_trace = nullptr;
+
+PacketTracer::PacketTracer() { }
+PacketTracer::~PacketTracer() { }
+void PacketTracer::log(const char* format, ...) { }
+void PacketTracer::open_file() { }
+void PacketTracer::dump_to_daq(Packet* p) { }
+void PacketTracer::reset() { }
+Packet::Packet(bool) { }
+Packet::~Packet() { }
+// NOTE(review): memset-ing *this assumes Flow is trivially-layout here
+// (no vtable/members needing construction) -- confirm against flow.h
+Flow::Flow() { memset(this, 0, sizeof(*this)); }
+Flow::~Flow() { }
+DetectionEngine::DetectionEngine() { }
+ExpectCache::~ExpectCache() { }
+DetectionEngine::~DetectionEngine() { }
+void Flow::init(PktType type) { }
+void Flow::term() { }
+void Flow::reset(bool) { }
+void set_network_policy(SnortConfig* sc, unsigned i) { }
+void DataBus::publish(const char* key, const uint8_t* buf, unsigned len, Flow* f) { }
+void DataBus::publish(const char* key, Packet* p, Flow* f) { }
+SnortConfig* SnortConfig::get_conf() { return nullptr; }
+void Flow::set_direction(Packet* p) { }
+void set_inspection_policy(SnortConfig* sc, unsigned i) { }
+void set_ips_policy(SnortConfig* sc, unsigned i) { }
+void Flow::set_mpls_layer_per_dir(Packet* p) { }
+void DetectionEngine::disable_all(Packet* p) { }
+void Stream::drop_traffic(const Packet* p, char dir) { }
+bool Stream::blocked_flow(Packet* p) { return true; }
+ExpectCache::ExpectCache(uint32_t max) { }
+bool ExpectCache::check(Packet* p, Flow* lws) { return true; }
+bool ExpectCache::is_expected(Packet* p) { return true; }
+Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key) { return nullptr; }
+bool HighAvailabilityManager::in_standby(Flow* flow) { return true; }
+SfIpRet SfIp::set(void const*, int) { return SfIpRet::SFIP_SUCCESS; }
+namespace memory
+{
+void MemoryCap::update_allocations(unsigned long m) { }
+void MemoryCap::update_deallocations(unsigned long m) { }
+bool MemoryCap::over_threshold() { return true; }
+}
+
+namespace snort
+{
+namespace layer
+{
+const vlan::VlanTagHdr* get_vlan_layer(const Packet* const p) { return nullptr; }
+}
+time_t packet_time() { return 0; }
+}
+
+namespace snort
+{
+namespace ip
+{
+uint32_t IpApi::id() const { return 0; }
+}
+}
+
+void Stream::stop_inspection(
+ Flow* flow, Packet* p, char dir,
+ int32_t /*bytes*/, int /*response*/) { }
+
+
+// stub: always reports success (1) so add_expected() paths proceed
+int ExpectCache::add_flow(const Packet *ctrlPkt,
+ PktType type, IpProtocol ip_proto,
+ const SfIp* cliIP, uint16_t cliPort,
+ const SfIp* srvIP, uint16_t srvPort,
+ char direction, FlowData* fd, SnortProtocolId snort_protocol_id)
+{
+ return 1;
+}
+
+TEST_GROUP(flow_prune) { };
+
+// No flows in the flow cache, pruning should not happen
+TEST(flow_prune, empty_cache_prune_flows)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 3;
+ FlowCache *cache = new FlowCache(fcg);
+
+ // nothing allocated yet, so delete_flows() has nothing to take
+ CHECK(cache->get_count() == 0);
+ CHECK(cache->delete_flows(1) == 0);
+ CHECK(cache->get_count() == 0);
+ delete cache;
+}
+
+// Do not delete blocked flow
+TEST(flow_prune, blocked_flow_prune_flows)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 2;
+ FlowCache *cache = new FlowCache(fcg);
+
+ // NOTE(review): runtime-sized array is a VLA compiler extension, not
+ // standard C++ -- confirm build flags, or use std::vector instead
+ Flow *list_flows[fcg.max_flows];
+ int first_port = 1;
+ int second_port = 2;
+
+ // Add two flows in the flow cache
+ FlowKey flow_key;
+ memset(&flow_key, 0, sizeof(FlowKey));
+ flow_key.pkt_type = PktType::TCP;
+
+ flow_key.port_l = first_port;
+ list_flows[0] = cache->allocate(&flow_key);
+
+ flow_key.port_l = second_port;
+ list_flows[1] = cache->allocate(&flow_key);
+
+ CHECK(cache->get_count() == fcg.max_flows);
+
+ // block the second flow
+ list_flows[1]->block();
+
+ // Access the first flow
+ // This will move it to the MRU
+ flow_key.port_l = first_port;
+ CHECK(cache->find(&flow_key) != nullptr);
+
+ // Prune one flow. This should delete the MRU flow, since
+ // LRU flow is blocked
+ CHECK(cache->delete_flows(1) == 1);
+
+ // Blocked Flow should still be there
+ flow_key.port_l = second_port;
+ CHECK(cache->find(&flow_key) != nullptr);
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+// Add 3 flows in flow cache and delete one
+TEST(flow_prune, prune_flows)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 3;
+ FlowCache *cache = new FlowCache(fcg);
+ int port = 1;
+
+ Flow *list_flows[fcg.max_flows];
+ for ( unsigned i = 0; i < fcg.max_flows; i++ )
+ {
+ FlowKey flow_key;
+ // FIX: zero the whole key -- FlowKey has more fields than
+ // port_l/pkt_type, and hashing uninitialized bytes makes
+ // allocate()/find() nondeterministic (sibling test memsets too)
+ memset(&flow_key, 0, sizeof(FlowKey));
+ flow_key.port_l = port++;
+ flow_key.pkt_type = PktType::TCP;
+ list_flows[i] = cache->allocate(&flow_key);
+ }
+
+ CHECK(cache->get_count() == fcg.max_flows);
+ CHECK(cache->delete_flows(1) == 1);
+ CHECK(cache->get_count() == fcg.max_flows-1);
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+
+// Add 3 flows in flow cache, delete all
+TEST(flow_prune, prune_all_flows)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 3;
+ FlowCache *cache = new FlowCache(fcg);
+ int port = 1;
+
+ Flow *list_flows[fcg.max_flows];
+ for ( unsigned i = 0; i < fcg.max_flows; i++ )
+ {
+ FlowKey flow_key;
+ // FIX: zero-initialize the key so no uninitialized bytes feed the
+ // hash; matches the memset done in blocked_flow_prune_flows
+ memset(&flow_key, 0, sizeof(FlowKey));
+ flow_key.port_l = port++;
+ flow_key.pkt_type = PktType::TCP;
+ list_flows[i] = cache->allocate(&flow_key);
+ }
+
+ CHECK(cache->get_count() == fcg.max_flows);
+ CHECK(cache->delete_flows(3) == 3);
+ CHECK(cache->get_count() == 0);
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+// Add 3 flows, all blocked, in flow cache, delete all
+TEST(flow_prune, prune_all_blocked_flows)
+{
+ FlowCacheConfig fcg;
+ fcg.max_flows = 3;
+ FlowCache *cache = new FlowCache(fcg);
+ int port = 1;
+
+ Flow *list_flows[fcg.max_flows];
+ for ( unsigned i = 0; i < fcg.max_flows; i++ )
+ {
+ FlowKey flow_key;
+ // FIX: zero-initialize the key so no uninitialized bytes feed the
+ // hash; matches the memset done in blocked_flow_prune_flows
+ memset(&flow_key, 0, sizeof(FlowKey));
+ flow_key.port_l = port++;
+ flow_key.pkt_type = PktType::TCP;
+ list_flows[i] = cache->allocate(&flow_key);
+ list_flows[i]->block();
+ }
+
+ CHECK(cache->get_count() == fcg.max_flows);
+ // all flows blocked: ALLOWED/OFFLOADED passes skip them, ALL_FLOWS
+ // pass must still delete all three
+ CHECK(cache->delete_flows(3) == 3);
+ CHECK(cache->get_count() == 0);
+
+ cache->purge();
+ CHECK(cache->get_flows_allocated() == 0);
+ delete cache;
+}
+
+// CppUTest entry point: run every registered TEST in this binary.
+int main(int argc, char** argv)
+{
+ return CommandLineTestRunner::RunAllTests(argc, argv);
+}
DetectionEngine::~DetectionEngine() = default;
ExpectCache::~ExpectCache() = default;
unsigned FlowCache::purge() { return 1; }
-Flow* FlowCache::find(const FlowKey*) { return nullptr; }
+Flow* FlowCache::find(const FlowKey* key) { return nullptr; }
Flow* FlowCache::allocate(const FlowKey*) { return nullptr; }
-void FlowCache::push(Flow*) { }
+void FlowCache::push(Flow* flow) { }
bool FlowCache::prune_one(PruneReason, bool) { return true; }
-unsigned FlowCache::timeout(unsigned, time_t) { return 1; }
-void Flow::init(PktType) { }
-void set_network_policy(SnortConfig*, unsigned) { }
-void DataBus::publish(const char*, const uint8_t*, unsigned, Flow*) { }
-void DataBus::publish(const char*, Packet*, Flow*) { }
+unsigned FlowCache::delete_flows(unsigned) { return 0; }
+unsigned FlowCache::timeout(unsigned num_flows, time_t thetime) { return 1; }
+void Flow::init(PktType type) { }
+void set_network_policy(SnortConfig* sc, unsigned i) { }
+void DataBus::publish(const char* key, const uint8_t* buf, unsigned len, Flow* f) { }
+void DataBus::publish(const char* key, Packet* p, Flow* f) { }
SnortConfig* SnortConfig::get_conf() { return nullptr; }
void FlowCache::unlink_uni(Flow*) { }
void Flow::set_direction(Packet*) { }
void Stream::stop_inspection(Flow*, Packet*, char, int32_t, int) { }
-
int ExpectCache::add_flow(const Packet*,
PktType, IpProtocol,
const SfIp*, uint16_t,
return 1;
}
-int FlowCache::release(Flow*, PruneReason, bool)
-{
-    return 1;
-}
+// stub: signature updated to match the new void FlowCache::release()
+void FlowCache::release(Flow*, PruneReason, bool) { }
TEST_GROUP(stale_flow) { };
typedef uint32_t (* hash_func)(HashFnc*, const unsigned char* d, int n);
+
// return 0 for ==, 1 for != ; FIXIT-L convert to bool
typedef int (* keycmp_func)(const void* s1, const void* s2, size_t n);
if ( table[node->rindex] ) // UNINITUSE
{
node->prev = nullptr;
- node->next=table[node->rindex];
+ node->next = table[node->rindex];
table[node->rindex]->prev = node;
table[node->rindex] = node;
}
else
{
- node->prev=nullptr;
- node->next=nullptr;
+ node->prev = nullptr;
+ node->next = nullptr;
table[node->rindex] = node; // UNINITUSE
}
}
if ( table )
{
- for ( unsigned i=0; i < nrows; ++i )
+ for ( unsigned i = 0; i < nrows; ++i )
{
- for ( ZHashNode* node=table[i]; node; )
+ for ( ZHashNode* node = table[i]; node; )
{
ZHashNode* onode = node;
node = node->next;
return false;
}
-bool ZHash::remove(ZHashNode* node)
+// Unlink 'node' from its hash row and the global usage list and park it
+// on the free list for reuse; the node is not freed and its data pointer
+// is left intact. Returns false for a null node.
+bool ZHash::move_to_free_list(ZHashNode* node)
{
if ( !node )
return false;
unlink_node(node);
gunlink_node(node);
-
count--;
save_free_node(node);
return true;
}
-bool ZHash::remove()
+// Release the node under the cursor (set by a prior lookup) back to the
+// free list, clearing the cursor first.
+bool ZHash::release()
{
ZHashNode* node = cursor;
cursor = nullptr;
- return remove(node);
+ return move_to_free_list(node);
}
-bool ZHash::remove(const void* key)
+// Look up 'key' and release its node to the free list; false if absent.
+bool ZHash::release(const void* key)
{
int row;
ZHashNode* node = find_node_row(key, row);
- return remove(node);
+ return move_to_free_list(node);
+}
+
+// Remove the node for 'key' outright: unlink it and free the node memory,
+// returning the stored user-data pointer (nullptr if the key is absent).
+// Unlike release(), the node does NOT go to the free list, so the hash
+// can actually shrink when the cache is downsized.
+void* ZHash::remove(const void* key)
+{
+ void* pv = nullptr;
+ int row;
+ ZHashNode* node = find_node_row(key, row);
+ if ( node )
+ {
+ unlink_node(node);
+ gunlink_node(node);
+ count--;
+ pv = node->data;
+ s_node_free(node);
+ }
+
+ return pv;
+}
int ZHash::set_keyops(
return -1;
}
-
void* find(const void* key);
void* get(const void* key, bool *new_node = nullptr);
-
- bool remove(const void* key);
- bool remove();
+ bool release(const void* key);
+ bool release();
+ void* remove(const void* key);
inline unsigned get_count() { return count; }
-
int set_keyops(
unsigned (* hash_fcn)(HashFnc* p, const unsigned char* d, int n),
int (* keycmp_fcn)(const void* s1, const void* s2, size_t n));
void delete_free_list();
void save_free_node(ZHashNode*);
- bool remove(ZHashNode*);
+ bool move_to_free_list(ZHashNode*);
void move_to_front(ZHashNode*);
int nearest_powerof2(int nrows);
private:
HashFnc* hashfcn;
int keysize;
-
unsigned nrows;
unsigned count;
unsigned find_success;
ZHashNode** table;
- ZHashNode* ghead, * gtail;
+ ZHashNode* ghead;
+ ZHashNode* gtail;
ZHashNode* fhead;
ZHashNode* cursor;
};
UncompletedAnalyzerCommand(snort::AnalyzerCommand* ac, void* acs) : command(ac), state(acs)
{ }
- snort::AnalyzerCommand* command = nullptr;
- void* state = nullptr;
+ snort::AnalyzerCommand* command;
+ void* state;
};
class Analyzer
private:
std::atomic<State> state;
-
unsigned id;
bool exit_requested = false;
-
uint64_t exit_after_cnt;
uint64_t pause_after_cnt = 0;
uint64_t skip_cnt = 0;
-
std::string source;
snort::SFDAQInstance* daq_instance;
RetryQueue* retry_queue = nullptr;
{
reload_tuners = new std::list<ReloadResourceTuner*>(sc->get_reload_resource_tuners());
*ac_state = reload_tuners;
+ for (auto const& rtt : *reload_tuners)
+ rtt->tinit();
+
}
else
reload_tuners = (std::list<ReloadResourceTuner*>*)*ac_state;
if ( !reload_tuners->empty() )
{
auto rrt = reload_tuners->front();
- if (rrt->tune_resources())
+ if ( rrt->tune_packet_context() )
reload_tuners->pop_front();
}
namespace snort
{
class ProtocolReference;
-class ReloadResourceTuner;
struct ProfilerConfig;
struct GHash;
struct XHash;
virtual ~ReloadResourceTuner() = default;
- virtual bool tune_resources() = 0;
- virtual bool tune_resources_idle() = 0;
+ virtual void tinit() { }
+ virtual bool tune_packet_context() = 0;
+ virtual bool tune_idle_context() = 0;
protected:
ReloadResourceTuner() = default;
+ virtual bool tune_resources(unsigned work_limit) = 0;
unsigned max_work = RELOAD_MAX_WORK_PER_PACKET;
unsigned max_work_idle = RELOAD_MAX_WORK_WHEN_IDLE;
{ CountType::SUM, "expected_realized", "number of expected flows realized" },
{ CountType::SUM, "expected_pruned", "number of expected flows pruned" },
{ CountType::SUM, "expected_overflows", "number of expected cache overflows" },
+ { CountType::SUM, "reload_total_adds", "number of flows added by config reloads" },
+ { CountType::SUM, "reload_total_deletes", "number of flows deleted by config reloads" },
+ { CountType::SUM, "reload_freelist_deletes", "number of flows deleted from the free list by config reloads" },
+ { CountType::SUM, "reload_allowed_deletes", "number of allowed flows deleted by config reloads" },
+ { CountType::SUM, "reload_blocked_deletes", "number of blocked flows deleted by config reloads" },
+ { CountType::SUM, "reload_offloaded_deletes", "number of offloaded flows deleted by config reloads" },
{ CountType::END, nullptr, nullptr }
};
stream_base_stats.preemptive_prunes = flow_con->get_prunes(PruneReason::PREEMPTIVE);
stream_base_stats.memcap_prunes = flow_con->get_prunes(PruneReason::MEMCAP);
stream_base_stats.ha_prunes = flow_con->get_prunes(PruneReason::HA);
-
+ stream_base_stats.reload_freelist_flow_deletes = flow_con->get_deletes(FlowDeleteState::FREELIST);
+ stream_base_stats.reload_allowed_flow_deletes = flow_con->get_deletes(FlowDeleteState::ALLOWED);
+ stream_base_stats.reload_offloaded_flow_deletes= flow_con->get_deletes(FlowDeleteState::OFFLOADED);
+ stream_base_stats.reload_blocked_flow_deletes= flow_con->get_deletes(FlowDeleteState::BLOCKED);
ExpectCache* exp_cache = flow_con->get_exp_cache();
if ( exp_cache )
return true;
}
+
//-------------------------------------------------------------------------
// inspector stuff
//-------------------------------------------------------------------------
{
public:
StreamBase(const StreamModuleConfig*);
-
void show(SnortConfig*) override;
void tinit() override;
if ( config.flow_cache_cfg.max_flows > 0 )
flow_con->init_exp(config.flow_cache_cfg.max_flows);
-
+
FlushBucket::set(config.footprint);
}
{ return stream_rules; }
const StreamModuleConfig* StreamModule::get_data()
-{
- return &config;
-}
+{ return &config; }
bool StreamModule::begin(const char* fqn, int, SnortConfig*)
{
return true;
}
-static int check_stream_config(const FlowCacheConfig& new_cfg, const FlowCacheConfig& saved_cfg)
-{
- int ret = 0;
-
- if ( saved_cfg.max_flows != new_cfg.max_flows
- or saved_cfg.pruning_timeout != new_cfg.pruning_timeout )
- {
- ReloadError("Change of stream flow cache options requires a restart\n");
- ret = 1;
- }
-
- return ret;
-}
-
-static int check_stream_proto_config(const FlowCacheConfig& new_cfg, const FlowCacheConfig& saved_cfg, PktType type)
-{
- int ret = 0;
-
- if ( saved_cfg.proto[to_utype(type)].nominal_timeout != new_cfg.proto[to_utype(type)].nominal_timeout )
- {
- ReloadError("Change of stream protocol configuration options requires a restart\n");
- ret = 1;
- }
-
- return ret;
-}
-
// FIXIT-L the detection of stream.xxx_cache changes below is a temporary workaround
// remove this check when stream.xxx_cache params become reloadable
-bool StreamModule::end(const char* fqn, int, SnortConfig*)
+bool StreamModule::end(const char* fqn, int, SnortConfig* cfg)
{
static StreamModuleConfig saved_config = {};
- static int issue_found = 0;
if ( saved_config.flow_cache_cfg.max_flows )
{
- // FIXIT-H - stream reload story will change this to look for change to max_flows config option
- issue_found += check_stream_config(config.flow_cache_cfg, saved_config.flow_cache_cfg);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::IP);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::UDP);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::TCP);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::ICMP);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::PDU);
- issue_found += check_stream_proto_config(config.flow_cache_cfg, saved_config.flow_cache_cfg, PktType::FILE);
+ int max_flows_change = config.flow_cache_cfg.max_flows - saved_config.flow_cache_cfg.max_flows;
+ if ( max_flows_change )
+ {
+ // register handler
+ reload_resource_manager.initialize(config.flow_cache_cfg, max_flows_change);
+ cfg->register_reload_resource_tuner(reload_resource_manager);
+ }
}
if ( !strcmp(fqn, "stream") )
and config.footprint != saved_config.footprint )
{
ReloadError("Changing of stream.footprint requires a restart\n");
- issue_found++;
}
- if ( issue_found == 0 )
+ else
saved_config = config;
- issue_found = 0;
}
return true;
void StreamModule::reset_stats()
{ base_reset(); }
+// Stream handler to adjust allocated resources as needed on a config reload
+// Record the reloaded flow cache config and the signed delta in max_flows;
+// actual resizing happens later in tinit()/tune_*_context().
+void StreamReloadResourceManager::initialize(FlowCacheConfig& config_, int max_flows_change_)
+{
+ config = config_;
+ max_flows_change = max_flows_change_;
+}
+
+// Per-thread install of the new cache config; also accounts the expected
+// add/delete totals up front (before any flows are actually deleted --
+// these pegs reflect the requested change, not completed work).
+void StreamReloadResourceManager::tinit()
+{
+ flow_con->update_flow_cache_cfg(config);
+ if ( max_flows_change < 0 )
+ stream_base_stats.reload_total_deletes += abs(max_flows_change);
+ else
+ stream_base_stats.reload_total_adds += max_flows_change;
+}
+
+// Packet-thread tuning step: do a bounded slice of work per packet.
+bool StreamReloadResourceManager::tune_packet_context()
+{
+ return tune_resources(max_work);
+}
+
+// Idle-time tuning step: the thread is quiet, so do a bigger slice.
+bool StreamReloadResourceManager::tune_idle_context()
+{
+ return tune_resources(max_work_idle);
+}
+
+// Delete up to work_limit surplus flows; returns true when allocation has
+// reached the new max (done), false to be called again with more budget.
+// NOTE(review): if delete_flows() ever deletes fewer than requested this
+// returns false and will keep being retried -- confirm that converges.
+bool StreamReloadResourceManager::tune_resources(unsigned work_limit)
+{
+ // we are done if new max is > currently allocated flow objects
+ if ( flow_con->get_flows_allocated() <= config.max_flows )
+ return true;
+
+ unsigned flows_to_delete = flow_con->get_flows_allocated() - config.max_flows;
+ if ( flows_to_delete > work_limit )
+ flows_to_delete -= flow_con->delete_flows(work_limit);
+ else
+ flows_to_delete -= flow_con->delete_flows(flows_to_delete);
+
+ return ( flows_to_delete ) ? false : true;
+}
+
#ifndef STREAM_MODULE_H
#define STREAM_MODULE_H
+#include "main/analyzer.h"
+#include "main/snort_config.h"
#include "flow/flow_config.h"
+#include "flow/flow_control.h"
#include "framework/module.h"
extern THREAD_LOCAL snort::ProfileStats s5PerfStats;
+extern THREAD_LOCAL class FlowControl* flow_con;
namespace snort
{
PegCount expected_realized;
PegCount expected_pruned;
PegCount expected_overflows;
+ PegCount reload_total_adds;
+ PegCount reload_total_deletes;
+ PegCount reload_freelist_flow_deletes;
+ PegCount reload_allowed_flow_deletes;
+ PegCount reload_blocked_flow_deletes;
+ PegCount reload_offloaded_flow_deletes;
};
extern const PegInfo base_pegs[];
unsigned footprint;
};
+// Reload tuner that incrementally resizes the flow cache after a config
+// reload changes stream max_flows: initialize() captures the new config
+// and delta, tinit() installs it per thread, and tune_*_context() delete
+// surplus flows a bounded amount of work at a time.
+class StreamReloadResourceManager : public snort::ReloadResourceTuner
+{
+public:
+ StreamReloadResourceManager() {}
+
+ void initialize(FlowCacheConfig&, int max_flows_change_);
+ void tinit() override;
+ bool tune_packet_context() override;
+ bool tune_idle_context() override;
+
+private:
+ bool tune_resources(unsigned work_limit) override;
+
+private:
+ FlowCacheConfig config;
+ int max_flows_change = 0;
+};
+
class StreamModule : public snort::Module
{
public:
private:
StreamModuleConfig config;
+ StreamReloadResourceManager reload_resource_manager;
};
extern void base_sum();
extern void base_reset();
#endif
-
if ( sz )
s_flush_bucket = new ConstFlushBucket(sz);
-
else if ( SnortConfig::static_hash() )
s_flush_bucket = new StaticFlushBucket;
-
else
s_flush_bucket = new RandomFlushBucket;
{ return flow_con->new_flow(key); }
void Stream::delete_flow(const FlowKey* key)
-{ flow_con->delete_flow(key); }
+{ flow_con->release_flow(key); }
void Stream::delete_flow(Flow* flow)
{
- flow_con->delete_flow(flow, PruneReason::NONE);
+ flow_con->release_flow(flow, PruneReason::NONE);
}
//-------------------------------------------------------------------------
// this will get called on each onload
// eventually all onloads will occur and delete will be called
if ( not flow->is_suspended() )
- flow_con->delete_flow(flow, PruneReason::NONE);
+ flow_con->release_flow(flow, PruneReason::NONE);
p->flow = nullptr;
}
void Stream::purge_flows()
{
- if ( !flow_con )
- return;
-
- flow_con->purge_flows();
+ if ( flow_con )
+ flow_con->purge_flows();
}
void Stream::timeout_flows(time_t cur_time)
{
- if ( !flow_con )
- return;
-
// FIXIT-M batch here or loop vs looping over idle?
- flow_con->timeout_flows(cur_time);
+ if ( flow_con )
+ flow_con->timeout_flows(cur_time);
}
void Stream::prune_flows()
{
- if ( !flow_con )
- return;
-
- flow_con->prune_one(PruneReason::MEMCAP, false);
+ if ( flow_con )
+ flow_con->prune_one(PruneReason::MEMCAP, false);
}
bool Stream::expected_flow(Flow* f, Packet* p)
{
- return flow_con->expected_flow(f, p) != SSN_DIR_NONE;
+ if ( flow_con )
+ return flow_con->expected_flow(f, p) != SSN_DIR_NONE;
+ return false;
}
//-------------------------------------------------------------------------
TcpStreamSession::setup(p);
splitter_init = false;
- const TcpStreamConfig* pc = get_tcp_cfg(flow->ssn_server);
- flow->set_default_session_timeout(pc->session_timeout, false);
+ const TcpStreamConfig* cfg = get_tcp_cfg(flow->ssn_server);
+ flow->set_default_session_timeout(cfg->session_timeout, false);
SESSION_STATS_ADD(tcpStats);
tcpStats.setups++;
bool TcpStateSynRecv::syn_ack_recv(TcpSegmentDescriptor& tsd, TcpStreamTracker& trk)
{
- if ( trk.is_ack_valid(tsd.get_seg_ack() ) )
+ if ( trk.is_ack_valid(tsd.get_seg_ack()) )
{
Flow* flow = tsd.get_flow();
bool TcpStateSynRecv::ack_recv(TcpSegmentDescriptor& tsd, TcpStreamTracker& trk)
{
- if ( trk.is_ack_valid(tsd.get_seg_ack() ) )
+ if ( trk.is_ack_valid(tsd.get_seg_ack()) )
{
Flow* flow = tsd.get_flow();
bool TcpStateSynRecv::data_seg_recv(TcpSegmentDescriptor& tsd, TcpStreamTracker& trk)
{
- if ( trk.is_ack_valid(tsd.get_seg_ack() ) )
+ if ( trk.is_ack_valid(tsd.get_seg_ack()) )
{
trk.update_tracker_ack_recv(tsd);
tsd.get_pkt()->packet_flags |= PKT_STREAM_TWH;