From: Michael Altizer (mialtize) Date: Wed, 12 Jun 2019 03:31:44 +0000 (-0400) Subject: Merge pull request #1619 in SNORT/snort3 from ~MIALTIZE/snort3:ha_daq to master X-Git-Tag: 3.0.0-257~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=728c88e59013c67ade9a009f488438bfcf874bcf;p=thirdparty%2Fsnort3.git Merge pull request #1619 in SNORT/snort3 from ~MIALTIZE/snort3:ha_daq to master Squashed commit of the following: commit 5aacc37644226329a02dc2637093c457614b351d Author: Michael Altizer Date: Mon Jun 10 17:43:32 2019 -0400 flow: Implement storing and importing HA data via DAQ IOCTLs This involved significant refactoring of the Flow HA code and added many peg counts to the module. Export FlowHAClient, HighAvailabilityManager, and FlowHAState in flow/ha.h. Specify that HA time parameters are in seconds. The useless HA module unit tests were removed in the process. commit 9fec6bc1993d35969c9aca4198ec0865ef7597e5 Author: Michael Altizer Date: Fri Jun 7 14:32:18 2019 -0400 check: Fix missing semicolons on CHECK calls commit fb6e8988fd3790f54c790110150b965a3abb456b Author: Michael Altizer Date: Tue May 28 12:30:33 2019 -0400 build: Fix unused parameter warnings in unit tests --- diff --git a/src/flow/CMakeLists.txt b/src/flow/CMakeLists.txt index f567ae580..f84e14131 100644 --- a/src/flow/CMakeLists.txt +++ b/src/flow/CMakeLists.txt @@ -3,6 +3,7 @@ set (FLOW_INCLUDES flow.h flow_key.h flow_stash.h + ha.h stash_item.h ) @@ -19,7 +20,6 @@ add_library (flow OBJECT flow_stash.h flow_stash.cc ha.cc - ha.h ha_module.cc ha_module.h prune_stats.h diff --git a/src/flow/dev_notes.txt b/src/flow/dev_notes.txt index 2d110be73..24bdde084 100644 --- a/src/flow/dev_notes.txt +++ b/src/flow/dev_notes.txt @@ -20,27 +20,39 @@ can be freed (via garbage collection) after a reload. There are many flags that may be set on a flow to indicate session tracking state, disposition, etc. +=== High Availability + HighAvailability (ha.cc, ha.h) serves to synchronize session state between high -availabity partners. HighAvailability uses Side Channel Connectors to transmit -and receive messages. The HA side channel must be full duplex (both a -transmit and receive connector). The primary purpose is to exchange session -state with the goal of keeping the session caches in sync. But other clients -may also subscribe to the HA service to exchange additional session data. Each -client uses the FlowHAClient class to subscribe to the service. +availabity partners. The primary purpose is to exchange session state with the +goal of keeping the session caches in sync. Other clients may also register +with the HA service to exchange additional session data by implementing one or +more FlowHAClient classes. + +HA uses Side Channel Connectors and/or DAQ module IOCTLs to transmit and receive +HA state messages. A full duplex Side Channel is required for Side Channel +communications. A DAQ module that supports the IOCTLs for setting and getting +HA state is required for a functional DAQ-backed setup. Modules that do not +support these IOCTLs will silently fail and present no HA state on future +packets in the flow. HA state will be queried from the DAQ module prior to new +flow creation when the appropriate DAQ packet message flag has been set on the +intiating packet. The HA subsystem exchanges two high level message types: - DELETE: Indicate to the partner that a session has neen removed. No -additional HA client status will be exchanged. +additional HA client status will be exchanged. (Not used for DAQ-backed +storage.) - UPDATE: Indicate all other state changes. The message always includes the session state and optionally may include state from other HA clients. +By default, the update messages are incremental. In the case of DAQ-backed +storage, the update messages are always fully formed. The HA subsystem implements these classes: - HighAvailabilityManager - A collection of static elements providing the top-most interface to HA capabilities. - - HAMessage - A wrapper around the actual message (SideChannel only for now) - and includes a cursor for producer/consumer activity. Passed around + - HAMessage - A wrapper around the actual message and includes a cursor and + convenience functions for producer/consumer activity. Passed around among all message handing classes/methods. - - HighAvailability - If HA is enabled instantiated in each packet thread and + - HighAvailability - If HA is enabled, instantiated in each packet thread and provides all primary HA functionality for the thread. Referenced via a THREAD_LOCAL object pointer. - FlowHAState - One per flow and referenced via a pointer in the Flow object. diff --git a/src/flow/flow.cc b/src/flow/flow.cc index a8adde24d..db315555e 100644 --- a/src/flow/flow.cc +++ b/src/flow/flow.cc @@ -79,6 +79,10 @@ Flow::Flow() memset(this, 0, sizeof(*this)); } +Flow::~Flow() +{ + term(); +} void Flow::init(PktType type) { diff --git a/src/flow/flow.h b/src/flow/flow.h index 48ca0df58..9e827f9e9 100644 --- a/src/flow/flow.h +++ b/src/flow/flow.h @@ -93,11 +93,11 @@ #define STREAM_STATE_BLOCK_PENDING 0x4000 class BitOp; -class FlowHAState; class Session; namespace snort { +class FlowHAState; struct FlowKey; class IpsContext; struct Packet; @@ -164,6 +164,7 @@ public: ALLOW }; Flow(); + ~Flow(); Flow(const Flow&) = delete; Flow& operator=(const Flow&) = delete; diff --git a/src/flow/flow_control.cc b/src/flow/flow_control.cc index e6bba57cf..42ecce481 100644 --- a/src/flow/flow_control.cc +++ b/src/flow/flow_control.cc @@ -36,6 +36,7 @@ #include "expect_cache.h" #include "flow_cache.h" +#include "ha.h" #include "session.h" using namespace snort; @@ -362,6 +363,8 @@ bool FlowControl::process(PktType type, Packet* p) FlowKey key; set_key(&key, p); Flow* flow = con.cache->find(&key); + if ( !flow ) + flow = HighAvailabilityManager::import(*p, key); if ( !flow ) { diff --git a/src/flow/ha.cc b/src/flow/ha.cc index 2f9b23c13..6e25cf730 100644 --- a/src/flow/ha.cc +++ b/src/flow/ha.cc @@ -15,7 +15,7 @@ // with this program; if not, write to the Free Software Foundation, Inc., // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //-------------------------------------------------------------------------- -// ha.cc author Ed Borgoyn +// ha.cc authors Ed Borgoyn , Michael Altizer #ifdef HAVE_CONFIG_H #include "config.h" @@ -23,27 +23,76 @@ #include "ha.h" -#include - #include "framework/counts.h" #include "log/messages.h" -#include "profiler/profiler_defs.h" +#include "packet_io/sfdaq_instance.h" +#include "protocols/packet.h" +#include "side_channel/side_channel.h" #include "stream/stream.h" #include "time/packet_time.h" #include "flow.h" #include "flow_key.h" +#include "ha_module.h" using namespace snort; -static const uint8_t HA_MESSAGE_VERSION = 3; +enum HAEvent +{ + HA_DELETE_EVENT = 1, + HA_UPDATE_EVENT = 2 +}; -// define message size and content constants. -static const uint8_t KEY_SIZE_IP6 = sizeof(FlowKey); -// ip4 key is smaller by 2*(ip6-addr-size - ip4-addr-size) or 2*(16 - 4) = 24 -static const uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24; +struct __attribute__((__packed__)) HAMessageHeader +{ + uint8_t event; + uint8_t version; + uint16_t total_length; + uint8_t key_type; +}; + +struct __attribute__((__packed__)) HAClientHeader +{ + uint8_t client; + uint8_t length; +}; + +// One client for each mask bit plus one 'automatic' session client +// client handle = (1<<(client_index-1) +// session client has handle of 0 and index of 0 +static constexpr uint8_t MAX_CLIENTS = 17; -static const suseconds_t USEC_PER_SEC = 1000000; +// HighAvailability is the thread-local state/configuration instantiated for each packet thread. +typedef std::array ClientMap; +class HighAvailability +{ +public: + HighAvailability(PortBitSet*,bool); + ~HighAvailability(); + + void process_update(Flow*, Packet*); + void process_deletion(Flow&); + void process_receive(); + + Flow* process_daq_import(Packet&, FlowKey&); + + // The [0] entry contains the stream client (always present) + // Entries [1] to [MAX_CLIENTS-1] contain the optional clients + ClientMap client_map = { }; + uint8_t handle_counter = 1; // stream client (index == 0) always exists + bool shutting_down = false; + +private: + SideChannel* sc = nullptr; + bool use_daq_channel; +}; + +static constexpr uint8_t HA_MESSAGE_VERSION = 3; + +// define message size and content constants. +static constexpr uint8_t KEY_SIZE_IP6 = sizeof(FlowKey); +// ip4 key is smaller by 2*(ip6_addr_size - ip4_addr_size) or 2 * (16 - 4) = 24 +static constexpr uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24; enum { @@ -51,22 +100,16 @@ enum KEY_TYPE_IP4 = 2 }; -typedef std::array ClientMap; - -THREAD_LOCAL SimpleStats ha_stats; -THREAD_LOCAL ProfileStats ha_perf_stats; +static constexpr FlowHAClientHandle SESSION_HA_CLIENT = 0x0000; +static constexpr uint8_t SESSION_HA_CLIENT_INDEX = 0; -static THREAD_LOCAL HighAvailability* ha; PortBitSet* HighAvailabilityManager::ports = nullptr; bool HighAvailabilityManager::use_daq_channel = false; -THREAD_LOCAL bool HighAvailabilityManager::shutting_down = false; + struct timeval FlowHAState::min_session_lifetime; struct timeval FlowHAState::min_sync_interval; -uint8_t s_handle_counter = 1; // stream client (index == 0) always exists -// The [0] entry contains the stream client (always present) -// Entries [1] to [MAX_CLIENTS-1] contain the optional clients -static THREAD_LOCAL ClientMap* s_client_map; +static THREAD_LOCAL HighAvailability* ha; static inline bool is_ip6_key(const FlowKey* key) { @@ -82,28 +125,22 @@ FlowHAState::FlowHAState() // Set the initial update time to now+min_session_lifetime packet_gettimeofday(&next_update); - next_update.tv_usec += min_session_lifetime.tv_usec; - if (next_update.tv_usec > USEC_PER_SEC) - { - next_update.tv_usec -= USEC_PER_SEC; - next_update.tv_sec++; - } - next_update.tv_sec += min_session_lifetime.tv_sec; + timeradd(&next_update, &min_session_lifetime, &next_update); } void FlowHAState::set_pending(FlowHAClientHandle handle) { - pending |= (uint16_t)handle; + pending |= handle; } bool FlowHAState::check_pending(FlowHAClientHandle handle) { - return ((pending & (uint16_t)handle) != 0); + return ((pending & handle) != 0); } void FlowHAState::clear_pending(FlowHAClientHandle handle) { - pending &= ~((uint16_t)handle); + pending &= ~handle; } void FlowHAState::set(uint8_t new_state) @@ -145,13 +182,7 @@ bool FlowHAState::sync_interval_elapsed() void FlowHAState::set_next_update() { - next_update.tv_usec += min_sync_interval.tv_usec; - if (next_update.tv_usec > USEC_PER_SEC) - { - next_update.tv_usec -= USEC_PER_SEC; - next_update.tv_sec++; - } - next_update.tv_sec += min_sync_interval.tv_sec; + timeradd(&next_update, &min_sync_interval, &next_update); } void FlowHAState::reset() @@ -162,144 +193,128 @@ void FlowHAState::reset() FlowHAClient::FlowHAClient(uint8_t length, bool session_client) { - if ( !s_client_map ) + if (!ha) return; - header.length = length; + max_length = length; - if ( session_client ) + if (session_client) { + index = SESSION_HA_CLIENT_INDEX; handle = SESSION_HA_CLIENT; - header.client = SESSION_HA_CLIENT_INDEX; - (*s_client_map)[0] = this; + ha->client_map[0] = this; } else { - if ( s_handle_counter >= MAX_CLIENTS ) + if (ha->handle_counter >= MAX_CLIENTS) { ErrorMessage("Attempting to register too many FlowHAClients\n"); return; } - header.client = s_handle_counter; - handle = (1 << (s_handle_counter-1)); - (*s_client_map)[s_handle_counter] = this; - s_handle_counter += 1; - } -} - -bool FlowHAClient::fit(HAMessage* msg, uint8_t size) -{ - return ( (int)(msg->cursor - msg->content()) < (int)(msg->content_length() - size) ); -} - -bool FlowHAClient::place(HAMessage* msg, uint8_t* data, uint8_t length) -{ - if ( fit(msg, length) ) - { - memcpy(msg->cursor,data,(size_t)length); - msg->cursor += length; - return true; + index = ha->handle_counter; + handle = (1 << (index - 1)); + ha->client_map[index] = this; + ha->handle_counter++; } - else - return false; } // Write the key type, key length, and key into the message. -// Does not use the message cursor coming in. -// Leave the message cursor just after the key. Return -// the key length. -static uint8_t write_flow_key(Flow* flow, HAMessage* msg) -{ - HAMessageHeader* hdr = (HAMessageHeader*)msg->content(); - msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader); - const FlowKey* key = flow->key; +// Return the type of key written so it can be stored in the message header. +static uint8_t write_flow_key(Flow& flow, HAMessage& msg) +{ + const FlowKey* key = flow.key; assert(key); - if ( is_ip6_key(flow->key) ) + if (is_ip6_key(flow.key)) { - hdr->key_type = KEY_TYPE_IP6; - memcpy(msg->cursor, key, KEY_SIZE_IP6); - msg->cursor += KEY_SIZE_IP6; + memcpy(msg.cursor, key, KEY_SIZE_IP6); + msg.advance_cursor(KEY_SIZE_IP6); - return KEY_SIZE_IP6; + return KEY_TYPE_IP6; } - hdr->key_type = KEY_TYPE_IP4; - memcpy(msg->cursor, &key->ip_l[3], sizeof(key->ip_l[3])); - msg->cursor += sizeof(key->ip_l[3]); - memcpy(msg->cursor, &key->ip_h[3], sizeof(key->ip_h[3])); - msg->cursor += sizeof(key->ip_h[3]); - memcpy(msg->cursor, ((const uint8_t*)key) + 32, KEY_SIZE_IP4 - 8); - msg->cursor += KEY_SIZE_IP4 - 8; + memcpy(msg.cursor, &key->ip_l[3], sizeof(key->ip_l[3])); + msg.advance_cursor(sizeof(key->ip_l[3])); + memcpy(msg.cursor, &key->ip_h[3], sizeof(key->ip_h[3])); + msg.advance_cursor(sizeof(key->ip_h[3])); + memcpy(msg.cursor, ((const uint8_t*) key) + 32, KEY_SIZE_IP4 - 8); + msg.advance_cursor(KEY_SIZE_IP4 - 8); - return KEY_SIZE_IP4; + return KEY_TYPE_IP4; } -// Regardless of the message cursor, extract the key and -// return the key length. Position the cursor just after the key. -static uint8_t read_flow_key(FlowKey* key, HAMessage* msg) +// Extract the key and return the key length. Position the cursor just after the key. +static uint8_t read_flow_key(HAMessage& msg, const HAMessageHeader* hdr, FlowKey& key) { - assert(key); - HAMessageHeader* hdr = (HAMessageHeader*)msg->content(); - msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader); - - if ( hdr->key_type == KEY_TYPE_IP6 ) + if (hdr->key_type == KEY_TYPE_IP6) { - memcpy(key, msg->cursor, KEY_SIZE_IP6); - msg->cursor += KEY_SIZE_IP6; + if (!msg.fits(KEY_SIZE_IP6)) + { + ha_stats.truncated_msgs++; + return 0; + } + + memcpy(&key, msg.cursor, KEY_SIZE_IP6); + msg.advance_cursor(KEY_SIZE_IP6); return KEY_SIZE_IP6; } - else if ( hdr->key_type == KEY_TYPE_IP4 ) + else if (hdr->key_type == KEY_TYPE_IP4) { + if (!msg.fits(KEY_SIZE_IP4)) + { + ha_stats.truncated_msgs++; + return 0; + } + /* Lower IPv4 address */ - memcpy(&key->ip_l[3], msg->cursor, sizeof(key->ip_l[3])); - key->ip_l[0] = key->ip_l[1] = 0; - key->ip_l[2] = htonl(0xFFFF); - msg->cursor += sizeof(key->ip_l[3]); + memcpy(&key.ip_l[3], msg.cursor, sizeof(key.ip_l[3])); + key.ip_l[0] = key.ip_l[1] = 0; + key.ip_l[2] = htonl(0xFFFF); + msg.advance_cursor(sizeof(key.ip_l[3])); /* Higher IPv4 address */ - memcpy(&key->ip_h[3], msg->cursor, sizeof(key->ip_h[3])); - key->ip_h[0] = key->ip_h[1] = 0; - key->ip_h[2] = htonl(0xFFFF); - msg->cursor += sizeof(key->ip_h[3]); + memcpy(&key.ip_h[3], msg.cursor, sizeof(key.ip_h[3])); + key.ip_h[0] = key.ip_h[1] = 0; + key.ip_h[2] = htonl(0xFFFF); + msg.advance_cursor(sizeof(key.ip_h[3])); /* The remainder of the key */ - memcpy(((uint8_t*)key) + 32, msg->cursor, KEY_SIZE_IP4 - 8); - msg->cursor += KEY_SIZE_IP4 - 8; + memcpy(((uint8_t*) &key) + 32, msg.cursor, KEY_SIZE_IP4 - 8); + msg.advance_cursor(KEY_SIZE_IP4 - 8); return KEY_SIZE_IP4; } - else - return 0; + + ha_stats.unknown_key_type++; + return 0; } -static inline uint8_t key_size(Flow* flow) +static inline uint8_t key_size(Flow& flow) { - assert(flow->key); - return is_ip6_key(flow->key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4; + assert(flow.key); + return is_ip6_key(flow.key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4; } -static uint16_t calculate_msg_header_length(Flow* flow) +static uint16_t calculate_msg_header_length(Flow& flow) { return sizeof(HAMessageHeader) + key_size(flow); } // Calculate the UPDATE message content length based on the // set of active clients. The Session client is always present. -static uint16_t calculate_update_msg_content_length(Flow* flow) +static uint16_t calculate_update_msg_content_length(Flow& flow, bool full) { - assert(s_client_map); - assert((*s_client_map)[0]); + assert(ha->client_map[0]); uint16_t length = 0; - for (int i=0; ihandle_counter; i++) { // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck - if ( (i == 0 ) || flow->ha_state->check_pending(1<<(i-1)) ) + if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1))) { - assert((*s_client_map)[i]); - length += ((*s_client_map)[i]->get_message_size() + sizeof(HAClientHeader)); + assert(ha->client_map[i]); + length += (ha->client_map[i]->get_message_size() + sizeof(HAClientHeader)); } } @@ -308,77 +323,93 @@ static uint16_t calculate_update_msg_content_length(Flow* flow) // Write the HA header and key sections. Position the cursor // at the beginning of the content section. -static void write_msg_header(Flow* flow, HAEvent event, uint16_t content_length, HAMessage* msg) +static void write_msg_header(Flow& flow, HAEvent event, uint16_t content_length, HAMessage& msg) { - HAMessageHeader* hdr = (HAMessageHeader*)msg->content(); - hdr->event = (uint8_t)event; + HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor; + hdr->event = (uint8_t) event; hdr->version = HA_MESSAGE_VERSION; hdr->total_length = content_length; - write_flow_key(flow, msg); // set cursor to just beyond key + msg.advance_cursor(sizeof(HAMessageHeader)); + hdr->key_type = write_flow_key(flow, msg); } -static void write_update_msg_client( FlowHAClient* client, Flow* flow, HAMessage* msg) +static uint16_t update_msg_header_length(HAMessage& msg) +{ + HAMessageHeader* hdr = (HAMessageHeader*) msg.buffer; + hdr->total_length = msg.cursor_position(); + return hdr->total_length; +} + +static void write_update_msg_client(FlowHAClient* client, Flow& flow, HAMessage& msg) { assert(client); - assert(msg); - client->place(msg,(uint8_t*)&(client->header),(uint8_t)sizeof(client->header)); - client->produce(flow, msg); + if (!msg.fits(sizeof(HAClientHeader))) + return; + + // Preemptively insert the client header. If production fails, roll back the message cursor + // to its original position. + uint8_t* original_cursor = msg.cursor; + HAClientHeader* header = (HAClientHeader*) original_cursor; + header->client = client->index; + msg.advance_cursor(sizeof(HAClientHeader)); + if (!client->produce(flow, msg)) + { + msg.reset_cursor(original_cursor); + return; + } + assert(msg.cursor >= (original_cursor + sizeof(HAClientHeader))); + header->length = (uint32_t) (msg.cursor - original_cursor - sizeof(HAClientHeader)); } -static void write_update_msg_content(Flow* flow, HAMessage* msg) +static void write_update_msg_content(Flow& flow, HAMessage& msg, bool full) { - assert(s_client_map); - - for ( int i=0; ihandle_counter; i++) { // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck - if ( (i == 0) || flow->ha_state->check_pending(1<<(i-1)) ) - write_update_msg_client((*s_client_map)[i],flow, msg); + if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1))) + write_update_msg_client(ha->client_map[i], flow, msg); } } -static void consume_receive_delete_message(HAMessage* msg) +static void consume_ha_delete_message(HAMessage&, const FlowKey& key) { - FlowKey key; - (void)read_flow_key(&key, msg); Stream::delete_flow(&key); } -static void consume_receive_update_message(HAMessage* msg) +static Flow* consume_ha_update_message(HAMessage& msg, const FlowKey& key) { - FlowKey key; - (void)read_flow_key(&key, msg); // flow will be nullptr if/when the session does not exist in the caches Flow* flow = Stream::get_flow(&key); + if (!flow) + ha_stats.update_msgs_recv_no_flow++; - assert(s_client_map); - - // pointer to the last byte in the message - uint8_t* content_end = msg->content() + msg->content_length() - 1; + // pointer to one past the last byte in the message + const uint8_t* content_end = msg.buffer + msg.buffer_length; - while( msg->cursor <= content_end ) + while (msg.cursor < content_end) { // do we have sufficient message left to be able to have an HAClientHeader? - if ( (int)(content_end - msg->cursor + 1) < (int)sizeof( HAClientHeader ) ) + if (!msg.fits(sizeof(HAClientHeader))) { ErrorMessage("Consuming HA Update message - no HAClientHeader\n"); + ha_stats.truncated_msgs++; break; } - HAClientHeader* header = (HAClientHeader*)msg->cursor; - msg->cursor += sizeof( HAClientHeader ); // step to the client content - - if ( (header->client >= s_handle_counter) || - ((*s_client_map)[header->client] == nullptr) ) + HAClientHeader* header = (HAClientHeader*) msg.cursor; + if ((header->client >= ha->handle_counter) || (ha->client_map[header->client] == nullptr)) { ErrorMessage("Consuming HA Update message - invalid client index\n"); + ha_stats.unknown_client_idx++; break; } + msg.advance_cursor(sizeof(HAClientHeader)); // step to the client content - if ( (content_end - msg->cursor + 1) < header->length ) + if (!msg.fits(header->length)) { ErrorMessage("Consuming HA Update message - message too short\n"); + ha_stats.truncated_msgs++; break; } @@ -387,121 +418,172 @@ static void consume_receive_update_message(HAMessage* msg) // client is always the first segment of the message, the consume() // invocation for the session client will create the flow. This // flow can in turn be used by subsequent FlowHAClient's. - if ( !(*s_client_map)[header->client]->consume(flow,&key,msg) ) + if (!ha->client_map[header->client]->consume(flow, &key, msg, header->length)) { ErrorMessage("Consuming HA Update message - error from client consume()\n"); + ha_stats.client_consume_errors++; break; } } + + if (msg.cursor == content_end) + ha_stats.update_msgs_consumed++; + + return flow; } -static void consume_receive_message(HAMessage* msg) +static Flow* consume_ha_message(HAMessage& msg) { - HAMessageHeader* hdr = (HAMessageHeader*)msg->content(); + ha_stats.msgs_recv++; - if ( hdr->version != HA_MESSAGE_VERSION) - return; + if (!msg.fits(sizeof(HAMessageHeader))) + { + ha_stats.truncated_msgs++; + return nullptr; + } - switch ( hdr->event ) + const HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor; + + if (hdr->version != HA_MESSAGE_VERSION) + { + ha_stats.msg_version_mismatch++; + return nullptr; + } + + if (hdr->total_length != msg.buffer_length) + { + ha_stats.msg_length_mismatch++; + return nullptr; + } + + msg.advance_cursor(sizeof(HAMessageHeader)); + + FlowKey key; + if (read_flow_key(msg, hdr, key) == 0) + return nullptr; + + Flow* flow = nullptr; + switch (hdr->event) { case HA_DELETE_EVENT: { - consume_receive_delete_message(msg); + consume_ha_delete_message(msg, key); + ha_stats.delete_msgs_consumed++; break; } case HA_UPDATE_EVENT: { - consume_receive_update_message(msg); + flow = consume_ha_update_message(msg, key); + ha_stats.update_msgs_recv++; break; } - default: - break; } + + return flow; +} + +static void ha_sc_receive_handler(SCMessage* sc_msg) +{ + assert(sc_msg); + + // SC received messages must have reference back to SideChannel object + assert(sc_msg->sc); + + HAMessage ha_msg(sc_msg->content, sc_msg->content_length); + consume_ha_message(ha_msg); + + sc_msg->sc->discard_message(sc_msg); } -HighAvailability::HighAvailability(PortBitSet* ports, bool) +HighAvailability::HighAvailability(PortBitSet* ports, bool daq_channel) { using namespace std::placeholders; - // If we have ports, configure the side channel - if ( ports != nullptr ) + // If side channel ports were configured, find the first matching side channel to associate with + if (ports != nullptr) { - for ( SCPort port = 0; port < ports->size(); port++ ) - if ( ports->test(port) ) + for (SCPort port = 0; port < ports->size(); port++) + { + if (!ports->test(port)) + continue; + + sc = SideChannelManager::get_side_channel(port); + if (sc) { - sc = SideChannelManager::get_side_channel(port); - if (sc) + // We require a duplex channel + if (sc->get_direction() != Connector::CONN_DUPLEX) { - // We need a duplex channel - if (sc->get_direction() != Connector::CONN_DUPLEX) - { - // Otherwise indicate that we don't have a sidechannel - sc = nullptr; - break; - } - sc->set_default_port(port); - sc->register_receive_handler( - std::bind(&HighAvailability::receive_handler, this, _1)); + sc = nullptr; + continue; } - break; + sc->set_default_port(port); + sc->register_receive_handler(ha_sc_receive_handler); } + break; + } } - s_client_map = new ClientMap; - for ( int i=0; iunregister_receive_handler(); - } - - delete s_client_map; } -void HighAvailability::receive_handler(SCMessage* sc_msg) +static void send_sc_update_message(Flow& flow, SideChannel& sc) { + const uint16_t header_len = calculate_msg_header_length(flow); + const uint16_t content_len = calculate_update_msg_content_length(flow, false); + + SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len)); assert(sc_msg); + HAMessage ha_msg(sc_msg->content, sc_msg->content_length); - // SC received messages must have reference back to SideChannel object - assert(sc_msg->sc); + write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg); + write_update_msg_content(flow, ha_msg, false); + update_msg_header_length(ha_msg); + sc.transmit_message(sc_msg); +} + +static void send_daq_update_message(Flow& flow, Packet& p) +{ + static THREAD_LOCAL uint8_t daq_io_buffer[UINT16_MAX]; - HAMessage ha_msg(sc_msg); - consume_receive_message(&ha_msg); + HAMessage ha_msg(daq_io_buffer, sizeof(daq_io_buffer)); - sc_msg->sc->discard_message(sc_msg); + write_msg_header(flow, HA_UPDATE_EVENT, 0, ha_msg); + write_update_msg_content(flow, ha_msg, true); + uint32_t len = update_msg_header_length(ha_msg); + + DIOCTL_FlowHAState fhs; + fhs.msg = p.daq_msg; + fhs.data = daq_io_buffer; + fhs.length = len; + + p.daq_instance->ioctl(DIOCTL_SET_FLOW_HA_STATE, &fhs, sizeof(fhs)); + + ha_stats.daq_stores++; } -void HighAvailability::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr) +void HighAvailability::process_update(Flow* flow, Packet* p) { - // Only looking for side channel processing - FIXIT-H - UNUSED(pkthdr); // until we add DAQ communications channel - if ( !sc || !flow ) + if (!flow) return; // We must have the map array and the session client - assert(s_client_map); - assert((*s_client_map)[0]); + assert(client_map[0]); - if ( !(*s_client_map)[0]->is_update_required(flow) && + if ( !client_map[0]->is_update_required(flow) && ( !flow->ha_state->check_pending(ALL_CLIENTS) || flow->ha_state->check_any(FlowHAState::NEW) ) ) return; - const uint16_t header_len = calculate_msg_header_length(flow); - const uint16_t content_len = calculate_update_msg_content_length(flow); - - SCMessage* sc_msg = sc->alloc_transmit_message((uint32_t)(header_len+content_len)); - assert(sc_msg); - HAMessage ha_msg(sc_msg); + if (sc) + send_sc_update_message(*flow, *sc); - write_msg_header(flow, HA_UPDATE_EVENT, content_len, &ha_msg); - write_update_msg_content(flow, &ha_msg); - sc->transmit_message(sc_msg); + if (use_daq_channel && p && p->daq_msg) + send_daq_update_message(*flow, *p); flow->ha_state->clear(FlowHAState::NEW | FlowHAState::MODIFIED | FlowHAState::MAJOR | FlowHAState::CRITICAL); @@ -509,95 +591,148 @@ void HighAvailability::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr) flow->ha_state->set_next_update(); } -void HighAvailability::process_deletion(Flow* flow) +static void send_sc_deletion_message(Flow& flow, SideChannel& sc) { - // No need to send message if we already have, we are in standby, or - // we have just been created and haven't yet sent an update - if ( flow->ha_state->check_any(FlowHAState::NEW | - FlowHAState::DELETED | - FlowHAState::STANDBY)) - return; - - // Deletion messages only use the side channel - if ( !sc ) - return; - const uint32_t msg_len = calculate_msg_header_length(flow); - SCMessage* sc_msg = sc->alloc_transmit_message(msg_len); - HAMessage ha_msg(sc_msg); + SCMessage* sc_msg = sc.alloc_transmit_message(msg_len); + HAMessage ha_msg(sc_msg->content, sc_msg->content_length); // No content, only header+key - write_msg_header(flow, HA_DELETE_EVENT, 0, &ha_msg); + write_msg_header(flow, HA_DELETE_EVENT, msg_len, ha_msg); - sc->transmit_message(sc_msg); + sc.transmit_message(sc_msg); +} + +void HighAvailability::process_deletion(Flow& flow) +{ + // No need to send message if we already have, we are in standby, or + // we have just been created and haven't yet sent an update + if (flow.ha_state->check_any(FlowHAState::NEW | FlowHAState::DELETED | FlowHAState::STANDBY)) + return; + + // Only produce deletion messages when using a side channel + if (sc) + send_sc_deletion_message(flow, *sc); - flow->ha_state->add(FlowHAState::DELETED); + flow.ha_state->add(FlowHAState::DELETED); } void HighAvailability::process_receive() { - if ( sc != nullptr ) + if (sc) sc->process(DISPATCH_ALL_RECEIVE); } -// Called by the configuration parsing activity in the main thread. -bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel, - struct timeval* min_session_lifetime, struct timeval* min_sync_interval) +Flow* HighAvailability::process_daq_import(Packet& p, FlowKey& key) { - ports = mod_ports; - FlowHAState::config_timers(*min_session_lifetime, *min_sync_interval); - use_daq_channel = mod_use_daq_channel; + Flow* flow = nullptr; + + if (use_daq_channel && p.pkth->flags & DAQ_PKT_FLAG_HA_STATE_AVAIL) + { + DIOCTL_FlowHAState fhs; + fhs.msg = p.daq_msg; + + if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS) + { + HAMessage ha_msg(fhs.data, fhs.length); + flow = consume_ha_message(ha_msg); + ha_stats.daq_imports++; + // Validate that the imported flow matches up with the given flow key. + if (flow) + { + if (FlowKey::compare(&key, flow->key, 0) == 0) + { + // Clear the standby bit so that we don't immediately trigger a new data store + // FIXIT-L streamline the consume process so this doesn't have to be done here + flow->ha_state->clear(FlowHAState::STANDBY); + } + else + flow = nullptr; + } + } + } - return true; + return flow; } -// Called prior to the starts of configuration in the main thread. -void HighAvailabilityManager::pre_config_init() +void HighAvailabilityManager::reset_config() { - ports = nullptr; + if (ports) + { + delete ports; + ports = nullptr; + } +} + +void HighAvailabilityManager::term() +{ + reset_config(); +} + +// Called within the main thread after the initial configuration has been read +void HighAvailabilityManager::configure(HighAvailabilityConfig* config) +{ + if (!config) + { + reset_config(); + return; + } + + if (config->ports) + ports = new PortBitSet(*config->ports); + else if (ports) + { + delete ports; + ports = nullptr; + } + + FlowHAState::config_timers(config->min_session_lifetime, config->min_sync_interval); + + use_daq_channel = config->daq_channel; } // Called within the packet thread prior to packet processing void HighAvailabilityManager::thread_init() { // create a a thread local instance iff we are configured to operate. - if ( (ports != nullptr) || use_daq_channel ) - ha = new HighAvailability(ports,use_daq_channel); + if (ports || use_daq_channel) + ha = new HighAvailability(ports, use_daq_channel); else ha = nullptr; } void HighAvailabilityManager::thread_term_beginning() { - shutting_down = true; + if (ha) + ha->shutting_down = true; } // Called in the packet thread at run-down void HighAvailabilityManager::thread_term() { - if ( ha != nullptr ) + if (ha) { delete ha; ha = nullptr; } } -void HighAvailabilityManager::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr) +void HighAvailabilityManager::process_update(Flow* flow, Packet* p) { - if ( (ha != nullptr) && (pkthdr != nullptr) && (flow != nullptr) ) - ha->process_update(flow,pkthdr); + if (ha && p && flow) + ha->process_update(flow, p); } // Deletion messages only contain session content -void HighAvailabilityManager::process_deletion(Flow* flow) +void HighAvailabilityManager::process_deletion(Flow& flow) { - if ( (ha != nullptr) && !shutting_down ) + if (ha && !ha->shutting_down) ha->process_deletion(flow); } void HighAvailabilityManager::process_receive() { - if ( ha != nullptr ) + if (ha) ha->process_receive(); } @@ -609,14 +744,22 @@ bool HighAvailabilityManager::active() void HighAvailabilityManager::set_modified(Flow* flow) { - if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) ) + if (ha && flow && flow->ha_state) flow->ha_state->add(FlowHAState::MODIFIED); } bool HighAvailabilityManager::in_standby(Flow* flow) { - if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) ) + if (ha && flow && flow->ha_state) return flow->ha_state->check_any(FlowHAState::STANDBY); - else - return false; + + return false; +} + +Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key) +{ + if (!ha) + return nullptr; + + return ha->process_daq_import(p, key); } diff --git a/src/flow/ha.h b/src/flow/ha.h index 406b14107..47525d279 100644 --- a/src/flow/ha.h +++ b/src/flow/ha.h @@ -22,48 +22,44 @@ #include +#include + +#include "framework/bits.h" #include "main/thread.h" -#include "side_channel/side_channel.h" //------------------------------------------------------------------------- +struct HighAvailabilityConfig; + namespace snort { class Flow; struct FlowKey; -} +struct Packet; +struct ProfileStats; // The FlowHAHandle is the dynamically allocated index used uniquely identify // the client. Used both in the API and HA messages. // Handle 0 is defined to be the primary session client. // NOTE: The type, masks, and count values must be in sync, typedef uint16_t FlowHAClientHandle; -const FlowHAClientHandle SESSION_HA_CLIENT = 0x0000; -const uint8_t SESSION_HA_CLIENT_INDEX = 0; -const FlowHAClientHandle ALL_CLIENTS = 0xffff; -// One client for each mask bit plus one 'automatic' session client -// client handle = (1<<(client_index-1) -// session client has handle of 0 and index of 0 -const uint8_t MAX_CLIENTS = 17; - -enum HAEvent -{ - HA_DELETE_EVENT = 1, - HA_UPDATE_EVENT = 2 -}; +constexpr FlowHAClientHandle ALL_CLIENTS = 0xffff; // Each active flow will have an associated FlowHAState instance. -class FlowHAState +class SO_PUBLIC FlowHAState { public: - static const uint8_t CRITICAL = 0x80; - static const uint8_t MAJOR = 0x40; - - static const uint8_t NEW = 0x01; - static const uint8_t MODIFIED = 0x02; - static const uint8_t DELETED = 0x04; - static const uint8_t STANDBY = 0x08; - static const uint8_t NEW_SESSION = 0x10; + static constexpr uint8_t CRITICAL = 0x80; + static constexpr uint8_t MAJOR = 0x40; + + enum : uint8_t + { + NEW = 0x01, + MODIFIED = 0x02, + DELETED = 0x04, + STANDBY = 0x08, + NEW_SESSION = 0x10, + }; FlowHAState(); @@ -74,123 +70,108 @@ public: void add(uint8_t state); void clear(uint8_t state); bool check_any(uint8_t state); - static void config_timers(timeval,timeval); + static void config_timers(struct timeval, struct timeval); bool sync_interval_elapsed(); void set_next_update(); void reset(); private: - static const uint8_t INITIAL_STATE = 0x00; - static const uint16_t NONE_PENDING = 0x0000; - + static constexpr uint8_t INITIAL_STATE = 0x00; + static constexpr uint16_t NONE_PENDING = 0x0000; static struct timeval min_session_lifetime; static struct timeval min_sync_interval; - uint8_t state; - uint16_t pending; - struct timeval next_update; -}; - -struct __attribute__((__packed__)) HAMessageHeader -{ - uint8_t event; - uint8_t version; - uint16_t total_length; - uint8_t key_type; -}; -struct __attribute__((__packed__)) HAClientHeader -{ - uint8_t client; - uint8_t length; + struct timeval next_update; + uint16_t pending; + uint8_t state; }; // Describe the message being produced or consumed. class HAMessage { public: - HAMessage(SCMessage* msg) - { sc_msg = msg; } - - uint8_t* content() - { return sc_msg->content; } - uint16_t content_length() - { return sc_msg->content_length; } + HAMessage(uint8_t* buffer, uint32_t buffer_length) : + buffer(buffer), buffer_length(buffer_length), cursor(buffer) { } + + bool fits(uint32_t size) const { return size <= (buffer_length - (cursor - buffer)); } + + void advance_cursor(uint32_t size) + { + assert(fits(size)); + cursor += size; + } + + void reset_cursor(uint8_t* pos = nullptr) + { + if (pos) + { + assert(pos >= buffer && pos <= (buffer + buffer_length)); + cursor = pos; + } + else + cursor = buffer; + } + + uint32_t cursor_position() const { return (uint32_t) (cursor - buffer); } + + uint8_t* buffer; + const uint32_t buffer_length; uint8_t* cursor; - -private: - SCMessage* sc_msg; }; // A FlowHAClient subclass for each producer/consumer of flow HA data -class FlowHAClient +class SO_PUBLIC FlowHAClient { public: virtual ~FlowHAClient() = default; - virtual bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) { return false; } - virtual bool produce(snort::Flow*, HAMessage*) { return false; } + virtual bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) = 0; + virtual bool produce(snort::Flow&, snort::HAMessage&) = 0; virtual bool is_update_required(snort::Flow*) { return false; } - virtual bool is_delete_required(snort::Flow*) { return false; } - uint8_t get_message_size() { return header.length; } - bool fit(HAMessage*, uint8_t); - bool place(HAMessage*, uint8_t*, uint8_t); + uint8_t get_message_size() { return max_length; } + FlowHAClientHandle handle; // Actual handle for the instance - HAClientHeader header; + uint8_t index; + uint8_t max_length; protected: - FlowHAClient(uint8_t, bool); - -}; - -// HighAvailability is instantiated for each packet-thread. -// FIXIT-M make the SideChannel the THREAD_LOCAL element and collapse -// into HighAvailabilityManager -class HighAvailability -{ -public: - HighAvailability(PortBitSet*,bool); - ~HighAvailability(); - - void process_update(snort::Flow*, const DAQ_PktHdr_t*); - void process_deletion(snort::Flow*); - void process_receive(); - -private: - void receive_handler(SCMessage*); - SideChannel* sc = nullptr; + FlowHAClient(uint8_t length, bool session_client); }; // Top level management of HighAvailability components. -class HighAvailabilityManager +class SO_PUBLIC HighAvailabilityManager { public: - // Prior to parsing configuration - static void pre_config_init(); - - // Invoked by the module configuration parsing to create HA instance - static bool instantiate(PortBitSet*,bool,struct timeval*,struct timeval*); + static void configure(HighAvailabilityConfig*); static void thread_init(); static void thread_term_beginning(); // thread is about to be terminated static void thread_term(); + static void term(); // true if we are configured and able to process static bool active(); - // Within the packet callback, analyze the packet and flow for potential update messages - static void process_update(snort::Flow*, const DAQ_PktHdr_t*); + // Within packet processing, analyze the packet and flow for potential update messages + static void process_update(snort::Flow*, snort::Packet*); // Anytime a flow is deleted, potentially generate a deletion message - static void process_deletion(snort::Flow*); + static void process_deletion(snort::Flow&); // Look for and dispatch receive messages. static void process_receive(); static void set_modified(snort::Flow*); static bool in_standby(snort::Flow*); + // Attempt to import HA data from the Packet + static Flow* import(snort::Packet& p, snort::FlowKey& key); + private: + static void reset_config(); + HighAvailabilityManager() = delete; static bool use_daq_channel; static PortBitSet* ports; - static THREAD_LOCAL bool shutting_down; }; +} + #endif diff --git a/src/flow/ha_module.cc b/src/flow/ha_module.cc index f9f6ba661..fb5017036 100644 --- a/src/flow/ha_module.cc +++ b/src/flow/ha_module.cc @@ -27,8 +27,8 @@ #include #include "log/messages.h" - -#include "ha.h" +#include "main/snort_config.h" +#include "profiler/profiler_defs.h" using namespace snort; @@ -48,14 +48,37 @@ static const Parameter ha_params[] = "side channel message port list" }, { "min_age", Parameter::PT_REAL, "0.0:100.0", "1.0", - "minimum session life before HA updates" }, + "minimum session life in seconds before HA updates" }, - { "min_sync", Parameter::PT_REAL, "0.0:100.0", "1.0", - "minimum interval between HA updates" }, + { "min_sync", Parameter::PT_REAL, "0.0:100.0", "0.1", + "minimum interval in seconds between HA updates" }, { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr } }; +static const PegInfo ha_pegs[] = +{ + { CountType::SUM, "msgs_recv", "total messages received" }, + { CountType::SUM, "update_msgs_recv", "update messages received" }, + { CountType::SUM, "update_msgs_recv_no_flow", "update messages received without a local flow" }, + { CountType::SUM, "update_msgs_consumed", "update messages fully consumed" }, + { CountType::SUM, "delete_msgs_consumed", "deletion messages consumed" }, + { CountType::SUM, "daq_stores", "states stored via daq" }, + { CountType::SUM, "daq_imports", "states imported via daq" }, + { CountType::SUM, "msg_version_mismatch", "messages received with a version mismatch" }, + { CountType::SUM, "msg_length_mismatch", "messages received with an inconsistent total length" }, + { CountType::SUM, "truncated_msgs", "truncated messages received" }, + { CountType::SUM, "unknown_key_type", "messages received with an unknown flow key type" }, + { CountType::SUM, "unknown_client_idx", "messages received with an unknown client index" }, + { CountType::SUM, "client_consume_errors", "client data consume failure count" }, + { CountType::END, nullptr, nullptr } +}; + +THREAD_LOCAL HAStats ha_stats; +THREAD_LOCAL ProfileStats ha_perf_stats; + +//------------------------------------------------------------------------- + static void convert_real_seconds_to_timeval(double seconds, struct timeval* tv) { double whole = trunc(seconds); @@ -67,42 +90,61 @@ static void convert_real_seconds_to_timeval(double seconds, struct timeval* tv) HighAvailabilityModule::HighAvailabilityModule() : Module(HA_NAME, HA_HELP, ha_params) { - config.enabled = false; - config.daq_channel = false; - config.ports = nullptr; - convert_real_seconds_to_timeval(1.0, &config.min_session_lifetime); - convert_real_seconds_to_timeval(0.1, &config.min_sync_interval); + config = nullptr; } HighAvailabilityModule::~HighAvailabilityModule() { - delete config.ports; + if (config) + delete config; +} + +const PegInfo* HighAvailabilityModule::get_pegs() const +{ + return ha_pegs; +} + +PegCount* HighAvailabilityModule::get_counts() const +{ + return (PegCount*) &ha_stats; } ProfileStats* HighAvailabilityModule::get_profile() const -{ return &ha_perf_stats; } +{ + return &ha_perf_stats; +} + +bool HighAvailabilityModule::begin(const char*, int, SnortConfig*) +{ + assert(!config); + config = new HighAvailabilityConfig(); + + return true; +} bool HighAvailabilityModule::set(const char*, Value& v, SnortConfig*) { if ( v.is("enable") ) - config.enabled = v.get_bool(); - + { + config->enabled = v.get_bool(); + } else if ( v.is("daq_channel") ) - config.daq_channel = v.get_bool(); - + { + config->daq_channel = v.get_bool(); + } else if ( v.is("ports") ) { - if ( !config.ports ) - config.ports = new PortBitSet; - v.get_bits(*(config.ports) ); + if ( !config->ports ) + config->ports = new PortBitSet; + v.get_bits(*(config->ports)); } else if ( v.is("min_age") ) { - convert_real_seconds_to_timeval(v.get_real(), &config.min_session_lifetime); + convert_real_seconds_to_timeval(v.get_real(), &config->min_session_lifetime); } else if ( v.is("min_sync") ) { - convert_real_seconds_to_timeval(v.get_real(), &config.min_sync_interval); + convert_real_seconds_to_timeval(v.get_real(), &config->min_sync_interval); } else return false; @@ -110,20 +152,13 @@ bool HighAvailabilityModule::set(const char*, Value& v, SnortConfig*) return true; } -bool HighAvailabilityModule::begin(const char*, int, SnortConfig*) -{ - return true; -} - -bool HighAvailabilityModule::end(const char*, int, SnortConfig*) +bool HighAvailabilityModule::end(const char*, int, SnortConfig* sc) { - if ( config.enabled && - !HighAvailabilityManager::instantiate(config.ports, config.daq_channel, - &config.min_session_lifetime, &config.min_sync_interval) ) - { - ParseWarning(WARN_CONF, "Illegal HighAvailability configuration"); - return false; - } + if ( config->enabled ) + sc->ha_config = config; + else + delete config; + config = nullptr; return true; } diff --git a/src/flow/ha_module.h b/src/flow/ha_module.h index 65256e0bc..6b5a62a8f 100644 --- a/src/flow/ha_module.h +++ b/src/flow/ha_module.h @@ -31,6 +31,8 @@ struct HighAvailabilityConfig { + ~HighAvailabilityConfig() { delete ports; } + bool enabled; bool daq_channel; PortBitSet* ports = nullptr; @@ -38,24 +40,18 @@ struct HighAvailabilityConfig struct timeval min_sync_interval; }; -extern THREAD_LOCAL SimpleStats ha_stats; -extern THREAD_LOCAL snort::ProfileStats ha_perf_stats; - class HighAvailabilityModule : public snort::Module { public: HighAvailabilityModule(); ~HighAvailabilityModule() override; - bool set(const char*, snort::Value&, snort::SnortConfig*) override; bool begin(const char*, int, snort::SnortConfig*) override; + bool set(const char*, snort::Value&, snort::SnortConfig*) override; bool end(const char*, int, snort::SnortConfig*) override; - PegCount* get_counts() const override - { return (PegCount*)&ha_stats; } - - const PegInfo* get_pegs() const override - { return snort::simple_pegs; } + const PegInfo* get_pegs() const override; + PegCount* get_counts() const override; snort::ProfileStats* get_profile() const override; @@ -63,8 +59,28 @@ public: { return GLOBAL; } private: - HighAvailabilityConfig config; + HighAvailabilityConfig* config; }; +struct HAStats +{ + PegCount msgs_recv; + PegCount update_msgs_recv; + PegCount update_msgs_recv_no_flow; + PegCount update_msgs_consumed; + PegCount delete_msgs_consumed; + PegCount daq_stores; + PegCount daq_imports; + PegCount msg_version_mismatch; + PegCount msg_length_mismatch; + PegCount truncated_msgs; + PegCount unknown_key_type; + PegCount unknown_client_idx; + PegCount client_consume_errors; +}; + +extern THREAD_LOCAL HAStats ha_stats; +extern THREAD_LOCAL snort::ProfileStats ha_perf_stats; + #endif diff --git a/src/flow/test/CMakeLists.txt b/src/flow/test/CMakeLists.txt index a94b730cc..819801eb9 100644 --- a/src/flow/test/CMakeLists.txt +++ b/src/flow/test/CMakeLists.txt @@ -1,15 +1,4 @@ -add_cpputest( ha_test - SOURCES ../ha.cc -) - -add_cpputest( ha_module_test - SOURCES - ../ha_module.cc - ../../framework/module.cc - ../../framework/value.cc - ../../sfip/sf_ip.cc - $ -) +add_cpputest( ha_test ) add_cpputest( flow_stash_test SOURCES ../flow_stash.cc diff --git a/src/flow/test/flow_stash_test.cc b/src/flow/test/flow_stash_test.cc index 5fde609de..74f554e6d 100644 --- a/src/flow/test/flow_stash_test.cc +++ b/src/flow/test/flow_stash_test.cc @@ -94,13 +94,13 @@ void DataBus::subscribe(const char* key, DataHandler* h) { DB->_subscribe(key, h); } -void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig* sc) +void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig*) { DB->_subscribe(key, h); } void DataBus::unsubscribe(const char*, DataHandler*) {} -void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig* sc) {} +void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig*) {} void DataBus::publish(const char* key, DataEvent& e, Flow* f) { diff --git a/src/flow/test/ha_module_test.cc b/src/flow/test/ha_module_test.cc deleted file mode 100644 index 81b706607..000000000 --- a/src/flow/test/ha_module_test.cc +++ /dev/null @@ -1,177 +0,0 @@ -//-------------------------------------------------------------------------- -// Copyright (C) 2015-2019 Cisco and/or its affiliates. All rights reserved. -// -// This program is free software; you can redistribute it and/or modify it -// under the terms of the GNU General Public License Version 2 as published -// by the Free Software Foundation. You may not use, modify or distribute -// this program under any other version of the GNU General Public License. -// -// This program is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// General Public License for more details. -// -// You should have received a copy of the GNU General Public License along -// with this program; if not, write to the Free Software Foundation, Inc., -// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -//-------------------------------------------------------------------------- - -// ha_module_test.cc author Ed Borgoyn -// unit test main - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "flow/ha.h" -#include "flow/ha_module.h" - -#include "log/messages.h" -#include "main/snort_debug.h" -#include "profiler/profiler.h" - -#include -#include - -using namespace snort; - -THREAD_LOCAL SimpleStats ha_stats; -THREAD_LOCAL ProfileStats ha_perf_stats; - -void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { } -void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*) { } -void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*, FILE*) { } - -namespace snort -{ -void LogMessage(const char*,...) { } -void ParseWarning(WarningGroup, const char*, ...) { } -char* snort_strdup(const char* str) { return strdup(str); } -} - -static bool s_port_1_set = false; -static bool s_use_daq = false; -static bool s_instantiate_called = false; - -static char* make_bit_string(int bit) -{ - static char bit_string[65537]; - for ( int i=0; i<65536; i++) - bit_string[i] = (bit == i) ? '1' : '0'; - - bit_string[65536] = '\0'; - - return bit_string; -} - -bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel, struct timeval*, struct timeval*) -{ - s_instantiate_called = true; - s_port_1_set = mod_ports->test(1); - s_use_daq = mod_use_daq_channel; - - return true; -} - -TEST_GROUP(high_availability_module_test) -{ - void setup() override - { - } - - void teardown() override - { - } -}; - -TEST(high_availability_module_test, test_ha_valid) -{ - Value ports_val(make_bit_string(1)); - Value enable_val(true); - Value min_age_val(1.0); - Value min_sync_val(0.1); - Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"}; - Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr }; - Parameter min_age_param = {"min_age", Parameter::PT_REAL, nullptr, "1.0", nullptr }; - Parameter min_sync_param = {"min_sync", Parameter::PT_REAL, nullptr, "0.1", nullptr }; - - HighAvailabilityModule module; - - ports_val.set(&ports_param); - enable_val.set(&enable_param); - min_age_val.set(&min_age_param); - min_sync_val.set(&min_sync_param); - - s_instantiate_called = false; - s_port_1_set = false; - s_use_daq = false; - - module.begin("high_availability", 0, nullptr); - module.set("high_availability.ports", ports_val, nullptr); - module.set("high_availability.enable", enable_val, nullptr); - module.set("high_availability.min_age", min_age_val, nullptr); - module.set("high_availability.min_sync", min_sync_val, nullptr); - module.end("high_availability", 0, nullptr); - - CHECK(s_instantiate_called == true); - CHECK(s_port_1_set == true); - CHECK(s_use_daq == false); -} - -TEST(high_availability_module_test, test_ha_disabled) -{ - Value enable_val(false); - Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr }; - - HighAvailabilityModule module; - - enable_val.set(&enable_param); - - s_instantiate_called = false; - s_port_1_set = false; - s_use_daq = false; - - module.begin("high_availability", 0, nullptr); - module.set("high_availability.enable", enable_val, nullptr); - module.end("high_availability", 0, nullptr); - - CHECK(s_instantiate_called == false); - CHECK(s_port_1_set == false); - CHECK(s_use_daq == false); -} - -TEST(high_availability_module_test, test_ha_valid_daq) -{ - Value ports_val(make_bit_string(1)); - Value enable_val(true); - Value daq_val(true); - Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"}; - Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr }; - Parameter daq_param = {"daq_channel", Parameter::PT_BOOL, nullptr, "false", nullptr }; - - HighAvailabilityModule module; - - ports_val.set(&ports_param); - enable_val.set(&enable_param); - daq_val.set(&daq_param); - - s_instantiate_called = false; - s_port_1_set = false; - s_use_daq = false; - - module.begin("high_availability", 0, nullptr); - module.set("high_availability.ports", ports_val, nullptr); - module.set("high_availability.enable", enable_val, nullptr); - module.set("high_availability.daq_channel", daq_val, nullptr); - module.end("high_availability", 0, nullptr); - - CHECK(s_instantiate_called == true); - CHECK(s_port_1_set == true); - CHECK(s_use_daq == true); -} - -int main(int argc, char** argv) -{ - return CommandLineTestRunner::RunAllTests(argc, argv); -} - diff --git a/src/flow/test/ha_test.cc b/src/flow/test/ha_test.cc index 44ea78ab0..7e1f8baf0 100644 --- a/src/flow/test/ha_test.cc +++ b/src/flow/test/ha_test.cc @@ -16,19 +16,14 @@ // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //-------------------------------------------------------------------------- -// ha_test.cc author Ed Borgoyn +// ha_test.cc authors Ed Borgoyn , Michael Altizer // unit test main #ifdef HAVE_CONFIG_H #include "config.h" #endif -#include "flow/ha.h" - -#include "flow/flow.h" -#include "flow/flow_key.h" -#include "main/snort_debug.h" -#include "stream/stream.h" +#include "flow/ha.cc" #include #include @@ -40,31 +35,53 @@ using namespace snort; class StreamHAClient; -static const uint8_t s_test_key[] = +static const FlowKey s_test_key = { -TEST_KEY + { 1, 2, 3, 4 }, + { 5, 6, 7, 8 }, + 9, + 10, + 11, + 12, + 13, + 14, + PktType::TCP, + 14, + 0, }; -static const uint8_t s_delete_message[] = +static struct __attribute__((__packed__)) TestDeleteMessage { + HAMessageHeader mhdr; + FlowKey key; +} s_delete_message = { - 0x01, - 0x03, - 0x00, - 0x00, - 0x01, -TEST_KEY + { + HA_DELETE_EVENT, + HA_MESSAGE_VERSION, + 0x35, + KEY_TYPE_IP6 + }, + s_test_key }; -static const uint8_t s_update_stream_message[] = +static struct __attribute__((__packed__)) TestUpdateMessage { + HAMessageHeader mhdr; + FlowKey key; + HAClientHeader schdr; + uint8_t scmsg[10]; +} s_update_stream_message = { - 0x02, - 0x03, - 0x00, - 0x00, - 0x01, -TEST_KEY, - 0x00, - 10, + { + HA_UPDATE_EVENT, + HA_MESSAGE_VERSION, + 0x41, + KEY_TYPE_IP6 + }, + s_test_key, + { + 0, + 10 + }, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; @@ -75,7 +92,9 @@ static SideChannel s_side_channel; static SCMessage s_sc_message; static SCMessage s_rec_sc_message; static bool s_stream_consume_called = false; +static uint8_t s_stream_consume_size = 0; static bool s_other_consume_called = false; +static uint8_t s_other_consume_size = 0; static bool s_get_session_called = false; static bool s_delete_session_called = false; static bool s_transmit_message_called = false; @@ -85,7 +104,7 @@ static uint8_t* s_message_content = nullptr; static uint8_t s_message_length = 0; static Flow s_flow; static FlowKey s_flowkey; -static DAQ_PktHdr_t s_pkthdr; +static Packet s_pkt; static StreamHAClient* s_ha_client; static FlowHAClient* s_other_ha_client; static std::function s_handler = nullptr; @@ -96,23 +115,33 @@ class StreamHAClient : public FlowHAClient public: StreamHAClient() : FlowHAClient(10, true) { } ~StreamHAClient() override = default; - bool consume(Flow*&, FlowKey*, HAMessage*) override + bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override { s_stream_consume_called = true; + s_stream_consume_size = size; + + for ( uint8_t i = 0; i < 10; i++ ) + { + if (*msg.cursor != i) + return false; + msg.advance_cursor(sizeof(*msg.cursor)); + } + return true; } - bool produce(Flow*, HAMessage* msg) override + bool produce(Flow&, HAMessage& msg) override { - for ( uint8_t i=0; i<10; i++ ) - *(msg->cursor)++ = i; + if (!msg.fits(10)) + return false; + + for ( uint8_t i = 0; i < 10; i++ ) + { + *msg.cursor = i; + msg.advance_cursor(sizeof(*msg.cursor)); + } return true; } - uint8_t get_message_size() { return 10; } bool is_update_required(Flow*) override { return s_stream_update_required; } - bool fit(HAMessage*,uint8_t) { return true; } - bool place(HAMessage*,uint8_t*,uint8_t) { return true; } - -private: }; class OtherHAClient : public FlowHAClient @@ -120,32 +149,51 @@ class OtherHAClient : public FlowHAClient public: OtherHAClient() : FlowHAClient(5, false) { } ~OtherHAClient() override = default; - bool consume(Flow*&, HAMessage*) + bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override { s_other_consume_called = true; + s_other_consume_size = size; + + for ( uint8_t i = 0; i < 5; i++ ) + { + if (*msg.cursor != i) + return false; + msg.advance_cursor(sizeof(*msg.cursor)); + } + return true; } - bool produce(Flow*, HAMessage* msg) override + bool produce(Flow&, HAMessage& msg) override { - for ( uint8_t i=0; i<5; i++ ) - *(msg->cursor)++ = i; + if (!msg.fits(5)) + return false; + + for ( uint8_t i = 0; i < 5; i++ ) + { + *msg.cursor = i; + msg.advance_cursor(sizeof(*msg.cursor)); + } return true; } - uint8_t get_message_size() { return 5; } bool is_update_required(Flow*) override { return s_other_update_required; } - bool fit(HAMessage*,uint8_t) { return true; } - bool place(HAMessage*,uint8_t*,uint8_t) { return true; } - -private: }; -Flow* Stream::get_flow(const FlowKey* flowkey) +//------------------------------------------------------------------------- +// stubs, spies, etc. +//------------------------------------------------------------------------- + +THREAD_LOCAL HAStats ha_stats = { }; + +Flow* Stream::get_flow(const FlowKey* flowkey) { s_flowkey = *flowkey; s_get_session_called = true; return &s_flow; } +Packet::Packet(bool) { } +Packet::~Packet() = default; + void Stream::delete_flow(const FlowKey* flowkey) { s_flowkey = *flowkey; @@ -158,10 +206,15 @@ void ErrorMessage(const char*,...) { } void LogMessage(const char*,...) { } } +int FlowKey::compare(const void*, const void*, size_t) { return 0; } + +int SFDAQInstance::ioctl(DAQ_IoctlCmd, void*, size_t) { return DAQ_SUCCESS; } + void packet_gettimeofday(struct timeval* tv) { *tv = s_packet_time; } Flow::Flow() { ha_state = new FlowHAState; key = new FlowKey; } +Flow::~Flow() { delete key; delete ha_state; } FlowStash::~FlowStash() { } @@ -205,7 +258,8 @@ bool SideChannel::transmit_message(SCMessage* msg) s_transmit_message_called = true; s_message_content = msg->content; s_message_length = msg->content_length; - return true; } + return true; +} SCMessage* SideChannel::alloc_transmit_message(uint32_t len) { @@ -217,22 +271,22 @@ SCMessage* SideChannel::alloc_transmit_message(uint32_t len) return &s_sc_message; } +//------------------------------------------------------------------------- +// tests +//------------------------------------------------------------------------- + TEST_GROUP(high_availability_manager_test) { - void setup() override - { - MemoryLeakWarningPlugin::turnOffNewDeleteOverloads(); - } - void teardown() override { - MemoryLeakWarningPlugin::turnOnNewDeleteOverloads(); + HighAvailabilityManager::term(); } }; TEST(high_availability_manager_test, init_term) { - HighAvailabilityManager::pre_config_init(); + HighAvailabilityConfig hac = { }; + HighAvailabilityManager::configure(&hac); HighAvailabilityManager::thread_init(); CHECK(HighAvailabilityManager::active()==false); HighAvailabilityManager::thread_term(); @@ -241,12 +295,15 @@ TEST(high_availability_manager_test, init_term) TEST(high_availability_manager_test, inst_init_term) { - HighAvailabilityManager::pre_config_init(); - PortBitSet port_set; - port_set.set(1); - struct timeval age = { 1, 0 }; - struct timeval interval = { 0, 500000 }; - HighAvailabilityManager::instantiate(&port_set, false, &age, &interval); + HighAvailabilityConfig hac; + hac.enabled = true; + hac.daq_channel = false; + hac.ports = new PortBitSet(); + hac.ports->set(1); + hac.min_session_lifetime = { 1, 0 }; + hac.min_sync_interval = { 0, 500000 }; + + HighAvailabilityManager::configure(&hac); HighAvailabilityManager::thread_init(); s_ha_client = new StreamHAClient; CHECK(HighAvailabilityManager::active()==true); @@ -322,13 +379,17 @@ TEST_GROUP(high_availability_test) { void setup() override { - MemoryLeakWarningPlugin::turnOffNewDeleteOverloads(); - HighAvailabilityManager::pre_config_init(); - PortBitSet port_set; - port_set.set(1); - struct timeval age = { 1, 0 }; - struct timeval interval = { 0, 500000 }; - HighAvailabilityManager::instantiate(&port_set, false, &age, &interval); + memset(&ha_stats, 0, sizeof(ha_stats)); + + HighAvailabilityConfig hac; + hac.enabled = true; + hac.daq_channel = false; + hac.ports = new PortBitSet(); + hac.ports->set(1); + hac.min_session_lifetime = { 1, 0 }; + hac.min_sync_interval = { 0, 500000 }; + + HighAvailabilityManager::configure(&hac); HighAvailabilityManager::thread_init(); s_ha_client = new StreamHAClient; s_other_ha_client = new OtherHAClient; @@ -339,14 +400,14 @@ TEST_GROUP(high_availability_test) delete s_other_ha_client; delete s_ha_client; HighAvailabilityManager::thread_term(); - MemoryLeakWarningPlugin::turnOnNewDeleteOverloads(); + HighAvailabilityManager::term(); } }; TEST(high_availability_test, receive_deletion) { s_delete_session_called = false; - s_message_content = (uint8_t*)s_delete_message; + s_message_content = (uint8_t*) &s_delete_message; s_message_length = sizeof(s_delete_message); HighAvailabilityManager::process_receive(); CHECK(s_delete_session_called == true); @@ -356,17 +417,19 @@ TEST(high_availability_test, receive_deletion) TEST(high_availability_test, receive_update_stream_only) { s_stream_consume_called = false; - s_message_content = (uint8_t*)s_update_stream_message; + s_stream_consume_size = 0; + s_message_content = (uint8_t*) &s_update_stream_message; s_message_length = sizeof(s_update_stream_message); HighAvailabilityManager::process_receive(); CHECK(s_stream_consume_called == true); + CHECK(s_stream_consume_size == 10); CHECK(memcmp((const void*)&s_flowkey, (const void*)&s_test_key, sizeof(s_test_key)) == 0); } TEST(high_availability_test, transmit_deletion) { s_transmit_message_called = false; - HighAvailabilityManager::process_deletion(&s_flow); + HighAvailabilityManager::process_deletion(s_flow); CHECK(s_transmit_message_called == true); } @@ -375,7 +438,7 @@ TEST(high_availability_test, transmit_update_no_update) s_transmit_message_called = false; s_stream_update_required = false; s_other_update_required = false; - HighAvailabilityManager::process_update(&s_flow, &s_pkthdr); + HighAvailabilityManager::process_update(&s_flow, &s_pkt); CHECK(s_transmit_message_called == false); } @@ -384,7 +447,7 @@ TEST(high_availability_test, transmit_update_stream_only) s_transmit_message_called = false; s_stream_update_required = true; s_other_update_required = false; - HighAvailabilityManager::process_update(&s_flow, &s_pkthdr); + HighAvailabilityManager::process_update(&s_flow, &s_pkt); CHECK(s_transmit_message_called == true); } @@ -395,10 +458,139 @@ TEST(high_availability_test, transmit_update_both_update) s_other_update_required = true; CHECK(s_other_ha_client->handle == 1); s_flow.ha_state->set_pending(s_other_ha_client->handle); - HighAvailabilityManager::process_update(&s_flow, &s_pkthdr); + HighAvailabilityManager::process_update(&s_flow, &s_pkt); CHECK(s_transmit_message_called == true); } +TEST(high_availability_test, read_flow_key_error_v4) +{ + HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP4 }; + HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP4 / 2); + FlowKey key; + + CHECK(read_flow_key(msg, &hdr, key) == 0); + CHECK(ha_stats.truncated_msgs == 1); +} + +TEST(high_availability_test, read_flow_key_error_v6) +{ + HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP6 }; + HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP6 / 2); + FlowKey key; + + CHECK(read_flow_key(msg, &hdr, key) == 0); + CHECK(ha_stats.truncated_msgs == 1); +} + +TEST(high_availability_test, read_flow_key_error_unknown) +{ + HAMessageHeader hdr = { 0, 0, 0, 0x42 }; + HAMessage msg((uint8_t*) &s_test_key, sizeof(s_test_key)); + FlowKey key; + + CHECK(read_flow_key(msg, &hdr, key) == 0); + CHECK(ha_stats.unknown_key_type == 1); +} + +TEST(high_availability_test, consume_error_truncated_client_hdr) +{ + HAClientHeader chdr = { 0, 0 }; + HAMessage msg((uint8_t*) &chdr, sizeof(chdr) / 2); + FlowKey key; + + consume_ha_update_message(msg, key); + CHECK(ha_stats.update_msgs_consumed == 0); + CHECK(ha_stats.truncated_msgs == 1); +} + +TEST(high_availability_test, consume_error_invalid_client_idx) +{ + HAClientHeader chdr = { 0x42, 0 }; + HAMessage msg((uint8_t*) &chdr, sizeof(chdr)); + FlowKey key; + + consume_ha_update_message(msg, key); + CHECK(ha_stats.update_msgs_consumed == 0); + CHECK(ha_stats.unknown_client_idx == 1); +} + +TEST(high_availability_test, consume_error_truncated_client_msg) +{ + struct __attribute__((__packed__)) + { + HAClientHeader chdr = { 0, 0x42 }; + uint8_t cmsg[0x42 / 2] = { }; + } input; + HAMessage msg((uint8_t*) &input, sizeof(input)); + FlowKey key; + + consume_ha_update_message(msg, key); + CHECK(ha_stats.update_msgs_consumed == 0); + CHECK(ha_stats.truncated_msgs == 1); +} + +TEST(high_availability_test, consume_error_client_consume) +{ + struct __attribute__((__packed__)) + { + HAClientHeader chdr = { 0, 10 }; + uint8_t cmsg[0x42 / 2] = { }; + } input; + HAMessage msg((uint8_t*) &input, sizeof(input)); + FlowKey key; + + consume_ha_update_message(msg, key); + CHECK(ha_stats.update_msgs_consumed == 0); + CHECK(ha_stats.client_consume_errors == 1); +} + +TEST(high_availability_test, consume_error_truncated_msg_hdr) +{ + HAMessageHeader hdr = { }; + HAMessage msg((uint8_t*) &hdr, sizeof(hdr) / 2); + + CHECK(consume_ha_message(msg) == nullptr); + CHECK(ha_stats.truncated_msgs == 1); +} + +TEST(high_availability_test, consume_error_version_mismatch) +{ + HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION + 1, 0, 0 }; + HAMessage msg((uint8_t*) &hdr, sizeof(hdr)); + + CHECK(consume_ha_message(msg) == nullptr); + CHECK(ha_stats.msg_version_mismatch == 1); +} + +TEST(high_availability_test, consume_error_length_mismatch) +{ + HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION, 0x42, 0 }; + HAMessage msg((uint8_t*) &hdr, sizeof(hdr)); + + CHECK(consume_ha_message(msg) == nullptr); + CHECK(ha_stats.msg_length_mismatch == 1); +} + +TEST(high_availability_test, produce_error_client_hdr_overflow) +{ + uint8_t buffer[sizeof(HAClientHeader) / 2]; + HAMessage msg(buffer, sizeof(buffer)); + Flow flow; + + write_update_msg_client(s_ha_client, flow, msg); + CHECK(msg.cursor == msg.buffer); +} + +TEST(high_availability_test, produce_error_client_produce) +{ + uint8_t buffer[sizeof(HAClientHeader)]; + HAMessage msg(buffer, sizeof(buffer)); + Flow flow; + + write_update_msg_client(s_ha_client, flow, msg); + CHECK(msg.cursor == msg.buffer); +} + int main(int argc, char** argv) { return CommandLineTestRunner::RunAllTests(argc, argv); diff --git a/src/main/analyzer.cc b/src/main/analyzer.cc index 2271472d3..0df55e5ee 100644 --- a/src/main/analyzer.cc +++ b/src/main/analyzer.cc @@ -288,7 +288,7 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p) PacketTracer::dump(p); } - HighAvailabilityManager::process_update(p->flow, p->pkth); + HighAvailabilityManager::process_update(p->flow, p); p->pkth = nullptr; // no longer avail upon sig segv diff --git a/src/main/snort.cc b/src/main/snort.cc index 86d1c6d55..5d5c1ee97 100644 --- a/src/main/snort.cc +++ b/src/main/snort.cc @@ -195,7 +195,6 @@ void Snort::init(int argc, char** argv) #endif SideChannelManager::pre_config_init(); - HighAvailabilityManager::pre_config_init(); ModuleManager::init(); ScriptManager::load_scripts(snort_cmd_line_conf->script_paths); @@ -230,6 +229,8 @@ void Snort::init(int argc, char** argv) if ( !sc->output.empty() ) EventManager::instantiate(sc->output.c_str(), sc); + HighAvailabilityManager::configure(sc->ha_config); + if (SnortConfig::alert_before_pass()) sc->rule_order = "reset block drop alert pass log"; @@ -408,6 +409,7 @@ void Snort::term() } CleanupProtoNames(); + HighAvailabilityManager::term(); SideChannelManager::term(); ModuleManager::term(); PluginManager::release_plugins(); diff --git a/src/main/snort_config.cc b/src/main/snort_config.cc index dc93daf5f..059dc7fa7 100644 --- a/src/main/snort_config.cc +++ b/src/main/snort_config.cc @@ -35,6 +35,7 @@ #include "filters/rate_filter.h" #include "filters/sfrf.h" #include "filters/sfthreshold.h" +#include "flow/ha_module.h" #include "hash/xhash.h" #include "helpers/process.h" #include "ips_options/ips_flowbits.h" @@ -298,6 +299,7 @@ SnortConfig::~SnortConfig() delete[] state; delete thread_config; + delete ha_config; if (gtp_ports) delete gtp_ports; diff --git a/src/main/snort_config.h b/src/main/snort_config.h index 1463e8e0d..7cc5225f0 100644 --- a/src/main/snort_config.h +++ b/src/main/snort_config.h @@ -116,29 +116,29 @@ enum TunnelFlags TUNNEL_MPLS = 0x80 }; -struct ClassType; +class FastPatternConfig; +class RuleState; +class ThreadConfig; + struct srmm_table_t; struct sopg_table_t; - -struct MemoryConfig; -struct LatencyConfig; -struct PORT_RULE_MAP; -struct RuleListNode; -struct RulePortTables; -class RuleState; +struct ClassType; struct DetectionFilterConfig; struct EventQueueConfig; -struct IpsActionsConfig; -class FastPatternConfig; +struct FlowBitState; struct FrameworkConfig; -struct ThresholdConfig; +struct HighAvailabilityConfig; +struct IpsActionsConfig; +struct LatencyConfig; +struct MemoryConfig; +struct PORT_RULE_MAP; struct RateFilterConfig; -struct SFDAQConfig; -class ThreadConfig; struct ReferenceSystemNode; +struct RuleListNode; +struct RulePortTables; +struct SFDAQConfig; +struct ThresholdConfig; struct VarNode; -struct _IntelPmHandles; -struct FlowBitState; namespace snort { @@ -372,6 +372,7 @@ public: unsigned num_slots = 0; ThreadConfig* thread_config; + HighAvailabilityConfig* ha_config = nullptr; //------------------------------------------------------ //Reload inspector related diff --git a/src/network_inspectors/appid/test/appid_debug_test.cc b/src/network_inspectors/appid/test/appid_debug_test.cc index 893eff60f..3f7c51568 100644 --- a/src/network_inspectors/appid/test/appid_debug_test.cc +++ b/src/network_inspectors/appid/test/appid_debug_test.cc @@ -51,7 +51,7 @@ public: AppIdInspector() = default; }; -AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector) +AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&) : FlowData(0) { } AppIdSession::~AppIdSession() = default; diff --git a/src/network_inspectors/appid/test/appid_discovery_test.cc b/src/network_inspectors/appid/test/appid_discovery_test.cc index c5cea991a..8e843ab15 100644 --- a/src/network_inspectors/appid/test/appid_discovery_test.cc +++ b/src/network_inspectors/appid/test/appid_discovery_test.cc @@ -218,7 +218,7 @@ ServiceDiscovery& ServiceDiscovery::get_instance() s_discovery_manager = new ServiceDiscovery(); return *s_discovery_manager; } -AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port port , Protocol proto){ return 0; } +AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port, Protocol){ return 0; } // Stubs for ClientDiscovery ClientDiscovery::ClientDiscovery(){} ClientDiscovery::~ClientDiscovery() {} diff --git a/src/network_inspectors/appid/test/appid_mock_flow.h b/src/network_inspectors/appid/test/appid_mock_flow.h index 6004686f8..39dbe6f03 100644 --- a/src/network_inspectors/appid/test/appid_mock_flow.h +++ b/src/network_inspectors/appid/test/appid_mock_flow.h @@ -34,6 +34,7 @@ FlowData* mock_flow_data = nullptr; typedef int32_t AppId; Flow::Flow() = default; +Flow::~Flow() = default; class FakeFlow : public Flow { diff --git a/src/network_inspectors/appid/test/appid_session_api_test.cc b/src/network_inspectors/appid/test/appid_session_api_test.cc index ce1bd493a..4a613289e 100644 --- a/src/network_inspectors/appid/test/appid_session_api_test.cc +++ b/src/network_inspectors/appid/test/appid_session_api_test.cc @@ -262,7 +262,7 @@ TEST(appid_session_api, get_appid_session_attribute) CHECK_TRUE((fv & flags) == flags); mock_session->clear_session_flags(flags); fv = appid_session_api->get_appid_session_attribute(flags); - CHECK_TRUE((fv & flags) == 0) + CHECK_TRUE((fv & flags) == 0); } } diff --git a/src/network_inspectors/appid/test/service_state_test.cc b/src/network_inspectors/appid/test/service_state_test.cc index daaa8e5a6..b51afdc54 100644 --- a/src/network_inspectors/appid/test/service_state_test.cc +++ b/src/network_inspectors/appid/test/service_state_test.cc @@ -71,7 +71,7 @@ THREAD_LOCAL AppIdStats appid_stats; void AppIdDebug::activate(const Flow*, const AppIdSession*, bool) { active = true; } -AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector) +AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&) : FlowData(0) {} AppIdSession::~AppIdSession() = default; AppIdDiscovery::AppIdDiscovery() {} diff --git a/src/service_inspectors/sip/test/sip_splitter_test.cc b/src/service_inspectors/sip/test/sip_splitter_test.cc index 39a51ba04..d87e0aaf4 100644 --- a/src/service_inspectors/sip/test/sip_splitter_test.cc +++ b/src/service_inspectors/sip/test/sip_splitter_test.cc @@ -45,7 +45,7 @@ TEST_GROUP(sip_splitter_test) TEST(sip_splitter_test, callispaf) { bool result = ssut.splitter_is_paf(); - CHECK(result) + CHECK(result); } TEST(sip_splitter_test, reset_states_test) diff --git a/src/stream/base/stream_ha.cc b/src/stream/base/stream_ha.cc index 3eff59a44..ff7bc6b9b 100644 --- a/src/stream/base/stream_ha.cc +++ b/src/stream/base/stream_ha.cc @@ -60,25 +60,25 @@ static void protocol_deactivate_session(Flow* flow) protocol_ha->deactivate_session(flow); } -static Flow* protocol_create_session(FlowKey* key) +static Flow* protocol_create_session(const FlowKey* key) { ProtocolHA* protocol_ha = get_protocol_ha(key->pkt_type); return protocol_ha ? protocol_ha->create_session(key) : nullptr; } -static bool is_client_lower(Flow* flow) +static bool is_client_lower(const Flow& flow) { - if (flow->client_ip.fast_lt6(flow->server_ip)) + if (flow.client_ip.fast_lt6(flow.server_ip)) return true; - if (flow->server_ip.fast_lt6(flow->client_ip)) + if (flow.server_ip.fast_lt6(flow.client_ip)) return false; - switch (flow->key->pkt_type) + switch (flow.key->pkt_type) { case PktType::TCP: case PktType::UDP: - if (flow->client_port < flow->server_port) + if (flow.client_port < flow.server_port) return true; break; default: @@ -87,18 +87,14 @@ static bool is_client_lower(Flow* flow) return false; } -bool StreamHAClient::consume(Flow*& flow, FlowKey* key, HAMessage* msg) +bool StreamHAClient::consume(Flow*& flow, const FlowKey* key, HAMessage& msg, uint8_t size) { assert(key); - assert(msg); - // Is the message long enough to have our content? - if ( ((unsigned)(msg->content_length()) - (unsigned)(msg->cursor - msg->content())) < - sizeof(SessionHAContent) ) + if (size != sizeof(SessionHAContent)) return false; - SessionHAContent* hac = (SessionHAContent*)msg->cursor; - msg->cursor += sizeof(SessionHAContent); + SessionHAContent* hac = (SessionHAContent*) msg.cursor; // If flow is missing, we need to create a new one. if ( flow == nullptr ) @@ -137,33 +133,28 @@ bool StreamHAClient::consume(Flow*& flow, FlowKey* key, HAMessage* msg) flow->ha_state->add(FlowHAState::STANDBY); } + msg.advance_cursor(sizeof(SessionHAContent)); + return true; } -bool StreamHAClient::produce(Flow* flow, HAMessage* msg) +bool StreamHAClient::produce(Flow& flow, HAMessage& msg) { - assert(flow); - assert(msg); + if (!msg.fits(sizeof(SessionHAContent))) + return false; - // Check for buffer overflows - if ( (int)(msg->cursor - msg->content()) <= (int)(msg->content_length() - - sizeof(SessionHAContent)) ) - { - SessionHAContent* hac = (SessionHAContent*)msg->cursor; + SessionHAContent* hac = (SessionHAContent*) msg.cursor; - memcpy(&(hac->ssn_state),&(flow->ssn_state),sizeof(LwState)); - hac->flow_state = flow->flow_state; - hac->flags = 0; - msg->cursor += sizeof(SessionHAContent); + hac->ssn_state = flow.ssn_state; + hac->flow_state = flow.flow_state; + hac->flags = 0; + if (!is_client_lower(flow)) + hac->flags |= SessionHAContent::FLAG_LOW; + hac->flags |= SessionHAContent::FLAG_IP6; - if ( !is_client_lower(flow) ) - hac->flags |= SessionHAContent::FLAG_LOW; + msg.advance_cursor(sizeof(SessionHAContent)); - hac->flags |= SessionHAContent::FLAG_IP6; - return true; - } - else - return false; + return true; } static void update_flags(Flow* flow) @@ -241,13 +232,8 @@ bool StreamHAClient::is_update_required(Flow* flow) and that we're not overrunning the synchronization threshold. */ if ( flow->ha_state->sync_interval_elapsed() ) return true; - else - return flow->ha_state->check_any(FlowHAState::CRITICAL); -} -bool StreamHAClient::is_delete_required(Flow*) -{ - return true; + return flow->ha_state->check_any(FlowHAState::CRITICAL); } ProtocolHA::ProtocolHA(PktType protocol) @@ -277,7 +263,7 @@ ProtocolHA::~ProtocolHA() } } -void ProtocolHA::process_deletion(Flow* flow) +void ProtocolHA::process_deletion(Flow& flow) { HighAvailabilityManager::process_deletion(flow); } diff --git a/src/stream/base/stream_ha.h b/src/stream/base/stream_ha.h index eae05b294..3276a81c1 100644 --- a/src/stream/base/stream_ha.h +++ b/src/stream/base/stream_ha.h @@ -23,28 +23,23 @@ #include "flow/flow.h" #include "flow/ha.h" -//------------------------------------------------------------------------- - class __attribute__((__packed__)) SessionHAContent { public: snort::LwState ssn_state; snort::Flow::FlowState flow_state; uint8_t flags; - static const uint8_t FLAG_LOW = 0x01; // client address / port is low in key - static const uint8_t FLAG_IP6 = 0x02; // key addresses are ip6 + static constexpr uint8_t FLAG_LOW = 0x01; // client address / port is low in key + static constexpr uint8_t FLAG_IP6 = 0x02; // key addresses are ip6 }; -class StreamHAClient : public FlowHAClient +class StreamHAClient : public snort::FlowHAClient { public: StreamHAClient() : FlowHAClient(sizeof(SessionHAContent), true) { } - bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) override; - bool produce(snort::Flow*, HAMessage*) override; + bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) override; + bool produce(snort::Flow&, snort::HAMessage&) override; bool is_update_required(snort::Flow*) override; - bool is_delete_required(snort::Flow*) override; - -private: }; class ProtocolHA @@ -52,12 +47,9 @@ class ProtocolHA public: ProtocolHA(PktType); virtual ~ProtocolHA(); - virtual void delete_session(snort::Flow*) { } - virtual snort::Flow* create_session(snort::FlowKey*) { return nullptr; } + virtual snort::Flow* create_session(const snort::FlowKey*) { return nullptr; } virtual void deactivate_session(snort::Flow*) { } - virtual void process_deletion(snort::Flow*); - -private: + virtual void process_deletion(snort::Flow&); }; class StreamHAManager diff --git a/src/stream/icmp/icmp_ha.cc b/src/stream/icmp/icmp_ha.cc index 0b0e70622..509186223 100644 --- a/src/stream/icmp/icmp_ha.cc +++ b/src/stream/icmp/icmp_ha.cc @@ -28,7 +28,7 @@ using namespace snort; -Flow* IcmpHA::create_session(FlowKey* key) +Flow* IcmpHA::create_session(const FlowKey* key) { assert(key); Flow* flow = Stream::new_flow(key); @@ -45,7 +45,7 @@ Flow* IcmpHA::create_session(FlowKey* key) THREAD_LOCAL IcmpHA* IcmpHAManager::icmp_ha = nullptr; -void IcmpHAManager::process_deletion(Flow* flow) +void IcmpHAManager::process_deletion(Flow& flow) { if( icmp_ha != nullptr ) icmp_ha->process_deletion(flow); diff --git a/src/stream/icmp/icmp_ha.h b/src/stream/icmp/icmp_ha.h index 1437e3a42..e3df35337 100644 --- a/src/stream/icmp/icmp_ha.h +++ b/src/stream/icmp/icmp_ha.h @@ -33,7 +33,7 @@ class IcmpHA : public ProtocolHA { public: IcmpHA() : ProtocolHA(PktType::ICMP) { } - snort::Flow* create_session(snort::FlowKey*) override; + snort::Flow* create_session(const snort::FlowKey*) override; private: }; @@ -41,7 +41,7 @@ private: class IcmpHAManager { public: - static void process_deletion(snort::Flow* flow); + static void process_deletion(snort::Flow& flow); static void tinit(); static void tterm(); static THREAD_LOCAL IcmpHA* icmp_ha; diff --git a/src/stream/icmp/icmp_session.cc b/src/stream/icmp/icmp_session.cc index 230bc3457..7bc5b6a77 100644 --- a/src/stream/icmp/icmp_session.cc +++ b/src/stream/icmp/icmp_session.cc @@ -196,7 +196,7 @@ bool IcmpSession::setup(Packet*) void IcmpSession::clear() { IcmpSessionCleanup(flow); - IcmpHAManager::process_deletion(flow); + IcmpHAManager::process_deletion(*flow); } int IcmpSession::process(Packet* p) diff --git a/src/stream/ip/ip_ha.cc b/src/stream/ip/ip_ha.cc index 153af32cf..a9bb32055 100644 --- a/src/stream/ip/ip_ha.cc +++ b/src/stream/ip/ip_ha.cc @@ -29,7 +29,7 @@ using namespace snort; -Flow* IpHA::create_session(FlowKey* key) +Flow* IpHA::create_session(const FlowKey* key) { assert(key); @@ -47,7 +47,7 @@ Flow* IpHA::create_session(FlowKey* key) THREAD_LOCAL IpHA* IpHAManager::ip_ha = nullptr; -void IpHAManager::process_deletion(Flow* flow) +void IpHAManager::process_deletion(Flow& flow) { if( ip_ha != nullptr ) ip_ha->process_deletion(flow); diff --git a/src/stream/ip/ip_ha.h b/src/stream/ip/ip_ha.h index e3333e1f8..9dd0a13aa 100644 --- a/src/stream/ip/ip_ha.h +++ b/src/stream/ip/ip_ha.h @@ -33,7 +33,7 @@ class IpHA : public ProtocolHA { public: IpHA() : ProtocolHA(PktType::IP) { } - snort::Flow* create_session(snort::FlowKey*) override; + snort::Flow* create_session(const snort::FlowKey*) override; private: }; @@ -41,7 +41,7 @@ private: class IpHAManager { public: - static void process_deletion(snort::Flow* flow); + static void process_deletion(snort::Flow& flow); static void tinit(); static void tterm(); static THREAD_LOCAL IpHA* ip_ha; diff --git a/src/stream/ip/ip_session.cc b/src/stream/ip/ip_session.cc index dc5092885..2a44eb5da 100644 --- a/src/stream/ip/ip_session.cc +++ b/src/stream/ip/ip_session.cc @@ -136,7 +136,7 @@ void IpSession::clear() } IpSessionCleanup(flow, &tracker); - IpHAManager::process_deletion(flow); + IpHAManager::process_deletion(*flow); } bool IpSession::setup(Packet* p) @@ -173,7 +173,7 @@ int IpSession::process(Packet* p) if ( Stream::expected_flow(flow, p) ) return 0; #endif - IpHAManager::process_deletion(flow); + IpHAManager::process_deletion(*flow); } if ( Stream::blocked_flow(p) || Stream::ignored_flow(flow, p) ) diff --git a/src/stream/libtcp/stream_tcp_unit_test.cc b/src/stream/libtcp/stream_tcp_unit_test.cc index c472dd6bb..9772dd815 100644 --- a/src/stream/libtcp/stream_tcp_unit_test.cc +++ b/src/stream/libtcp/stream_tcp_unit_test.cc @@ -99,7 +99,6 @@ static Packet* init_packet(Flow* flow, uint32_t talker) void release_packet(Packet* p) { - delete p->flow->session; delete p->context; delete p; } diff --git a/src/stream/libtcp/tcp_stream_session.cc b/src/stream/libtcp/tcp_stream_session.cc index 1668a85fa..b3e2fa1ae 100644 --- a/src/stream/libtcp/tcp_stream_session.cc +++ b/src/stream/libtcp/tcp_stream_session.cc @@ -437,7 +437,7 @@ void TcpStreamSession::clear() // this does NOT flush data clear_session( true, false, false ); - TcpHAManager::process_deletion(flow); + TcpHAManager::process_deletion(*flow); } void TcpStreamSession::set_splitter(bool to_server, StreamSplitter* ss) diff --git a/src/stream/stream.cc b/src/stream/stream.cc index 5f5b47293..3ca0d7234 100644 --- a/src/stream/stream.cc +++ b/src/stream/stream.cc @@ -73,11 +73,6 @@ Flow* Stream::get_flow(const FlowKey* key) Flow* Stream::new_flow(const FlowKey* key) { return flow_con->new_flow(key); } -Flow* Stream::new_flow(FlowKey* key) -{ - return flow_con ? flow_con->new_flow(key) : nullptr; -} - void Stream::delete_flow(const FlowKey* key) { flow_con->delete_flow(key); } diff --git a/src/stream/stream.h b/src/stream/stream.h index 36d0db399..be66d1794 100644 --- a/src/stream/stream.h +++ b/src/stream/stream.h @@ -75,7 +75,6 @@ public: static void timeout_flows(time_t cur_time); static void prune_flows(); static bool expected_flow(Flow*, Packet*); - static Flow* new_flow(FlowKey*); // Looks in the flow cache for flow session with specified key and returns // pointer to flow session object if found, otherwise null. diff --git a/src/stream/tcp/tcp_ha.cc b/src/stream/tcp/tcp_ha.cc index 5932ae885..9b37fee63 100644 --- a/src/stream/tcp/tcp_ha.cc +++ b/src/stream/tcp/tcp_ha.cc @@ -29,7 +29,7 @@ using namespace snort; -Flow* TcpHA::create_session(FlowKey* key) +Flow* TcpHA::create_session(const FlowKey* key) { assert(key); @@ -60,7 +60,7 @@ void TcpHA::deactivate_session(Flow* flow) THREAD_LOCAL TcpHA* TcpHAManager::tcp_ha = nullptr; -void TcpHAManager::process_deletion(Flow* flow) +void TcpHAManager::process_deletion(Flow& flow) { if( tcp_ha != nullptr ) tcp_ha->process_deletion(flow); diff --git a/src/stream/tcp/tcp_ha.h b/src/stream/tcp/tcp_ha.h index 23d888b1e..1ed0204e7 100644 --- a/src/stream/tcp/tcp_ha.h +++ b/src/stream/tcp/tcp_ha.h @@ -33,7 +33,7 @@ class TcpHA : public ProtocolHA { public: TcpHA() : ProtocolHA(PktType::TCP) { } - snort::Flow* create_session(snort::FlowKey*) override; + snort::Flow* create_session(const snort::FlowKey*) override; void deactivate_session(snort::Flow*) override; private: @@ -42,7 +42,7 @@ private: class TcpHAManager { public: - static void process_deletion(snort::Flow* flow); + static void process_deletion(snort::Flow& flow); static void tinit(); static void tterm(); static THREAD_LOCAL TcpHA* tcp_ha; diff --git a/src/stream/tcp/tcp_session.cc b/src/stream/tcp/tcp_session.cc index ede24a943..823b18479 100644 --- a/src/stream/tcp/tcp_session.cc +++ b/src/stream/tcp/tcp_session.cc @@ -902,7 +902,7 @@ void TcpSession::do_packet_analysis_post_checks(Packet* p) flow->set_expire(p, config->session_timeout); } else - TcpHAManager::process_deletion(p->flow); + TcpHAManager::process_deletion(*p->flow); if (pkt_action_mask & ACTION_DISABLE_INSPECTION) { @@ -954,7 +954,7 @@ void TcpSession::cleanup_session_if_expired(Packet* p) clear_session(true, true, false, p); tcpStats.timeouts++; - TcpHAManager::process_deletion(flow); + TcpHAManager::process_deletion(*flow); } } diff --git a/src/stream/udp/udp_ha.cc b/src/stream/udp/udp_ha.cc index 5f76ae795..fdef875de 100644 --- a/src/stream/udp/udp_ha.cc +++ b/src/stream/udp/udp_ha.cc @@ -29,7 +29,7 @@ using namespace snort; -Flow* UdpHA::create_session(FlowKey* key) +Flow* UdpHA::create_session(const FlowKey* key) { assert(key); @@ -46,7 +46,7 @@ Flow* UdpHA::create_session(FlowKey* key) THREAD_LOCAL UdpHA* UdpHAManager::udp_ha = nullptr; -void UdpHAManager::process_deletion(Flow* flow) +void UdpHAManager::process_deletion(Flow& flow) { if( udp_ha != nullptr ) udp_ha->process_deletion(flow); diff --git a/src/stream/udp/udp_ha.h b/src/stream/udp/udp_ha.h index 1c7848bc3..55ca9e8cc 100644 --- a/src/stream/udp/udp_ha.h +++ b/src/stream/udp/udp_ha.h @@ -33,7 +33,7 @@ class UdpHA : public ProtocolHA { public: UdpHA() : ProtocolHA(PktType::UDP) { } - snort::Flow* create_session(snort::FlowKey*) override; + snort::Flow* create_session(const snort::FlowKey*) override; private: }; @@ -41,7 +41,7 @@ private: class UdpHAManager { public: - static void process_deletion(snort::Flow* flow); + static void process_deletion(snort::Flow& flow); static void tinit(); static void tterm(); static THREAD_LOCAL UdpHA* udp_ha; diff --git a/src/stream/udp/udp_session.cc b/src/stream/udp/udp_session.cc index d5e79c4c2..daa35ff64 100644 --- a/src/stream/udp/udp_session.cc +++ b/src/stream/udp/udp_session.cc @@ -137,7 +137,7 @@ bool UdpSession::setup(Packet* p) void UdpSession::clear() { UdpSessionCleanup(flow); - UdpHAManager::process_deletion(flow); + UdpHAManager::process_deletion(*flow); flow->clear(); } @@ -187,7 +187,7 @@ int UdpSession::process(Packet* p) flow->restart(); flow->ssn_state.session_flags |= SSNFLAG_SEEN_SENDER; udpStats.created++; - UdpHAManager::process_deletion(flow); + UdpHAManager::process_deletion(*flow); } ProcessUdp(flow, p, pc, nullptr);