From: Masud Hasan (mashasan) Date: Wed, 18 May 2022 03:30:31 +0000 (+0000) Subject: Pull request #3417: stream: refactor flush_queued_segments X-Git-Tag: 3.1.30.0~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a5f9319a0dcaf20c4336a7ddf9efa816db0d9bb6;p=thirdparty%2Fsnort3.git Pull request #3417: stream: refactor flush_queued_segments Merge in SNORT/snort3 from ~SMINUT/snort3:russ_flush to master Squashed commit of the following: commit 2dc7bba89aaa9dabf74b8ab930aadc948a02d54c Author: Silviu Minut Date: Tue May 17 08:02:26 2022 -0400 stream_tcp: add null check for get_current_wire_packet() in dce too commit d70012d0605e1949b4f300300af33ac1dc2d86f0 Author: Silviu Minut Date: Tue May 10 18:46:22 2022 -0400 stream_tcp: provide a context and a wire packet where needed, when calling into reassembly from outside regular processing (handle_timeouts) commit 3828703345b5dd3a0c213481e02938c0425f6c14 Author: Silviu Minut Date: Fri Apr 29 17:36:04 2022 -0400 stream: refactor flush_queued_segments --- diff --git a/src/detection/detection_engine.cc b/src/detection/detection_engine.cc index dd1a80b56..c0cfb6418 100644 --- a/src/detection/detection_engine.cc +++ b/src/detection/detection_engine.cc @@ -41,6 +41,7 @@ #include "profiler/profiler_defs.h" #include "protocols/packet.h" #include "stream/stream.h" +#include "time/packet_time.h" #include "utils/stats.h" #include "context_switcher.h" @@ -131,10 +132,18 @@ SF_EVENTQ* DetectionEngine::get_event_queue() { return Analyzer::get_switcher()->get_context()->equeue; } Packet* DetectionEngine::get_current_packet() -{ return Analyzer::get_switcher()->get_context()->packet; } +{ + const IpsContext* c = Analyzer::get_switcher()->get_context(); + assert(c); + return c->packet; +} Packet* DetectionEngine::get_current_wire_packet() -{ return Analyzer::get_switcher()->get_context()->wire_packet; } +{ + const IpsContext* c = Analyzer::get_switcher()->get_context(); + assert(c); + return c->wire_packet; +} void DetectionEngine::set_encode_packet(Packet* p) { Analyzer::get_switcher()->get_context()->encode_packet = p; } @@ -145,7 +154,7 @@ Packet* DetectionEngine::get_encode_packet() // we need to stay in the current context until rebuild is successful // any events while rebuilding will be logged against the current packet // however, rebuild is always in the next context, not current. -Packet* DetectionEngine::set_next_packet(Packet* parent, Flow* flow) +Packet* DetectionEngine::set_next_packet(const Packet* parent, Flow* flow) { static THREAD_LOCAL Active shutdown_active; static THREAD_LOCAL ActiveAction* shutdown_action = nullptr; @@ -170,6 +179,7 @@ Packet* DetectionEngine::set_next_packet(Packet* parent, Flow* flow) c->wire_packet = nullptr; } + packet_gettimeofday(&c->pkth->ts); p->pkth = c->pkth; p->data = c->buf; p->pkt = c->buf; @@ -204,6 +214,8 @@ Packet* DetectionEngine::set_next_packet(Packet* parent, Flow* flow) p->reset(); + p->packet_flags |= PKT_WAS_SET; + if ( parent ) p->packet_flags |= PKT_HAS_PARENT; diff --git a/src/detection/detection_engine.h b/src/detection/detection_engine.h index 205af0886..5b62a0fb0 100644 --- a/src/detection/detection_engine.h +++ b/src/detection/detection_engine.h @@ -56,7 +56,7 @@ public: static Packet* get_current_packet(); static Packet* get_current_wire_packet(); - static Packet* set_next_packet(Packet* parent = nullptr, Flow* flow = nullptr); + static Packet* set_next_packet(const Packet* parent = nullptr, Flow* flow = nullptr); static uint8_t* get_next_buffer(unsigned& max); static bool offload(Packet*); diff --git a/src/flow/flow.cc b/src/flow/flow.cc index 62683cda9..6f2e8f823 100644 --- a/src/flow/flow.cc +++ b/src/flow/flow.cc @@ -23,12 +23,14 @@ #include "flow.h" +#include "detection/context_switcher.h" #include "detection/detection_engine.h" #include "flow/flow_key.h" #include "flow/ha.h" #include "flow/session.h" #include "framework/data_bus.h" #include "helpers/bitop.h" +#include "main/analyzer.h" #include "memory/memory_cap.h" #include "protocols/packet.h" #include "protocols/tcp.h" @@ -148,16 +150,18 @@ void Flow::flush(bool do_cleanup) { DetectionEngine::onload(this); - if ( do_cleanup ) + if ( !do_cleanup ) + session->clear(); + + else if ( Analyzer::get_switcher()->get_context() ) + session->flush(); + + else { DetectionEngine::set_next_packet(); DetectionEngine de; - session->flush(); - de.get_context()->clear_callbacks(); } - else - session->clear(); } if ( was_blocked() ) @@ -170,17 +174,18 @@ void Flow::reset(bool do_cleanup) { DetectionEngine::onload(this); - if ( do_cleanup ) + if ( !do_cleanup ) + session->clear(); + + else if ( Analyzer::get_switcher()->get_context() ) + session->cleanup(); + + else { DetectionEngine::set_next_packet(); DetectionEngine de; - session->cleanup(); - - de.get_context()->clear_callbacks(); } - else - session->clear(); } free_flow_data(); diff --git a/src/flow/flow_control.cc b/src/flow/flow_control.cc index a8936fb64..4cb31f4ab 100644 --- a/src/flow/flow_control.cc +++ b/src/flow/flow_control.cc @@ -50,8 +50,6 @@ FlowControl::FlowControl(const FlowCacheConfig& fc) FlowControl::~FlowControl() { - DetectionEngine de; - delete cache; snort_free(mem); delete exp_cache; diff --git a/src/flow/test/flow_test.cc b/src/flow/test/flow_test.cc index 77fe3c8e5..72a84aab0 100644 --- a/src/flow/test/flow_test.cc +++ b/src/flow/test/flow_test.cc @@ -23,12 +23,14 @@ #include "config.h" #endif +#include "detection/context_switcher.h" #include "detection/detection_engine.h" #include "flow/flow.h" #include "flow/flow_stash.h" #include "flow/ha.h" #include "framework/inspector.h" #include "framework/data_bus.h" +#include "main/analyzer.h" #include "main/policy.h" #include "main/snort_config.h" #include "protocols/ip.h" @@ -74,8 +76,10 @@ void set_ips_policy(IpsPolicy*) { } unsigned SnortConfig::get_thread_reload_id() { return 0; } } -Packet* DetectionEngine::set_next_packet(Packet*, Flow*) { return nullptr; } +Packet* DetectionEngine::set_next_packet(const Packet*, Flow*) { return nullptr; } +ContextSwitcher* Analyzer::get_switcher() { return nullptr; } +snort::IpsContext* ContextSwitcher::get_context() const { return nullptr; } IpsContext* DetectionEngine::get_context() { return nullptr; } DetectionEngine::DetectionEngine() = default; diff --git a/src/main/analyzer.cc b/src/main/analyzer.cc index acfa36f00..a04b44d10 100644 --- a/src/main/analyzer.cc +++ b/src/main/analyzer.cc @@ -408,6 +408,9 @@ void Analyzer::process_daq_pkt_msg(DAQ_Msg_h msg, bool retry) switcher->stop(); } + // Beyond this point, we don't have an active context, but e.g. calls to + // get_current_packet() or get_current_wire_packet() require a context. + // We must ensure that a context is available when one is needed. Stream::handle_timeouts(false); HighAvailabilityManager::process_receive(); } diff --git a/src/service_inspectors/dce_rpc/dce_smb_utils.cc b/src/service_inspectors/dce_rpc/dce_smb_utils.cc index 521f2d5db..c08e52f03 100644 --- a/src/service_inspectors/dce_rpc/dce_smb_utils.cc +++ b/src/service_inspectors/dce_rpc/dce_smb_utils.cc @@ -1351,7 +1351,7 @@ static void DCE2_SmbInjectDeletePdu(DCE2_SmbFileTracker* ftracker) Packet* inject_pkt = DetectionEngine::get_current_wire_packet(); Packet* p = DetectionEngine::get_current_packet(); - if ( inject_pkt->flow != p->flow ) + if ( !inject_pkt || inject_pkt->flow != p->flow ) return; NbssHdr* nb_hdr = (NbssHdr*)dce2_smb_delete_pdu; diff --git a/src/stream/tcp/tcp_reassembler.cc b/src/stream/tcp/tcp_reassembler.cc index ab8211881..0c9cb50e6 100644 --- a/src/stream/tcp/tcp_reassembler.cc +++ b/src/stream/tcp/tcp_reassembler.cc @@ -766,7 +766,7 @@ static Packet* get_packet(Flow* flow, uint32_t flags, bool c2s) p->ptrs.set_pkt_type(PktType::PDU); p->proto_bits |= PROTO_BIT__TCP; p->flow = flow; - p->packet_flags = flags; + p->packet_flags |= flags; if ( c2s ) { @@ -790,7 +790,7 @@ static Packet* get_packet(Flow* flow, uint32_t flags, bool c2s) return p; } -void TcpReassembler::flush_queued_segments( +void TcpReassembler::finish_and_final_flush( TcpReassemblerState& trs, Flow* flow, bool clear, Packet* p) { bool pending = clear and paf_initialized(&trs.paf_state) @@ -800,19 +800,28 @@ void TcpReassembler::flush_queued_segments( final_flush(trs, p, trs.packet_dir); } +// Call this only from outside reassembly. void TcpReassembler::flush_queued_segments( TcpReassemblerState& trs, Flow* flow, bool clear, const Packet* p) { - Packet* pdu = get_packet(flow, trs.packet_dir, trs.server_side); - if ( p ) - flush_queued_segments(trs, flow, clear, pdu); - + { + finish_and_final_flush(trs, flow, clear, const_cast(p)); + } else { - // if we weren't given a packet, we must establish a context - DetectionEngine de; - flush_queued_segments(trs, flow, clear, pdu); + Packet* pdu = get_packet(flow, trs.packet_dir, trs.server_side); + + bool pending = clear and paf_initialized(&trs.paf_state); + + if ( pending ) + { + DetectionEngine de; + pending = trs.tracker->splitter_finish(flow); + } + + if ( pending and !(flow->ssn_state.ignore_direction & trs.ignore_dir) ) + final_flush(trs, pdu, trs.packet_dir); } } @@ -1099,7 +1108,7 @@ int TcpReassembler::flush_on_data_policy(TcpReassemblerState& trs, Packet* p) { // we are on a FIN, the data has been scanned, it has no gaps, // but somehow we are waiting for more data - do final flush here - flush_queued_segments(trs, p->flow, true, p ); + finish_and_final_flush(trs, p->flow, true, p); } } break; @@ -1210,7 +1219,7 @@ int TcpReassembler::flush_on_ack_policy(TcpReassemblerState& trs, Packet* p) { // we are acknowledging a FIN, the data has been scanned, it has no gaps, // but somehow we are waiting for more data - do final flush here - flush_queued_segments(trs, p->flow, true, p); + finish_and_final_flush(trs, p->flow, true, p); } } break; @@ -1366,21 +1375,10 @@ void TcpReassembler::queue_packet_for_reassembly( insert_segment_in_seglist(trs, tsd); } -uint32_t TcpReassembler::perform_partial_flush(TcpReassemblerState& trs, Flow* flow) +uint32_t TcpReassembler::perform_partial_flush(TcpReassemblerState& trs, Flow* flow, Packet*& p) { - Packet* p = get_packet(flow, (trs.packet_dir|PKT_WAS_SET), trs.server_side); - - uint32_t result = perform_partial_flush(trs, p); - - // If the held_packet hasn't been released by perform_partial_flush(), - // call finalize directly. - if ( trs.tracker->is_holding_packet() ) - { - trs.tracker->finalize_held_packet(p); - tcpStats.held_packet_purges++; - } - - return result; + p = get_packet(flow, trs.packet_dir, trs.server_side); + return perform_partial_flush(trs, p); } // No error checking here, so the caller must ensure that p, p->flow and context diff --git a/src/stream/tcp/tcp_reassembler.h b/src/stream/tcp/tcp_reassembler.h index 83ce6b17b..5c7770528 100644 --- a/src/stream/tcp/tcp_reassembler.h +++ b/src/stream/tcp/tcp_reassembler.h @@ -42,6 +42,8 @@ public: TcpReassemblerState&, snort::Packet* p, uint32_t dir, bool final_flush = false); virtual void flush_queued_segments( TcpReassemblerState&, snort::Flow* flow, bool clear, const snort::Packet* = nullptr); + void finish_and_final_flush( + TcpReassemblerState&, snort::Flow* flow, bool clear, snort::Packet*); virtual bool is_segment_pending_flush(TcpReassemblerState&); virtual int flush_on_data_policy(TcpReassemblerState&, snort::Packet*); virtual int flush_on_ack_policy(TcpReassemblerState&, snort::Packet*); @@ -51,7 +53,7 @@ public: uint32_t event_id, uint32_t event_second); virtual void purge_alerts(TcpReassemblerState&); - uint32_t perform_partial_flush(TcpReassemblerState&, snort::Flow*); + uint32_t perform_partial_flush(TcpReassemblerState&, snort::Flow*, snort::Packet*&); protected: TcpReassembler() = default; @@ -72,8 +74,6 @@ protected: bool is_segment_fasttrack (TcpReassemblerState&, TcpSegmentNode* tail, const TcpSegmentDescriptor&); void show_rebuilt_packet(const TcpReassemblerState&, snort::Packet*); - void flush_queued_segments( - TcpReassemblerState&, snort::Flow* flow, bool clear, snort::Packet*); int flush_data_segments(TcpReassemblerState&, uint32_t flush_len, snort::Packet* pdu); void prep_pdu( TcpReassemblerState&, snort::Flow*, snort::Packet*, uint32_t pkt_flags, snort::Packet*); diff --git a/src/stream/tcp/tcp_reassemblers.h b/src/stream/tcp/tcp_reassemblers.h index bcfeca059..be03244f6 100644 --- a/src/stream/tcp/tcp_reassemblers.h +++ b/src/stream/tcp/tcp_reassemblers.h @@ -70,6 +70,9 @@ public: int flush_stream(snort::Packet* p, uint32_t dir, bool final_flush = false) { return reassembler->flush_stream(trs, p, dir, final_flush); } + void finish_and_final_flush(snort::Flow* flow, bool clear, snort::Packet* p) + { reassembler->finish_and_final_flush(trs, flow, clear, p); } + void flush_queued_segments(snort::Flow* flow, bool clear, const snort::Packet* p = nullptr) { reassembler->flush_queued_segments(trs, flow, clear, p); } @@ -118,8 +121,8 @@ public: void set_norm_mode_test() { trs.sos.tcp_ips_data = NORM_MODE_TEST; } - uint32_t perform_partial_flush(snort::Flow* flow) - { return reassembler->perform_partial_flush(trs, flow); } + uint32_t perform_partial_flush(snort::Flow* flow, snort::Packet*& p) + { return reassembler->perform_partial_flush(trs, flow, p); } void reset_paf() { paf_reset(&trs.paf_state); } diff --git a/src/stream/tcp/tcp_stream_tracker.cc b/src/stream/tcp/tcp_stream_tracker.cc index 5428393a4..6915ad848 100644 --- a/src/stream/tcp/tcp_stream_tracker.cc +++ b/src/stream/tcp/tcp_stream_tracker.cc @@ -671,14 +671,29 @@ void TcpStreamTracker::perform_fin_recv_flush(TcpSegmentDescriptor& tsd) if ( flush_policy == STREAM_FLPOLICY_ON_DATA and SEQ_EQ(tsd.get_end_seq(), rcv_nxt) and !tsd.get_flow()->searching_for_service() ) - reassembler.flush_queued_segments(tsd.get_flow(), true, tsd.get_pkt()); + reassembler.finish_and_final_flush(tsd.get_flow(), true, tsd.get_pkt()); } uint32_t TcpStreamTracker::perform_partial_flush() { uint32_t flushed = 0; if ( held_packet != null_iterator ) - flushed = reassembler.perform_partial_flush(session->flow); + { + Packet* p; + flushed = reassembler.perform_partial_flush(session->flow, p); + + // If the held_packet hasn't been released by perform_partial_flush(), + // call finalize directly. + if ( is_holding_packet() ) + { + finalize_held_packet(p); + tcpStats.held_packet_purges++; + } + + // call this here explicitly, because we've avoided it in reassembler + // and we need to set flow state to BLOCK, if need be + Stream::check_flow_closed(p); + } return flushed; }