#include "managers/ips_manager.h"
#include "managers/event_manager.h"
#include "managers/module_manager.h"
+#include "memory/memory_cap.h"
#include "packet_io/active.h"
#include "packet_io/sfdaq.h"
#include "packet_io/sfdaq_config.h"
return verdict;
}
+void Analyzer::add_to_retry_queue(DAQ_Msg_h daq_msg)
+{
+ // Temporarily increase memcap until message is finalized in case
+ // DAQ makes a copy of the data buffer.
+ memory::MemoryCap::update_allocations(daq_msg_get_data_len(daq_msg));
+ retry_queue->put(daq_msg);
+}
+
/*
* Private message processing methods
*/
void Analyzer::post_process_daq_pkt_msg(Packet* p)
{
+ bool msg_was_held = false;
+
Active::execute(p);
DAQ_Verdict verdict = MAX_DAQ_VERDICT;
if (p->active->packet_retry_requested())
{
- retry_queue->put(p->daq_msg);
+ add_to_retry_queue(p->daq_msg);
daq_stats.retries_queued++;
}
- else if (p->active->is_packet_held() and Stream::set_packet_action_to_hold(p))
+ else
{
- if (p->flow->flags.trigger_detained_packet_event)
- DataBus::publish(DETAINED_PACKET_EVENT, p);
+ msg_was_held = (p->active->is_packet_held() and Stream::set_packet_action_to_hold(p));
+ if (msg_was_held)
+ {
+ if (p->flow->flags.trigger_detained_packet_event)
+ {
+ DataBus::publish(DETAINED_PACKET_EVENT, p);
+ }
+ }
+ else
+ verdict = distill_verdict(p);
}
- else
- verdict = distill_verdict(p);
if (PacketTracer::is_active())
{
if (p->active->packet_retry_requested())
PacketTracer::log("Verdict: Queuing for Retry\n");
- else if (p->active->is_packet_held())
+ else if (msg_was_held)
PacketTracer::log("Verdict: Holding for Detection\n");
else
PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
{
process_daq_msg(msg, true);
daq_stats.retries_processed++;
+
+ // Decrease memcap now that msg has been finalized.
+ memory::MemoryCap::update_deallocations(daq_msg_get_data_len(msg));
}
}
}
bool process_rebuilt_packet(snort::Packet*, const DAQ_PktHdr_t*, const uint8_t* pkt, uint32_t pktlen);
bool inspect_rebuilt(snort::Packet*);
void finalize_daq_message(DAQ_Msg_h, DAQ_Verdict);
+ void add_to_retry_queue(DAQ_Msg_h);
// Functions called by analyzer commands
void start();
#include "stubs.h"
#include "main/analyzer.h"
+#include "memory/memory_cap.h"
#include "packet_io/sfdaq_instance.h"
+#include "packet_io/sfdaq.h"
#include <CppUTest/CommandLineTestRunner.h>
#include <CppUTest/TestHarness.h>
deferred_trust = on ? TRUST_DEFER_ON : TRUST_DEFER_OFF;
}
void Flow::trust() { }
+
+SFDAQInstance* SFDAQ::get_local_instance() { return nullptr; }
}
+void memory::MemoryCap::update_allocations(size_t) { }
+void memory::MemoryCap::update_deallocations(size_t) { }
+
using namespace snort;
//--------------------------------------------------------------------------
return false;
// FIXIT-L semi-arbitrary heuristic for preventing retry queue saturation - reevaluate later
- if (!p->daq_instance || p->daq_instance->get_pool_available() < p->daq_instance->get_batch_size())
+ SFDAQInstance* daq_instance = p->daq_instance ? p->daq_instance : SFDAQ::get_local_instance();
+ if (!daq_instance || daq_instance->get_pool_available() < daq_instance->get_batch_size())
{
// Fall back on dropping the packet and relying on the host to retransmit
active_action = ACT_DROP;
#include "dce_smb2_session.h"
-inline uint32_t Smb2Tid(const Smb2Hdr* hdr)
+uint32_t Smb2Tid(const Smb2Hdr* hdr)
{
return snort::alignedNtohl(&(((const Smb2SyncHdr*)hdr)->tree_id));
}
tcp_session->set_no_ack(on_off);
}
+void Stream::partial_flush(Flow* flow, bool to_server)
+{
+ if ( flow->pkt_type == PktType::TCP )
+ {
+ if ( to_server )
+ ((TcpStreamSession*)flow->session)->server.perform_partial_flush();
+ else
+ ((TcpStreamSession*)flow->session)->client.perform_partial_flush();
+ }
+}
+
#ifdef UNIT_TEST
#include "catch/snort_catch.h"
static bool set_packet_action_to_hold(Packet*);
static void set_no_ack_mode(Flow*, bool);
+ static void partial_flush(Flow*, bool to_server);
private:
static void set_ip_protocol(Flow*);
{ CountType::SUM, "held_packets_passed", "number of held packets passed" },
{ CountType::SUM, "held_packet_timeouts", "number of held packets that timed out" },
{ CountType::SUM, "held_packet_purges", "number of held packets that were purged without flushing" },
+ { CountType::SUM, "held_packet_retries", "number of held packets that were added to the retry queue" },
{ CountType::NOW, "cur_packets_held", "number of packets currently held" },
{ CountType::MAX, "max_packets_held", "maximum number of packets held simultaneously" },
{ CountType::SUM, "partial_flushes", "number of partial flushes initiated" },
PegCount held_packets_passed;
PegCount held_packet_timeouts;
PegCount held_packet_purges;
+ PegCount held_packet_retries;
PegCount current_packets_held;
PegCount max_packets_held;
PegCount partial_flushes;
#include "tcp_stream_tracker.h"
+#include <daq.h>
+
#include "log/messages.h"
#include "main/analyzer.h"
#include "main/snort.h"
+#include "memory/memory_cap.h"
#include "packet_io/active.h"
#include "profiler/profiler_defs.h"
#include "protocols/eth.h"
if ( held_packet != null_iterator )
return false;
+ // Temporarily increase memcap until message is finalized in case
+ // DAQ makes a copy of the data buffer.
+ memory::MemoryCap::update_allocations(daq_msg_get_data_len(p->daq_msg));
+
held_packet = hpq->append(p->daq_msg, p->ptrs.tcph->seq(), *this);
tcpStats.total_packets_held++;
{
if ( held_packet != null_iterator )
{
+ DAQ_Msg_h msg = held_packet->get_daq_msg();
+ uint32_t msglen = daq_msg_get_data_len(msg);
+
if ( cp->active->packet_was_dropped() )
{
DAQ_Verdict verdict = held_packet->has_expired() ? DAQ_VERDICT_BLACKLIST : DAQ_VERDICT_BLOCK;
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), verdict);
+ Analyzer::get_local_analyzer()->finalize_daq_message(msg, verdict);
tcpStats.held_packets_dropped++;
}
else
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_PASS);
- tcpStats.held_packets_passed++;
+ if ( cp->active->packet_retry_requested() )
+ {
+ tcpStats.held_packet_retries++;
+ Analyzer::get_local_analyzer()->add_to_retry_queue(msg);
+ }
+ else
+ {
+ Analyzer::get_local_analyzer()->finalize_daq_message(msg, DAQ_VERDICT_PASS);
+ tcpStats.held_packets_passed++;
+ }
}
+ memory::MemoryCap::update_deallocations(msglen);
+
hpq->erase(held_packet);
held_packet = null_iterator;
tcpStats.current_packets_held--;
{
if ( held_packet != null_iterator )
{
+ DAQ_Msg_h msg = held_packet->get_daq_msg();
+ uint32_t msglen = daq_msg_get_data_len(msg);
+
if ( (flow->session_state & STREAM_STATE_BLOCK_PENDING) ||
(flow->ssn_state.session_flags & SSNFLAG_BLOCK) )
{
DAQ_Verdict verdict = held_packet->has_expired() ? DAQ_VERDICT_BLACKLIST : DAQ_VERDICT_BLOCK;
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), verdict);
+ Analyzer::get_local_analyzer()->finalize_daq_message(msg, verdict);
tcpStats.held_packets_dropped++;
}
else
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_PASS);
+ Analyzer::get_local_analyzer()->finalize_daq_message(msg, DAQ_VERDICT_PASS);
tcpStats.held_packets_passed++;
}
+ memory::MemoryCap::update_deallocations(msglen);
+
hpq->erase(held_packet);
held_packet = null_iterator;
tcpStats.current_packets_held--;