]>
Commit | Line | Data |
---|---|---|
c3136007 SS |
1 | /* Copyright (C) 2020 Open Information Security Foundation |
2 | * | |
3 | * You can copy, redistribute or modify this Program under the terms of | |
4 | * the GNU General Public License version 2 as published by the Free | |
5 | * Software Foundation. | |
6 | * | |
7 | * This program is distributed in the hope that it will be useful, | |
8 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
9 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
10 | * GNU General Public License for more details. | |
11 | * | |
12 | * You should have received a copy of the GNU General Public License | |
13 | * version 2 along with this program; if not, write to the Free Software | |
14 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | |
15 | * 02110-1301, USA. | |
16 | */ | |
17 | ||
18 | // written by Sascha Steinbiss <sascha@steinbiss.name> | |
19 | ||
20 | use super::mqtt_message::*; | |
21 | use super::parser::*; | |
22 | use crate::applayer::{self, LoggerFlags}; | |
23 | use crate::applayer::*; | |
a7ac79be | 24 | use crate::core::{self, *}; |
8a584c21 | 25 | use nom7::Err; |
c3136007 | 26 | use std; |
8eac5fc2 | 27 | use std::ffi::CString; |
c3136007 SS |
28 | |
29 | // Used as a special pseudo packet identifier to denote the first CONNECT | |
30 | // packet in a connection. Note that there is no risk of collision with a | |
31 | // parsed packet identifier because in the protocol these are only 16 bit | |
32 | // unsigned. | |
33 | const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX; | |
34 | // Maximum message length in bytes. If the length of a message exceeds | |
35 | // this value, it will be truncated. Default: 1MB. | |
36 | static mut MAX_MSG_LEN: u32 = 1048576; | |
37 | ||
38 | static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN; | |
39 | ||
8eac5fc2 | 40 | #[derive(FromPrimitive, Debug, AppLayerEvent)] |
c3136007 | 41 | pub enum MQTTEvent { |
8eac5fc2 | 42 | MissingConnect, |
c3136007 SS |
43 | MissingPublish, |
44 | MissingSubscribe, | |
45 | MissingUnsubscribe, | |
46 | DoubleConnect, | |
47 | UnintroducedMessage, | |
48 | InvalidQosLevel, | |
49 | MissingMsgId, | |
8eac5fc2 | 50 | UnassignedMsgType, |
c3136007 SS |
51 | } |
52 | ||
53 | #[derive(Debug)] | |
54 | pub struct MQTTTransaction { | |
55 | tx_id: u64, | |
56 | pkt_id: Option<u32>, | |
57 | pub msg: Vec<MQTTMessage>, | |
58 | complete: bool, | |
59 | toclient: bool, | |
60 | toserver: bool, | |
61 | ||
62 | logged: LoggerFlags, | |
c3136007 SS |
63 | events: *mut core::AppLayerDecoderEvents, |
64 | tx_data: applayer::AppLayerTxData, | |
65 | } | |
66 | ||
67 | impl MQTTTransaction { | |
68 | pub fn new(msg: MQTTMessage) -> MQTTTransaction { | |
69 | let mut m = MQTTTransaction { | |
70 | tx_id: 0, | |
71 | pkt_id: None, | |
72 | complete: false, | |
73 | logged: LoggerFlags::new(), | |
74 | msg: Vec::new(), | |
75 | toclient: false, | |
76 | toserver: false, | |
c3136007 SS |
77 | events: std::ptr::null_mut(), |
78 | tx_data: applayer::AppLayerTxData::new(), | |
79 | }; | |
80 | m.msg.push(msg); | |
81 | return m; | |
82 | } | |
83 | ||
84 | pub fn free(&mut self) { | |
922a453d | 85 | if !self.events.is_null() { |
c3136007 SS |
86 | core::sc_app_layer_decoder_events_free_events(&mut self.events); |
87 | } | |
c3136007 SS |
88 | } |
89 | } | |
90 | ||
91 | impl Drop for MQTTTransaction { | |
92 | fn drop(&mut self) { | |
93 | self.free(); | |
94 | } | |
95 | } | |
96 | ||
b3354096 JI |
97 | impl Transaction for MQTTTransaction { |
98 | fn id(&self) -> u64 { | |
99 | self.tx_id | |
100 | } | |
101 | } | |
102 | ||
c3136007 SS |
103 | pub struct MQTTState { |
104 | tx_id: u64, | |
105 | pub protocol_version: u8, | |
106 | transactions: Vec<MQTTTransaction>, | |
107 | connected: bool, | |
108 | skip_request: usize, | |
109 | skip_response: usize, | |
110 | max_msg_len: usize, | |
111 | } | |
112 | ||
b3354096 JI |
113 | impl State<MQTTTransaction> for MQTTState { |
114 | fn get_transactions(&self) -> &[MQTTTransaction] { | |
115 | &self.transactions | |
116 | } | |
117 | } | |
118 | ||
c3136007 SS |
119 | impl MQTTState { |
120 | pub fn new() -> Self { | |
121 | Self { | |
122 | tx_id: 0, | |
123 | protocol_version: 0, | |
124 | transactions: Vec::new(), | |
125 | connected: false, | |
126 | skip_request: 0, | |
127 | skip_response: 0, | |
128 | max_msg_len: unsafe { MAX_MSG_LEN as usize }, | |
129 | } | |
130 | } | |
131 | ||
132 | fn free_tx(&mut self, tx_id: u64) { | |
133 | let len = self.transactions.len(); | |
134 | let mut found = false; | |
135 | let mut index = 0; | |
136 | for i in 0..len { | |
137 | let tx = &self.transactions[i]; | |
138 | if tx.tx_id == tx_id + 1 { | |
139 | found = true; | |
140 | index = i; | |
141 | break; | |
142 | } | |
143 | } | |
144 | if found { | |
145 | self.transactions.remove(index); | |
146 | } | |
147 | } | |
148 | ||
149 | pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> { | |
150 | for tx in &mut self.transactions { | |
151 | if tx.tx_id == tx_id + 1 { | |
152 | return Some(tx); | |
153 | } | |
154 | } | |
155 | return None; | |
156 | } | |
157 | ||
158 | pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> { | |
159 | for tx in &mut self.transactions { | |
160 | if !tx.complete { | |
161 | if let Some(mpktid) = tx.pkt_id { | |
162 | if mpktid == pkt_id { | |
163 | return Some(tx); | |
164 | } | |
165 | } | |
166 | } | |
167 | } | |
168 | return None; | |
169 | } | |
170 | ||
171 | fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction { | |
172 | let mut tx = MQTTTransaction::new(msg); | |
173 | self.tx_id += 1; | |
174 | tx.tx_id = self.tx_id; | |
175 | if toclient { | |
176 | tx.toclient = true; | |
177 | } else { | |
178 | tx.toserver = true; | |
179 | } | |
180 | return tx; | |
181 | } | |
182 | ||
183 | // Handle a MQTT message depending on the direction and state. | |
184 | // Note that we are trying to only have one mutable reference to msg | |
185 | // and its components, however, since we are in a large match operation, | |
186 | // we cannot pass around and/or store more references or move things | |
187 | // without having to introduce lifetimes etc. | |
188 | // This is the reason for the code duplication below. Maybe there is a | |
189 | // more concise way to do it, but this works for now. | |
190 | fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) { | |
191 | match msg.op { | |
192 | MQTTOperation::CONNECT(ref conn) => { | |
193 | self.protocol_version = conn.protocol_version; | |
194 | if self.connected { | |
195 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 196 | MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect); |
c3136007 SS |
197 | self.transactions.push(tx); |
198 | } else { | |
199 | let mut tx = self.new_tx(msg, toclient); | |
200 | tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); | |
201 | self.transactions.push(tx); | |
202 | } | |
203 | }, | |
204 | MQTTOperation::PUBLISH(ref publish) => { | |
205 | if !self.connected { | |
206 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 207 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
208 | self.transactions.push(tx); |
209 | return; | |
210 | } | |
211 | match msg.header.qos_level { | |
212 | 0 => { | |
213 | // with QOS level 0, we do not need to wait for a | |
214 | // response | |
215 | let mut tx = self.new_tx(msg, toclient); | |
216 | tx.complete = true; | |
217 | self.transactions.push(tx); | |
218 | }, | |
219 | 1..=2 => { | |
220 | if let Some(pkt_id) = publish.message_id { | |
221 | let mut tx = self.new_tx(msg, toclient); | |
222 | tx.pkt_id = Some(pkt_id as u32); | |
223 | self.transactions.push(tx); | |
224 | } else { | |
225 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 226 | MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); |
c3136007 SS |
227 | self.transactions.push(tx); |
228 | } | |
229 | }, | |
230 | _ => { | |
231 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 232 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
233 | self.transactions.push(tx); |
234 | } | |
235 | } | |
236 | }, | |
237 | MQTTOperation::SUBSCRIBE(ref subscribe) => { | |
238 | if !self.connected { | |
239 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 240 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
241 | self.transactions.push(tx); |
242 | return; | |
243 | } | |
244 | let pkt_id = subscribe.message_id as u32; | |
245 | match msg.header.qos_level { | |
246 | 0 => { | |
247 | // with QOS level 0, we do not need to wait for a | |
248 | // response | |
249 | let mut tx = self.new_tx(msg, toclient); | |
250 | tx.complete = true; | |
251 | self.transactions.push(tx); | |
252 | }, | |
253 | 1..=2 => { | |
254 | let mut tx = self.new_tx(msg, toclient); | |
255 | tx.pkt_id = Some(pkt_id); | |
256 | self.transactions.push(tx); | |
257 | }, | |
258 | _ => { | |
259 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 260 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
261 | self.transactions.push(tx); |
262 | } | |
263 | } | |
264 | }, | |
265 | MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { | |
266 | if !self.connected { | |
267 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 268 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
269 | self.transactions.push(tx); |
270 | return; | |
271 | } | |
272 | let pkt_id = unsubscribe.message_id as u32; | |
273 | match msg.header.qos_level { | |
274 | 0 => { | |
275 | // with QOS level 0, we do not need to wait for a | |
276 | // response | |
277 | let mut tx = self.new_tx(msg, toclient); | |
278 | tx.complete = true; | |
279 | self.transactions.push(tx); | |
280 | }, | |
281 | 1..=2 => { | |
282 | let mut tx = self.new_tx(msg, toclient); | |
283 | tx.pkt_id = Some(pkt_id); | |
284 | self.transactions.push(tx); | |
285 | }, | |
286 | _ => { | |
287 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 288 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
289 | self.transactions.push(tx); |
290 | } | |
291 | } | |
292 | }, | |
293 | MQTTOperation::CONNACK(ref _connack) => { | |
294 | if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { | |
295 | (*tx).msg.push(msg); | |
296 | (*tx).complete = true; | |
297 | (*tx).pkt_id = None; | |
298 | self.connected = true; | |
299 | } else { | |
300 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 301 | MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); |
c3136007 SS |
302 | self.transactions.push(tx); |
303 | } | |
304 | }, | |
305 | MQTTOperation::PUBREC(ref v) | |
306 | | MQTTOperation::PUBREL(ref v) => { | |
307 | if !self.connected { | |
308 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 309 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
310 | self.transactions.push(tx); |
311 | return; | |
312 | } | |
313 | if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { | |
314 | (*tx).msg.push(msg); | |
315 | } else { | |
316 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 317 | MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); |
c3136007 SS |
318 | self.transactions.push(tx); |
319 | } | |
320 | }, | |
321 | MQTTOperation::PUBACK(ref v) | |
322 | | MQTTOperation::PUBCOMP(ref v) => { | |
323 | if !self.connected { | |
324 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 325 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
326 | self.transactions.push(tx); |
327 | return; | |
328 | } | |
329 | if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { | |
330 | (*tx).msg.push(msg); | |
331 | (*tx).complete = true; | |
332 | (*tx).pkt_id = None; | |
333 | } else { | |
334 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 335 | MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); |
c3136007 SS |
336 | self.transactions.push(tx); |
337 | } | |
338 | }, | |
339 | MQTTOperation::SUBACK(ref suback) => { | |
340 | if !self.connected { | |
341 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 342 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
343 | self.transactions.push(tx); |
344 | return; | |
345 | } | |
346 | if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) { | |
347 | (*tx).msg.push(msg); | |
348 | (*tx).complete = true; | |
349 | (*tx).pkt_id = None; | |
350 | } else { | |
351 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 352 | MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); |
c3136007 SS |
353 | self.transactions.push(tx); |
354 | } | |
355 | }, | |
356 | MQTTOperation::UNSUBACK(ref unsuback) => { | |
357 | if !self.connected { | |
358 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 359 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
360 | self.transactions.push(tx); |
361 | return; | |
362 | } | |
363 | if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) { | |
364 | (*tx).msg.push(msg); | |
365 | (*tx).complete = true; | |
366 | (*tx).pkt_id = None; | |
367 | } else { | |
368 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 369 | MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); |
c3136007 SS |
370 | self.transactions.push(tx); |
371 | } | |
372 | }, | |
373 | MQTTOperation::UNASSIGNED => { | |
374 | let mut tx = self.new_tx(msg, toclient); | |
375 | tx.complete = true; | |
8eac5fc2 | 376 | MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType); |
c3136007 SS |
377 | self.transactions.push(tx); |
378 | }, | |
379 | MQTTOperation::TRUNCATED(_) => { | |
380 | let mut tx = self.new_tx(msg, toclient); | |
381 | tx.complete = true; | |
382 | self.transactions.push(tx); | |
383 | }, | |
384 | MQTTOperation::AUTH(_) | |
385 | | MQTTOperation::DISCONNECT(_) => { | |
386 | if !self.connected { | |
387 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 388 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
389 | self.transactions.push(tx); |
390 | return; | |
391 | } | |
392 | let mut tx = self.new_tx(msg, toclient); | |
393 | tx.complete = true; | |
394 | self.transactions.push(tx); | |
395 | }, | |
396 | MQTTOperation::PINGREQ | |
397 | | MQTTOperation::PINGRESP => { | |
398 | if !self.connected { | |
399 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 400 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
401 | self.transactions.push(tx); |
402 | return; | |
403 | } | |
404 | let mut tx = self.new_tx(msg, toclient); | |
405 | tx.complete = true; | |
406 | self.transactions.push(tx); | |
407 | } | |
408 | } | |
409 | } | |
410 | ||
411 | fn parse_request(&mut self, input: &[u8]) -> AppLayerResult { | |
412 | let mut current = input; | |
413 | if input.len() == 0 { | |
414 | return AppLayerResult::ok(); | |
415 | } | |
416 | ||
417 | let mut consumed = 0; | |
418 | SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len()); | |
419 | if self.skip_request > 0 { | |
420 | if input.len() <= self.skip_request { | |
421 | SCLogDebug!("reducing skip_request by {}", input.len()); | |
422 | self.skip_request -= input.len(); | |
423 | return AppLayerResult::ok(); | |
424 | } else { | |
425 | current = &input[self.skip_request..]; | |
426 | SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); | |
427 | consumed = self.skip_request; | |
428 | self.skip_request = 0; | |
429 | } | |
430 | } | |
431 | ||
432 | ||
433 | while current.len() > 0 { | |
434 | let mut skipped = false; | |
435 | SCLogDebug!("request: handling {}", current.len()); | |
436 | match parse_message(current, self.protocol_version, self.max_msg_len) { | |
437 | Ok((rem, msg)) => { | |
438 | SCLogDebug!("request msg {:?}", msg); | |
439 | if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | |
440 | SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); | |
441 | if trunc.skipped_length >= current.len() { | |
442 | skipped = true; | |
443 | self.skip_request = trunc.skipped_length - current.len(); | |
444 | } else { | |
445 | current = ¤t[trunc.skipped_length..]; | |
446 | self.skip_request = 0; | |
447 | } | |
448 | } | |
449 | self.handle_msg(msg, false); | |
450 | if skipped { | |
451 | return AppLayerResult::ok(); | |
452 | } | |
453 | consumed += current.len() - rem.len(); | |
454 | current = rem; | |
455 | } | |
8a584c21 | 456 | Err(Err::Incomplete(_)) => { |
c3136007 SS |
457 | SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); |
458 | return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); | |
459 | } | |
460 | Err(_) => { | |
461 | return AppLayerResult::err(); | |
462 | } | |
463 | } | |
464 | } | |
465 | ||
466 | return AppLayerResult::ok(); | |
467 | } | |
468 | ||
469 | fn parse_response(&mut self, input: &[u8]) -> AppLayerResult { | |
470 | let mut current = input; | |
471 | if input.len() == 0 { | |
472 | return AppLayerResult::ok(); | |
473 | } | |
474 | ||
475 | let mut consumed = 0; | |
476 | SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len()); | |
477 | if self.skip_response > 0 { | |
478 | if input.len() <= self.skip_response { | |
479 | self.skip_response -= current.len(); | |
480 | return AppLayerResult::ok(); | |
481 | } else { | |
482 | current = &input[self.skip_response..]; | |
483 | SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); | |
484 | consumed = self.skip_response; | |
485 | self.skip_response = 0; | |
486 | } | |
487 | } | |
488 | ||
489 | while current.len() > 0 { | |
490 | let mut skipped = false; | |
491 | SCLogDebug!("response: handling {}", current.len()); | |
492 | match parse_message(current, self.protocol_version, self.max_msg_len as usize) { | |
493 | Ok((rem, msg)) => { | |
494 | SCLogDebug!("response msg {:?}", msg); | |
495 | if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | |
496 | SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); | |
497 | if trunc.skipped_length >= current.len() { | |
498 | skipped = true; | |
499 | self.skip_response = trunc.skipped_length - current.len(); | |
500 | } else { | |
501 | current = ¤t[trunc.skipped_length..]; | |
502 | self.skip_response = 0; | |
503 | } | |
504 | SCLogDebug!("skip_response now {}", self.skip_response); | |
505 | } | |
506 | self.handle_msg(msg, true); | |
507 | if skipped { | |
508 | return AppLayerResult::ok(); | |
509 | } | |
510 | consumed += current.len() - rem.len(); | |
511 | current = rem; | |
512 | } | |
8a584c21 | 513 | Err(Err::Incomplete(_)) => { |
c3136007 SS |
514 | SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); |
515 | return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); | |
516 | } | |
517 | Err(_) => { | |
518 | return AppLayerResult::err(); | |
519 | } | |
520 | } | |
521 | } | |
522 | ||
523 | return AppLayerResult::ok(); | |
524 | } | |
525 | ||
526 | fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) { | |
527 | let ev = event as u8; | |
528 | core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev); | |
529 | } | |
c3136007 SS |
530 | } |
531 | ||
532 | // C exports. | |
533 | ||
c3136007 | 534 | #[no_mangle] |
363b5f99 | 535 | pub unsafe extern "C" fn rs_mqtt_probing_parser( |
c3136007 SS |
536 | _flow: *const Flow, |
537 | _direction: u8, | |
538 | input: *const u8, | |
539 | input_len: u32, | |
540 | _rdir: *mut u8, | |
541 | ) -> AppProto { | |
542 | let buf = build_slice!(input, input_len as usize); | |
543 | match parse_fixed_header(buf) { | |
544 | Ok((_, hdr)) => { | |
545 | // reject unassigned message type | |
546 | if hdr.message_type == MQTTTypeCode::UNASSIGNED { | |
363b5f99 | 547 | return ALPROTO_FAILED; |
c3136007 SS |
548 | } |
549 | // with 2 being the highest valid QoS level | |
550 | if hdr.qos_level > 2 { | |
363b5f99 | 551 | return ALPROTO_FAILED; |
c3136007 | 552 | } |
363b5f99 | 553 | return ALPROTO_MQTT; |
c3136007 | 554 | }, |
8a584c21 | 555 | Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN, |
363b5f99 | 556 | Err(_) => ALPROTO_FAILED |
c3136007 SS |
557 | } |
558 | } | |
559 | ||
560 | #[no_mangle] | |
547d6c2d | 561 | pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void { |
c3136007 SS |
562 | let state = MQTTState::new(); |
563 | let boxed = Box::new(state); | |
53413f2d | 564 | return Box::into_raw(boxed) as *mut _; |
c3136007 SS |
565 | } |
566 | ||
567 | #[no_mangle] | |
568 | pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) { | |
53413f2d | 569 | std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) }); |
c3136007 SS |
570 | } |
571 | ||
572 | #[no_mangle] | |
363b5f99 | 573 | pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) { |
c3136007 SS |
574 | let state = cast_pointer!(state, MQTTState); |
575 | state.free_tx(tx_id); | |
576 | } | |
577 | ||
578 | #[no_mangle] | |
363b5f99 | 579 | pub unsafe extern "C" fn rs_mqtt_parse_request( |
c3136007 SS |
580 | _flow: *const Flow, |
581 | state: *mut std::os::raw::c_void, | |
582 | _pstate: *mut std::os::raw::c_void, | |
583 | input: *const u8, | |
584 | input_len: u32, | |
585 | _data: *const std::os::raw::c_void, | |
586 | _flags: u8, | |
587 | ) -> AppLayerResult { | |
588 | let state = cast_pointer!(state, MQTTState); | |
589 | let buf = build_slice!(input, input_len as usize); | |
ac3a20b6 | 590 | return state.parse_request(buf); |
c3136007 SS |
591 | } |
592 | ||
593 | #[no_mangle] | |
363b5f99 | 594 | pub unsafe extern "C" fn rs_mqtt_parse_response( |
c3136007 SS |
595 | _flow: *const Flow, |
596 | state: *mut std::os::raw::c_void, | |
597 | _pstate: *mut std::os::raw::c_void, | |
598 | input: *const u8, | |
599 | input_len: u32, | |
600 | _data: *const std::os::raw::c_void, | |
601 | _flags: u8, | |
602 | ) -> AppLayerResult { | |
603 | let state = cast_pointer!(state, MQTTState); | |
604 | let buf = build_slice!(input, input_len as usize); | |
ac3a20b6 | 605 | return state.parse_response(buf); |
c3136007 SS |
606 | } |
607 | ||
608 | #[no_mangle] | |
363b5f99 | 609 | pub unsafe extern "C" fn rs_mqtt_state_get_tx( |
c3136007 SS |
610 | state: *mut std::os::raw::c_void, |
611 | tx_id: u64, | |
612 | ) -> *mut std::os::raw::c_void { | |
613 | let state = cast_pointer!(state, MQTTState); | |
614 | match state.get_tx(tx_id) { | |
615 | Some(tx) => { | |
53413f2d | 616 | return tx as *const _ as *mut _; |
c3136007 SS |
617 | } |
618 | None => { | |
619 | return std::ptr::null_mut(); | |
620 | } | |
621 | } | |
622 | } | |
623 | ||
624 | #[no_mangle] | |
363b5f99 | 625 | pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 { |
c3136007 SS |
626 | let state = cast_pointer!(state, MQTTState); |
627 | return state.tx_id; | |
628 | } | |
629 | ||
c3136007 | 630 | #[no_mangle] |
363b5f99 | 631 | pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int { |
c3136007 SS |
632 | let tx = cast_pointer!(tx, MQTTTransaction); |
633 | if tx.toclient { | |
634 | return 1; | |
635 | } | |
636 | return 0; | |
637 | } | |
638 | ||
639 | #[no_mangle] | |
363b5f99 | 640 | pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( |
c3136007 SS |
641 | tx: *mut std::os::raw::c_void, |
642 | direction: u8, | |
643 | ) -> std::os::raw::c_int { | |
644 | let tx = cast_pointer!(tx, MQTTTransaction); | |
645 | if tx.complete { | |
a7ac79be | 646 | if direction == Direction::ToServer.into() { |
c3136007 SS |
647 | if tx.toserver { |
648 | return 1; | |
649 | } | |
a7ac79be | 650 | } else if direction == Direction::ToClient.into() { |
c3136007 SS |
651 | if tx.toclient { |
652 | return 1; | |
653 | } | |
654 | } | |
655 | } | |
656 | return 0; | |
657 | } | |
658 | ||
659 | #[no_mangle] | |
363b5f99 | 660 | pub unsafe extern "C" fn rs_mqtt_tx_get_logged( |
c3136007 SS |
661 | _state: *mut std::os::raw::c_void, |
662 | tx: *mut std::os::raw::c_void, | |
663 | ) -> u32 { | |
664 | let tx = cast_pointer!(tx, MQTTTransaction); | |
665 | return tx.logged.get(); | |
666 | } | |
667 | ||
668 | #[no_mangle] | |
363b5f99 | 669 | pub unsafe extern "C" fn rs_mqtt_tx_set_logged( |
c3136007 SS |
670 | _state: *mut std::os::raw::c_void, |
671 | tx: *mut std::os::raw::c_void, | |
672 | logged: u32, | |
673 | ) { | |
674 | let tx = cast_pointer!(tx, MQTTTransaction); | |
675 | tx.logged.set(logged); | |
676 | } | |
677 | ||
678 | #[no_mangle] | |
363b5f99 | 679 | pub unsafe extern "C" fn rs_mqtt_state_get_events( |
c3136007 SS |
680 | tx: *mut std::os::raw::c_void, |
681 | ) -> *mut core::AppLayerDecoderEvents { | |
682 | let tx = cast_pointer!(tx, MQTTTransaction); | |
683 | return tx.events; | |
684 | } | |
685 | ||
c3136007 SS |
686 | // Parser name as a C style string. |
687 | const PARSER_NAME: &'static [u8] = b"mqtt\0"; | |
688 | ||
689 | export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction); | |
690 | ||
691 | #[no_mangle] | |
692 | pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { | |
693 | let default_port = CString::new("[1883]").unwrap(); | |
694 | let max_msg_len = &mut MAX_MSG_LEN; | |
695 | *max_msg_len = cfg_max_msg_len; | |
696 | let parser = RustParser { | |
697 | name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char, | |
698 | default_port: default_port.as_ptr(), | |
699 | ipproto: IPPROTO_TCP, | |
700 | probe_ts: Some(rs_mqtt_probing_parser), | |
701 | probe_tc: Some(rs_mqtt_probing_parser), | |
702 | min_depth: 0, | |
703 | max_depth: 16, | |
704 | state_new: rs_mqtt_state_new, | |
705 | state_free: rs_mqtt_state_free, | |
706 | tx_free: rs_mqtt_state_tx_free, | |
707 | parse_ts: rs_mqtt_parse_request, | |
708 | parse_tc: rs_mqtt_parse_response, | |
709 | get_tx_count: rs_mqtt_state_get_tx_count, | |
710 | get_tx: rs_mqtt_state_get_tx, | |
efc9a7a3 VJ |
711 | tx_comp_st_ts: 1, |
712 | tx_comp_st_tc: 1, | |
c3136007 | 713 | tx_get_progress: rs_mqtt_tx_get_alstate_progress, |
c3136007 | 714 | get_events: Some(rs_mqtt_state_get_events), |
8eac5fc2 JI |
715 | get_eventinfo: Some(MQTTEvent::get_event_info), |
716 | get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id), | |
c3136007 SS |
717 | localstorage_new: None, |
718 | localstorage_free: None, | |
719 | get_files: None, | |
b3354096 | 720 | get_tx_iterator: Some(crate::applayer::state_get_tx_iterator::<MQTTState, MQTTTransaction>), |
c3136007 SS |
721 | get_tx_data: rs_mqtt_get_tx_data, |
722 | apply_tx_config: None, | |
ff674d0c | 723 | flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS, |
4da0d9bd | 724 | truncate: None, |
c3136007 SS |
725 | }; |
726 | ||
727 | let ip_proto_str = CString::new("tcp").unwrap(); | |
728 | ||
729 | if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { | |
730 | let alproto = AppLayerRegisterProtocolDetection(&parser, 1); | |
731 | ALPROTO_MQTT = alproto; | |
732 | if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { | |
733 | let _ = AppLayerRegisterParser(&parser, alproto); | |
734 | } | |
735 | } else { | |
736 | SCLogDebug!("Protocol detector and parser disabled for MQTT."); | |
737 | } | |
738 | } |