From 91353fdb6154cb20ab2d2eb8061532c793c8f927 Mon Sep 17 00:00:00 2001 From: Shivani Bhardwaj Date: Thu, 8 May 2025 12:37:26 +0530 Subject: [PATCH] mqtt: trigger raw stream reassembly Internals --------- Suricata's stream engine returns data for inspection to the detection engine from the stream when the chunk size is reached. Bug --- Inspection triggered only in the specified chunk sizes may be too late when it comes to inspection of smaller protocol specific data which could result in delayed inspection, incorrect data logged with a transaction and logs misindicating the pkt that triggered an alert. Fix --- Fix this by making an explicit call from all respective applayer parsers to trigger raw stream reassembly which shall make the data available for inspection in the following call of the stream engine. This needs to happen per direction on the completion of an entity like a request or a response. Important notes --------------- 1. The above mentioned behavior with and without this patch is affected internally by the following conditions. - inspection depth - stream depth In these special cases, the inspection window will be affected and Suricata may not consider all the data that could be expected to be inspected. 2. This only applies to applayer protocols running over TCP. 3. The inspection window is only considered up to the ACK'd data. 4. This entire issue is about IDS mode only. MQTT creates a transaction per message per direction, so, a call to trigger raw stream reassembly has been made on completion of each transaction in the respective direction. Optimization 7026 Bug 7004 --- rust/src/mqtt/mqtt.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index 63829e838d..617b4077c1 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -19,18 +19,18 @@ use super::mqtt_message::*; use super::parser::*; -use crate::applayer::*; use crate::applayer; +use crate::applayer::*; use crate::conf::{conf_get, get_memval}; use crate::core::*; use crate::direction::Direction; use crate::flow::Flow; use crate::frames::*; use nom7::Err; -use suricata_sys::sys::AppProto; use std; use std::collections::VecDeque; use std::ffi::CString; +use suricata_sys::sys::AppProto; // Used as a special pseudo packet identifier to denote the first CONNECT // packet in a connection. Note that there is no risk of collision with a @@ -218,7 +218,8 @@ impl MQTTState { // without having to introduce lifetimes etc. // This is the reason for the code duplication below. Maybe there is a // more concise way to do it, but this works for now. - fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) { + fn handle_msg(&mut self, flow: *const Flow, msg: MQTTMessage, toclient: bool) { + let tx_len = self.transactions.len(); match msg.op { MQTTOperation::CONNECT(ref conn) => { self.protocol_version = conn.protocol_version; @@ -398,6 +399,15 @@ impl MQTTState { self.transactions.push_back(tx); } } + // If a new transaction was pushed, only then reassemble the data + if self.transactions.len() > tx_len { + let dir = if toclient { + Direction::ToClient + } else { + Direction::ToServer + }; + sc_app_layer_parser_trigger_raw_stream_reassembly(flow, dir as i32); + } } fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { @@ -452,19 +462,19 @@ impl MQTTState { ); if trunc.skipped_length >= current.len() { self.skip_request = trunc.skipped_length - current.len(); - self.handle_msg(msg, true); + self.handle_msg(flow, msg, true); return AppLayerResult::ok(); } else { consumed += trunc.skipped_length; current = ¤t[trunc.skipped_length..]; - self.handle_msg(msg, true); + self.handle_msg(flow, msg, true); self.skip_request = 0; continue; } } self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); - self.handle_msg(msg, false); + self.handle_msg(flow, msg, false); consumed += current.len() - rem.len(); current = rem; } @@ -539,20 +549,20 @@ impl MQTTState { ); if trunc.skipped_length >= current.len() { self.skip_response = trunc.skipped_length - current.len(); - self.handle_msg(msg, true); + self.handle_msg(flow, msg, true); SCLogDebug!("skip_response now {}", self.skip_response); return AppLayerResult::ok(); } else { consumed += trunc.skipped_length; current = ¤t[trunc.skipped_length..]; - self.handle_msg(msg, true); + self.handle_msg(flow, msg, true); self.skip_response = 0; continue; } } self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); - self.handle_msg(msg, true); + self.handle_msg(flow, msg, true); consumed += current.len() - rem.len(); current = rem; } @@ -704,9 +714,7 @@ unsafe extern "C" fn mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) - } #[no_mangle] -pub unsafe extern "C" fn SCMqttTxIsToClient( - tx: &MQTTTransaction, -) -> std::os::raw::c_int { +pub unsafe extern "C" fn SCMqttTxIsToClient(tx: &MQTTTransaction) -> std::os::raw::c_int { if tx.toclient { return 1; } -- 2.47.2