]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
mqtt: fix logic when setting event
authorPhilippe Antoine <pantoine@oisf.net>
Tue, 17 Oct 2023 08:26:57 +0000 (10:26 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 30 Jan 2024 08:35:16 +0000 (09:35 +0100)
Especially sets transactions to complete when we get a response
without having seen the request, so that the transactions
end up getting cleaned (instead of living/leaking in the state).

Also try to set the event on the relevant transaction, instead
of creating a new transaction just for the purpose of having
the event.

Ticket: #6299

rust/src/mqtt/mqtt.rs

index 7f60e2a757cd3ba4f7594d05527094edf6b22f7e..fbf03e19af69883ca5e1ea8fe617fc052d17c0f4 100644 (file)
@@ -183,11 +183,11 @@ impl MQTTState {
     }
 
     fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
-       let direction = if toclient {
-           Direction::ToClient
-       } else {
-           Direction::ToServer
-       };
+        let direction = if toclient {
+            Direction::ToClient
+        } else {
+            Direction::ToServer
+        };
         let mut tx = MQTTTransaction::new(msg, direction);
         self.tx_id += 1;
         tx.tx_id = self.tx_id;
@@ -217,104 +217,82 @@ impl MQTTState {
         match msg.op {
             MQTTOperation::CONNECT(ref conn) => {
                 self.protocol_version = conn.protocol_version;
+                let mut tx = self.new_tx(msg, toclient);
+                tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
                 if self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
-                    self.transactions.push_back(tx);
-                } else {
-                    let mut tx = self.new_tx(msg, toclient);
-                    tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
-                    self.transactions.push_back(tx);
                 }
+                self.transactions.push_back(tx);
             }
             MQTTOperation::PUBLISH(ref publish) => {
-                if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
-                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
-                }
-                match msg.header.qos_level {
+                let qos = msg.header.qos_level;
+                let pkt_id = publish.message_id;
+                let mut tx = self.new_tx(msg, toclient);
+                match qos {
                     0 => {
                         // with QOS level 0, we do not need to wait for a
                         // response
-                        let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
-                        self.transactions.push_back(tx);
                     }
                     1..=2 => {
-                        if let Some(pkt_id) = publish.message_id {
-                            let mut tx = self.new_tx(msg, toclient);
+                        if let Some(pkt_id) = pkt_id {
                             tx.pkt_id = Some(pkt_id as u32);
-                            self.transactions.push_back(tx);
                         } else {
-                            let mut tx = self.new_tx(msg, toclient);
                             MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
-                            self.transactions.push_back(tx);
                         }
                     }
                     _ => {
-                        let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
-                        self.transactions.push_back(tx);
                     }
                 }
-            }
-            MQTTOperation::SUBSCRIBE(ref subscribe) => {
                 if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
                 }
+                self.transactions.push_back(tx);
+            }
+            MQTTOperation::SUBSCRIBE(ref subscribe) => {
                 let pkt_id = subscribe.message_id as u32;
-                match msg.header.qos_level {
+                let qos = msg.header.qos_level;
+                let mut tx = self.new_tx(msg, toclient);
+                match qos {
                     0 => {
                         // with QOS level 0, we do not need to wait for a
                         // response
-                        let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
-                        self.transactions.push_back(tx);
                     }
                     1..=2 => {
-                        let mut tx = self.new_tx(msg, toclient);
                         tx.pkt_id = Some(pkt_id);
-                        self.transactions.push_back(tx);
                     }
                     _ => {
-                        let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
-                        self.transactions.push_back(tx);
                     }
                 }
-            }
-            MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
                 if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
                 }
+                self.transactions.push_back(tx);
+            }
+            MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
                 let pkt_id = unsubscribe.message_id as u32;
-                match msg.header.qos_level {
+                let qos = msg.header.qos_level;
+                let mut tx = self.new_tx(msg, toclient);
+                match qos {
                     0 => {
                         // with QOS level 0, we do not need to wait for a
                         // response
-                        let mut tx = self.new_tx(msg, toclient);
                         tx.complete = true;
-                        self.transactions.push_back(tx);
                     }
                     1..=2 => {
-                        let mut tx = self.new_tx(msg, toclient);
                         tx.pkt_id = Some(pkt_id);
-                        self.transactions.push_back(tx);
                     }
                     _ => {
-                        let mut tx = self.new_tx(msg, toclient);
                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
-                        self.transactions.push_back(tx);
                     }
                 }
+                if !self.connected {
+                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+                }
+                self.transactions.push_back(tx);
             }
             MQTTOperation::CONNACK(ref _connack) => {
                 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
@@ -325,31 +303,24 @@ impl MQTTState {
                 } else {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
+                    tx.complete = true;
                     self.transactions.push_back(tx);
                 }
             }
             MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
-                if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
-                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
-                }
                 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
                     tx.msg.push(msg);
                 } else {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+                    if !self.connected {
+                        MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+                    }
+                    tx.complete = true;
                     self.transactions.push_back(tx);
                 }
             }
             MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
-                if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
-                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
-                }
                 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
                     tx.msg.push(msg);
                     tx.complete = true;
@@ -357,16 +328,14 @@ impl MQTTState {
                 } else {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+                    if !self.connected {
+                        MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+                    }
+                    tx.complete = true;
                     self.transactions.push_back(tx);
                 }
             }
             MQTTOperation::SUBACK(ref suback) => {
-                if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
-                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
-                }
                 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
                     tx.msg.push(msg);
                     tx.complete = true;
@@ -374,16 +343,14 @@ impl MQTTState {
                 } else {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
+                    if !self.connected {
+                        MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+                    }
+                    tx.complete = true;
                     self.transactions.push_back(tx);
                 }
             }
             MQTTOperation::UNSUBACK(ref unsuback) => {
-                if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
-                    MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
-                }
                 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
                     tx.msg.push(msg);
                     tx.complete = true;
@@ -391,6 +358,10 @@ impl MQTTState {
                 } else {
                     let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
+                    if !self.connected {
+                        MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+                    }
+                    tx.complete = true;
                     self.transactions.push_back(tx);
                 }
             }
@@ -406,25 +377,19 @@ impl MQTTState {
                 self.transactions.push_back(tx);
             }
             MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
+                let mut tx = self.new_tx(msg, toclient);
+                tx.complete = true;
                 if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
                 }
-                let mut tx = self.new_tx(msg, toclient);
-                tx.complete = true;
                 self.transactions.push_back(tx);
             }
             MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
+                let mut tx = self.new_tx(msg, toclient);
+                tx.complete = true;
                 if !self.connected {
-                    let mut tx = self.new_tx(msg, toclient);
                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
-                    self.transactions.push_back(tx);
-                    return;
                 }
-                let mut tx = self.new_tx(msg, toclient);
-                tx.complete = true;
                 self.transactions.push_back(tx);
             }
         }
@@ -608,7 +573,11 @@ impl MQTTState {
     }
 
     fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
-        let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
+        let mut tx = MQTTTransaction::new_empty(if toclient {
+            Direction::ToClient
+        } else {
+            Direction::ToServer
+        });
         self.tx_id += 1;
         tx.tx_id = self.tx_id;
         if toclient {