use std;
use std::ffi::{CStr,CString};
use std::mem::transmute;
+use std::collections::VecDeque;
// Used as a special pseudo packet identifier to denote the first CONNECT
// packet in a connection. Note that there is no risk of collision with a
pub struct MQTTState {
tx_id: u64,
pub protocol_version: u8,
- transactions: Vec<MQTTTransaction>,
+ transactions: VecDeque<MQTTTransaction>,
connected: bool,
skip_request: usize,
skip_response: usize,
Self {
tx_id: 0,
protocol_version: 0,
- transactions: Vec::new(),
+ transactions: VecDeque::new(),
connected: false,
skip_request: 0,
skip_response: 0,
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id as u32);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
}
},
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
let pkt_id = subscribe.message_id as u32;
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
}
},
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
let pkt_id = unsubscribe.message_id as u32;
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
}
},
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::PUBREC(ref v)
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::PUBACK(ref v)
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
},
MQTTOperation::UNASSIGNED => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgtype);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
MQTTOperation::TRUNCATED(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
MQTTOperation::AUTH(_)
| MQTTOperation::DISCONNECT(_) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
},
MQTTOperation::PINGREQ
| MQTTOperation::PINGRESP => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
}
}
}
tx.complete = true;
MQTTState::set_event(&mut tx, event);
- self.transactions.push(tx);
+ self.transactions.push_back(tx);
}
}