use super::mqtt_message::*;
use super::parser::*;
-use crate::applayer::*;
use crate::applayer;
+use crate::applayer::*;
use crate::conf::{conf_get, get_memval};
use crate::core::*;
use crate::direction::Direction;
use crate::flow::Flow;
use crate::frames::*;
use nom7::Err;
-use suricata_sys::sys::AppProto;
use std;
use std::collections::VecDeque;
use std::ffi::CString;
+use suricata_sys::sys::AppProto;
// Used as a special pseudo packet identifier to denote the first CONNECT
// packet in a connection. Note that there is no risk of collision with a
// without having to introduce lifetimes etc.
// This is the reason for the code duplication below. Maybe there is a
// more concise way to do it, but this works for now.
- fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
+ fn handle_msg(&mut self, flow: *const Flow, msg: MQTTMessage, toclient: bool) {
+ let tx_len = self.transactions.len();
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
self.transactions.push_back(tx);
}
}
+ // If a new transaction was pushed, only then reassemble the data
+ if self.transactions.len() > tx_len {
+ let dir = if toclient {
+ Direction::ToClient
+ } else {
+ Direction::ToServer
+ };
+ sc_app_layer_parser_trigger_raw_stream_reassembly(flow, dir as i32);
+ }
}
fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
);
if trunc.skipped_length >= current.len() {
self.skip_request = trunc.skipped_length - current.len();
- self.handle_msg(msg, true);
+ self.handle_msg(flow, msg, true);
return AppLayerResult::ok();
} else {
consumed += trunc.skipped_length;
current = ¤t[trunc.skipped_length..];
- self.handle_msg(msg, true);
+ self.handle_msg(flow, msg, true);
self.skip_request = 0;
continue;
}
}
self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
- self.handle_msg(msg, false);
+ self.handle_msg(flow, msg, false);
consumed += current.len() - rem.len();
current = rem;
}
);
if trunc.skipped_length >= current.len() {
self.skip_response = trunc.skipped_length - current.len();
- self.handle_msg(msg, true);
+ self.handle_msg(flow, msg, true);
SCLogDebug!("skip_response now {}", self.skip_response);
return AppLayerResult::ok();
} else {
consumed += trunc.skipped_length;
current = ¤t[trunc.skipped_length..];
- self.handle_msg(msg, true);
+ self.handle_msg(flow, msg, true);
self.skip_response = 0;
continue;
}
}
self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
- self.handle_msg(msg, true);
+ self.handle_msg(flow, msg, true);
consumed += current.len() - rem.len();
current = rem;
}
}
#[no_mangle]
-pub unsafe extern "C" fn SCMqttTxIsToClient(
- tx: &MQTTTransaction,
-) -> std::os::raw::c_int {
+pub unsafe extern "C" fn SCMqttTxIsToClient(tx: &MQTTTransaction) -> std::os::raw::c_int {
if tx.toclient {
return 1;
}