}
oops_handler->set_current_packet(nullptr);
- Stream::timeout_flows(packet_time());
+ Stream::handle_timeouts(false);
HighAvailabilityManager::process_receive();
}
// Service the retry queue with the new packet time.
process_retry_queue();
- Stream::timeout_flows(packet_time());
+ Stream::handle_timeouts(true);
HighAvailabilityManager::process_receive();
#include "tcp/tcp_session.h"
#include "tcp/tcp_stream_session.h"
+#include "tcp/tcp_stream_tracker.h"
using namespace snort;
flow_con->purge_flows();
}
-void Stream::timeout_flows(time_t cur_time)
+void Stream::handle_timeouts(bool idle)
{
+ timeval cur_time;
+ packet_gettimeofday(&cur_time);
+
// FIXIT-M batch here or loop vs looping over idle?
if ( flow_con )
- flow_con->timeout_flows(cur_time);
+ flow_con->timeout_flows(cur_time.tv_sec);
+
+ int max_remove = idle ? -1 : 1; // -1 = all eligible
+ TcpStreamTracker::release_held_packets(cur_time, max_remove);
}
void Stream::prune_flows()
// for shutdown only
static void purge_flows();
- static void timeout_flows(time_t cur_time);
+ static void handle_timeouts(bool idle);
static void prune_flows();
static bool expected_flow(Flow*, Packet*);
endif()
add_library( stream_tcp OBJECT
+ held_packet_queue.cc
+ held_packet_queue.h
ips_stream_reassemble.cc
ips_stream_size.cc
- segment_overlap_editor.h
segment_overlap_editor.cc
+ segment_overlap_editor.h
stream_tcp.cc
stream_tcp.h
tcp_defs.h
- tcp_event_logger.h
tcp_event_logger.cc
+ tcp_event_logger.h
tcp_ha.cc
tcp_ha.h
tcp_module.cc
tcp_state_fin_wait2.cc
tcp_state_fin_wait2.h
tcp_state_handler.cc
- tcp_state_handler.h
+ tcp_state_handler.h
tcp_state_last_ack.cc
tcp_state_last_ack.h
tcp_state_listen.cc
tcp_state_syn_sent.cc
tcp_state_syn_sent.h
tcp_state_time_wait.cc
- tcp_state_time_wait.h
+ tcp_state_time_wait.h
tcp_stream_config.cc
tcp_stream_config.h
tcp_stream_session.cc
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2020-2020 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+//
+// held_packet_queue.cc author Silviu Minut <sminut@cisco.com>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "held_packet_queue.h"
+
+#include "time/packet_time.h"
+
+#include "tcp_module.h"
+#include "tcp_stream_tracker.h"
+
+using namespace snort;
+
+HeldPacket::HeldPacket(DAQ_Msg_h msg, uint32_t seq, const timeval& exp, TcpStreamTracker& trk)
+ : daq_msg(msg), seq_num(seq), expiration(exp), tracker(trk)
+{
+}
+
+HeldPacketQueue::iter_t HeldPacketQueue::append(DAQ_Msg_h msg, uint32_t seq,
+ TcpStreamTracker& trk)
+{
+ timeval now, expiration;
+ packet_gettimeofday(&now);
+ timeradd(&now, &timeout, &expiration);
+
+ q.emplace_back(msg, seq, expiration, trk);
+ return --q.end();
+}
+
+void HeldPacketQueue::erase(iter_t it)
+{
+ q.erase(it);
+}
+
+void HeldPacketQueue::execute(const timeval& cur_time, int max_remove)
+{
+ while ( !q.empty() && (max_remove < 0 || max_remove--) )
+ {
+ auto held_packet = q.begin();
+ if ( held_packet->has_expired(cur_time) )
+ {
+ assert(held_packet == held_packet->get_tracker().held_packet);
+ held_packet->get_tracker().perform_partial_flush();
+ tcpStats.held_packet_timeouts++;
+ }
+ else
+ break;
+ }
+}
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2020-2020 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+//
+// held_packet_queue.h author Silviu Minut <sminut@cisco.com>
+
+#ifndef HELD_PACKET_QUEUE_H
+#define HELD_PACKET_QUEUE_H
+
+#include <daq_common.h>
+
+#include <ctime>
+#include <list>
+
+class TcpStreamTracker;
+
+class HeldPacket
+{
+public:
+
+ HeldPacket(DAQ_Msg_h msg, uint32_t seq, const timeval& timeout, TcpStreamTracker& trk);
+
+ bool has_expired(const timeval& cur_time)
+ {
+ return !timercmp(&cur_time, &expiration, <);
+ }
+
+ TcpStreamTracker& get_tracker() const { return tracker; }
+ DAQ_Msg_h get_daq_msg() const { return daq_msg; }
+ uint32_t get_seq_num() const { return seq_num; }
+
+private:
+ DAQ_Msg_h daq_msg;
+ uint32_t seq_num;
+ timeval expiration;
+ TcpStreamTracker& tracker;
+};
+
+class HeldPacketQueue
+{
+public:
+
+ using list_t = std::list<HeldPacket>;
+ using iter_t = list_t::iterator;
+
+ iter_t append(DAQ_Msg_h msg, uint32_t seq, TcpStreamTracker& trk);
+ void erase(iter_t it);
+ void execute(const timeval& cur_time, int max_remove);
+
+ void set_timeout(uint32_t ms)
+ {
+ timeout.tv_sec = ms / 1000;
+ timeout.tv_usec = static_cast<suseconds_t>((ms % 1000) * 1000);
+ }
+ bool empty() const { return q.empty(); }
+
+private:
+ timeval timeout = {1, 0};
+ list_t q;
+};
+
+#endif
+
#include "tcp_session.h"
#include "tcp_reassemblers.h"
#include "tcp_state_machine.h"
+#include "tcp_stream_tracker.h"
using namespace snort;
void StreamTcp::tinit()
{
+ TcpStreamTracker::set_held_packet_timeout(config->held_packet_timeout);
TcpHAManager::tinit();
TcpSession::sinit();
}
static void tcp_dtor(Inspector* p)
{ delete p; }
+static void stream_tcp_tinit()
+{
+ TcpStreamTracker::thread_init();
+}
+
+static void stream_tcp_tterm()
+{
+ TcpStreamTracker::thread_term();
+}
+
static void stream_tcp_pinit()
{
TcpStateMachine::initialize();
nullptr, // service
stream_tcp_pinit, // pinit
stream_tcp_pterm, // pterm
- nullptr, // tinit,
- nullptr, // tterm,
+ stream_tcp_tinit, // tinit,
+ stream_tcp_tterm, // tterm,
tcp_ctor,
tcp_dtor,
tcp_ssn,
{ CountType::SUM, "held_packet_rexmits", "number of retransmits of held packets" },
{ CountType::SUM, "held_packets_dropped", "number of held packets dropped" },
{ CountType::SUM, "held_packets_passed", "number of held packets passed" },
+ { CountType::SUM, "held_packet_timeouts", "number of held packets that timed out" },
{ CountType::NOW, "cur_packets_held", "number of packets currently held" },
{ CountType::MAX, "max_packets_held", "maximum number of packets held simultaneously" },
{ CountType::SUM, "partial_flushes", "number of partial flushes initiated" },
{ "track_only", Parameter::PT_BOOL, nullptr, "false",
"disable reassembly if true" },
+ { "held_packet_timeout", Parameter::PT_INT, "1:max32", "1000",
+ "timeout in milliseconds for held packets" },
+
{ nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
};
else
config->flags &= ~STREAM_CONFIG_NO_REASSEMBLY;
}
+ else if ( v.is("held_packet_timeout") )
+ {
+ config->held_packet_timeout = v.get_uint32();
+ }
else
return false;
PegCount held_packet_rexmits;
PegCount held_packets_dropped;
PegCount held_packets_passed;
+ PegCount held_packet_timeouts;
PegCount current_packets_held;
PegCount max_packets_held;
PegCount partial_flushes;
}
if ( trs.tracker->is_retransmit_of_held_packet(p) )
- {
- if ( trs.tracker->splitter->init_partial_flush(p->flow) )
- {
- flushed += flush_stream(trs, p, trs.packet_dir, false);
- paf_jump(&trs.tracker->paf_state, flushed);
- tcpStats.partial_flushes++;
- tcpStats.partial_flush_bytes += flushed;
- if ( trs.sos.seg_count )
- {
- purge_to_seq(trs, trs.sos.seglist.head->i_seq + flushed);
- trs.tracker->r_win_base = trs.sos.seglist_base_seq;
- }
- }
- }
+ flushed = perform_partial_flush(trs, p, flushed);
// FIXIT-H a drop rule will yoink the seglist out from under us
// because apply_delayed_action is only deferred to end of context
return rc;
}
+
+uint32_t TcpReassembler::perform_partial_flush(TcpReassemblerState& trs, Flow* flow)
+{
+ // Call this first, to create a context before creating a packet:
+ DetectionEngine::set_next_packet();
+ DetectionEngine de;
+
+ Packet* p = set_packet(flow, trs.packet_dir, trs.server_side);
+ return perform_partial_flush(trs, p);
+}
+
+// No error checking here, so the caller must ensure that p, p->flow and context
+// are not null.
+uint32_t TcpReassembler::perform_partial_flush(TcpReassemblerState& trs, Packet* p, uint32_t flushed)
+{
+ if ( trs.tracker->splitter->init_partial_flush(p->flow) )
+ {
+ flushed += flush_stream(trs, p, trs.packet_dir, false);
+ paf_jump(&trs.tracker->paf_state, flushed);
+ tcpStats.partial_flushes++;
+ tcpStats.partial_flush_bytes += flushed;
+ if ( trs.sos.seg_count )
+ {
+ purge_to_seq(trs, trs.sos.seglist.head->i_seq + flushed);
+ trs.tracker->r_win_base = trs.sos.seglist_base_seq;
+ }
+ }
+ return flushed;
+}
virtual void trace_segments(TcpReassemblerState&);
virtual void purge_alerts(TcpReassemblerState&);
+ uint32_t perform_partial_flush(TcpReassemblerState&, snort::Flow*);
+
protected:
TcpReassembler() = default;
bool next_no_gap(const TcpSegmentNode&);
void update_next(TcpReassemblerState&, const TcpSegmentNode&);
+
+ uint32_t perform_partial_flush(TcpReassemblerState&, snort::Packet*, uint32_t flushed = 0);
+
};
#endif
void reset_paf_segment()
{ trs.sos.seglist.cur_pseg = nullptr; }
+ uint32_t perform_partial_flush(snort::Flow* flow)
+ { return reassembler->perform_partial_flush(trs, flow); }
+
private:
TcpReassembler* reassembler = nullptr;
TcpReassemblerState trs;
int hs_timeout = -1;
bool no_ack;
+
+ uint32_t held_packet_timeout = 1000; // in milliseconds
};
#endif
#include "packet_io/active.h"
#include "profiler/profiler_defs.h"
#include "protocols/eth.h"
-#include "stream/stream.h"
-#include "stream/tcp/tcp_module.h"
-#include "stream/tcp/tcp_normalizers.h"
-#include "stream/tcp/tcp_reassemblers.h"
-#include "stream/tcp/segment_overlap_editor.h"
+
+#include "held_packet_queue.h"
+#include "segment_overlap_editor.h"
+#include "tcp_module.h"
+#include "tcp_normalizers.h"
+#include "tcp_reassemblers.h"
+#include "tcp_session.h"
using namespace snort;
+THREAD_LOCAL HeldPacketQueue* hpq = nullptr;
+
+static const HeldPacketQueue::iter_t null_iterator { };
+
const char* tcp_state_names[] =
{
"TCP_LISTEN", "TCP_SYN_SENT", "TCP_SYN_RECV",
};
TcpStreamTracker::TcpStreamTracker(bool client) :
- tcp_state(client ? TCP_STATE_NONE : TCP_LISTEN), client_tracker(client)
+ tcp_state(client ? TCP_STATE_NONE : TCP_LISTEN), client_tracker(client),
+ held_packet(null_iterator)
{ }
TcpStreamTracker::~TcpStreamTracker()
fin_seq_set = false;
rst_pkt_sent = false;
order = 0;
- held_packet = nullptr;
- held_seq_num = 0;
+ held_packet = null_iterator;
}
//-------------------------------------------------------------------------
bool TcpStreamTracker::set_held_packet(Packet* p)
{
- if ( held_packet )
+ if ( held_packet != null_iterator )
return false;
- held_packet = p->daq_msg;
- held_seq_num = p->ptrs.tcph->seq();
+ held_packet = hpq->append(p->daq_msg, p->ptrs.tcph->seq(), *this);
+
tcpStats.total_packets_held++;
if ( ++tcpStats.current_packets_held > tcpStats.max_packets_held )
tcpStats.max_packets_held = tcpStats.current_packets_held;
+
return true;
}
+uint32_t TcpStreamTracker::perform_partial_flush()
+{
+ uint32_t flushed = 0;
+ if ( held_packet != null_iterator )
+ flushed = reassembler.perform_partial_flush(session->flow);
+ return flushed;
+}
+
bool TcpStreamTracker::is_retransmit_of_held_packet(Packet* cp)
{
- if ( !held_packet or ( cp->daq_msg == held_packet ) )
+ if ( (held_packet == null_iterator) or ( cp->daq_msg == held_packet->get_daq_msg() ) )
return false;
uint32_t next_send_seq = cp->ptrs.tcph->seq() + (uint32_t)cp->dsize;
- if ( SEQ_LEQ(cp->ptrs.tcph->seq(), held_seq_num) and SEQ_GT(next_send_seq, held_seq_num) )
+ if ( SEQ_LEQ(cp->ptrs.tcph->seq(), held_packet->get_seq_num()) and SEQ_GT(next_send_seq, held_packet->get_seq_num()) )
{
tcpStats.held_packet_rexmits++;
return true;
void TcpStreamTracker::finalize_held_packet(Packet* cp)
{
- if ( held_packet )
+ if ( held_packet != null_iterator )
{
if ( cp->active->packet_was_dropped() )
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet, DAQ_VERDICT_BLOCK);
+ Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_BLOCK);
tcpStats.held_packets_dropped++;
}
else
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet, DAQ_VERDICT_PASS);
+ Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_PASS);
tcpStats.held_packets_passed++;
}
- held_packet = nullptr;
- held_seq_num = 0;
+ hpq->erase(held_packet);
+ held_packet = null_iterator;
tcpStats.current_packets_held--;
}
void TcpStreamTracker::finalize_held_packet(Flow* flow)
{
- if ( held_packet )
+ if ( held_packet != null_iterator )
{
if ( (flow->session_state & STREAM_STATE_BLOCK_PENDING) ||
(flow->ssn_state.session_flags & SSNFLAG_BLOCK) )
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet, DAQ_VERDICT_BLOCK);
+ Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_BLOCK);
tcpStats.held_packets_dropped++;
}
else
{
- Analyzer::get_local_analyzer()->finalize_daq_message(held_packet, DAQ_VERDICT_PASS);
+ Analyzer::get_local_analyzer()->finalize_daq_message(held_packet->get_daq_msg(), DAQ_VERDICT_PASS);
tcpStats.held_packets_passed++;
}
- held_packet = nullptr;
- held_seq_num = 0;
+ hpq->erase(held_packet);
+ held_packet = null_iterator;
tcpStats.current_packets_held--;
}
}
+void TcpStreamTracker::release_held_packets(const timeval& cur_time, int max_remove)
+{
+ if ( hpq )
+ hpq->execute(cur_time, max_remove);
+}
+
+void TcpStreamTracker::set_held_packet_timeout(const uint32_t ms)
+{
+ assert(hpq);
+ hpq->set_timeout(ms);
+}
+
+void TcpStreamTracker::thread_init()
+{
+ assert(!hpq);
+ hpq = new HeldPacketQueue();
+}
+
+void TcpStreamTracker::thread_term()
+{
+ assert(hpq->empty());
+ delete hpq;
+ hpq = nullptr;
+}
#ifndef TCP_STREAM_TRACKER_H
#define TCP_STREAM_TRACKER_H
-#include <daq_common.h>
+#include <list>
#include "stream/paf.h"
+
#include "segment_overlap_editor.h"
#include "tcp_defs.h"
#include "tcp_normalizers.h"
struct Packet;
}
+class HeldPacket;
class TcpReassembler;
class TcpSession;
bool is_retransmit_of_held_packet(snort::Packet*);
void finalize_held_packet(snort::Packet*);
void finalize_held_packet(snort::Flow*);
+ uint32_t perform_partial_flush();
+
+ // max_remove < 0 means time out all eligible packets
+ static void release_held_packets(const timeval& cur_time, int max_remove);
+ static void set_held_packet_timeout(const uint32_t ms);
+ static void thread_init();
+ static void thread_term();
public:
uint32_t snd_una = 0; // SND.UNA - send unacknowledged
FinSeqNumStatus fin_seq_status = TcpStreamTracker::FIN_NOT_SEEN;
+ std::list<HeldPacket>::iterator held_packet;
+
protected:
// FIXIT-H reorganize per-flow structs to minimize padding
uint32_t ts_last_packet = 0;
uint8_t mac_addr[6] = { };
uint8_t tcp_options_len = 0;
- DAQ_Msg_h held_packet = nullptr;
- uint32_t held_seq_num = 0;
-
bool mac_addr_valid = false;
bool fin_seq_set = false; // FIXIT-M should be obviated by tcp state
};