]>
Commit | Line | Data |
---|---|---|
40b54a1f | 1 | //-------------------------------------------------------------------------- |
b88d016d | 2 | // Copyright (C) 2015-2024 Cisco and/or its affiliates. All rights reserved. |
40b54a1f RC |
3 | // |
4 | // This program is free software; you can redistribute it and/or modify it | |
5 | // under the terms of the GNU General Public License Version 2 as published | |
6 | // by the Free Software Foundation. You may not use, modify or distribute | |
7 | // this program under any other version of the GNU General Public License. | |
8 | // | |
9 | // This program is distributed in the hope that it will be useful, but | |
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
12 | // General Public License for more details. | |
13 | // | |
14 | // You should have received a copy of the GNU General Public License along | |
15 | // with this program; if not, write to the Free Software Foundation, Inc., | |
16 | // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |
17 | //-------------------------------------------------------------------------- | |
728c88e5 | 18 | // ha.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com> |
40b54a1f | 19 | |
02d63397 MA |
20 | #ifdef HAVE_CONFIG_H |
21 | #include "config.h" | |
22 | #endif | |
23 | ||
40b54a1f RC |
24 | #include "ha.h" |
25 | ||
02d63397 | 26 | #include "framework/counts.h" |
79e8d40f | 27 | #include "log/messages.h" |
01e1d7d0 | 28 | #include "packet_io/active.h" |
728c88e5 MA |
29 | #include "packet_io/sfdaq_instance.h" |
30 | #include "protocols/packet.h" | |
31 | #include "side_channel/side_channel.h" | |
4c0373da | 32 | #include "stream/stream.h" |
71d8ae0f EB |
33 | #include "time/packet_time.h" |
34 | ||
02d63397 MA |
35 | #include "flow.h" |
36 | #include "flow_key.h" | |
728c88e5 | 37 | #include "ha_module.h" |
63e28351 | 38 | #include "session.h" |
02d63397 | 39 | |
a19c078d TP |
40 | using namespace snort; |
41 | ||
// Event codes carried in HAMessageHeader::event.
enum HAEvent
{
    HA_DELETE_EVENT = 1,    // receiver deletes the flow identified by the key
    HA_UPDATE_EVENT = 2     // message body carries per-client flow state
};
33ac31c8 | 47 | |
// Fixed preamble at the start of every HA message; the flow key is written
// directly after it. Packed so that no padding is inserted between fields.
struct __attribute__((__packed__)) HAMessageHeader
{
    uint8_t event;          // an HAEvent value
    uint8_t version;        // compared against HA_MESSAGE_VERSION on receipt
    uint16_t total_length;  // compared against the full buffer length on receipt
    uint8_t key_type;       // KEY_TYPE_IP4 or KEY_TYPE_IP6
};
55 | ||
// Preamble of each per-client section inside an update message.
struct __attribute__((__packed__)) HAClientHeader
{
    uint8_t client;     // index of the client in the receiver's client map
    uint8_t length;     // number of content bytes that follow this header
};
61 | ||
// One client slot per mask bit, plus one 'automatic' session client:
//   - the session client has handle 0 and index 0
//   - every other client has handle == (1 << (client_index - 1))
static constexpr uint8_t MAX_CLIENTS = 17;
33ac31c8 | 66 | |
728c88e5 MA |
67 | // HighAvailability is the thread-local state/configuration instantiated for each packet thread. |
68 | typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap; | |
69 | class HighAvailability | |
70 | { | |
71 | public: | |
72 | HighAvailability(PortBitSet*,bool); | |
73 | ~HighAvailability(); | |
74 | ||
75 | void process_update(Flow*, Packet*); | |
76 | void process_deletion(Flow&); | |
77 | void process_receive(); | |
78 | ||
79 | Flow* process_daq_import(Packet&, FlowKey&); | |
80 | ||
81 | // The [0] entry contains the stream client (always present) | |
82 | // Entries [1] to [MAX_CLIENTS-1] contain the optional clients | |
83 | ClientMap client_map = { }; | |
84 | uint8_t handle_counter = 1; // stream client (index == 0) always exists | |
85 | bool shutting_down = false; | |
86 | ||
87 | private: | |
88 | SideChannel* sc = nullptr; | |
89 | bool use_daq_channel; | |
90 | }; | |
91 | ||
67bf7c8b RS |
92 | |
// Wire-format version; the value differs depending on DISABLE_TENANT_ID.
// Always increment both values together so they stay consistent.
#ifdef DISABLE_TENANT_ID
static constexpr uint8_t HA_MESSAGE_VERSION = 5;
#else
static constexpr uint8_t HA_MESSAGE_VERSION = 4;
#endif
728c88e5 MA |
99 | |
100 | // define message size and content constants. | |
101 | static constexpr uint8_t KEY_SIZE_IP6 = sizeof(FlowKey); | |
102 | // ip4 key is smaller by 2*(ip6_addr_size - ip4_addr_size) or 2 * (16 - 4) = 24 | |
103 | static constexpr uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24; | |
0047d0e6 EB |
104 | |
// Values stored in HAMessageHeader::key_type to select the key encoding.
enum
{
    KEY_TYPE_IP6 = 1,   // full FlowKey follows the header
    KEY_TYPE_IP4 = 2    // compact key with 4-byte addresses follows the header
};
33ac31c8 | 110 | |
728c88e5 MA |
111 | static constexpr FlowHAClientHandle SESSION_HA_CLIENT = 0x0000; |
112 | static constexpr uint8_t SESSION_HA_CLIENT_INDEX = 0; | |
40b54a1f | 113 | |
71d8ae0f EB |
114 | PortBitSet* HighAvailabilityManager::ports = nullptr; |
115 | bool HighAvailabilityManager::use_daq_channel = false; | |
728c88e5 | 116 | |
71d8ae0f | 117 | struct timeval FlowHAState::min_session_lifetime; |
5e1328c0 | 118 | struct timeval FlowHAState::min_sync_interval; |
db78ffe8 | 119 | |
728c88e5 | 120 | static THREAD_LOCAL HighAvailability* ha; |
db78ffe8 | 121 | |
33ac31c8 | 122 | static inline bool is_ip6_key(const FlowKey* key) |
db78ffe8 EB |
123 | { |
124 | return (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) || | |
33ac31c8 | 125 | key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF)); |
db78ffe8 | 126 | } |
40b54a1f | 127 | |
71d8ae0f | 128 | FlowHAState::FlowHAState() |
40b54a1f | 129 | { |
e1b25ca3 | 130 | // Set the initial update time to now+min_session_lifetime |
5e1328c0 | 131 | packet_gettimeofday(&next_update); |
728c88e5 | 132 | timeradd(&next_update, &min_session_lifetime, &next_update); |
71d8ae0f | 133 | } |
40b54a1f | 134 | |
71d8ae0f EB |
135 | void FlowHAState::set_pending(FlowHAClientHandle handle) |
136 | { | |
728c88e5 | 137 | pending |= handle; |
71d8ae0f | 138 | } |
40b54a1f | 139 | |
33ac31c8 EB |
140 | bool FlowHAState::check_pending(FlowHAClientHandle handle) |
141 | { | |
728c88e5 | 142 | return ((pending & handle) != 0); |
33ac31c8 EB |
143 | } |
144 | ||
145 | void FlowHAState::clear_pending(FlowHAClientHandle handle) | |
71d8ae0f | 146 | { |
728c88e5 | 147 | pending &= ~handle; |
71d8ae0f | 148 | } |
40b54a1f | 149 | |
33ac31c8 | 150 | void FlowHAState::set(uint8_t new_state) |
71d8ae0f | 151 | { |
5e1328c0 | 152 | state = new_state; |
71d8ae0f EB |
153 | } |
154 | ||
5e1328c0 | 155 | void FlowHAState::add(uint8_t new_state) |
71d8ae0f | 156 | { |
5e1328c0 | 157 | state |= new_state; |
71d8ae0f EB |
158 | } |
159 | ||
33ac31c8 | 160 | void FlowHAState::clear(uint8_t old_state) |
71d8ae0f | 161 | { |
5e1328c0 | 162 | state &= ~old_state; |
71d8ae0f EB |
163 | } |
164 | ||
5e1328c0 | 165 | bool FlowHAState::check_any(uint8_t state_mask) |
33ac31c8 EB |
166 | { |
167 | return (state & state_mask) != 0; | |
168 | } | |
169 | ||
5e1328c0 | 170 | void FlowHAState::config_timers(struct timeval min_lifetime, struct timeval min_interval) |
71d8ae0f EB |
171 | { |
172 | min_session_lifetime = min_lifetime; | |
5e1328c0 | 173 | min_sync_interval = min_interval; |
71d8ae0f EB |
174 | } |
175 | ||
5e1328c0 | 176 | bool FlowHAState::sync_interval_elapsed() |
71d8ae0f EB |
177 | { |
178 | struct timeval pkt_time; | |
179 | ||
180 | packet_gettimeofday(&pkt_time); | |
181 | ||
182 | return ( ( pkt_time.tv_sec > next_update.tv_sec ) || | |
33ac31c8 EB |
183 | ( ( pkt_time.tv_sec == next_update.tv_sec ) && |
184 | ( pkt_time.tv_usec > next_update.tv_usec ) ) ); | |
71d8ae0f EB |
185 | } |
186 | ||
63e28351 GK |
187 | void FlowHAState::init_next_update() |
188 | { | |
189 | packet_gettimeofday(&next_update); | |
190 | timeradd(&next_update, &min_session_lifetime, &next_update); | |
191 | } | |
192 | ||
71d8ae0f EB |
193 | void FlowHAState::set_next_update() |
194 | { | |
728c88e5 | 195 | timeradd(&next_update, &min_sync_interval, &next_update); |
71d8ae0f EB |
196 | } |
197 | ||
2a230121 RC |
198 | void FlowHAState::reset() |
199 | { | |
200 | state = INITIAL_STATE; | |
201 | pending = NONE_PENDING; | |
63e28351 | 202 | init_next_update(); |
2a230121 RC |
203 | } |
204 | ||
4758ecbe | 205 | FlowHAClient::FlowHAClient(uint8_t length, bool session_client) : max_length(length) |
db78ffe8 | 206 | { |
728c88e5 | 207 | if (!ha) |
79e8d40f RC |
208 | return; |
209 | ||
728c88e5 | 210 | if (session_client) |
db78ffe8 | 211 | { |
728c88e5 | 212 | index = SESSION_HA_CLIENT_INDEX; |
fd52a3f3 | 213 | handle = SESSION_HA_CLIENT; |
728c88e5 | 214 | ha->client_map[0] = this; |
db78ffe8 EB |
215 | } |
216 | else | |
217 | { | |
728c88e5 | 218 | if (ha->handle_counter >= MAX_CLIENTS) |
79e8d40f RC |
219 | { |
220 | ErrorMessage("Attempting to register too many FlowHAClients\n"); | |
221 | return; | |
222 | } | |
223 | ||
728c88e5 MA |
224 | index = ha->handle_counter; |
225 | handle = (1 << (index - 1)); | |
226 | ha->client_map[index] = this; | |
227 | ha->handle_counter++; | |
79e8d40f | 228 | } |
79e8d40f RC |
229 | } |
230 | ||
33ac31c8 | 231 | // Write the key type, key length, and key into the message. |
728c88e5 | 232 | // Return the type of key written so it can be stored in the message header. |
ab79fd42 | 233 | static uint8_t write_flow_key(const Flow& flow, HAMessage& msg) |
728c88e5 MA |
234 | { |
235 | const FlowKey* key = flow.key; | |
33ac31c8 EB |
236 | assert(key); |
237 | ||
728c88e5 | 238 | if (is_ip6_key(flow.key)) |
33ac31c8 | 239 | { |
728c88e5 MA |
240 | memcpy(msg.cursor, key, KEY_SIZE_IP6); |
241 | msg.advance_cursor(KEY_SIZE_IP6); | |
587470f8 | 242 | |
728c88e5 | 243 | return KEY_TYPE_IP6; |
33ac31c8 | 244 | } |
33ac31c8 | 245 | |
728c88e5 MA |
246 | memcpy(msg.cursor, &key->ip_l[3], sizeof(key->ip_l[3])); |
247 | msg.advance_cursor(sizeof(key->ip_l[3])); | |
248 | memcpy(msg.cursor, &key->ip_h[3], sizeof(key->ip_h[3])); | |
249 | msg.advance_cursor(sizeof(key->ip_h[3])); | |
250 | memcpy(msg.cursor, ((const uint8_t*) key) + 32, KEY_SIZE_IP4 - 8); | |
251 | msg.advance_cursor(KEY_SIZE_IP4 - 8); | |
587470f8 | 252 | |
728c88e5 | 253 | return KEY_TYPE_IP4; |
33ac31c8 EB |
254 | } |
255 | ||
728c88e5 MA |
256 | // Extract the key and return the key length. Position the cursor just after the key. |
257 | static uint8_t read_flow_key(HAMessage& msg, const HAMessageHeader* hdr, FlowKey& key) | |
33ac31c8 | 258 | { |
728c88e5 | 259 | if (hdr->key_type == KEY_TYPE_IP6) |
33ac31c8 | 260 | { |
728c88e5 MA |
261 | if (!msg.fits(KEY_SIZE_IP6)) |
262 | { | |
263 | ha_stats.truncated_msgs++; | |
264 | return 0; | |
265 | } | |
266 | ||
267 | memcpy(&key, msg.cursor, KEY_SIZE_IP6); | |
268 | msg.advance_cursor(KEY_SIZE_IP6); | |
587470f8 | 269 | |
22def9c0 | 270 | return KEY_SIZE_IP6; |
33ac31c8 | 271 | } |
728c88e5 | 272 | else if (hdr->key_type == KEY_TYPE_IP4) |
33ac31c8 | 273 | { |
728c88e5 MA |
274 | if (!msg.fits(KEY_SIZE_IP4)) |
275 | { | |
276 | ha_stats.truncated_msgs++; | |
277 | return 0; | |
278 | } | |
279 | ||
33ac31c8 | 280 | /* Lower IPv4 address */ |
728c88e5 MA |
281 | memcpy(&key.ip_l[3], msg.cursor, sizeof(key.ip_l[3])); |
282 | key.ip_l[0] = key.ip_l[1] = 0; | |
283 | key.ip_l[2] = htonl(0xFFFF); | |
284 | msg.advance_cursor(sizeof(key.ip_l[3])); | |
33ac31c8 | 285 | /* Higher IPv4 address */ |
728c88e5 MA |
286 | memcpy(&key.ip_h[3], msg.cursor, sizeof(key.ip_h[3])); |
287 | key.ip_h[0] = key.ip_h[1] = 0; | |
288 | key.ip_h[2] = htonl(0xFFFF); | |
289 | msg.advance_cursor(sizeof(key.ip_h[3])); | |
33ac31c8 | 290 | /* The remainder of the key */ |
728c88e5 MA |
291 | memcpy(((uint8_t*) &key) + 32, msg.cursor, KEY_SIZE_IP4 - 8); |
292 | msg.advance_cursor(KEY_SIZE_IP4 - 8); | |
587470f8 | 293 | |
22def9c0 | 294 | return KEY_SIZE_IP4; |
33ac31c8 | 295 | } |
728c88e5 MA |
296 | |
297 | ha_stats.unknown_key_type++; | |
298 | return 0; | |
33ac31c8 EB |
299 | } |
300 | ||
728c88e5 | 301 | static inline uint8_t key_size(Flow& flow) |
33ac31c8 | 302 | { |
728c88e5 MA |
303 | assert(flow.key); |
304 | return is_ip6_key(flow.key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4; | |
33ac31c8 EB |
305 | } |
306 | ||
728c88e5 | 307 | static uint16_t calculate_msg_header_length(Flow& flow) |
33ac31c8 EB |
308 | { |
309 | return sizeof(HAMessageHeader) + key_size(flow); | |
310 | } | |
311 | ||
312 | // Calculate the UPDATE message content length based on the | |
313 | // set of active clients. The Session client is always present. | |
728c88e5 | 314 | static uint16_t calculate_update_msg_content_length(Flow& flow, bool full) |
33ac31c8 | 315 | { |
728c88e5 | 316 | assert(ha->client_map[0]); |
33ac31c8 | 317 | |
5e1328c0 RC |
318 | uint16_t length = 0; |
319 | ||
728c88e5 | 320 | for (int i = 0; i < ha->handle_counter; i++) |
8c3010cb RC |
321 | { |
322 | // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck | |
728c88e5 | 323 | if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1))) |
33ac31c8 | 324 | { |
728c88e5 | 325 | assert(ha->client_map[i]); |
63e28351 | 326 | length += (ha->client_map[i]->get_message_size(flow) + sizeof(HAClientHeader)); |
33ac31c8 | 327 | } |
8c3010cb | 328 | } |
33ac31c8 EB |
329 | |
330 | return length; | |
331 | } | |
332 | ||
333 | // Write the HA header and key sections. Position the cursor | |
334 | // at the beginning of the content section. | |
ab79fd42 | 335 | static void write_msg_header(const Flow& flow, HAEvent event, uint16_t content_length, HAMessage& msg) |
33ac31c8 | 336 | { |
728c88e5 MA |
337 | HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor; |
338 | hdr->event = (uint8_t) event; | |
22def9c0 | 339 | hdr->version = HA_MESSAGE_VERSION; |
33ac31c8 | 340 | hdr->total_length = content_length; |
728c88e5 MA |
341 | msg.advance_cursor(sizeof(HAMessageHeader)); |
342 | hdr->key_type = write_flow_key(flow, msg); | |
33ac31c8 EB |
343 | } |
344 | ||
ab79fd42 | 345 | static uint16_t update_msg_header_length(const HAMessage& msg) |
728c88e5 MA |
346 | { |
347 | HAMessageHeader* hdr = (HAMessageHeader*) msg.buffer; | |
348 | hdr->total_length = msg.cursor_position(); | |
349 | return hdr->total_length; | |
350 | } | |
351 | ||
352 | static void write_update_msg_client(FlowHAClient* client, Flow& flow, HAMessage& msg) | |
33ac31c8 | 353 | { |
79e8d40f | 354 | assert(client); |
33ac31c8 | 355 | |
728c88e5 MA |
356 | if (!msg.fits(sizeof(HAClientHeader))) |
357 | return; | |
358 | ||
359 | // Preemptively insert the client header. If production fails, roll back the message cursor | |
360 | // to its original position. | |
361 | uint8_t* original_cursor = msg.cursor; | |
362 | HAClientHeader* header = (HAClientHeader*) original_cursor; | |
363 | header->client = client->index; | |
364 | msg.advance_cursor(sizeof(HAClientHeader)); | |
365 | if (!client->produce(flow, msg)) | |
366 | { | |
367 | msg.reset_cursor(original_cursor); | |
368 | return; | |
369 | } | |
370 | assert(msg.cursor >= (original_cursor + sizeof(HAClientHeader))); | |
371 | header->length = (uint32_t) (msg.cursor - original_cursor - sizeof(HAClientHeader)); | |
79e8d40f RC |
372 | } |
373 | ||
728c88e5 | 374 | static void write_update_msg_content(Flow& flow, HAMessage& msg, bool full) |
79e8d40f | 375 | { |
728c88e5 | 376 | for (int i = 0; i < ha->handle_counter; i++) |
8c3010cb RC |
377 | { |
378 | // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck | |
728c88e5 MA |
379 | if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1))) |
380 | write_update_msg_client(ha->client_map[i], flow, msg); | |
8c3010cb | 381 | } |
33ac31c8 EB |
382 | } |
383 | ||
728c88e5 | 384 | static void consume_ha_delete_message(HAMessage&, const FlowKey& key) |
22def9c0 | 385 | { |
4c0373da | 386 | Stream::delete_flow(&key); |
22def9c0 EB |
387 | } |
388 | ||
63e28351 | 389 | static Flow* consume_ha_update_message(HAMessage& msg, const FlowKey& key, Packet* p) |
22def9c0 | 390 | { |
22def9c0 | 391 | // flow will be nullptr if/when the session does not exist in the caches |
63e28351 | 392 | bool no_flow_found = false; |
4c0373da | 393 | Flow* flow = Stream::get_flow(&key); |
728c88e5 | 394 | if (!flow) |
63e28351 GK |
395 | { |
396 | no_flow_found = true; | |
728c88e5 | 397 | ha_stats.update_msgs_recv_no_flow++; |
63e28351 | 398 | } |
22def9c0 | 399 | |
728c88e5 MA |
400 | // pointer to one past the last byte in the message |
401 | const uint8_t* content_end = msg.buffer + msg.buffer_length; | |
79e8d40f | 402 | |
728c88e5 | 403 | while (msg.cursor < content_end) |
79e8d40f RC |
404 | { |
405 | // do we have sufficient message left to be able to have an HAClientHeader? | |
728c88e5 | 406 | if (!msg.fits(sizeof(HAClientHeader))) |
79e8d40f RC |
407 | { |
408 | ErrorMessage("Consuming HA Update message - no HAClientHeader\n"); | |
728c88e5 | 409 | ha_stats.truncated_msgs++; |
79e8d40f RC |
410 | break; |
411 | } | |
412 | ||
728c88e5 MA |
413 | HAClientHeader* header = (HAClientHeader*) msg.cursor; |
414 | if ((header->client >= ha->handle_counter) || (ha->client_map[header->client] == nullptr)) | |
79e8d40f RC |
415 | { |
416 | ErrorMessage("Consuming HA Update message - invalid client index\n"); | |
728c88e5 | 417 | ha_stats.unknown_client_idx++; |
79e8d40f RC |
418 | break; |
419 | } | |
728c88e5 | 420 | msg.advance_cursor(sizeof(HAClientHeader)); // step to the client content |
79e8d40f | 421 | |
728c88e5 | 422 | if (!msg.fits(header->length)) |
79e8d40f | 423 | { |
e1b25ca3 | 424 | ErrorMessage("Consuming HA Update message - message too short\n"); |
728c88e5 | 425 | ha_stats.truncated_msgs++; |
79e8d40f RC |
426 | break; |
427 | } | |
428 | ||
5e1328c0 RC |
429 | // If the Flow does not exist in the caches, flow will be nullptr |
430 | // upon entry into this message processing loop. Since the session | |
431 | // client is always the first segment of the message, the consume() | |
432 | // invocation for the session client will create the flow. This | |
433 | // flow can in turn be used by subsequent FlowHAClient's. | |
728c88e5 | 434 | if (!ha->client_map[header->client]->consume(flow, &key, msg, header->length)) |
79e8d40f RC |
435 | { |
436 | ErrorMessage("Consuming HA Update message - error from client consume()\n"); | |
728c88e5 | 437 | ha_stats.client_consume_errors++; |
79e8d40f RC |
438 | break; |
439 | } | |
440 | } | |
728c88e5 MA |
441 | |
442 | if (msg.cursor == content_end) | |
443 | ha_stats.update_msgs_consumed++; | |
444 | ||
63e28351 GK |
445 | if( p && no_flow_found && flow && flow->session ) |
446 | { | |
caccd0b8 | 447 | p->flow = flow; |
63e28351 | 448 | flow->session->setup(p); |
774304f6 PB |
449 | flow->set_direction(p); |
450 | flow->set_client_initiate(p); | |
451 | ||
452 | if (p->is_from_client()) | |
453 | { | |
454 | flow->client_intf = p->pkth->ingress_index; | |
455 | flow->server_intf = p->pkth->egress_index; | |
456 | flow->client_group = p->pkth->ingress_group; | |
457 | flow->server_group = p->pkth->egress_group; | |
458 | } | |
459 | else | |
460 | { | |
461 | flow->client_intf = p->pkth->egress_index; | |
462 | flow->server_intf = p->pkth->ingress_index; | |
463 | flow->client_group = p->pkth->egress_group; | |
464 | flow->server_group = p->pkth->ingress_group; | |
465 | } | |
63e28351 GK |
466 | } |
467 | ||
728c88e5 | 468 | return flow; |
22def9c0 EB |
469 | } |
470 | ||
7b248512 NG |
471 | static Flow* consume_ha_message(HAMessage& msg, |
472 | FlowKey* packet_key = nullptr, Packet* p = nullptr) | |
22def9c0 | 473 | { |
728c88e5 | 474 | ha_stats.msgs_recv++; |
22def9c0 | 475 | |
728c88e5 MA |
476 | if (!msg.fits(sizeof(HAMessageHeader))) |
477 | { | |
478 | ha_stats.truncated_msgs++; | |
479 | return nullptr; | |
480 | } | |
22def9c0 | 481 | |
728c88e5 MA |
482 | const HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor; |
483 | ||
484 | if (hdr->version != HA_MESSAGE_VERSION) | |
485 | { | |
486 | ha_stats.msg_version_mismatch++; | |
487 | return nullptr; | |
488 | } | |
489 | ||
490 | if (hdr->total_length != msg.buffer_length) | |
491 | { | |
492 | ha_stats.msg_length_mismatch++; | |
493 | return nullptr; | |
494 | } | |
495 | ||
496 | msg.advance_cursor(sizeof(HAMessageHeader)); | |
497 | ||
498 | FlowKey key; | |
499 | if (read_flow_key(msg, hdr, key) == 0) | |
500 | return nullptr; | |
501 | ||
2a0742e1 | 502 | if (packet_key and !FlowKey::is_equal(packet_key, &key)) |
7b248512 NG |
503 | { |
504 | ha_stats.key_mismatch++; | |
505 | return nullptr; | |
506 | } | |
507 | ||
728c88e5 MA |
508 | Flow* flow = nullptr; |
509 | switch (hdr->event) | |
22def9c0 EB |
510 | { |
511 | case HA_DELETE_EVENT: | |
512 | { | |
728c88e5 MA |
513 | consume_ha_delete_message(msg, key); |
514 | ha_stats.delete_msgs_consumed++; | |
22def9c0 EB |
515 | break; |
516 | } | |
517 | case HA_UPDATE_EVENT: | |
518 | { | |
63e28351 | 519 | flow = consume_ha_update_message(msg, key, p); |
728c88e5 | 520 | ha_stats.update_msgs_recv++; |
22def9c0 EB |
521 | break; |
522 | } | |
22def9c0 | 523 | } |
728c88e5 MA |
524 | |
525 | return flow; | |
526 | } | |
527 | ||
528 | static void ha_sc_receive_handler(SCMessage* sc_msg) | |
529 | { | |
530 | assert(sc_msg); | |
531 | ||
532 | // SC received messages must have reference back to SideChannel object | |
533 | assert(sc_msg->sc); | |
534 | ||
535 | HAMessage ha_msg(sc_msg->content, sc_msg->content_length); | |
536 | consume_ha_message(ha_msg); | |
537 | ||
538 | sc_msg->sc->discard_message(sc_msg); | |
22def9c0 EB |
539 | } |
540 | ||
728c88e5 | 541 | HighAvailability::HighAvailability(PortBitSet* ports, bool daq_channel) |
71d8ae0f | 542 | { |
71d8ae0f | 543 | using namespace std::placeholders; |
71d8ae0f | 544 | |
728c88e5 MA |
545 | // If side channel ports were configured, find the first matching side channel to associate with |
546 | if (ports != nullptr) | |
1c8d6d53 | 547 | { |
728c88e5 MA |
548 | for (SCPort port = 0; port < ports->size(); port++) |
549 | { | |
550 | if (!ports->test(port)) | |
551 | continue; | |
552 | ||
553 | sc = SideChannelManager::get_side_channel(port); | |
554 | if (sc) | |
71d8ae0f | 555 | { |
728c88e5 MA |
556 | // We require a duplex channel |
557 | if (sc->get_direction() != Connector::CONN_DUPLEX) | |
33ac31c8 | 558 | { |
728c88e5 MA |
559 | sc = nullptr; |
560 | continue; | |
33ac31c8 | 561 | } |
728c88e5 MA |
562 | sc->set_default_port(port); |
563 | sc->register_receive_handler(ha_sc_receive_handler); | |
71d8ae0f | 564 | } |
728c88e5 MA |
565 | break; |
566 | } | |
1c8d6d53 | 567 | } |
728c88e5 | 568 | use_daq_channel = daq_channel; |
40b54a1f RC |
569 | } |
570 | ||
571 | HighAvailability::~HighAvailability() | |
572 | { | |
728c88e5 | 573 | if (sc) |
71d8ae0f | 574 | sc->unregister_receive_handler(); |
40b54a1f RC |
575 | } |
576 | ||
728c88e5 | 577 | static void send_sc_update_message(Flow& flow, SideChannel& sc) |
40b54a1f | 578 | { |
728c88e5 MA |
579 | const uint16_t header_len = calculate_msg_header_length(flow); |
580 | const uint16_t content_len = calculate_update_msg_content_length(flow, false); | |
581 | ||
582 | SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len)); | |
22def9c0 | 583 | assert(sc_msg); |
728c88e5 | 584 | HAMessage ha_msg(sc_msg->content, sc_msg->content_length); |
40b54a1f | 585 | |
728c88e5 MA |
586 | write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg); |
587 | write_update_msg_content(flow, ha_msg, false); | |
588 | update_msg_header_length(ha_msg); | |
589 | sc.transmit_message(sc_msg); | |
590 | } | |
591 | ||
592 | static void send_daq_update_message(Flow& flow, Packet& p) | |
593 | { | |
594 | static THREAD_LOCAL uint8_t daq_io_buffer[UINT16_MAX]; | |
22def9c0 | 595 | |
728c88e5 | 596 | HAMessage ha_msg(daq_io_buffer, sizeof(daq_io_buffer)); |
22def9c0 | 597 | |
728c88e5 MA |
598 | write_msg_header(flow, HA_UPDATE_EVENT, 0, ha_msg); |
599 | write_update_msg_content(flow, ha_msg, true); | |
600 | uint32_t len = update_msg_header_length(ha_msg); | |
601 | ||
602 | DIOCTL_FlowHAState fhs; | |
603 | fhs.msg = p.daq_msg; | |
604 | fhs.data = daq_io_buffer; | |
605 | fhs.length = len; | |
606 | ||
607 | p.daq_instance->ioctl(DIOCTL_SET_FLOW_HA_STATE, &fhs, sizeof(fhs)); | |
608 | ||
609 | ha_stats.daq_stores++; | |
40b54a1f RC |
610 | } |
611 | ||
728c88e5 | 612 | void HighAvailability::process_update(Flow* flow, Packet* p) |
40b54a1f | 613 | { |
728c88e5 | 614 | if (!flow) |
40b54a1f RC |
615 | return; |
616 | ||
5e1328c0 | 617 | // We must have the map array and the session client |
728c88e5 | 618 | assert(client_map[0]); |
5e1328c0 | 619 | |
728c88e5 | 620 | if ( !client_map[0]->is_update_required(flow) && |
5e1328c0 RC |
621 | ( !flow->ha_state->check_pending(ALL_CLIENTS) || |
622 | flow->ha_state->check_any(FlowHAState::NEW) ) ) | |
623 | return; | |
624 | ||
728c88e5 MA |
625 | if (sc) |
626 | send_sc_update_message(*flow, *sc); | |
33ac31c8 | 627 | |
728c88e5 MA |
628 | if (use_daq_channel && p && p->daq_msg) |
629 | send_daq_update_message(*flow, *p); | |
5e1328c0 RC |
630 | |
631 | flow->ha_state->clear(FlowHAState::NEW | FlowHAState::MODIFIED | | |
632 | FlowHAState::MAJOR | FlowHAState::CRITICAL); | |
633 | flow->ha_state->clear_pending(ALL_CLIENTS); | |
634 | flow->ha_state->set_next_update(); | |
db78ffe8 EB |
635 | } |
636 | ||
728c88e5 | 637 | static void send_sc_deletion_message(Flow& flow, SideChannel& sc) |
db78ffe8 | 638 | { |
33ac31c8 | 639 | const uint32_t msg_len = calculate_msg_header_length(flow); |
728c88e5 MA |
640 | SCMessage* sc_msg = sc.alloc_transmit_message(msg_len); |
641 | HAMessage ha_msg(sc_msg->content, sc_msg->content_length); | |
33ac31c8 EB |
642 | |
643 | // No content, only header+key | |
728c88e5 | 644 | write_msg_header(flow, HA_DELETE_EVENT, msg_len, ha_msg); |
33ac31c8 | 645 | |
728c88e5 MA |
646 | sc.transmit_message(sc_msg); |
647 | } | |
648 | ||
649 | void HighAvailability::process_deletion(Flow& flow) | |
650 | { | |
651 | // No need to send message if we already have, we are in standby, or | |
652 | // we have just been created and haven't yet sent an update | |
653 | if (flow.ha_state->check_any(FlowHAState::NEW | FlowHAState::DELETED | FlowHAState::STANDBY)) | |
654 | return; | |
655 | ||
656 | // Only produce deletion messages when using a side channel | |
657 | if (sc) | |
658 | send_sc_deletion_message(flow, *sc); | |
33ac31c8 | 659 | |
728c88e5 | 660 | flow.ha_state->add(FlowHAState::DELETED); |
db78ffe8 | 661 | } |
40b54a1f | 662 | |
db78ffe8 EB |
663 | void HighAvailability::process_receive() |
664 | { | |
728c88e5 | 665 | if (sc) |
0047d0e6 | 666 | sc->process(DISPATCH_ALL_RECEIVE); |
40b54a1f RC |
667 | } |
668 | ||
728c88e5 | 669 | Flow* HighAvailability::process_daq_import(Packet& p, FlowKey& key) |
71d8ae0f | 670 | { |
728c88e5 MA |
671 | Flow* flow = nullptr; |
672 | ||
673 | if (use_daq_channel && p.pkth->flags & DAQ_PKT_FLAG_HA_STATE_AVAIL) | |
674 | { | |
675 | DIOCTL_FlowHAState fhs; | |
676 | fhs.msg = p.daq_msg; | |
677 | ||
678 | if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS) | |
679 | { | |
680 | HAMessage ha_msg(fhs.data, fhs.length); | |
7b248512 | 681 | flow = consume_ha_message(ha_msg, &key, &p); |
728c88e5 MA |
682 | ha_stats.daq_imports++; |
683 | // Validate that the imported flow matches up with the given flow key. | |
684 | if (flow) | |
685 | { | |
0d3b62e3 | 686 | if (Flow::FlowState::INSPECT < flow->flow_state) |
728c88e5 | 687 | { |
7b248512 NG |
688 | flow->disable_inspection(); |
689 | p.disable_inspect = true; | |
728c88e5 | 690 | } |
7b248512 NG |
691 | // Clear the standby bit so that we don't immediately trigger a new data store |
692 | // FIXIT-L streamline the consume process so this doesn't have to be done here | |
693 | flow->ha_state->clear(FlowHAState::STANDBY); | |
728c88e5 MA |
694 | } |
695 | } | |
696 | } | |
7902706d | 697 | |
728c88e5 | 698 | return flow; |
71d8ae0f EB |
699 | } |
700 | ||
728c88e5 | 701 | void HighAvailabilityManager::reset_config() |
71d8ae0f | 702 | { |
728c88e5 MA |
703 | if (ports) |
704 | { | |
705 | delete ports; | |
706 | ports = nullptr; | |
707 | } | |
708 | } | |
709 | ||
710 | void HighAvailabilityManager::term() | |
711 | { | |
712 | reset_config(); | |
713 | } | |
714 | ||
715 | // Called within the main thread after the initial configuration has been read | |
716 | void HighAvailabilityManager::configure(HighAvailabilityConfig* config) | |
717 | { | |
718 | if (!config) | |
719 | { | |
720 | reset_config(); | |
721 | return; | |
722 | } | |
723 | ||
724 | if (config->ports) | |
725 | ports = new PortBitSet(*config->ports); | |
726 | else if (ports) | |
727 | { | |
728 | delete ports; | |
729 | ports = nullptr; | |
730 | } | |
731 | ||
732 | FlowHAState::config_timers(config->min_session_lifetime, config->min_sync_interval); | |
733 | ||
734 | use_daq_channel = config->daq_channel; | |
71d8ae0f EB |
735 | } |
736 | ||
737 | // Called within the packet thread prior to packet processing | |
40b54a1f RC |
738 | void HighAvailabilityManager::thread_init() |
739 | { | |
71d8ae0f | 740 | // create a a thread local instance iff we are configured to operate. |
728c88e5 MA |
741 | if (ports || use_daq_channel) |
742 | ha = new HighAvailability(ports, use_daq_channel); | |
71d8ae0f EB |
743 | else |
744 | ha = nullptr; | |
40b54a1f RC |
745 | } |
746 | ||
19aa43f5 TP |
747 | void HighAvailabilityManager::thread_term_beginning() |
748 | { | |
728c88e5 MA |
749 | if (ha) |
750 | ha->shutting_down = true; | |
19aa43f5 TP |
751 | } |
752 | ||
71d8ae0f | 753 | // Called in the packet thread at run-down |
40b54a1f RC |
754 | void HighAvailabilityManager::thread_term() |
755 | { | |
728c88e5 | 756 | if (ha) |
33ac31c8 | 757 | { |
71d8ae0f | 758 | delete ha; |
33ac31c8 EB |
759 | ha = nullptr; |
760 | } | |
40b54a1f RC |
761 | } |
762 | ||
728c88e5 | 763 | void HighAvailabilityManager::process_update(Flow* flow, Packet* p) |
db78ffe8 | 764 | { |
01e1d7d0 | 765 | if (ha && flow && !p->active->get_tunnel_bypass()) |
728c88e5 | 766 | ha->process_update(flow, p); |
db78ffe8 EB |
767 | } |
768 | ||
769 | // Deletion messages only contain session content | |
728c88e5 | 770 | void HighAvailabilityManager::process_deletion(Flow& flow) |
db78ffe8 | 771 | { |
728c88e5 | 772 | if (ha && !ha->shutting_down) |
db78ffe8 EB |
773 | ha->process_deletion(flow); |
774 | } | |
775 | ||
776 | void HighAvailabilityManager::process_receive() | |
40b54a1f | 777 | { |
728c88e5 | 778 | if (ha) |
db78ffe8 | 779 | ha->process_receive(); |
40b54a1f RC |
780 | } |
781 | ||
71d8ae0f EB |
782 | // Called in the packet threads to determine whether or not HA is active |
783 | bool HighAvailabilityManager::active() | |
784 | { | |
785 | return (ha != nullptr); | |
786 | } | |
19aa43f5 TP |
787 | |
788 | void HighAvailabilityManager::set_modified(Flow* flow) | |
789 | { | |
728c88e5 | 790 | if (ha && flow && flow->ha_state) |
19aa43f5 TP |
791 | flow->ha_state->add(FlowHAState::MODIFIED); |
792 | } | |
793 | ||
794 | bool HighAvailabilityManager::in_standby(Flow* flow) | |
795 | { | |
728c88e5 | 796 | if (ha && flow && flow->ha_state) |
19aa43f5 | 797 | return flow->ha_state->check_any(FlowHAState::STANDBY); |
728c88e5 MA |
798 | |
799 | return false; | |
800 | } | |
801 | ||
802 | Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key) | |
803 | { | |
804 | if (!ha) | |
805 | return nullptr; | |
806 | ||
807 | return ha->process_daq_import(p, key); | |
19aa43f5 | 808 | } |