]> git.ipfire.org Git - people/ms/suricata.git/blame - rust/src/mqtt/mqtt.rs
sip: set unidirection transaction flag
[people/ms/suricata.git] / rust / src / mqtt / mqtt.rs
CommitLineData
c3136007
SS
1/* Copyright (C) 2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18// written by Sascha Steinbiss <sascha@steinbiss.name>
19
20use super::mqtt_message::*;
21use super::parser::*;
22use crate::applayer::{self, LoggerFlags};
23use crate::applayer::*;
24use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
c3136007
SS
25use num_traits::FromPrimitive;
26use nom;
27use std;
28use std::ffi::{CStr,CString};
29use std::mem::transmute;
30
31// Used as a special pseudo packet identifier to denote the first CONNECT
32// packet in a connection. Note that there is no risk of collision with a
33// parsed packet identifier because in the protocol these are only 16 bit
34// unsigned.
35const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;
36// Maximum message length in bytes. If the length of a message exceeds
37// this value, it will be truncated. Default: 1MB.
38static mut MAX_MSG_LEN: u32 = 1048576;
39
40static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
41
42#[derive(FromPrimitive, Debug)]
43#[repr(u32)]
44pub enum MQTTEvent {
45 MissingConnect = 0,
46 MissingPublish,
47 MissingSubscribe,
48 MissingUnsubscribe,
49 DoubleConnect,
50 UnintroducedMessage,
51 InvalidQosLevel,
52 MissingMsgId,
53 UnassignedMsgtype,
54}
55
56#[derive(Debug)]
57pub struct MQTTTransaction {
58 tx_id: u64,
59 pkt_id: Option<u32>,
60 pub msg: Vec<MQTTMessage>,
61 complete: bool,
62 toclient: bool,
63 toserver: bool,
64
65 logged: LoggerFlags,
66 de_state: Option<*mut core::DetectEngineState>,
67 events: *mut core::AppLayerDecoderEvents,
68 tx_data: applayer::AppLayerTxData,
69}
70
71impl MQTTTransaction {
72 pub fn new(msg: MQTTMessage) -> MQTTTransaction {
73 let mut m = MQTTTransaction {
74 tx_id: 0,
75 pkt_id: None,
76 complete: false,
77 logged: LoggerFlags::new(),
78 msg: Vec::new(),
79 toclient: false,
80 toserver: false,
81 de_state: None,
82 events: std::ptr::null_mut(),
83 tx_data: applayer::AppLayerTxData::new(),
84 };
85 m.msg.push(msg);
86 return m;
87 }
88
89 pub fn free(&mut self) {
90 if self.events != std::ptr::null_mut() {
91 core::sc_app_layer_decoder_events_free_events(&mut self.events);
92 }
93 if let Some(state) = self.de_state {
94 core::sc_detect_engine_state_free(state);
95 }
96 }
97}
98
99impl Drop for MQTTTransaction {
100 fn drop(&mut self) {
101 self.free();
102 }
103}
104
105pub struct MQTTState {
106 tx_id: u64,
107 pub protocol_version: u8,
108 transactions: Vec<MQTTTransaction>,
109 connected: bool,
110 skip_request: usize,
111 skip_response: usize,
112 max_msg_len: usize,
113}
114
115impl MQTTState {
116 pub fn new() -> Self {
117 Self {
118 tx_id: 0,
119 protocol_version: 0,
120 transactions: Vec::new(),
121 connected: false,
122 skip_request: 0,
123 skip_response: 0,
124 max_msg_len: unsafe { MAX_MSG_LEN as usize },
125 }
126 }
127
128 fn free_tx(&mut self, tx_id: u64) {
129 let len = self.transactions.len();
130 let mut found = false;
131 let mut index = 0;
132 for i in 0..len {
133 let tx = &self.transactions[i];
134 if tx.tx_id == tx_id + 1 {
135 found = true;
136 index = i;
137 break;
138 }
139 }
140 if found {
141 self.transactions.remove(index);
142 }
143 }
144
145 pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
146 for tx in &mut self.transactions {
147 if tx.tx_id == tx_id + 1 {
148 return Some(tx);
149 }
150 }
151 return None;
152 }
153
154 pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
155 for tx in &mut self.transactions {
156 if !tx.complete {
157 if let Some(mpktid) = tx.pkt_id {
158 if mpktid == pkt_id {
159 return Some(tx);
160 }
161 }
162 }
163 }
164 return None;
165 }
166
167 fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
168 let mut tx = MQTTTransaction::new(msg);
169 self.tx_id += 1;
170 tx.tx_id = self.tx_id;
171 if toclient {
172 tx.toclient = true;
173 } else {
174 tx.toserver = true;
175 }
176 return tx;
177 }
178
179 // Handle a MQTT message depending on the direction and state.
180 // Note that we are trying to only have one mutable reference to msg
181 // and its components, however, since we are in a large match operation,
182 // we cannot pass around and/or store more references or move things
183 // without having to introduce lifetimes etc.
184 // This is the reason for the code duplication below. Maybe there is a
185 // more concise way to do it, but this works for now.
186 fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
187 match msg.op {
188 MQTTOperation::CONNECT(ref conn) => {
189 self.protocol_version = conn.protocol_version;
190 if self.connected {
191 let mut tx = self.new_tx(msg, toclient);
192 &mut MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
193 self.transactions.push(tx);
194 } else {
195 let mut tx = self.new_tx(msg, toclient);
196 tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
197 self.transactions.push(tx);
198 }
199 },
200 MQTTOperation::PUBLISH(ref publish) => {
201 if !self.connected {
202 let mut tx = self.new_tx(msg, toclient);
203 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
204 self.transactions.push(tx);
205 return;
206 }
207 match msg.header.qos_level {
208 0 => {
209 // with QOS level 0, we do not need to wait for a
210 // response
211 let mut tx = self.new_tx(msg, toclient);
212 tx.complete = true;
213 self.transactions.push(tx);
214 },
215 1..=2 => {
216 if let Some(pkt_id) = publish.message_id {
217 let mut tx = self.new_tx(msg, toclient);
218 tx.pkt_id = Some(pkt_id as u32);
219 self.transactions.push(tx);
220 } else {
221 let mut tx = self.new_tx(msg, toclient);
222 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
223 self.transactions.push(tx);
224 }
225 },
226 _ => {
227 let mut tx = self.new_tx(msg, toclient);
228 &mut MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
229 self.transactions.push(tx);
230 }
231 }
232 },
233 MQTTOperation::SUBSCRIBE(ref subscribe) => {
234 if !self.connected {
235 let mut tx = self.new_tx(msg, toclient);
236 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
237 self.transactions.push(tx);
238 return;
239 }
240 let pkt_id = subscribe.message_id as u32;
241 match msg.header.qos_level {
242 0 => {
243 // with QOS level 0, we do not need to wait for a
244 // response
245 let mut tx = self.new_tx(msg, toclient);
246 tx.complete = true;
247 self.transactions.push(tx);
248 },
249 1..=2 => {
250 let mut tx = self.new_tx(msg, toclient);
251 tx.pkt_id = Some(pkt_id);
252 self.transactions.push(tx);
253 },
254 _ => {
255 let mut tx = self.new_tx(msg, toclient);
256 &mut MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
257 self.transactions.push(tx);
258 }
259 }
260 },
261 MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
262 if !self.connected {
263 let mut tx = self.new_tx(msg, toclient);
264 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
265 self.transactions.push(tx);
266 return;
267 }
268 let pkt_id = unsubscribe.message_id as u32;
269 match msg.header.qos_level {
270 0 => {
271 // with QOS level 0, we do not need to wait for a
272 // response
273 let mut tx = self.new_tx(msg, toclient);
274 tx.complete = true;
275 self.transactions.push(tx);
276 },
277 1..=2 => {
278 let mut tx = self.new_tx(msg, toclient);
279 tx.pkt_id = Some(pkt_id);
280 self.transactions.push(tx);
281 },
282 _ => {
283 let mut tx = self.new_tx(msg, toclient);
284 &mut MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
285 self.transactions.push(tx);
286 }
287 }
288 },
289 MQTTOperation::CONNACK(ref _connack) => {
290 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
291 (*tx).msg.push(msg);
292 (*tx).complete = true;
293 (*tx).pkt_id = None;
294 self.connected = true;
295 } else {
296 let mut tx = self.new_tx(msg, toclient);
297 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
298 self.transactions.push(tx);
299 }
300 },
301 MQTTOperation::PUBREC(ref v)
302 | MQTTOperation::PUBREL(ref v) => {
303 if !self.connected {
304 let mut tx = self.new_tx(msg, toclient);
305 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
306 self.transactions.push(tx);
307 return;
308 }
309 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
310 (*tx).msg.push(msg);
311 } else {
312 let mut tx = self.new_tx(msg, toclient);
313 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
314 self.transactions.push(tx);
315 }
316 },
317 MQTTOperation::PUBACK(ref v)
318 | MQTTOperation::PUBCOMP(ref v) => {
319 if !self.connected {
320 let mut tx = self.new_tx(msg, toclient);
321 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
322 self.transactions.push(tx);
323 return;
324 }
325 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
326 (*tx).msg.push(msg);
327 (*tx).complete = true;
328 (*tx).pkt_id = None;
329 } else {
330 let mut tx = self.new_tx(msg, toclient);
331 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
332 self.transactions.push(tx);
333 }
334 },
335 MQTTOperation::SUBACK(ref suback) => {
336 if !self.connected {
337 let mut tx = self.new_tx(msg, toclient);
338 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
339 self.transactions.push(tx);
340 return;
341 }
342 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
343 (*tx).msg.push(msg);
344 (*tx).complete = true;
345 (*tx).pkt_id = None;
346 } else {
347 let mut tx = self.new_tx(msg, toclient);
348 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
349 self.transactions.push(tx);
350 }
351 },
352 MQTTOperation::UNSUBACK(ref unsuback) => {
353 if !self.connected {
354 let mut tx = self.new_tx(msg, toclient);
355 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
356 self.transactions.push(tx);
357 return;
358 }
359 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
360 (*tx).msg.push(msg);
361 (*tx).complete = true;
362 (*tx).pkt_id = None;
363 } else {
364 let mut tx = self.new_tx(msg, toclient);
365 &mut MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
366 self.transactions.push(tx);
367 }
368 },
369 MQTTOperation::UNASSIGNED => {
370 let mut tx = self.new_tx(msg, toclient);
371 tx.complete = true;
372 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgtype);
373 self.transactions.push(tx);
374 },
375 MQTTOperation::TRUNCATED(_) => {
376 let mut tx = self.new_tx(msg, toclient);
377 tx.complete = true;
378 self.transactions.push(tx);
379 },
380 MQTTOperation::AUTH(_)
381 | MQTTOperation::DISCONNECT(_) => {
382 if !self.connected {
383 let mut tx = self.new_tx(msg, toclient);
384 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
385 self.transactions.push(tx);
386 return;
387 }
388 let mut tx = self.new_tx(msg, toclient);
389 tx.complete = true;
390 self.transactions.push(tx);
391 },
392 MQTTOperation::PINGREQ
393 | MQTTOperation::PINGRESP => {
394 if !self.connected {
395 let mut tx = self.new_tx(msg, toclient);
396 &mut MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
397 self.transactions.push(tx);
398 return;
399 }
400 let mut tx = self.new_tx(msg, toclient);
401 tx.complete = true;
402 self.transactions.push(tx);
403 }
404 }
405 }
406
407 fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
408 let mut current = input;
409 if input.len() == 0 {
410 return AppLayerResult::ok();
411 }
412
413 let mut consumed = 0;
414 SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
415 if self.skip_request > 0 {
416 if input.len() <= self.skip_request {
417 SCLogDebug!("reducing skip_request by {}", input.len());
418 self.skip_request -= input.len();
419 return AppLayerResult::ok();
420 } else {
421 current = &input[self.skip_request..];
422 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
423 consumed = self.skip_request;
424 self.skip_request = 0;
425 }
426 }
427
428
429 while current.len() > 0 {
430 let mut skipped = false;
431 SCLogDebug!("request: handling {}", current.len());
432 match parse_message(current, self.protocol_version, self.max_msg_len) {
433 Ok((rem, msg)) => {
434 SCLogDebug!("request msg {:?}", msg);
435 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
436 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
437 if trunc.skipped_length >= current.len() {
438 skipped = true;
439 self.skip_request = trunc.skipped_length - current.len();
440 } else {
441 current = &current[trunc.skipped_length..];
442 self.skip_request = 0;
443 }
444 }
445 self.handle_msg(msg, false);
446 if skipped {
447 return AppLayerResult::ok();
448 }
449 consumed += current.len() - rem.len();
450 current = rem;
451 }
452 Err(nom::Err::Incomplete(_)) => {
453 SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
454 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
455 }
456 Err(_) => {
457 return AppLayerResult::err();
458 }
459 }
460 }
461
462 return AppLayerResult::ok();
463 }
464
465 fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
466 let mut current = input;
467 if input.len() == 0 {
468 return AppLayerResult::ok();
469 }
470
471 let mut consumed = 0;
472 SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
473 if self.skip_response > 0 {
474 if input.len() <= self.skip_response {
475 self.skip_response -= current.len();
476 return AppLayerResult::ok();
477 } else {
478 current = &input[self.skip_response..];
479 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
480 consumed = self.skip_response;
481 self.skip_response = 0;
482 }
483 }
484
485 while current.len() > 0 {
486 let mut skipped = false;
487 SCLogDebug!("response: handling {}", current.len());
488 match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
489 Ok((rem, msg)) => {
490 SCLogDebug!("response msg {:?}", msg);
491 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
492 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
493 if trunc.skipped_length >= current.len() {
494 skipped = true;
495 self.skip_response = trunc.skipped_length - current.len();
496 } else {
497 current = &current[trunc.skipped_length..];
498 self.skip_response = 0;
499 }
500 SCLogDebug!("skip_response now {}", self.skip_response);
501 }
502 self.handle_msg(msg, true);
503 if skipped {
504 return AppLayerResult::ok();
505 }
506 consumed += current.len() - rem.len();
507 current = rem;
508 }
509 Err(nom::Err::Incomplete(_)) => {
510 SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
511 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
512 }
513 Err(_) => {
514 return AppLayerResult::err();
515 }
516 }
517 }
518
519 return AppLayerResult::ok();
520 }
521
522 fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
523 let ev = event as u8;
524 core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev);
525 }
526
527 fn tx_iterator(
528 &mut self,
529 min_tx_id: u64,
530 state: &mut u64,
531 ) -> Option<(&MQTTTransaction, u64, bool)> {
532 let mut index = *state as usize;
533 let len = self.transactions.len();
534
535 while index < len {
536 let tx = &self.transactions[index];
537 if tx.tx_id < min_tx_id + 1 {
538 index += 1;
539 continue;
540 }
541 *state = index as u64;
542 return Some((tx, tx.tx_id - 1, (len - index) > 1));
543 }
544
545 return None;
546 }
547}
548
549// C exports.
550
551export_tx_get_detect_state!(rs_mqtt_tx_get_detect_state, MQTTTransaction);
552export_tx_set_detect_state!(rs_mqtt_tx_set_detect_state, MQTTTransaction);
553
554#[no_mangle]
555pub extern "C" fn rs_mqtt_probing_parser(
556 _flow: *const Flow,
557 _direction: u8,
558 input: *const u8,
559 input_len: u32,
560 _rdir: *mut u8,
561) -> AppProto {
562 let buf = build_slice!(input, input_len as usize);
563 match parse_fixed_header(buf) {
564 Ok((_, hdr)) => {
565 // reject unassigned message type
566 if hdr.message_type == MQTTTypeCode::UNASSIGNED {
567 return unsafe { ALPROTO_FAILED } ;
568 }
569 // with 2 being the highest valid QoS level
570 if hdr.qos_level > 2 {
571 return unsafe { ALPROTO_FAILED };
572 }
573 return unsafe { ALPROTO_MQTT };
574 },
575 Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN,
576 Err(_) => unsafe { ALPROTO_FAILED }
577 }
578}
579
580#[no_mangle]
547d6c2d 581pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
c3136007
SS
582 let state = MQTTState::new();
583 let boxed = Box::new(state);
584 return unsafe { transmute(boxed) };
585}
586
587#[no_mangle]
588pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
589 let _drop: Box<MQTTState> = unsafe { transmute(state) };
590}
591
592#[no_mangle]
593pub extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
594 let state = cast_pointer!(state, MQTTState);
595 state.free_tx(tx_id);
596}
597
598#[no_mangle]
599pub extern "C" fn rs_mqtt_parse_request(
600 _flow: *const Flow,
601 state: *mut std::os::raw::c_void,
602 _pstate: *mut std::os::raw::c_void,
603 input: *const u8,
604 input_len: u32,
605 _data: *const std::os::raw::c_void,
606 _flags: u8,
607) -> AppLayerResult {
608 let state = cast_pointer!(state, MQTTState);
609 let buf = build_slice!(input, input_len as usize);
610 return state.parse_request(buf).into();
611}
612
613#[no_mangle]
614pub extern "C" fn rs_mqtt_parse_response(
615 _flow: *const Flow,
616 state: *mut std::os::raw::c_void,
617 _pstate: *mut std::os::raw::c_void,
618 input: *const u8,
619 input_len: u32,
620 _data: *const std::os::raw::c_void,
621 _flags: u8,
622) -> AppLayerResult {
623 let state = cast_pointer!(state, MQTTState);
624 let buf = build_slice!(input, input_len as usize);
625 return state.parse_response(buf).into();
626}
627
628#[no_mangle]
629pub extern "C" fn rs_mqtt_state_get_tx(
630 state: *mut std::os::raw::c_void,
631 tx_id: u64,
632) -> *mut std::os::raw::c_void {
633 let state = cast_pointer!(state, MQTTState);
634 match state.get_tx(tx_id) {
635 Some(tx) => {
636 return unsafe { transmute(tx) };
637 }
638 None => {
639 return std::ptr::null_mut();
640 }
641 }
642}
643
644#[no_mangle]
645pub extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
646 let state = cast_pointer!(state, MQTTState);
647 return state.tx_id;
648}
649
650#[no_mangle]
651pub extern "C" fn rs_mqtt_state_progress_completion_status(_direction: u8) -> std::os::raw::c_int {
652 return 1;
653}
654
655#[no_mangle]
656pub extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
657 let tx = cast_pointer!(tx, MQTTTransaction);
658 if tx.toclient {
659 return 1;
660 }
661 return 0;
662}
663
664#[no_mangle]
665pub extern "C" fn rs_mqtt_tx_get_alstate_progress(
666 tx: *mut std::os::raw::c_void,
667 direction: u8,
668) -> std::os::raw::c_int {
669 let tx = cast_pointer!(tx, MQTTTransaction);
670 if tx.complete {
671 if direction == core::STREAM_TOSERVER {
672 if tx.toserver {
673 return 1;
674 }
675 } else if direction == core::STREAM_TOCLIENT {
676 if tx.toclient {
677 return 1;
678 }
679 }
680 }
681 return 0;
682}
683
684#[no_mangle]
685pub extern "C" fn rs_mqtt_tx_get_logged(
686 _state: *mut std::os::raw::c_void,
687 tx: *mut std::os::raw::c_void,
688) -> u32 {
689 let tx = cast_pointer!(tx, MQTTTransaction);
690 return tx.logged.get();
691}
692
693#[no_mangle]
694pub extern "C" fn rs_mqtt_tx_set_logged(
695 _state: *mut std::os::raw::c_void,
696 tx: *mut std::os::raw::c_void,
697 logged: u32,
698) {
699 let tx = cast_pointer!(tx, MQTTTransaction);
700 tx.logged.set(logged);
701}
702
703#[no_mangle]
704pub extern "C" fn rs_mqtt_state_get_events(
705 tx: *mut std::os::raw::c_void,
706) -> *mut core::AppLayerDecoderEvents {
707 let tx = cast_pointer!(tx, MQTTTransaction);
708 return tx.events;
709}
710
711#[no_mangle]
712pub extern "C" fn rs_mqtt_state_get_event_info_by_id(event_id: std::os::raw::c_int,
713 event_name: *mut *const std::os::raw::c_char,
714 event_type: *mut core::AppLayerEventType)
715 -> i8
716{
717 if let Some(e) = FromPrimitive::from_i32(event_id as i32) {
718 let estr = match e {
719 MQTTEvent::MissingConnect => { "missing_connect\0" },
720 MQTTEvent::MissingPublish => { "missing_publish\0" },
721 MQTTEvent::MissingSubscribe => { "missing_subscribe\0" },
722 MQTTEvent::MissingUnsubscribe => { "missing_unsubscribe\0" },
723 MQTTEvent::DoubleConnect => { "double_connect\0" },
724 MQTTEvent::UnintroducedMessage => { "unintroduced_message\0" },
725 MQTTEvent::InvalidQosLevel => { "invalid_qos_level\0" },
726 MQTTEvent::MissingMsgId => { "missing_msg_id\0" },
727 MQTTEvent::UnassignedMsgtype => { "unassigned_msg_type\0" },
728 };
729 unsafe{
730 *event_name = estr.as_ptr() as *const std::os::raw::c_char;
731 *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
732 };
733 0
734 } else {
735 -1
736 }
737}
738
739#[no_mangle]
740pub extern "C" fn rs_mqtt_state_get_event_info(event_name: *const std::os::raw::c_char,
741 event_id: *mut std::os::raw::c_int,
742 event_type: *mut core::AppLayerEventType)
743 -> std::os::raw::c_int
744{
745 if event_name == std::ptr::null() { return -1; }
746 let c_event_name: &CStr = unsafe { CStr::from_ptr(event_name) };
747 let event = match c_event_name.to_str() {
748 Ok(s) => {
749 match s {
750 "missing_connect" => MQTTEvent::MissingConnect as i32,
751 "missing_publish" => MQTTEvent::MissingPublish as i32,
752 "missing_subscribe" => MQTTEvent::MissingSubscribe as i32,
753 "missing_unsubscribe" => MQTTEvent::MissingUnsubscribe as i32,
754 "double_connect" => MQTTEvent::DoubleConnect as i32,
755 "unintroduced_message" => MQTTEvent::UnintroducedMessage as i32,
756 "invalid_qos_level" => MQTTEvent::InvalidQosLevel as i32,
757 "missing_msg_id" => MQTTEvent::MissingMsgId as i32,
758 "unassigned_msg_type" => MQTTEvent::UnassignedMsgtype as i32,
759 _ => -1, // unknown event
760 }
761 },
762 Err(_) => -1, // UTF-8 conversion failed
763 };
764 unsafe{
765 *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
766 *event_id = event as std::os::raw::c_int;
767 };
768 0
769}
770
771#[no_mangle]
772pub extern "C" fn rs_mqtt_state_get_tx_iterator(
773 _ipproto: u8,
774 _alproto: AppProto,
775 state: *mut std::os::raw::c_void,
776 min_tx_id: u64,
777 _max_tx_id: u64,
778 istate: &mut u64,
779) -> applayer::AppLayerGetTxIterTuple {
780 let state = cast_pointer!(state, MQTTState);
781 match state.tx_iterator(min_tx_id, istate) {
782 Some((tx, out_tx_id, has_next)) => {
783 let c_tx = unsafe { transmute(tx) };
784 let ires = applayer::AppLayerGetTxIterTuple::with_values(c_tx, out_tx_id, has_next);
785 return ires;
786 }
787 None => {
788 return applayer::AppLayerGetTxIterTuple::not_found();
789 }
790 }
791}
792
793// Parser name as a C style string.
794const PARSER_NAME: &'static [u8] = b"mqtt\0";
795
796export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
797
798#[no_mangle]
799pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
800 let default_port = CString::new("[1883]").unwrap();
801 let max_msg_len = &mut MAX_MSG_LEN;
802 *max_msg_len = cfg_max_msg_len;
803 let parser = RustParser {
804 name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
805 default_port: default_port.as_ptr(),
806 ipproto: IPPROTO_TCP,
807 probe_ts: Some(rs_mqtt_probing_parser),
808 probe_tc: Some(rs_mqtt_probing_parser),
809 min_depth: 0,
810 max_depth: 16,
811 state_new: rs_mqtt_state_new,
812 state_free: rs_mqtt_state_free,
813 tx_free: rs_mqtt_state_tx_free,
814 parse_ts: rs_mqtt_parse_request,
815 parse_tc: rs_mqtt_parse_response,
816 get_tx_count: rs_mqtt_state_get_tx_count,
817 get_tx: rs_mqtt_state_get_tx,
818 tx_get_comp_st: rs_mqtt_state_progress_completion_status,
819 tx_get_progress: rs_mqtt_tx_get_alstate_progress,
820 get_de_state: rs_mqtt_tx_get_detect_state,
821 set_de_state: rs_mqtt_tx_set_detect_state,
822 get_events: Some(rs_mqtt_state_get_events),
823 get_eventinfo: Some(rs_mqtt_state_get_event_info),
824 get_eventinfo_byid: Some(rs_mqtt_state_get_event_info_by_id),
825 localstorage_new: None,
826 localstorage_free: None,
827 get_files: None,
828 get_tx_iterator: Some(rs_mqtt_state_get_tx_iterator),
829 get_tx_data: rs_mqtt_get_tx_data,
830 apply_tx_config: None,
831 flags: 0,
4da0d9bd 832 truncate: None,
c3136007
SS
833 };
834
835 let ip_proto_str = CString::new("tcp").unwrap();
836
837 if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
838 let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
839 ALPROTO_MQTT = alproto;
840 if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
841 let _ = AppLayerRegisterParser(&parser, alproto);
842 }
843 } else {
844 SCLogDebug!("Protocol detector and parser disabled for MQTT.");
845 }
846}