]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Merge pull request #1721 in SNORT/snort3 from ~MIALTIZE/snort3:retry to master
authorMichael Altizer (mialtize) <mialtize@cisco.com>
Tue, 3 Sep 2019 13:54:50 +0000 (09:54 -0400)
committerMichael Altizer (mialtize) <mialtize@cisco.com>
Tue, 3 Sep 2019 13:54:50 +0000 (09:54 -0400)
Squashed commit of the following:

commit 5ce3c7346368e240487ad6f7f89534a6c27fdc4d
Author: Michael Altizer <mialtize@cisco.com>
Date:   Tue Aug 27 12:57:52 2019 -0400

    managers: Make InspectorManager::thread_stop() a no-op if thread_init() was never called

    This can happen if the DAQ instance start attempt fails and the Analyzer
    thread is forced to terminate early.

commit cadf42eac5fd50c78c471ea6a973b391f7813d7a
Author: Michael Altizer <mialtize@cisco.com>
Date:   Fri Jul 26 15:53:57 2019 -0400

    analyzer: Process retry queue and onloads when no DAQ messages are received

    Additionally, limit the retry queue such that messages will be dropped
    instead if queuing them would drop the available pool below the batch
    size (totally arbitrary) and add retry queue peg counts.

    Also, fix the detection packet count (and thus get_packet_number()) to
    include retried packets.  This does remove the total_from_daq count, which
    should be reimplemented in the SFDAQ module peg counts.

src/main/analyzer.cc
src/managers/inspector_manager.cc
src/packet_io/active.cc
src/packet_io/active.h
src/packet_io/sfdaq_instance.cc
src/packet_io/sfdaq_instance.h
src/packet_io/sfdaq_module.cc
src/utils/stats.cc
src/utils/stats.h

index 24ed7b9d2df2884bb62d2b866d6035976f540d15..115daec9c2a5f4f80be830678924788f7591f238 100644 (file)
@@ -203,11 +203,7 @@ static DAQ_Verdict distill_verdict(Packet* p)
     Active* act = p->active;
 
     // First Pass
-    if ( act->packet_retry_requested() )
-    {
-        verdict = DAQ_VERDICT_RETRY;
-    }
-    else if ( act->session_was_blocked() )
+    if ( act->session_was_blocked() )
     {
         if ( !act->can_block() )
             verdict = DAQ_VERDICT_PASS;
@@ -228,9 +224,6 @@ static DAQ_Verdict distill_verdict(Packet* p)
         if ( verdict == DAQ_VERDICT_PASS )
             verdict = DAQ_VERDICT_BLOCK;
     }
-    else if ( verdict == DAQ_VERDICT_RETRY )
-    {
-    }
     else if ( p->packet_flags & PKT_RESIZED )
     {
         // we never increase, only trim, but daq doesn't support resizing wire packet
@@ -278,7 +271,15 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p)
 {
     ActionManager::execute(p);
 
-    DAQ_Verdict verdict = distill_verdict(p);
+    DAQ_Verdict verdict = MAX_DAQ_VERDICT;
+
+    if (p->active->packet_retry_requested())
+    {
+        retry_queue->put(p->daq_msg);
+        aux_counts.retries_queued++;
+    }
+    else if (!p->active->is_packet_held())
+        verdict = distill_verdict(p);
 
     if (PacketTracer::is_active())
     {
@@ -286,7 +287,12 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p)
             get_network_policy()->user_policy_id, get_inspection_policy()->user_policy_id,
             get_ips_policy()->user_policy_id);
 
-        PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
+        if (p->active->packet_retry_requested())
+            PacketTracer::log("Verdict: Queuing for Retry\n");
+        else if (p->active->is_packet_held())
+            PacketTracer::log("Verdict: Holding for Detection\n");
+        else
+            PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
         PacketTracer::dump(p);
     }
 
@@ -294,10 +300,7 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p)
 
     p->pkth = nullptr;  // no longer avail upon sig segv
 
-    if (verdict == DAQ_VERDICT_RETRY)
-        retry_queue->put(p->daq_msg);
-
-    else if ( !p->active->is_packet_held() )
+    if (verdict != MAX_DAQ_VERDICT)
     {
         // Publish an event if something has indicated that it wants the
         // finalize event on this flow.
@@ -318,18 +321,17 @@ void Analyzer::process_daq_pkt_msg(DAQ_Msg_h msg, bool retry)
     const DAQ_PktHdr_t* pkthdr = daq_msg_get_pkthdr(msg);
     set_default_policy();
 
+    pc.analyzed_pkts++;
+
     if (!retry)
-    {
-        pc.total_from_daq++;
         packet_time_update(&pkthdr->ts);
-    }
 
     DetectionEngine::wait_for_context();
     switcher->start();
     Packet* p = switcher->get_context()->packet;
     oops_handler->set_current_packet(p);
     p->context->wire_packet = p;
-    p->context->packet_number = pc.total_from_daq;
+    p->context->packet_number = get_packet_number();
 
     DetectionEngine::reset();
 
@@ -378,7 +380,10 @@ void Analyzer::process_retry_queue()
         packet_gettimeofday(&now);
         DAQ_Msg_h msg;
         while ((msg = retry_queue->get(&now)) != nullptr)
+        {
             process_daq_msg(msg, true);
+            aux_counts.retries_processed++;
+        }
     }
 }
 
@@ -463,12 +468,24 @@ const char* Analyzer::get_state_string()
 void Analyzer::idle()
 {
     // FIXIT-L this whole thing could be pub-sub
-    DataBus::publish(THREAD_IDLE_EVENT, nullptr);
-    if (SnortConfig::read_mode())
-        Stream::timeout_flows(packet_time());
-    else
-        Stream::timeout_flows(time(nullptr));
     aux_counts.idle++;
+
+    // This should only be called if the DAQ timeout elapsed, so increment the packet time
+    // by the DAQ timeout.
+    struct timeval now, increment;
+    unsigned int timeout = SnortConfig::get_conf()->daq_config->timeout;
+    packet_gettimeofday(&now);
+    increment = { timeout / 1000, (timeout % 1000) * 1000 };
+    timeradd(&now, &increment, &now);
+    packet_time_update(&now);
+
+    DataBus::publish(THREAD_IDLE_EVENT, nullptr);
+
+    // Service the retry queue with the new packet time.
+    process_retry_queue();
+
+    Stream::timeout_flows(packet_time());
+
     HighAvailabilityManager::process_receive();
 }
 
@@ -536,6 +553,14 @@ void Analyzer::term()
     if ( !sc->dirty_pig )
         Stream::purge_flows();
 
+    DAQ_Msg_h msg;
+    while ((msg = retry_queue->get()) != nullptr)
+    {
+        aux_counts.retries_discarded++;
+        Profile profile(daqPerfStats);
+        daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
+    }
+
     DetectionEngine::idle();
     InspectorManager::thread_stop(sc);
     ModuleManager::accumulate(sc);
@@ -550,16 +575,7 @@ void Analyzer::term()
 
     oops_handler->set_current_packet(nullptr);
 
-    if ( daq_instance->was_started() )
-    {
-        DAQ_Msg_h msg;
-        while ((msg = retry_queue->get()) != nullptr)
-        {
-            Profile profile(daqPerfStats);
-            daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
-        }
-        daq_instance->stop();
-    }
+    daq_instance->stop();
     SFDAQ::set_local_instance(nullptr);
 
     PacketLatency::tterm();
@@ -725,6 +741,10 @@ DAQ_RecvStatus Analyzer::process_messages()
         rstat = daq_instance->receive_messages(max_recv);
     }
 
+    // Preemptively service available onloads to potentially unblock processing the first message.
+    // This conveniently handles servicing offloads in the no messages received case as well.
+    DetectionEngine::onload();
+
     unsigned num_recv = 0;
     DAQ_Msg_h msg;
     while ((msg = daq_instance->next_message()) != nullptr)
@@ -738,9 +758,9 @@ DAQ_RecvStatus Analyzer::process_messages()
             daq_instance->finalize_message(msg, DAQ_VERDICT_PASS);
             continue;
         }
-        // FIXIT-M add fail open capability
-        // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
+        // FIXIT-M reimplement fail-open capability?
         num_recv++;
+        // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
         process_daq_msg(msg, false);
         DetectionEngine::onload();
         process_retry_queue();
index 55e87d74bd33ba0f9f33e6e202bd6b2d0771b277..34b141dc92acb753d5788b81613e07a17bb6deb8 100644 (file)
@@ -721,6 +721,10 @@ void InspectorManager::thread_reinit(SnortConfig* sc)
 
 void InspectorManager::thread_stop(SnortConfig*)
 {
+    // If thread_init() was never called, we have nothing to do.
+    if ( !s_tl_handlers )
+        return;
+
     // pin->tterm() only called for default policy
     set_default_policy();
     InspectionPolicy* pi = snort::get_inspection_policy();
@@ -745,10 +749,7 @@ void InspectorManager::thread_stop(SnortConfig*)
 
 void InspectorManager::thread_term(SnortConfig*)
 {
-    // FIXIT-L this check required if startup failed in
-    // Analyzer::analyze before thread_init is called eg
-    // Can't start DAQ (-1) - socket: Operation not permitted
-    // ideally thread_term not called w/o calling thread_init
+    // If thread_init() was never called, we have nothing to do.
     if ( !s_tl_handlers )
         return;
 
index f1821afa736e25f5ac36aa794c2f77028867ab35..f52d9835b37ed4bbe886c2290a34a150055b8f8e 100644 (file)
@@ -34,6 +34,7 @@
 #include "utils/dnet_header.h"
 
 #include "sfdaq.h"
+#include "sfdaq_instance.h"
 
 using namespace snort;
 
@@ -420,23 +421,28 @@ void Active::daq_drop_packet(const Packet* p)
     daq_update_status(p);
 }
 
-bool Active::daq_retry_packet(const Packet* p)
+bool Active::retry_packet(const Packet* p)
 {
-    bool retry_queued = false;
+    if (active_action != ACT_PASS || !SFDAQ::forwarding_packet(p->pkth))
+        return false;
 
-    if ( !p->is_rebuilt() && (active_action == ACT_PASS) )
+    // FIXIT-L semi-arbitrary heuristic for preventing retry queue saturation - reevaluate later
+    if (p->daq_instance->get_pool_available() < p->daq_instance->get_batch_size())
     {
-        if ( SFDAQ::forwarding_packet(p->pkth) )
-        {
-            if (p->packet_flags & PKT_RETRANSMIT)
-                active_action = ACT_DROP;  // Don't add retransmits to retry queue.
-            else
-                active_action = ACT_RETRY;
-            retry_queued = true;
-        }
+        // Fall back on dropping the packet and relying on the host to retransmit
+        active_action = ACT_DROP;
+        aux_counts.retries_dropped++;
+        return false;
     }
 
-    return retry_queued;
+    // If a retransmit would be added to the retry queue, drop it instead.
+    // FIXIT-L this behavior needs to be reevaluated and probably moved somewhere else
+    if (p->packet_flags & PKT_RETRANSMIT)
+        active_action = ACT_DROP;
+    else
+        active_action = ACT_RETRY;
+
+    return true;
 }
 
 bool Active::hold_packet(const Packet*)
@@ -522,7 +528,7 @@ void Active::apply_delayed_action(Packet* p)
         reset_session(p, force);
         break;
     case ACT_RETRY:
-        if(!daq_retry_packet(p))
+        if(!retry_packet(p))
             drop_packet(p, force);
         break;
     default:
index 097924fe146156bb2fc3a29078b882c2d2cdf917..3f3d1615a848c8ce164e4a8c0768f47b0a578fbe 100644 (file)
@@ -69,23 +69,23 @@ public:
     bool is_reset_candidate(const Packet*);
     bool is_unreachable_candidate(const Packet*);
 
-    ActiveAction get_action()
+    ActiveAction get_action() const
     { return active_action; }
 
-    ActiveStatus get_status()
+    ActiveStatus get_status() const
     { return active_status; }
 
     void kill_session(Packet*, EncodeFlags = ENC_FLAG_FWD);
 
-    bool can_block()
+    bool can_block() const
     { return active_status == AST_ALLOW or active_status == AST_FORCE; }
 
-    const char* get_action_string()
+    const char* get_action_string() const
     { return act_str[active_action][active_status]; }
 
     void drop_packet(const Packet*, bool force = false);
     void daq_drop_packet(const Packet*);
-    bool daq_retry_packet(const Packet*);
+    bool retry_packet(const Packet*);
     bool hold_packet(const Packet*);
 
     void allow_session(Packet*);
@@ -98,22 +98,22 @@ public:
     void reset_again()
     { active_action = ACT_RESET; }
 
-    bool packet_was_dropped()
+    bool packet_was_dropped() const
     { return active_action >= ACT_DROP; }
 
-    bool packet_would_be_dropped()
+    bool packet_would_be_dropped() const
     { return active_status == AST_WOULD; }
 
-    bool packet_retry_requested()
+    bool packet_retry_requested() const
     { return active_action == ACT_RETRY; }
 
-    bool session_was_blocked()
+    bool session_was_blocked() const
     { return active_action >= ACT_BLOCK; }
 
-    bool packet_force_dropped()
+    bool packet_force_dropped() const
     { return active_status == AST_FORCE; }
 
-    bool is_packet_held()
+    bool is_packet_held() const
     { return active_action == ACT_HOLD; }
 
     void set_tunnel_bypass()
@@ -122,7 +122,7 @@ public:
     void clear_tunnel_bypass()
     { active_tunnel_bypass--; }
 
-    bool get_tunnel_bypass()
+    bool get_tunnel_bypass() const
     { return active_tunnel_bypass > 0; }
 
     void set_delayed_action(ActiveAction, bool force = false);
index 9f889b0889f9f99014fd7ec9412eb2642166e226..49870169cb95120859c8a1f1538c5ba6d11b982d 100644 (file)
@@ -106,16 +106,7 @@ void SFDAQInstance::reload()
     }
 }
 
-void SFDAQInstance::abort()
-{
-    if (was_started())
-        stop();
-
-    //DAQ_Delete();
-    //DAQ_Term();  FIXIT-L this must be called from main thread on abort
-}
-
-const char* SFDAQInstance::get_input_spec()
+const char* SFDAQInstance::get_input_spec() const
 {
     return input_spec.c_str();
 }
@@ -125,32 +116,32 @@ const char* SFDAQInstance::get_input_spec()
 // the datalink type in the file must be used - which may not be known until
 // start.  The value is cached here since it used for packet operations like
 // logging and is needed at shutdown.  This avoids sequencing issues.
-int SFDAQInstance::get_base_protocol()
+int SFDAQInstance::get_base_protocol() const
 {
     return dlt;
 }
 
-bool SFDAQInstance::can_inject()
+bool SFDAQInstance::can_inject() const
 {
     return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT) != 0;
 }
 
-bool SFDAQInstance::can_inject_raw()
+bool SFDAQInstance::can_inject_raw() const
 {
     return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT_RAW) != 0;
 }
 
-bool SFDAQInstance::can_replace()
+bool SFDAQInstance::can_replace() const
 {
     return (daq_instance_get_capabilities(instance) & DAQ_CAPA_REPLACE) != 0;
 }
 
-bool SFDAQInstance::can_start_unprivileged()
+bool SFDAQInstance::can_start_unprivileged() const
 {
     return (daq_instance_get_capabilities(instance) & DAQ_CAPA_UNPRIV_START) != 0;
 }
 
-bool SFDAQInstance::can_whitelist()
+bool SFDAQInstance::can_whitelist() const
 {
     return (daq_instance_get_capabilities(instance) & DAQ_CAPA_WHITELIST) != 0;
 }
@@ -241,22 +232,22 @@ bool SFDAQInstance::get_tunnel_bypass(uint8_t proto)
     return (daq_tunnel_mask & proto) != 0;
 }
 
-bool SFDAQInstance::was_started()
+bool SFDAQInstance::was_started() const
 {
-    DAQ_State s;
-
     if (!instance)
         return false;
 
-    s = daq_instance_check_status(instance);
-
-    return (DAQ_STATE_STARTED == s);
+    DAQ_State s = daq_instance_check_status(instance);
+    return (s == DAQ_STATE_STARTED);
 }
 
 bool SFDAQInstance::stop()
 {
     assert(pool_size == pool_available);
 
+    if (!was_started())
+        return true;
+
     int rval = daq_instance_stop(instance);
 
     if (rval != DAQ_SUCCESS)
index bc256d88986a59c955a9f121c4065979c9376ec6..b8cc809788345611e63b139674ac30bacb552291 100644 (file)
@@ -45,10 +45,9 @@ public:
     bool init(DAQ_Config_h, const std::string& bpf_string);
 
     bool start();
-    bool was_started();
+    bool was_started() const;
     bool stop();
     void reload();
-    void abort();
 
     DAQ_RecvStatus receive_messages(unsigned max_recv);
     DAQ_Msg_h next_message()
@@ -60,16 +59,17 @@ public:
     int finalize_message(DAQ_Msg_h msg, DAQ_Verdict verdict);
     const char* get_error();
 
-    int get_base_protocol();
-    uint32_t get_batch_size() { return batch_size; }
-    const char* get_input_spec();
+    int get_base_protocol() const;
+    uint32_t get_batch_size() const { return batch_size; }
+    uint32_t get_pool_available() const { return pool_available; }
+    const char* get_input_spec() const;
     const DAQ_Stats_t* get_stats();
 
-    bool can_inject();
-    bool can_inject_raw();
-    bool can_replace();
-    bool can_start_unprivileged();
-    SO_PUBLIC bool can_whitelist();
+    bool can_inject() const;
+    bool can_inject_raw() const;
+    bool can_replace() const;
+    bool can_start_unprivileged() const;
+    SO_PUBLIC bool can_whitelist() const;
 
     int inject(DAQ_Msg_h, int rev, const uint8_t* buf, uint32_t len);
     bool interrupt();
index c15cf9c206d54c9441a554eaae755b8fbe5a6242..975edb7e90f42e7cdee2d4f8662d0356b6e3abf8 100644 (file)
@@ -186,6 +186,10 @@ struct DAQStats
     PegCount skipped;
     PegCount idle;
     PegCount rx_bytes;
+    PegCount retries_queued;
+    PegCount retries_dropped;
+    PegCount retries_processed;
+    PegCount retries_discarded;
 };
 
 const PegInfo daq_names[] =
@@ -213,6 +217,10 @@ const PegInfo daq_names[] =
     { CountType::SUM, "skipped", "packets skipped at startup" },
     { CountType::SUM, "idle", "attempts to acquire from DAQ without available packets" },
     { CountType::SUM, "rx_bytes", "total bytes received" },
+    { CountType::SUM, "retries_queued", "messages queued for retry" },
+    { CountType::SUM, "retries_dropped", "messages dropped when overrunning the retry queue" },
+    { CountType::SUM, "retries_processed", "messages processed from the retry queue" },
+    { CountType::SUM, "retries_discarded", "messages discarded when purging the retry queue" },
     { CountType::END, nullptr, nullptr }
 };
 
@@ -283,6 +291,10 @@ void SFDAQModule::prep_counts()
     stats.skipped = aux_counts.skipped;
     stats.idle = aux_counts.idle;
     stats.rx_bytes = aux_counts.rx_bytes;
+    stats.retries_queued = aux_counts.retries_queued;
+    stats.retries_dropped = aux_counts.retries_dropped;
+    stats.retries_processed = aux_counts.retries_processed;
+    stats.retries_discarded = aux_counts.retries_discarded;
 
     memset(&aux_counts, 0, sizeof(AuxCount));
 
index 5334706633221e5b4ae95166b680a35b47e32669..5f8b0ca1dff4102ed2e2939ee1d14859de2d5d75 100644 (file)
@@ -51,9 +51,6 @@ namespace snort
 
 THREAD_LOCAL PacketCount pc;
 
-PegCount get_packet_number()
-{ return pc.total_from_daq; }
-
 //-------------------------------------------------------------------------
 
 static inline void LogSeparator(FILE* fh = stdout)
@@ -171,7 +168,7 @@ static void timing_stats(uint64_t& num_pkts, uint64_t& usecs)
 
 const PegInfo pc_names[] =
 {
-    { CountType::NOW, "analyzed", "packets sent to detection" },
+    { CountType::NOW, "analyzed", "total packets processed" },
     { CountType::SUM, "hard_evals", "non-fast pattern rule evaluations" },
     { CountType::SUM, "raw_searches", "fast pattern searches in raw packet data" },
     { CountType::SUM, "cooked_searches", "fast pattern searches in cooked packet data" },
index 9ab6bfb0386699fc3c51c534e8584a7f0962ec84..6d7895a9360755ec3bdfa9a1b7142e29277bc759 100644 (file)
@@ -34,7 +34,7 @@ using IndexVec = std::vector<unsigned>;
 // FIXIT-L split this out into appropriate modules
 struct PacketCount
 {
-    PegCount total_from_daq;
+    PegCount analyzed_pkts;
     PegCount hard_evals;
     PegCount raw_searches;
     PegCount cooked_searches;
@@ -85,6 +85,10 @@ struct AuxCount
     PegCount idle;
     PegCount rx_bytes;
     PegCount skipped;
+    PegCount retries_queued;
+    PegCount retries_dropped;
+    PegCount retries_processed;
+    PegCount retries_discarded;
 };
 
 extern ProcessCount proc_stats;
@@ -98,7 +102,7 @@ namespace snort
 {
 extern SO_PUBLIC THREAD_LOCAL PacketCount pc;
 
-SO_PUBLIC PegCount get_packet_number();
+SO_PUBLIC inline PegCount get_packet_number() { return pc.analyzed_pkts; }
 
 SO_PUBLIC void LogLabel(const char*, FILE* = stdout);
 SO_PUBLIC void LogValue(const char*, const char*, FILE* = stdout);