]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4525: stream_tcp: refactor tcp reasseabler class structure and init...
authorDavis McPherson -X (davmcphe - XORIANT CORPORATION at Cisco) <davmcphe@cisco.com>
Fri, 13 Dec 2024 20:39:49 +0000 (20:39 +0000)
committerSteven Baigal (sbaigal) <sbaigal@cisco.com>
Fri, 13 Dec 2024 20:39:49 +0000 (20:39 +0000)
Merge in SNORT/snort3 from ~DAVMCPHE/snort3:tcp_reassembly_ignore_tsan_fix to master

Squashed commit of the following:

commit 25a35b3442f010d4d242b27e18fde5d9e2ac61f3
Author: davis mcpherson <davmcphe@cisco.com>
Date:   Tue Nov 19 14:49:46 2024 -0500

    stream_tcp: refactor tcp reasseabler class structure and init to avoid thread data race scenarios

src/stream/stream.cc
src/stream/tcp/ips_stream_reassemble.cc
src/stream/tcp/tcp_reassembler.cc
src/stream/tcp/tcp_reassembler.h
src/stream/tcp/tcp_reassembler_ids.h
src/stream/tcp/tcp_reassembler_ips.h
src/stream/tcp/tcp_stream_tracker.cc

index e45358efbf9361d61b1fd715ae696913ad712df7..3b9a808a6780201aa8026601054944034ebdb24c 100644 (file)
@@ -874,6 +874,7 @@ TEST_CASE("Stream API", "[stream_api][stream]")
 {
     // initialization code here
     TcpNormalizerFactory::initialize();
+    TcpReassembler::tinit();
     Flow* flow = new Flow;
     InspectionPolicy ins;
     set_inspection_policy(&ins);
@@ -989,6 +990,7 @@ TEST_CASE("Stream API", "[stream_api][stream]")
 
     delete flow;
     TcpNormalizerFactory::term();
+    TcpReassembler::tterm();
 }
 
 #endif
index 866857527d014fd1f0af2b6b0d90f34170052c47..3cb7c9543568219a9f307a25b39eb4dbfc7766a1 100644 (file)
@@ -276,6 +276,7 @@ TEST_CASE("IPS Stream Reassemble", "[ips_stream_reassemble][stream_tcp]")
     REQUIRE( reassembler != nullptr );
 
     TcpNormalizerFactory::initialize();
+    TcpReassembler::tinit();
 
     Flow* flow = new Flow;
     Packet* pkt = get_syn_packet(flow);
@@ -310,9 +311,10 @@ TEST_CASE("IPS Stream Reassemble", "[ips_stream_reassemble][stream_tcp]")
     }
 #endif
     release_packet(pkt);
-    TcpNormalizerFactory::term();
     delete flow;
     ips_stream_reassemble->mod_dtor(reassembler);
+    TcpNormalizerFactory::term();
+    TcpReassembler::tterm();
 }
 
 #endif
index 70c8f879acc10631ec778d8a7b78e6062fcd8a70..ffaaae66e726225c7ace03be5dcda53e060f9fa1 100644 (file)
@@ -43,7 +43,7 @@
 
 using namespace snort;
 
-void TcpReassembler::init(bool server, StreamSplitter* ss)
+void TcpReassemblerBase::init(bool server, StreamSplitter* ss)
 {
     splitter = ss;
     paf.paf_setup(ss);
@@ -66,20 +66,20 @@ void TcpReassembler::init(bool server, StreamSplitter* ss)
     }
 }
 
-bool TcpReassembler::fin_no_gap(const TcpSegmentNode& tsn)
+bool TcpReassemblerBase::fin_no_gap(const TcpSegmentNode& tsn)
 {
     return tracker.fin_seq_status >= FIN_WITH_SEQ_SEEN
         and SEQ_GEQ(tsn.next_seq(), tracker.get_fin_i_seq());
 }
 
-bool TcpReassembler::fin_acked_no_gap(const TcpSegmentNode& tsn)
+bool TcpReassemblerBase::fin_acked_no_gap(const TcpSegmentNode& tsn)
 {
     return tracker.fin_seq_status >= FIN_WITH_SEQ_ACKED
         and SEQ_GEQ(tsn.next_seq(), tracker.get_fin_i_seq());
 }
 
 // If we are skipping seglist hole, update tsn so that we can purge
-void TcpReassembler::update_skipped_bytes(uint32_t remaining_bytes)
+void TcpReassemblerBase::update_skipped_bytes(uint32_t remaining_bytes)
 {
     TcpSegmentNode* tsn;
 
@@ -98,7 +98,7 @@ void TcpReassembler::update_skipped_bytes(uint32_t remaining_bytes)
     }
 }
 
-void TcpReassembler::purge_to_seq(uint32_t flush_seq)
+void TcpReassemblerBase::purge_to_seq(uint32_t flush_seq)
 {
     seglist.purge_flushed_segments(flush_seq);
 
@@ -116,7 +116,7 @@ void TcpReassembler::purge_to_seq(uint32_t flush_seq)
 // part of a segment
 // * FIXIT-L need flag to mark any reassembled packets that have a gap
 //   (if we reassemble such)
-void TcpReassembler::purge_flushed_ackd()
+void TcpReassemblerBase::purge_flushed_ackd()
 {
     if ( !seglist.head )
         return;
@@ -138,7 +138,7 @@ void TcpReassembler::purge_flushed_ackd()
         purge_to_seq(seq);
 }
 
-void TcpReassembler::show_rebuilt_packet(Packet* pkt)
+void TcpReassemblerBase::show_rebuilt_packet(Packet* pkt)
 {
     if ( seglist.session->tcp_config->flags & STREAM_CONFIG_SHOW_PACKETS )
     {
@@ -149,7 +149,7 @@ void TcpReassembler::show_rebuilt_packet(Packet* pkt)
     }
 }
 
-int TcpReassembler::flush_data_segments(uint32_t flush_len, Packet* pdu)
+int TcpReassemblerBase::flush_data_segments(uint32_t flush_len, Packet* pdu)
 {
     uint32_t flags = PKT_PDU_HEAD;
 
@@ -213,7 +213,7 @@ int TcpReassembler::flush_data_segments(uint32_t flush_len, Packet* pdu)
 }
 
 // FIXIT-L consolidate encode format, update, and this into new function?
-void TcpReassembler::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet* pdu)
+void TcpReassemblerBase::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet* pdu)
 {
     pdu->ptrs.set_pkt_type(PktType::PDU);
     pdu->proto_bits |= PROTO_BIT__TCP;
@@ -260,7 +260,7 @@ void TcpReassembler::prep_pdu(Flow* flow, Packet* p, uint32_t pkt_flags, Packet*
     }
 }
 
-Packet* TcpReassembler::initialize_pdu(Packet* p, uint32_t pkt_flags, struct timeval tv)
+Packet* TcpReassemblerBase::initialize_pdu(Packet* p, uint32_t pkt_flags, struct timeval tv)
 {
     // partial flushes already set the pdu for http_inspect splitter processing
     Packet* pdu = p->was_set() ? p : DetectionEngine::set_next_packet(p);
@@ -290,7 +290,7 @@ Packet* TcpReassembler::initialize_pdu(Packet* p, uint32_t pkt_flags, struct tim
 }
 
 // flush a seglist up to the given point, generate a pseudopacket, and fire it thru the system.
-int TcpReassembler::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags)
+int TcpReassemblerBase::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags)
 {
     assert( p && seglist.cur_rseg);
 
@@ -344,7 +344,7 @@ int TcpReassembler::flush_to_seq(uint32_t bytes, Packet* p, uint32_t pkt_flags)
     return flushed_bytes;
 }
 
-int TcpReassembler::do_zero_byte_flush(Packet* p, uint32_t pkt_flags)
+int TcpReassemblerBase::do_zero_byte_flush(Packet* p, uint32_t pkt_flags)
 {
     unsigned bytes_copied = 0;
 
@@ -371,7 +371,7 @@ int TcpReassembler::do_zero_byte_flush(Packet* p, uint32_t pkt_flags)
 // get the footprint for the current seglist, the difference
 // between our base sequence and the last ack'd sequence we received
 
-uint32_t TcpReassembler::get_q_footprint()
+uint32_t TcpReassemblerBase::get_q_footprint()
 {
     int32_t footprint = 0;
     int32_t sequenced = 0;
@@ -389,7 +389,7 @@ uint32_t TcpReassembler::get_q_footprint()
 // boosted by tracking sequenced bytes as seglist is updated
 // to avoid the while loop, etc. below.
 
-uint32_t TcpReassembler::get_q_sequenced()
+uint32_t TcpReassemblerBase::get_q_sequenced()
 {
     TcpSegmentNode* tsn = seglist.cur_rseg;
 
@@ -423,7 +423,7 @@ uint32_t TcpReassembler::get_q_sequenced()
     return len;
 }
 
-bool TcpReassembler::is_q_sequenced()
+bool TcpReassemblerBase::is_q_sequenced()
 {
     TcpSegmentNode* tsn = seglist.cur_rseg;
 
@@ -449,7 +449,7 @@ bool TcpReassembler::is_q_sequenced()
     return (tsn->unscanned() != 0);
 }
 
-void TcpReassembler::final_flush(Packet* p, uint32_t dir)
+void TcpReassemblerBase::final_flush(Packet* p, uint32_t dir)
 {
     tracker.set_tf_flags(TF_FORCE_FLUSH);
 
@@ -504,7 +504,7 @@ static Packet* get_packet(Flow* flow, uint32_t flags, bool c2s)
     return p;
 }
 
-bool TcpReassembler::splitter_finish(snort::Flow* flow)
+bool TcpReassemblerBase::splitter_finish(snort::Flow* flow)
 {
     if (!splitter)
         return true;
@@ -519,7 +519,7 @@ bool TcpReassembler::splitter_finish(snort::Flow* flow)
     return false;
 }
 
-void TcpReassembler::finish_and_final_flush(Flow* flow, bool clear, Packet* p)
+void TcpReassemblerBase::finish_and_final_flush(Flow* flow, bool clear, Packet* p)
 {
     bool pending = clear and paf.paf_initialized() and splitter_finish(flow);
 
@@ -528,7 +528,7 @@ void TcpReassembler::finish_and_final_flush(Flow* flow, bool clear, Packet* p)
 }
 
 // Call this only from outside reassembly.
-void TcpReassembler::flush_queued_segments(Flow* flow, bool clear, Packet* p)
+void TcpReassemblerBase::flush_queued_segments(Flow* flow, bool clear, Packet* p)
 {
     if ( p )
     {
@@ -551,7 +551,7 @@ void TcpReassembler::flush_queued_segments(Flow* flow, bool clear, Packet* p)
 }
 
 
-void TcpReassembler::check_first_segment_hole()
+void TcpReassemblerBase::check_first_segment_hole()
 {
     if ( SEQ_LT(seglist.seglist_base_seq, seglist.head->start_seq()) )
     {
@@ -561,14 +561,14 @@ void TcpReassembler::check_first_segment_hole()
     }
 }
 
-uint32_t TcpReassembler::perform_partial_flush(Flow* flow, Packet*& p)
+uint32_t TcpReassemblerBase::perform_partial_flush(Flow* flow, Packet*& p)
 {
     p = get_packet(flow, packet_dir, server_side);
     return perform_partial_flush(p);
 }
 
 // No error checking here, so the caller must ensure that p, p->flow are not null.
-uint32_t TcpReassembler::perform_partial_flush(Packet* p)
+uint32_t TcpReassemblerBase::perform_partial_flush(Packet* p)
 {
     uint32_t flushed = 0;
     if ( splitter->init_partial_flush(p->flow) )
@@ -590,7 +590,7 @@ uint32_t TcpReassembler::perform_partial_flush(Packet* p)
 // we are on a FIN, the data has been scanned, it has no gaps,
 // but somehow we are waiting for more data - do final flush here
 // FIXIT-M this convoluted expression needs some refactoring to simplify
-bool TcpReassembler::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumStatus fin_status)
+bool TcpReassemblerBase::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumStatus fin_status)
 {
     return tracker.fin_seq_status >= fin_status
         && -1 <= flush_amt && flush_amt <= 0
@@ -598,7 +598,7 @@ bool TcpReassembler::final_flush_on_fin(int32_t flush_amt, Packet *p, FinSeqNumS
         && !p->flow->searching_for_service();
 }
 
-bool TcpReassembler::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p)
+bool TcpReassemblerBase::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p)
 {
     bool asymmetric = flushed && seglist.seg_count && !p->flow->two_way_traffic() && !p->ptrs.tcph->is_syn();
     if ( asymmetric )
@@ -611,27 +611,48 @@ bool TcpReassembler::asymmetric_flow_flushed(uint32_t flushed, snort::Packet *p)
     return asymmetric;
 }
 
-// define a tracker to assign to the Ignore Flush Policy reassembler and
-// create an instance of the Ignore reassembler.  When reassembly is ignored
-// all operations are no-ops so a single instance is created and shared by
-// all TCP sessions with a flush policy of ignore.  Minimal initialization
-// is performed as no state or processing is done by this reassembler.
-// The ignore tracker reassembler member variable is set to the ignore
-// reassembler as it is referenced in the dtor of the tracker.  The ignore
-// tracker is never assigned or used by a Flow, it's only purpose is to
-// be passed into the ignore reassembler (along with the tracker's seglist)
-TcpStreamTracker ignore_tracker(false);
-TcpReassemblerIgnore* tcp_ignore_reassembler = new TcpReassemblerIgnore();
-
-TcpReassemblerIgnore::TcpReassemblerIgnore()
-    : TcpReassembler(ignore_tracker, ignore_tracker.seglist)
+// Allocate an instance of the ignore reassembler for the client side trackers
+// and the server side trackers for each packet thread. The ignore reassemblers
+// do not maintain any state so are shared by all TCP sessions of a packet thread.
+// The server and client instances are created during thread initialization and
+// deleted at thread termination.
+static THREAD_LOCAL TcpReassemblerIgnore* ignore_reassembler_server = nullptr;
+static THREAD_LOCAL TcpReassemblerIgnore* ignore_reassembler_client = nullptr;
+
+void TcpReassembler::tinit()
+{
+    ignore_reassembler_server = new TcpReassemblerIgnore(true);
+    ignore_reassembler_client = new TcpReassemblerIgnore(false);
+}
+
+void TcpReassembler::tterm()
+{
+    delete ignore_reassembler_server;
+    ignore_reassembler_server = nullptr;
+
+    delete ignore_reassembler_client;
+    ignore_reassembler_client = nullptr;
+}
+
+TcpReassemblerIgnore::TcpReassemblerIgnore(bool server)
 {
-    tracker.reassembler = this;
+    server_side = server;
+    packet_dir = server ? PKT_FROM_CLIENT : PKT_FROM_SERVER;
 }
 
+// FIXIT-M The caller should allocate the Packet when one is needed for a partial flush
 uint32_t TcpReassemblerIgnore::perform_partial_flush(snort::Flow* flow, snort::Packet*& p)
 {
     p = get_packet(flow, packet_dir, server_side);
     return 0;
 }
 
+TcpReassemblerIgnore* TcpReassemblerIgnore::get_instance(bool server_tracker)
+{
+    if ( server_tracker )
+        return ignore_reassembler_server;
+    else
+        return ignore_reassembler_client;
+}
+
+
index bb85b128b80c9042e4a1100f5377ae7399ef0676..1999632f9368ad2a8ffa31be25edc902fb4d5b0e 100644 (file)
@@ -52,31 +52,123 @@ public:
         FINAL_FLUSH_OK = -1
     };
 
-    TcpReassembler(TcpStreamTracker& trk, TcpReassemblySegments& seglist)
-        : tracker(trk), seglist(seglist)
+    TcpReassembler()
     { }
 
     virtual ~TcpReassembler()
     { }
 
-    virtual void init(bool server, snort::StreamSplitter* ss);
+    virtual void init(bool server, snort::StreamSplitter* ss) = 0;
     virtual int eval_flush_policy_on_ack(snort::Packet*) = 0;
     virtual int eval_flush_policy_on_data(snort::Packet*) = 0;
     virtual int eval_asymmetric_flush(snort::Packet*) = 0;
     virtual int flush_stream(snort::Packet*, uint32_t dir, bool final_flush = false) = 0;
-    virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr);
-    virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*);
-    virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&);
-    virtual void purge_flushed_ackd();
+    virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr) = 0;
+    virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*) = 0;
+    virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) = 0;
+    virtual void purge_flushed_ackd() = 0;
     virtual FlushPolicy get_flush_policy() const = 0;
+    virtual void release_splitter() = 0;
+    virtual bool is_splitter_paf() const = 0;
+    virtual bool segment_already_scanned(uint32_t seq) = 0;
+    virtual void initialize_paf() = 0;
+    virtual void reset_paf() = 0;
+    virtual void clear_paf() = 0;
+
+    // static methods for TcpReassembler per thread initialization and termination
+    static void tinit();
+    static void tterm();
+
+protected:
+    uint8_t packet_dir = 0;
+    bool server_side = true;
+};
+
+class TcpReassemblerIgnore : public TcpReassembler
+{
+public:
+    TcpReassemblerIgnore(bool server);
+
+    void init(bool, snort::StreamSplitter*) override
+    { }
+
+    int eval_flush_policy_on_ack(snort::Packet*) override
+    { return 0; }
+
+    int eval_flush_policy_on_data(snort::Packet*) override
+    { return 0; }
+
+    int eval_asymmetric_flush(snort::Packet*) override
+    { return 0; }
+
+    int flush_stream(snort::Packet*, uint32_t, bool) override
+    { return 0; }
+
+    void flush_queued_segments(snort::Flow*, bool, snort::Packet*) override
+    { }
+
+    void finish_and_final_flush(snort::Flow*, bool, snort::Packet*) override
+    { }
+
+    uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override;
+
+    void purge_flushed_ackd() override
+    { }
+
+    void release_splitter() override
+    { }
+
+    bool is_splitter_paf() const override
+    { return false; }
+
+    bool segment_already_scanned(uint32_t) override
+    { return false; }
+
+    void reset_paf() override
+    { }
+
+    void clear_paf() override
+    { }
+
+    void initialize_paf() override
+    { }
+
+    FlushPolicy get_flush_policy() const override
+    { return STREAM_FLPOLICY_IGNORE; }
+
+    static TcpReassemblerIgnore* get_instance(bool server_side);
+};
+
+class  TcpReassemblerBase : public TcpReassembler
+{
+public:
 
-    void release_splitter()
+    // OK means FIN seen, data scanned, flush point not found, no gaps
+    enum ScanStatus {
+        FINAL_FLUSH_HOLD = -2,
+        FINAL_FLUSH_OK = -1
+    };
+
+    TcpReassemblerBase(TcpStreamTracker& trk, TcpReassemblySegments& seglist)
+        : tracker(trk), seglist(seglist)
+    { }
+
+    virtual ~TcpReassemblerBase() override
+    { }
+
+    virtual void init(bool server, snort::StreamSplitter* ss) override;
+    virtual void flush_queued_segments(snort::Flow* flow, bool clear, snort::Packet* = nullptr) override;
+    virtual void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet*) override;
+    virtual uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override;
+    virtual void purge_flushed_ackd() override;
+
+    void release_splitter() override
     { splitter = nullptr; }
 
-    bool is_splitter_paf() const
+    bool is_splitter_paf() const override
     { return splitter && splitter->is_paf(); }
 
-    bool segment_already_scanned(uint32_t seq)
+    bool segment_already_scanned(uint32_t seq) override
     {
         if ( paf.paf_initialized() and SEQ_GT(paf.pos, seq) )
             return true;
@@ -84,7 +176,7 @@ public:
             return false;
     }
 
-    virtual void initialize_paf()
+    virtual void initialize_paf() override
     {
         assert( get_flush_policy() != STREAM_FLPOLICY_IGNORE );
 
@@ -96,10 +188,10 @@ public:
             paf.paf_initialize(seglist.head->start_seq());
     }
 
-    void reset_paf()
+    void reset_paf() override
     { paf.paf_reset(); }
 
-    void clear_paf()
+    void clear_paf() override
     { paf.paf_clear(); }
 
 protected:
@@ -131,52 +223,8 @@ protected:
 
     snort::Packet* last_pdu = nullptr;
     uint8_t ignore_dir = 0;
-    uint8_t packet_dir = 0;
-    bool server_side = true;
     bool splitter_finish_flag = false;
 };
 
-class TcpReassemblerIgnore : public TcpReassembler
-{
-public:
-    TcpReassemblerIgnore();
-    ~TcpReassemblerIgnore() override
-    { }
-
-    void init(bool, snort::StreamSplitter*) override
-    { }
-
-    int eval_flush_policy_on_ack(snort::Packet*) override
-    { return 0; }
-
-    int eval_flush_policy_on_data(snort::Packet*) override
-    { return 0; }
-
-    int eval_asymmetric_flush(snort::Packet*) override
-    { return 0; }
-
-    int flush_stream(snort::Packet*, uint32_t, bool) override
-    { return 0; }
-
-    void flush_queued_segments(snort::Flow*, bool, snort::Packet*) override
-    { }
-
-    void finish_and_final_flush(snort::Flow*, bool, snort::Packet*) override
-    { }
-
-    uint32_t perform_partial_flush(snort::Flow*, snort::Packet*&) override;
-
-    void purge_flushed_ackd() override
-    { }
-
-    virtual void initialize_paf() override
-    { }
-
-    FlushPolicy get_flush_policy() const override
-    { return STREAM_FLPOLICY_IGNORE; }
-};
-
-extern TcpReassemblerIgnore* tcp_ignore_reassembler;
-
 #endif
 
index 099d5c33bb708621c1a96106a7fa7e358098c690..803291d1991e5376cc098f95751557c46e4f0ca4 100644 (file)
 class TcpSegmentDescriptor;
 class TcpSegmentNode;
 
-class TcpReassemblerIds : public TcpReassembler
+class TcpReassemblerIds : public TcpReassemblerBase
 {
 public:
     TcpReassemblerIds(TcpStreamTracker& trk, TcpReassemblySegments& sl)
-        : TcpReassembler(trk, sl)
+        : TcpReassemblerBase(trk, sl)
     { }
 
     ~TcpReassemblerIds() override
index 3a81efd1ee10138ddd013d123170218cd6f4a5e9..39914eb5e01730681afed5c5c2533e201cf6793e 100644 (file)
 class TcpSegmentDescriptor;
 class TcpSegmentNode;
 
-class TcpReassemblerIps : public TcpReassembler
+class TcpReassemblerIps : public TcpReassemblerBase
 {
 public:
     TcpReassemblerIps(TcpStreamTracker& trk, TcpReassemblySegments& sl)
-        : TcpReassembler(trk, sl)
+        : TcpReassemblerBase(trk, sl)
     { }
 
     ~TcpReassemblerIps() override
index a0f0e12a9c6f7f7c194992f7ae99847ff76f359e..554f3d997b21cc9c8b107f14f1e6fa3e063a6beb 100644 (file)
@@ -368,27 +368,20 @@ void TcpStreamTracker::update_flush_policy(StreamSplitter* splitter)
             oaitw_reassembler = reassembler;
         }
 
-        reassembler = tcp_ignore_reassembler;
-        reassembler->init(!client_tracker, splitter);
+        reassembler = TcpReassemblerIgnore::get_instance(!client_tracker);
     }
     else if ( flush_policy == STREAM_FLPOLICY_ON_DATA )
     {
-        if ( reassembler )
-        {
-            // update from IDS -> IPS is not supported
-            assert( reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_ACK );
-        }
+        // update from IDS -> IPS is not supported
+        assert( !reassembler or reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_ACK );
 
         reassembler = new TcpReassemblerIps(*this, seglist);
         reassembler->init(!client_tracker, splitter);
     }
     else
     {
-        if ( reassembler )
-        {
-            // update from IPS -> IDS is not supported
-            assert( reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_DATA );
-        }
+        // update from IPS -> IDS is not supported
+        assert( !reassembler or reassembler->get_flush_policy() != STREAM_FLPOLICY_ON_DATA );
 
         reassembler = new TcpReassemblerIds(*this, seglist);
         reassembler->init(!client_tracker, splitter);
@@ -999,6 +992,8 @@ void TcpStreamTracker::thread_init()
 {
     assert(!hpq);
     hpq = new HeldPacketQueue();
+
+    TcpReassembler::tinit();
 }
 
 void TcpStreamTracker::thread_term()
@@ -1006,4 +1001,6 @@ void TcpStreamTracker::thread_term()
     assert(hpq->empty());
     delete hpq;
     hpq = nullptr;
+
+    TcpReassembler::tterm();
 }