From: Michael Altizer (mialtize)
Date: Tue, 3 Sep 2019 13:54:50 +0000 (-0400)
Subject: Merge pull request #1721 in SNORT/snort3 from ~MIALTIZE/snort3:retry to master
X-Git-Tag: 3.0.0-261~13
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=269b61191444a6e5c204baf7a861df83f40b5ac3;p=thirdparty%2Fsnort3.git

Merge pull request #1721 in SNORT/snort3 from ~MIALTIZE/snort3:retry to master

Squashed commit of the following:

commit 5ce3c7346368e240487ad6f7f89534a6c27fdc4d
Author: Michael Altizer
Date:   Tue Aug 27 12:57:52 2019 -0400

    managers: Make InspectorManager::thread_stop() a no-op if thread_init() was never called

    This can happen if the DAQ instance start attempt fails and the Analyzer
    thread is forced to terminate early.

commit cadf42eac5fd50c78c471ea6a973b391f7813d7a
Author: Michael Altizer
Date:   Fri Jul 26 15:53:57 2019 -0400

    analyzer: Process retry queue and onloads when no DAQ messages are received

    Additionally, limit the retry queue such that messages will be dropped
    instead if queuing them would drop the available pool below the batch size
    (totally arbitrary) and add retry queue peg counts.

    Also, fix the detection packet count (and thus get_packet_number()) to
    include retried packets. This does remove the total_from_daq count, which
    should be reimplemented in the SFDAQ module peg counts.
--- diff --git a/src/main/analyzer.cc b/src/main/analyzer.cc index 24ed7b9d2..115daec9c 100644 --- a/src/main/analyzer.cc +++ b/src/main/analyzer.cc @@ -203,11 +203,7 @@ static DAQ_Verdict distill_verdict(Packet* p) Active* act = p->active; // First Pass - if ( act->packet_retry_requested() ) - { - verdict = DAQ_VERDICT_RETRY; - } - else if ( act->session_was_blocked() ) + if ( act->session_was_blocked() ) { if ( !act->can_block() ) verdict = DAQ_VERDICT_PASS; @@ -228,9 +224,6 @@ static DAQ_Verdict distill_verdict(Packet* p) if ( verdict == DAQ_VERDICT_PASS ) verdict = DAQ_VERDICT_BLOCK; } - else if ( verdict == DAQ_VERDICT_RETRY ) - { - } else if ( p->packet_flags & PKT_RESIZED ) { // we never increase, only trim, but daq doesn't support resizing wire packet @@ -278,7 +271,15 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p) { ActionManager::execute(p); - DAQ_Verdict verdict = distill_verdict(p); + DAQ_Verdict verdict = MAX_DAQ_VERDICT; + + if (p->active->packet_retry_requested()) + { + retry_queue->put(p->daq_msg); + aux_counts.retries_queued++; + } + else if (!p->active->is_packet_held()) + verdict = distill_verdict(p); if (PacketTracer::is_active()) { @@ -286,7 +287,12 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p) get_network_policy()->user_policy_id, get_inspection_policy()->user_policy_id, get_ips_policy()->user_policy_id); - PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict)); + if (p->active->packet_retry_requested()) + PacketTracer::log("Verdict: Queuing for Retry\n"); + else if (p->active->is_packet_held()) + PacketTracer::log("Verdict: Holding for Detection\n"); + else + PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict)); PacketTracer::dump(p); } @@ -294,10 +300,7 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p) p->pkth = nullptr; // no longer avail upon sig segv - if (verdict == DAQ_VERDICT_RETRY) - retry_queue->put(p->daq_msg); - - else if ( !p->active->is_packet_held() ) + if (verdict != 
MAX_DAQ_VERDICT) { // Publish an event if something has indicated that it wants the // finalize event on this flow. @@ -318,18 +321,17 @@ void Analyzer::process_daq_pkt_msg(DAQ_Msg_h msg, bool retry) const DAQ_PktHdr_t* pkthdr = daq_msg_get_pkthdr(msg); set_default_policy(); + pc.analyzed_pkts++; + if (!retry) - { - pc.total_from_daq++; packet_time_update(&pkthdr->ts); - } DetectionEngine::wait_for_context(); switcher->start(); Packet* p = switcher->get_context()->packet; oops_handler->set_current_packet(p); p->context->wire_packet = p; - p->context->packet_number = pc.total_from_daq; + p->context->packet_number = get_packet_number(); DetectionEngine::reset(); @@ -378,7 +380,10 @@ void Analyzer::process_retry_queue() packet_gettimeofday(&now); DAQ_Msg_h msg; while ((msg = retry_queue->get(&now)) != nullptr) + { process_daq_msg(msg, true); + aux_counts.retries_processed++; + } } } @@ -463,12 +468,24 @@ const char* Analyzer::get_state_string() void Analyzer::idle() { // FIXIT-L this whole thing could be pub-sub - DataBus::publish(THREAD_IDLE_EVENT, nullptr); - if (SnortConfig::read_mode()) - Stream::timeout_flows(packet_time()); - else - Stream::timeout_flows(time(nullptr)); aux_counts.idle++; + + // This should only be called if the DAQ timeout elapsed, so increment the packet time + // by the DAQ timeout. + struct timeval now, increment; + unsigned int timeout = SnortConfig::get_conf()->daq_config->timeout; + packet_gettimeofday(&now); + increment = { timeout / 1000, (timeout % 1000) * 1000 }; + timeradd(&now, &increment, &now); + packet_time_update(&now); + + DataBus::publish(THREAD_IDLE_EVENT, nullptr); + + // Service the retry queue with the new packet time. 
+ process_retry_queue(); + + Stream::timeout_flows(packet_time()); + HighAvailabilityManager::process_receive(); } @@ -536,6 +553,14 @@ void Analyzer::term() if ( !sc->dirty_pig ) Stream::purge_flows(); + DAQ_Msg_h msg; + while ((msg = retry_queue->get()) != nullptr) + { + aux_counts.retries_discarded++; + Profile profile(daqPerfStats); + daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK); + } + DetectionEngine::idle(); InspectorManager::thread_stop(sc); ModuleManager::accumulate(sc); @@ -550,16 +575,7 @@ void Analyzer::term() oops_handler->set_current_packet(nullptr); - if ( daq_instance->was_started() ) - { - DAQ_Msg_h msg; - while ((msg = retry_queue->get()) != nullptr) - { - Profile profile(daqPerfStats); - daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK); - } - daq_instance->stop(); - } + daq_instance->stop(); SFDAQ::set_local_instance(nullptr); PacketLatency::tterm(); @@ -725,6 +741,10 @@ DAQ_RecvStatus Analyzer::process_messages() rstat = daq_instance->receive_messages(max_recv); } + // Preemptively service available onloads to potentially unblock processing the first message. + // This conveniently handles servicing offloads in the no messages received case as well. + DetectionEngine::onload(); + unsigned num_recv = 0; DAQ_Msg_h msg; while ((msg = daq_instance->next_message()) != nullptr) @@ -738,9 +758,9 @@ DAQ_RecvStatus Analyzer::process_messages() daq_instance->finalize_message(msg, DAQ_VERDICT_PASS); continue; } - // FIXIT-M add fail open capability - // IMPORTANT: process_daq_msg() is responsible for finalizing the messages. + // FIXIT-M reimplement fail-open capability? num_recv++; + // IMPORTANT: process_daq_msg() is responsible for finalizing the messages. 
process_daq_msg(msg, false); DetectionEngine::onload(); process_retry_queue(); diff --git a/src/managers/inspector_manager.cc b/src/managers/inspector_manager.cc index 55e87d74b..34b141dc9 100644 --- a/src/managers/inspector_manager.cc +++ b/src/managers/inspector_manager.cc @@ -721,6 +721,10 @@ void InspectorManager::thread_reinit(SnortConfig* sc) void InspectorManager::thread_stop(SnortConfig*) { + // If thread_init() was never called, we have nothing to do. + if ( !s_tl_handlers ) + return; + // pin->tterm() only called for default policy set_default_policy(); InspectionPolicy* pi = snort::get_inspection_policy(); @@ -745,10 +749,7 @@ void InspectorManager::thread_stop(SnortConfig*) void InspectorManager::thread_term(SnortConfig*) { - // FIXIT-L this check required if startup failed in - // Analyzer::analyze before thread_init is called eg - // Can't start DAQ (-1) - socket: Operation not permitted - // ideally thread_term not called w/o calling thread_init + // If thread_init() was never called, we have nothing to do. 
if ( !s_tl_handlers ) return; diff --git a/src/packet_io/active.cc b/src/packet_io/active.cc index f1821afa7..f52d9835b 100644 --- a/src/packet_io/active.cc +++ b/src/packet_io/active.cc @@ -34,6 +34,7 @@ #include "utils/dnet_header.h" #include "sfdaq.h" +#include "sfdaq_instance.h" using namespace snort; @@ -420,23 +421,28 @@ void Active::daq_drop_packet(const Packet* p) daq_update_status(p); } -bool Active::daq_retry_packet(const Packet* p) +bool Active::retry_packet(const Packet* p) { - bool retry_queued = false; + if (active_action != ACT_PASS || !SFDAQ::forwarding_packet(p->pkth)) + return false; - if ( !p->is_rebuilt() && (active_action == ACT_PASS) ) + // FIXIT-L semi-arbitrary heuristic for preventing retry queue saturation - reevaluate later + if (p->daq_instance->get_pool_available() < p->daq_instance->get_batch_size()) { - if ( SFDAQ::forwarding_packet(p->pkth) ) - { - if (p->packet_flags & PKT_RETRANSMIT) - active_action = ACT_DROP; // Don't add retransmits to retry queue. - else - active_action = ACT_RETRY; - retry_queued = true; - } + // Fall back on dropping the packet and relying on the host to retransmit + active_action = ACT_DROP; + aux_counts.retries_dropped++; + return false; } - return retry_queued; + // If a retransmit would be added to the retry queue, drop it instead. 
+ // FIXIT-L this behavior needs to be reevaluated and probably moved somewhere else + if (p->packet_flags & PKT_RETRANSMIT) + active_action = ACT_DROP; + else + active_action = ACT_RETRY; + + return true; } bool Active::hold_packet(const Packet*) @@ -522,7 +528,7 @@ void Active::apply_delayed_action(Packet* p) reset_session(p, force); break; case ACT_RETRY: - if(!daq_retry_packet(p)) + if(!retry_packet(p)) drop_packet(p, force); break; default: diff --git a/src/packet_io/active.h b/src/packet_io/active.h index 097924fe1..3f3d1615a 100644 --- a/src/packet_io/active.h +++ b/src/packet_io/active.h @@ -69,23 +69,23 @@ public: bool is_reset_candidate(const Packet*); bool is_unreachable_candidate(const Packet*); - ActiveAction get_action() + ActiveAction get_action() const { return active_action; } - ActiveStatus get_status() + ActiveStatus get_status() const { return active_status; } void kill_session(Packet*, EncodeFlags = ENC_FLAG_FWD); - bool can_block() + bool can_block() const { return active_status == AST_ALLOW or active_status == AST_FORCE; } - const char* get_action_string() + const char* get_action_string() const { return act_str[active_action][active_status]; } void drop_packet(const Packet*, bool force = false); void daq_drop_packet(const Packet*); - bool daq_retry_packet(const Packet*); + bool retry_packet(const Packet*); bool hold_packet(const Packet*); void allow_session(Packet*); @@ -98,22 +98,22 @@ public: void reset_again() { active_action = ACT_RESET; } - bool packet_was_dropped() + bool packet_was_dropped() const { return active_action >= ACT_DROP; } - bool packet_would_be_dropped() + bool packet_would_be_dropped() const { return active_status == AST_WOULD; } - bool packet_retry_requested() + bool packet_retry_requested() const { return active_action == ACT_RETRY; } - bool session_was_blocked() + bool session_was_blocked() const { return active_action >= ACT_BLOCK; } - bool packet_force_dropped() + bool packet_force_dropped() const { return 
active_status == AST_FORCE; } - bool is_packet_held() + bool is_packet_held() const { return active_action == ACT_HOLD; } void set_tunnel_bypass() @@ -122,7 +122,7 @@ public: void clear_tunnel_bypass() { active_tunnel_bypass--; } - bool get_tunnel_bypass() + bool get_tunnel_bypass() const { return active_tunnel_bypass > 0; } void set_delayed_action(ActiveAction, bool force = false); diff --git a/src/packet_io/sfdaq_instance.cc b/src/packet_io/sfdaq_instance.cc index 9f889b088..49870169c 100644 --- a/src/packet_io/sfdaq_instance.cc +++ b/src/packet_io/sfdaq_instance.cc @@ -106,16 +106,7 @@ void SFDAQInstance::reload() } } -void SFDAQInstance::abort() -{ - if (was_started()) - stop(); - - //DAQ_Delete(); - //DAQ_Term(); FIXIT-L this must be called from main thread on abort -} - -const char* SFDAQInstance::get_input_spec() +const char* SFDAQInstance::get_input_spec() const { return input_spec.c_str(); } @@ -125,32 +116,32 @@ const char* SFDAQInstance::get_input_spec() // the datalink type in the file must be used - which may not be known until // start. The value is cached here since it used for packet operations like // logging and is needed at shutdown. This avoids sequencing issues. 
-int SFDAQInstance::get_base_protocol() +int SFDAQInstance::get_base_protocol() const { return dlt; } -bool SFDAQInstance::can_inject() +bool SFDAQInstance::can_inject() const { return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT) != 0; } -bool SFDAQInstance::can_inject_raw() +bool SFDAQInstance::can_inject_raw() const { return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT_RAW) != 0; } -bool SFDAQInstance::can_replace() +bool SFDAQInstance::can_replace() const { return (daq_instance_get_capabilities(instance) & DAQ_CAPA_REPLACE) != 0; } -bool SFDAQInstance::can_start_unprivileged() +bool SFDAQInstance::can_start_unprivileged() const { return (daq_instance_get_capabilities(instance) & DAQ_CAPA_UNPRIV_START) != 0; } -bool SFDAQInstance::can_whitelist() +bool SFDAQInstance::can_whitelist() const { return (daq_instance_get_capabilities(instance) & DAQ_CAPA_WHITELIST) != 0; } @@ -241,22 +232,22 @@ bool SFDAQInstance::get_tunnel_bypass(uint8_t proto) return (daq_tunnel_mask & proto) != 0; } -bool SFDAQInstance::was_started() +bool SFDAQInstance::was_started() const { - DAQ_State s; - if (!instance) return false; - s = daq_instance_check_status(instance); - - return (DAQ_STATE_STARTED == s); + DAQ_State s = daq_instance_check_status(instance); + return (s == DAQ_STATE_STARTED); } bool SFDAQInstance::stop() { assert(pool_size == pool_available); + if (!was_started()) + return true; + int rval = daq_instance_stop(instance); if (rval != DAQ_SUCCESS) diff --git a/src/packet_io/sfdaq_instance.h b/src/packet_io/sfdaq_instance.h index bc256d889..b8cc80978 100644 --- a/src/packet_io/sfdaq_instance.h +++ b/src/packet_io/sfdaq_instance.h @@ -45,10 +45,9 @@ public: bool init(DAQ_Config_h, const std::string& bpf_string); bool start(); - bool was_started(); + bool was_started() const; bool stop(); void reload(); - void abort(); DAQ_RecvStatus receive_messages(unsigned max_recv); DAQ_Msg_h next_message() @@ -60,16 +59,17 @@ public: int 
finalize_message(DAQ_Msg_h msg, DAQ_Verdict verdict); const char* get_error(); - int get_base_protocol(); - uint32_t get_batch_size() { return batch_size; } - const char* get_input_spec(); + int get_base_protocol() const; + uint32_t get_batch_size() const { return batch_size; } + uint32_t get_pool_available() const { return pool_available; } + const char* get_input_spec() const; const DAQ_Stats_t* get_stats(); - bool can_inject(); - bool can_inject_raw(); - bool can_replace(); - bool can_start_unprivileged(); - SO_PUBLIC bool can_whitelist(); + bool can_inject() const; + bool can_inject_raw() const; + bool can_replace() const; + bool can_start_unprivileged() const; + SO_PUBLIC bool can_whitelist() const; int inject(DAQ_Msg_h, int rev, const uint8_t* buf, uint32_t len); bool interrupt(); diff --git a/src/packet_io/sfdaq_module.cc b/src/packet_io/sfdaq_module.cc index c15cf9c20..975edb7e9 100644 --- a/src/packet_io/sfdaq_module.cc +++ b/src/packet_io/sfdaq_module.cc @@ -186,6 +186,10 @@ struct DAQStats PegCount skipped; PegCount idle; PegCount rx_bytes; + PegCount retries_queued; + PegCount retries_dropped; + PegCount retries_processed; + PegCount retries_discarded; }; const PegInfo daq_names[] = @@ -213,6 +217,10 @@ const PegInfo daq_names[] = { CountType::SUM, "skipped", "packets skipped at startup" }, { CountType::SUM, "idle", "attempts to acquire from DAQ without available packets" }, { CountType::SUM, "rx_bytes", "total bytes received" }, + { CountType::SUM, "retries_queued", "messages queued for retry" }, + { CountType::SUM, "retries_dropped", "messages dropped when overrunning the retry queue" }, + { CountType::SUM, "retries_processed", "messages processed from the retry queue" }, + { CountType::SUM, "retries_discarded", "messages discarded when purging the retry queue" }, { CountType::END, nullptr, nullptr } }; @@ -283,6 +291,10 @@ void SFDAQModule::prep_counts() stats.skipped = aux_counts.skipped; stats.idle = aux_counts.idle; stats.rx_bytes = 
aux_counts.rx_bytes; + stats.retries_queued = aux_counts.retries_queued; + stats.retries_dropped = aux_counts.retries_dropped; + stats.retries_processed = aux_counts.retries_processed; + stats.retries_discarded = aux_counts.retries_discarded; memset(&aux_counts, 0, sizeof(AuxCount)); diff --git a/src/utils/stats.cc b/src/utils/stats.cc index 533470663..5f8b0ca1d 100644 --- a/src/utils/stats.cc +++ b/src/utils/stats.cc @@ -51,9 +51,6 @@ namespace snort THREAD_LOCAL PacketCount pc; -PegCount get_packet_number() -{ return pc.total_from_daq; } - //------------------------------------------------------------------------- static inline void LogSeparator(FILE* fh = stdout) @@ -171,7 +168,7 @@ static void timing_stats(uint64_t& num_pkts, uint64_t& usecs) const PegInfo pc_names[] = { - { CountType::NOW, "analyzed", "packets sent to detection" }, + { CountType::NOW, "analyzed", "total packets processed" }, { CountType::SUM, "hard_evals", "non-fast pattern rule evaluations" }, { CountType::SUM, "raw_searches", "fast pattern searches in raw packet data" }, { CountType::SUM, "cooked_searches", "fast pattern searches in cooked packet data" }, diff --git a/src/utils/stats.h b/src/utils/stats.h index 9ab6bfb03..6d7895a93 100644 --- a/src/utils/stats.h +++ b/src/utils/stats.h @@ -34,7 +34,7 @@ using IndexVec = std::vector; // FIXIT-L split this out into appropriate modules struct PacketCount { - PegCount total_from_daq; + PegCount analyzed_pkts; PegCount hard_evals; PegCount raw_searches; PegCount cooked_searches; @@ -85,6 +85,10 @@ struct AuxCount PegCount idle; PegCount rx_bytes; PegCount skipped; + PegCount retries_queued; + PegCount retries_dropped; + PegCount retries_processed; + PegCount retries_discarded; }; extern ProcessCount proc_stats; @@ -98,7 +102,7 @@ namespace snort { extern SO_PUBLIC THREAD_LOCAL PacketCount pc; -SO_PUBLIC PegCount get_packet_number(); +SO_PUBLIC inline PegCount get_packet_number() { return pc.analyzed_pkts; } SO_PUBLIC void LogLabel(const char*, 
FILE* = stdout); SO_PUBLIC void LogValue(const char*, const char*, FILE* = stdout);