]> git.ipfire.org Git - people/ms/suricata.git/blob - rust/src/mqtt/mqtt.rs
rust: functions that reference raw pointers are unsafe
[people/ms/suricata.git] / rust / src / mqtt / mqtt.rs
1 /* Copyright (C) 2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18 // written by Sascha Steinbiss <sascha@steinbiss.name>
19
20 use super::mqtt_message::*;
21 use super::parser::*;
22 use crate::applayer::{self, LoggerFlags};
23 use crate::applayer::*;
24 use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
25 use num_traits::FromPrimitive;
26 use nom;
27 use std;
28 use std::ffi::{CStr,CString};
29
/// Special pseudo packet identifier that denotes the first CONNECT packet
/// in a connection. There is no risk of collision with a parsed packet
/// identifier, because in the protocol those are only 16 bit unsigned.
const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;

/// Maximum message length in bytes. A message whose length exceeds this
/// value is truncated. Default: 1MB.
static mut MAX_MSG_LEN: u32 = 1024 * 1024;
38
39 static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
40
41 #[derive(FromPrimitive, Debug)]
42 #[repr(u32)]
43 pub enum MQTTEvent {
44 MissingConnect = 0,
45 MissingPublish,
46 MissingSubscribe,
47 MissingUnsubscribe,
48 DoubleConnect,
49 UnintroducedMessage,
50 InvalidQosLevel,
51 MissingMsgId,
52 UnassignedMsgtype,
53 }
54
55 #[derive(Debug)]
56 pub struct MQTTTransaction {
57 tx_id: u64,
58 pkt_id: Option<u32>,
59 pub msg: Vec<MQTTMessage>,
60 complete: bool,
61 toclient: bool,
62 toserver: bool,
63
64 logged: LoggerFlags,
65 de_state: Option<*mut core::DetectEngineState>,
66 events: *mut core::AppLayerDecoderEvents,
67 tx_data: applayer::AppLayerTxData,
68 }
69
70 impl MQTTTransaction {
71 pub fn new(msg: MQTTMessage) -> MQTTTransaction {
72 let mut m = MQTTTransaction {
73 tx_id: 0,
74 pkt_id: None,
75 complete: false,
76 logged: LoggerFlags::new(),
77 msg: Vec::new(),
78 toclient: false,
79 toserver: false,
80 de_state: None,
81 events: std::ptr::null_mut(),
82 tx_data: applayer::AppLayerTxData::new(),
83 };
84 m.msg.push(msg);
85 return m;
86 }
87
88 pub fn free(&mut self) {
89 if self.events != std::ptr::null_mut() {
90 core::sc_app_layer_decoder_events_free_events(&mut self.events);
91 }
92 if let Some(state) = self.de_state {
93 core::sc_detect_engine_state_free(state);
94 }
95 }
96 }
97
98 impl Drop for MQTTTransaction {
99 fn drop(&mut self) {
100 self.free();
101 }
102 }
103
104 pub struct MQTTState {
105 tx_id: u64,
106 pub protocol_version: u8,
107 transactions: Vec<MQTTTransaction>,
108 connected: bool,
109 skip_request: usize,
110 skip_response: usize,
111 max_msg_len: usize,
112 }
113
114 impl MQTTState {
115 pub fn new() -> Self {
116 Self {
117 tx_id: 0,
118 protocol_version: 0,
119 transactions: Vec::new(),
120 connected: false,
121 skip_request: 0,
122 skip_response: 0,
123 max_msg_len: unsafe { MAX_MSG_LEN as usize },
124 }
125 }
126
127 fn free_tx(&mut self, tx_id: u64) {
128 let len = self.transactions.len();
129 let mut found = false;
130 let mut index = 0;
131 for i in 0..len {
132 let tx = &self.transactions[i];
133 if tx.tx_id == tx_id + 1 {
134 found = true;
135 index = i;
136 break;
137 }
138 }
139 if found {
140 self.transactions.remove(index);
141 }
142 }
143
144 pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
145 for tx in &mut self.transactions {
146 if tx.tx_id == tx_id + 1 {
147 return Some(tx);
148 }
149 }
150 return None;
151 }
152
153 pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
154 for tx in &mut self.transactions {
155 if !tx.complete {
156 if let Some(mpktid) = tx.pkt_id {
157 if mpktid == pkt_id {
158 return Some(tx);
159 }
160 }
161 }
162 }
163 return None;
164 }
165
166 fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
167 let mut tx = MQTTTransaction::new(msg);
168 self.tx_id += 1;
169 tx.tx_id = self.tx_id;
170 if toclient {
171 tx.toclient = true;
172 } else {
173 tx.toserver = true;
174 }
175 return tx;
176 }
177
178 // Handle a MQTT message depending on the direction and state.
179 // Note that we are trying to only have one mutable reference to msg
180 // and its components, however, since we are in a large match operation,
181 // we cannot pass around and/or store more references or move things
182 // without having to introduce lifetimes etc.
183 // This is the reason for the code duplication below. Maybe there is a
184 // more concise way to do it, but this works for now.
185 fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
186 match msg.op {
187 MQTTOperation::CONNECT(ref conn) => {
188 self.protocol_version = conn.protocol_version;
189 if self.connected {
190 let mut tx = self.new_tx(msg, toclient);
191 MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
192 self.transactions.push(tx);
193 } else {
194 let mut tx = self.new_tx(msg, toclient);
195 tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
196 self.transactions.push(tx);
197 }
198 },
199 MQTTOperation::PUBLISH(ref publish) => {
200 if !self.connected {
201 let mut tx = self.new_tx(msg, toclient);
202 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
203 self.transactions.push(tx);
204 return;
205 }
206 match msg.header.qos_level {
207 0 => {
208 // with QOS level 0, we do not need to wait for a
209 // response
210 let mut tx = self.new_tx(msg, toclient);
211 tx.complete = true;
212 self.transactions.push(tx);
213 },
214 1..=2 => {
215 if let Some(pkt_id) = publish.message_id {
216 let mut tx = self.new_tx(msg, toclient);
217 tx.pkt_id = Some(pkt_id as u32);
218 self.transactions.push(tx);
219 } else {
220 let mut tx = self.new_tx(msg, toclient);
221 MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
222 self.transactions.push(tx);
223 }
224 },
225 _ => {
226 let mut tx = self.new_tx(msg, toclient);
227 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
228 self.transactions.push(tx);
229 }
230 }
231 },
232 MQTTOperation::SUBSCRIBE(ref subscribe) => {
233 if !self.connected {
234 let mut tx = self.new_tx(msg, toclient);
235 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
236 self.transactions.push(tx);
237 return;
238 }
239 let pkt_id = subscribe.message_id as u32;
240 match msg.header.qos_level {
241 0 => {
242 // with QOS level 0, we do not need to wait for a
243 // response
244 let mut tx = self.new_tx(msg, toclient);
245 tx.complete = true;
246 self.transactions.push(tx);
247 },
248 1..=2 => {
249 let mut tx = self.new_tx(msg, toclient);
250 tx.pkt_id = Some(pkt_id);
251 self.transactions.push(tx);
252 },
253 _ => {
254 let mut tx = self.new_tx(msg, toclient);
255 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
256 self.transactions.push(tx);
257 }
258 }
259 },
260 MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
261 if !self.connected {
262 let mut tx = self.new_tx(msg, toclient);
263 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
264 self.transactions.push(tx);
265 return;
266 }
267 let pkt_id = unsubscribe.message_id as u32;
268 match msg.header.qos_level {
269 0 => {
270 // with QOS level 0, we do not need to wait for a
271 // response
272 let mut tx = self.new_tx(msg, toclient);
273 tx.complete = true;
274 self.transactions.push(tx);
275 },
276 1..=2 => {
277 let mut tx = self.new_tx(msg, toclient);
278 tx.pkt_id = Some(pkt_id);
279 self.transactions.push(tx);
280 },
281 _ => {
282 let mut tx = self.new_tx(msg, toclient);
283 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
284 self.transactions.push(tx);
285 }
286 }
287 },
288 MQTTOperation::CONNACK(ref _connack) => {
289 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
290 (*tx).msg.push(msg);
291 (*tx).complete = true;
292 (*tx).pkt_id = None;
293 self.connected = true;
294 } else {
295 let mut tx = self.new_tx(msg, toclient);
296 MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
297 self.transactions.push(tx);
298 }
299 },
300 MQTTOperation::PUBREC(ref v)
301 | MQTTOperation::PUBREL(ref v) => {
302 if !self.connected {
303 let mut tx = self.new_tx(msg, toclient);
304 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
305 self.transactions.push(tx);
306 return;
307 }
308 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
309 (*tx).msg.push(msg);
310 } else {
311 let mut tx = self.new_tx(msg, toclient);
312 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
313 self.transactions.push(tx);
314 }
315 },
316 MQTTOperation::PUBACK(ref v)
317 | MQTTOperation::PUBCOMP(ref v) => {
318 if !self.connected {
319 let mut tx = self.new_tx(msg, toclient);
320 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
321 self.transactions.push(tx);
322 return;
323 }
324 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
325 (*tx).msg.push(msg);
326 (*tx).complete = true;
327 (*tx).pkt_id = None;
328 } else {
329 let mut tx = self.new_tx(msg, toclient);
330 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
331 self.transactions.push(tx);
332 }
333 },
334 MQTTOperation::SUBACK(ref suback) => {
335 if !self.connected {
336 let mut tx = self.new_tx(msg, toclient);
337 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
338 self.transactions.push(tx);
339 return;
340 }
341 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
342 (*tx).msg.push(msg);
343 (*tx).complete = true;
344 (*tx).pkt_id = None;
345 } else {
346 let mut tx = self.new_tx(msg, toclient);
347 MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
348 self.transactions.push(tx);
349 }
350 },
351 MQTTOperation::UNSUBACK(ref unsuback) => {
352 if !self.connected {
353 let mut tx = self.new_tx(msg, toclient);
354 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
355 self.transactions.push(tx);
356 return;
357 }
358 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
359 (*tx).msg.push(msg);
360 (*tx).complete = true;
361 (*tx).pkt_id = None;
362 } else {
363 let mut tx = self.new_tx(msg, toclient);
364 MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
365 self.transactions.push(tx);
366 }
367 },
368 MQTTOperation::UNASSIGNED => {
369 let mut tx = self.new_tx(msg, toclient);
370 tx.complete = true;
371 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgtype);
372 self.transactions.push(tx);
373 },
374 MQTTOperation::TRUNCATED(_) => {
375 let mut tx = self.new_tx(msg, toclient);
376 tx.complete = true;
377 self.transactions.push(tx);
378 },
379 MQTTOperation::AUTH(_)
380 | MQTTOperation::DISCONNECT(_) => {
381 if !self.connected {
382 let mut tx = self.new_tx(msg, toclient);
383 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
384 self.transactions.push(tx);
385 return;
386 }
387 let mut tx = self.new_tx(msg, toclient);
388 tx.complete = true;
389 self.transactions.push(tx);
390 },
391 MQTTOperation::PINGREQ
392 | MQTTOperation::PINGRESP => {
393 if !self.connected {
394 let mut tx = self.new_tx(msg, toclient);
395 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
396 self.transactions.push(tx);
397 return;
398 }
399 let mut tx = self.new_tx(msg, toclient);
400 tx.complete = true;
401 self.transactions.push(tx);
402 }
403 }
404 }
405
406 fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
407 let mut current = input;
408 if input.len() == 0 {
409 return AppLayerResult::ok();
410 }
411
412 let mut consumed = 0;
413 SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
414 if self.skip_request > 0 {
415 if input.len() <= self.skip_request {
416 SCLogDebug!("reducing skip_request by {}", input.len());
417 self.skip_request -= input.len();
418 return AppLayerResult::ok();
419 } else {
420 current = &input[self.skip_request..];
421 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
422 consumed = self.skip_request;
423 self.skip_request = 0;
424 }
425 }
426
427
428 while current.len() > 0 {
429 let mut skipped = false;
430 SCLogDebug!("request: handling {}", current.len());
431 match parse_message(current, self.protocol_version, self.max_msg_len) {
432 Ok((rem, msg)) => {
433 SCLogDebug!("request msg {:?}", msg);
434 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
435 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
436 if trunc.skipped_length >= current.len() {
437 skipped = true;
438 self.skip_request = trunc.skipped_length - current.len();
439 } else {
440 current = &current[trunc.skipped_length..];
441 self.skip_request = 0;
442 }
443 }
444 self.handle_msg(msg, false);
445 if skipped {
446 return AppLayerResult::ok();
447 }
448 consumed += current.len() - rem.len();
449 current = rem;
450 }
451 Err(nom::Err::Incomplete(_)) => {
452 SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
453 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
454 }
455 Err(_) => {
456 return AppLayerResult::err();
457 }
458 }
459 }
460
461 return AppLayerResult::ok();
462 }
463
464 fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
465 let mut current = input;
466 if input.len() == 0 {
467 return AppLayerResult::ok();
468 }
469
470 let mut consumed = 0;
471 SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
472 if self.skip_response > 0 {
473 if input.len() <= self.skip_response {
474 self.skip_response -= current.len();
475 return AppLayerResult::ok();
476 } else {
477 current = &input[self.skip_response..];
478 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
479 consumed = self.skip_response;
480 self.skip_response = 0;
481 }
482 }
483
484 while current.len() > 0 {
485 let mut skipped = false;
486 SCLogDebug!("response: handling {}", current.len());
487 match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
488 Ok((rem, msg)) => {
489 SCLogDebug!("response msg {:?}", msg);
490 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
491 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
492 if trunc.skipped_length >= current.len() {
493 skipped = true;
494 self.skip_response = trunc.skipped_length - current.len();
495 } else {
496 current = &current[trunc.skipped_length..];
497 self.skip_response = 0;
498 }
499 SCLogDebug!("skip_response now {}", self.skip_response);
500 }
501 self.handle_msg(msg, true);
502 if skipped {
503 return AppLayerResult::ok();
504 }
505 consumed += current.len() - rem.len();
506 current = rem;
507 }
508 Err(nom::Err::Incomplete(_)) => {
509 SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
510 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
511 }
512 Err(_) => {
513 return AppLayerResult::err();
514 }
515 }
516 }
517
518 return AppLayerResult::ok();
519 }
520
521 fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
522 let ev = event as u8;
523 core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev);
524 }
525
526 fn tx_iterator(
527 &mut self,
528 min_tx_id: u64,
529 state: &mut u64,
530 ) -> Option<(&MQTTTransaction, u64, bool)> {
531 let mut index = *state as usize;
532 let len = self.transactions.len();
533
534 while index < len {
535 let tx = &self.transactions[index];
536 if tx.tx_id < min_tx_id + 1 {
537 index += 1;
538 continue;
539 }
540 *state = index as u64;
541 return Some((tx, tx.tx_id - 1, (len - index) > 1));
542 }
543
544 return None;
545 }
546 }
547
548 // C exports.
549
550 export_tx_get_detect_state!(rs_mqtt_tx_get_detect_state, MQTTTransaction);
551 export_tx_set_detect_state!(rs_mqtt_tx_set_detect_state, MQTTTransaction);
552
553 #[no_mangle]
554 pub unsafe extern "C" fn rs_mqtt_probing_parser(
555 _flow: *const Flow,
556 _direction: u8,
557 input: *const u8,
558 input_len: u32,
559 _rdir: *mut u8,
560 ) -> AppProto {
561 let buf = build_slice!(input, input_len as usize);
562 match parse_fixed_header(buf) {
563 Ok((_, hdr)) => {
564 // reject unassigned message type
565 if hdr.message_type == MQTTTypeCode::UNASSIGNED {
566 return ALPROTO_FAILED;
567 }
568 // with 2 being the highest valid QoS level
569 if hdr.qos_level > 2 {
570 return ALPROTO_FAILED;
571 }
572 return ALPROTO_MQTT;
573 },
574 Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN,
575 Err(_) => ALPROTO_FAILED
576 }
577 }
578
579 #[no_mangle]
580 pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
581 let state = MQTTState::new();
582 let boxed = Box::new(state);
583 return Box::into_raw(boxed) as *mut _;
584 }
585
586 #[no_mangle]
587 pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
588 std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) });
589 }
590
591 #[no_mangle]
592 pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
593 let state = cast_pointer!(state, MQTTState);
594 state.free_tx(tx_id);
595 }
596
597 #[no_mangle]
598 pub unsafe extern "C" fn rs_mqtt_parse_request(
599 _flow: *const Flow,
600 state: *mut std::os::raw::c_void,
601 _pstate: *mut std::os::raw::c_void,
602 input: *const u8,
603 input_len: u32,
604 _data: *const std::os::raw::c_void,
605 _flags: u8,
606 ) -> AppLayerResult {
607 let state = cast_pointer!(state, MQTTState);
608 let buf = build_slice!(input, input_len as usize);
609 return state.parse_request(buf).into();
610 }
611
612 #[no_mangle]
613 pub unsafe extern "C" fn rs_mqtt_parse_response(
614 _flow: *const Flow,
615 state: *mut std::os::raw::c_void,
616 _pstate: *mut std::os::raw::c_void,
617 input: *const u8,
618 input_len: u32,
619 _data: *const std::os::raw::c_void,
620 _flags: u8,
621 ) -> AppLayerResult {
622 let state = cast_pointer!(state, MQTTState);
623 let buf = build_slice!(input, input_len as usize);
624 return state.parse_response(buf).into();
625 }
626
627 #[no_mangle]
628 pub unsafe extern "C" fn rs_mqtt_state_get_tx(
629 state: *mut std::os::raw::c_void,
630 tx_id: u64,
631 ) -> *mut std::os::raw::c_void {
632 let state = cast_pointer!(state, MQTTState);
633 match state.get_tx(tx_id) {
634 Some(tx) => {
635 return tx as *const _ as *mut _;
636 }
637 None => {
638 return std::ptr::null_mut();
639 }
640 }
641 }
642
643 #[no_mangle]
644 pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
645 let state = cast_pointer!(state, MQTTState);
646 return state.tx_id;
647 }
648
649 #[no_mangle]
650 pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
651 let tx = cast_pointer!(tx, MQTTTransaction);
652 if tx.toclient {
653 return 1;
654 }
655 return 0;
656 }
657
658 #[no_mangle]
659 pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
660 tx: *mut std::os::raw::c_void,
661 direction: u8,
662 ) -> std::os::raw::c_int {
663 let tx = cast_pointer!(tx, MQTTTransaction);
664 if tx.complete {
665 if direction == core::STREAM_TOSERVER {
666 if tx.toserver {
667 return 1;
668 }
669 } else if direction == core::STREAM_TOCLIENT {
670 if tx.toclient {
671 return 1;
672 }
673 }
674 }
675 return 0;
676 }
677
678 #[no_mangle]
679 pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
680 _state: *mut std::os::raw::c_void,
681 tx: *mut std::os::raw::c_void,
682 ) -> u32 {
683 let tx = cast_pointer!(tx, MQTTTransaction);
684 return tx.logged.get();
685 }
686
687 #[no_mangle]
688 pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
689 _state: *mut std::os::raw::c_void,
690 tx: *mut std::os::raw::c_void,
691 logged: u32,
692 ) {
693 let tx = cast_pointer!(tx, MQTTTransaction);
694 tx.logged.set(logged);
695 }
696
697 #[no_mangle]
698 pub unsafe extern "C" fn rs_mqtt_state_get_events(
699 tx: *mut std::os::raw::c_void,
700 ) -> *mut core::AppLayerDecoderEvents {
701 let tx = cast_pointer!(tx, MQTTTransaction);
702 return tx.events;
703 }
704
705 #[no_mangle]
706 pub unsafe extern "C" fn rs_mqtt_state_get_event_info_by_id(event_id: std::os::raw::c_int,
707 event_name: *mut *const std::os::raw::c_char,
708 event_type: *mut core::AppLayerEventType)
709 -> i8
710 {
711 if let Some(e) = FromPrimitive::from_i32(event_id as i32) {
712 let estr = match e {
713 MQTTEvent::MissingConnect => { "missing_connect\0" },
714 MQTTEvent::MissingPublish => { "missing_publish\0" },
715 MQTTEvent::MissingSubscribe => { "missing_subscribe\0" },
716 MQTTEvent::MissingUnsubscribe => { "missing_unsubscribe\0" },
717 MQTTEvent::DoubleConnect => { "double_connect\0" },
718 MQTTEvent::UnintroducedMessage => { "unintroduced_message\0" },
719 MQTTEvent::InvalidQosLevel => { "invalid_qos_level\0" },
720 MQTTEvent::MissingMsgId => { "missing_msg_id\0" },
721 MQTTEvent::UnassignedMsgtype => { "unassigned_msg_type\0" },
722 };
723 *event_name = estr.as_ptr() as *const std::os::raw::c_char;
724 *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
725 0
726 } else {
727 -1
728 }
729 }
730
731 #[no_mangle]
732 pub unsafe extern "C" fn rs_mqtt_state_get_event_info(event_name: *const std::os::raw::c_char,
733 event_id: *mut std::os::raw::c_int,
734 event_type: *mut core::AppLayerEventType)
735 -> std::os::raw::c_int
736 {
737 if event_name == std::ptr::null() { return -1; }
738 let c_event_name: &CStr = CStr::from_ptr(event_name);
739 let event = match c_event_name.to_str() {
740 Ok(s) => {
741 match s {
742 "missing_connect" => MQTTEvent::MissingConnect as i32,
743 "missing_publish" => MQTTEvent::MissingPublish as i32,
744 "missing_subscribe" => MQTTEvent::MissingSubscribe as i32,
745 "missing_unsubscribe" => MQTTEvent::MissingUnsubscribe as i32,
746 "double_connect" => MQTTEvent::DoubleConnect as i32,
747 "unintroduced_message" => MQTTEvent::UnintroducedMessage as i32,
748 "invalid_qos_level" => MQTTEvent::InvalidQosLevel as i32,
749 "missing_msg_id" => MQTTEvent::MissingMsgId as i32,
750 "unassigned_msg_type" => MQTTEvent::UnassignedMsgtype as i32,
751 _ => -1, // unknown event
752 }
753 },
754 Err(_) => -1, // UTF-8 conversion failed
755 };
756 *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
757 *event_id = event as std::os::raw::c_int;
758 0
759 }
760
761 #[no_mangle]
762 pub unsafe extern "C" fn rs_mqtt_state_get_tx_iterator(
763 _ipproto: u8,
764 _alproto: AppProto,
765 state: *mut std::os::raw::c_void,
766 min_tx_id: u64,
767 _max_tx_id: u64,
768 istate: &mut u64,
769 ) -> applayer::AppLayerGetTxIterTuple {
770 let state = cast_pointer!(state, MQTTState);
771 match state.tx_iterator(min_tx_id, istate) {
772 Some((tx, out_tx_id, has_next)) => {
773 let c_tx = tx as *const _ as *mut _;
774 let ires = applayer::AppLayerGetTxIterTuple::with_values(c_tx, out_tx_id, has_next);
775 return ires;
776 }
777 None => {
778 return applayer::AppLayerGetTxIterTuple::not_found();
779 }
780 }
781 }
782
/// Parser name as a C style (NUL-terminated) string.
const PARSER_NAME: &[u8] = b"mqtt\0";
785
786 export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
787
788 #[no_mangle]
789 pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
790 let default_port = CString::new("[1883]").unwrap();
791 let max_msg_len = &mut MAX_MSG_LEN;
792 *max_msg_len = cfg_max_msg_len;
793 let parser = RustParser {
794 name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
795 default_port: default_port.as_ptr(),
796 ipproto: IPPROTO_TCP,
797 probe_ts: Some(rs_mqtt_probing_parser),
798 probe_tc: Some(rs_mqtt_probing_parser),
799 min_depth: 0,
800 max_depth: 16,
801 state_new: rs_mqtt_state_new,
802 state_free: rs_mqtt_state_free,
803 tx_free: rs_mqtt_state_tx_free,
804 parse_ts: rs_mqtt_parse_request,
805 parse_tc: rs_mqtt_parse_response,
806 get_tx_count: rs_mqtt_state_get_tx_count,
807 get_tx: rs_mqtt_state_get_tx,
808 tx_comp_st_ts: 1,
809 tx_comp_st_tc: 1,
810 tx_get_progress: rs_mqtt_tx_get_alstate_progress,
811 get_de_state: rs_mqtt_tx_get_detect_state,
812 set_de_state: rs_mqtt_tx_set_detect_state,
813 get_events: Some(rs_mqtt_state_get_events),
814 get_eventinfo: Some(rs_mqtt_state_get_event_info),
815 get_eventinfo_byid: Some(rs_mqtt_state_get_event_info_by_id),
816 localstorage_new: None,
817 localstorage_free: None,
818 get_files: None,
819 get_tx_iterator: Some(rs_mqtt_state_get_tx_iterator),
820 get_tx_data: rs_mqtt_get_tx_data,
821 apply_tx_config: None,
822 flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
823 truncate: None,
824 };
825
826 let ip_proto_str = CString::new("tcp").unwrap();
827
828 if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
829 let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
830 ALPROTO_MQTT = alproto;
831 if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
832 let _ = AppLayerRegisterParser(&parser, alproto);
833 }
834 } else {
835 SCLogDebug!("Protocol detector and parser disabled for MQTT.");
836 }
837 }