#define STREAM_UNALIGNED 0
#define STREAM_ALIGNED 1
-#define MQ_NONE 0
-#define MQ_BYTES 1
-#define MQ_SEGS 2
-
#define STREAM_DEFAULT_MAX_SMALL_SEG_SIZE 0 /* disabled */
#define STREAM_DEFAULT_CONSEC_SMALL_SEGS 0 /* disabled */
{ CountType::SUM, "zero_win_probes", "number of tcp zero window probes" },
{ CountType::SUM, "proxy_mode_flows", "number of flows set to proxy normalization policy" },
{ CountType::SUM, "full_retransmits", "number of fully retransmitted segments" },
+ { CountType::SUM, "flush_on_asymmetric_flow", "number of flushes on asymmetric flows" },
{ CountType::END, nullptr, nullptr }
};
PegCount zero_win_probes;
PegCount proxy_mode_flows;
PegCount full_retransmits;
+ PegCount flush_on_asymmetric_flow;
};
extern THREAD_LOCAL struct TcpStats tcpStats;
auto bytes_skipped = ( tsn->c_len <= remaining_bytes ) ? tsn->c_len : remaining_bytes;
remaining_bytes -= bytes_skipped;
- tsn->update_ressembly_lengths(bytes_skipped);
+ tsn->update_reassembly_cursor(bytes_skipped);
if ( !tsn->c_len )
{
}
total_flushed += bytes_copied;
- tsn->update_ressembly_lengths(bytes_copied);
+ tsn->update_reassembly_cursor(bytes_copied);
flags = 0;
if ( !tsn->c_len )
{
if ( !trs.sos.seglist.head )
return true;
-
- uint32_t start, end = (trs.sos.seglist.tail->i_seq + trs.sos.seglist.tail->i_len);
+ // Left side
+ uint32_t start;
if ( SEQ_LT(trs.sos.seglist_base_seq, trs.sos.seglist.head->i_seq) )
start = trs.sos.seglist_base_seq;
else
start = trs.sos.seglist.head->i_seq;
- // Left side
if ( SEQ_LEQ(tsd.get_end_seq(), start) )
return false;
// Right side
+ uint32_t end = (trs.sos.seglist.tail->i_seq + trs.sos.seglist.tail->i_len);
if ( SEQ_GEQ(tsd.get_seq(), end) )
return false;
PacketTracer::log("Stream: Skipped %u packets before seglist hole)\n", packets_skipped);
}
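+// On an asymmetric flow the ACKs that normally advance reassembly never
+// arrive, so a seglist hole can stall flushing indefinitely. Skip any hole
+// at the head, purge segments stranded to the left of remaining holes, and
+// reset the splitter so scanning restarts on contiguous data.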
+void TcpReassembler::reset_asymmetric_flow_reassembly(TcpReassemblerState& trs)
+{
+ TcpSegmentNode* tsn = trs.sos.seglist.head;
+ // if there is a hole at the beginning, skip it...
+ if ( SEQ_GT(tsn->i_seq, trs.sos.seglist_base_seq) )
+ {
+ trs.sos.seglist_base_seq = tsn->i_seq;
+        if ( PacketTracer::is_active() )
+            PacketTracer::log("Stream: Skipped hole at beginning of the seglist\n");
+ }
+
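+    // walk the seglist; when a gap precedes the next node, purge everything
+    // to the left of that node and rebase the seglist on the new head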
+ while ( tsn )
+ {
+ if ( tsn->next and SEQ_GT(tsn->next->i_seq, tsn->i_seq + tsn->i_len) )
+ {
+ tsn = tsn->next;
+ purge_segments_left_of_hole(trs, tsn);
+ trs.sos.seglist_base_seq = trs.sos.seglist.head->i_seq;
+ }
+ else
+ tsn = tsn->next;
+ }
+
+ if ( trs.tracker->is_splitter_paf() )
+ fallback(*trs.tracker, trs.server_side);
+ else
+ paf_reset(&trs.paf_state);
+}
+
void TcpReassembler::skip_midstream_pickup_seglist_hole(TcpReassemblerState& trs, TcpSegmentDescriptor& tsd)
{
uint32_t ack = tsd.get_ack();
trs.tracker->rcv_nxt = ack;
}
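+// A flush counts as asymmetric when a non-SYN packet flushed data with
+// segments still queued, no traffic has been seen from the peer, and the
+// peer never got past the handshake (or midstream pickup).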
+bool TcpReassembler::flush_on_asymmetric_flow(const TcpReassemblerState& trs, uint32_t flushed, snort::Packet* p)
+{
+ bool asymmetric = flushed && trs.sos.seg_count && !p->flow->two_way_traffic() && !p->ptrs.tcph->is_syn();
+ if ( asymmetric )
+ {
+ TcpStreamTracker::TcpState peer = trs.tracker->session->get_peer_state(trs.tracker);
+ asymmetric = ( peer == TcpStreamTracker::TCP_SYN_SENT || peer == TcpStreamTracker::TCP_SYN_RECV
+ || peer == TcpStreamTracker::TCP_MID_STREAM_SENT );
+ }
+
+ return asymmetric;
+}
+
// iterate over trs.sos.seglist and scan all new acked bytes
// - new means not yet scanned
// - must use trs.sos.seglist data (not packet) since this packet may plug a
return ret_val;
}
+// we are on a FIN, the data has been scanned, it has no gaps,
+// but somehow we are waiting for more data - do final flush here
+// FIXIT-M this convoluted expression needs some refactoring to simplify
+bool TcpReassembler::final_flush_on_fin(const TcpReassemblerState& trs, int32_t flush_amt, Packet* p)
+{
+ return trs.tracker->fin_seq_status >= TcpStreamTracker::FIN_WITH_SEQ_SEEN
+ && -1 <= flush_amt && flush_amt <= 0
+ && trs.paf_state.paf == StreamSplitter::SEARCH
+ && !p->flow->searching_for_service();
+}
+
int TcpReassembler::flush_on_data_policy(TcpReassemblerState& trs, Packet* p)
{
uint32_t flushed = 0;
break;
flushed += flush_to_seq(trs, flush_amt, p, flags);
- }
- while ( trs.sos.seglist.head and !p->flow->is_inspection_disabled() );
+ } while ( trs.sos.seglist.head and !p->flow->is_inspection_disabled() );
if ( (trs.paf_state.paf == StreamSplitter::ABORT) && trs.tracker->is_splitter_paf() )
{
fallback(*trs.tracker, trs.server_side);
return flush_on_data_policy(trs, p);
}
- else if ( trs.tracker->fin_seq_status >= TcpStreamTracker::FIN_WITH_SEQ_SEEN and
- -1 <= flush_amt and flush_amt <= 0 and
- trs.paf_state.paf == StreamSplitter::SEARCH and
- !p->flow->searching_for_service() )
- {
- // we are on a FIN, the data has been scanned, it has no gaps,
- // but somehow we are waiting for more data - do final flush here
+ else if ( final_flush_on_fin(trs, flush_amt, p) )
finish_and_final_flush(trs, p->flow, true, p);
- }
}
break;
}
if ( trs.tracker->is_retransmit_of_held_packet(p) )
flushed = perform_partial_flush(trs, p, flushed);
- // FIXIT-M a drop rule will yoink the seglist out from under us
- // because apply_delayed_action is only deferred to end of context
- // this is causing stability issues
- if ( flushed and trs.sos.seg_count and
- !trs.sos.session->flow->two_way_traffic() and !p->ptrs.tcph->is_syn() )
+ if ( flush_on_asymmetric_flow(trs, flushed, p) )
{
- TcpStreamTracker::TcpState peer = trs.tracker->session->get_peer_state(trs.tracker);
-
- if ( peer == TcpStreamTracker::TCP_SYN_SENT || peer == TcpStreamTracker::TCP_SYN_RECV )
- {
purge_to_seq(trs, trs.sos.seglist.head->i_seq + flushed);
trs.tracker->r_win_base = trs.sos.seglist_base_seq;
- }
+ tcpStats.flush_on_asymmetric_flow++;
}
+
return flushed;
}
#define TCP_REASSEMBLER_H
#include "stream/stream.h"
-#include "stream/tcp/segment_overlap_editor.h"
+
+#include "segment_overlap_editor.h"
class TcpReassembler : public SegmentOverlapEditor
{
uint32_t event_id, uint32_t event_second);
virtual void purge_alerts(TcpReassemblerState&);
virtual bool segment_within_seglist_window(TcpReassemblerState&, TcpSegmentDescriptor&);
+ void reset_asymmetric_flow_reassembly(TcpReassemblerState&);
void skip_midstream_pickup_seglist_hole(TcpReassemblerState&, TcpSegmentDescriptor&);
void initialize_paf(TcpReassemblerState& trs)
{
int32_t flush_amt);
uint32_t perform_partial_flush(TcpReassemblerState&, snort::Packet*, uint32_t flushed = 0);
+
+private:
+ bool final_flush_on_fin(const TcpReassemblerState&, int32_t flush_amt, snort::Packet*);
+    bool flush_on_asymmetric_flow(const TcpReassemblerState&, uint32_t flushed, snort::Packet*);
};
#endif
void skip_midstream_pickup_seglist_hole(TcpSegmentDescriptor& tsd)
{ reassembler->skip_midstream_pickup_seglist_hole(trs, tsd); }
+ void reset_asymmetric_flow_reassembly()
+ { reassembler->reset_asymmetric_flow_reassembly(trs); }
+
void initialize_paf()
{ reassembler->initialize_paf(trs); }
return SEQ_LT((c_seq + c_len), to_seq);
}
- void update_ressembly_lengths(uint16_t bytes)
+ void update_reassembly_cursor(uint16_t bytes)
{
c_seq += bytes;
c_len -= bytes;
DataBus::publish(intrinsic_pub_id, IntrinsicEventIds::FLOW_STATE_CHANGE, nullptr, flow);
}
-bool TcpSession::flow_exceeds_config_thresholds(TcpSegmentDescriptor& tsd)
-{
- TcpStreamTracker* listener = tsd.get_listener();
-
- if ( listener->get_flush_policy() == STREAM_FLPOLICY_IGNORE )
- return true;
-
- // FIXIT-M any discards must be counted and in many cases alerted as well
- // (count all but alert at most once per flow)
- // three cases in this function; look for others
- if ( ( tcp_config->flags & STREAM_CONFIG_NO_ASYNC_REASSEMBLY ) && !flow->two_way_traffic() )
- return true;
-
- if ( tcp_config->max_consec_small_segs )
- {
- if ( tsd.get_len() >= tcp_config->max_consec_small_seg_size )
- listener->small_seg_count = 0;
-
- else if ( ++listener->small_seg_count == tcp_config->max_consec_small_segs )
- tel.set_tcp_event(EVENT_MAX_SMALL_SEGS_EXCEEDED);
- }
-
- if ( tcp_config->max_queued_bytes )
- {
- int32_t space_left =
- tcp_config->max_queued_bytes - listener->reassembler.get_seg_bytes_total();
-
- if ( space_left < (int32_t)tsd.get_len() )
- {
- tcpStats.exceeded_max_bytes++;
- bool inline_mode = tsd.is_nap_policy_inline();
- bool ret_val = true;
-
- if ( space_left > 0 )
- ret_val = !inline_mode; // For partial trim, reassemble only if we can force an inject
- else
- space_left = 0;
-
- if ( inline_mode )
- {
- if ( listener->max_queue_exceeded == MQ_NONE )
- {
- listener->max_queue_seq_nxt = tsd.get_seq() + space_left;
- listener->max_queue_exceeded = MQ_BYTES;
- }
- else
- (const_cast<tcp::TCPHdr*>(tsd.get_pkt()->ptrs.tcph))->set_seq(listener->max_queue_seq_nxt);
- }
-
- if( listener->reassembler.segment_within_seglist_window(tsd) )
- return false;
-
- if ( inline_mode || listener->normalizer.get_trim_win() == NORM_MODE_ON)
- {
- tel.set_tcp_event(EVENT_MAX_QUEUED_BYTES_EXCEEDED);
- listener->normalizer.log_drop_reason(tsd, inline_mode, "stream",
- "Stream: Flow exceeded the configured max byte threshold (" + std::to_string(tcp_config->max_queued_bytes) +
- "). You may want to adjust the 'max_bytes' parameter in the NAP policy"
- " to a higher value, or '0' for unlimited.\n");
- }
-
- listener->normalizer.trim_win_payload(tsd, space_left, inline_mode);
- return ret_val;
- }
- else if ( listener->max_queue_exceeded == MQ_BYTES )
- listener->max_queue_exceeded = MQ_NONE;
- }
-
- if ( tcp_config->max_queued_segs )
- {
- if ( listener->reassembler.get_seg_count() + 1 > tcp_config->max_queued_segs )
- {
- tcpStats.exceeded_max_segs++;
- bool inline_mode = tsd.is_nap_policy_inline();
-
- if ( inline_mode )
- {
- if ( listener->max_queue_exceeded == MQ_NONE )
- {
- listener->max_queue_seq_nxt = tsd.get_seq();
- listener->max_queue_exceeded = MQ_SEGS;
- }
- else
- (const_cast<tcp::TCPHdr*>(tsd.get_pkt()->ptrs.tcph))->set_seq(listener->max_queue_seq_nxt);
- }
-
- if( listener->reassembler.segment_within_seglist_window(tsd) )
- return false;
-
- if ( inline_mode || listener->normalizer.get_trim_win() == NORM_MODE_ON)
- {
- tel.set_tcp_event(EVENT_MAX_QUEUED_SEGS_EXCEEDED);
- listener->normalizer.log_drop_reason(tsd, inline_mode, "stream",
- "Stream: Flow exceeded the configured max segment threshold (" + std::to_string(tcp_config->max_queued_segs) +
- "). You may want to adjust the 'max_segments' parameter in the NAP policy"
- " to a higher value, or '0' for unlimited.\n");
- }
-
- listener->normalizer.trim_win_payload(tsd, 0, inline_mode);
- return true;
- }
- else if ( listener->max_queue_exceeded == MQ_SEGS )
- listener->max_queue_exceeded = MQ_NONE;
- }
-
- return false;
-}
-
void TcpSession::update_stream_order(const TcpSegmentDescriptor& tsd, bool aligned)
{
TcpStreamTracker* listener = tsd.get_listener();
set_pkt_action_flag(ACTION_BAD_PKT);
}
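+// Kickstart reassembly on an asymmetric flow that hit a queue limit: skip
+// any seglist holes, flush the contiguous data, promote a midstream listener
+// to established, and return the remaining max_queued_bytes space.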
+int32_t TcpSession::kickstart_asymmetric_flow(const TcpSegmentDescriptor& tsd, TcpStreamTracker* listener)
+{
+ listener->reassembler.reset_asymmetric_flow_reassembly();
+ listener->reassembler.flush_on_data_policy(tsd.get_pkt());
+
+ int32_t space_left =
+ tcp_config->max_queued_bytes - listener->reassembler.get_seg_bytes_total();
+
+ if ( listener->get_tcp_state() == TcpStreamTracker::TCP_MID_STREAM_RECV )
+ {
+ listener->set_tcp_state(TcpStreamTracker::TCP_ESTABLISHED);
+        if ( PacketTracer::is_active() )
+            PacketTracer::log("Stream: Kickstart of midstream asymmetric flow! Seglist queue space: %d\n",
+                space_left);
+ }
+ else
+ {
+        if ( PacketTracer::is_active() )
+            PacketTracer::log("Stream: Kickstart of asymmetric flow! Seglist queue space: %d\n",
+                space_left);
+ }
+
+ return space_left;
+}
+
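+// enforce the max_queued_bytes and max_queued_segs limits; returns true
+// when the segment must not be queued for reassembly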
+bool TcpSession::check_reassembly_queue_thresholds(TcpSegmentDescriptor& tsd, TcpStreamTracker* listener)
+{
+ // if this packet fits within the current queue limit window then it's good
+    if ( listener->reassembler.segment_within_seglist_window(tsd) )
+ return false;
+
+ bool inline_mode = tsd.is_nap_policy_inline();
+
+ if ( tcp_config->max_queued_bytes )
+ {
+ int32_t space_left =
+ tcp_config->max_queued_bytes - listener->reassembler.get_seg_bytes_total();
+
+ if ( space_left < (int32_t)tsd.get_len() )
+ {
+ tcpStats.exceeded_max_bytes++;
+ bool ret_val = true;
+
+ // if inline and this is an asymmetric flow then skip over any seglist holes
+ // and flush to free up seglist space
+ if ( tsd.is_ips_policy_inline() && !tsd.get_pkt()->flow->two_way_traffic() )
+ {
+ space_left = kickstart_asymmetric_flow(tsd, listener);
+ if ( space_left >= (int32_t)tsd.get_len() )
+ return false;
+ }
+
+ if ( space_left > 0 )
+ ret_val = !inline_mode; // For partial trim, reassemble only if we can force an inject
+ else
+ space_left = 0;
+
+            if ( inline_mode || listener->normalizer.get_trim_win() == NORM_MODE_ON )
+ {
+ // FIXIT-M - only alert once per threshold exceeded event
+ tel.set_tcp_event(EVENT_MAX_QUEUED_BYTES_EXCEEDED);
+ listener->normalizer.log_drop_reason(tsd, inline_mode, "stream",
+ "Stream: Flow exceeded the configured max byte threshold (" + std::to_string(tcp_config->max_queued_bytes) +
+ "). You may want to adjust the 'max_bytes' parameter in the NAP policy"
+ " to a higher value, or '0' for unlimited.\n");
+ }
+
+ listener->normalizer.trim_win_payload(tsd, space_left, inline_mode);
+ return ret_val;
+ }
+ }
+
+ if ( tcp_config->max_queued_segs )
+ {
+ if ( listener->reassembler.get_seg_count() + 1 > tcp_config->max_queued_segs )
+ {
+ tcpStats.exceeded_max_segs++;
+
+ // if inline and this is an asymmetric flow then skip over any seglist holes
+ // and flush to free up seglist space
+ if ( tsd.is_ips_policy_inline() && !tsd.get_pkt()->flow->two_way_traffic() )
+ {
+ kickstart_asymmetric_flow(tsd, listener);
+ if ( listener->reassembler.get_seg_count() + 1 <= tcp_config->max_queued_segs )
+ return false;
+ }
+
+            if ( inline_mode || listener->normalizer.get_trim_win() == NORM_MODE_ON )
+ {
+ // FIXIT-M - only alert once per threshold exceeded event
+ tel.set_tcp_event(EVENT_MAX_QUEUED_SEGS_EXCEEDED);
+ listener->normalizer.log_drop_reason(tsd, inline_mode, "stream",
+ "Stream: Flow exceeded the configured max segment threshold (" + std::to_string(tcp_config->max_queued_segs) +
+ "). You may want to adjust the 'max_segments' parameter in the NAP policy"
+ " to a higher value, or '0' for unlimited.\n");
+ }
+
+ listener->normalizer.trim_win_payload(tsd, 0, inline_mode);
+ return true;
+ }
+ }
+
+ return false;
+}
+
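+// returns true if this segment should be queued for reassembly: it is not
+// flagged ignore, the flush policy is not ignore, the flow is not one-way
+// with async reassembly disabled, and no queue limits are exceeded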
+bool TcpSession::filter_packet_for_reassembly(TcpSegmentDescriptor& tsd, TcpStreamTracker* listener)
+{
+    if ( tsd.are_packet_flags_set(PKT_IGNORE)
+        or listener->get_flush_policy() == STREAM_FLPOLICY_IGNORE
+        or ( ( tcp_config->flags & STREAM_CONFIG_NO_ASYNC_REASSEMBLY ) and !flow->two_way_traffic() ) )
+ return false;
+
+ return !check_reassembly_queue_thresholds(tsd, listener);
+}
+
+void TcpSession::check_small_segment_threshold(const TcpSegmentDescriptor& tsd, TcpStreamTracker* listener)
+{
+    // alert if small segments threshold is exceeded
+    if ( tcp_config->max_consec_small_segs )
+    {
+        if ( tsd.get_len() >= tcp_config->max_consec_small_seg_size )
+            listener->small_seg_count = 0;
+        else if ( ++listener->small_seg_count == tcp_config->max_consec_small_segs )
+            tel.set_tcp_event(EVENT_MAX_SMALL_SEGS_EXCEEDED);
+    }
+}
+
void TcpSession::handle_data_segment(TcpSegmentDescriptor& tsd, bool flush)
{
TcpStreamTracker* listener = tsd.get_listener();
listener->rcv_nxt = tsd.get_end_seq();
update_stream_order(tsd, stream_is_inorder);
+ check_small_segment_threshold(tsd, listener);
// don't queue data if we are ignoring or queue thresholds are exceeded
- if ( !tsd.are_packet_flags_set(PKT_IGNORE) and !flow_exceeds_config_thresholds(tsd) )
+ if ( filter_packet_for_reassembly(tsd, listener) )
{
set_packet_header_foo(tsd);
listener->reassembler.queue_packet_for_reassembly(tsd);
private:
int process_tcp_packet(TcpSegmentDescriptor&, const snort::Packet*);
void set_os_policy() override;
- bool flow_exceeds_config_thresholds(TcpSegmentDescriptor&);
void update_stream_order(const TcpSegmentDescriptor&, bool aligned);
void swap_trackers();
void init_session_on_syn(TcpSegmentDescriptor&);
void init_tcp_packet_analysis(TcpSegmentDescriptor&);
void check_events_and_actions(const TcpSegmentDescriptor& tsd);
void flush_tracker(TcpStreamTracker&, snort::Packet*, uint32_t dir, bool final_flush);
+ bool check_reassembly_queue_thresholds(TcpSegmentDescriptor&, TcpStreamTracker*);
+ bool filter_packet_for_reassembly(TcpSegmentDescriptor&, TcpStreamTracker*);
+ void check_small_segment_threshold(const TcpSegmentDescriptor&, TcpStreamTracker*);
+ int32_t kickstart_asymmetric_flow(const TcpSegmentDescriptor&, TcpStreamTracker*);
private:
TcpStateMachine* tsm;
#ifndef TCP_STREAM_TRACKER_H
#define TCP_STREAM_TRACKER_H
+#include <cstdint>
#include <list>
#include "stream/paf.h"
uint32_t r_win_base = 0; // remote side window base sequence number (the last ack we got)
uint32_t small_seg_count = 0;
- uint32_t max_queue_seq_nxt = 0; // next expected sequence once queue limit is exceeded
- uint8_t max_queue_exceeded = MQ_NONE;
uint8_t order = IN_SEQUENCE;
FinSeqNumStatus fin_seq_status = TcpStreamTracker::FIN_NOT_SEEN;