Active* act = p->active;
// First Pass
- if ( act->packet_retry_requested() )
- {
- verdict = DAQ_VERDICT_RETRY;
- }
- else if ( act->session_was_blocked() )
+ if ( act->session_was_blocked() )
{
if ( !act->can_block() )
verdict = DAQ_VERDICT_PASS;
if ( verdict == DAQ_VERDICT_PASS )
verdict = DAQ_VERDICT_BLOCK;
}
- else if ( verdict == DAQ_VERDICT_RETRY )
- {
- }
else if ( p->packet_flags & PKT_RESIZED )
{
// we never increase, only trim, but daq doesn't support resizing wire packet
{
ActionManager::execute(p);
- DAQ_Verdict verdict = distill_verdict(p);
+ DAQ_Verdict verdict = MAX_DAQ_VERDICT;
+
+ if (p->active->packet_retry_requested())
+ {
+ retry_queue->put(p->daq_msg);
+ aux_counts.retries_queued++;
+ }
+ else if (!p->active->is_packet_held())
+ verdict = distill_verdict(p);
if (PacketTracer::is_active())
{
get_network_policy()->user_policy_id, get_inspection_policy()->user_policy_id,
get_ips_policy()->user_policy_id);
- PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
+ if (p->active->packet_retry_requested())
+ PacketTracer::log("Verdict: Queuing for Retry\n");
+ else if (p->active->is_packet_held())
+ PacketTracer::log("Verdict: Holding for Detection\n");
+ else
+ PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
PacketTracer::dump(p);
}
p->pkth = nullptr; // no longer avail upon sig segv
- if (verdict == DAQ_VERDICT_RETRY)
- retry_queue->put(p->daq_msg);
-
- else if ( !p->active->is_packet_held() )
+ if (verdict != MAX_DAQ_VERDICT)
{
// Publish an event if something has indicated that it wants the
// finalize event on this flow.
const DAQ_PktHdr_t* pkthdr = daq_msg_get_pkthdr(msg);
set_default_policy();
+ pc.analyzed_pkts++;
+
if (!retry)
- {
- pc.total_from_daq++;
packet_time_update(&pkthdr->ts);
- }
DetectionEngine::wait_for_context();
switcher->start();
Packet* p = switcher->get_context()->packet;
oops_handler->set_current_packet(p);
p->context->wire_packet = p;
- p->context->packet_number = pc.total_from_daq;
+ p->context->packet_number = get_packet_number();
DetectionEngine::reset();
packet_gettimeofday(&now);
DAQ_Msg_h msg;
while ((msg = retry_queue->get(&now)) != nullptr)
+ {
process_daq_msg(msg, true);
+ aux_counts.retries_processed++;
+ }
}
}
void Analyzer::idle()
{
// FIXIT-L this whole thing could be pub-sub
- DataBus::publish(THREAD_IDLE_EVENT, nullptr);
- if (SnortConfig::read_mode())
- Stream::timeout_flows(packet_time());
- else
- Stream::timeout_flows(time(nullptr));
aux_counts.idle++;
+
+ // This should only be called if the DAQ timeout elapsed, so increment the packet time
+ // by the DAQ timeout.
+ struct timeval now, increment;
+ unsigned int timeout = SnortConfig::get_conf()->daq_config->timeout;
+ packet_gettimeofday(&now);
+ increment = { timeout / 1000, (timeout % 1000) * 1000 };
+ timeradd(&now, &increment, &now);
+ packet_time_update(&now);
+
+ DataBus::publish(THREAD_IDLE_EVENT, nullptr);
+
+ // Service the retry queue with the new packet time.
+ process_retry_queue();
+
+ Stream::timeout_flows(packet_time());
+
HighAvailabilityManager::process_receive();
}
if ( !sc->dirty_pig )
Stream::purge_flows();
+ DAQ_Msg_h msg;
+ while ((msg = retry_queue->get()) != nullptr)
+ {
+ aux_counts.retries_discarded++;
+ Profile profile(daqPerfStats);
+ daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
+ }
+
DetectionEngine::idle();
InspectorManager::thread_stop(sc);
ModuleManager::accumulate(sc);
oops_handler->set_current_packet(nullptr);
- if ( daq_instance->was_started() )
- {
- DAQ_Msg_h msg;
- while ((msg = retry_queue->get()) != nullptr)
- {
- Profile profile(daqPerfStats);
- daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
- }
- daq_instance->stop();
- }
+ daq_instance->stop();
SFDAQ::set_local_instance(nullptr);
PacketLatency::tterm();
rstat = daq_instance->receive_messages(max_recv);
}
+ // Preemptively service available onloads to potentially unblock processing the first message.
+ // This conveniently handles servicing offloads in the no messages received case as well.
+ DetectionEngine::onload();
+
unsigned num_recv = 0;
DAQ_Msg_h msg;
while ((msg = daq_instance->next_message()) != nullptr)
daq_instance->finalize_message(msg, DAQ_VERDICT_PASS);
continue;
}
- // FIXIT-M add fail open capability
- // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
+ // FIXIT-M reimplement fail-open capability?
num_recv++;
+ // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
process_daq_msg(msg, false);
DetectionEngine::onload();
process_retry_queue();
void InspectorManager::thread_stop(SnortConfig*)
{
+ // If thread_init() was never called, we have nothing to do.
+ if ( !s_tl_handlers )
+ return;
+
// pin->tterm() only called for default policy
set_default_policy();
InspectionPolicy* pi = snort::get_inspection_policy();
void InspectorManager::thread_term(SnortConfig*)
{
- // FIXIT-L this check required if startup failed in
- // Analyzer::analyze before thread_init is called eg
- // Can't start DAQ (-1) - socket: Operation not permitted
- // ideally thread_term not called w/o calling thread_init
+ // If thread_init() was never called, we have nothing to do.
if ( !s_tl_handlers )
return;
#include "utils/dnet_header.h"
#include "sfdaq.h"
+#include "sfdaq_instance.h"
using namespace snort;
daq_update_status(p);
}
-bool Active::daq_retry_packet(const Packet* p)
+bool Active::retry_packet(const Packet* p)
{
- bool retry_queued = false;
+ if (active_action != ACT_PASS || !SFDAQ::forwarding_packet(p->pkth))
+ return false;
- if ( !p->is_rebuilt() && (active_action == ACT_PASS) )
+ // FIXIT-L semi-arbitrary heuristic for preventing retry queue saturation - reevaluate later
+ if (p->daq_instance->get_pool_available() < p->daq_instance->get_batch_size())
{
- if ( SFDAQ::forwarding_packet(p->pkth) )
- {
- if (p->packet_flags & PKT_RETRANSMIT)
- active_action = ACT_DROP; // Don't add retransmits to retry queue.
- else
- active_action = ACT_RETRY;
- retry_queued = true;
- }
+ // Fall back on dropping the packet and relying on the host to retransmit
+ active_action = ACT_DROP;
+ aux_counts.retries_dropped++;
+ return false;
}
- return retry_queued;
+ // If a retransmit would be added to the retry queue, drop it instead.
+ // FIXIT-L this behavior needs to be reevaluated and probably moved somewhere else
+ if (p->packet_flags & PKT_RETRANSMIT)
+ active_action = ACT_DROP;
+ else
+ active_action = ACT_RETRY;
+
+ return true;
}
bool Active::hold_packet(const Packet*)
reset_session(p, force);
break;
case ACT_RETRY:
- if(!daq_retry_packet(p))
+ if(!retry_packet(p))
drop_packet(p, force);
break;
default:
bool is_reset_candidate(const Packet*);
bool is_unreachable_candidate(const Packet*);
- ActiveAction get_action()
+ ActiveAction get_action() const
{ return active_action; }
- ActiveStatus get_status()
+ ActiveStatus get_status() const
{ return active_status; }
void kill_session(Packet*, EncodeFlags = ENC_FLAG_FWD);
- bool can_block()
+ bool can_block() const
{ return active_status == AST_ALLOW or active_status == AST_FORCE; }
- const char* get_action_string()
+ const char* get_action_string() const
{ return act_str[active_action][active_status]; }
void drop_packet(const Packet*, bool force = false);
void daq_drop_packet(const Packet*);
- bool daq_retry_packet(const Packet*);
+ bool retry_packet(const Packet*);
bool hold_packet(const Packet*);
void allow_session(Packet*);
void reset_again()
{ active_action = ACT_RESET; }
- bool packet_was_dropped()
+ bool packet_was_dropped() const
{ return active_action >= ACT_DROP; }
- bool packet_would_be_dropped()
+ bool packet_would_be_dropped() const
{ return active_status == AST_WOULD; }
- bool packet_retry_requested()
+ bool packet_retry_requested() const
{ return active_action == ACT_RETRY; }
- bool session_was_blocked()
+ bool session_was_blocked() const
{ return active_action >= ACT_BLOCK; }
- bool packet_force_dropped()
+ bool packet_force_dropped() const
{ return active_status == AST_FORCE; }
- bool is_packet_held()
+ bool is_packet_held() const
{ return active_action == ACT_HOLD; }
void set_tunnel_bypass()
void clear_tunnel_bypass()
{ active_tunnel_bypass--; }
- bool get_tunnel_bypass()
+ bool get_tunnel_bypass() const
{ return active_tunnel_bypass > 0; }
void set_delayed_action(ActiveAction, bool force = false);
}
}
-void SFDAQInstance::abort()
-{
- if (was_started())
- stop();
-
- //DAQ_Delete();
- //DAQ_Term(); FIXIT-L this must be called from main thread on abort
-}
-
-const char* SFDAQInstance::get_input_spec()
+const char* SFDAQInstance::get_input_spec() const
{
return input_spec.c_str();
}
// the datalink type in the file must be used - which may not be known until
// start. The value is cached here since it used for packet operations like
// logging and is needed at shutdown. This avoids sequencing issues.
-int SFDAQInstance::get_base_protocol()
+int SFDAQInstance::get_base_protocol() const
{
return dlt;
}
-bool SFDAQInstance::can_inject()
+bool SFDAQInstance::can_inject() const
{
return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT) != 0;
}
-bool SFDAQInstance::can_inject_raw()
+bool SFDAQInstance::can_inject_raw() const
{
return (daq_instance_get_capabilities(instance) & DAQ_CAPA_INJECT_RAW) != 0;
}
-bool SFDAQInstance::can_replace()
+bool SFDAQInstance::can_replace() const
{
return (daq_instance_get_capabilities(instance) & DAQ_CAPA_REPLACE) != 0;
}
-bool SFDAQInstance::can_start_unprivileged()
+bool SFDAQInstance::can_start_unprivileged() const
{
return (daq_instance_get_capabilities(instance) & DAQ_CAPA_UNPRIV_START) != 0;
}
-bool SFDAQInstance::can_whitelist()
+bool SFDAQInstance::can_whitelist() const
{
return (daq_instance_get_capabilities(instance) & DAQ_CAPA_WHITELIST) != 0;
}
return (daq_tunnel_mask & proto) != 0;
}
-bool SFDAQInstance::was_started()
+bool SFDAQInstance::was_started() const
{
- DAQ_State s;
-
if (!instance)
return false;
- s = daq_instance_check_status(instance);
-
- return (DAQ_STATE_STARTED == s);
+ DAQ_State s = daq_instance_check_status(instance);
+ return (s == DAQ_STATE_STARTED);
}
bool SFDAQInstance::stop()
{
assert(pool_size == pool_available);
+ if (!was_started())
+ return true;
+
int rval = daq_instance_stop(instance);
if (rval != DAQ_SUCCESS)
bool init(DAQ_Config_h, const std::string& bpf_string);
bool start();
- bool was_started();
+ bool was_started() const;
bool stop();
void reload();
- void abort();
DAQ_RecvStatus receive_messages(unsigned max_recv);
DAQ_Msg_h next_message()
int finalize_message(DAQ_Msg_h msg, DAQ_Verdict verdict);
const char* get_error();
- int get_base_protocol();
- uint32_t get_batch_size() { return batch_size; }
- const char* get_input_spec();
+ int get_base_protocol() const;
+ uint32_t get_batch_size() const { return batch_size; }
+ uint32_t get_pool_available() const { return pool_available; }
+ const char* get_input_spec() const;
const DAQ_Stats_t* get_stats();
- bool can_inject();
- bool can_inject_raw();
- bool can_replace();
- bool can_start_unprivileged();
- SO_PUBLIC bool can_whitelist();
+ bool can_inject() const;
+ bool can_inject_raw() const;
+ bool can_replace() const;
+ bool can_start_unprivileged() const;
+ SO_PUBLIC bool can_whitelist() const;
int inject(DAQ_Msg_h, int rev, const uint8_t* buf, uint32_t len);
bool interrupt();
PegCount skipped;
PegCount idle;
PegCount rx_bytes;
+ PegCount retries_queued;
+ PegCount retries_dropped;
+ PegCount retries_processed;
+ PegCount retries_discarded;
};
const PegInfo daq_names[] =
{ CountType::SUM, "skipped", "packets skipped at startup" },
{ CountType::SUM, "idle", "attempts to acquire from DAQ without available packets" },
{ CountType::SUM, "rx_bytes", "total bytes received" },
+ { CountType::SUM, "retries_queued", "messages queued for retry" },
+ { CountType::SUM, "retries_dropped", "messages dropped when overrunning the retry queue" },
+ { CountType::SUM, "retries_processed", "messages processed from the retry queue" },
+ { CountType::SUM, "retries_discarded", "messages discarded when purging the retry queue" },
{ CountType::END, nullptr, nullptr }
};
stats.skipped = aux_counts.skipped;
stats.idle = aux_counts.idle;
stats.rx_bytes = aux_counts.rx_bytes;
+ stats.retries_queued = aux_counts.retries_queued;
+ stats.retries_dropped = aux_counts.retries_dropped;
+ stats.retries_processed = aux_counts.retries_processed;
+ stats.retries_discarded = aux_counts.retries_discarded;
memset(&aux_counts, 0, sizeof(AuxCount));
THREAD_LOCAL PacketCount pc;
-PegCount get_packet_number()
-{ return pc.total_from_daq; }
-
//-------------------------------------------------------------------------
static inline void LogSeparator(FILE* fh = stdout)
const PegInfo pc_names[] =
{
- { CountType::NOW, "analyzed", "packets sent to detection" },
+ { CountType::NOW, "analyzed", "total packets processed" },
{ CountType::SUM, "hard_evals", "non-fast pattern rule evaluations" },
{ CountType::SUM, "raw_searches", "fast pattern searches in raw packet data" },
{ CountType::SUM, "cooked_searches", "fast pattern searches in cooked packet data" },
// FIXIT-L split this out into appropriate modules
struct PacketCount
{
- PegCount total_from_daq;
+ PegCount analyzed_pkts;
PegCount hard_evals;
PegCount raw_searches;
PegCount cooked_searches;
PegCount idle;
PegCount rx_bytes;
PegCount skipped;
+ PegCount retries_queued;
+ PegCount retries_dropped;
+ PegCount retries_processed;
+ PegCount retries_discarded;
};
extern ProcessCount proc_stats;
{
extern SO_PUBLIC THREAD_LOCAL PacketCount pc;
-SO_PUBLIC PegCount get_packet_number();
+SO_PUBLIC inline PegCount get_packet_number() { return pc.analyzed_pkts; }
SO_PUBLIC void LogLabel(const char*, FILE* = stdout);
SO_PUBLIC void LogValue(const char*, const char*, FILE* = stdout);