]> git.ipfire.org Git - thirdparty/snort3.git/blame - src/flow/ha.cc
Pull request #4436: build: generate and tag 3.3.6.0
[thirdparty/snort3.git] / src / flow / ha.cc
CommitLineData
40b54a1f 1//--------------------------------------------------------------------------
b88d016d 2// Copyright (C) 2015-2024 Cisco and/or its affiliates. All rights reserved.
40b54a1f
RC
3//
4// This program is free software; you can redistribute it and/or modify it
5// under the terms of the GNU General Public License Version 2 as published
6// by the Free Software Foundation. You may not use, modify or distribute
7// this program under any other version of the GNU General Public License.
8//
9// This program is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License along
15// with this program; if not, write to the Free Software Foundation, Inc.,
16// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17//--------------------------------------------------------------------------
728c88e5 18// ha.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com>
40b54a1f 19
02d63397
MA
20#ifdef HAVE_CONFIG_H
21#include "config.h"
22#endif
23
40b54a1f
RC
24#include "ha.h"
25
02d63397 26#include "framework/counts.h"
79e8d40f 27#include "log/messages.h"
01e1d7d0 28#include "packet_io/active.h"
728c88e5
MA
29#include "packet_io/sfdaq_instance.h"
30#include "protocols/packet.h"
31#include "side_channel/side_channel.h"
4c0373da 32#include "stream/stream.h"
71d8ae0f
EB
33#include "time/packet_time.h"
34
02d63397
MA
35#include "flow.h"
36#include "flow_key.h"
728c88e5 37#include "ha_module.h"
63e28351 38#include "session.h"
02d63397 39
a19c078d
TP
40using namespace snort;
41
728c88e5
MA
// Event codes carried in HAMessageHeader::event.
enum HAEvent
{
    HA_DELETE_EVENT = 1,    // receiver deletes the flow identified by the key
    HA_UPDATE_EVENT = 2     // message carries per-client content for the flow
};
33ac31c8 47
728c88e5
MA
// Header found at the start of every HA message. It is followed by the
// flow key and then, for update events, the per-client sections.
struct __attribute__((__packed__)) HAMessageHeader
{
    uint8_t event;          // one of HAEvent
    uint8_t version;        // sender's HA_MESSAGE_VERSION
    uint16_t total_length;  // length of the whole message, header included
    uint8_t key_type;       // KEY_TYPE_IP4 or KEY_TYPE_IP6
};
55
// Header preceding each client's section inside an update message.
struct __attribute__((__packed__)) HAClientHeader
{
    uint8_t client;     // index of the client in the client map
    uint8_t length;     // number of content bytes that follow this header
};
61
62// One client for each mask bit plus one 'automatic' session client
63// client handle = (1 << (client_index - 1))
64// session client has handle of 0 and index of 0
65static constexpr uint8_t MAX_CLIENTS = 17;
33ac31c8 66
728c88e5
MA
67// HighAvailability is the thread-local state/configuration instantiated for each packet thread.
68typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap;
69class HighAvailability
70{
71public:
72 HighAvailability(PortBitSet*,bool);
73 ~HighAvailability();
74
75 void process_update(Flow*, Packet*);
76 void process_deletion(Flow&);
77 void process_receive();
78
79 Flow* process_daq_import(Packet&, FlowKey&);
80
81 // The [0] entry contains the stream client (always present)
82 // Entries [1] to [MAX_CLIENTS-1] contain the optional clients
83 ClientMap client_map = { };
84 uint8_t handle_counter = 1; // stream client (index == 0) always exists
85 bool shutting_down = false;
86
87private:
88 SideChannel* sc = nullptr;
89 bool use_daq_channel;
90};
91
67bf7c8b
RS
92
// Message format version, written into every header and checked on receipt.
// Ensure to increment both versions simultaneously to maintain consistency.
#ifdef DISABLE_TENANT_ID
static constexpr uint8_t HA_MESSAGE_VERSION = 5;
#else
static constexpr uint8_t HA_MESSAGE_VERSION = 4;
#endif
728c88e5
MA
99
100// define message size and content constants.
101static constexpr uint8_t KEY_SIZE_IP6 = sizeof(FlowKey);
102// ip4 key is smaller by 2*(ip6_addr_size - ip4_addr_size) or 2 * (16 - 4) = 24
103static constexpr uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24;
0047d0e6
EB
104
// Values stored in HAMessageHeader::key_type to select the key encoding.
enum
{
    KEY_TYPE_IP6 = 1,   // full FlowKey is carried
    KEY_TYPE_IP4 = 2    // compact form: only the last word of each address
};
33ac31c8 110
728c88e5
MA
111static constexpr FlowHAClientHandle SESSION_HA_CLIENT = 0x0000;
112static constexpr uint8_t SESSION_HA_CLIENT_INDEX = 0;
40b54a1f 113
71d8ae0f
EB
114PortBitSet* HighAvailabilityManager::ports = nullptr;
115bool HighAvailabilityManager::use_daq_channel = false;
728c88e5 116
71d8ae0f 117struct timeval FlowHAState::min_session_lifetime;
5e1328c0 118struct timeval FlowHAState::min_sync_interval;
db78ffe8 119
728c88e5 120static THREAD_LOCAL HighAvailability* ha;
db78ffe8 121
33ac31c8 122static inline bool is_ip6_key(const FlowKey* key)
db78ffe8
EB
123{
124 return (key->ip_l[0] || key->ip_l[1] || key->ip_l[2] != htonl(0xFFFF) ||
33ac31c8 125 key->ip_h[0] || key->ip_h[1] || key->ip_h[2] != htonl(0xFFFF));
db78ffe8 126}
40b54a1f 127
71d8ae0f 128FlowHAState::FlowHAState()
40b54a1f 129{
e1b25ca3 130 // Set the initial update time to now+min_session_lifetime
5e1328c0 131 packet_gettimeofday(&next_update);
728c88e5 132 timeradd(&next_update, &min_session_lifetime, &next_update);
71d8ae0f 133}
40b54a1f 134
71d8ae0f
EB
135void FlowHAState::set_pending(FlowHAClientHandle handle)
136{
728c88e5 137 pending |= handle;
71d8ae0f 138}
40b54a1f 139
33ac31c8
EB
140bool FlowHAState::check_pending(FlowHAClientHandle handle)
141{
728c88e5 142 return ((pending & handle) != 0);
33ac31c8
EB
143}
144
145void FlowHAState::clear_pending(FlowHAClientHandle handle)
71d8ae0f 146{
728c88e5 147 pending &= ~handle;
71d8ae0f 148}
40b54a1f 149
33ac31c8 150void FlowHAState::set(uint8_t new_state)
71d8ae0f 151{
5e1328c0 152 state = new_state;
71d8ae0f
EB
153}
154
5e1328c0 155void FlowHAState::add(uint8_t new_state)
71d8ae0f 156{
5e1328c0 157 state |= new_state;
71d8ae0f
EB
158}
159
33ac31c8 160void FlowHAState::clear(uint8_t old_state)
71d8ae0f 161{
5e1328c0 162 state &= ~old_state;
71d8ae0f
EB
163}
164
5e1328c0 165bool FlowHAState::check_any(uint8_t state_mask)
33ac31c8
EB
166{
167 return (state & state_mask) != 0;
168}
169
5e1328c0 170void FlowHAState::config_timers(struct timeval min_lifetime, struct timeval min_interval)
71d8ae0f
EB
171{
172 min_session_lifetime = min_lifetime;
5e1328c0 173 min_sync_interval = min_interval;
71d8ae0f
EB
174}
175
5e1328c0 176bool FlowHAState::sync_interval_elapsed()
71d8ae0f
EB
177{
178 struct timeval pkt_time;
179
180 packet_gettimeofday(&pkt_time);
181
182 return ( ( pkt_time.tv_sec > next_update.tv_sec ) ||
33ac31c8
EB
183 ( ( pkt_time.tv_sec == next_update.tv_sec ) &&
184 ( pkt_time.tv_usec > next_update.tv_usec ) ) );
71d8ae0f
EB
185}
186
63e28351
GK
187void FlowHAState::init_next_update()
188{
189 packet_gettimeofday(&next_update);
190 timeradd(&next_update, &min_session_lifetime, &next_update);
191}
192
71d8ae0f
EB
193void FlowHAState::set_next_update()
194{
728c88e5 195 timeradd(&next_update, &min_sync_interval, &next_update);
71d8ae0f
EB
196}
197
2a230121
RC
198void FlowHAState::reset()
199{
200 state = INITIAL_STATE;
201 pending = NONE_PENDING;
63e28351 202 init_next_update();
2a230121
RC
203}
204
4758ecbe 205FlowHAClient::FlowHAClient(uint8_t length, bool session_client) : max_length(length)
db78ffe8 206{
728c88e5 207 if (!ha)
79e8d40f
RC
208 return;
209
728c88e5 210 if (session_client)
db78ffe8 211 {
728c88e5 212 index = SESSION_HA_CLIENT_INDEX;
fd52a3f3 213 handle = SESSION_HA_CLIENT;
728c88e5 214 ha->client_map[0] = this;
db78ffe8
EB
215 }
216 else
217 {
728c88e5 218 if (ha->handle_counter >= MAX_CLIENTS)
79e8d40f
RC
219 {
220 ErrorMessage("Attempting to register too many FlowHAClients\n");
221 return;
222 }
223
728c88e5
MA
224 index = ha->handle_counter;
225 handle = (1 << (index - 1));
226 ha->client_map[index] = this;
227 ha->handle_counter++;
79e8d40f 228 }
79e8d40f
RC
229}
230
33ac31c8 231// Write the key type, key length, and key into the message.
728c88e5 232// Return the type of key written so it can be stored in the message header.
ab79fd42 233static uint8_t write_flow_key(const Flow& flow, HAMessage& msg)
728c88e5
MA
234{
235 const FlowKey* key = flow.key;
33ac31c8
EB
236 assert(key);
237
728c88e5 238 if (is_ip6_key(flow.key))
33ac31c8 239 {
728c88e5
MA
240 memcpy(msg.cursor, key, KEY_SIZE_IP6);
241 msg.advance_cursor(KEY_SIZE_IP6);
587470f8 242
728c88e5 243 return KEY_TYPE_IP6;
33ac31c8 244 }
33ac31c8 245
728c88e5
MA
246 memcpy(msg.cursor, &key->ip_l[3], sizeof(key->ip_l[3]));
247 msg.advance_cursor(sizeof(key->ip_l[3]));
248 memcpy(msg.cursor, &key->ip_h[3], sizeof(key->ip_h[3]));
249 msg.advance_cursor(sizeof(key->ip_h[3]));
250 memcpy(msg.cursor, ((const uint8_t*) key) + 32, KEY_SIZE_IP4 - 8);
251 msg.advance_cursor(KEY_SIZE_IP4 - 8);
587470f8 252
728c88e5 253 return KEY_TYPE_IP4;
33ac31c8
EB
254}
255
728c88e5
MA
256// Extract the key and return the key length. Position the cursor just after the key.
257static uint8_t read_flow_key(HAMessage& msg, const HAMessageHeader* hdr, FlowKey& key)
33ac31c8 258{
728c88e5 259 if (hdr->key_type == KEY_TYPE_IP6)
33ac31c8 260 {
728c88e5
MA
261 if (!msg.fits(KEY_SIZE_IP6))
262 {
263 ha_stats.truncated_msgs++;
264 return 0;
265 }
266
267 memcpy(&key, msg.cursor, KEY_SIZE_IP6);
268 msg.advance_cursor(KEY_SIZE_IP6);
587470f8 269
22def9c0 270 return KEY_SIZE_IP6;
33ac31c8 271 }
728c88e5 272 else if (hdr->key_type == KEY_TYPE_IP4)
33ac31c8 273 {
728c88e5
MA
274 if (!msg.fits(KEY_SIZE_IP4))
275 {
276 ha_stats.truncated_msgs++;
277 return 0;
278 }
279
33ac31c8 280 /* Lower IPv4 address */
728c88e5
MA
281 memcpy(&key.ip_l[3], msg.cursor, sizeof(key.ip_l[3]));
282 key.ip_l[0] = key.ip_l[1] = 0;
283 key.ip_l[2] = htonl(0xFFFF);
284 msg.advance_cursor(sizeof(key.ip_l[3]));
33ac31c8 285 /* Higher IPv4 address */
728c88e5
MA
286 memcpy(&key.ip_h[3], msg.cursor, sizeof(key.ip_h[3]));
287 key.ip_h[0] = key.ip_h[1] = 0;
288 key.ip_h[2] = htonl(0xFFFF);
289 msg.advance_cursor(sizeof(key.ip_h[3]));
33ac31c8 290 /* The remainder of the key */
728c88e5
MA
291 memcpy(((uint8_t*) &key) + 32, msg.cursor, KEY_SIZE_IP4 - 8);
292 msg.advance_cursor(KEY_SIZE_IP4 - 8);
587470f8 293
22def9c0 294 return KEY_SIZE_IP4;
33ac31c8 295 }
728c88e5
MA
296
297 ha_stats.unknown_key_type++;
298 return 0;
33ac31c8
EB
299}
300
728c88e5 301static inline uint8_t key_size(Flow& flow)
33ac31c8 302{
728c88e5
MA
303 assert(flow.key);
304 return is_ip6_key(flow.key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4;
33ac31c8
EB
305}
306
728c88e5 307static uint16_t calculate_msg_header_length(Flow& flow)
33ac31c8
EB
308{
309 return sizeof(HAMessageHeader) + key_size(flow);
310}
311
312// Calculate the UPDATE message content length based on the
313// set of active clients. The Session client is always present.
728c88e5 314static uint16_t calculate_update_msg_content_length(Flow& flow, bool full)
33ac31c8 315{
728c88e5 316 assert(ha->client_map[0]);
33ac31c8 317
5e1328c0
RC
318 uint16_t length = 0;
319
728c88e5 320 for (int i = 0; i < ha->handle_counter; i++)
8c3010cb
RC
321 {
322 // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
728c88e5 323 if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
33ac31c8 324 {
728c88e5 325 assert(ha->client_map[i]);
63e28351 326 length += (ha->client_map[i]->get_message_size(flow) + sizeof(HAClientHeader));
33ac31c8 327 }
8c3010cb 328 }
33ac31c8
EB
329
330 return length;
331}
332
333// Write the HA header and key sections. Position the cursor
334// at the beginning of the content section.
ab79fd42 335static void write_msg_header(const Flow& flow, HAEvent event, uint16_t content_length, HAMessage& msg)
33ac31c8 336{
728c88e5
MA
337 HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
338 hdr->event = (uint8_t) event;
22def9c0 339 hdr->version = HA_MESSAGE_VERSION;
33ac31c8 340 hdr->total_length = content_length;
728c88e5
MA
341 msg.advance_cursor(sizeof(HAMessageHeader));
342 hdr->key_type = write_flow_key(flow, msg);
33ac31c8
EB
343}
344
ab79fd42 345static uint16_t update_msg_header_length(const HAMessage& msg)
728c88e5
MA
346{
347 HAMessageHeader* hdr = (HAMessageHeader*) msg.buffer;
348 hdr->total_length = msg.cursor_position();
349 return hdr->total_length;
350}
351
352static void write_update_msg_client(FlowHAClient* client, Flow& flow, HAMessage& msg)
33ac31c8 353{
79e8d40f 354 assert(client);
33ac31c8 355
728c88e5
MA
356 if (!msg.fits(sizeof(HAClientHeader)))
357 return;
358
359 // Preemptively insert the client header. If production fails, roll back the message cursor
360 // to its original position.
361 uint8_t* original_cursor = msg.cursor;
362 HAClientHeader* header = (HAClientHeader*) original_cursor;
363 header->client = client->index;
364 msg.advance_cursor(sizeof(HAClientHeader));
365 if (!client->produce(flow, msg))
366 {
367 msg.reset_cursor(original_cursor);
368 return;
369 }
370 assert(msg.cursor >= (original_cursor + sizeof(HAClientHeader)));
371 header->length = (uint32_t) (msg.cursor - original_cursor - sizeof(HAClientHeader));
79e8d40f
RC
372}
373
728c88e5 374static void write_update_msg_content(Flow& flow, HAMessage& msg, bool full)
79e8d40f 375{
728c88e5 376 for (int i = 0; i < ha->handle_counter; i++)
8c3010cb
RC
377 {
378 // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
728c88e5
MA
379 if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
380 write_update_msg_client(ha->client_map[i], flow, msg);
8c3010cb 381 }
33ac31c8
EB
382}
383
728c88e5 384static void consume_ha_delete_message(HAMessage&, const FlowKey& key)
22def9c0 385{
4c0373da 386 Stream::delete_flow(&key);
22def9c0
EB
387}
388
63e28351 389static Flow* consume_ha_update_message(HAMessage& msg, const FlowKey& key, Packet* p)
22def9c0 390{
22def9c0 391 // flow will be nullptr if/when the session does not exist in the caches
63e28351 392 bool no_flow_found = false;
4c0373da 393 Flow* flow = Stream::get_flow(&key);
728c88e5 394 if (!flow)
63e28351
GK
395 {
396 no_flow_found = true;
728c88e5 397 ha_stats.update_msgs_recv_no_flow++;
63e28351 398 }
22def9c0 399
728c88e5
MA
400 // pointer to one past the last byte in the message
401 const uint8_t* content_end = msg.buffer + msg.buffer_length;
79e8d40f 402
728c88e5 403 while (msg.cursor < content_end)
79e8d40f
RC
404 {
405 // do we have sufficient message left to be able to have an HAClientHeader?
728c88e5 406 if (!msg.fits(sizeof(HAClientHeader)))
79e8d40f
RC
407 {
408 ErrorMessage("Consuming HA Update message - no HAClientHeader\n");
728c88e5 409 ha_stats.truncated_msgs++;
79e8d40f
RC
410 break;
411 }
412
728c88e5
MA
413 HAClientHeader* header = (HAClientHeader*) msg.cursor;
414 if ((header->client >= ha->handle_counter) || (ha->client_map[header->client] == nullptr))
79e8d40f
RC
415 {
416 ErrorMessage("Consuming HA Update message - invalid client index\n");
728c88e5 417 ha_stats.unknown_client_idx++;
79e8d40f
RC
418 break;
419 }
728c88e5 420 msg.advance_cursor(sizeof(HAClientHeader)); // step to the client content
79e8d40f 421
728c88e5 422 if (!msg.fits(header->length))
79e8d40f 423 {
e1b25ca3 424 ErrorMessage("Consuming HA Update message - message too short\n");
728c88e5 425 ha_stats.truncated_msgs++;
79e8d40f
RC
426 break;
427 }
428
5e1328c0
RC
429 // If the Flow does not exist in the caches, flow will be nullptr
430 // upon entry into this message processing loop. Since the session
431 // client is always the first segment of the message, the consume()
432 // invocation for the session client will create the flow. This
433 // flow can in turn be used by subsequent FlowHAClient's.
728c88e5 434 if (!ha->client_map[header->client]->consume(flow, &key, msg, header->length))
79e8d40f
RC
435 {
436 ErrorMessage("Consuming HA Update message - error from client consume()\n");
728c88e5 437 ha_stats.client_consume_errors++;
79e8d40f
RC
438 break;
439 }
440 }
728c88e5
MA
441
442 if (msg.cursor == content_end)
443 ha_stats.update_msgs_consumed++;
444
63e28351
GK
445 if( p && no_flow_found && flow && flow->session )
446 {
caccd0b8 447 p->flow = flow;
63e28351 448 flow->session->setup(p);
774304f6
PB
449 flow->set_direction(p);
450 flow->set_client_initiate(p);
451
452 if (p->is_from_client())
453 {
454 flow->client_intf = p->pkth->ingress_index;
455 flow->server_intf = p->pkth->egress_index;
456 flow->client_group = p->pkth->ingress_group;
457 flow->server_group = p->pkth->egress_group;
458 }
459 else
460 {
461 flow->client_intf = p->pkth->egress_index;
462 flow->server_intf = p->pkth->ingress_index;
463 flow->client_group = p->pkth->egress_group;
464 flow->server_group = p->pkth->ingress_group;
465 }
63e28351
GK
466 }
467
728c88e5 468 return flow;
22def9c0
EB
469}
470
7b248512
NG
471static Flow* consume_ha_message(HAMessage& msg,
472 FlowKey* packet_key = nullptr, Packet* p = nullptr)
22def9c0 473{
728c88e5 474 ha_stats.msgs_recv++;
22def9c0 475
728c88e5
MA
476 if (!msg.fits(sizeof(HAMessageHeader)))
477 {
478 ha_stats.truncated_msgs++;
479 return nullptr;
480 }
22def9c0 481
728c88e5
MA
482 const HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
483
484 if (hdr->version != HA_MESSAGE_VERSION)
485 {
486 ha_stats.msg_version_mismatch++;
487 return nullptr;
488 }
489
490 if (hdr->total_length != msg.buffer_length)
491 {
492 ha_stats.msg_length_mismatch++;
493 return nullptr;
494 }
495
496 msg.advance_cursor(sizeof(HAMessageHeader));
497
498 FlowKey key;
499 if (read_flow_key(msg, hdr, key) == 0)
500 return nullptr;
501
2a0742e1 502 if (packet_key and !FlowKey::is_equal(packet_key, &key))
7b248512
NG
503 {
504 ha_stats.key_mismatch++;
505 return nullptr;
506 }
507
728c88e5
MA
508 Flow* flow = nullptr;
509 switch (hdr->event)
22def9c0
EB
510 {
511 case HA_DELETE_EVENT:
512 {
728c88e5
MA
513 consume_ha_delete_message(msg, key);
514 ha_stats.delete_msgs_consumed++;
22def9c0
EB
515 break;
516 }
517 case HA_UPDATE_EVENT:
518 {
63e28351 519 flow = consume_ha_update_message(msg, key, p);
728c88e5 520 ha_stats.update_msgs_recv++;
22def9c0
EB
521 break;
522 }
22def9c0 523 }
728c88e5
MA
524
525 return flow;
526}
527
528static void ha_sc_receive_handler(SCMessage* sc_msg)
529{
530 assert(sc_msg);
531
532 // SC received messages must have reference back to SideChannel object
533 assert(sc_msg->sc);
534
535 HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
536 consume_ha_message(ha_msg);
537
538 sc_msg->sc->discard_message(sc_msg);
22def9c0
EB
539}
540
728c88e5 541HighAvailability::HighAvailability(PortBitSet* ports, bool daq_channel)
71d8ae0f 542{
71d8ae0f 543 using namespace std::placeholders;
71d8ae0f 544
728c88e5
MA
545 // If side channel ports were configured, find the first matching side channel to associate with
546 if (ports != nullptr)
1c8d6d53 547 {
728c88e5
MA
548 for (SCPort port = 0; port < ports->size(); port++)
549 {
550 if (!ports->test(port))
551 continue;
552
553 sc = SideChannelManager::get_side_channel(port);
554 if (sc)
71d8ae0f 555 {
728c88e5
MA
556 // We require a duplex channel
557 if (sc->get_direction() != Connector::CONN_DUPLEX)
33ac31c8 558 {
728c88e5
MA
559 sc = nullptr;
560 continue;
33ac31c8 561 }
728c88e5
MA
562 sc->set_default_port(port);
563 sc->register_receive_handler(ha_sc_receive_handler);
71d8ae0f 564 }
728c88e5
MA
565 break;
566 }
1c8d6d53 567 }
728c88e5 568 use_daq_channel = daq_channel;
40b54a1f
RC
569}
570
571HighAvailability::~HighAvailability()
572{
728c88e5 573 if (sc)
71d8ae0f 574 sc->unregister_receive_handler();
40b54a1f
RC
575}
576
728c88e5 577static void send_sc_update_message(Flow& flow, SideChannel& sc)
40b54a1f 578{
728c88e5
MA
579 const uint16_t header_len = calculate_msg_header_length(flow);
580 const uint16_t content_len = calculate_update_msg_content_length(flow, false);
581
582 SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len));
22def9c0 583 assert(sc_msg);
728c88e5 584 HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
40b54a1f 585
728c88e5
MA
586 write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg);
587 write_update_msg_content(flow, ha_msg, false);
588 update_msg_header_length(ha_msg);
589 sc.transmit_message(sc_msg);
590}
591
592static void send_daq_update_message(Flow& flow, Packet& p)
593{
594 static THREAD_LOCAL uint8_t daq_io_buffer[UINT16_MAX];
22def9c0 595
728c88e5 596 HAMessage ha_msg(daq_io_buffer, sizeof(daq_io_buffer));
22def9c0 597
728c88e5
MA
598 write_msg_header(flow, HA_UPDATE_EVENT, 0, ha_msg);
599 write_update_msg_content(flow, ha_msg, true);
600 uint32_t len = update_msg_header_length(ha_msg);
601
602 DIOCTL_FlowHAState fhs;
603 fhs.msg = p.daq_msg;
604 fhs.data = daq_io_buffer;
605 fhs.length = len;
606
607 p.daq_instance->ioctl(DIOCTL_SET_FLOW_HA_STATE, &fhs, sizeof(fhs));
608
609 ha_stats.daq_stores++;
40b54a1f
RC
610}
611
728c88e5 612void HighAvailability::process_update(Flow* flow, Packet* p)
40b54a1f 613{
728c88e5 614 if (!flow)
40b54a1f
RC
615 return;
616
5e1328c0 617 // We must have the map array and the session client
728c88e5 618 assert(client_map[0]);
5e1328c0 619
728c88e5 620 if ( !client_map[0]->is_update_required(flow) &&
5e1328c0
RC
621 ( !flow->ha_state->check_pending(ALL_CLIENTS) ||
622 flow->ha_state->check_any(FlowHAState::NEW) ) )
623 return;
624
728c88e5
MA
625 if (sc)
626 send_sc_update_message(*flow, *sc);
33ac31c8 627
728c88e5
MA
628 if (use_daq_channel && p && p->daq_msg)
629 send_daq_update_message(*flow, *p);
5e1328c0
RC
630
631 flow->ha_state->clear(FlowHAState::NEW | FlowHAState::MODIFIED |
632 FlowHAState::MAJOR | FlowHAState::CRITICAL);
633 flow->ha_state->clear_pending(ALL_CLIENTS);
634 flow->ha_state->set_next_update();
db78ffe8
EB
635}
636
728c88e5 637static void send_sc_deletion_message(Flow& flow, SideChannel& sc)
db78ffe8 638{
33ac31c8 639 const uint32_t msg_len = calculate_msg_header_length(flow);
728c88e5
MA
640 SCMessage* sc_msg = sc.alloc_transmit_message(msg_len);
641 HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
33ac31c8
EB
642
643 // No content, only header+key
728c88e5 644 write_msg_header(flow, HA_DELETE_EVENT, msg_len, ha_msg);
33ac31c8 645
728c88e5
MA
646 sc.transmit_message(sc_msg);
647}
648
649void HighAvailability::process_deletion(Flow& flow)
650{
651 // No need to send message if we already have, we are in standby, or
652 // we have just been created and haven't yet sent an update
653 if (flow.ha_state->check_any(FlowHAState::NEW | FlowHAState::DELETED | FlowHAState::STANDBY))
654 return;
655
656 // Only produce deletion messages when using a side channel
657 if (sc)
658 send_sc_deletion_message(flow, *sc);
33ac31c8 659
728c88e5 660 flow.ha_state->add(FlowHAState::DELETED);
db78ffe8 661}
40b54a1f 662
db78ffe8
EB
663void HighAvailability::process_receive()
664{
728c88e5 665 if (sc)
0047d0e6 666 sc->process(DISPATCH_ALL_RECEIVE);
40b54a1f
RC
667}
668
728c88e5 669Flow* HighAvailability::process_daq_import(Packet& p, FlowKey& key)
71d8ae0f 670{
728c88e5
MA
671 Flow* flow = nullptr;
672
673 if (use_daq_channel && p.pkth->flags & DAQ_PKT_FLAG_HA_STATE_AVAIL)
674 {
675 DIOCTL_FlowHAState fhs;
676 fhs.msg = p.daq_msg;
677
678 if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS)
679 {
680 HAMessage ha_msg(fhs.data, fhs.length);
7b248512 681 flow = consume_ha_message(ha_msg, &key, &p);
728c88e5
MA
682 ha_stats.daq_imports++;
683 // Validate that the imported flow matches up with the given flow key.
684 if (flow)
685 {
0d3b62e3 686 if (Flow::FlowState::INSPECT < flow->flow_state)
728c88e5 687 {
7b248512
NG
688 flow->disable_inspection();
689 p.disable_inspect = true;
728c88e5 690 }
7b248512
NG
691 // Clear the standby bit so that we don't immediately trigger a new data store
692 // FIXIT-L streamline the consume process so this doesn't have to be done here
693 flow->ha_state->clear(FlowHAState::STANDBY);
728c88e5
MA
694 }
695 }
696 }
7902706d 697
728c88e5 698 return flow;
71d8ae0f
EB
699}
700
728c88e5 701void HighAvailabilityManager::reset_config()
71d8ae0f 702{
728c88e5
MA
703 if (ports)
704 {
705 delete ports;
706 ports = nullptr;
707 }
708}
709
710void HighAvailabilityManager::term()
711{
712 reset_config();
713}
714
715// Called within the main thread after the initial configuration has been read
716void HighAvailabilityManager::configure(HighAvailabilityConfig* config)
717{
718 if (!config)
719 {
720 reset_config();
721 return;
722 }
723
724 if (config->ports)
725 ports = new PortBitSet(*config->ports);
726 else if (ports)
727 {
728 delete ports;
729 ports = nullptr;
730 }
731
732 FlowHAState::config_timers(config->min_session_lifetime, config->min_sync_interval);
733
734 use_daq_channel = config->daq_channel;
71d8ae0f
EB
735}
736
737// Called within the packet thread prior to packet processing
40b54a1f
RC
738void HighAvailabilityManager::thread_init()
739{
71d8ae0f 740 // create a a thread local instance iff we are configured to operate.
728c88e5
MA
741 if (ports || use_daq_channel)
742 ha = new HighAvailability(ports, use_daq_channel);
71d8ae0f
EB
743 else
744 ha = nullptr;
40b54a1f
RC
745}
746
19aa43f5
TP
747void HighAvailabilityManager::thread_term_beginning()
748{
728c88e5
MA
749 if (ha)
750 ha->shutting_down = true;
19aa43f5
TP
751}
752
71d8ae0f 753// Called in the packet thread at run-down
40b54a1f
RC
754void HighAvailabilityManager::thread_term()
755{
728c88e5 756 if (ha)
33ac31c8 757 {
71d8ae0f 758 delete ha;
33ac31c8
EB
759 ha = nullptr;
760 }
40b54a1f
RC
761}
762
728c88e5 763void HighAvailabilityManager::process_update(Flow* flow, Packet* p)
db78ffe8 764{
01e1d7d0 765 if (ha && flow && !p->active->get_tunnel_bypass())
728c88e5 766 ha->process_update(flow, p);
db78ffe8
EB
767}
768
769// Deletion messages only contain session content
728c88e5 770void HighAvailabilityManager::process_deletion(Flow& flow)
db78ffe8 771{
728c88e5 772 if (ha && !ha->shutting_down)
db78ffe8
EB
773 ha->process_deletion(flow);
774}
775
776void HighAvailabilityManager::process_receive()
40b54a1f 777{
728c88e5 778 if (ha)
db78ffe8 779 ha->process_receive();
40b54a1f
RC
780}
781
71d8ae0f
EB
782// Called in the packet threads to determine whether or not HA is active
783bool HighAvailabilityManager::active()
784{
785 return (ha != nullptr);
786}
19aa43f5
TP
787
788void HighAvailabilityManager::set_modified(Flow* flow)
789{
728c88e5 790 if (ha && flow && flow->ha_state)
19aa43f5
TP
791 flow->ha_state->add(FlowHAState::MODIFIED);
792}
793
794bool HighAvailabilityManager::in_standby(Flow* flow)
795{
728c88e5 796 if (ha && flow && flow->ha_state)
19aa43f5 797 return flow->ha_state->check_any(FlowHAState::STANDBY);
728c88e5
MA
798
799 return false;
800}
801
802Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key)
803{
804 if (!ha)
805 return nullptr;
806
807 return ha->process_daq_import(p, key);
19aa43f5 808}