]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
rust/mqtt: convert parser to nom7 functions
authorPierre Chifflier <chifflier@wzdftpd.net>
Sat, 30 Oct 2021 14:30:04 +0000 (16:30 +0200)
committerVictor Julien <victor@inliniac.net>
Sat, 6 Nov 2021 15:23:57 +0000 (16:23 +0100)
rust/src/mqtt/mqtt.rs
rust/src/mqtt/mqtt_property.rs
rust/src/mqtt/parser.rs

index 9e5d198f4266ca518c41e303138428f996c7ae6d..91564838cfaae20f438dcc734140bd26e931f945 100644 (file)
@@ -22,7 +22,7 @@ use super::parser::*;
 use crate::applayer::{self, LoggerFlags};
 use crate::applayer::*;
 use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
-use nom;
+use nom7::Err;
 use std;
 use std::ffi::CString;
 
@@ -446,7 +446,7 @@ impl MQTTState {
                     consumed += current.len() - rem.len();
                     current = rem;
                 }
-                Err(nom::Err::Incomplete(_)) => {
+                Err(Err::Incomplete(_)) => {
                         SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
                         return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
                 }
@@ -503,7 +503,7 @@ impl MQTTState {
                     consumed += current.len() - rem.len();
                     current = rem;
                 }
-                Err(nom::Err::Incomplete(_)) => {
+                Err(Err::Incomplete(_)) => {
                     SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
                     return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
                 }
@@ -569,7 +569,7 @@ pub unsafe extern "C" fn rs_mqtt_probing_parser(
             }
             return ALPROTO_MQTT;
         },
-        Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN,
+        Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
         Err(_) => ALPROTO_FAILED
     }
 }
index c407349f6048f437ead96a9e1f5cada349c88c63..789d81ee1458b3904ffd9645abf4203e53b31dac 100644 (file)
@@ -1,4 +1,3 @@
-
 /* Copyright (C) 2020 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
 
 // written by Sascha Steinbiss <sascha@steinbiss.name>
 
-use crate::mqtt::parser::*;
 use crate::jsonbuilder::{JsonBuilder, JsonError};
-use nom::number::streaming::*;
-use nom::*;
+use crate::mqtt::parser::*;
+use nom7::number::streaming::*;
+use nom7::IResult;
 
 // TODO: It might be useful to also add detection on property presence and
 // content, e.g. mqtt.property: AUTHENTICATION_METHOD.
index 31adb8755a321e1ecbe8c8b3e49bb5b88249b77d..fd3937ec5d4193a9163e0a5c7ae137f968bcbb0b 100644 (file)
 
 use crate::mqtt::mqtt_message::*;
 use crate::mqtt::mqtt_property::*;
-use nom::combinator::rest;
-use nom::number::streaming::*;
-use nom::*;
+use nom7::bits::{bits, streaming::take as take_bits};
+use nom7::bytes::streaming::take_while_m_n;
+use nom7::combinator::{complete, cond, verify};
+use nom7::error::Error;
+use nom7::multi::{length_data, many0, many1};
+use nom7::number::streaming::*;
+use nom7::sequence::tuple;
+use nom7::{Err, IResult, Needed};
 use num_traits::FromPrimitive;
 
 #[derive(Debug)]
@@ -54,53 +59,43 @@ fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
 
 // DATA TYPES
 
-named!(#[inline], pub parse_mqtt_string<String>,
-       do_parse!(
-           length: be_u16
-           >> content: take!(length)
-           >>  (
-                 String::from_utf8_lossy(content).to_string()
-               )
-       ));
-
-named!(#[inline], pub parse_mqtt_variable_integer<u32>,
-       do_parse!(
-           // take at most 4 bytes in total, so as not to overflow u32
-           continued_part: take_while_m_n!(0, 3, is_continuation_bit_set)
-           >> non_continued_part: verify!(be_u8, |&val| !is_continuation_bit_set(val))
-           >>  (
-                 convert_varint(continued_part.to_vec(), non_continued_part)
-               )
-       ));
-
-named!(#[inline], pub parse_mqtt_binary_data<Vec<u8>>,
-       do_parse!(
-           length: be_u16
-           >> data: take!(length)
-           >>  (
-                 data.to_vec()
-               )
-       ));
-
-named!(#[inline], pub parse_mqtt_string_pair<(String, String)>,
-       do_parse!(
-           name: parse_mqtt_string
-           >> value: parse_mqtt_string
-           >>  (
-                 (name, value)
-               )
-       ));
+#[inline]
+pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> {
+    let (i, content) = length_data(be_u16)(i)?;
+    Ok((i, String::from_utf8_lossy(content).to_string()))
+}
+
+#[inline]
+pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> {
+    let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?;
+    let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?;
+    Ok((
+        i,
+        convert_varint(continued_part.to_vec(), non_continued_part),
+    ))
+}
+
+#[inline]
+pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> {
+    let (i, data) = length_data(be_u16)(i)?;
+    Ok((i, data.to_vec()))
+}
+
+#[inline]
+pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> {
+    let (i, name) = parse_mqtt_string(i)?;
+    let (i, value) = parse_mqtt_string(i)?;
+    Ok((i, (name, value)))
+}
 
 // MESSAGE COMPONENTS
 
-named!(#[inline], pub parse_property<MQTTProperty>,
-       do_parse!(
-           identifier: parse_mqtt_variable_integer
-           >> value: call!(parse_qualified_property, identifier)
-           >>  (
-                 value
-               )
-       ));
+#[inline]
+pub fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> {
+    let (i, identifier) = parse_mqtt_variable_integer(i)?;
+    let (i, value) = parse_qualified_property(i, identifier)?;
+    Ok((i, value))
+}
 
 #[inline]
 fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> {
@@ -137,15 +132,12 @@ fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQ
 
 #[inline]
 fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> {
-    bits!(
-        i,
-        tuple!(
-            take_bits!(4u8),
-            take_bits!(1u8),
-            take_bits!(2u8),
-            take_bits!(1u8)
-        )
-    )
+    bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple((
+        take_bits(4u8),
+        take_bits(1u8),
+        take_bits(2u8),
+        take_bits(1u8),
+    )))(i)
 }
 
 #[inline]
@@ -153,11 +145,11 @@ fn parse_message_type(code: u8) -> MQTTTypeCode {
     match code {
         0..=15 => {
             if let Some(t) = FromPrimitive::from_u8(code) {
-                return t
+                return t;
             } else {
-                return MQTTTypeCode::UNASSIGNED
+                return MQTTTypeCode::UNASSIGNED;
             }
-        },
+        }
         _ => {
             // unreachable state in parser: we only pass values parsed from take_bits!(4u8)
             debug_validate_fail!("can't have message codes >15 from 4 bits");
@@ -166,107 +158,104 @@ fn parse_message_type(code: u8) -> MQTTTypeCode {
     }
 }
 
-named!(#[inline], pub parse_fixed_header<FixedHeader>,
-       do_parse!(
-           flags: parse_fixed_header_flags
-           >> remaining_length: parse_mqtt_variable_integer
-           >>  (
-                 FixedHeader {
-                   message_type: parse_message_type(flags.0),
-                   dup_flag: flags.1 != 0,
-                   qos_level: flags.2 as u8,
-                   retain: flags.3 != 0,
-                   remaining_length: remaining_length,
-                 }
-               )
-       ));
+#[inline]
+pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> {
+    let (i, flags) = parse_fixed_header_flags(i)?;
+    let (i, remaining_length) = parse_mqtt_variable_integer(i)?;
+    Ok((
+        i,
+        FixedHeader {
+            message_type: parse_message_type(flags.0),
+            dup_flag: flags.1 != 0,
+            qos_level: flags.2 as u8,
+            retain: flags.3 != 0,
+            remaining_length,
+        },
+    ))
+}
 
 #[inline]
 fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> {
-    bits!(
+    bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple((
+        take_bits(1u8),
+        take_bits(1u8),
+        take_bits(1u8),
+        take_bits(2u8),
+        take_bits(1u8),
+        take_bits(1u8),
+        take_bits(1u8),
+    )))(i)
+}
+
+#[inline]
+pub fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> {
+    let (i, protocol_string) = parse_mqtt_string(i)?;
+    let (i, protocol_version) = be_u8(i)?;
+    let (i, flags) = parse_connect_variable_flags(i)?;
+    let (i, keepalive) = be_u16(i)?;
+    let (i, properties) = parse_properties(i, protocol_version == 5)?;
+    let (i, client_id) = parse_mqtt_string(i)?;
+    let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?;
+    let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?;
+    let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?;
+    let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?;
+    let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?;
+    Ok((
+        i,
+        MQTTConnectData {
+            protocol_string,
+            protocol_version,
+            username_flag: flags.0 != 0,
+            password_flag: flags.1 != 0,
+            will_retain: flags.2 != 0,
+            will_qos: flags.3 as u8,
+            will_flag: flags.4 != 0,
+            clean_session: flags.5 != 0,
+            keepalive,
+            client_id,
+            will_topic,
+            will_message,
+            username,
+            password,
+            properties,
+            will_properties,
+        },
+    ))
+}
+
+pub fn parse_connack(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTConnackData> {
+    let (i, topic_name_compression_response) = be_u8(i)?;
+    let (i, return_code) = be_u8(i)?;
+    let (i, properties) = parse_properties(i, protocol_version == 5)?;
+    Ok((
         i,
-        tuple!(
-            take_bits!(1u8),
-            take_bits!(1u8),
-            take_bits!(1u8),
-            take_bits!(2u8),
-            take_bits!(1u8),
-            take_bits!(1u8),
-            take_bits!(1u8)
-        )
-    )
+        MQTTConnackData {
+            session_present: (topic_name_compression_response & 1) != 0,
+            return_code,
+            properties,
+        },
+    ))
 }
 
-named!(#[inline], pub parse_connect<MQTTConnectData>,
-       do_parse!(
-           protocol_string: parse_mqtt_string
-           >> protocol_version: be_u8
-           >> flags: parse_connect_variable_flags
-           >> keepalive: be_u16
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> client_id: parse_mqtt_string
-           >> will_properties: call!(parse_properties, protocol_version == 5 && flags.4 != 0)
-           >> will_topic: cond!(flags.4 != 0, parse_mqtt_string)
-           >> will_message: cond!(flags.4 != 0, parse_mqtt_binary_data)
-           >> username: cond!(flags.0 != 0, parse_mqtt_string)
-           >> password: cond!(flags.1 != 0, parse_mqtt_binary_data)
-           >>  (
-                 MQTTConnectData {
-                   protocol_string: protocol_string,
-                   protocol_version: protocol_version,
-                   username_flag: flags.0 != 0,
-                   password_flag: flags.1 != 0,
-                   will_retain: flags.2 != 0,
-                   will_qos: flags.3 as u8,
-                   will_flag: flags.4  != 0,
-                   clean_session: flags.5 != 0,
-                   keepalive: keepalive,
-                   client_id: client_id,
-                   will_topic: will_topic,
-                   will_message: will_message,
-                   username: username,
-                   password: password,
-                   properties: properties,
-                   will_properties: will_properties,
-                 }
-               )
-       ));
-
-named_args!(pub parse_connack(protocol_version: u8)<MQTTConnackData>,
-       do_parse!(
-           topic_name_compression_response: be_u8
-           >> retcode: be_u8
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >>  (
-                 MQTTConnackData {
-                   session_present: (topic_name_compression_response & 1) != 0,
-                   return_code: retcode,
-                   properties: properties,
-                 }
-               )
-       ));
-
-named_args!(pub parse_publish(protocol_version: u8, has_id: bool)<MQTTPublishData>,
-       do_parse!(
-           topic: parse_mqtt_string
-           >> message_id: cond!(has_id, be_u16)
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> message: rest
-           >>  (
-                 MQTTPublishData {
-                   topic: topic,
-                   message_id: message_id,
-                   message: message.to_vec(),
-                   properties: properties,
-                 }
-               )
-       ));
+pub fn parse_publish(
+    i: &[u8], protocol_version: u8, has_id: bool,
+) -> IResult<&[u8], MQTTPublishData> {
+    let (i, topic) = parse_mqtt_string(i)?;
+    let (i, message_id) = cond(has_id, be_u16)(i)?;
+    let (message, properties) = parse_properties(i, protocol_version == 5)?;
+    Ok((
+        i,
+        MQTTPublishData {
+            topic,
+            message_id,
+            message: message.to_vec(),
+            properties,
+        },
+    ))
+}
 
 #[inline]
-fn parse_msgidonly(
-    input: &[u8],
-    protocol_version: u8,
-) -> IResult<&[u8], MQTTMessageIdOnly> {
+fn parse_msgidonly(input: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTMessageIdOnly> {
     if protocol_version < 5 {
         // before v5 we don't even have to care about reason codes
         // and properties, lucky us
@@ -283,7 +272,7 @@ fn parse_msgidonly(
                 return Ok((
                     rem,
                     MQTTMessageIdOnly {
-                        message_id: message_id,
+                        message_id,
                         reason_code: Some(0),
                         properties: None,
                     },
@@ -298,7 +287,7 @@ fn parse_msgidonly(
                         return Ok((
                             rem,
                             MQTTMessageIdOnly {
-                                message_id: message_id,
+                                message_id,
                                 reason_code: Some(reason_code),
                                 properties: None,
                             },
@@ -309,9 +298,9 @@ fn parse_msgidonly(
                             return Ok((
                                 rem,
                                 MQTTMessageIdOnly {
-                                    message_id: message_id,
+                                    message_id,
                                     reason_code: Some(reason_code),
-                                    properties: properties,
+                                    properties,
                                 },
                             ));
                         }
@@ -325,91 +314,84 @@ fn parse_msgidonly(
     }
 }
 
-named!(#[inline], pub parse_msgidonly_v3<MQTTMessageIdOnly>,
-       do_parse!(
-           message_id: be_u16
-           >>  (
-                 MQTTMessageIdOnly {
-                   message_id: message_id,
-                   reason_code: None,
-                   properties: None,
-                 }
-               )
-       ));
-
-named!(#[inline], pub parse_subscribe_topic<MQTTSubscribeTopicData>,
-       do_parse!(
-           topic: parse_mqtt_string
-           >> qos: be_u8
-           >>  (
-                 MQTTSubscribeTopicData {
-                   topic_name: topic,
-                   qos: qos,
-                 }
-               )
-       ));
-
-named_args!(pub parse_subscribe(protocol_version: u8)<MQTTSubscribeData>,
-       do_parse!(
-           message_id: be_u16
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> topics: many1!(complete!(parse_subscribe_topic))
-           >>  (
-                 MQTTSubscribeData {
-                   message_id: message_id,
-                   topics: topics,
-                   properties: properties,
-                 }
-               )
-       ));
-
-named_args!(pub parse_suback(protocol_version: u8)<MQTTSubackData>,
-       do_parse!(
-           message_id: be_u16
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> qoss: rest
-           >>  (
-                 MQTTSubackData {
-                   message_id: message_id,
-                   qoss: qoss.to_vec(),
-                   properties: properties,
-                 }
-               )
-       ));
-
-named_args!(pub parse_unsubscribe(protocol_version: u8)<MQTTUnsubscribeData>,
-       do_parse!(
-           message_id: be_u16
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> topics: many0!(complete!(parse_mqtt_string))
-           >>  (
-                 MQTTUnsubscribeData {
-                   message_id: message_id,
-                   topics: topics,
-                   properties: properties,
-                 }
-               )
-       ));
-
-named_args!(pub parse_unsuback(protocol_version: u8)<MQTTUnsubackData>,
-       do_parse!(
-           message_id: be_u16
-           >> properties: call!(parse_properties, protocol_version == 5)
-           >> reason_codes: many0!(complete!(be_u8))
-           >>  (
-                 MQTTUnsubackData {
-                   message_id: message_id,
-                   properties: properties,
-                   reason_codes: Some(reason_codes),
-                 }
-               )
-       ));
+#[inline]
+pub fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
+    let (i, message_id) = be_u16(i)?;
+    Ok((
+        i,
+        MQTTMessageIdOnly {
+            message_id,
+            reason_code: None,
+            properties: None,
+        },
+    ))
+}
+
+#[inline]
+pub fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> {
+    let (i, topic_name) = parse_mqtt_string(i)?;
+    let (i, qos) = be_u8(i)?;
+    Ok((i, MQTTSubscribeTopicData { topic_name, qos }))
+}
+
+pub fn parse_subscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubscribeData> {
+    let (i, message_id) = be_u16(i)?;
+    let (i, properties) = parse_properties(i, protocol_version == 5)?;
+    let (i, topics) = many1(complete(parse_subscribe_topic))(i)?;
+    Ok((
+        i,
+        MQTTSubscribeData {
+            message_id,
+            topics,
+            properties,
+        },
+    ))
+}
+
+pub fn parse_suback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubackData> {
+    let (i, message_id) = be_u16(i)?;
+    let (qoss, properties) = parse_properties(i, protocol_version == 5)?;
+    Ok((
+        i,
+        MQTTSubackData {
+            message_id,
+            qoss: qoss.to_vec(),
+            properties,
+        },
+    ))
+}
+
+pub fn parse_unsubscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubscribeData> {
+    let (i, message_id) = be_u16(i)?;
+    let (i, properties) = parse_properties(i, protocol_version == 5)?;
+    let (i, topics) = many0(complete(parse_mqtt_string))(i)?;
+    Ok((
+        i,
+        MQTTUnsubscribeData {
+            message_id,
+            topics,
+            properties,
+        },
+    ))
+}
+
+pub fn parse_unsuback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubackData> {
+    let (i, message_id) = be_u16(i)?;
+    let (i, properties) = parse_properties(i, protocol_version == 5)?;
+    let (i, reason_codes) = many0(complete(be_u8))(i)?;
+    Ok((
+        i,
+        MQTTUnsubackData {
+            message_id,
+            properties,
+            reason_codes: Some(reason_codes),
+        },
+    ))
+}
 
 #[inline]
 fn parse_disconnect(
-    input: &[u8],
-    remaining_len: usize,
-    protocol_version: u8,
+    input: &[u8], remaining_len: usize, protocol_version: u8,
 ) -> IResult<&[u8], MQTTDisconnectData> {
     if protocol_version < 5 {
         return Ok((
@@ -452,7 +434,7 @@ fn parse_disconnect(
                         rem,
                         MQTTDisconnectData {
                             reason_code: Some(reason_code),
-                            properties: properties,
+                            properties,
                         },
                     ));
                 }
@@ -463,19 +445,22 @@ fn parse_disconnect(
     }
 }
 
-named!(#[inline], pub parse_auth<MQTTAuthData>,
-       do_parse!(
-           reason_code: be_u8
-           >> properties: call!(parse_properties, true)
-           >>  (
-                 MQTTAuthData {
-                   reason_code: reason_code,
-                   properties: properties,
-                 }
-               )
-       ));
-
-pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> IResult<&[u8], MQTTMessage> {
+#[inline]
+pub fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> {
+    let (i, reason_code) = be_u8(i)?;
+    let (i, properties) = parse_properties(i, true)?;
+    Ok((
+        i,
+        MQTTAuthData {
+            reason_code,
+            properties,
+        },
+    ))
+}
+
+pub fn parse_message(
+    input: &[u8], protocol_version: u8, max_msg_size: usize,
+) -> IResult<&[u8], MQTTMessage> {
     // Parse the fixed header first. This is identical across versions and can
     // be between 2 and 5 bytes long.
     match parse_fixed_header(input) {
@@ -494,7 +479,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
             // type.
             if len > max_msg_size {
                 let msg = MQTTMessage {
-                    header: header,
+                    header,
                     op: MQTTOperation::TRUNCATED(MQTTTruncatedData {
                         original_message_type: message_type,
                         skipped_length: len + skiplen,
@@ -510,7 +495,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
             // have enough data in the input buffer to handle the full
             // message. Signal this by returning an Incomplete IResult value.
             if fullrem.len() < len {
-                return Err(Err::Incomplete(Needed::Size(len - fullrem.len())));
+                return Err(Err::Incomplete(Needed::new(len - fullrem.len())));
             }
 
             // Parse the contents of the buffer into a single message.
@@ -523,119 +508,122 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
                 MQTTTypeCode::CONNECT => match parse_connect(rem) {
                     Ok((_rem, conn)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::CONNECT(conn),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::CONNACK => match parse_connack(rem, protocol_version) {
                     Ok((_rem, connack)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::CONNACK(connack),
                         };
-                        Ok((&input[skiplen+len..], msg))
-                    }
-                    Err(e) => Err(e),
-                },
-                MQTTTypeCode::PUBLISH => match parse_publish(rem, protocol_version, header.qos_level > 0) {
-                    Ok((_rem, publish)) => {
-                        let msg = MQTTMessage {
-                            header: header,
-                            op: MQTTOperation::PUBLISH(publish),
-                        };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
-                MQTTTypeCode::PUBACK | MQTTTypeCode::PUBREC | MQTTTypeCode::PUBREL | MQTTTypeCode::PUBCOMP => {
-                    match parse_msgidonly(rem, protocol_version) {
-                        Ok((_rem, msgidonly)) => {
+                MQTTTypeCode::PUBLISH => {
+                    match parse_publish(rem, protocol_version, header.qos_level > 0) {
+                        Ok((_rem, publish)) => {
                             let msg = MQTTMessage {
-                                header: header,
-                                op: match message_type {
-                                    MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
-                                    MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
-                                    MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
-                                    MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
-                                    _ => MQTTOperation::UNASSIGNED,
-                                },
+                                header,
+                                op: MQTTOperation::PUBLISH(publish),
                             };
-                            Ok((&input[skiplen+len..], msg))
+                            Ok((&input[skiplen + len..], msg))
                         }
                         Err(e) => Err(e),
                     }
+                }
+                MQTTTypeCode::PUBACK
+                | MQTTTypeCode::PUBREC
+                | MQTTTypeCode::PUBREL
+                | MQTTTypeCode::PUBCOMP => match parse_msgidonly(rem, protocol_version) {
+                    Ok((_rem, msgidonly)) => {
+                        let msg = MQTTMessage {
+                            header,
+                            op: match message_type {
+                                MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
+                                MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
+                                MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
+                                MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
+                                _ => MQTTOperation::UNASSIGNED,
+                            },
+                        };
+                        Ok((&input[skiplen + len..], msg))
+                    }
+                    Err(e) => Err(e),
                 },
                 MQTTTypeCode::SUBSCRIBE => match parse_subscribe(rem, protocol_version) {
                     Ok((_rem, subs)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::SUBSCRIBE(subs),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::SUBACK => match parse_suback(rem, protocol_version) {
                     Ok((_rem, suback)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::SUBACK(suback),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(rem, protocol_version) {
                     Ok((_rem, unsub)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::UNSUBSCRIBE(unsub),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::UNSUBACK => match parse_unsuback(rem, protocol_version) {
                     Ok((_rem, unsuback)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::UNSUBACK(unsuback),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => {
                     let msg = MQTTMessage {
-                        header: header,
+                        header,
                         op: match message_type {
                             MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ,
                             MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP,
                             _ => MQTTOperation::UNASSIGNED,
                         },
                     };
-                    return Ok((&input[skiplen+len..], msg));
+                    return Ok((&input[skiplen + len..], msg));
                 }
                 MQTTTypeCode::DISCONNECT => match parse_disconnect(rem, len, protocol_version) {
                     Ok((_rem, disco)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::DISCONNECT(disco),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
                 MQTTTypeCode::AUTH => match parse_auth(rem) {
                     Ok((_rem, auth)) => {
                         let msg = MQTTMessage {
-                            header: header,
+                            header,
                             op: MQTTOperation::AUTH(auth),
                         };
-                        Ok((&input[skiplen+len..], msg))
+                        Ok((&input[skiplen + len..], msg))
                     }
                     Err(e) => Err(e),
                 },
@@ -644,7 +632,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
                 // crafted MQTT traffic.
                 _ => {
                     let msg = MQTTMessage {
-                        header: header,
+                        header,
                         op: MQTTOperation::UNASSIGNED,
                     };
                     return Ok((&rem[len..], msg));
@@ -660,6 +648,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
 #[cfg(test)]
 mod tests {
     use super::*;
+    use nom7::error::ErrorKind;
 
     fn test_mqtt_parse_variable_fail(buf0: &[u8]) {
         let r0 = parse_mqtt_variable_integer(buf0);
@@ -668,7 +657,7 @@ mod tests {
                 panic!("Result should not have been ok.");
             }
             Err(Err::Error(err)) => {
-                assert_eq!(err.1, error::ErrorKind::Verify);
+                assert_eq!(err.code, ErrorKind::Verify);
             }
             _ => {
                 panic!("Result should be an error.");