From: Sascha Steinbiss Date: Tue, 8 Mar 2022 22:19:22 +0000 (+0100) Subject: mqtt: rustfmt X-Git-Tag: suricata-7.0.0-beta1~768 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2a3ed9a6aeee398df3c936f356a865ddb0a9c5ae;p=thirdparty%2Fsuricata.git mqtt: rustfmt --- diff --git a/rust/src/mqtt/detect.rs b/rust/src/mqtt/detect.rs index 67460709c0..3a313a07ac 100644 --- a/rust/src/mqtt/detect.rs +++ b/rust/src/mqtt/detect.rs @@ -17,7 +17,7 @@ // written by Sascha Steinbiss -use crate::mqtt::mqtt::{MQTTTransaction, MQTTState}; +use crate::mqtt::mqtt::{MQTTState, MQTTTransaction}; use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode}; use std::ffi::CStr; use std::ptr; @@ -33,31 +33,24 @@ pub enum MQTTFlagState { } #[inline] -fn check_flag_state( - flag_state: MQTTFlagState, - flag_value: bool, - ok: &mut bool, -) { +fn check_flag_state(flag_state: MQTTFlagState, flag_value: bool, ok: &mut bool) { match flag_state { MQTTFlagState::MQTT_MUST_BE_SET => { if !flag_value { *ok = false; } - }, + } MQTTFlagState::MQTT_CANT_BE_SET => { if flag_value { *ok = false; } - }, + } _ => {} } } #[no_mangle] -pub extern "C" fn rs_mqtt_tx_has_type( - tx: &MQTTTransaction, - mtype: u8, -) -> u8 { +pub extern "C" fn rs_mqtt_tx_has_type(tx: &MQTTTransaction, mtype: u8) -> u8 { for msg in tx.msg.iter() { if mtype == msg.header.message_type as u8 { return 1; @@ -81,9 +74,7 @@ pub unsafe extern "C" fn rs_mqtt_cstr_message_code( #[no_mangle] pub extern "C" fn rs_mqtt_tx_has_flags( - tx: &MQTTTransaction, - qretain: MQTTFlagState, - qdup: MQTTFlagState, + tx: &MQTTTransaction, qretain: MQTTFlagState, qdup: MQTTFlagState, ) -> u8 { for msg in tx.msg.iter() { let mut ok = true; @@ -98,10 +89,7 @@ pub extern "C" fn rs_mqtt_tx_has_flags( } #[no_mangle] -pub extern "C" fn rs_mqtt_tx_has_qos( - tx: &MQTTTransaction, - qos: u8, -) -> u8 { +pub extern "C" fn rs_mqtt_tx_has_qos(tx: &MQTTTransaction, qos: u8) -> u8 { for msg in tx.msg.iter() { if qos == msg.header.qos_level { return 1; @@ -111,20 +99,14 @@ pub extern "C" fn rs_mqtt_tx_has_qos( } #[no_mangle] -pub extern "C" fn rs_mqtt_tx_get_protocol_version( - state: &MQTTState, -) -> u8 { +pub extern "C" fn rs_mqtt_tx_get_protocol_version(state: &MQTTState) -> u8 { return state.protocol_version; } #[no_mangle] pub extern "C" fn rs_mqtt_tx_has_connect_flags( - tx: &MQTTTransaction, - username: MQTTFlagState, - password: MQTTFlagState, - will: MQTTFlagState, - will_retain: MQTTFlagState, - clean_session: MQTTFlagState, + tx: &MQTTTransaction, username: MQTTFlagState, password: MQTTFlagState, will: MQTTFlagState, + will_retain: MQTTFlagState, clean_session: MQTTFlagState, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -144,9 +126,7 @@ pub extern "C" fn rs_mqtt_tx_has_connect_flags( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -166,9 +146,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -190,9 +168,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -213,9 +189,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -237,9 +211,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNECT(ref cv) = msg.op { @@ -261,8 +233,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent( - tx: &MQTTTransaction, - session_present: *mut bool, + tx: &MQTTTransaction, session_present: *mut bool, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::CONNACK(ref ca) = msg.op { @@ -275,9 +246,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::PUBLISH(ref pubv) = msg.op { @@ -298,9 +267,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message( - tx: &MQTTTransaction, - buffer: *mut *const u8, - buffer_len: *mut u32, + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, ) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::PUBLISH(ref pubv) = msg.op { @@ -320,12 +287,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message( } #[no_mangle] -pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction, - i: u32, - buf: *mut *const u8, - len: *mut u32) - -> u8 -{ +pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic( + tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32, +) -> u8 { let mut offset = 0; for msg in tx.msg.iter() { if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op { @@ -349,12 +313,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction, } #[no_mangle] -pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction, - i: u32, - buf: *mut *const u8, - len: *mut u32) - -> u8 -{ +pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic( + tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32, +) -> u8 { let mut offset = 0; for msg in tx.msg.iter() { if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op { @@ -378,10 +339,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction, } #[no_mangle] -pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code( - tx: &MQTTTransaction, - result: *mut u8, -) -> u8 { +pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(tx: &MQTTTransaction, result: *mut u8) -> u8 { for msg in tx.msg.iter() { match msg.op { MQTTOperation::PUBACK(ref v) @@ -407,17 +365,14 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code( return 1; } } - _ => return 0 + _ => return 0, } } return 0; } #[no_mangle] -pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code( - tx: &MQTTTransaction, - code: u8, -) -> u8 { +pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(tx: &MQTTTransaction, code: u8) -> u8 { for msg in tx.msg.iter() { if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op { if let Some(ref reason_codes) = unsuback.reason_codes { @@ -435,10 +390,10 @@ pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code( #[cfg(test)] mod test { use super::*; - use std; use crate::mqtt::mqtt::MQTTTransaction; use crate::mqtt::mqtt_message::*; use crate::mqtt::parser::FixedHeader; + use std; #[test] fn test_multi_unsubscribe() { @@ -472,23 +427,23 @@ mod test { }); let mut s: *const u8 = std::ptr::null_mut(); let mut slen: u32 = 0; - let mut r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen)}; + let mut r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen) }; assert_eq!(r, 1); - let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "foo"); - r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "baar"); - r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "fieee"); - r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "baaaaz"); - r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen) }; assert_eq!(r, 0); } @@ -512,7 +467,8 @@ mod test { MQTTSubscribeTopicData { topic_name: "baar".to_string(), qos: 1, - }], + }, + ], properties: None, }), }); @@ -534,29 +490,30 @@ mod test { MQTTSubscribeTopicData { topic_name: "baaaaz".to_string(), qos: 1, - }], + }, + ], properties: None, }), }); let mut s: *const u8 = std::ptr::null_mut(); let mut slen: u32 = 0; - let mut r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen)}; + let mut r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen) }; assert_eq!(r, 1); - let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "foo"); - r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "baar"); - r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "fieee"); - r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen) }; assert_eq!(r, 1); - topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); assert_eq!(topic, "baaaaz"); - r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen)}; + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen) }; assert_eq!(r, 0); } } diff --git a/rust/src/mqtt/logger.rs b/rust/src/mqtt/logger.rs index bda119aae6..d7b343931e 100644 --- a/rust/src/mqtt/logger.rs +++ b/rust/src/mqtt/logger.rs @@ -17,17 +17,16 @@ // written by Sascha Steinbiss -use std; -use super::mqtt::{MQTTTransaction, MQTTState}; +use super::mqtt::{MQTTState, MQTTTransaction}; use crate::jsonbuilder::{JsonBuilder, JsonError}; use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData}; -use crate::mqtt::parser::{FixedHeader}; +use crate::mqtt::parser::FixedHeader; +use std; pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0); #[inline] -fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> -{ +fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> { js.start_object()?; js.set_string("topic", &t.topic_name)?; js.set_uint("qos", t.qos as u64)?; @@ -36,8 +35,7 @@ fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<() } #[inline] -fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> -{ +fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> { js.set_uint("qos", hdr.qos_level as u64)?; js.set_bool("retain", hdr.retain)?; js.set_bool("dup", hdr.dup_flag)?; @@ -247,12 +245,12 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<() js.open_object("pingreq")?; log_mqtt_header(js, &msg.header)?; js.close()?; // pingreq - }, + } MQTTOperation::PINGRESP => { js.open_object("pingresp")?; log_mqtt_header(js, &msg.header)?; js.close()?; // pingresp - }, + } MQTTOperation::AUTH(ref auth) => { js.open_object("auth")?; log_mqtt_header(js, &msg.header)?; @@ -265,7 +263,7 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<() js.close()?; // properties } js.close()?; // auth - }, + } MQTTOperation::DISCONNECT(ref disco) => { js.open_object("disconnect")?; log_mqtt_header(js, &msg.header)?; @@ -280,15 +278,15 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<() js.close()?; // properties } js.close()?; // disconnect - }, + } MQTTOperation::TRUNCATED(ref trunc) => { js.open_object(&trunc.original_message_type.to_lower_str())?; log_mqtt_header(js, &msg.header)?; js.set_bool("truncated", true)?; js.set_uint("skipped_length", trunc.skipped_length as u64)?; js.close()?; // truncated - }, - MQTTOperation::UNASSIGNED => {}, + } + MQTTOperation::UNASSIGNED => {} } } js.close()?; // mqtt @@ -297,7 +295,9 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<() } #[no_mangle] -pub unsafe extern "C" fn rs_mqtt_logger_log(_state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder) -> bool { +pub unsafe extern "C" fn rs_mqtt_logger_log( + _state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder, +) -> bool { let tx = cast_pointer!(tx, MQTTTransaction); log_mqtt(tx, flags, js).is_ok() } diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index c4600979a3..0881efa873 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -19,8 +19,8 @@ use super::mqtt_message::*; use super::parser::*; -use crate::applayer::{self, LoggerFlags}; use crate::applayer::*; +use crate::applayer::{self, LoggerFlags}; use crate::conf::conf_get; use crate::core::*; use nom7::Err; @@ -204,7 +204,7 @@ impl MQTTState { tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); self.transactions.push(tx); } - }, + } MQTTOperation::PUBLISH(ref publish) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); @@ -219,7 +219,7 @@ impl MQTTState { let mut tx = self.new_tx(msg, toclient); tx.complete = true; self.transactions.push(tx); - }, + } 1..=2 => { if let Some(pkt_id) = publish.message_id { let mut tx = self.new_tx(msg, toclient); @@ -230,14 +230,14 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); self.transactions.push(tx); } - }, + } _ => { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); self.transactions.push(tx); } } - }, + } MQTTOperation::SUBSCRIBE(ref subscribe) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); @@ -253,19 +253,19 @@ impl MQTTState { let mut tx = self.new_tx(msg, toclient); tx.complete = true; self.transactions.push(tx); - }, + } 1..=2 => { let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); self.transactions.push(tx); - }, + } _ => { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); self.transactions.push(tx); } } - }, + } MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); @@ -281,19 +281,19 @@ impl MQTTState { let mut tx = self.new_tx(msg, toclient); tx.complete = true; self.transactions.push(tx); - }, + } 1..=2 => { let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); self.transactions.push(tx); - }, + } _ => { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); self.transactions.push(tx); } } - }, + } MQTTOperation::CONNACK(ref _connack) => { if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { (*tx).msg.push(msg); @@ -305,9 +305,8 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); self.transactions.push(tx); } - }, - MQTTOperation::PUBREC(ref v) - | MQTTOperation::PUBREL(ref v) => { + } + MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); @@ -321,9 +320,8 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); self.transactions.push(tx); } - }, - MQTTOperation::PUBACK(ref v) - | MQTTOperation::PUBCOMP(ref v) => { + } + MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); @@ -339,7 +337,7 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); self.transactions.push(tx); } - }, + } MQTTOperation::SUBACK(ref suback) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); @@ -356,7 +354,7 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); self.transactions.push(tx); } - }, + } MQTTOperation::UNSUBACK(ref unsuback) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); @@ -373,20 +371,19 @@ impl MQTTState { MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); self.transactions.push(tx); } - }, + } MQTTOperation::UNASSIGNED => { let mut tx = self.new_tx(msg, toclient); tx.complete = true; MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType); self.transactions.push(tx); - }, + } MQTTOperation::TRUNCATED(_) => { let mut tx = self.new_tx(msg, toclient); tx.complete = true; self.transactions.push(tx); - }, - MQTTOperation::AUTH(_) - | MQTTOperation::DISCONNECT(_) => { + } + MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => { if !self.connected { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); @@ -396,9 +393,8 @@ impl MQTTState { let mut tx = self.new_tx(msg, toclient); tx.complete = true; self.transactions.push(tx); - }, - MQTTOperation::PINGREQ - | MQTTOperation::PINGRESP => { + } + MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => { if !self.connected { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); @@ -419,7 +415,11 @@ impl MQTTState { } let mut consumed = 0; - SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len()); + SCLogDebug!( + "skip_request {} input len {}", + self.skip_request, + input.len() + ); if self.skip_request > 0 { if input.len() <= self.skip_request { SCLogDebug!("reducing skip_request by {}", input.len()); @@ -427,13 +427,16 @@ impl MQTTState { return AppLayerResult::ok(); } else { current = &input[self.skip_request..]; - SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); + SCLogDebug!( + "skip end reached, skipping {} :{:?}", + self.skip_request, + current + ); consumed = self.skip_request; self.skip_request = 0; } } - while current.len() > 0 { let mut skipped = false; SCLogDebug!("request: handling {}", current.len()); @@ -441,7 +444,11 @@ impl MQTTState { Ok((rem, msg)) => { SCLogDebug!("request msg {:?}", msg); if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { - SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); + SCLogDebug!( + "found truncated with skipped {} current len {}", + trunc.skipped_length, + current.len() + ); if trunc.skipped_length >= current.len() { skipped = true; self.skip_request = trunc.skipped_length - current.len(); @@ -458,8 +465,13 @@ impl MQTTState { current = rem; } Err(Err::Incomplete(_)) => { - SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); - return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); + SCLogDebug!( + "incomplete request: consumed {} needed {} (input len {})", + consumed, + (current.len() + 1), + input.len() + ); + return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); } Err(_) => { self.set_event_notx(MQTTEvent::MalformedTraffic, false); @@ -478,14 +490,22 @@ impl MQTTState { } let mut consumed = 0; - SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len()); + SCLogDebug!( + "skip_response {} input len {}", + self.skip_response, + current.len() + ); if self.skip_response > 0 { if input.len() <= self.skip_response { self.skip_response -= current.len(); return AppLayerResult::ok(); } else { current = &input[self.skip_response..]; - SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); + SCLogDebug!( + "skip end reached, skipping {} :{:?}", + self.skip_request, + current + ); consumed = self.skip_response; self.skip_response = 0; } @@ -498,7 +518,11 @@ impl MQTTState { Ok((rem, msg)) => { SCLogDebug!("response msg {:?}", msg); if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { - SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); + SCLogDebug!( + "found truncated with skipped {} current len {}", + trunc.skipped_length, + current.len() + ); if trunc.skipped_length >= current.len() { skipped = true; self.skip_response = trunc.skipped_length - current.len(); @@ -516,7 +540,12 @@ impl MQTTState { current = rem; } Err(Err::Incomplete(_)) => { - SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); + SCLogDebug!( + "incomplete response: consumed {} needed {} (input len {})", + consumed, + (current.len() + 1), + input.len() + ); return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); } Err(_) => { @@ -552,11 +581,7 @@ impl MQTTState { #[no_mangle] pub unsafe extern "C" fn rs_mqtt_probing_parser( - _flow: *const Flow, - _direction: u8, - input: *const u8, - input_len: u32, - _rdir: *mut u8, + _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8, ) -> AppProto { let buf = build_slice!(input, input_len as usize); match parse_fixed_header(buf) { @@ -570,14 +595,16 @@ pub unsafe extern "C" fn rs_mqtt_probing_parser( return ALPROTO_FAILED; } return ALPROTO_MQTT; - }, + } Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN, - Err(_) => ALPROTO_FAILED + Err(_) => ALPROTO_FAILED, } } #[no_mangle] -pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void { +pub extern "C" fn rs_mqtt_state_new( + _orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto, +) -> *mut std::os::raw::c_void { let state = MQTTState::new(); let boxed = Box::new(state); return Box::into_raw(boxed) as *mut _; @@ -596,11 +623,8 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, #[no_mangle] pub unsafe extern "C" fn rs_mqtt_parse_request( - _flow: *const Flow, - state: *mut std::os::raw::c_void, - _pstate: *mut std::os::raw::c_void, - stream_slice: StreamSlice, - _data: *const std::os::raw::c_void, + _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + stream_slice: StreamSlice, _data: *const std::os::raw::c_void, ) -> AppLayerResult { let state = cast_pointer!(state, MQTTState); return state.parse_request(stream_slice.as_slice()); @@ -608,11 +632,8 @@ pub unsafe extern "C" fn rs_mqtt_parse_request( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_parse_response( - _flow: *const Flow, - state: *mut std::os::raw::c_void, - _pstate: *mut std::os::raw::c_void, - stream_slice: StreamSlice, - _data: *const std::os::raw::c_void, + _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + stream_slice: StreamSlice, _data: *const std::os::raw::c_void, ) -> AppLayerResult { let state = cast_pointer!(state, MQTTState); return state.parse_response(stream_slice.as_slice()); @@ -620,8 +641,7 @@ pub unsafe extern "C" fn rs_mqtt_parse_response( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_state_get_tx( - state: *mut std::os::raw::c_void, - tx_id: u64, + state: *mut std::os::raw::c_void, tx_id: u64, ) -> *mut std::os::raw::c_void { let state = cast_pointer!(state, MQTTState); match state.get_tx(tx_id) { @@ -641,7 +661,9 @@ pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_ } #[no_mangle] -pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int { +pub unsafe extern "C" fn rs_mqtt_tx_is_toclient( + tx: *const std::os::raw::c_void, +) -> std::os::raw::c_int { let tx = cast_pointer!(tx, MQTTTransaction); if tx.toclient { return 1; @@ -651,8 +673,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( - tx: *mut std::os::raw::c_void, - direction: u8, + tx: *mut std::os::raw::c_void, direction: u8, ) -> std::os::raw::c_int { let tx = cast_pointer!(tx, MQTTTransaction); match direction.into() { @@ -672,8 +693,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_get_logged( - _state: *mut std::os::raw::c_void, - tx: *mut std::os::raw::c_void, + _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, ) -> u32 { let tx = cast_pointer!(tx, MQTTTransaction); return tx.logged.get(); @@ -681,9 +701,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_logged( #[no_mangle] pub unsafe extern "C" fn rs_mqtt_tx_set_logged( - _state: *mut std::os::raw::c_void, - tx: *mut std::os::raw::c_void, - logged: u32, + _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32, ) { let tx = cast_pointer!(tx, MQTTTransaction); tx.logged.set(logged); diff --git a/rust/src/mqtt/mqtt_message.rs b/rust/src/mqtt/mqtt_message.rs index 59583c9a09..aadcf83e1c 100644 --- a/rust/src/mqtt/mqtt_message.rs +++ b/rust/src/mqtt/mqtt_message.rs @@ -1,4 +1,3 @@ - /* Copyright (C) 2020 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of @@ -111,7 +110,6 @@ impl std::str::FromStr for MQTTTypeCode { } } - #[derive(Debug)] pub struct MQTTConnectData { pub protocol_string: String, @@ -204,4 +202,4 @@ pub struct MQTTDisconnectData { pub struct MQTTTruncatedData { pub original_message_type: MQTTTypeCode, pub skipped_length: usize, -} \ No newline at end of file +} diff --git a/rust/src/mqtt/parser.rs b/rust/src/mqtt/parser.rs index 9a547286c0..25c9839e46 100644 --- a/rust/src/mqtt/parser.rs +++ b/rust/src/mqtt/parser.rs @@ -21,8 +21,8 @@ use crate::common::nom7::bits; use crate::mqtt::mqtt_message::*; use crate::mqtt::mqtt_property::*; use nom7::bits::streaming::take as take_bits; -use nom7::bytes::streaming::take_while_m_n; use nom7::bytes::complete::take; +use nom7::bytes::streaming::take_while_m_n; use nom7::combinator::{complete, cond, verify}; use nom7::multi::{length_data, many0, many1}; use nom7::number::streaming::*;