]> git.ipfire.org Git - people/ms/suricata.git/blob - rust/src/mqtt/mqtt.rs
91564838cfaae20f438dcc734140bd26e931f945
[people/ms/suricata.git] / rust / src / mqtt / mqtt.rs
1 /* Copyright (C) 2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 // written by Sascha Steinbiss <sascha@steinbiss.name>
19
20 use super::mqtt_message::*;
21 use super::parser::*;
22 use crate::applayer::{self, LoggerFlags};
23 use crate::applayer::*;
24 use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
25 use nom7::Err;
26 use std;
27 use std::ffi::CString;
28
// Pseudo packet identifier reserved for the first CONNECT packet of a
// connection. Packet identifiers parsed from the wire are only 16 bit
// unsigned, so the 32 bit maximum can never collide with a real one.
const MQTT_CONNECT_PKT_ID: u32 = u32::MAX;
// Upper bound, in bytes, for the length of a single message. Messages
// longer than this are truncated. Default: 1MB.
static mut MAX_MSG_LEN: u32 = 1024 * 1024;
37
38 static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
39
40 #[derive(FromPrimitive, Debug, AppLayerEvent)]
41 pub enum MQTTEvent {
42 MissingConnect,
43 MissingPublish,
44 MissingSubscribe,
45 MissingUnsubscribe,
46 DoubleConnect,
47 UnintroducedMessage,
48 InvalidQosLevel,
49 MissingMsgId,
50 UnassignedMsgType,
51 }
52
53 #[derive(Debug)]
54 pub struct MQTTTransaction {
55 tx_id: u64,
56 pkt_id: Option<u32>,
57 pub msg: Vec<MQTTMessage>,
58 complete: bool,
59 toclient: bool,
60 toserver: bool,
61
62 logged: LoggerFlags,
63 de_state: Option<*mut core::DetectEngineState>,
64 events: *mut core::AppLayerDecoderEvents,
65 tx_data: applayer::AppLayerTxData,
66 }
67
68 impl MQTTTransaction {
69 pub fn new(msg: MQTTMessage) -> MQTTTransaction {
70 let mut m = MQTTTransaction {
71 tx_id: 0,
72 pkt_id: None,
73 complete: false,
74 logged: LoggerFlags::new(),
75 msg: Vec::new(),
76 toclient: false,
77 toserver: false,
78 de_state: None,
79 events: std::ptr::null_mut(),
80 tx_data: applayer::AppLayerTxData::new(),
81 };
82 m.msg.push(msg);
83 return m;
84 }
85
86 pub fn free(&mut self) {
87 if !self.events.is_null() {
88 core::sc_app_layer_decoder_events_free_events(&mut self.events);
89 }
90 if let Some(state) = self.de_state {
91 core::sc_detect_engine_state_free(state);
92 }
93 }
94 }
95
96 impl Drop for MQTTTransaction {
97 fn drop(&mut self) {
98 self.free();
99 }
100 }
101
102 pub struct MQTTState {
103 tx_id: u64,
104 pub protocol_version: u8,
105 transactions: Vec<MQTTTransaction>,
106 connected: bool,
107 skip_request: usize,
108 skip_response: usize,
109 max_msg_len: usize,
110 }
111
112 impl MQTTState {
113 pub fn new() -> Self {
114 Self {
115 tx_id: 0,
116 protocol_version: 0,
117 transactions: Vec::new(),
118 connected: false,
119 skip_request: 0,
120 skip_response: 0,
121 max_msg_len: unsafe { MAX_MSG_LEN as usize },
122 }
123 }
124
125 fn free_tx(&mut self, tx_id: u64) {
126 let len = self.transactions.len();
127 let mut found = false;
128 let mut index = 0;
129 for i in 0..len {
130 let tx = &self.transactions[i];
131 if tx.tx_id == tx_id + 1 {
132 found = true;
133 index = i;
134 break;
135 }
136 }
137 if found {
138 self.transactions.remove(index);
139 }
140 }
141
142 pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
143 for tx in &mut self.transactions {
144 if tx.tx_id == tx_id + 1 {
145 return Some(tx);
146 }
147 }
148 return None;
149 }
150
151 pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
152 for tx in &mut self.transactions {
153 if !tx.complete {
154 if let Some(mpktid) = tx.pkt_id {
155 if mpktid == pkt_id {
156 return Some(tx);
157 }
158 }
159 }
160 }
161 return None;
162 }
163
164 fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
165 let mut tx = MQTTTransaction::new(msg);
166 self.tx_id += 1;
167 tx.tx_id = self.tx_id;
168 if toclient {
169 tx.toclient = true;
170 } else {
171 tx.toserver = true;
172 }
173 return tx;
174 }
175
176 // Handle a MQTT message depending on the direction and state.
177 // Note that we are trying to only have one mutable reference to msg
178 // and its components, however, since we are in a large match operation,
179 // we cannot pass around and/or store more references or move things
180 // without having to introduce lifetimes etc.
181 // This is the reason for the code duplication below. Maybe there is a
182 // more concise way to do it, but this works for now.
183 fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
184 match msg.op {
185 MQTTOperation::CONNECT(ref conn) => {
186 self.protocol_version = conn.protocol_version;
187 if self.connected {
188 let mut tx = self.new_tx(msg, toclient);
189 MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
190 self.transactions.push(tx);
191 } else {
192 let mut tx = self.new_tx(msg, toclient);
193 tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
194 self.transactions.push(tx);
195 }
196 },
197 MQTTOperation::PUBLISH(ref publish) => {
198 if !self.connected {
199 let mut tx = self.new_tx(msg, toclient);
200 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
201 self.transactions.push(tx);
202 return;
203 }
204 match msg.header.qos_level {
205 0 => {
206 // with QOS level 0, we do not need to wait for a
207 // response
208 let mut tx = self.new_tx(msg, toclient);
209 tx.complete = true;
210 self.transactions.push(tx);
211 },
212 1..=2 => {
213 if let Some(pkt_id) = publish.message_id {
214 let mut tx = self.new_tx(msg, toclient);
215 tx.pkt_id = Some(pkt_id as u32);
216 self.transactions.push(tx);
217 } else {
218 let mut tx = self.new_tx(msg, toclient);
219 MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
220 self.transactions.push(tx);
221 }
222 },
223 _ => {
224 let mut tx = self.new_tx(msg, toclient);
225 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
226 self.transactions.push(tx);
227 }
228 }
229 },
230 MQTTOperation::SUBSCRIBE(ref subscribe) => {
231 if !self.connected {
232 let mut tx = self.new_tx(msg, toclient);
233 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
234 self.transactions.push(tx);
235 return;
236 }
237 let pkt_id = subscribe.message_id as u32;
238 match msg.header.qos_level {
239 0 => {
240 // with QOS level 0, we do not need to wait for a
241 // response
242 let mut tx = self.new_tx(msg, toclient);
243 tx.complete = true;
244 self.transactions.push(tx);
245 },
246 1..=2 => {
247 let mut tx = self.new_tx(msg, toclient);
248 tx.pkt_id = Some(pkt_id);
249 self.transactions.push(tx);
250 },
251 _ => {
252 let mut tx = self.new_tx(msg, toclient);
253 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
254 self.transactions.push(tx);
255 }
256 }
257 },
258 MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
259 if !self.connected {
260 let mut tx = self.new_tx(msg, toclient);
261 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
262 self.transactions.push(tx);
263 return;
264 }
265 let pkt_id = unsubscribe.message_id as u32;
266 match msg.header.qos_level {
267 0 => {
268 // with QOS level 0, we do not need to wait for a
269 // response
270 let mut tx = self.new_tx(msg, toclient);
271 tx.complete = true;
272 self.transactions.push(tx);
273 },
274 1..=2 => {
275 let mut tx = self.new_tx(msg, toclient);
276 tx.pkt_id = Some(pkt_id);
277 self.transactions.push(tx);
278 },
279 _ => {
280 let mut tx = self.new_tx(msg, toclient);
281 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
282 self.transactions.push(tx);
283 }
284 }
285 },
286 MQTTOperation::CONNACK(ref _connack) => {
287 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
288 (*tx).msg.push(msg);
289 (*tx).complete = true;
290 (*tx).pkt_id = None;
291 self.connected = true;
292 } else {
293 let mut tx = self.new_tx(msg, toclient);
294 MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
295 self.transactions.push(tx);
296 }
297 },
298 MQTTOperation::PUBREC(ref v)
299 | MQTTOperation::PUBREL(ref v) => {
300 if !self.connected {
301 let mut tx = self.new_tx(msg, toclient);
302 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
303 self.transactions.push(tx);
304 return;
305 }
306 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
307 (*tx).msg.push(msg);
308 } else {
309 let mut tx = self.new_tx(msg, toclient);
310 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
311 self.transactions.push(tx);
312 }
313 },
314 MQTTOperation::PUBACK(ref v)
315 | MQTTOperation::PUBCOMP(ref v) => {
316 if !self.connected {
317 let mut tx = self.new_tx(msg, toclient);
318 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
319 self.transactions.push(tx);
320 return;
321 }
322 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
323 (*tx).msg.push(msg);
324 (*tx).complete = true;
325 (*tx).pkt_id = None;
326 } else {
327 let mut tx = self.new_tx(msg, toclient);
328 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
329 self.transactions.push(tx);
330 }
331 },
332 MQTTOperation::SUBACK(ref suback) => {
333 if !self.connected {
334 let mut tx = self.new_tx(msg, toclient);
335 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
336 self.transactions.push(tx);
337 return;
338 }
339 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
340 (*tx).msg.push(msg);
341 (*tx).complete = true;
342 (*tx).pkt_id = None;
343 } else {
344 let mut tx = self.new_tx(msg, toclient);
345 MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
346 self.transactions.push(tx);
347 }
348 },
349 MQTTOperation::UNSUBACK(ref unsuback) => {
350 if !self.connected {
351 let mut tx = self.new_tx(msg, toclient);
352 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
353 self.transactions.push(tx);
354 return;
355 }
356 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
357 (*tx).msg.push(msg);
358 (*tx).complete = true;
359 (*tx).pkt_id = None;
360 } else {
361 let mut tx = self.new_tx(msg, toclient);
362 MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
363 self.transactions.push(tx);
364 }
365 },
366 MQTTOperation::UNASSIGNED => {
367 let mut tx = self.new_tx(msg, toclient);
368 tx.complete = true;
369 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
370 self.transactions.push(tx);
371 },
372 MQTTOperation::TRUNCATED(_) => {
373 let mut tx = self.new_tx(msg, toclient);
374 tx.complete = true;
375 self.transactions.push(tx);
376 },
377 MQTTOperation::AUTH(_)
378 | MQTTOperation::DISCONNECT(_) => {
379 if !self.connected {
380 let mut tx = self.new_tx(msg, toclient);
381 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
382 self.transactions.push(tx);
383 return;
384 }
385 let mut tx = self.new_tx(msg, toclient);
386 tx.complete = true;
387 self.transactions.push(tx);
388 },
389 MQTTOperation::PINGREQ
390 | MQTTOperation::PINGRESP => {
391 if !self.connected {
392 let mut tx = self.new_tx(msg, toclient);
393 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
394 self.transactions.push(tx);
395 return;
396 }
397 let mut tx = self.new_tx(msg, toclient);
398 tx.complete = true;
399 self.transactions.push(tx);
400 }
401 }
402 }
403
404 fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
405 let mut current = input;
406 if input.len() == 0 {
407 return AppLayerResult::ok();
408 }
409
410 let mut consumed = 0;
411 SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
412 if self.skip_request > 0 {
413 if input.len() <= self.skip_request {
414 SCLogDebug!("reducing skip_request by {}", input.len());
415 self.skip_request -= input.len();
416 return AppLayerResult::ok();
417 } else {
418 current = &input[self.skip_request..];
419 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
420 consumed = self.skip_request;
421 self.skip_request = 0;
422 }
423 }
424
425
426 while current.len() > 0 {
427 let mut skipped = false;
428 SCLogDebug!("request: handling {}", current.len());
429 match parse_message(current, self.protocol_version, self.max_msg_len) {
430 Ok((rem, msg)) => {
431 SCLogDebug!("request msg {:?}", msg);
432 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
433 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
434 if trunc.skipped_length >= current.len() {
435 skipped = true;
436 self.skip_request = trunc.skipped_length - current.len();
437 } else {
438 current = &current[trunc.skipped_length..];
439 self.skip_request = 0;
440 }
441 }
442 self.handle_msg(msg, false);
443 if skipped {
444 return AppLayerResult::ok();
445 }
446 consumed += current.len() - rem.len();
447 current = rem;
448 }
449 Err(Err::Incomplete(_)) => {
450 SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
451 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
452 }
453 Err(_) => {
454 return AppLayerResult::err();
455 }
456 }
457 }
458
459 return AppLayerResult::ok();
460 }
461
462 fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
463 let mut current = input;
464 if input.len() == 0 {
465 return AppLayerResult::ok();
466 }
467
468 let mut consumed = 0;
469 SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
470 if self.skip_response > 0 {
471 if input.len() <= self.skip_response {
472 self.skip_response -= current.len();
473 return AppLayerResult::ok();
474 } else {
475 current = &input[self.skip_response..];
476 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
477 consumed = self.skip_response;
478 self.skip_response = 0;
479 }
480 }
481
482 while current.len() > 0 {
483 let mut skipped = false;
484 SCLogDebug!("response: handling {}", current.len());
485 match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
486 Ok((rem, msg)) => {
487 SCLogDebug!("response msg {:?}", msg);
488 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
489 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
490 if trunc.skipped_length >= current.len() {
491 skipped = true;
492 self.skip_response = trunc.skipped_length - current.len();
493 } else {
494 current = &current[trunc.skipped_length..];
495 self.skip_response = 0;
496 }
497 SCLogDebug!("skip_response now {}", self.skip_response);
498 }
499 self.handle_msg(msg, true);
500 if skipped {
501 return AppLayerResult::ok();
502 }
503 consumed += current.len() - rem.len();
504 current = rem;
505 }
506 Err(Err::Incomplete(_)) => {
507 SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
508 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
509 }
510 Err(_) => {
511 return AppLayerResult::err();
512 }
513 }
514 }
515
516 return AppLayerResult::ok();
517 }
518
519 fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
520 let ev = event as u8;
521 core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev);
522 }
523
524 fn tx_iterator(
525 &mut self,
526 min_tx_id: u64,
527 state: &mut u64,
528 ) -> Option<(&MQTTTransaction, u64, bool)> {
529 let mut index = *state as usize;
530 let len = self.transactions.len();
531
532 while index < len {
533 let tx = &self.transactions[index];
534 if tx.tx_id < min_tx_id + 1 {
535 index += 1;
536 continue;
537 }
538 *state = index as u64;
539 return Some((tx, tx.tx_id - 1, (len - index) > 1));
540 }
541
542 return None;
543 }
544 }
545
546 // C exports.
547
548 export_tx_get_detect_state!(rs_mqtt_tx_get_detect_state, MQTTTransaction);
549 export_tx_set_detect_state!(rs_mqtt_tx_set_detect_state, MQTTTransaction);
550
551 #[no_mangle]
552 pub unsafe extern "C" fn rs_mqtt_probing_parser(
553 _flow: *const Flow,
554 _direction: u8,
555 input: *const u8,
556 input_len: u32,
557 _rdir: *mut u8,
558 ) -> AppProto {
559 let buf = build_slice!(input, input_len as usize);
560 match parse_fixed_header(buf) {
561 Ok((_, hdr)) => {
562 // reject unassigned message type
563 if hdr.message_type == MQTTTypeCode::UNASSIGNED {
564 return ALPROTO_FAILED;
565 }
566 // with 2 being the highest valid QoS level
567 if hdr.qos_level > 2 {
568 return ALPROTO_FAILED;
569 }
570 return ALPROTO_MQTT;
571 },
572 Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
573 Err(_) => ALPROTO_FAILED
574 }
575 }
576
577 #[no_mangle]
578 pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
579 let state = MQTTState::new();
580 let boxed = Box::new(state);
581 return Box::into_raw(boxed) as *mut _;
582 }
583
584 #[no_mangle]
585 pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
586 std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) });
587 }
588
589 #[no_mangle]
590 pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
591 let state = cast_pointer!(state, MQTTState);
592 state.free_tx(tx_id);
593 }
594
595 #[no_mangle]
596 pub unsafe extern "C" fn rs_mqtt_parse_request(
597 _flow: *const Flow,
598 state: *mut std::os::raw::c_void,
599 _pstate: *mut std::os::raw::c_void,
600 input: *const u8,
601 input_len: u32,
602 _data: *const std::os::raw::c_void,
603 _flags: u8,
604 ) -> AppLayerResult {
605 let state = cast_pointer!(state, MQTTState);
606 let buf = build_slice!(input, input_len as usize);
607 return state.parse_request(buf);
608 }
609
610 #[no_mangle]
611 pub unsafe extern "C" fn rs_mqtt_parse_response(
612 _flow: *const Flow,
613 state: *mut std::os::raw::c_void,
614 _pstate: *mut std::os::raw::c_void,
615 input: *const u8,
616 input_len: u32,
617 _data: *const std::os::raw::c_void,
618 _flags: u8,
619 ) -> AppLayerResult {
620 let state = cast_pointer!(state, MQTTState);
621 let buf = build_slice!(input, input_len as usize);
622 return state.parse_response(buf);
623 }
624
625 #[no_mangle]
626 pub unsafe extern "C" fn rs_mqtt_state_get_tx(
627 state: *mut std::os::raw::c_void,
628 tx_id: u64,
629 ) -> *mut std::os::raw::c_void {
630 let state = cast_pointer!(state, MQTTState);
631 match state.get_tx(tx_id) {
632 Some(tx) => {
633 return tx as *const _ as *mut _;
634 }
635 None => {
636 return std::ptr::null_mut();
637 }
638 }
639 }
640
641 #[no_mangle]
642 pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
643 let state = cast_pointer!(state, MQTTState);
644 return state.tx_id;
645 }
646
647 #[no_mangle]
648 pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
649 let tx = cast_pointer!(tx, MQTTTransaction);
650 if tx.toclient {
651 return 1;
652 }
653 return 0;
654 }
655
656 #[no_mangle]
657 pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
658 tx: *mut std::os::raw::c_void,
659 direction: u8,
660 ) -> std::os::raw::c_int {
661 let tx = cast_pointer!(tx, MQTTTransaction);
662 if tx.complete {
663 if direction == core::STREAM_TOSERVER {
664 if tx.toserver {
665 return 1;
666 }
667 } else if direction == core::STREAM_TOCLIENT {
668 if tx.toclient {
669 return 1;
670 }
671 }
672 }
673 return 0;
674 }
675
676 #[no_mangle]
677 pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
678 _state: *mut std::os::raw::c_void,
679 tx: *mut std::os::raw::c_void,
680 ) -> u32 {
681 let tx = cast_pointer!(tx, MQTTTransaction);
682 return tx.logged.get();
683 }
684
685 #[no_mangle]
686 pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
687 _state: *mut std::os::raw::c_void,
688 tx: *mut std::os::raw::c_void,
689 logged: u32,
690 ) {
691 let tx = cast_pointer!(tx, MQTTTransaction);
692 tx.logged.set(logged);
693 }
694
695 #[no_mangle]
696 pub unsafe extern "C" fn rs_mqtt_state_get_events(
697 tx: *mut std::os::raw::c_void,
698 ) -> *mut core::AppLayerDecoderEvents {
699 let tx = cast_pointer!(tx, MQTTTransaction);
700 return tx.events;
701 }
702
703 #[no_mangle]
704 pub unsafe extern "C" fn rs_mqtt_state_get_tx_iterator(
705 _ipproto: u8,
706 _alproto: AppProto,
707 state: *mut std::os::raw::c_void,
708 min_tx_id: u64,
709 _max_tx_id: u64,
710 istate: &mut u64,
711 ) -> applayer::AppLayerGetTxIterTuple {
712 let state = cast_pointer!(state, MQTTState);
713 match state.tx_iterator(min_tx_id, istate) {
714 Some((tx, out_tx_id, has_next)) => {
715 let c_tx = tx as *const _ as *mut _;
716 let ires = applayer::AppLayerGetTxIterTuple::with_values(c_tx, out_tx_id, has_next);
717 return ires;
718 }
719 None => {
720 return applayer::AppLayerGetTxIterTuple::not_found();
721 }
722 }
723 }
724
// Name of the parser as a NUL-terminated byte string, so its pointer can
// be passed to C as a C style string.
const PARSER_NAME: &[u8] = b"mqtt\0";
727
728 export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
729
730 #[no_mangle]
731 pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
732 let default_port = CString::new("[1883]").unwrap();
733 let max_msg_len = &mut MAX_MSG_LEN;
734 *max_msg_len = cfg_max_msg_len;
735 let parser = RustParser {
736 name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
737 default_port: default_port.as_ptr(),
738 ipproto: IPPROTO_TCP,
739 probe_ts: Some(rs_mqtt_probing_parser),
740 probe_tc: Some(rs_mqtt_probing_parser),
741 min_depth: 0,
742 max_depth: 16,
743 state_new: rs_mqtt_state_new,
744 state_free: rs_mqtt_state_free,
745 tx_free: rs_mqtt_state_tx_free,
746 parse_ts: rs_mqtt_parse_request,
747 parse_tc: rs_mqtt_parse_response,
748 get_tx_count: rs_mqtt_state_get_tx_count,
749 get_tx: rs_mqtt_state_get_tx,
750 tx_comp_st_ts: 1,
751 tx_comp_st_tc: 1,
752 tx_get_progress: rs_mqtt_tx_get_alstate_progress,
753 get_de_state: rs_mqtt_tx_get_detect_state,
754 set_de_state: rs_mqtt_tx_set_detect_state,
755 get_events: Some(rs_mqtt_state_get_events),
756 get_eventinfo: Some(MQTTEvent::get_event_info),
757 get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id),
758 localstorage_new: None,
759 localstorage_free: None,
760 get_files: None,
761 get_tx_iterator: Some(rs_mqtt_state_get_tx_iterator),
762 get_tx_data: rs_mqtt_get_tx_data,
763 apply_tx_config: None,
764 flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
765 truncate: None,
766 };
767
768 let ip_proto_str = CString::new("tcp").unwrap();
769
770 if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
771 let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
772 ALPROTO_MQTT = alproto;
773 if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
774 let _ = AppLayerRegisterParser(&parser, alproto);
775 }
776 } else {
777 SCLogDebug!("Protocol detector and parser disabled for MQTT.");
778 }
779 }