From: Russ Combs Date: Thu, 27 Oct 2016 00:38:15 +0000 (-0400) Subject: next step - asynchronous, in-order offload X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=345cb98de75cadae3b3a29ec24a0cdde02ac3550;p=thirdparty%2Fsnort3.git next step - asynchronous, in-order offload --- diff --git a/src/detection/context_switcher.cc b/src/detection/context_switcher.cc index da2e4fe6d..71b0743d0 100644 --- a/src/detection/context_switcher.cc +++ b/src/detection/context_switcher.cc @@ -26,6 +26,7 @@ #include +#include "utils/stats.h" #include "ips_context.h" #ifdef UNIT_TEST @@ -69,6 +70,8 @@ void ContextSwitcher::start() { assert(busy.empty()); assert(idle.size() > 0); +//printf("%ld cs::start %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, idle.back()->get_slot(), idle.size(), busy.size()); busy.push_back(idle.back()); idle.pop_back(); } @@ -76,12 +79,16 @@ void ContextSwitcher::start() void ContextSwitcher::stop() { assert(busy.size() == 1); +//printf("%ld cs::stop %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, busy.back()->get_slot(), idle.size(), busy.size()); idle.push_back(busy.back()); busy.pop_back(); } void ContextSwitcher::abort() { +//printf("%ld cs::abort (i=%lu, b=%lu)\n", +// pc.total_from_daq, idle.size(), busy.size()); for ( unsigned i = 0; i < hold.capacity(); ++i ) { if ( hold[i] ) @@ -100,6 +107,8 @@ void ContextSwitcher::abort() IpsContext* ContextSwitcher::interrupt() { assert(!idle.empty()); +//printf("%ld cs::interrupt %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, idle.back()->get_slot(), idle.size(), busy.size()); busy.push_back(idle.back()); idle.pop_back(); return busy.back(); @@ -108,6 +117,8 @@ IpsContext* ContextSwitcher::interrupt() IpsContext* ContextSwitcher::complete() { assert(!busy.empty()); +//printf("%ld cs::complete %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, busy.back()->get_slot(), idle.size(), busy.size()); idle.push_back(busy.back()); busy.pop_back(); return busy.empty() ? nullptr : busy.back(); @@ -116,6 +127,8 @@ IpsContext* ContextSwitcher::complete() unsigned ContextSwitcher::suspend() { assert(!busy.empty()); +//printf("%ld cs::suspend %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, busy.back()->get_slot(), idle.size(), busy.size()); IpsContext* c = busy.back(); busy.pop_back(); unsigned slot = c->get_slot(); @@ -127,6 +140,8 @@ unsigned ContextSwitcher::suspend() void ContextSwitcher::resume(unsigned slot) { assert(slot <= hold.capacity()); +//printf("%ld cs::resume %u (i=%lu, b=%lu)\n", +// pc.total_from_daq, slot, idle.size(), busy.size()); busy.push_back(hold[slot]); hold[slot] = nullptr; } @@ -145,6 +160,12 @@ IpsContext* ContextSwitcher::get_context(unsigned slot) const return c; } +IpsContext* ContextSwitcher::get_next() const +{ + assert(!idle.empty()); + return idle.back(); +} + IpsContextData* ContextSwitcher::get_context_data(unsigned id) const { return get_context()->get_context_data(id); diff --git a/src/detection/context_switcher.h b/src/detection/context_switcher.h index 756ce812c..018779a4e 100644 --- a/src/detection/context_switcher.h +++ b/src/detection/context_switcher.h @@ -29,7 +29,7 @@ // available. // // 2. during processing interrupt and complete should be called to start -// and finis processing of a generated pseudo packet. it is possible to +// and finish processing of a generated pseudo packet. it is possible to // interrupt pseudo packets. // // 3. suspend may be called to place the current context on hold and @@ -67,6 +67,7 @@ public: IpsContext* get_context() const; IpsContext* get_context(unsigned) const; + IpsContext* get_next() const; IpsContextData* get_context_data(unsigned id) const; void set_context_data(unsigned id, IpsContextData*) const; diff --git a/src/detection/detection_engine.cc b/src/detection/detection_engine.cc index dff4d2624..e31b8ae73 100644 --- a/src/detection/detection_engine.cc +++ b/src/detection/detection_engine.cc @@ -23,6 +23,7 @@ #include "events/sfeventq.h" #include "filters/sfthreshold.h" #include "framework/endianness.h" +#include "helpers/ring.h" #include "latency/packet_latency.h" #include "main/snort.h" #include "main/snort_config.h" @@ -38,18 +39,29 @@ #include "context_switcher.h" #include "detection_util.h" #include "detect.h" +#include "fp_config.h" #include "fp_detect.h" #include "ips_context.h" +THREAD_LOCAL DetectionEngine::ActiveRules active_rules = DetectionEngine::NONE; + static THREAD_LOCAL unsigned s_events = 0; +static THREAD_LOCAL Ring* offload_ids = nullptr; -THREAD_LOCAL DetectionEngine::ActiveRules active_rules = DetectionEngine::NONE; +void DetectionEngine::thread_init() +{ offload_ids = new Ring(32); } // FIXIT-H get size + +void DetectionEngine::thread_term() +{ delete offload_ids; } DetectionEngine::DetectionEngine() -{ Snort::get_switcher()->interrupt(); } +{ context = Snort::get_switcher()->interrupt(); } DetectionEngine::~DetectionEngine() -{ clear_packet(); } +{ + if ( context == get_context() ) + clear_packet(); +} IpsContext* DetectionEngine::get_context() { return Snort::get_switcher()->get_context(); } @@ -76,12 +88,8 @@ MpseStash* DetectionEngine::get_stash() // any events while rebuilding will be logged against the current packet Packet* DetectionEngine::set_packet() { - ContextSwitcher* sw = Snort::get_switcher(); - - // FIXIT-H bypass the interrupt / complete - const IpsContext* c = sw->interrupt(); + const IpsContext* c = Snort::get_switcher()->get_next(); Packet* p = c->packet; - sw->complete(); p->pkth = c->pkth; p->data = c->buf; @@ -94,7 +102,12 @@ Packet* DetectionEngine::set_packet() void DetectionEngine::clear_packet() { ContextSwitcher* sw = Snort::get_switcher(); - Packet* p = sw->get_context()->packet; + IpsContext* c = sw->get_context(); + + if ( c->offload ) + return; + + Packet* p = c->packet; log_events(p); reset(); @@ -119,22 +132,14 @@ uint8_t* DetectionEngine::get_buffer(unsigned& max) // we do it this way. void DetectionEngine::set_next_file_data(const DataPointer& dp) { - ContextSwitcher* sw = Snort::get_switcher(); - - // FIXIT-H bypass the interrupt / complete - IpsContext* c = sw->interrupt(); + IpsContext* c = Snort::get_switcher()->get_next(); c->file_data = dp; - sw->complete(); } void DetectionEngine::get_next_file_data(DataPointer& dp) { - ContextSwitcher* sw = Snort::get_switcher(); - - // FIXIT-H bypass the interrupt / complete - IpsContext* c = sw->interrupt(); + const IpsContext* c = Snort::get_switcher()->get_next(); dp = c->file_data; - sw->complete(); } void DetectionEngine::set_file_data(const DataPointer& dp) @@ -164,6 +169,90 @@ void DetectionEngine::disable_content() void DetectionEngine::disable_all() { active_rules = NONE; } +bool DetectionEngine::offloaded(Flow* flow) +{ return flow->test_session_flags(SSNFLAG_OFFLOAD); } + +bool DetectionEngine::offloaded(Packet* p) +{ return p->flow and offloaded(p->flow); } + +void DetectionEngine::idle() +{ + while ( !offload_ids->empty() ) + { + const struct timespec blip = { 0, 1 }; +//printf("%lu de::sleep\n", pc.total_from_daq); + nanosleep(&blip, nullptr); + onload(); + } +//printf("%lu de::idle (r=%d)\n", pc.total_from_daq, offload_ids->count()); +} + +void DetectionEngine::onload(Flow* flow) +{ + while ( flow->test_session_flags(SSNFLAG_OFFLOAD) ) + { + const struct timespec blip = { 0, 1 }; +//printf("%lu de::sleep\n", pc.total_from_daq); + nanosleep(&blip, nullptr); + onload(); + } +} + +void DetectionEngine::onload() +{ + ContextSwitcher* sw = Snort::get_switcher(); + unsigned* id = offload_ids->read(); + IpsContext* c = sw->get_context(*id); + + assert(c->offload); + + if ( !c->onload ) + return; + +//printf("%lu de::onload %u (r=%d)\n", pc.total_from_daq, *id, offload_ids->count()); + Packet* p = c->packet; + p->flow->clear_session_flags(SSNFLAG_OFFLOAD); + + c->offload->join(); + delete c->offload; + c->offload = nullptr; + + offload_ids->pop(); + sw->resume(*id); + + fp_onload(p); + InspectorManager::clear(p); + log_events(p); + reset(); + clear_packet(); +} + +bool DetectionEngine::offload(Packet* p) +{ + ContextSwitcher* sw = Snort::get_switcher(); + FastPatternConfig* fp = snort_conf->fast_pattern_config; + + if ( p->type() != PktType::PDU or (p->dsize < fp->get_offload_limit()) or !sw->can_hold() ) + { + fp_local(p); + return false; + } + assert(p == p->context->packet); + onload(p->flow); // FIXIT-H ensures correct sequencing, suboptimal + + p->flow->set_session_flags(SSNFLAG_OFFLOAD|SSNFLAG_WAS_OFF); + pc.offloads++; + + unsigned id = sw->suspend(); + offload_ids->put(id); +//printf("%lu de::offload %u (r=%d)\n", pc.total_from_daq, id, offload_ids->count()); + + p->context->onload = false; + p->context->offload = new std::thread(fp_offload, p, snort_conf); + + return true; +} + bool DetectionEngine::detect(Packet* p) { assert(p); @@ -189,7 +278,7 @@ bool DetectionEngine::detect(Packet* p) case PktType::ICMP: case PktType::PDU: case PktType::FILE: - return fpEvalPacket(p); + return offload(p); default: break; @@ -222,29 +311,36 @@ void DetectionEngine::inspect(Packet* p) Active::apply_delayed_action(p); if ( active_rules > NONE ) - detect(p); + { + if ( detect(p) ) + return; + } } enable_tags(); + // By checking tagging here, we make sure that we log the + // tagged packet whether it generates an alert or not. + + if ( p->has_ip() ) + check_tags(p); + + if ( offloaded(p) ) + return; + // clear closed sessions here after inspection since non-stream // inspectors may depend on flow information // FIXIT-H but this result in double clearing? should normal // clear_session() calls be deleted from stream? this is a // performance hit on short-lived flows - Stream::check_flow_closed(p); - /* - ** By checking tagging here, we make sure that we log the - ** tagged packet whether it generates an alert or not. - */ - if ( p->has_ip() ) - check_tags(p); + Stream::check_flow_closed(p); if ( inspected ) InspectorManager::clear(p); } Profile profile(eventqPerfStats); + log_events(p); reset(); } diff --git a/src/detection/detection_engine.h b/src/detection/detection_engine.h index 3cce49466..fe8da021b 100644 --- a/src/detection/detection_engine.h +++ b/src/detection/detection_engine.h @@ -32,6 +32,7 @@ struct DataPointer; struct Packet; +class Flow; class IpsContext; class IpsContextData; @@ -44,12 +45,23 @@ public: Packet* get_packet(); public: + static void thread_init(); + static void thread_term(); + static IpsContext* get_context(); static Packet* get_current_packet(); static Packet* set_packet(); static void clear_packet(); + static bool offloaded(Flow*); + static bool offloaded(Packet*); + static bool offload(Packet*); + + static void onload(Flow*); + static void onload(); + static void idle(); + static void set_encode_packet(Packet*); static Packet* get_encode_packet(); @@ -92,6 +104,7 @@ public: { return get_detects() == CONTENT; } private: + IpsContext* context; static struct SF_EVENTQ* get_event_queue(); }; diff --git a/src/detection/fp_config.cc b/src/detection/fp_config.cc index a343cb7f7..885c0c107 100644 --- a/src/detection/fp_config.cc +++ b/src/detection/fp_config.cc @@ -46,6 +46,8 @@ FastPatternConfig::FastPatternConfig() search_api = MpseManager::get_search_api("ac_bnfa"); assert(search_api); trim = MpseManager::search_engine_trim(search_api); + + offload_limit = 99999; // FIXIT-H use common value } FastPatternConfig::~FastPatternConfig() diff --git a/src/detection/fp_detect.cc b/src/detection/fp_detect.cc index 0d896862b..60df556eb 100644 --- a/src/detection/fp_detect.cc +++ b/src/detection/fp_detect.cc @@ -935,50 +935,6 @@ static int fp_search( return 0; } -static int fp_tsearch( - PortGroup* port_group, Packet* p, int check_ports, int type, OtnxMatchData* omd, - SnortConfig* sc) -{ - snort_conf = sc; - return fp_search(port_group, p, check_ports, type, omd); -} - -static int fp_offload( - PortGroup* port_group, Packet* p, int check_ports, int type, OtnxMatchData* omd) -{ - MpseStash* stash = p->context->stash; - ContextSwitcher* sw = Snort::get_switcher(); - FastPatternConfig* fp = snort_conf->fast_pattern_config; - - if ( p->type() != PktType::PDU or (p->dsize < fp->get_offload_limit()) or - p->flow->test_session_flags(SSNFLAG_OFFLOAD) or !sw->can_hold() ) - { - stash->enable_process(); - return fp_search(port_group, p, check_ports, type, omd); - } - - assert(p == p->context->packet); - stash->init(); - stash->disable_process(); - - p->flow->set_session_flags(SSNFLAG_OFFLOAD); - pc.offloads++; - - unsigned id = sw->suspend(); - - std::thread t(fp_tsearch, port_group, p, check_ports, type, omd, snort_conf); - t.join(); - - sw->resume(id); - - p->flow->clear_session_flags(SSNFLAG_OFFLOAD); - stash->enable_process(); - stash->process(rule_tree_match, omd); - fpFinalSelectEvent(omd, p); - - return 0; -} - /* ** DESCRIPTION ** This function does a set-wise match on content, and walks an otn list @@ -1025,7 +981,7 @@ static inline int fpEvalHeaderSW(PortGroup* port_group, Packet* p, if ( DetectionEngine::content_enabled() ) { if ( fp->get_stream_insert() || !(p->packet_flags & PKT_STREAM_INSERT) ) - if ( fp_offload(port_group, p, check_ports, type, omd) ) + if ( fp_search(port_group, p, check_ports, type, omd) ) return 0; } @@ -1326,9 +1282,36 @@ int fpEvalPacket(Packet* p) default: break; } - if ( !p->flow or !p->flow->test_session_flags(SSNFLAG_OFFLOAD) ) - return fpFinalSelectEvent(omd, p); return 0; } +void fp_local(Packet* p) +{ + IpsContext* c = p->context; + MpseStash* stash = c->stash; + stash->enable_process(); + stash->init(); + fpEvalPacket(p); + fpFinalSelectEvent(c->otnx, p); +} + +void fp_offload(Packet* p, SnortConfig* sc) +{ + snort_conf = sc; // FIXIT-H reload issue + MpseStash* stash = p->context->stash; + stash->init(); + stash->disable_process(); + fpEvalPacket(p); + p->context->onload = true; +} + +void fp_onload(Packet* p) +{ + IpsContext* c = p->context; + MpseStash* stash = c->stash; + stash->enable_process(); + stash->process(rule_tree_match, c->otnx); + fpFinalSelectEvent(c->otnx, p); +} + diff --git a/src/detection/fp_detect.h b/src/detection/fp_detect.h index 39b2ac893..dc3094464 100644 --- a/src/detection/fp_detect.h +++ b/src/detection/fp_detect.h @@ -102,5 +102,9 @@ class IpsContext; void fp_set_context(IpsContext&); void fp_clear_context(IpsContext&); +void fp_local(Packet*); +void fp_offload(Packet*, struct SnortConfig*); +void fp_onload(Packet*); + #endif diff --git a/src/detection/ips_context.cc b/src/detection/ips_context.cc index 5f136c22a..078b8e9a8 100644 --- a/src/detection/ips_context.cc +++ b/src/detection/ips_context.cc @@ -67,6 +67,9 @@ IpsContext::IpsContext(unsigned size) : packet->context = this; fp_set_context(*this); + + offload = nullptr; + onload = false; } IpsContext::~IpsContext() @@ -75,6 +78,8 @@ IpsContext::~IpsContext() if ( p ) delete p; + assert(!offload); + sfeventq_free(equeue); fp_clear_context(*this); diff --git a/src/detection/ips_context.h b/src/detection/ips_context.h index 9c1580a61..c55fd26fc 100644 --- a/src/detection/ips_context.h +++ b/src/detection/ips_context.h @@ -30,6 +30,8 @@ // integration into Snort. #include +#include + #include "main/snort_types.h" #include "framework/codec.h" @@ -70,12 +72,14 @@ public: Packet* encode_packet; DAQ_PktHdr_t* pkth; uint8_t* buf; + std::thread* offload; DataPointer file_data; class MpseStash* stash; struct OtnxMatchData* otnx; uint64_t pkt_count; + bool onload; struct SF_EVENTQ* equeue; diff --git a/src/flow/flow.h b/src/flow/flow.h index 6efd695d7..353f67bce 100644 --- a/src/flow/flow.h +++ b/src/flow/flow.h @@ -63,6 +63,8 @@ #define SSNFLAG_PROXIED 0x01000000 #define SSNFLAG_OFFLOAD 0x02000000 +#define SSNFLAG_WAS_OFF 0x04000000 // FIXIT-L debug only + #define SSNFLAG_NONE 0x00000000 /* nothing, an MT bag of chips */ #define SSNFLAG_SEEN_BOTH (SSNFLAG_SEEN_SERVER | SSNFLAG_SEEN_CLIENT) diff --git a/src/flow/flow_cache.cc b/src/flow/flow_cache.cc index 758e02c60..18dcfa362 100644 --- a/src/flow/flow_cache.cc +++ b/src/flow/flow_cache.cc @@ -24,6 +24,7 @@ #include "flow/flow_cache.h" +#include "detection/detection_engine.h" #include "flow/ha.h" #include "hash/zhash.h" #include "helpers/flag_context.h" @@ -148,6 +149,7 @@ Flow* FlowCache::get(const FlowKey* key) int FlowCache::release(Flow* flow, PruneReason reason, bool do_cleanup) { + DetectionEngine::onload(flow); flow->reset(do_cleanup); prune_stats.update(reason); return remove(flow); @@ -189,6 +191,9 @@ unsigned FlowCache::prune_stale(uint32_t thetime, const Flow* save_me) break; } #endif + if ( DetectionEngine::offloaded(flow) ) + break; + if ( flow->last_data_seen + config.pruning_timeout >= thetime ) break; @@ -244,7 +249,8 @@ unsigned FlowCache::prune_excess(const Flow* save_me) auto flow = static_cast(hash_table->first()); assert(flow); // holds true because hash_table->get_count() > 0 - if ( (save_me and flow == save_me) or flow->was_blocked() ) + if ( (save_me and flow == save_me) or flow->was_blocked() or + DetectionEngine::offloaded(flow) ) { // check for non-null save_me above to silence analyzer // "called C++ object pointer is null" here @@ -308,7 +314,8 @@ unsigned FlowCache::timeout(unsigned num_flows, time_t thetime) if ( flow->last_data_seen + config.nominal_timeout > thetime ) break; - if ( HighAvailabilityManager::in_standby(flow) ) + if ( HighAvailabilityManager::in_standby(flow) or + DetectionEngine::offloaded(flow) ) { flow = static_cast(hash_table->next()); continue; diff --git a/src/flow/flow_control.cc b/src/flow/flow_control.cc index fba3c91db..b19d07cd6 100644 --- a/src/flow/flow_control.cc +++ b/src/flow/flow_control.cc @@ -44,6 +44,8 @@ FlowControl::FlowControl() FlowControl::~FlowControl() { + DetectionEngine de; + delete ip_cache; delete icmp_cache; delete tcp_cache; @@ -432,6 +434,9 @@ unsigned FlowControl::process(Flow* flow, Packet* p) if ( p->proto_bits & PROTO_BIT__MPLS ) flow->set_mpls_layer_per_dir(p); + if ( p->type() == PktType::PDU ) // FIXIT-H cooked or PDU? + DetectionEngine::onload(flow); + switch ( flow->flow_state ) { case Flow::FlowState::SETUP: diff --git a/src/main/modules.cc b/src/main/modules.cc index 679596057..fa4bccf73 100644 --- a/src/main/modules.cc +++ b/src/main/modules.cc @@ -221,7 +221,7 @@ static const Parameter search_engine_params[] = { "inspect_stream_inserts", Parameter::PT_BOOL, nullptr, "false", "inspect reassembled payload - disabling is good for performance, bad for detection" }, - { "offload_limit", Parameter::PT_INT, nullptr, "99999", + { "offload_limit", Parameter::PT_INT, "0:", "99999", "minimum sizeof PDU to offload fast pattern search (defaults to disabled)" }, { "search_method", Parameter::PT_DYNAMIC, (void*)&get_search_methods, "ac_bnfa", diff --git a/src/main/snort.cc b/src/main/snort.cc index b545413f2..16da1131f 100644 --- a/src/main/snort.cc +++ b/src/main/snort.cc @@ -675,6 +675,7 @@ void Snort::thread_init_unprivileged() InitTag(); EventTrace_Init(); detection_filter_init(snort_conf->detection_filter_config); + DetectionEngine::thread_init(); EventManager::open_outputs(); IpsManager::setup_options(); @@ -695,6 +696,7 @@ void Snort::thread_term() if ( !snort_conf->dirty_pig ) Stream::purge_flows(); + DetectionEngine::idle(); InspectorManager::thread_stop(snort_conf); ModuleManager::accumulate(snort_conf); InspectorManager::thread_term(snort_conf); @@ -719,6 +721,7 @@ void Snort::thread_term() Profiler::consolidate_stats(); + DetectionEngine::thread_term(); detection_filter_term(); EventTrace_Term(); CleanupTag(); diff --git a/src/stream/tcp/tcp_reassembler.cc b/src/stream/tcp/tcp_reassembler.cc index afba7476d..cfb8190e1 100644 --- a/src/stream/tcp/tcp_reassembler.cc +++ b/src/stream/tcp/tcp_reassembler.cc @@ -596,6 +596,7 @@ void TcpReassembler::prep_s5_pkt(Flow* flow, Packet* p, uint32_t pkt_flags) int TcpReassembler::_flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags) { Profile profile(s5TcpFlushPerfStats); + DetectionEngine::onload(session->flow); s5_pkt = DetectionEngine::set_packet(); DAQ_PktHdr_t pkth; diff --git a/src/stream/tcp/tcp_session.cc b/src/stream/tcp/tcp_session.cc index 14b09d324..80f65f4ec 100644 --- a/src/stream/tcp/tcp_session.cc +++ b/src/stream/tcp/tcp_session.cc @@ -98,6 +98,7 @@ void TcpSession::restart(Packet* p) // sanity check since this is called externally assert(p->ptrs.tcph); + DetectionEngine::onload(flow); TcpStreamTracker* talker, * listener; if (p->is_from_server()) @@ -134,6 +135,8 @@ void TcpSession::restart(Packet* p) void TcpSession::clear_session(bool free_flow_data, bool flush_segments, bool restart, Packet* p) { + DetectionEngine::onload(flow); + if ( client->reassembler ) { if ( flush_segments )