use crate::applayer::{self, LoggerFlags};
use crate::conf::conf_get;
use crate::core::*;
+use crate::frames::*;
use nom7::Err;
use std;
use std::collections::VecDeque;
static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
+#[derive(AppLayerFrameType)]
+pub enum MQTTFrameType {
+ Pdu,
+ Header,
+ Data,
+}
+
#[derive(FromPrimitive, Debug, AppLayerEvent)]
pub enum MQTTEvent {
MissingConnect,
}
}
- fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
+ fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
+ let input = stream_slice.as_slice();
let mut current = input;
+
if input.is_empty() {
return AppLayerResult::ok();
}
SCLogDebug!("request: handling {}", current.len());
match parse_message(current, self.protocol_version, self.max_msg_len) {
Ok((rem, msg)) => {
+ let _pdu = Frame::new(
+ flow,
+ &stream_slice,
+ input,
+ current.len() as i64,
+ MQTTFrameType::Pdu as u8,
+ );
SCLogDebug!("request msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!(
continue;
}
}
+
+ self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
self.handle_msg(msg, false);
consumed += current.len() - rem.len();
current = rem;
return AppLayerResult::ok();
}
- fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
+ fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
+ let input = stream_slice.as_slice();
let mut current = input;
+
if input.is_empty() {
return AppLayerResult::ok();
}
SCLogDebug!("response: handling {}", current.len());
match parse_message(current, self.protocol_version, self.max_msg_len) {
Ok((rem, msg)) => {
+ let _pdu = Frame::new(
+ flow,
+ &stream_slice,
+ input,
+ input.len() as i64,
+ MQTTFrameType::Pdu as u8,
+ );
+
SCLogDebug!("response msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!(
continue;
}
}
+
+ self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
self.handle_msg(msg, true);
consumed += current.len() - rem.len();
current = rem;
tx.tx_data.set_event(event as u8);
self.transactions.push_back(tx);
}
+
+ fn mqtt_hdr_and_data_frames(&mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage) {
+ let hdr = stream_slice.as_slice();
+ //MQTT payload has a fixed header of 2 bytes
+ let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8);
+ SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr);
+ let rem_length = input.header.remaining_length as usize;
+ let data = &hdr[2..rem_length + 2];
+ let _mqtt_data = Frame::new(
+ flow,
+ stream_slice,
+ data,
+ rem_length as i64,
+ MQTTFrameType::Data as u8,
+ );
+ SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data);
+ }
}
// C exports.
#[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_request(
- _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+ flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState);
- return state.parse_request(stream_slice.as_slice());
+ return state.parse_request(flow, stream_slice);
}
#[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_response(
- _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+ flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState);
- return state.parse_response(stream_slice.as_slice());
+ return state.parse_response(flow, stream_slice);
}
#[no_mangle]
apply_tx_config: None,
flags: 0,
truncate: None,
- get_frame_id_by_name: None,
- get_frame_name_by_id: None,
+ get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name),
+ get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id),
};
let ip_proto_str = CString::new("tcp").unwrap();
} else {
SCLogDebug!("Protocol detector and parser disabled for MQTT.");
}
-}
+}
\ No newline at end of file