}
fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
- let direction = if toclient {
- Direction::ToClient
- } else {
- Direction::ToServer
- };
+ let direction = if toclient {
+ Direction::ToClient
+ } else {
+ Direction::ToServer
+ };
let mut tx = MQTTTransaction::new(msg, direction);
self.tx_id += 1;
tx.tx_id = self.tx_id;
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
+ let mut tx = self.new_tx(msg, toclient);
+ tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
if self.connected {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
- self.transactions.push_back(tx);
- } else {
- let mut tx = self.new_tx(msg, toclient);
- tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
- self.transactions.push_back(tx);
}
+ self.transactions.push_back(tx);
}
MQTTOperation::PUBLISH(ref publish) => {
- if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
- MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
- }
- match msg.header.qos_level {
+ let qos = msg.header.qos_level;
+ let pkt_id = publish.message_id;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
- let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push_back(tx);
}
1..=2 => {
- if let Some(pkt_id) = publish.message_id {
- let mut tx = self.new_tx(msg, toclient);
+ if let Some(pkt_id) = pkt_id {
tx.pkt_id = Some(pkt_id as u32);
- self.transactions.push_back(tx);
} else {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
- self.transactions.push_back(tx);
}
}
_ => {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push_back(tx);
}
}
- }
- MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
}
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::SUBSCRIBE(ref subscribe) => {
let pkt_id = subscribe.message_id as u32;
- match msg.header.qos_level {
+ let qos = msg.header.qos_level;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
- let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push_back(tx);
}
1..=2 => {
- let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
- self.transactions.push_back(tx);
}
_ => {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push_back(tx);
}
}
- }
- MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
}
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
let pkt_id = unsubscribe.message_id as u32;
- match msg.header.qos_level {
+ let qos = msg.header.qos_level;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
- let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push_back(tx);
}
1..=2 => {
- let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
- self.transactions.push_back(tx);
}
_ => {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push_back(tx);
}
}
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
}
MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
+ tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
- if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
- MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
- }
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
- if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
- MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
- }
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
- if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
- MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
- }
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
- if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
- MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
- }
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
self.transactions.push_back(tx);
}
}
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
}
- let mut tx = self.new_tx(msg, toclient);
- tx.complete = true;
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
if !self.connected {
- let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push_back(tx);
- return;
}
- let mut tx = self.new_tx(msg, toclient);
- tx.complete = true;
self.transactions.push_back(tx);
}
}
}
fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
- let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
+ let mut tx = MQTTTransaction::new_empty(if toclient {
+ Direction::ToClient
+ } else {
+ Direction::ToServer
+ });
self.tx_id += 1;
tx.tx_id = self.tx_id;
if toclient {