From: Davis McPherson -X (davmcphe - XORIANT CORPORATION at Cisco) Date: Fri, 13 Dec 2024 20:39:49 +0000 (+0000) Subject: Pull request #4525: stream_tcp: refactor tcp reasseabler class structure and init... X-Git-Tag: 3.6.1.0~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d2ab1bc24658f431f59b3251c7bbadf459e6a5f9;p=thirdparty%2Fsnort3.git Pull request #4525: stream_tcp: refactor tcp reasseabler class structure and init to avoid thread data race scenarios Merge in SNORT/snort3 from ~DAVMCPHE/snort3:tcp_reassembly_ignore_tsan_fix to master Squashed commit of the following: commit 25a35b3442f010d4d242b27e18fde5d9e2ac61f3 Author: davis mcpherson Date: Tue Nov 19 14:49:46 2024 -0500 stream_tcp: refactor tcp reasseabler class structure and init to avoid thread data race scenarios --- diff --git a/src/stream/stream.cc b/src/stream/stream.cc index e45358efb..3b9a808a6 100644 --- a/src/stream/stream.cc +++ b/src/stream/stream.cc @@ -874,6 +874,7 @@ TEST_CASE("Stream API", "[stream_api][stream]") { // initialization code here TcpNormalizerFactory::initialize(); + TcpReassembler::tinit(); Flow* flow = new Flow; InspectionPolicy ins; set_inspection_policy(&ins); @@ -989,6 +990,7 @@ TEST_CASE("Stream API", "[stream_api][stream]") delete flow; TcpNormalizerFactory::term(); + TcpReassembler::tterm(); } #endif diff --git a/src/stream/tcp/ips_stream_reassemble.cc b/src/stream/tcp/ips_stream_reassemble.cc index 866857527..3cb7c9543 100644 --- a/src/stream/tcp/ips_stream_reassemble.cc +++ b/src/stream/tcp/ips_stream_reassemble.cc @@ -276,6 +276,7 @@ TEST_CASE("IPS Stream Reassemble", "[ips_stream_reassemble][stream_tcp]") REQUIRE( reassembler != nullptr ); TcpNormalizerFactory::initialize(); + TcpReassembler::tinit(); Flow* flow = new Flow; Packet* pkt = get_syn_packet(flow); @@ -310,9 +311,10 @@ TEST_CASE("IPS Stream Reassemble", "[ips_stream_reassemble][stream_tcp]") } #endif release_packet(pkt); - TcpNormalizerFactory::term(); delete flow; ips_stream_reassemble->mod_dtor(reassembler); + TcpNormalizerFactory::term(); + TcpReassembler::tterm(); } #endif diff --git a/src/stream/tcp/tcp_reassembler.cc b/src/stream/tcp/tcp_reassembler.cc index 70c8f879a..ffaaae66e 100644 --- a/src/stream/tcp/tcp_reassembler.cc +++ b/src/stream/tcp/tcp_reassembler.cc @@ -43,7 +43,7 @@ using namespace snort; -void TcpReassembler::init(bool server, StreamSplitter* ss) +void TcpReassemblerBase::init(bool server, StreamSplitter* ss) { splitter = ss; paf.paf_setup(ss); @@ -66,20 +66,20 @@ void TcpReassembler::init(bool server, StreamSplitter* ss) } } -bool TcpReassembler::fin_no_gap(const TcpSegmentNode& tsn) +bool TcpReassemblerBase::fin_no_gap(const TcpSegmentNode& tsn) { return tracker.fin_seq_status >= FIN_WITH_SEQ_SEEN and SEQ_GEQ(tsn.next_seq(), tracker.get_fin_i_seq()); } -bool TcpReassembler::fin_acked_no_gap(const TcpSegmentNode& tsn) +bool TcpReassemblerBase::fin_acked_no_gap(const TcpSegmentNode& tsn) { return tracker.fin_seq_status >= FIN_WITH_SEQ_ACKED and SEQ_GEQ(tsn.next_seq(), tracker.get_fin_i_seq()); } // If we are skipping seglist hole, update tsn so that we can purge -void TcpReassembler::update_skipped_bytes(uint32_t remaining_bytes) +void TcpReassemblerBase::update_skipped_bytes(uint32_t remaining_bytes) { TcpSegmentNode* tsn; @@ -98,7 +98,7 @@ void TcpReassembler::update_skipped_bytes(uint32_t remaining_bytes) } } -void TcpReassembler::purge_to_seq(uint32_t flush_seq) +void TcpReassemblerBase::purge_to_seq(uint32_t flush_seq) { seglist.purge_flushed_segments(flush_seq); @@ -116,7 +116,7 @@ void TcpReassembler::purge_to_seq(uint32_t flush_seq) // part of a segment // * FIXIT-L need flag to mark any reassembled packets that have a gap // (if we reassemble such) -void TcpReassembler::purge_flushed_ackd() +void TcpReassemblerBase::purge_flushed_ackd() { if ( !seglist.head ) return; @@ -138,7 +138,7 @@ void TcpReassembler::purge_flushed_ackd() purge_to_seq(seq); } -void TcpReassembler::show_rebuilt_packet(Packet* pkt) +void TcpReassemblerBase::show_rebuilt_packet(Packet* pkt) { if ( seglist.session->tcp_config->flags & STREAM_CONFIG_SHOW_PACKETS ) { @@ -149,7 +149,7 @@ void TcpReassembler::show_rebuilt_packet(Packet* pkt) } } -int TcpReassembler::flush_data_segments(uint32_t flush_len, Packet* pdu) +int TcpReassemblerBase::flush_data_segments(uint32_t flush_len, Packet* pdu) { uint32_t flags = PKT_PDU_HEAD; @@ -213,7 +213,7 @@ int TcpReassembler::flush_data_segments(uint32_t flush_len, Packet* pdu) } // FIXIT-L consolidate encode format, update, and this into new function? -void TcpReassembler::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet* pdu) +void TcpReassemblerBase::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet* pdu) { pdu->ptrs.set_pkt_type(PktType::PDU); pdu->proto_bits |= PROTO_BIT__TCP; @@ -260,7 +260,7 @@ void TcpReassembler::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet* } } -Packet* TcpReassembler::initialize_pdu(Packet* p, uint32_t pkt_flags, struct timeval tv) +Packet* TcpReassemblerBase::initialize_pdu(Packet* p, uint32_t pkt_flags, struct timeval tv) { // partial flushes already set the pdu for http_inspect splitter processing Packet* pdu = p->was_set() ? p : DetectionEngine::set_next_packet(p); @@ -290,7 +290,7 @@ Packet* TcpReassembler::initialize_pdu(Packet* p, uint32_t pkt_flags, struct tim } // flush a seglist up to the given point, generate a pseudopacket, and fire it thru the system. -int TcpReassembler::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags) +int TcpReassemblerBase::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags) { assert( p && seglist.cur_rseg); @@ -344,7 +344,7 @@ int TcpReassembler::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags) return flushed_bytes; } -int TcpReassembler::do_zero_byte_flush(Packet* p, uint32_t pkt_flags) +int TcpReassemblerBase::do_zero_byte_flush(Packet* p, uint32_t pkt_flags) { unsigned bytes_copied = 0; @@ -371,7 +371,7 @@ int TcpReassembler::do_zero_byte_flush(Packet* p, uint32_t pkt_flags) // get the footprint for the current seglist, the difference // between our base sequence and the last ack'd sequence we received -uint32_t TcpReassembler::get_q_footprint() +uint32_t TcpReassemblerBase::get_q_footprint() { int32_t footprint = 0; int32_t sequenced = 0; @@ -389,7 +389,7 @@ uint32_t TcpReassembler::get_q_footprint() // boosted by tracking sequenced bytes as seglist is updated // to avoid the while loop, etc. below. -uint32_t TcpReassembler::get_q_sequenced() +uint32_t TcpReassemblerBase::get_q_sequenced() { TcpSegmentNode* tsn = seglist.cur_rseg; @@ -423,7 +423,7 @@ uint32_t TcpReassembler::get_q_sequenced() return len; } -bool TcpReassembler::is_q_sequenced() +bool TcpReassemblerBase::is_q_sequenced() { TcpSegmentNode* tsn = seglist.cur_rseg; @@ -449,7 +449,7 @@ bool TcpReassembler::is_q_sequenced() return (tsn->unscanned() != 0); } -void TcpReassembler::final_flush(Packet* p, uint32_t dir) +void TcpReassemblerBase::final_flush(Packet* p, uint32_t dir) { tracker.set_tf_flags(TF_FORCE_FLUSH); @@ -504,7 +504,7 @@ static Packet* get_packet(Flow* flow, uint32_t flags, bool c2s) return p; } -bool TcpReassembler::splitter_finish(snort::Flow* flow) +bool TcpReassemblerBase::splitter_finish(snort::Flow* flow) { if (!splitter) return true; @@ -519,7 +519,7 @@ bool TcpReassembler::splitter_finish(snort::Flow* flow) return false; } -void TcpReassembler::finish_and_final_flush(Flow* flow, bool clear, Packet* p) +void TcpReassemblerBase::finish_and_final_flush(Flow* flow, bool clear, Packet* p) { bool pending = clear and paf.paf_initialized() and splitter_finish(flow); @@ -528,7 +528,7 @@ void TcpReassembler::finish_and_final_flush(Flow* flow, bool clear, Packet* p) } // Call this only from outside reassembly. -void TcpReassembler::flush_queued_segments(Flow* flow, bool clear, Packet* p) +void TcpReassemblerBase::flush_queued_segments(Flow* flow, bool clear, Packet* p) { if ( p ) { @@ -551,7 +551,7 @@ void TcpReassembler::flush_queued_segments(Flow* flow, bool clear, Packet* p) } -void TcpReassembler::check_first_segment_hole() +void TcpReassemblerBase::check_first_segment_hole() { if ( SEQ_LT(seglist.seglist_base_seq, seglist.head->start_seq()) ) { @@ -561,14 +561,14 @@ void TcpReassembler::check_first_segment_hole() } } -uint32_t TcpReassembler::perform_partial_flush(Flow* flow, Packet*& p) +uint32_t TcpReassemblerBase::perform_partial_flush(Flow* flow, Packet*& p) { p = get_packet(flow, packet_dir, server_side); return perform_partial_flush(p); } // No error checking here, so the caller must ensure that p, p->flow are not null. -uint32_t TcpReassembler::perform_partial_flush(Packet* p) +uint32_t TcpReassemblerBase::perform_partial_flush(Packet* p) { uint32_t flushed = 0; if ( splitter->init_partial_flush(p->flow) ) @@ -590,7 +590,7 @@ uint32_t TcpReassembler::perform_partial_flush(Packet* p) // we are on a FIN, the data has been scanned, it has no gaps, // but somehow we are waiting for more data - do final flush here // FIXIT-M this convoluted expression needs some refactoring to simplify -bool TcpReassembler::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumStatus fin_status) +bool TcpReassemblerBase::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumStatus fin_status) { return tracker.fin_seq_status >= fin_status && -1 <= flush_amt && flush_amt <= 0 @@ -598,7 +598,7 @@ bool TcpReassembler::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumS && !p->flow->searching_for_service(); } -bool TcpReassembler::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p) +bool TcpReassemblerBase::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p) { bool asymmetric = flushed && seglist.seg_count && !p->flow->two_way_traffic() && !p->ptrs.tcph->is_syn(); if ( asymmetric ) @@ -611,27 +611,48 @@ bool TcpReassembler::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p) return asymmetric; } -// define a tracker to assign to the Ignore Flush Policy reassembler and -// create an instance of the Ignore reassembler. When reassembly is ignored -// all operations are no-ops so a single instance is created and shared by -// all TCP sessions with a flush policy of ignore. Minimal initialization -// is performed as no state or processing is done by this reassembler. -// The ignore tracker reassembler member variable is set to the ignore -// reassembler as it is referenced in the dtor of the tracker. The ignore -// tracker is never assigned or used by a Flow, it's only purpose is to -// be passed into the ignore reassembler (along with the tracker's seglist) -TcpStreamTracker ignore_tracker(false); -TcpReassemblerIgnore* tcp_ignore_reassembler = new TcpReassemblerIgnore(); - -TcpReassemblerIgnore::TcpReassemblerIgnore() - : TcpReassembler(ignore_tracker, ignore_tracker.seglist) +// Allocate an instance of the ignore reassembler for the client side trackers +// and the server side trackers for each packet thread. The ignore reassemblers +// do not maintain any state so are shared by all TCP sessions of a packet thread. +// The server and client instances are created during thread initialization and +// deleted at thread termination. +static THREAD_LOCAL TcpReassemblerIgnore* ignore_reassembler_server = nullptr; +static THREAD_LOCAL TcpReassemblerIgnore* ignore_reassembler_client = nullptr; + +void TcpReassembler::tinit() +{ + ignore_reassembler_server = new TcpReassemblerIgnore(true); + ignore_reassembler_client = new TcpReassemblerIgnore(false); +} + +void TcpReassembler::tterm() +{ + delete ignore_reassembler_server; + ignore_reassembler_server = nullptr; + + delete ignore_reassembler_client; + ignore_reassembler_client = nullptr; +} + +TcpReassemblerIgnore::TcpReassemblerIgnore(bool server) { - tracker.reassembler = this; + server_side = server; + packet_dir = server ? PKT_FROM_CLIENT : PKT_FROM_SERVER; } +// FIXIT-M The caller should allocate the Packet when one is needed for a partial flush uint32_t TcpReassemblerIgnore::perform_partial_flush(snort::Flow* flow, snort::Packet*& p) { p = get_packet(flow, packet_dir, server_side); return 0; } +TcpReassemblerIgnore* TcpReassemblerIgnore::get_instance(bool server_tracker) +{ + if ( server_tracker ) + return ignore_reassembler_server; + else + return ignore_reassembler_client; +} + + diff --git a/src/stream/tcp/tcp_reassembler.h b/src/stream/tcp/tcp_reassembler.h index bb85b128b..1999632f9 100644 --- a/src/stream/tcp/tcp_reassembler.h +++ b/src/stream/tcp/tcp_reassembler.h @@ -52,31 +52,123 @@ public: FINAL_FLUSH_OK = -1 }; - TcpReassembler(TcpStreamTracker& trk, TcpReassemblySegments& seglist) - : tracker(trk), seglist(seglist) + TcpReassembler() { } virtual ~TcpReassembler() { } - virtual void init(bool server, snort::StreamSplitter* ss); + virtual void init(bool server, snort::StreamSplitter* ss) = 0; virtual int eval_flush_policy_on_ack(snort::Packet*) = 0; virtual int eval_flush_policy_on_data(snort::Packet*) = 0; virtual int eval_asymmetric_flush(snort::Packet*) = 0; virtual int flush_stream(snort::Packet*, uint32_t dir, bool final_flush = false) = 0; - virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr); - virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*); - virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&); - virtual void purge_flushed_ackd(); + virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr) = 0; + virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*) = 0; + virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) = 0; + virtual void purge_flushed_ackd() = 0; virtual FlushPolicy get_flush_policy() const = 0; + virtual void release_splitter() = 0; + virtual bool is_splitter_paf() const = 0; + virtual bool segment_already_scanned(uint32_t seq) = 0; + virtual void initialize_paf() = 0; + virtual void reset_paf() = 0; + virtual void clear_paf() = 0; + + // static methods for TcpReassembler per thread initialization and termination + static void tinit(); + static void tterm(); + +protected: + uint8_t packet_dir = 0; + bool server_side = true; +}; + +class TcpReassemblerIgnore : public TcpReassembler +{ +public: + TcpReassemblerIgnore(bool server); + + void init(bool, snort::StreamSplitter*) override + { } + + int eval_flush_policy_on_ack(snort::Packet*) override + { return 0; } + + int eval_flush_policy_on_data(snort::Packet*) override + { return 0; } + + int eval_asymmetric_flush(snort::Packet*) override + { return 0; } + + int flush_stream(snort::Packet*, uint32_t, bool) override + { return 0; } + + void flush_queued_segments(snort::Flow*, bool, snort::Packet*) override + { } + + void finish_and_final_flush(snort::Flow*, bool, snort::Packet*) override + { } + + uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override; + + void purge_flushed_ackd() override + { } + + void release_splitter() override + { } + + bool is_splitter_paf() const override + { return false; } + + bool segment_already_scanned(uint32_t) override + { return false; } + + void reset_paf() override + { } + + void clear_paf() override + { } + + void initialize_paf() override + { } + + FlushPolicy get_flush_policy() const override + { return STREAM_FLPOLICY_IGNORE; } + + static TcpReassemblerIgnore* get_instance(bool server_side); +}; + +class TcpReassemblerBase : public TcpReassembler +{ +public: - void release_splitter() + // OK means FIN seen, data scanned, flush point not found, no gaps + enum ScanStatus { + FINAL_FLUSH_HOLD = -2, + FINAL_FLUSH_OK = -1 + }; + + TcpReassemblerBase(TcpStreamTracker& trk, TcpReassemblySegments& seglist) + : tracker(trk), seglist(seglist) + { } + + virtual ~TcpReassemblerBase() override + { } + + virtual void init(bool server, snort::StreamSplitter* ss) override; + virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr) override; + virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*) override; + virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override; + virtual void purge_flushed_ackd() override; + + void release_splitter() override { splitter = nullptr; } - bool is_splitter_paf() const + bool is_splitter_paf() const override { return splitter && splitter->is_paf(); } - bool segment_already_scanned(uint32_t seq) + bool segment_already_scanned(uint32_t seq) override { if ( paf.paf_initialized() and SEQ_GT(paf.pos, seq) ) return true; @@ -84,7 +176,7 @@ public: return false; } - virtual void initialize_paf() + virtual void initialize_paf() override { assert( get_flush_policy() != STREAM_FLPOLICY_IGNORE ); @@ -96,10 +188,10 @@ public: paf.paf_initialize(seglist.head->start_seq()); } - void reset_paf() + void reset_paf() override { paf.paf_reset(); } - void clear_paf() + void clear_paf() override { paf.paf_clear(); } protected: @@ -131,52 +223,8 @@ protected: snort::Packet* last_pdu = nullptr; uint8_t ignore_dir = 0; - uint8_t packet_dir = 0; - bool server_side = true; bool splitter_finish_flag = false; }; -class TcpReassemblerIgnore : public TcpReassembler -{ -public: - TcpReassemblerIgnore(); - ~TcpReassemblerIgnore() override - { } - - void init(bool, snort::StreamSplitter*) override - { } - - int eval_flush_policy_on_ack(snort::Packet*) override - { return 0; } - - int eval_flush_policy_on_data(snort::Packet*) override - { return 0; } - - int eval_asymmetric_flush(snort::Packet*) override - { return 0; } - - int flush_stream(snort::Packet*, uint32_t, bool) override - { return 0; } - - void flush_queued_segments(snort::Flow*, bool, snort::Packet*) override - { } - - void finish_and_final_flush(snort::Flow*, bool, snort::Packet*) override - { } - - uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override; - - void purge_flushed_ackd() override - { } - - virtual void initialize_paf() override - { } - - FlushPolicy get_flush_policy() const override - { return STREAM_FLPOLICY_IGNORE; } -}; - -extern TcpReassemblerIgnore* tcp_ignore_reassembler; - #endif diff --git a/src/stream/tcp/tcp_reassembler_ids.h b/src/stream/tcp/tcp_reassembler_ids.h index 099d5c33b..803291d19 100644 --- a/src/stream/tcp/tcp_reassembler_ids.h +++ b/src/stream/tcp/tcp_reassembler_ids.h @@ -33,11 +33,11 @@ class TcpSegmentDescriptor; class TcpSegmentNode; -class TcpReassemblerIds : public TcpReassembler +class TcpReassemblerIds : public TcpReassemblerBase { public: TcpReassemblerIds(TcpStreamTracker& trk, TcpReassemblySegments& sl) - : TcpReassembler(trk, sl) + : TcpReassemblerBase(trk, sl) { } ~TcpReassemblerIds() override diff --git a/src/stream/tcp/tcp_reassembler_ips.h b/src/stream/tcp/tcp_reassembler_ips.h index 3a81efd1e..39914eb5e 100644 --- a/src/stream/tcp/tcp_reassembler_ips.h +++ b/src/stream/tcp/tcp_reassembler_ips.h @@ -34,11 +34,11 @@ class TcpSegmentDescriptor; class TcpSegmentNode; -class TcpReassemblerIps : public TcpReassembler +class TcpReassemblerIps : public TcpReassemblerBase { public: TcpReassemblerIps(TcpStreamTracker& trk, TcpReassemblySegments& sl) - : TcpReassembler(trk, sl) + : TcpReassemblerBase(trk, sl) { } ~TcpReassemblerIps() override diff --git a/src/stream/tcp/tcp_stream_tracker.cc b/src/stream/tcp/tcp_stream_tracker.cc index a0f0e12a9..554f3d997 100644 --- a/src/stream/tcp/tcp_stream_tracker.cc +++ b/src/stream/tcp/tcp_stream_tracker.cc @@ -368,27 +368,20 @@ void TcpStreamTracker::update_flush_policy(StreamSplitter* splitter) oaitw_reassembler = reassembler; } - reassembler = tcp_ignore_reassembler; - reassembler->init(!client_tracker, splitter); + reassembler = TcpReassemblerIgnore::get_instance(!client_tracker); } else if ( flush_policy == STREAM_FLPOLICY_ON_DATA ) { - if ( reassembler ) - { - // update from IDS -> IPS is not supported - assert( reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_ACK ); - } + // update from IDS -> IPS is not supported + assert( !reassembler or reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_ACK ); reassembler = new TcpReassemblerIps(*this, seglist); reassembler->init(!client_tracker, splitter); } else { - if ( reassembler ) - { - // update from IPS -> IDS is not supported - assert( reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_DATA ); - } + // update from IPS -> IDS is not supported + assert( !reassembler or reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_DATA ); reassembler = new TcpReassemblerIds(*this, seglist); reassembler->init(!client_tracker, splitter); @@ -999,6 +992,8 @@ void TcpStreamTracker::thread_init() { assert(!hpq); hpq = new HeldPacketQueue(); + + TcpReassembler::tinit(); } void TcpStreamTracker::thread_term() @@ -1006,4 +1001,6 @@ void TcpStreamTracker::thread_term() assert(hpq->empty()); delete hpq; hpq = nullptr; + + TcpReassembler::tterm(); }