]> git.ipfire.org Git - people/ms/suricata.git/blame - rust/src/mqtt/mqtt.rs
app-layer: include decoder events in app-layer tx data
[people/ms/suricata.git] / rust / src / mqtt / mqtt.rs
CommitLineData
c3136007
SS
1/* Copyright (C) 2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18// written by Sascha Steinbiss <sascha@steinbiss.name>
19
20use super::mqtt_message::*;
21use super::parser::*;
22use crate::applayer::{self, LoggerFlags};
23use crate::applayer::*;
7732efbe 24use crate::core::*;
8a584c21 25use nom7::Err;
c3136007 26use std;
8eac5fc2 27use std::ffi::CString;
c3136007
SS
28
29// Used as a special pseudo packet identifier to denote the first CONNECT
30// packet in a connection. Note that there is no risk of collision with a
31// parsed packet identifier because in the protocol these are only 16 bit
32// unsigned.
33const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;
34// Maximum message length in bytes. If the length of a message exceeds
35// this value, it will be truncated. Default: 1MB.
36static mut MAX_MSG_LEN: u32 = 1048576;
37
38static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
39
8eac5fc2 40#[derive(FromPrimitive, Debug, AppLayerEvent)]
c3136007 41pub enum MQTTEvent {
8eac5fc2 42 MissingConnect,
c3136007
SS
43 MissingPublish,
44 MissingSubscribe,
45 MissingUnsubscribe,
46 DoubleConnect,
47 UnintroducedMessage,
48 InvalidQosLevel,
49 MissingMsgId,
8eac5fc2 50 UnassignedMsgType,
c3136007
SS
51}
52
53#[derive(Debug)]
54pub struct MQTTTransaction {
55 tx_id: u64,
56 pkt_id: Option<u32>,
57 pub msg: Vec<MQTTMessage>,
58 complete: bool,
59 toclient: bool,
60 toserver: bool,
61
62 logged: LoggerFlags,
c3136007
SS
63 tx_data: applayer::AppLayerTxData,
64}
65
66impl MQTTTransaction {
67 pub fn new(msg: MQTTMessage) -> MQTTTransaction {
68 let mut m = MQTTTransaction {
69 tx_id: 0,
70 pkt_id: None,
71 complete: false,
72 logged: LoggerFlags::new(),
73 msg: Vec::new(),
74 toclient: false,
75 toserver: false,
c3136007
SS
76 tx_data: applayer::AppLayerTxData::new(),
77 };
78 m.msg.push(msg);
79 return m;
80 }
c3136007
SS
81}
82
b3354096
JI
83impl Transaction for MQTTTransaction {
84 fn id(&self) -> u64 {
85 self.tx_id
86 }
87}
88
c3136007
SS
89pub struct MQTTState {
90 tx_id: u64,
91 pub protocol_version: u8,
92 transactions: Vec<MQTTTransaction>,
93 connected: bool,
94 skip_request: usize,
95 skip_response: usize,
96 max_msg_len: usize,
97}
98
b3354096
JI
99impl State<MQTTTransaction> for MQTTState {
100 fn get_transactions(&self) -> &[MQTTTransaction] {
101 &self.transactions
102 }
103}
104
c3136007
SS
105impl MQTTState {
106 pub fn new() -> Self {
107 Self {
108 tx_id: 0,
109 protocol_version: 0,
110 transactions: Vec::new(),
111 connected: false,
112 skip_request: 0,
113 skip_response: 0,
114 max_msg_len: unsafe { MAX_MSG_LEN as usize },
115 }
116 }
117
118 fn free_tx(&mut self, tx_id: u64) {
119 let len = self.transactions.len();
120 let mut found = false;
121 let mut index = 0;
122 for i in 0..len {
123 let tx = &self.transactions[i];
124 if tx.tx_id == tx_id + 1 {
125 found = true;
126 index = i;
127 break;
128 }
129 }
130 if found {
131 self.transactions.remove(index);
132 }
133 }
134
135 pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
136 for tx in &mut self.transactions {
137 if tx.tx_id == tx_id + 1 {
138 return Some(tx);
139 }
140 }
141 return None;
142 }
143
144 pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
145 for tx in &mut self.transactions {
146 if !tx.complete {
147 if let Some(mpktid) = tx.pkt_id {
148 if mpktid == pkt_id {
149 return Some(tx);
150 }
151 }
152 }
153 }
154 return None;
155 }
156
157 fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
158 let mut tx = MQTTTransaction::new(msg);
159 self.tx_id += 1;
160 tx.tx_id = self.tx_id;
161 if toclient {
162 tx.toclient = true;
163 } else {
164 tx.toserver = true;
165 }
166 return tx;
167 }
168
169 // Handle a MQTT message depending on the direction and state.
170 // Note that we are trying to only have one mutable reference to msg
171 // and its components, however, since we are in a large match operation,
172 // we cannot pass around and/or store more references or move things
173 // without having to introduce lifetimes etc.
174 // This is the reason for the code duplication below. Maybe there is a
175 // more concise way to do it, but this works for now.
176 fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
177 match msg.op {
178 MQTTOperation::CONNECT(ref conn) => {
179 self.protocol_version = conn.protocol_version;
180 if self.connected {
181 let mut tx = self.new_tx(msg, toclient);
d541b3d4 182 MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
c3136007
SS
183 self.transactions.push(tx);
184 } else {
185 let mut tx = self.new_tx(msg, toclient);
186 tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
187 self.transactions.push(tx);
188 }
189 },
190 MQTTOperation::PUBLISH(ref publish) => {
191 if !self.connected {
192 let mut tx = self.new_tx(msg, toclient);
d541b3d4 193 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
194 self.transactions.push(tx);
195 return;
196 }
197 match msg.header.qos_level {
198 0 => {
199 // with QOS level 0, we do not need to wait for a
200 // response
201 let mut tx = self.new_tx(msg, toclient);
202 tx.complete = true;
203 self.transactions.push(tx);
204 },
205 1..=2 => {
206 if let Some(pkt_id) = publish.message_id {
207 let mut tx = self.new_tx(msg, toclient);
208 tx.pkt_id = Some(pkt_id as u32);
209 self.transactions.push(tx);
210 } else {
211 let mut tx = self.new_tx(msg, toclient);
d541b3d4 212 MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
c3136007
SS
213 self.transactions.push(tx);
214 }
215 },
216 _ => {
217 let mut tx = self.new_tx(msg, toclient);
d541b3d4 218 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
219 self.transactions.push(tx);
220 }
221 }
222 },
223 MQTTOperation::SUBSCRIBE(ref subscribe) => {
224 if !self.connected {
225 let mut tx = self.new_tx(msg, toclient);
d541b3d4 226 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
227 self.transactions.push(tx);
228 return;
229 }
230 let pkt_id = subscribe.message_id as u32;
231 match msg.header.qos_level {
232 0 => {
233 // with QOS level 0, we do not need to wait for a
234 // response
235 let mut tx = self.new_tx(msg, toclient);
236 tx.complete = true;
237 self.transactions.push(tx);
238 },
239 1..=2 => {
240 let mut tx = self.new_tx(msg, toclient);
241 tx.pkt_id = Some(pkt_id);
242 self.transactions.push(tx);
243 },
244 _ => {
245 let mut tx = self.new_tx(msg, toclient);
d541b3d4 246 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
247 self.transactions.push(tx);
248 }
249 }
250 },
251 MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
252 if !self.connected {
253 let mut tx = self.new_tx(msg, toclient);
d541b3d4 254 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
255 self.transactions.push(tx);
256 return;
257 }
258 let pkt_id = unsubscribe.message_id as u32;
259 match msg.header.qos_level {
260 0 => {
261 // with QOS level 0, we do not need to wait for a
262 // response
263 let mut tx = self.new_tx(msg, toclient);
264 tx.complete = true;
265 self.transactions.push(tx);
266 },
267 1..=2 => {
268 let mut tx = self.new_tx(msg, toclient);
269 tx.pkt_id = Some(pkt_id);
270 self.transactions.push(tx);
271 },
272 _ => {
273 let mut tx = self.new_tx(msg, toclient);
d541b3d4 274 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
275 self.transactions.push(tx);
276 }
277 }
278 },
279 MQTTOperation::CONNACK(ref _connack) => {
280 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
281 (*tx).msg.push(msg);
282 (*tx).complete = true;
283 (*tx).pkt_id = None;
284 self.connected = true;
285 } else {
286 let mut tx = self.new_tx(msg, toclient);
d541b3d4 287 MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
c3136007
SS
288 self.transactions.push(tx);
289 }
290 },
291 MQTTOperation::PUBREC(ref v)
292 | MQTTOperation::PUBREL(ref v) => {
293 if !self.connected {
294 let mut tx = self.new_tx(msg, toclient);
d541b3d4 295 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
296 self.transactions.push(tx);
297 return;
298 }
299 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
300 (*tx).msg.push(msg);
301 } else {
302 let mut tx = self.new_tx(msg, toclient);
d541b3d4 303 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
c3136007
SS
304 self.transactions.push(tx);
305 }
306 },
307 MQTTOperation::PUBACK(ref v)
308 | MQTTOperation::PUBCOMP(ref v) => {
309 if !self.connected {
310 let mut tx = self.new_tx(msg, toclient);
d541b3d4 311 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
312 self.transactions.push(tx);
313 return;
314 }
315 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
316 (*tx).msg.push(msg);
317 (*tx).complete = true;
318 (*tx).pkt_id = None;
319 } else {
320 let mut tx = self.new_tx(msg, toclient);
d541b3d4 321 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
c3136007
SS
322 self.transactions.push(tx);
323 }
324 },
325 MQTTOperation::SUBACK(ref suback) => {
326 if !self.connected {
327 let mut tx = self.new_tx(msg, toclient);
d541b3d4 328 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
329 self.transactions.push(tx);
330 return;
331 }
332 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
333 (*tx).msg.push(msg);
334 (*tx).complete = true;
335 (*tx).pkt_id = None;
336 } else {
337 let mut tx = self.new_tx(msg, toclient);
d541b3d4 338 MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
c3136007
SS
339 self.transactions.push(tx);
340 }
341 },
342 MQTTOperation::UNSUBACK(ref unsuback) => {
343 if !self.connected {
344 let mut tx = self.new_tx(msg, toclient);
d541b3d4 345 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
346 self.transactions.push(tx);
347 return;
348 }
349 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
350 (*tx).msg.push(msg);
351 (*tx).complete = true;
352 (*tx).pkt_id = None;
353 } else {
354 let mut tx = self.new_tx(msg, toclient);
d541b3d4 355 MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
c3136007
SS
356 self.transactions.push(tx);
357 }
358 },
359 MQTTOperation::UNASSIGNED => {
360 let mut tx = self.new_tx(msg, toclient);
361 tx.complete = true;
8eac5fc2 362 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
c3136007
SS
363 self.transactions.push(tx);
364 },
365 MQTTOperation::TRUNCATED(_) => {
366 let mut tx = self.new_tx(msg, toclient);
367 tx.complete = true;
368 self.transactions.push(tx);
369 },
370 MQTTOperation::AUTH(_)
371 | MQTTOperation::DISCONNECT(_) => {
372 if !self.connected {
373 let mut tx = self.new_tx(msg, toclient);
d541b3d4 374 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
375 self.transactions.push(tx);
376 return;
377 }
378 let mut tx = self.new_tx(msg, toclient);
379 tx.complete = true;
380 self.transactions.push(tx);
381 },
382 MQTTOperation::PINGREQ
383 | MQTTOperation::PINGRESP => {
384 if !self.connected {
385 let mut tx = self.new_tx(msg, toclient);
d541b3d4 386 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
387 self.transactions.push(tx);
388 return;
389 }
390 let mut tx = self.new_tx(msg, toclient);
391 tx.complete = true;
392 self.transactions.push(tx);
393 }
394 }
395 }
396
397 fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
398 let mut current = input;
399 if input.len() == 0 {
400 return AppLayerResult::ok();
401 }
402
403 let mut consumed = 0;
404 SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
405 if self.skip_request > 0 {
406 if input.len() <= self.skip_request {
407 SCLogDebug!("reducing skip_request by {}", input.len());
408 self.skip_request -= input.len();
409 return AppLayerResult::ok();
410 } else {
411 current = &input[self.skip_request..];
412 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
413 consumed = self.skip_request;
414 self.skip_request = 0;
415 }
416 }
417
418
419 while current.len() > 0 {
420 let mut skipped = false;
421 SCLogDebug!("request: handling {}", current.len());
422 match parse_message(current, self.protocol_version, self.max_msg_len) {
423 Ok((rem, msg)) => {
424 SCLogDebug!("request msg {:?}", msg);
425 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
426 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
427 if trunc.skipped_length >= current.len() {
428 skipped = true;
429 self.skip_request = trunc.skipped_length - current.len();
430 } else {
431 current = &current[trunc.skipped_length..];
432 self.skip_request = 0;
433 }
434 }
435 self.handle_msg(msg, false);
436 if skipped {
437 return AppLayerResult::ok();
438 }
439 consumed += current.len() - rem.len();
440 current = rem;
441 }
8a584c21 442 Err(Err::Incomplete(_)) => {
c3136007
SS
443 SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
444 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
445 }
446 Err(_) => {
447 return AppLayerResult::err();
448 }
449 }
450 }
451
452 return AppLayerResult::ok();
453 }
454
455 fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
456 let mut current = input;
457 if input.len() == 0 {
458 return AppLayerResult::ok();
459 }
460
461 let mut consumed = 0;
462 SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
463 if self.skip_response > 0 {
464 if input.len() <= self.skip_response {
465 self.skip_response -= current.len();
466 return AppLayerResult::ok();
467 } else {
468 current = &input[self.skip_response..];
469 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
470 consumed = self.skip_response;
471 self.skip_response = 0;
472 }
473 }
474
475 while current.len() > 0 {
476 let mut skipped = false;
477 SCLogDebug!("response: handling {}", current.len());
478 match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
479 Ok((rem, msg)) => {
480 SCLogDebug!("response msg {:?}", msg);
481 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
482 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
483 if trunc.skipped_length >= current.len() {
484 skipped = true;
485 self.skip_response = trunc.skipped_length - current.len();
486 } else {
487 current = &current[trunc.skipped_length..];
488 self.skip_response = 0;
489 }
490 SCLogDebug!("skip_response now {}", self.skip_response);
491 }
492 self.handle_msg(msg, true);
493 if skipped {
494 return AppLayerResult::ok();
495 }
496 consumed += current.len() - rem.len();
497 current = rem;
498 }
8a584c21 499 Err(Err::Incomplete(_)) => {
c3136007
SS
500 SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
501 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
502 }
503 Err(_) => {
504 return AppLayerResult::err();
505 }
506 }
507 }
508
509 return AppLayerResult::ok();
510 }
511
512 fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
7732efbe 513 tx.tx_data.set_event(event as u8);
c3136007 514 }
c3136007
SS
515}
516
517// C exports.
518
c3136007 519#[no_mangle]
363b5f99 520pub unsafe extern "C" fn rs_mqtt_probing_parser(
c3136007
SS
521 _flow: *const Flow,
522 _direction: u8,
523 input: *const u8,
524 input_len: u32,
525 _rdir: *mut u8,
526) -> AppProto {
527 let buf = build_slice!(input, input_len as usize);
528 match parse_fixed_header(buf) {
529 Ok((_, hdr)) => {
530 // reject unassigned message type
531 if hdr.message_type == MQTTTypeCode::UNASSIGNED {
363b5f99 532 return ALPROTO_FAILED;
c3136007
SS
533 }
534 // with 2 being the highest valid QoS level
535 if hdr.qos_level > 2 {
363b5f99 536 return ALPROTO_FAILED;
c3136007 537 }
363b5f99 538 return ALPROTO_MQTT;
c3136007 539 },
8a584c21 540 Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
363b5f99 541 Err(_) => ALPROTO_FAILED
c3136007
SS
542 }
543}
544
545#[no_mangle]
547d6c2d 546pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
c3136007
SS
547 let state = MQTTState::new();
548 let boxed = Box::new(state);
53413f2d 549 return Box::into_raw(boxed) as *mut _;
c3136007
SS
550}
551
552#[no_mangle]
553pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
53413f2d 554 std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) });
c3136007
SS
555}
556
557#[no_mangle]
363b5f99 558pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
c3136007
SS
559 let state = cast_pointer!(state, MQTTState);
560 state.free_tx(tx_id);
561}
562
563#[no_mangle]
363b5f99 564pub unsafe extern "C" fn rs_mqtt_parse_request(
c3136007
SS
565 _flow: *const Flow,
566 state: *mut std::os::raw::c_void,
567 _pstate: *mut std::os::raw::c_void,
568 input: *const u8,
569 input_len: u32,
570 _data: *const std::os::raw::c_void,
571 _flags: u8,
572) -> AppLayerResult {
573 let state = cast_pointer!(state, MQTTState);
574 let buf = build_slice!(input, input_len as usize);
ac3a20b6 575 return state.parse_request(buf);
c3136007
SS
576}
577
578#[no_mangle]
363b5f99 579pub unsafe extern "C" fn rs_mqtt_parse_response(
c3136007
SS
580 _flow: *const Flow,
581 state: *mut std::os::raw::c_void,
582 _pstate: *mut std::os::raw::c_void,
583 input: *const u8,
584 input_len: u32,
585 _data: *const std::os::raw::c_void,
586 _flags: u8,
587) -> AppLayerResult {
588 let state = cast_pointer!(state, MQTTState);
589 let buf = build_slice!(input, input_len as usize);
ac3a20b6 590 return state.parse_response(buf);
c3136007
SS
591}
592
593#[no_mangle]
363b5f99 594pub unsafe extern "C" fn rs_mqtt_state_get_tx(
c3136007
SS
595 state: *mut std::os::raw::c_void,
596 tx_id: u64,
597) -> *mut std::os::raw::c_void {
598 let state = cast_pointer!(state, MQTTState);
599 match state.get_tx(tx_id) {
600 Some(tx) => {
53413f2d 601 return tx as *const _ as *mut _;
c3136007
SS
602 }
603 None => {
604 return std::ptr::null_mut();
605 }
606 }
607}
608
609#[no_mangle]
363b5f99 610pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
c3136007
SS
611 let state = cast_pointer!(state, MQTTState);
612 return state.tx_id;
613}
614
c3136007 615#[no_mangle]
363b5f99 616pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
c3136007
SS
617 let tx = cast_pointer!(tx, MQTTTransaction);
618 if tx.toclient {
619 return 1;
620 }
621 return 0;
622}
623
624#[no_mangle]
363b5f99 625pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
c3136007
SS
626 tx: *mut std::os::raw::c_void,
627 direction: u8,
628) -> std::os::raw::c_int {
629 let tx = cast_pointer!(tx, MQTTTransaction);
630 if tx.complete {
a7ac79be 631 if direction == Direction::ToServer.into() {
c3136007
SS
632 if tx.toserver {
633 return 1;
634 }
a7ac79be 635 } else if direction == Direction::ToClient.into() {
c3136007
SS
636 if tx.toclient {
637 return 1;
638 }
639 }
640 }
641 return 0;
642}
643
644#[no_mangle]
363b5f99 645pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
c3136007
SS
646 _state: *mut std::os::raw::c_void,
647 tx: *mut std::os::raw::c_void,
648) -> u32 {
649 let tx = cast_pointer!(tx, MQTTTransaction);
650 return tx.logged.get();
651}
652
653#[no_mangle]
363b5f99 654pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
c3136007
SS
655 _state: *mut std::os::raw::c_void,
656 tx: *mut std::os::raw::c_void,
657 logged: u32,
658) {
659 let tx = cast_pointer!(tx, MQTTTransaction);
660 tx.logged.set(logged);
661}
662
c3136007
SS
663// Parser name as a C style string.
664const PARSER_NAME: &'static [u8] = b"mqtt\0";
665
666export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
667
668#[no_mangle]
669pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
670 let default_port = CString::new("[1883]").unwrap();
671 let max_msg_len = &mut MAX_MSG_LEN;
672 *max_msg_len = cfg_max_msg_len;
673 let parser = RustParser {
674 name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
675 default_port: default_port.as_ptr(),
676 ipproto: IPPROTO_TCP,
677 probe_ts: Some(rs_mqtt_probing_parser),
678 probe_tc: Some(rs_mqtt_probing_parser),
679 min_depth: 0,
680 max_depth: 16,
681 state_new: rs_mqtt_state_new,
682 state_free: rs_mqtt_state_free,
683 tx_free: rs_mqtt_state_tx_free,
684 parse_ts: rs_mqtt_parse_request,
685 parse_tc: rs_mqtt_parse_response,
686 get_tx_count: rs_mqtt_state_get_tx_count,
687 get_tx: rs_mqtt_state_get_tx,
efc9a7a3
VJ
688 tx_comp_st_ts: 1,
689 tx_comp_st_tc: 1,
c3136007 690 tx_get_progress: rs_mqtt_tx_get_alstate_progress,
8eac5fc2
JI
691 get_eventinfo: Some(MQTTEvent::get_event_info),
692 get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id),
c3136007
SS
693 localstorage_new: None,
694 localstorage_free: None,
695 get_files: None,
b3354096 696 get_tx_iterator: Some(crate::applayer::state_get_tx_iterator::<MQTTState, MQTTTransaction>),
c3136007
SS
697 get_tx_data: rs_mqtt_get_tx_data,
698 apply_tx_config: None,
ff674d0c 699 flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
4da0d9bd 700 truncate: None,
c3136007
SS
701 };
702
703 let ip_proto_str = CString::new("tcp").unwrap();
704
705 if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
706 let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
707 ALPROTO_MQTT = alproto;
708 if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
709 let _ = AppLayerRegisterParser(&parser, alproto);
710 }
711 } else {
712 SCLogDebug!("Protocol detector and parser disabled for MQTT.");
713 }
714}