]> git.ipfire.org Git - people/ms/suricata.git/blame - rust/src/mqtt/mqtt.rs
app-layer: include DetectEngineState in AppLayerTxData
[people/ms/suricata.git] / rust / src / mqtt / mqtt.rs
CommitLineData
c3136007
SS
1/* Copyright (C) 2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18// written by Sascha Steinbiss <sascha@steinbiss.name>
19
20use super::mqtt_message::*;
21use super::parser::*;
22use crate::applayer::{self, LoggerFlags};
23use crate::applayer::*;
a7ac79be 24use crate::core::{self, *};
8a584c21 25use nom7::Err;
c3136007 26use std;
8eac5fc2 27use std::ffi::CString;
c3136007
SS
28
29// Used as a special pseudo packet identifier to denote the first CONNECT
30// packet in a connection. Note that there is no risk of collision with a
31// parsed packet identifier because in the protocol these are only 16 bit
32// unsigned.
33const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;
34// Maximum message length in bytes. If the length of a message exceeds
35// this value, it will be truncated. Default: 1MB.
36static mut MAX_MSG_LEN: u32 = 1048576;
37
38static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
39
8eac5fc2 40#[derive(FromPrimitive, Debug, AppLayerEvent)]
c3136007 41pub enum MQTTEvent {
8eac5fc2 42 MissingConnect,
c3136007
SS
43 MissingPublish,
44 MissingSubscribe,
45 MissingUnsubscribe,
46 DoubleConnect,
47 UnintroducedMessage,
48 InvalidQosLevel,
49 MissingMsgId,
8eac5fc2 50 UnassignedMsgType,
c3136007
SS
51}
52
53#[derive(Debug)]
54pub struct MQTTTransaction {
55 tx_id: u64,
56 pkt_id: Option<u32>,
57 pub msg: Vec<MQTTMessage>,
58 complete: bool,
59 toclient: bool,
60 toserver: bool,
61
62 logged: LoggerFlags,
c3136007
SS
63 events: *mut core::AppLayerDecoderEvents,
64 tx_data: applayer::AppLayerTxData,
65}
66
67impl MQTTTransaction {
68 pub fn new(msg: MQTTMessage) -> MQTTTransaction {
69 let mut m = MQTTTransaction {
70 tx_id: 0,
71 pkt_id: None,
72 complete: false,
73 logged: LoggerFlags::new(),
74 msg: Vec::new(),
75 toclient: false,
76 toserver: false,
c3136007
SS
77 events: std::ptr::null_mut(),
78 tx_data: applayer::AppLayerTxData::new(),
79 };
80 m.msg.push(msg);
81 return m;
82 }
83
84 pub fn free(&mut self) {
922a453d 85 if !self.events.is_null() {
c3136007
SS
86 core::sc_app_layer_decoder_events_free_events(&mut self.events);
87 }
c3136007
SS
88 }
89}
90
91impl Drop for MQTTTransaction {
92 fn drop(&mut self) {
93 self.free();
94 }
95}
96
b3354096
JI
97impl Transaction for MQTTTransaction {
98 fn id(&self) -> u64 {
99 self.tx_id
100 }
101}
102
c3136007
SS
103pub struct MQTTState {
104 tx_id: u64,
105 pub protocol_version: u8,
106 transactions: Vec<MQTTTransaction>,
107 connected: bool,
108 skip_request: usize,
109 skip_response: usize,
110 max_msg_len: usize,
111}
112
b3354096
JI
113impl State<MQTTTransaction> for MQTTState {
114 fn get_transactions(&self) -> &[MQTTTransaction] {
115 &self.transactions
116 }
117}
118
c3136007
SS
119impl MQTTState {
120 pub fn new() -> Self {
121 Self {
122 tx_id: 0,
123 protocol_version: 0,
124 transactions: Vec::new(),
125 connected: false,
126 skip_request: 0,
127 skip_response: 0,
128 max_msg_len: unsafe { MAX_MSG_LEN as usize },
129 }
130 }
131
132 fn free_tx(&mut self, tx_id: u64) {
133 let len = self.transactions.len();
134 let mut found = false;
135 let mut index = 0;
136 for i in 0..len {
137 let tx = &self.transactions[i];
138 if tx.tx_id == tx_id + 1 {
139 found = true;
140 index = i;
141 break;
142 }
143 }
144 if found {
145 self.transactions.remove(index);
146 }
147 }
148
149 pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
150 for tx in &mut self.transactions {
151 if tx.tx_id == tx_id + 1 {
152 return Some(tx);
153 }
154 }
155 return None;
156 }
157
158 pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
159 for tx in &mut self.transactions {
160 if !tx.complete {
161 if let Some(mpktid) = tx.pkt_id {
162 if mpktid == pkt_id {
163 return Some(tx);
164 }
165 }
166 }
167 }
168 return None;
169 }
170
171 fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
172 let mut tx = MQTTTransaction::new(msg);
173 self.tx_id += 1;
174 tx.tx_id = self.tx_id;
175 if toclient {
176 tx.toclient = true;
177 } else {
178 tx.toserver = true;
179 }
180 return tx;
181 }
182
183 // Handle a MQTT message depending on the direction and state.
184 // Note that we are trying to only have one mutable reference to msg
185 // and its components, however, since we are in a large match operation,
186 // we cannot pass around and/or store more references or move things
187 // without having to introduce lifetimes etc.
188 // This is the reason for the code duplication below. Maybe there is a
189 // more concise way to do it, but this works for now.
190 fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
191 match msg.op {
192 MQTTOperation::CONNECT(ref conn) => {
193 self.protocol_version = conn.protocol_version;
194 if self.connected {
195 let mut tx = self.new_tx(msg, toclient);
d541b3d4 196 MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
c3136007
SS
197 self.transactions.push(tx);
198 } else {
199 let mut tx = self.new_tx(msg, toclient);
200 tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
201 self.transactions.push(tx);
202 }
203 },
204 MQTTOperation::PUBLISH(ref publish) => {
205 if !self.connected {
206 let mut tx = self.new_tx(msg, toclient);
d541b3d4 207 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
208 self.transactions.push(tx);
209 return;
210 }
211 match msg.header.qos_level {
212 0 => {
213 // with QOS level 0, we do not need to wait for a
214 // response
215 let mut tx = self.new_tx(msg, toclient);
216 tx.complete = true;
217 self.transactions.push(tx);
218 },
219 1..=2 => {
220 if let Some(pkt_id) = publish.message_id {
221 let mut tx = self.new_tx(msg, toclient);
222 tx.pkt_id = Some(pkt_id as u32);
223 self.transactions.push(tx);
224 } else {
225 let mut tx = self.new_tx(msg, toclient);
d541b3d4 226 MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
c3136007
SS
227 self.transactions.push(tx);
228 }
229 },
230 _ => {
231 let mut tx = self.new_tx(msg, toclient);
d541b3d4 232 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
233 self.transactions.push(tx);
234 }
235 }
236 },
237 MQTTOperation::SUBSCRIBE(ref subscribe) => {
238 if !self.connected {
239 let mut tx = self.new_tx(msg, toclient);
d541b3d4 240 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
241 self.transactions.push(tx);
242 return;
243 }
244 let pkt_id = subscribe.message_id as u32;
245 match msg.header.qos_level {
246 0 => {
247 // with QOS level 0, we do not need to wait for a
248 // response
249 let mut tx = self.new_tx(msg, toclient);
250 tx.complete = true;
251 self.transactions.push(tx);
252 },
253 1..=2 => {
254 let mut tx = self.new_tx(msg, toclient);
255 tx.pkt_id = Some(pkt_id);
256 self.transactions.push(tx);
257 },
258 _ => {
259 let mut tx = self.new_tx(msg, toclient);
d541b3d4 260 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
261 self.transactions.push(tx);
262 }
263 }
264 },
265 MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
266 if !self.connected {
267 let mut tx = self.new_tx(msg, toclient);
d541b3d4 268 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
269 self.transactions.push(tx);
270 return;
271 }
272 let pkt_id = unsubscribe.message_id as u32;
273 match msg.header.qos_level {
274 0 => {
275 // with QOS level 0, we do not need to wait for a
276 // response
277 let mut tx = self.new_tx(msg, toclient);
278 tx.complete = true;
279 self.transactions.push(tx);
280 },
281 1..=2 => {
282 let mut tx = self.new_tx(msg, toclient);
283 tx.pkt_id = Some(pkt_id);
284 self.transactions.push(tx);
285 },
286 _ => {
287 let mut tx = self.new_tx(msg, toclient);
d541b3d4 288 MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
c3136007
SS
289 self.transactions.push(tx);
290 }
291 }
292 },
293 MQTTOperation::CONNACK(ref _connack) => {
294 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
295 (*tx).msg.push(msg);
296 (*tx).complete = true;
297 (*tx).pkt_id = None;
298 self.connected = true;
299 } else {
300 let mut tx = self.new_tx(msg, toclient);
d541b3d4 301 MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
c3136007
SS
302 self.transactions.push(tx);
303 }
304 },
305 MQTTOperation::PUBREC(ref v)
306 | MQTTOperation::PUBREL(ref v) => {
307 if !self.connected {
308 let mut tx = self.new_tx(msg, toclient);
d541b3d4 309 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
310 self.transactions.push(tx);
311 return;
312 }
313 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
314 (*tx).msg.push(msg);
315 } else {
316 let mut tx = self.new_tx(msg, toclient);
d541b3d4 317 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
c3136007
SS
318 self.transactions.push(tx);
319 }
320 },
321 MQTTOperation::PUBACK(ref v)
322 | MQTTOperation::PUBCOMP(ref v) => {
323 if !self.connected {
324 let mut tx = self.new_tx(msg, toclient);
d541b3d4 325 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
326 self.transactions.push(tx);
327 return;
328 }
329 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
330 (*tx).msg.push(msg);
331 (*tx).complete = true;
332 (*tx).pkt_id = None;
333 } else {
334 let mut tx = self.new_tx(msg, toclient);
d541b3d4 335 MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
c3136007
SS
336 self.transactions.push(tx);
337 }
338 },
339 MQTTOperation::SUBACK(ref suback) => {
340 if !self.connected {
341 let mut tx = self.new_tx(msg, toclient);
d541b3d4 342 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
343 self.transactions.push(tx);
344 return;
345 }
346 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
347 (*tx).msg.push(msg);
348 (*tx).complete = true;
349 (*tx).pkt_id = None;
350 } else {
351 let mut tx = self.new_tx(msg, toclient);
d541b3d4 352 MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
c3136007
SS
353 self.transactions.push(tx);
354 }
355 },
356 MQTTOperation::UNSUBACK(ref unsuback) => {
357 if !self.connected {
358 let mut tx = self.new_tx(msg, toclient);
d541b3d4 359 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
360 self.transactions.push(tx);
361 return;
362 }
363 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
364 (*tx).msg.push(msg);
365 (*tx).complete = true;
366 (*tx).pkt_id = None;
367 } else {
368 let mut tx = self.new_tx(msg, toclient);
d541b3d4 369 MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
c3136007
SS
370 self.transactions.push(tx);
371 }
372 },
373 MQTTOperation::UNASSIGNED => {
374 let mut tx = self.new_tx(msg, toclient);
375 tx.complete = true;
8eac5fc2 376 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
c3136007
SS
377 self.transactions.push(tx);
378 },
379 MQTTOperation::TRUNCATED(_) => {
380 let mut tx = self.new_tx(msg, toclient);
381 tx.complete = true;
382 self.transactions.push(tx);
383 },
384 MQTTOperation::AUTH(_)
385 | MQTTOperation::DISCONNECT(_) => {
386 if !self.connected {
387 let mut tx = self.new_tx(msg, toclient);
d541b3d4 388 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
389 self.transactions.push(tx);
390 return;
391 }
392 let mut tx = self.new_tx(msg, toclient);
393 tx.complete = true;
394 self.transactions.push(tx);
395 },
396 MQTTOperation::PINGREQ
397 | MQTTOperation::PINGRESP => {
398 if !self.connected {
399 let mut tx = self.new_tx(msg, toclient);
d541b3d4 400 MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
c3136007
SS
401 self.transactions.push(tx);
402 return;
403 }
404 let mut tx = self.new_tx(msg, toclient);
405 tx.complete = true;
406 self.transactions.push(tx);
407 }
408 }
409 }
410
411 fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
412 let mut current = input;
413 if input.len() == 0 {
414 return AppLayerResult::ok();
415 }
416
417 let mut consumed = 0;
418 SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
419 if self.skip_request > 0 {
420 if input.len() <= self.skip_request {
421 SCLogDebug!("reducing skip_request by {}", input.len());
422 self.skip_request -= input.len();
423 return AppLayerResult::ok();
424 } else {
425 current = &input[self.skip_request..];
426 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
427 consumed = self.skip_request;
428 self.skip_request = 0;
429 }
430 }
431
432
433 while current.len() > 0 {
434 let mut skipped = false;
435 SCLogDebug!("request: handling {}", current.len());
436 match parse_message(current, self.protocol_version, self.max_msg_len) {
437 Ok((rem, msg)) => {
438 SCLogDebug!("request msg {:?}", msg);
439 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
440 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
441 if trunc.skipped_length >= current.len() {
442 skipped = true;
443 self.skip_request = trunc.skipped_length - current.len();
444 } else {
445 current = &current[trunc.skipped_length..];
446 self.skip_request = 0;
447 }
448 }
449 self.handle_msg(msg, false);
450 if skipped {
451 return AppLayerResult::ok();
452 }
453 consumed += current.len() - rem.len();
454 current = rem;
455 }
8a584c21 456 Err(Err::Incomplete(_)) => {
c3136007
SS
457 SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
458 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
459 }
460 Err(_) => {
461 return AppLayerResult::err();
462 }
463 }
464 }
465
466 return AppLayerResult::ok();
467 }
468
469 fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
470 let mut current = input;
471 if input.len() == 0 {
472 return AppLayerResult::ok();
473 }
474
475 let mut consumed = 0;
476 SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
477 if self.skip_response > 0 {
478 if input.len() <= self.skip_response {
479 self.skip_response -= current.len();
480 return AppLayerResult::ok();
481 } else {
482 current = &input[self.skip_response..];
483 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
484 consumed = self.skip_response;
485 self.skip_response = 0;
486 }
487 }
488
489 while current.len() > 0 {
490 let mut skipped = false;
491 SCLogDebug!("response: handling {}", current.len());
492 match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
493 Ok((rem, msg)) => {
494 SCLogDebug!("response msg {:?}", msg);
495 if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
496 SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
497 if trunc.skipped_length >= current.len() {
498 skipped = true;
499 self.skip_response = trunc.skipped_length - current.len();
500 } else {
501 current = &current[trunc.skipped_length..];
502 self.skip_response = 0;
503 }
504 SCLogDebug!("skip_response now {}", self.skip_response);
505 }
506 self.handle_msg(msg, true);
507 if skipped {
508 return AppLayerResult::ok();
509 }
510 consumed += current.len() - rem.len();
511 current = rem;
512 }
8a584c21 513 Err(Err::Incomplete(_)) => {
c3136007
SS
514 SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
515 return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
516 }
517 Err(_) => {
518 return AppLayerResult::err();
519 }
520 }
521 }
522
523 return AppLayerResult::ok();
524 }
525
526 fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
527 let ev = event as u8;
528 core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev);
529 }
c3136007
SS
530}
531
532// C exports.
533
c3136007 534#[no_mangle]
363b5f99 535pub unsafe extern "C" fn rs_mqtt_probing_parser(
c3136007
SS
536 _flow: *const Flow,
537 _direction: u8,
538 input: *const u8,
539 input_len: u32,
540 _rdir: *mut u8,
541) -> AppProto {
542 let buf = build_slice!(input, input_len as usize);
543 match parse_fixed_header(buf) {
544 Ok((_, hdr)) => {
545 // reject unassigned message type
546 if hdr.message_type == MQTTTypeCode::UNASSIGNED {
363b5f99 547 return ALPROTO_FAILED;
c3136007
SS
548 }
549 // with 2 being the highest valid QoS level
550 if hdr.qos_level > 2 {
363b5f99 551 return ALPROTO_FAILED;
c3136007 552 }
363b5f99 553 return ALPROTO_MQTT;
c3136007 554 },
8a584c21 555 Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
363b5f99 556 Err(_) => ALPROTO_FAILED
c3136007
SS
557 }
558}
559
560#[no_mangle]
547d6c2d 561pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
c3136007
SS
562 let state = MQTTState::new();
563 let boxed = Box::new(state);
53413f2d 564 return Box::into_raw(boxed) as *mut _;
c3136007
SS
565}
566
567#[no_mangle]
568pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
53413f2d 569 std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) });
c3136007
SS
570}
571
572#[no_mangle]
363b5f99 573pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
c3136007
SS
574 let state = cast_pointer!(state, MQTTState);
575 state.free_tx(tx_id);
576}
577
578#[no_mangle]
363b5f99 579pub unsafe extern "C" fn rs_mqtt_parse_request(
c3136007
SS
580 _flow: *const Flow,
581 state: *mut std::os::raw::c_void,
582 _pstate: *mut std::os::raw::c_void,
583 input: *const u8,
584 input_len: u32,
585 _data: *const std::os::raw::c_void,
586 _flags: u8,
587) -> AppLayerResult {
588 let state = cast_pointer!(state, MQTTState);
589 let buf = build_slice!(input, input_len as usize);
ac3a20b6 590 return state.parse_request(buf);
c3136007
SS
591}
592
593#[no_mangle]
363b5f99 594pub unsafe extern "C" fn rs_mqtt_parse_response(
c3136007
SS
595 _flow: *const Flow,
596 state: *mut std::os::raw::c_void,
597 _pstate: *mut std::os::raw::c_void,
598 input: *const u8,
599 input_len: u32,
600 _data: *const std::os::raw::c_void,
601 _flags: u8,
602) -> AppLayerResult {
603 let state = cast_pointer!(state, MQTTState);
604 let buf = build_slice!(input, input_len as usize);
ac3a20b6 605 return state.parse_response(buf);
c3136007
SS
606}
607
608#[no_mangle]
363b5f99 609pub unsafe extern "C" fn rs_mqtt_state_get_tx(
c3136007
SS
610 state: *mut std::os::raw::c_void,
611 tx_id: u64,
612) -> *mut std::os::raw::c_void {
613 let state = cast_pointer!(state, MQTTState);
614 match state.get_tx(tx_id) {
615 Some(tx) => {
53413f2d 616 return tx as *const _ as *mut _;
c3136007
SS
617 }
618 None => {
619 return std::ptr::null_mut();
620 }
621 }
622}
623
624#[no_mangle]
363b5f99 625pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
c3136007
SS
626 let state = cast_pointer!(state, MQTTState);
627 return state.tx_id;
628}
629
c3136007 630#[no_mangle]
363b5f99 631pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
c3136007
SS
632 let tx = cast_pointer!(tx, MQTTTransaction);
633 if tx.toclient {
634 return 1;
635 }
636 return 0;
637}
638
639#[no_mangle]
363b5f99 640pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
c3136007
SS
641 tx: *mut std::os::raw::c_void,
642 direction: u8,
643) -> std::os::raw::c_int {
644 let tx = cast_pointer!(tx, MQTTTransaction);
645 if tx.complete {
a7ac79be 646 if direction == Direction::ToServer.into() {
c3136007
SS
647 if tx.toserver {
648 return 1;
649 }
a7ac79be 650 } else if direction == Direction::ToClient.into() {
c3136007
SS
651 if tx.toclient {
652 return 1;
653 }
654 }
655 }
656 return 0;
657}
658
659#[no_mangle]
363b5f99 660pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
c3136007
SS
661 _state: *mut std::os::raw::c_void,
662 tx: *mut std::os::raw::c_void,
663) -> u32 {
664 let tx = cast_pointer!(tx, MQTTTransaction);
665 return tx.logged.get();
666}
667
668#[no_mangle]
363b5f99 669pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
c3136007
SS
670 _state: *mut std::os::raw::c_void,
671 tx: *mut std::os::raw::c_void,
672 logged: u32,
673) {
674 let tx = cast_pointer!(tx, MQTTTransaction);
675 tx.logged.set(logged);
676}
677
678#[no_mangle]
363b5f99 679pub unsafe extern "C" fn rs_mqtt_state_get_events(
c3136007
SS
680 tx: *mut std::os::raw::c_void,
681) -> *mut core::AppLayerDecoderEvents {
682 let tx = cast_pointer!(tx, MQTTTransaction);
683 return tx.events;
684}
685
c3136007
SS
686// Parser name as a C style string.
687const PARSER_NAME: &'static [u8] = b"mqtt\0";
688
689export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
690
691#[no_mangle]
692pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
693 let default_port = CString::new("[1883]").unwrap();
694 let max_msg_len = &mut MAX_MSG_LEN;
695 *max_msg_len = cfg_max_msg_len;
696 let parser = RustParser {
697 name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
698 default_port: default_port.as_ptr(),
699 ipproto: IPPROTO_TCP,
700 probe_ts: Some(rs_mqtt_probing_parser),
701 probe_tc: Some(rs_mqtt_probing_parser),
702 min_depth: 0,
703 max_depth: 16,
704 state_new: rs_mqtt_state_new,
705 state_free: rs_mqtt_state_free,
706 tx_free: rs_mqtt_state_tx_free,
707 parse_ts: rs_mqtt_parse_request,
708 parse_tc: rs_mqtt_parse_response,
709 get_tx_count: rs_mqtt_state_get_tx_count,
710 get_tx: rs_mqtt_state_get_tx,
efc9a7a3
VJ
711 tx_comp_st_ts: 1,
712 tx_comp_st_tc: 1,
c3136007 713 tx_get_progress: rs_mqtt_tx_get_alstate_progress,
c3136007 714 get_events: Some(rs_mqtt_state_get_events),
8eac5fc2
JI
715 get_eventinfo: Some(MQTTEvent::get_event_info),
716 get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id),
c3136007
SS
717 localstorage_new: None,
718 localstorage_free: None,
719 get_files: None,
b3354096 720 get_tx_iterator: Some(crate::applayer::state_get_tx_iterator::<MQTTState, MQTTTransaction>),
c3136007
SS
721 get_tx_data: rs_mqtt_get_tx_data,
722 apply_tx_config: None,
ff674d0c 723 flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
4da0d9bd 724 truncate: None,
c3136007
SS
725 };
726
727 let ip_proto_str = CString::new("tcp").unwrap();
728
729 if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
730 let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
731 ALPROTO_MQTT = alproto;
732 if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
733 let _ = AppLayerRegisterParser(&parser, alproto);
734 }
735 } else {
736 SCLogDebug!("Protocol detector and parser disabled for MQTT.");
737 }
738}