From: Pierre Chifflier Date: Sat, 30 Oct 2021 14:30:04 +0000 (+0200) Subject: rust/mqtt: convert parser to nom7 functions X-Git-Tag: suricata-7.0.0-beta1~1269 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8a584c211ea10fe6721b26d3a39364764f39101e;p=thirdparty%2Fsuricata.git rust/mqtt: convert parser to nom7 functions --- diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index 9e5d198f42..91564838cf 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -22,7 +22,7 @@ use super::parser::*; use crate::applayer::{self, LoggerFlags}; use crate::applayer::*; use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP}; -use nom; +use nom7::Err; use std; use std::ffi::CString; @@ -446,7 +446,7 @@ impl MQTTState { consumed += current.len() - rem.len(); current = rem; } - Err(nom::Err::Incomplete(_)) => { + Err(Err::Incomplete(_)) => { SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); } @@ -503,7 +503,7 @@ impl MQTTState { consumed += current.len() - rem.len(); current = rem; } - Err(nom::Err::Incomplete(_)) => { + Err(Err::Incomplete(_)) => { SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); } @@ -569,7 +569,7 @@ pub unsafe extern "C" fn rs_mqtt_probing_parser( } return ALPROTO_MQTT; }, - Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN, + Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN, Err(_) => ALPROTO_FAILED } } diff --git a/rust/src/mqtt/mqtt_property.rs b/rust/src/mqtt/mqtt_property.rs index c407349f60..789d81ee14 100644 --- a/rust/src/mqtt/mqtt_property.rs +++ b/rust/src/mqtt/mqtt_property.rs @@ -1,4 +1,3 @@ - /* Copyright (C) 2020 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of @@ -18,10 +17,10 @@ // written by Sascha Steinbiss -use crate::mqtt::parser::*; use crate::jsonbuilder::{JsonBuilder, JsonError}; -use nom::number::streaming::*; -use nom::*; +use crate::mqtt::parser::*; +use nom7::number::streaming::*; +use nom7::IResult; // TODO: It might be useful to also add detection on property presence and // content, e.g. mqtt.property: AUTHENTICATION_METHOD. diff --git a/rust/src/mqtt/parser.rs b/rust/src/mqtt/parser.rs index 31adb8755a..fd3937ec5d 100644 --- a/rust/src/mqtt/parser.rs +++ b/rust/src/mqtt/parser.rs @@ -19,9 +19,14 @@ use crate::mqtt::mqtt_message::*; use crate::mqtt::mqtt_property::*; -use nom::combinator::rest; -use nom::number::streaming::*; -use nom::*; +use nom7::bits::{bits, streaming::take as take_bits}; +use nom7::bytes::streaming::take_while_m_n; +use nom7::combinator::{complete, cond, verify}; +use nom7::error::Error; +use nom7::multi::{length_data, many0, many1}; +use nom7::number::streaming::*; +use nom7::sequence::tuple; +use nom7::{Err, IResult, Needed}; use num_traits::FromPrimitive; #[derive(Debug)] @@ -54,53 +59,43 @@ fn convert_varint(continued: Vec, last: u8) -> u32 { // DATA TYPES -named!(#[inline], pub parse_mqtt_string, - do_parse!( - length: be_u16 - >> content: take!(length) - >> ( - String::from_utf8_lossy(content).to_string() - ) - )); - -named!(#[inline], pub parse_mqtt_variable_integer, - do_parse!( - // take at most 4 bytes in total, so as not to overflow u32 - continued_part: take_while_m_n!(0, 3, is_continuation_bit_set) - >> non_continued_part: verify!(be_u8, |&val| !is_continuation_bit_set(val)) - >> ( - convert_varint(continued_part.to_vec(), non_continued_part) - ) - )); - -named!(#[inline], pub parse_mqtt_binary_data>, - do_parse!( - length: be_u16 - >> data: take!(length) - >> ( - data.to_vec() - ) - )); - -named!(#[inline], pub parse_mqtt_string_pair<(String, String)>, - do_parse!( - name: parse_mqtt_string - >> value: parse_mqtt_string - >> ( - (name, value) - ) - )); +#[inline] +pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> { + let (i, content) = length_data(be_u16)(i)?; + Ok((i, String::from_utf8_lossy(content).to_string())) +} + +#[inline] +pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> { + let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?; + let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?; + Ok(( + i, + convert_varint(continued_part.to_vec(), non_continued_part), + )) +} + +#[inline] +pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec> { + let (i, data) = length_data(be_u16)(i)?; + Ok((i, data.to_vec())) +} + +#[inline] +pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> { + let (i, name) = parse_mqtt_string(i)?; + let (i, value) = parse_mqtt_string(i)?; + Ok((i, (name, value))) +} // MESSAGE COMPONENTS -named!(#[inline], pub parse_property, - do_parse!( - identifier: parse_mqtt_variable_integer - >> value: call!(parse_qualified_property, identifier) - >> ( - value - ) - )); +#[inline] +pub fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> { + let (i, identifier) = parse_mqtt_variable_integer(i)?; + let (i, value) = parse_qualified_property(i, identifier)?; + Ok((i, value)) +} #[inline] fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option>> { @@ -137,15 +132,12 @@ fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option IResult<&[u8], (u8, u8, u8, u8)> { - bits!( - i, - tuple!( - take_bits!(4u8), - take_bits!(1u8), - take_bits!(2u8), - take_bits!(1u8) - ) - ) + bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple(( + take_bits(4u8), + take_bits(1u8), + take_bits(2u8), + take_bits(1u8), + )))(i) } #[inline] @@ -153,11 +145,11 @@ fn parse_message_type(code: u8) -> MQTTTypeCode { match code { 0..=15 => { if let Some(t) = FromPrimitive::from_u8(code) { - return t + return t; } else { - return MQTTTypeCode::UNASSIGNED + return MQTTTypeCode::UNASSIGNED; } - }, + } _ => { // unreachable state in parser: we only pass values parsed from take_bits!(4u8) debug_validate_fail!("can't have message codes >15 from 4 bits"); @@ -166,107 +158,104 @@ fn parse_message_type(code: u8) -> MQTTTypeCode { } } -named!(#[inline], pub parse_fixed_header, - do_parse!( - flags: parse_fixed_header_flags - >> remaining_length: parse_mqtt_variable_integer - >> ( - FixedHeader { - message_type: parse_message_type(flags.0), - dup_flag: flags.1 != 0, - qos_level: flags.2 as u8, - retain: flags.3 != 0, - remaining_length: remaining_length, - } - ) - )); +#[inline] +pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> { + let (i, flags) = parse_fixed_header_flags(i)?; + let (i, remaining_length) = parse_mqtt_variable_integer(i)?; + Ok(( + i, + FixedHeader { + message_type: parse_message_type(flags.0), + dup_flag: flags.1 != 0, + qos_level: flags.2 as u8, + retain: flags.3 != 0, + remaining_length, + }, + )) +} #[inline] fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> { - bits!( + bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple(( + take_bits(1u8), + take_bits(1u8), + take_bits(1u8), + take_bits(2u8), + take_bits(1u8), + take_bits(1u8), + take_bits(1u8), + )))(i) +} + +#[inline] +pub fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> { + let (i, protocol_string) = parse_mqtt_string(i)?; + let (i, protocol_version) = be_u8(i)?; + let (i, flags) = parse_connect_variable_flags(i)?; + let (i, keepalive) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, client_id) = parse_mqtt_string(i)?; + let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?; + let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?; + let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?; + let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?; + let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?; + Ok(( + i, + MQTTConnectData { + protocol_string, + protocol_version, + username_flag: flags.0 != 0, + password_flag: flags.1 != 0, + will_retain: flags.2 != 0, + will_qos: flags.3 as u8, + will_flag: flags.4 != 0, + clean_session: flags.5 != 0, + keepalive, + client_id, + will_topic, + will_message, + username, + password, + properties, + will_properties, + }, + )) +} + +pub fn parse_connack(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTConnackData> { + let (i, topic_name_compression_response) = be_u8(i)?; + let (i, return_code) = be_u8(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( i, - tuple!( - take_bits!(1u8), - take_bits!(1u8), - take_bits!(1u8), - take_bits!(2u8), - take_bits!(1u8), - take_bits!(1u8), - take_bits!(1u8) - ) - ) + MQTTConnackData { + session_present: (topic_name_compression_response & 1) != 0, + return_code, + properties, + }, + )) } -named!(#[inline], pub parse_connect, - do_parse!( - protocol_string: parse_mqtt_string - >> protocol_version: be_u8 - >> flags: parse_connect_variable_flags - >> keepalive: be_u16 - >> properties: call!(parse_properties, protocol_version == 5) - >> client_id: parse_mqtt_string - >> will_properties: call!(parse_properties, protocol_version == 5 && flags.4 != 0) - >> will_topic: cond!(flags.4 != 0, parse_mqtt_string) - >> will_message: cond!(flags.4 != 0, parse_mqtt_binary_data) - >> username: cond!(flags.0 != 0, parse_mqtt_string) - >> password: cond!(flags.1 != 0, parse_mqtt_binary_data) - >> ( - MQTTConnectData { - protocol_string: protocol_string, - protocol_version: protocol_version, - username_flag: flags.0 != 0, - password_flag: flags.1 != 0, - will_retain: flags.2 != 0, - will_qos: flags.3 as u8, - will_flag: flags.4 != 0, - clean_session: flags.5 != 0, - keepalive: keepalive, - client_id: client_id, - will_topic: will_topic, - will_message: will_message, - username: username, - password: password, - properties: properties, - will_properties: will_properties, - } - ) - )); - -named_args!(pub parse_connack(protocol_version: u8), - do_parse!( - topic_name_compression_response: be_u8 - >> retcode: be_u8 - >> properties: call!(parse_properties, protocol_version == 5) - >> ( - MQTTConnackData { - session_present: (topic_name_compression_response & 1) != 0, - return_code: retcode, - properties: properties, - } - ) - )); - -named_args!(pub parse_publish(protocol_version: u8, has_id: bool), - do_parse!( - topic: parse_mqtt_string - >> message_id: cond!(has_id, be_u16) - >> properties: call!(parse_properties, protocol_version == 5) - >> message: rest - >> ( - MQTTPublishData { - topic: topic, - message_id: message_id, - message: message.to_vec(), - properties: properties, - } - ) - )); +pub fn parse_publish( + i: &[u8], protocol_version: u8, has_id: bool, +) -> IResult<&[u8], MQTTPublishData> { + let (i, topic) = parse_mqtt_string(i)?; + let (i, message_id) = cond(has_id, be_u16)(i)?; + let (message, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( + i, + MQTTPublishData { + topic, + message_id, + message: message.to_vec(), + properties, + }, + )) +} #[inline] -fn parse_msgidonly( - input: &[u8], - protocol_version: u8, -) -> IResult<&[u8], MQTTMessageIdOnly> { +fn parse_msgidonly(input: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTMessageIdOnly> { if protocol_version < 5 { // before v5 we don't even have to care about reason codes // and properties, lucky us @@ -283,7 +272,7 @@ fn parse_msgidonly( return Ok(( rem, MQTTMessageIdOnly { - message_id: message_id, + message_id, reason_code: Some(0), properties: None, }, @@ -298,7 +287,7 @@ fn parse_msgidonly( return Ok(( rem, MQTTMessageIdOnly { - message_id: message_id, + message_id, reason_code: Some(reason_code), properties: None, }, @@ -309,9 +298,9 @@ fn parse_msgidonly( return Ok(( rem, MQTTMessageIdOnly { - message_id: message_id, + message_id, reason_code: Some(reason_code), - properties: properties, + properties, }, )); } @@ -325,91 +314,84 @@ fn parse_msgidonly( } } -named!(#[inline], pub parse_msgidonly_v3, - do_parse!( - message_id: be_u16 - >> ( - MQTTMessageIdOnly { - message_id: message_id, - reason_code: None, - properties: None, - } - ) - )); - -named!(#[inline], pub parse_subscribe_topic, - do_parse!( - topic: parse_mqtt_string - >> qos: be_u8 - >> ( - MQTTSubscribeTopicData { - topic_name: topic, - qos: qos, - } - ) - )); - -named_args!(pub parse_subscribe(protocol_version: u8), - do_parse!( - message_id: be_u16 - >> properties: call!(parse_properties, protocol_version == 5) - >> topics: many1!(complete!(parse_subscribe_topic)) - >> ( - MQTTSubscribeData { - message_id: message_id, - topics: topics, - properties: properties, - } - ) - )); - -named_args!(pub parse_suback(protocol_version: u8), - do_parse!( - message_id: be_u16 - >> properties: call!(parse_properties, protocol_version == 5) - >> qoss: rest - >> ( - MQTTSubackData { - message_id: message_id, - qoss: qoss.to_vec(), - properties: properties, - } - ) - )); - -named_args!(pub parse_unsubscribe(protocol_version: u8), - do_parse!( - message_id: be_u16 - >> properties: call!(parse_properties, protocol_version == 5) - >> topics: many0!(complete!(parse_mqtt_string)) - >> ( - MQTTUnsubscribeData { - message_id: message_id, - topics: topics, - properties: properties, - } - ) - )); - -named_args!(pub parse_unsuback(protocol_version: u8), - do_parse!( - message_id: be_u16 - >> properties: call!(parse_properties, protocol_version == 5) - >> reason_codes: many0!(complete!(be_u8)) - >> ( - MQTTUnsubackData { - message_id: message_id, - properties: properties, - reason_codes: Some(reason_codes), - } - ) - )); +#[inline] +pub fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> { + let (i, message_id) = be_u16(i)?; + Ok(( + i, + MQTTMessageIdOnly { + message_id, + reason_code: None, + properties: None, + }, + )) +} + +#[inline] +pub fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> { + let (i, topic_name) = parse_mqtt_string(i)?; + let (i, qos) = be_u8(i)?; + Ok((i, MQTTSubscribeTopicData { topic_name, qos })) +} + +pub fn parse_subscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubscribeData> { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, topics) = many1(complete(parse_subscribe_topic))(i)?; + Ok(( + i, + MQTTSubscribeData { + message_id, + topics, + properties, + }, + )) +} + +pub fn parse_suback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubackData> { + let (i, message_id) = be_u16(i)?; + let (qoss, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( + i, + MQTTSubackData { + message_id, + qoss: qoss.to_vec(), + properties, + }, + )) +} + +pub fn parse_unsubscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubscribeData> { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, topics) = many0(complete(parse_mqtt_string))(i)?; + Ok(( + i, + MQTTUnsubscribeData { + message_id, + topics, + properties, + }, + )) +} + +pub fn parse_unsuback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubackData> { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, reason_codes) = many0(complete(be_u8))(i)?; + Ok(( + i, + MQTTUnsubackData { + message_id, + properties, + reason_codes: Some(reason_codes), + }, + )) +} #[inline] fn parse_disconnect( - input: &[u8], - remaining_len: usize, - protocol_version: u8, + input: &[u8], remaining_len: usize, protocol_version: u8, ) -> IResult<&[u8], MQTTDisconnectData> { if protocol_version < 5 { return Ok(( @@ -452,7 +434,7 @@ fn parse_disconnect( rem, MQTTDisconnectData { reason_code: Some(reason_code), - properties: properties, + properties, }, )); } @@ -463,19 +445,22 @@ fn parse_disconnect( } } -named!(#[inline], pub parse_auth, - do_parse!( - reason_code: be_u8 - >> properties: call!(parse_properties, true) - >> ( - MQTTAuthData { - reason_code: reason_code, - properties: properties, - } - ) - )); - -pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> IResult<&[u8], MQTTMessage> { +#[inline] +pub fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> { + let (i, reason_code) = be_u8(i)?; + let (i, properties) = parse_properties(i, true)?; + Ok(( + i, + MQTTAuthData { + reason_code, + properties, + }, + )) +} + +pub fn parse_message( + input: &[u8], protocol_version: u8, max_msg_size: usize, +) -> IResult<&[u8], MQTTMessage> { // Parse the fixed header first. This is identical across versions and can // be between 2 and 5 bytes long. match parse_fixed_header(input) { @@ -494,7 +479,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> // type. if len > max_msg_size { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::TRUNCATED(MQTTTruncatedData { original_message_type: message_type, skipped_length: len + skiplen, @@ -510,7 +495,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> // have enough data in the input buffer to handle the full // message. Signal this by returning an Incomplete IResult value. if fullrem.len() < len { - return Err(Err::Incomplete(Needed::Size(len - fullrem.len()))); + return Err(Err::Incomplete(Needed::new(len - fullrem.len()))); } // Parse the contents of the buffer into a single message. @@ -523,119 +508,122 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> MQTTTypeCode::CONNECT => match parse_connect(rem) { Ok((_rem, conn)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::CONNECT(conn), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::CONNACK => match parse_connack(rem, protocol_version) { Ok((_rem, connack)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::CONNACK(connack), }; - Ok((&input[skiplen+len..], msg)) - } - Err(e) => Err(e), - }, - MQTTTypeCode::PUBLISH => match parse_publish(rem, protocol_version, header.qos_level > 0) { - Ok((_rem, publish)) => { - let msg = MQTTMessage { - header: header, - op: MQTTOperation::PUBLISH(publish), - }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, - MQTTTypeCode::PUBACK | MQTTTypeCode::PUBREC | MQTTTypeCode::PUBREL | MQTTTypeCode::PUBCOMP => { - match parse_msgidonly(rem, protocol_version) { - Ok((_rem, msgidonly)) => { + MQTTTypeCode::PUBLISH => { + match parse_publish(rem, protocol_version, header.qos_level > 0) { + Ok((_rem, publish)) => { let msg = MQTTMessage { - header: header, - op: match message_type { - MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly), - MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly), - MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly), - MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly), - _ => MQTTOperation::UNASSIGNED, - }, + header, + op: MQTTOperation::PUBLISH(publish), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), } + } + MQTTTypeCode::PUBACK + | MQTTTypeCode::PUBREC + | MQTTTypeCode::PUBREL + | MQTTTypeCode::PUBCOMP => match parse_msgidonly(rem, protocol_version) { + Ok((_rem, msgidonly)) => { + let msg = MQTTMessage { + header, + op: match message_type { + MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly), + MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly), + MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly), + MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly), + _ => MQTTOperation::UNASSIGNED, + }, + }; + Ok((&input[skiplen + len..], msg)) + } + Err(e) => Err(e), }, MQTTTypeCode::SUBSCRIBE => match parse_subscribe(rem, protocol_version) { Ok((_rem, subs)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::SUBSCRIBE(subs), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::SUBACK => match parse_suback(rem, protocol_version) { Ok((_rem, suback)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::SUBACK(suback), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(rem, protocol_version) { Ok((_rem, unsub)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::UNSUBSCRIBE(unsub), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::UNSUBACK => match parse_unsuback(rem, protocol_version) { Ok((_rem, unsuback)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::UNSUBACK(unsuback), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => { let msg = MQTTMessage { - header: header, + header, op: match message_type { MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ, MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP, _ => MQTTOperation::UNASSIGNED, }, }; - return Ok((&input[skiplen+len..], msg)); + return Ok((&input[skiplen + len..], msg)); } MQTTTypeCode::DISCONNECT => match parse_disconnect(rem, len, protocol_version) { Ok((_rem, disco)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::DISCONNECT(disco), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, MQTTTypeCode::AUTH => match parse_auth(rem) { Ok((_rem, auth)) => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::AUTH(auth), }; - Ok((&input[skiplen+len..], msg)) + Ok((&input[skiplen + len..], msg)) } Err(e) => Err(e), }, @@ -644,7 +632,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> // crafted MQTT traffic. _ => { let msg = MQTTMessage { - header: header, + header, op: MQTTOperation::UNASSIGNED, }; return Ok((&rem[len..], msg)); @@ -660,6 +648,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> #[cfg(test)] mod tests { use super::*; + use nom7::error::ErrorKind; fn test_mqtt_parse_variable_fail(buf0: &[u8]) { let r0 = parse_mqtt_variable_integer(buf0); @@ -668,7 +657,7 @@ mod tests { panic!("Result should not have been ok."); } Err(Err::Error(err)) => { - assert_eq!(err.1, error::ErrorKind::Verify); + assert_eq!(err.code, ErrorKind::Verify); } _ => { panic!("Result should be an error.");