From: Haleema Khan Date: Fri, 13 Jan 2023 12:25:37 +0000 (+0500) Subject: mqtt: add mqtt frames X-Git-Tag: suricata-7.0.0-rc2~112 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e474858e259bf399c05ee8c09d7890f5a3c52728;p=thirdparty%2Fsuricata.git mqtt: add mqtt frames Adds PDU, Header and Data frame to the MQTT parser. Ticket: 5731 --- diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index 43bd708151..9044cd0b1f 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -23,6 +23,7 @@ use crate::applayer::*; use crate::applayer::{self, LoggerFlags}; use crate::conf::conf_get; use crate::core::*; +use crate::frames::*; use nom7::Err; use std; use std::collections::VecDeque; @@ -41,6 +42,13 @@ static mut MQTT_MAX_TX: usize = 1024; static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN; +#[derive(AppLayerFrameType)] +pub enum MQTTFrameType { + Pdu, + Header, + Data, +} + #[derive(FromPrimitive, Debug, AppLayerEvent)] pub enum MQTTEvent { MissingConnect, @@ -422,8 +430,10 @@ impl MQTTState { } } - fn parse_request(&mut self, input: &[u8]) -> AppLayerResult { + fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { + let input = stream_slice.as_slice(); let mut current = input; + if input.is_empty() { return AppLayerResult::ok(); } @@ -455,6 +465,13 @@ impl MQTTState { SCLogDebug!("request: handling {}", current.len()); match parse_message(current, self.protocol_version, self.max_msg_len) { Ok((rem, msg)) => { + let _pdu = Frame::new( + flow, + &stream_slice, + input, + current.len() as i64, + MQTTFrameType::Pdu as u8, + ); SCLogDebug!("request msg {:?}", msg); if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { SCLogDebug!( @@ -474,6 +491,8 @@ impl MQTTState { continue; } } + + self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); self.handle_msg(msg, false); consumed += current.len() - rem.len(); current = rem; @@ -497,8 +516,10 @@ impl MQTTState { return AppLayerResult::ok(); } - fn parse_response(&mut self, input: &[u8]) -> AppLayerResult { + fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { + let input = stream_slice.as_slice(); let mut current = input; + if input.is_empty() { return AppLayerResult::ok(); } @@ -529,6 +550,14 @@ impl MQTTState { SCLogDebug!("response: handling {}", current.len()); match parse_message(current, self.protocol_version, self.max_msg_len) { Ok((rem, msg)) => { + let _pdu = Frame::new( + flow, + &stream_slice, + input, + input.len() as i64, + MQTTFrameType::Pdu as u8, + ); + SCLogDebug!("response msg {:?}", msg); if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { SCLogDebug!( @@ -549,6 +578,8 @@ impl MQTTState { continue; } } + + self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); self.handle_msg(msg, true); consumed += current.len() - rem.len(); current = rem; @@ -589,6 +620,23 @@ impl MQTTState { tx.tx_data.set_event(event as u8); self.transactions.push_back(tx); } + + fn mqtt_hdr_and_data_frames(&mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage) { + let hdr = stream_slice.as_slice(); + //MQTT payload has a fixed header of 2 bytes + let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8); + SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr); + let rem_length = input.header.remaining_length as usize; + let data = &hdr[2..rem_length + 2]; + let _mqtt_data = Frame::new( + flow, + stream_slice, + data, + rem_length as i64, + MQTTFrameType::Data as u8, + ); + SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data); + } } // C exports. @@ -637,20 +685,20 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, #[no_mangle] pub unsafe extern "C" fn rs_mqtt_parse_request( - _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, stream_slice: StreamSlice, _data: *const std::os::raw::c_void, ) -> AppLayerResult { let state = cast_pointer!(state, MQTTState); - return state.parse_request(stream_slice.as_slice()); + return state.parse_request(flow, stream_slice); } #[no_mangle] pub unsafe extern "C" fn rs_mqtt_parse_response( - _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, stream_slice: StreamSlice, _data: *const std::os::raw::c_void, ) -> AppLayerResult { let state = cast_pointer!(state, MQTTState); - return state.parse_response(stream_slice.as_slice()); + return state.parse_response(flow, stream_slice); } #[no_mangle] @@ -761,8 +809,8 @@ pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { apply_tx_config: None, flags: 0, truncate: None, - get_frame_id_by_name: None, - get_frame_name_by_id: None, + get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name), + get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id), }; let ip_proto_str = CString::new("tcp").unwrap(); @@ -783,4 +831,4 @@ pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { } else { SCLogDebug!("Protocol detector and parser disabled for MQTT."); } -} +} \ No newline at end of file