]>
Commit | Line | Data |
---|---|---|
c3136007 SS |
1 | /* Copyright (C) 2020 Open Information Security Foundation |
2 | * | |
3 | * You can copy, redistribute or modify this Program under the terms of | |
4 | * the GNU General Public License version 2 as published by the Free | |
5 | * Software Foundation. | |
6 | * | |
7 | * This program is distributed in the hope that it will be useful, | |
8 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
9 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
10 | * GNU General Public License for more details. | |
11 | * | |
12 | * You should have received a copy of the GNU General Public License | |
13 | * version 2 along with this program; if not, write to the Free Software | |
14 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | |
15 | * 02110-1301, USA. | |
16 | */ | |
17 | ||
18 | // written by Sascha Steinbiss <sascha@steinbiss.name> | |
19 | ||
20 | use super::mqtt_message::*; | |
21 | use super::parser::*; | |
22 | use crate::applayer::{self, LoggerFlags}; | |
23 | use crate::applayer::*; | |
24 | use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP}; | |
c3136007 SS |
25 | use nom; |
26 | use std; | |
8eac5fc2 | 27 | use std::ffi::CString; |
c3136007 SS |
28 | |
29 | // Used as a special pseudo packet identifier to denote the first CONNECT | |
30 | // packet in a connection. Note that there is no risk of collision with a | |
31 | // parsed packet identifier because in the protocol these are only 16 bit | |
32 | // unsigned. | |
33 | const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX; | |
34 | // Maximum message length in bytes. If the length of a message exceeds | |
35 | // this value, it will be truncated. Default: 1MB. | |
36 | static mut MAX_MSG_LEN: u32 = 1048576; | |
37 | ||
38 | static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN; | |
39 | ||
8eac5fc2 | 40 | #[derive(FromPrimitive, Debug, AppLayerEvent)] |
c3136007 | 41 | pub enum MQTTEvent { |
8eac5fc2 | 42 | MissingConnect, |
c3136007 SS |
43 | MissingPublish, |
44 | MissingSubscribe, | |
45 | MissingUnsubscribe, | |
46 | DoubleConnect, | |
47 | UnintroducedMessage, | |
48 | InvalidQosLevel, | |
49 | MissingMsgId, | |
8eac5fc2 | 50 | UnassignedMsgType, |
c3136007 SS |
51 | } |
52 | ||
53 | #[derive(Debug)] | |
54 | pub struct MQTTTransaction { | |
55 | tx_id: u64, | |
56 | pkt_id: Option<u32>, | |
57 | pub msg: Vec<MQTTMessage>, | |
58 | complete: bool, | |
59 | toclient: bool, | |
60 | toserver: bool, | |
61 | ||
62 | logged: LoggerFlags, | |
63 | de_state: Option<*mut core::DetectEngineState>, | |
64 | events: *mut core::AppLayerDecoderEvents, | |
65 | tx_data: applayer::AppLayerTxData, | |
66 | } | |
67 | ||
68 | impl MQTTTransaction { | |
69 | pub fn new(msg: MQTTMessage) -> MQTTTransaction { | |
70 | let mut m = MQTTTransaction { | |
71 | tx_id: 0, | |
72 | pkt_id: None, | |
73 | complete: false, | |
74 | logged: LoggerFlags::new(), | |
75 | msg: Vec::new(), | |
76 | toclient: false, | |
77 | toserver: false, | |
78 | de_state: None, | |
79 | events: std::ptr::null_mut(), | |
80 | tx_data: applayer::AppLayerTxData::new(), | |
81 | }; | |
82 | m.msg.push(msg); | |
83 | return m; | |
84 | } | |
85 | ||
86 | pub fn free(&mut self) { | |
922a453d | 87 | if !self.events.is_null() { |
c3136007 SS |
88 | core::sc_app_layer_decoder_events_free_events(&mut self.events); |
89 | } | |
90 | if let Some(state) = self.de_state { | |
91 | core::sc_detect_engine_state_free(state); | |
92 | } | |
93 | } | |
94 | } | |
95 | ||
96 | impl Drop for MQTTTransaction { | |
97 | fn drop(&mut self) { | |
98 | self.free(); | |
99 | } | |
100 | } | |
101 | ||
102 | pub struct MQTTState { | |
103 | tx_id: u64, | |
104 | pub protocol_version: u8, | |
105 | transactions: Vec<MQTTTransaction>, | |
106 | connected: bool, | |
107 | skip_request: usize, | |
108 | skip_response: usize, | |
109 | max_msg_len: usize, | |
110 | } | |
111 | ||
112 | impl MQTTState { | |
113 | pub fn new() -> Self { | |
114 | Self { | |
115 | tx_id: 0, | |
116 | protocol_version: 0, | |
117 | transactions: Vec::new(), | |
118 | connected: false, | |
119 | skip_request: 0, | |
120 | skip_response: 0, | |
121 | max_msg_len: unsafe { MAX_MSG_LEN as usize }, | |
122 | } | |
123 | } | |
124 | ||
125 | fn free_tx(&mut self, tx_id: u64) { | |
126 | let len = self.transactions.len(); | |
127 | let mut found = false; | |
128 | let mut index = 0; | |
129 | for i in 0..len { | |
130 | let tx = &self.transactions[i]; | |
131 | if tx.tx_id == tx_id + 1 { | |
132 | found = true; | |
133 | index = i; | |
134 | break; | |
135 | } | |
136 | } | |
137 | if found { | |
138 | self.transactions.remove(index); | |
139 | } | |
140 | } | |
141 | ||
142 | pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> { | |
143 | for tx in &mut self.transactions { | |
144 | if tx.tx_id == tx_id + 1 { | |
145 | return Some(tx); | |
146 | } | |
147 | } | |
148 | return None; | |
149 | } | |
150 | ||
151 | pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> { | |
152 | for tx in &mut self.transactions { | |
153 | if !tx.complete { | |
154 | if let Some(mpktid) = tx.pkt_id { | |
155 | if mpktid == pkt_id { | |
156 | return Some(tx); | |
157 | } | |
158 | } | |
159 | } | |
160 | } | |
161 | return None; | |
162 | } | |
163 | ||
164 | fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction { | |
165 | let mut tx = MQTTTransaction::new(msg); | |
166 | self.tx_id += 1; | |
167 | tx.tx_id = self.tx_id; | |
168 | if toclient { | |
169 | tx.toclient = true; | |
170 | } else { | |
171 | tx.toserver = true; | |
172 | } | |
173 | return tx; | |
174 | } | |
175 | ||
176 | // Handle a MQTT message depending on the direction and state. | |
177 | // Note that we are trying to only have one mutable reference to msg | |
178 | // and its components, however, since we are in a large match operation, | |
179 | // we cannot pass around and/or store more references or move things | |
180 | // without having to introduce lifetimes etc. | |
181 | // This is the reason for the code duplication below. Maybe there is a | |
182 | // more concise way to do it, but this works for now. | |
183 | fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) { | |
184 | match msg.op { | |
185 | MQTTOperation::CONNECT(ref conn) => { | |
186 | self.protocol_version = conn.protocol_version; | |
187 | if self.connected { | |
188 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 189 | MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect); |
c3136007 SS |
190 | self.transactions.push(tx); |
191 | } else { | |
192 | let mut tx = self.new_tx(msg, toclient); | |
193 | tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); | |
194 | self.transactions.push(tx); | |
195 | } | |
196 | }, | |
197 | MQTTOperation::PUBLISH(ref publish) => { | |
198 | if !self.connected { | |
199 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 200 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
201 | self.transactions.push(tx); |
202 | return; | |
203 | } | |
204 | match msg.header.qos_level { | |
205 | 0 => { | |
206 | // with QOS level 0, we do not need to wait for a | |
207 | // response | |
208 | let mut tx = self.new_tx(msg, toclient); | |
209 | tx.complete = true; | |
210 | self.transactions.push(tx); | |
211 | }, | |
212 | 1..=2 => { | |
213 | if let Some(pkt_id) = publish.message_id { | |
214 | let mut tx = self.new_tx(msg, toclient); | |
215 | tx.pkt_id = Some(pkt_id as u32); | |
216 | self.transactions.push(tx); | |
217 | } else { | |
218 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 219 | MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); |
c3136007 SS |
220 | self.transactions.push(tx); |
221 | } | |
222 | }, | |
223 | _ => { | |
224 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 225 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
226 | self.transactions.push(tx); |
227 | } | |
228 | } | |
229 | }, | |
230 | MQTTOperation::SUBSCRIBE(ref subscribe) => { | |
231 | if !self.connected { | |
232 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 233 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
234 | self.transactions.push(tx); |
235 | return; | |
236 | } | |
237 | let pkt_id = subscribe.message_id as u32; | |
238 | match msg.header.qos_level { | |
239 | 0 => { | |
240 | // with QOS level 0, we do not need to wait for a | |
241 | // response | |
242 | let mut tx = self.new_tx(msg, toclient); | |
243 | tx.complete = true; | |
244 | self.transactions.push(tx); | |
245 | }, | |
246 | 1..=2 => { | |
247 | let mut tx = self.new_tx(msg, toclient); | |
248 | tx.pkt_id = Some(pkt_id); | |
249 | self.transactions.push(tx); | |
250 | }, | |
251 | _ => { | |
252 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 253 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
254 | self.transactions.push(tx); |
255 | } | |
256 | } | |
257 | }, | |
258 | MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { | |
259 | if !self.connected { | |
260 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 261 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
262 | self.transactions.push(tx); |
263 | return; | |
264 | } | |
265 | let pkt_id = unsubscribe.message_id as u32; | |
266 | match msg.header.qos_level { | |
267 | 0 => { | |
268 | // with QOS level 0, we do not need to wait for a | |
269 | // response | |
270 | let mut tx = self.new_tx(msg, toclient); | |
271 | tx.complete = true; | |
272 | self.transactions.push(tx); | |
273 | }, | |
274 | 1..=2 => { | |
275 | let mut tx = self.new_tx(msg, toclient); | |
276 | tx.pkt_id = Some(pkt_id); | |
277 | self.transactions.push(tx); | |
278 | }, | |
279 | _ => { | |
280 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 281 | MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); |
c3136007 SS |
282 | self.transactions.push(tx); |
283 | } | |
284 | } | |
285 | }, | |
286 | MQTTOperation::CONNACK(ref _connack) => { | |
287 | if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { | |
288 | (*tx).msg.push(msg); | |
289 | (*tx).complete = true; | |
290 | (*tx).pkt_id = None; | |
291 | self.connected = true; | |
292 | } else { | |
293 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 294 | MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); |
c3136007 SS |
295 | self.transactions.push(tx); |
296 | } | |
297 | }, | |
298 | MQTTOperation::PUBREC(ref v) | |
299 | | MQTTOperation::PUBREL(ref v) => { | |
300 | if !self.connected { | |
301 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 302 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
303 | self.transactions.push(tx); |
304 | return; | |
305 | } | |
306 | if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { | |
307 | (*tx).msg.push(msg); | |
308 | } else { | |
309 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 310 | MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); |
c3136007 SS |
311 | self.transactions.push(tx); |
312 | } | |
313 | }, | |
314 | MQTTOperation::PUBACK(ref v) | |
315 | | MQTTOperation::PUBCOMP(ref v) => { | |
316 | if !self.connected { | |
317 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 318 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
319 | self.transactions.push(tx); |
320 | return; | |
321 | } | |
322 | if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { | |
323 | (*tx).msg.push(msg); | |
324 | (*tx).complete = true; | |
325 | (*tx).pkt_id = None; | |
326 | } else { | |
327 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 328 | MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); |
c3136007 SS |
329 | self.transactions.push(tx); |
330 | } | |
331 | }, | |
332 | MQTTOperation::SUBACK(ref suback) => { | |
333 | if !self.connected { | |
334 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 335 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
336 | self.transactions.push(tx); |
337 | return; | |
338 | } | |
339 | if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) { | |
340 | (*tx).msg.push(msg); | |
341 | (*tx).complete = true; | |
342 | (*tx).pkt_id = None; | |
343 | } else { | |
344 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 345 | MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); |
c3136007 SS |
346 | self.transactions.push(tx); |
347 | } | |
348 | }, | |
349 | MQTTOperation::UNSUBACK(ref unsuback) => { | |
350 | if !self.connected { | |
351 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 352 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
353 | self.transactions.push(tx); |
354 | return; | |
355 | } | |
356 | if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) { | |
357 | (*tx).msg.push(msg); | |
358 | (*tx).complete = true; | |
359 | (*tx).pkt_id = None; | |
360 | } else { | |
361 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 362 | MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); |
c3136007 SS |
363 | self.transactions.push(tx); |
364 | } | |
365 | }, | |
366 | MQTTOperation::UNASSIGNED => { | |
367 | let mut tx = self.new_tx(msg, toclient); | |
368 | tx.complete = true; | |
8eac5fc2 | 369 | MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType); |
c3136007 SS |
370 | self.transactions.push(tx); |
371 | }, | |
372 | MQTTOperation::TRUNCATED(_) => { | |
373 | let mut tx = self.new_tx(msg, toclient); | |
374 | tx.complete = true; | |
375 | self.transactions.push(tx); | |
376 | }, | |
377 | MQTTOperation::AUTH(_) | |
378 | | MQTTOperation::DISCONNECT(_) => { | |
379 | if !self.connected { | |
380 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 381 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
382 | self.transactions.push(tx); |
383 | return; | |
384 | } | |
385 | let mut tx = self.new_tx(msg, toclient); | |
386 | tx.complete = true; | |
387 | self.transactions.push(tx); | |
388 | }, | |
389 | MQTTOperation::PINGREQ | |
390 | | MQTTOperation::PINGRESP => { | |
391 | if !self.connected { | |
392 | let mut tx = self.new_tx(msg, toclient); | |
d541b3d4 | 393 | MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); |
c3136007 SS |
394 | self.transactions.push(tx); |
395 | return; | |
396 | } | |
397 | let mut tx = self.new_tx(msg, toclient); | |
398 | tx.complete = true; | |
399 | self.transactions.push(tx); | |
400 | } | |
401 | } | |
402 | } | |
403 | ||
404 | fn parse_request(&mut self, input: &[u8]) -> AppLayerResult { | |
405 | let mut current = input; | |
406 | if input.len() == 0 { | |
407 | return AppLayerResult::ok(); | |
408 | } | |
409 | ||
410 | let mut consumed = 0; | |
411 | SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len()); | |
412 | if self.skip_request > 0 { | |
413 | if input.len() <= self.skip_request { | |
414 | SCLogDebug!("reducing skip_request by {}", input.len()); | |
415 | self.skip_request -= input.len(); | |
416 | return AppLayerResult::ok(); | |
417 | } else { | |
418 | current = &input[self.skip_request..]; | |
419 | SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); | |
420 | consumed = self.skip_request; | |
421 | self.skip_request = 0; | |
422 | } | |
423 | } | |
424 | ||
425 | ||
426 | while current.len() > 0 { | |
427 | let mut skipped = false; | |
428 | SCLogDebug!("request: handling {}", current.len()); | |
429 | match parse_message(current, self.protocol_version, self.max_msg_len) { | |
430 | Ok((rem, msg)) => { | |
431 | SCLogDebug!("request msg {:?}", msg); | |
432 | if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | |
433 | SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); | |
434 | if trunc.skipped_length >= current.len() { | |
435 | skipped = true; | |
436 | self.skip_request = trunc.skipped_length - current.len(); | |
437 | } else { | |
438 | current = ¤t[trunc.skipped_length..]; | |
439 | self.skip_request = 0; | |
440 | } | |
441 | } | |
442 | self.handle_msg(msg, false); | |
443 | if skipped { | |
444 | return AppLayerResult::ok(); | |
445 | } | |
446 | consumed += current.len() - rem.len(); | |
447 | current = rem; | |
448 | } | |
449 | Err(nom::Err::Incomplete(_)) => { | |
450 | SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); | |
451 | return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); | |
452 | } | |
453 | Err(_) => { | |
454 | return AppLayerResult::err(); | |
455 | } | |
456 | } | |
457 | } | |
458 | ||
459 | return AppLayerResult::ok(); | |
460 | } | |
461 | ||
462 | fn parse_response(&mut self, input: &[u8]) -> AppLayerResult { | |
463 | let mut current = input; | |
464 | if input.len() == 0 { | |
465 | return AppLayerResult::ok(); | |
466 | } | |
467 | ||
468 | let mut consumed = 0; | |
469 | SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len()); | |
470 | if self.skip_response > 0 { | |
471 | if input.len() <= self.skip_response { | |
472 | self.skip_response -= current.len(); | |
473 | return AppLayerResult::ok(); | |
474 | } else { | |
475 | current = &input[self.skip_response..]; | |
476 | SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); | |
477 | consumed = self.skip_response; | |
478 | self.skip_response = 0; | |
479 | } | |
480 | } | |
481 | ||
482 | while current.len() > 0 { | |
483 | let mut skipped = false; | |
484 | SCLogDebug!("response: handling {}", current.len()); | |
485 | match parse_message(current, self.protocol_version, self.max_msg_len as usize) { | |
486 | Ok((rem, msg)) => { | |
487 | SCLogDebug!("response msg {:?}", msg); | |
488 | if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | |
489 | SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); | |
490 | if trunc.skipped_length >= current.len() { | |
491 | skipped = true; | |
492 | self.skip_response = trunc.skipped_length - current.len(); | |
493 | } else { | |
494 | current = ¤t[trunc.skipped_length..]; | |
495 | self.skip_response = 0; | |
496 | } | |
497 | SCLogDebug!("skip_response now {}", self.skip_response); | |
498 | } | |
499 | self.handle_msg(msg, true); | |
500 | if skipped { | |
501 | return AppLayerResult::ok(); | |
502 | } | |
503 | consumed += current.len() - rem.len(); | |
504 | current = rem; | |
505 | } | |
506 | Err(nom::Err::Incomplete(_)) => { | |
507 | SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); | |
508 | return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); | |
509 | } | |
510 | Err(_) => { | |
511 | return AppLayerResult::err(); | |
512 | } | |
513 | } | |
514 | } | |
515 | ||
516 | return AppLayerResult::ok(); | |
517 | } | |
518 | ||
519 | fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) { | |
520 | let ev = event as u8; | |
521 | core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev); | |
522 | } | |
523 | ||
524 | fn tx_iterator( | |
525 | &mut self, | |
526 | min_tx_id: u64, | |
527 | state: &mut u64, | |
528 | ) -> Option<(&MQTTTransaction, u64, bool)> { | |
529 | let mut index = *state as usize; | |
530 | let len = self.transactions.len(); | |
531 | ||
532 | while index < len { | |
533 | let tx = &self.transactions[index]; | |
534 | if tx.tx_id < min_tx_id + 1 { | |
535 | index += 1; | |
536 | continue; | |
537 | } | |
538 | *state = index as u64; | |
539 | return Some((tx, tx.tx_id - 1, (len - index) > 1)); | |
540 | } | |
541 | ||
542 | return None; | |
543 | } | |
544 | } | |
545 | ||
546 | // C exports. | |
547 | ||
548 | export_tx_get_detect_state!(rs_mqtt_tx_get_detect_state, MQTTTransaction); | |
549 | export_tx_set_detect_state!(rs_mqtt_tx_set_detect_state, MQTTTransaction); | |
550 | ||
551 | #[no_mangle] | |
363b5f99 | 552 | pub unsafe extern "C" fn rs_mqtt_probing_parser( |
c3136007 SS |
553 | _flow: *const Flow, |
554 | _direction: u8, | |
555 | input: *const u8, | |
556 | input_len: u32, | |
557 | _rdir: *mut u8, | |
558 | ) -> AppProto { | |
559 | let buf = build_slice!(input, input_len as usize); | |
560 | match parse_fixed_header(buf) { | |
561 | Ok((_, hdr)) => { | |
562 | // reject unassigned message type | |
563 | if hdr.message_type == MQTTTypeCode::UNASSIGNED { | |
363b5f99 | 564 | return ALPROTO_FAILED; |
c3136007 SS |
565 | } |
566 | // with 2 being the highest valid QoS level | |
567 | if hdr.qos_level > 2 { | |
363b5f99 | 568 | return ALPROTO_FAILED; |
c3136007 | 569 | } |
363b5f99 | 570 | return ALPROTO_MQTT; |
c3136007 SS |
571 | }, |
572 | Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN, | |
363b5f99 | 573 | Err(_) => ALPROTO_FAILED |
c3136007 SS |
574 | } |
575 | } | |
576 | ||
577 | #[no_mangle] | |
547d6c2d | 578 | pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void { |
c3136007 SS |
579 | let state = MQTTState::new(); |
580 | let boxed = Box::new(state); | |
53413f2d | 581 | return Box::into_raw(boxed) as *mut _; |
c3136007 SS |
582 | } |
583 | ||
584 | #[no_mangle] | |
585 | pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) { | |
53413f2d | 586 | std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) }); |
c3136007 SS |
587 | } |
588 | ||
589 | #[no_mangle] | |
363b5f99 | 590 | pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) { |
c3136007 SS |
591 | let state = cast_pointer!(state, MQTTState); |
592 | state.free_tx(tx_id); | |
593 | } | |
594 | ||
595 | #[no_mangle] | |
363b5f99 | 596 | pub unsafe extern "C" fn rs_mqtt_parse_request( |
c3136007 SS |
597 | _flow: *const Flow, |
598 | state: *mut std::os::raw::c_void, | |
599 | _pstate: *mut std::os::raw::c_void, | |
600 | input: *const u8, | |
601 | input_len: u32, | |
602 | _data: *const std::os::raw::c_void, | |
603 | _flags: u8, | |
604 | ) -> AppLayerResult { | |
605 | let state = cast_pointer!(state, MQTTState); | |
606 | let buf = build_slice!(input, input_len as usize); | |
ac3a20b6 | 607 | return state.parse_request(buf); |
c3136007 SS |
608 | } |
609 | ||
610 | #[no_mangle] | |
363b5f99 | 611 | pub unsafe extern "C" fn rs_mqtt_parse_response( |
c3136007 SS |
612 | _flow: *const Flow, |
613 | state: *mut std::os::raw::c_void, | |
614 | _pstate: *mut std::os::raw::c_void, | |
615 | input: *const u8, | |
616 | input_len: u32, | |
617 | _data: *const std::os::raw::c_void, | |
618 | _flags: u8, | |
619 | ) -> AppLayerResult { | |
620 | let state = cast_pointer!(state, MQTTState); | |
621 | let buf = build_slice!(input, input_len as usize); | |
ac3a20b6 | 622 | return state.parse_response(buf); |
c3136007 SS |
623 | } |
624 | ||
625 | #[no_mangle] | |
363b5f99 | 626 | pub unsafe extern "C" fn rs_mqtt_state_get_tx( |
c3136007 SS |
627 | state: *mut std::os::raw::c_void, |
628 | tx_id: u64, | |
629 | ) -> *mut std::os::raw::c_void { | |
630 | let state = cast_pointer!(state, MQTTState); | |
631 | match state.get_tx(tx_id) { | |
632 | Some(tx) => { | |
53413f2d | 633 | return tx as *const _ as *mut _; |
c3136007 SS |
634 | } |
635 | None => { | |
636 | return std::ptr::null_mut(); | |
637 | } | |
638 | } | |
639 | } | |
640 | ||
641 | #[no_mangle] | |
363b5f99 | 642 | pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 { |
c3136007 SS |
643 | let state = cast_pointer!(state, MQTTState); |
644 | return state.tx_id; | |
645 | } | |
646 | ||
c3136007 | 647 | #[no_mangle] |
363b5f99 | 648 | pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int { |
c3136007 SS |
649 | let tx = cast_pointer!(tx, MQTTTransaction); |
650 | if tx.toclient { | |
651 | return 1; | |
652 | } | |
653 | return 0; | |
654 | } | |
655 | ||
656 | #[no_mangle] | |
363b5f99 | 657 | pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( |
c3136007 SS |
658 | tx: *mut std::os::raw::c_void, |
659 | direction: u8, | |
660 | ) -> std::os::raw::c_int { | |
661 | let tx = cast_pointer!(tx, MQTTTransaction); | |
662 | if tx.complete { | |
663 | if direction == core::STREAM_TOSERVER { | |
664 | if tx.toserver { | |
665 | return 1; | |
666 | } | |
667 | } else if direction == core::STREAM_TOCLIENT { | |
668 | if tx.toclient { | |
669 | return 1; | |
670 | } | |
671 | } | |
672 | } | |
673 | return 0; | |
674 | } | |
675 | ||
676 | #[no_mangle] | |
363b5f99 | 677 | pub unsafe extern "C" fn rs_mqtt_tx_get_logged( |
c3136007 SS |
678 | _state: *mut std::os::raw::c_void, |
679 | tx: *mut std::os::raw::c_void, | |
680 | ) -> u32 { | |
681 | let tx = cast_pointer!(tx, MQTTTransaction); | |
682 | return tx.logged.get(); | |
683 | } | |
684 | ||
685 | #[no_mangle] | |
363b5f99 | 686 | pub unsafe extern "C" fn rs_mqtt_tx_set_logged( |
c3136007 SS |
687 | _state: *mut std::os::raw::c_void, |
688 | tx: *mut std::os::raw::c_void, | |
689 | logged: u32, | |
690 | ) { | |
691 | let tx = cast_pointer!(tx, MQTTTransaction); | |
692 | tx.logged.set(logged); | |
693 | } | |
694 | ||
695 | #[no_mangle] | |
363b5f99 | 696 | pub unsafe extern "C" fn rs_mqtt_state_get_events( |
c3136007 SS |
697 | tx: *mut std::os::raw::c_void, |
698 | ) -> *mut core::AppLayerDecoderEvents { | |
699 | let tx = cast_pointer!(tx, MQTTTransaction); | |
700 | return tx.events; | |
701 | } | |
702 | ||
c3136007 | 703 | #[no_mangle] |
363b5f99 | 704 | pub unsafe extern "C" fn rs_mqtt_state_get_tx_iterator( |
c3136007 SS |
705 | _ipproto: u8, |
706 | _alproto: AppProto, | |
707 | state: *mut std::os::raw::c_void, | |
708 | min_tx_id: u64, | |
709 | _max_tx_id: u64, | |
710 | istate: &mut u64, | |
711 | ) -> applayer::AppLayerGetTxIterTuple { | |
712 | let state = cast_pointer!(state, MQTTState); | |
713 | match state.tx_iterator(min_tx_id, istate) { | |
714 | Some((tx, out_tx_id, has_next)) => { | |
53413f2d | 715 | let c_tx = tx as *const _ as *mut _; |
c3136007 SS |
716 | let ires = applayer::AppLayerGetTxIterTuple::with_values(c_tx, out_tx_id, has_next); |
717 | return ires; | |
718 | } | |
719 | None => { | |
720 | return applayer::AppLayerGetTxIterTuple::not_found(); | |
721 | } | |
722 | } | |
723 | } | |
724 | ||
725 | // Parser name as a C style string. | |
726 | const PARSER_NAME: &'static [u8] = b"mqtt\0"; | |
727 | ||
728 | export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction); | |
729 | ||
730 | #[no_mangle] | |
731 | pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { | |
732 | let default_port = CString::new("[1883]").unwrap(); | |
733 | let max_msg_len = &mut MAX_MSG_LEN; | |
734 | *max_msg_len = cfg_max_msg_len; | |
735 | let parser = RustParser { | |
736 | name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char, | |
737 | default_port: default_port.as_ptr(), | |
738 | ipproto: IPPROTO_TCP, | |
739 | probe_ts: Some(rs_mqtt_probing_parser), | |
740 | probe_tc: Some(rs_mqtt_probing_parser), | |
741 | min_depth: 0, | |
742 | max_depth: 16, | |
743 | state_new: rs_mqtt_state_new, | |
744 | state_free: rs_mqtt_state_free, | |
745 | tx_free: rs_mqtt_state_tx_free, | |
746 | parse_ts: rs_mqtt_parse_request, | |
747 | parse_tc: rs_mqtt_parse_response, | |
748 | get_tx_count: rs_mqtt_state_get_tx_count, | |
749 | get_tx: rs_mqtt_state_get_tx, | |
efc9a7a3 VJ |
750 | tx_comp_st_ts: 1, |
751 | tx_comp_st_tc: 1, | |
c3136007 SS |
752 | tx_get_progress: rs_mqtt_tx_get_alstate_progress, |
753 | get_de_state: rs_mqtt_tx_get_detect_state, | |
754 | set_de_state: rs_mqtt_tx_set_detect_state, | |
755 | get_events: Some(rs_mqtt_state_get_events), | |
8eac5fc2 JI |
756 | get_eventinfo: Some(MQTTEvent::get_event_info), |
757 | get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id), | |
c3136007 SS |
758 | localstorage_new: None, |
759 | localstorage_free: None, | |
760 | get_files: None, | |
761 | get_tx_iterator: Some(rs_mqtt_state_get_tx_iterator), | |
762 | get_tx_data: rs_mqtt_get_tx_data, | |
763 | apply_tx_config: None, | |
ff674d0c | 764 | flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS, |
4da0d9bd | 765 | truncate: None, |
c3136007 SS |
766 | }; |
767 | ||
768 | let ip_proto_str = CString::new("tcp").unwrap(); | |
769 | ||
770 | if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { | |
771 | let alproto = AppLayerRegisterProtocolDetection(&parser, 1); | |
772 | ALPROTO_MQTT = alproto; | |
773 | if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { | |
774 | let _ = AppLayerRegisterParser(&parser, alproto); | |
775 | } | |
776 | } else { | |
777 | SCLogDebug!("Protocol detector and parser disabled for MQTT."); | |
778 | } | |
779 | } |