]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
mqtt: rustfmt
authorSascha Steinbiss <satta@debian.org>
Tue, 8 Mar 2022 22:19:22 +0000 (23:19 +0100)
committerVictor Julien <vjulien@oisf.net>
Fri, 8 Apr 2022 20:58:27 +0000 (22:58 +0200)
rust/src/mqtt/detect.rs
rust/src/mqtt/logger.rs
rust/src/mqtt/mqtt.rs
rust/src/mqtt/mqtt_message.rs
rust/src/mqtt/parser.rs

index 67460709c08a09602aa9efa4e4030bceefa3ed64..3a313a07acf316593ba12335d6b43da6e7278800 100644 (file)
@@ -17,7 +17,7 @@
 
 // written by Sascha Steinbiss <sascha@steinbiss.name>
 
-use crate::mqtt::mqtt::{MQTTTransaction, MQTTState};
+use crate::mqtt::mqtt::{MQTTState, MQTTTransaction};
 use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode};
 use std::ffi::CStr;
 use std::ptr;
@@ -33,31 +33,24 @@ pub enum MQTTFlagState {
 }
 
 #[inline]
-fn check_flag_state(
-    flag_state: MQTTFlagState,
-    flag_value: bool,
-    ok: &mut bool,
-) {
+fn check_flag_state(flag_state: MQTTFlagState, flag_value: bool, ok: &mut bool) {
     match flag_state {
         MQTTFlagState::MQTT_MUST_BE_SET => {
             if !flag_value {
                 *ok = false;
             }
-        },
+        }
         MQTTFlagState::MQTT_CANT_BE_SET => {
             if flag_value {
                 *ok = false;
             }
-        },
+        }
         _ => {}
     }
 }
 
 #[no_mangle]
-pub extern "C" fn rs_mqtt_tx_has_type(
-    tx: &MQTTTransaction,
-    mtype: u8,
-) -> u8 {
+pub extern "C" fn rs_mqtt_tx_has_type(tx: &MQTTTransaction, mtype: u8) -> u8 {
     for msg in tx.msg.iter() {
         if mtype == msg.header.message_type as u8 {
             return 1;
@@ -81,9 +74,7 @@ pub unsafe extern "C" fn rs_mqtt_cstr_message_code(
 
 #[no_mangle]
 pub extern "C" fn rs_mqtt_tx_has_flags(
-    tx: &MQTTTransaction,
-    qretain: MQTTFlagState,
-    qdup: MQTTFlagState,
+    tx: &MQTTTransaction, qretain: MQTTFlagState, qdup: MQTTFlagState,
 ) -> u8 {
     for msg in tx.msg.iter() {
         let mut ok = true;
@@ -98,10 +89,7 @@ pub extern "C" fn rs_mqtt_tx_has_flags(
 }
 
 #[no_mangle]
-pub extern "C" fn rs_mqtt_tx_has_qos(
-    tx: &MQTTTransaction,
-    qos: u8,
-) -> u8 {
+pub extern "C" fn rs_mqtt_tx_has_qos(tx: &MQTTTransaction, qos: u8) -> u8 {
     for msg in tx.msg.iter() {
         if qos == msg.header.qos_level {
             return 1;
@@ -111,20 +99,14 @@ pub extern "C" fn rs_mqtt_tx_has_qos(
 }
 
 #[no_mangle]
-pub extern "C" fn rs_mqtt_tx_get_protocol_version(
-    state: &MQTTState,
-) -> u8 {
+pub extern "C" fn rs_mqtt_tx_get_protocol_version(state: &MQTTState) -> u8 {
     return state.protocol_version;
 }
 
 #[no_mangle]
 pub extern "C" fn rs_mqtt_tx_has_connect_flags(
-    tx: &MQTTTransaction,
-    username: MQTTFlagState,
-    password: MQTTFlagState,
-    will: MQTTFlagState,
-    will_retain: MQTTFlagState,
-    clean_session: MQTTFlagState,
+    tx: &MQTTTransaction, username: MQTTFlagState, password: MQTTFlagState, will: MQTTFlagState,
+    will_retain: MQTTFlagState, clean_session: MQTTFlagState,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -144,9 +126,7 @@ pub extern "C" fn rs_mqtt_tx_has_connect_flags(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -166,9 +146,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -190,9 +168,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -213,9 +189,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -237,9 +211,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNECT(ref cv) = msg.op {
@@ -261,8 +233,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent(
-    tx: &MQTTTransaction,
-    session_present: *mut bool,
+    tx: &MQTTTransaction, session_present: *mut bool,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::CONNACK(ref ca) = msg.op {
@@ -275,9 +246,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
@@ -298,9 +267,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message(
-    tx: &MQTTTransaction,
-    buffer: *mut *const u8,
-    buffer_len: *mut u32,
+    tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
 ) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
@@ -320,12 +287,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message(
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction,
-    i: u32,
-    buf: *mut *const u8,
-    len: *mut u32)
-    -> u8
-{
+pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(
+    tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
+) -> u8 {
     let mut offset = 0;
     for msg in tx.msg.iter() {
         if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op {
@@ -349,12 +313,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction,
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction,
-    i: u32,
-    buf: *mut *const u8,
-    len: *mut u32)
-    -> u8
-{
+pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(
+    tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
+) -> u8 {
     let mut offset = 0;
     for msg in tx.msg.iter() {
         if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op {
@@ -378,10 +339,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction,
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(
-    tx: &MQTTTransaction,
-    result: *mut u8,
-) -> u8 {
+pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(tx: &MQTTTransaction, result: *mut u8) -> u8 {
     for msg in tx.msg.iter() {
         match msg.op {
             MQTTOperation::PUBACK(ref v)
@@ -407,17 +365,14 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(
                     return 1;
                 }
             }
-            _ => return 0
+            _ => return 0,
         }
     }
     return 0;
 }
 
 #[no_mangle]
-pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(
-    tx: &MQTTTransaction,
-    code: u8,
-) -> u8 {
+pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(tx: &MQTTTransaction, code: u8) -> u8 {
     for msg in tx.msg.iter() {
         if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op {
             if let Some(ref reason_codes) = unsuback.reason_codes {
@@ -435,10 +390,10 @@ pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(
 #[cfg(test)]
 mod test {
     use super::*;
-    use std;
     use crate::mqtt::mqtt::MQTTTransaction;
     use crate::mqtt::mqtt_message::*;
     use crate::mqtt::parser::FixedHeader;
+    use std;
 
     #[test]
     fn test_multi_unsubscribe() {
@@ -472,23 +427,23 @@ mod test {
         });
         let mut s: *const u8 = std::ptr::null_mut();
         let mut slen: u32 = 0;
-        let mut r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen)};
+        let mut r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "foo");
-        r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "baar");
-        r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "fieee");
-        r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "baaaaz");
-        r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen) };
         assert_eq!(r, 0);
     }
 
@@ -512,7 +467,8 @@ mod test {
                     MQTTSubscribeTopicData {
                         topic_name: "baar".to_string(),
                         qos: 1,
-                    }],
+                    },
+                ],
                 properties: None,
             }),
         });
@@ -534,29 +490,30 @@ mod test {
                     MQTTSubscribeTopicData {
                         topic_name: "baaaaz".to_string(),
                         qos: 1,
-                    }],
+                    },
+                ],
                 properties: None,
             }),
         });
         let mut s: *const u8 = std::ptr::null_mut();
         let mut slen: u32 = 0;
-        let mut r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen)};
+        let mut r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "foo");
-        r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "baar");
-        r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "fieee");
-        r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen) };
         assert_eq!(r, 1);
-        topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)});
+        topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
         assert_eq!(topic, "baaaaz");
-        r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen)};
+        r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen) };
         assert_eq!(r, 0);
     }
 }
index bda119aae67ce84313534651e57023e0da00dd1b..d7b343931ed44ebf21e9f18042c0d0e3c0968694 100644 (file)
 
 // written by Sascha Steinbiss <sascha@steinbiss.name>
 
-use std;
-use super::mqtt::{MQTTTransaction, MQTTState};
+use super::mqtt::{MQTTState, MQTTTransaction};
 use crate::jsonbuilder::{JsonBuilder, JsonError};
 use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData};
-use crate::mqtt::parser::{FixedHeader};
+use crate::mqtt::parser::FixedHeader;
+use std;
 
 pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0);
 
 #[inline]
-fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError>
-{
+fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> {
     js.start_object()?;
     js.set_string("topic", &t.topic_name)?;
     js.set_uint("qos", t.qos as u64)?;
@@ -36,8 +35,7 @@ fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<()
 }
 
 #[inline]
-fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError>
-{
+fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> {
     js.set_uint("qos", hdr.qos_level as u64)?;
     js.set_bool("retain", hdr.retain)?;
     js.set_bool("dup", hdr.dup_flag)?;
@@ -247,12 +245,12 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
                 js.open_object("pingreq")?;
                 log_mqtt_header(js, &msg.header)?;
                 js.close()?; // pingreq
-            },
+            }
             MQTTOperation::PINGRESP => {
                 js.open_object("pingresp")?;
                 log_mqtt_header(js, &msg.header)?;
                 js.close()?; // pingresp
-            },
+            }
             MQTTOperation::AUTH(ref auth) => {
                 js.open_object("auth")?;
                 log_mqtt_header(js, &msg.header)?;
@@ -265,7 +263,7 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
                     js.close()?; // properties
                 }
                 js.close()?; // auth
-            },
+            }
             MQTTOperation::DISCONNECT(ref disco) => {
                 js.open_object("disconnect")?;
                 log_mqtt_header(js, &msg.header)?;
@@ -280,15 +278,15 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
                     js.close()?; // properties
                 }
                 js.close()?; // disconnect
-            },
+            }
             MQTTOperation::TRUNCATED(ref trunc) => {
                 js.open_object(&trunc.original_message_type.to_lower_str())?;
                 log_mqtt_header(js, &msg.header)?;
                 js.set_bool("truncated", true)?;
                 js.set_uint("skipped_length", trunc.skipped_length as u64)?;
                 js.close()?; // truncated
-            },
-            MQTTOperation::UNASSIGNED => {},
+            }
+            MQTTOperation::UNASSIGNED => {}
         }
     }
     js.close()?; // mqtt
@@ -297,7 +295,9 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn rs_mqtt_logger_log(_state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder) -> bool {
+pub unsafe extern "C" fn rs_mqtt_logger_log(
+    _state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder,
+) -> bool {
     let tx = cast_pointer!(tx, MQTTTransaction);
     log_mqtt(tx, flags, js).is_ok()
 }
index c4600979a3379682240c874fa00a707e94b207d6..0881efa8737adf190da1d67484a3dff3c142b89d 100644 (file)
@@ -19,8 +19,8 @@
 
 use super::mqtt_message::*;
 use super::parser::*;
-use crate::applayer::{self, LoggerFlags};
 use crate::applayer::*;
+use crate::applayer::{self, LoggerFlags};
 use crate::conf::conf_get;
 use crate::core::*;
 use nom7::Err;
@@ -204,7 +204,7 @@ impl MQTTState {
                     tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
                     self.transactions.push(tx);
                 }
-            },
+            }
             MQTTOperation::PUBLISH(ref publish) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
@@ -219,7 +219,7 @@ impl MQTTState {
                         let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
                         self.transactions.push(tx);
-                    },
+                    }
                     1..=2 => {
                         if let Some(pkt_id) = publish.message_id {
                             let mut tx = self.new_tx(msg, toclient);
@@ -230,14 +230,14 @@ impl MQTTState {
                             MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
                             self.transactions.push(tx);
                         }
-                    },
+                    }
                     _ => {
                         let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
                         self.transactions.push(tx);
                     }
                 }
-            },
+            }
             MQTTOperation::SUBSCRIBE(ref subscribe) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
@@ -253,19 +253,19 @@ impl MQTTState {
                         let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
                         self.transactions.push(tx);
-                    },
+                    }
                     1..=2 => {
                         let mut tx = self.new_tx(msg, toclient);
                         tx.pkt_id = Some(pkt_id);
                         self.transactions.push(tx);
-                    },
+                    }
                     _ => {
                         let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
                         self.transactions.push(tx);
                     }
                 }
-            },
+            }
             MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
@@ -281,19 +281,19 @@ impl MQTTState {
                         let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
                         self.transactions.push(tx);
-                    },
+                    }
                     1..=2 => {
                         let mut tx = self.new_tx(msg, toclient);
                         tx.pkt_id = Some(pkt_id);
                         self.transactions.push(tx);
-                    },
+                    }
                     _ => {
                         let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
                         self.transactions.push(tx);
                     }
                 }
-            },
+            }
             MQTTOperation::CONNACK(ref _connack) => {
                 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
                     (*tx).msg.push(msg);
@@ -305,9 +305,8 @@ impl MQTTState {
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
                     self.transactions.push(tx);
                 }
-            },
-            MQTTOperation::PUBREC(ref v)
-            | MQTTOperation::PUBREL(ref v) => {
+            }
+            MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@@ -321,9 +320,8 @@ impl MQTTState {
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
                     self.transactions.push(tx);
                 }
-            },
-            MQTTOperation::PUBACK(ref v)
-            | MQTTOperation::PUBCOMP(ref v) => {
+            }
+            MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@@ -339,7 +337,7 @@ impl MQTTState {
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
                     self.transactions.push(tx);
                 }
-            },
+            }
             MQTTOperation::SUBACK(ref suback) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
@@ -356,7 +354,7 @@ impl MQTTState {
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
                     self.transactions.push(tx);
                 }
-            },
+            }
             MQTTOperation::UNSUBACK(ref unsuback) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
@@ -373,20 +371,19 @@ impl MQTTState {
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
                     self.transactions.push(tx);
                 }
-            },
+            }
             MQTTOperation::UNASSIGNED => {
                 let mut tx = self.new_tx(msg, toclient);
                 tx.complete = true;
                 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
                 self.transactions.push(tx);
-            },
+            }
             MQTTOperation::TRUNCATED(_) => {
                 let mut tx = self.new_tx(msg, toclient);
                 tx.complete = true;
                 self.transactions.push(tx);
-            },
-            MQTTOperation::AUTH(_)
-            | MQTTOperation::DISCONNECT(_) => {
+            }
+            MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@@ -396,9 +393,8 @@ impl MQTTState {
                 let mut tx = self.new_tx(msg, toclient);
                 tx.complete = true;
                 self.transactions.push(tx);
-            },
-            MQTTOperation::PINGREQ
-            | MQTTOperation::PINGRESP => {
+            }
+            MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
                 if !self.connected {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@@ -419,7 +415,11 @@ impl MQTTState {
         }
 
         let mut consumed = 0;
-        SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
+        SCLogDebug!(
+            "skip_request {} input len {}",
+            self.skip_request,
+            input.len()
+        );
         if self.skip_request > 0 {
             if input.len() <= self.skip_request {
                 SCLogDebug!("reducing skip_request by {}", input.len());
@@ -427,13 +427,16 @@ impl MQTTState {
                 return AppLayerResult::ok();
             } else {
                 current = &input[self.skip_request..];
-                SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
+                SCLogDebug!(
+                    "skip end reached, skipping {} :{:?}",
+                    self.skip_request,
+                    current
+                );
                 consumed = self.skip_request;
                 self.skip_request = 0;
             }
         }
 
-
         while current.len() > 0 {
             let mut skipped = false;
             SCLogDebug!("request: handling {}", current.len());
@@ -441,7 +444,11 @@ impl MQTTState {
                 Ok((rem, msg)) => {
                     SCLogDebug!("request msg {:?}", msg);
                     if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
-                        SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
+                        SCLogDebug!(
+                            "found truncated with skipped {} current len {}",
+                            trunc.skipped_length,
+                            current.len()
+                        );
                         if trunc.skipped_length >= current.len() {
                             skipped = true;
                             self.skip_request = trunc.skipped_length - current.len();
@@ -458,8 +465,13 @@ impl MQTTState {
                     current = rem;
                 }
                 Err(Err::Incomplete(_)) => {
-                        SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
-                        return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
+                    SCLogDebug!(
+                        "incomplete request: consumed {} needed {} (input len {})",
+                        consumed,
+                        (current.len() + 1),
+                        input.len()
+                    );
+                    return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
                 }
                 Err(_) => {
                     self.set_event_notx(MQTTEvent::MalformedTraffic, false);
@@ -478,14 +490,22 @@ impl MQTTState {
         }
 
         let mut consumed = 0;
-        SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
+        SCLogDebug!(
+            "skip_response {} input len {}",
+            self.skip_response,
+            current.len()
+        );
         if self.skip_response > 0 {
             if input.len() <= self.skip_response {
                 self.skip_response -= current.len();
                 return AppLayerResult::ok();
             } else {
                 current = &input[self.skip_response..];
-                SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
+                SCLogDebug!(
+                    "skip end reached, skipping {} :{:?}",
+                    self.skip_request,
+                    current
+                );
                 consumed = self.skip_response;
                 self.skip_response = 0;
             }
@@ -498,7 +518,11 @@ impl MQTTState {
                 Ok((rem, msg)) => {
                     SCLogDebug!("response msg {:?}", msg);
                     if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
-                        SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
+                        SCLogDebug!(
+                            "found truncated with skipped {} current len {}",
+                            trunc.skipped_length,
+                            current.len()
+                        );
                         if trunc.skipped_length >= current.len() {
                             skipped = true;
                             self.skip_response = trunc.skipped_length - current.len();
@@ -516,7 +540,12 @@ impl MQTTState {
                     current = rem;
                 }
                 Err(Err::Incomplete(_)) => {
-                    SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
+                    SCLogDebug!(
+                        "incomplete response: consumed {} needed {} (input len {})",
+                        consumed,
+                        (current.len() + 1),
+                        input.len()
+                    );
                     return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
                 }
                 Err(_) => {
@@ -552,11 +581,7 @@ impl MQTTState {
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_probing_parser(
-    _flow: *const Flow,
-    _direction: u8,
-    input: *const u8,
-    input_len: u32,
-    _rdir: *mut u8,
+    _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
 ) -> AppProto {
     let buf = build_slice!(input, input_len as usize);
     match parse_fixed_header(buf) {
@@ -570,14 +595,16 @@ pub unsafe extern "C" fn rs_mqtt_probing_parser(
                 return ALPROTO_FAILED;
             }
             return ALPROTO_MQTT;
-        },
+        }
         Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
-        Err(_) => ALPROTO_FAILED
+        Err(_) => ALPROTO_FAILED,
     }
 }
 
 #[no_mangle]
-pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
+pub extern "C" fn rs_mqtt_state_new(
+    _orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto,
+) -> *mut std::os::raw::c_void {
     let state = MQTTState::new();
     let boxed = Box::new(state);
     return Box::into_raw(boxed) as *mut _;
@@ -596,11 +623,8 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void,
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_parse_request(
-    _flow: *const Flow,
-    state: *mut std::os::raw::c_void,
-    _pstate: *mut std::os::raw::c_void,
-    stream_slice: StreamSlice,
-    _data: *const std::os::raw::c_void,
+    _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+    stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
 ) -> AppLayerResult {
     let state = cast_pointer!(state, MQTTState);
     return state.parse_request(stream_slice.as_slice());
@@ -608,11 +632,8 @@ pub unsafe extern "C" fn rs_mqtt_parse_request(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_parse_response(
-    _flow: *const Flow,
-    state: *mut std::os::raw::c_void,
-    _pstate: *mut std::os::raw::c_void,
-    stream_slice: StreamSlice,
-    _data: *const std::os::raw::c_void,
+    _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+    stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
 ) -> AppLayerResult {
     let state = cast_pointer!(state, MQTTState);
     return state.parse_response(stream_slice.as_slice());
@@ -620,8 +641,7 @@ pub unsafe extern "C" fn rs_mqtt_parse_response(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_state_get_tx(
-    state: *mut std::os::raw::c_void,
-    tx_id: u64,
+    state: *mut std::os::raw::c_void, tx_id: u64,
 ) -> *mut std::os::raw::c_void {
     let state = cast_pointer!(state, MQTTState);
     match state.get_tx(tx_id) {
@@ -641,7 +661,9 @@ pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
+pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(
+    tx: *const std::os::raw::c_void,
+) -> std::os::raw::c_int {
     let tx = cast_pointer!(tx, MQTTTransaction);
     if tx.toclient {
         return 1;
@@ -651,8 +673,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void)
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
-    tx: *mut std::os::raw::c_void,
-    direction: u8,
+    tx: *mut std::os::raw::c_void, direction: u8,
 ) -> std::os::raw::c_int {
     let tx = cast_pointer!(tx, MQTTTransaction);
     match direction.into() {
@@ -672,8 +693,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
-    _state: *mut std::os::raw::c_void,
-    tx: *mut std::os::raw::c_void,
+    _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void,
 ) -> u32 {
     let tx = cast_pointer!(tx, MQTTTransaction);
     return tx.logged.get();
@@ -681,9 +701,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
 
 #[no_mangle]
 pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
-    _state: *mut std::os::raw::c_void,
-    tx: *mut std::os::raw::c_void,
-    logged: u32,
+    _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32,
 ) {
     let tx = cast_pointer!(tx, MQTTTransaction);
     tx.logged.set(logged);
index 59583c9a0964e8c6113bf4bba4eb1312c8258c34..aadcf83e1cdb847891655375822f49b31c2fc2a5 100644 (file)
@@ -1,4 +1,3 @@
-
 /* Copyright (C) 2020 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
@@ -111,7 +110,6 @@ impl std::str::FromStr for MQTTTypeCode {
     }
 }
 
-
 #[derive(Debug)]
 pub struct MQTTConnectData {
     pub protocol_string: String,
@@ -204,4 +202,4 @@ pub struct MQTTDisconnectData {
 pub struct MQTTTruncatedData {
     pub original_message_type: MQTTTypeCode,
     pub skipped_length: usize,
-}
\ No newline at end of file
+}
index 9a547286c033b7550a14784ac1057f38db0d6622..25c9839e46f6f30de87ba7e404fd7479fe5ed661 100644 (file)
@@ -21,8 +21,8 @@ use crate::common::nom7::bits;
 use crate::mqtt::mqtt_message::*;
 use crate::mqtt::mqtt_property::*;
 use nom7::bits::streaming::take as take_bits;
-use nom7::bytes::streaming::take_while_m_n;
 use nom7::bytes::complete::take;
+use nom7::bytes::streaming::take_while_m_n;
 use nom7::combinator::{complete, cond, verify};
 use nom7::multi::{length_data, many0, many1};
 use nom7::number::streaming::*;