]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Merge pull request #1619 in SNORT/snort3 from ~MIALTIZE/snort3:ha_daq to master
authorMichael Altizer (mialtize) <mialtize@cisco.com>
Wed, 12 Jun 2019 03:31:44 +0000 (23:31 -0400)
committerMichael Altizer (mialtize) <mialtize@cisco.com>
Wed, 12 Jun 2019 03:31:44 +0000 (23:31 -0400)
Squashed commit of the following:

commit 5aacc37644226329a02dc2637093c457614b351d
Author: Michael Altizer <mialtize@cisco.com>
Date:   Mon Jun 10 17:43:32 2019 -0400

    flow: Implement storing and importing HA data via DAQ IOCTLs

    This involved significant refactoring of the Flow HA code and added many
    peg counts to the module.  Export FlowHAClient, HighAvailabilityManager,
    and FlowHAState in flow/ha.h.  Specify that HA time parameters are in
    seconds.  The useless HA module unit tests were removed in the process.

commit 9fec6bc1993d35969c9aca4198ec0865ef7597e5
Author: Michael Altizer <mialtize@cisco.com>
Date:   Fri Jun 7 14:32:18 2019 -0400

    check: Fix missing semicolons on CHECK calls

commit fb6e8988fd3790f54c790110150b965a3abb456b
Author: Michael Altizer <mialtize@cisco.com>
Date:   Tue May 28 12:30:33 2019 -0400

    build: Fix unused parameter warnings in unit tests

41 files changed:
src/flow/CMakeLists.txt
src/flow/dev_notes.txt
src/flow/flow.cc
src/flow/flow.h
src/flow/flow_control.cc
src/flow/ha.cc
src/flow/ha.h
src/flow/ha_module.cc
src/flow/ha_module.h
src/flow/test/CMakeLists.txt
src/flow/test/flow_stash_test.cc
src/flow/test/ha_module_test.cc [deleted file]
src/flow/test/ha_test.cc
src/main/analyzer.cc
src/main/snort.cc
src/main/snort_config.cc
src/main/snort_config.h
src/network_inspectors/appid/test/appid_debug_test.cc
src/network_inspectors/appid/test/appid_discovery_test.cc
src/network_inspectors/appid/test/appid_mock_flow.h
src/network_inspectors/appid/test/appid_session_api_test.cc
src/network_inspectors/appid/test/service_state_test.cc
src/service_inspectors/sip/test/sip_splitter_test.cc
src/stream/base/stream_ha.cc
src/stream/base/stream_ha.h
src/stream/icmp/icmp_ha.cc
src/stream/icmp/icmp_ha.h
src/stream/icmp/icmp_session.cc
src/stream/ip/ip_ha.cc
src/stream/ip/ip_ha.h
src/stream/ip/ip_session.cc
src/stream/libtcp/stream_tcp_unit_test.cc
src/stream/libtcp/tcp_stream_session.cc
src/stream/stream.cc
src/stream/stream.h
src/stream/tcp/tcp_ha.cc
src/stream/tcp/tcp_ha.h
src/stream/tcp/tcp_session.cc
src/stream/udp/udp_ha.cc
src/stream/udp/udp_ha.h
src/stream/udp/udp_session.cc

index f567ae5800668abde9c16f48b6b38cfe27c2ecb9..f84e141317e436cee60e99b0f301a4c0c5b93879 100644 (file)
@@ -3,6 +3,7 @@ set (FLOW_INCLUDES
     flow.h
     flow_key.h
     flow_stash.h
+    ha.h
     stash_item.h
 )
 
@@ -19,7 +20,6 @@ add_library (flow OBJECT
     flow_stash.h
     flow_stash.cc
     ha.cc
-    ha.h
     ha_module.cc
     ha_module.h
     prune_stats.h
index 2d110be73f6c3b7f45de3f1b07dd8893cc4257ff..24bdde08498ec75718cf31316d70ffd79f81bfbf 100644 (file)
@@ -20,27 +20,39 @@ can be freed (via garbage collection) after a reload.
 There are many flags that may be set on a flow to indicate session tracking
 state, disposition, etc.
 
+=== High Availability
+
 HighAvailability (ha.cc, ha.h) serves to synchronize session state between high
-availabity partners.  HighAvailability uses Side Channel Connectors to transmit
-and receive messages.  The HA side channel must be full duplex (both a
-transmit and receive connector).  The primary purpose is to exchange session
-state with the goal of keeping the session caches in sync.  But other clients
-may also subscribe to the HA service to exchange additional session data.  Each
-client uses the FlowHAClient class to subscribe to the service.
+availabity partners.  The primary purpose is to exchange session state with the
+goal of keeping the session caches in sync.  Other clients may also register
+with the HA service to exchange additional session data by implementing one or
+more FlowHAClient classes.
+
+HA uses Side Channel Connectors and/or DAQ module IOCTLs to transmit and receive
+HA state messages.  A full duplex Side Channel is required for Side Channel
+communications.  A DAQ module that supports the IOCTLs for setting and getting
+HA state is required for a functional DAQ-backed setup.  Modules that do not
+support these IOCTLs will silently fail and present no HA state on future
+packets in the flow.  HA state will be queried from the DAQ module prior to new
+flow creation when the appropriate DAQ packet message flag has been set on the
+intiating packet.
 
 The HA subsystem exchanges two high level message types:
   - DELETE: Indicate to the partner that a session has neen removed.  No
-additional HA client status will be exchanged.
+additional HA client status will be exchanged.  (Not used for DAQ-backed
+storage.)
   - UPDATE: Indicate all other state changes.  The message always includes
 the session state and optionally may include state from other HA clients.
+By default, the update messages are incremental.  In the case of DAQ-backed
+storage, the update messages are always fully formed.
 
 The HA subsystem implements these classes:
   - HighAvailabilityManager - A collection of static elements providing the
     top-most interface to HA capabilities.
-  - HAMessage - A wrapper around the actual message (SideChannel only for now)
-    and includes a cursor for producer/consumer activity.  Passed around
+  - HAMessage - A wrapper around the actual message and includes a cursor and
+    convenience functions for producer/consumer activity.  Passed around
     among all message handing classes/methods.
-  - HighAvailability - If HA is enabled instantiated in each packet thread and
+  - HighAvailability - If HA is enabled, instantiated in each packet thread and
     provides all primary HA functionality for the thread.  Referenced via a
     THREAD_LOCAL object pointer.
   - FlowHAState - One per flow and referenced via a pointer in the Flow object.
index a8adde24d093d6216e7a053b52d2139a1e0ad078..db315555e0d802ea861077967fd35b6caeedc4f1 100644 (file)
@@ -79,6 +79,10 @@ Flow::Flow()
     memset(this, 0, sizeof(*this));
 }
 
+Flow::~Flow()
+{
+    term();
+}
 
 void Flow::init(PktType type)
 {
index 48ca0df581a754ad05b2495d8747789981b6bf0d..9e827f9e9bdc0a2e88656fc289039e350a9eabe1 100644 (file)
 #define STREAM_STATE_BLOCK_PENDING     0x4000
 
 class BitOp;
-class FlowHAState;
 class Session;
 
 namespace snort
 {
+class FlowHAState;
 struct FlowKey;
 class IpsContext;
 struct Packet;
@@ -164,6 +164,7 @@ public:
         ALLOW
     };
     Flow();
+    ~Flow();
 
     Flow(const Flow&) = delete;
     Flow& operator=(const Flow&) = delete;
index e6bba57cf4e36299224773a394849cda5360ef6a..42ecce4813519fd82c9fac3d2531eb4018d263a7 100644 (file)
@@ -36,6 +36,7 @@
 
 #include "expect_cache.h"
 #include "flow_cache.h"
+#include "ha.h"
 #include "session.h"
 
 using namespace snort;
@@ -362,6 +363,8 @@ bool FlowControl::process(PktType type, Packet* p)
     FlowKey key;
     set_key(&key, p);
     Flow* flow = con.cache->find(&key);
+    if ( !flow )
+        flow = HighAvailabilityManager::import(*p, key);
 
     if ( !flow )
     {
index 2f9b23c13a38d9192d314a4375f09f4f77579c95..6e25cf730dac401a47fde94eacac8d6526f702d4 100644 (file)
@@ -15,7 +15,7 @@
 // with this program; if not, write to the Free Software Foundation, Inc.,
 // 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 //--------------------------------------------------------------------------
-// ha.cc author Ed Borgoyn <eborgoyn@cisco.com>
+// ha.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com>
 
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 
 #include "ha.h"
 
-#include <array>
-
 #include "framework/counts.h"
 #include "log/messages.h"
-#include "profiler/profiler_defs.h"
+#include "packet_io/sfdaq_instance.h"
+#include "protocols/packet.h"
+#include "side_channel/side_channel.h"
 #include "stream/stream.h"
 #include "time/packet_time.h"
 
 #include "flow.h"
 #include "flow_key.h"
+#include "ha_module.h"
 
 using namespace snort;
 
-static const uint8_t HA_MESSAGE_VERSION = 3;
+enum HAEvent
+{
+    HA_DELETE_EVENT = 1,
+    HA_UPDATE_EVENT = 2
+};
 
-// define message size and content constants.
-static const uint8_t KEY_SIZE_IP6 = sizeof(FlowKey);
-// ip4 key is smaller by 2*(ip6-addr-size - ip4-addr-size) or 2*(16 - 4) = 24
-static const uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24;
+struct __attribute__((__packed__)) HAMessageHeader
+{
+    uint8_t event;
+    uint8_t version;
+    uint16_t total_length;
+    uint8_t key_type;
+};
+
+struct __attribute__((__packed__)) HAClientHeader
+{
+    uint8_t client;
+    uint8_t length;
+};
+
+// One client for each mask bit plus one 'automatic' session client
+//   client handle = (1<<(client_index-1)
+//   session client has handle of 0 and index of 0
+static constexpr uint8_t MAX_CLIENTS = 17;
 
-static const suseconds_t USEC_PER_SEC = 1000000;
+// HighAvailability is the thread-local state/configuration instantiated for each packet thread.
+typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap;
+class HighAvailability
+{
+public:
+    HighAvailability(PortBitSet*,bool);
+    ~HighAvailability();
+
+    void process_update(Flow*, Packet*);
+    void process_deletion(Flow&);
+    void process_receive();
+
+    Flow* process_daq_import(Packet&, FlowKey&);
+
+    // The [0] entry contains the stream client (always present)
+    // Entries [1] to [MAX_CLIENTS-1] contain the optional clients
+    ClientMap client_map = { };
+    uint8_t handle_counter = 1; // stream client (index == 0) always exists
+    bool shutting_down = false;
+
+private:
+    SideChannel* sc = nullptr;
+    bool use_daq_channel;
+};
+
+static constexpr uint8_t HA_MESSAGE_VERSION = 3;
+
+// define message size and content constants.
+static constexpr uint8_t KEY_SIZE_IP6 = sizeof(FlowKey);
+// ip4 key is smaller by 2*(ip6_addr_size - ip4_addr_size) or 2 * (16 - 4) = 24
+static constexpr uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24;
 
 enum
 {
@@ -51,22 +100,16 @@ enum
     KEY_TYPE_IP4 = 2
 };
 
-typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap;
-
-THREAD_LOCAL SimpleStats ha_stats;
-THREAD_LOCAL ProfileStats ha_perf_stats;
+static constexpr FlowHAClientHandle SESSION_HA_CLIENT = 0x0000;
+static constexpr uint8_t SESSION_HA_CLIENT_INDEX = 0;
 
-static THREAD_LOCAL HighAvailability* ha;
 PortBitSet* HighAvailabilityManager::ports = nullptr;
 bool HighAvailabilityManager::use_daq_channel = false;
-THREAD_LOCAL bool HighAvailabilityManager::shutting_down = false;
+
 struct timeval FlowHAState::min_session_lifetime;
 struct timeval FlowHAState::min_sync_interval;
-uint8_t s_handle_counter = 1; // stream client (index == 0) always exists
 
-// The [0] entry contains the stream client (always present)
-// Entries [1] to [MAX_CLIENTS-1] contain the optional clients
-static THREAD_LOCAL ClientMap* s_client_map;
+static THREAD_LOCAL HighAvailability* ha;
 
 static inline bool is_ip6_key(const FlowKey* key)
 {
@@ -82,28 +125,22 @@ FlowHAState::FlowHAState()
 
     // Set the initial update time to now+min_session_lifetime
     packet_gettimeofday(&next_update);
-    next_update.tv_usec += min_session_lifetime.tv_usec;
-    if (next_update.tv_usec > USEC_PER_SEC)
-    {
-        next_update.tv_usec -= USEC_PER_SEC;
-        next_update.tv_sec++;
-    }
-    next_update.tv_sec += min_session_lifetime.tv_sec;
+    timeradd(&next_update, &min_session_lifetime, &next_update);
 }
 
 void FlowHAState::set_pending(FlowHAClientHandle handle)
 {
-    pending |= (uint16_t)handle;
+    pending |= handle;
 }
 
 bool FlowHAState::check_pending(FlowHAClientHandle handle)
 {
-    return ((pending & (uint16_t)handle) != 0);
+    return ((pending & handle) != 0);
 }
 
 void FlowHAState::clear_pending(FlowHAClientHandle handle)
 {
-    pending &= ~((uint16_t)handle);
+    pending &= ~handle;
 }
 
 void FlowHAState::set(uint8_t new_state)
@@ -145,13 +182,7 @@ bool FlowHAState::sync_interval_elapsed()
 
 void FlowHAState::set_next_update()
 {
-    next_update.tv_usec += min_sync_interval.tv_usec;
-    if (next_update.tv_usec > USEC_PER_SEC)
-    {
-        next_update.tv_usec -= USEC_PER_SEC;
-        next_update.tv_sec++;
-    }
-    next_update.tv_sec += min_sync_interval.tv_sec;
+    timeradd(&next_update, &min_sync_interval, &next_update);
 }
 
 void FlowHAState::reset()
@@ -162,144 +193,128 @@ void FlowHAState::reset()
 
 FlowHAClient::FlowHAClient(uint8_t length, bool session_client)
 {
-    if ( !s_client_map )
+    if (!ha)
         return;
 
-    header.length = length;
+    max_length = length;
 
-    if ( session_client )
+    if (session_client)
     {
+        index = SESSION_HA_CLIENT_INDEX;
         handle = SESSION_HA_CLIENT;
-        header.client = SESSION_HA_CLIENT_INDEX;
-        (*s_client_map)[0] = this;
+        ha->client_map[0] = this;
     }
     else
     {
-        if ( s_handle_counter >= MAX_CLIENTS )
+        if (ha->handle_counter >= MAX_CLIENTS)
         {
             ErrorMessage("Attempting to register too many FlowHAClients\n");
             return;
         }
 
-        header.client = s_handle_counter;
-        handle = (1 << (s_handle_counter-1));
-        (*s_client_map)[s_handle_counter] = this;
-        s_handle_counter += 1;
-    }
-}
-
-bool FlowHAClient::fit(HAMessage* msg, uint8_t size)
-{
-    return ( (int)(msg->cursor - msg->content()) < (int)(msg->content_length() - size) );
-}
-
-bool FlowHAClient::place(HAMessage* msg, uint8_t* data, uint8_t length)
-{
-    if ( fit(msg, length) )
-    {
-        memcpy(msg->cursor,data,(size_t)length);
-        msg->cursor += length;
-        return true;
+        index = ha->handle_counter;
+        handle = (1 << (index - 1));
+        ha->client_map[index] = this;
+        ha->handle_counter++;
     }
-    else
-        return false;
 }
 
 // Write the key type, key length, and key into the message.
-// Does not use the message cursor coming in.
-// Leave the message cursor just after the key. Return
-// the key length.
-static uint8_t write_flow_key(Flow* flow, HAMessage* msg)
-{
-    HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
-    msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader);
-    const FlowKey* key = flow->key;
+// Return the type of key written so it can be stored in the message header.
+static uint8_t write_flow_key(Flow& flow, HAMessage& msg)
+{
+    const FlowKey* key = flow.key;
     assert(key);
 
-    if ( is_ip6_key(flow->key) )
+    if (is_ip6_key(flow.key))
     {
-        hdr->key_type = KEY_TYPE_IP6;
-        memcpy(msg->cursor, key, KEY_SIZE_IP6);
-        msg->cursor += KEY_SIZE_IP6;
+        memcpy(msg.cursor, key, KEY_SIZE_IP6);
+        msg.advance_cursor(KEY_SIZE_IP6);
 
-        return KEY_SIZE_IP6;
+        return KEY_TYPE_IP6;
     }
 
-    hdr->key_type = KEY_TYPE_IP4;
-    memcpy(msg->cursor, &key->ip_l[3], sizeof(key->ip_l[3]));
-    msg->cursor += sizeof(key->ip_l[3]);
-    memcpy(msg->cursor, &key->ip_h[3], sizeof(key->ip_h[3]));
-    msg->cursor += sizeof(key->ip_h[3]);
-    memcpy(msg->cursor, ((const uint8_t*)key) + 32, KEY_SIZE_IP4 - 8);
-    msg->cursor += KEY_SIZE_IP4 - 8;
+    memcpy(msg.cursor, &key->ip_l[3], sizeof(key->ip_l[3]));
+    msg.advance_cursor(sizeof(key->ip_l[3]));
+    memcpy(msg.cursor, &key->ip_h[3], sizeof(key->ip_h[3]));
+    msg.advance_cursor(sizeof(key->ip_h[3]));
+    memcpy(msg.cursor, ((const uint8_t*) key) + 32, KEY_SIZE_IP4 - 8);
+    msg.advance_cursor(KEY_SIZE_IP4 - 8);
 
-    return KEY_SIZE_IP4;
+    return KEY_TYPE_IP4;
 }
 
-// Regardless of the message cursor, extract the key and
-// return the key length.  Position the cursor just after the key.
-static uint8_t read_flow_key(FlowKey* key, HAMessage* msg)
+// Extract the key and return the key length.  Position the cursor just after the key.
+static uint8_t read_flow_key(HAMessage& msg, const HAMessageHeader* hdr, FlowKey& key)
 {
-    assert(key);
-    HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
-    msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader);
-
-    if ( hdr->key_type == KEY_TYPE_IP6 )
+    if (hdr->key_type == KEY_TYPE_IP6)
     {
-        memcpy(key, msg->cursor, KEY_SIZE_IP6);
-        msg->cursor += KEY_SIZE_IP6;
+        if (!msg.fits(KEY_SIZE_IP6))
+        {
+            ha_stats.truncated_msgs++;
+            return 0;
+        }
+
+        memcpy(&key, msg.cursor, KEY_SIZE_IP6);
+        msg.advance_cursor(KEY_SIZE_IP6);
 
         return KEY_SIZE_IP6;
     }
-    else if ( hdr->key_type == KEY_TYPE_IP4 )
+    else if (hdr->key_type == KEY_TYPE_IP4)
     {
+        if (!msg.fits(KEY_SIZE_IP4))
+        {
+            ha_stats.truncated_msgs++;
+            return 0;
+        }
+
         /* Lower IPv4 address */
-        memcpy(&key->ip_l[3], msg->cursor, sizeof(key->ip_l[3]));
-        key->ip_l[0] = key->ip_l[1] = 0;
-        key->ip_l[2] = htonl(0xFFFF);
-        msg->cursor += sizeof(key->ip_l[3]);
+        memcpy(&key.ip_l[3], msg.cursor, sizeof(key.ip_l[3]));
+        key.ip_l[0] = key.ip_l[1] = 0;
+        key.ip_l[2] = htonl(0xFFFF);
+        msg.advance_cursor(sizeof(key.ip_l[3]));
         /* Higher IPv4 address */
-        memcpy(&key->ip_h[3], msg->cursor, sizeof(key->ip_h[3]));
-        key->ip_h[0] = key->ip_h[1] = 0;
-        key->ip_h[2] = htonl(0xFFFF);
-        msg->cursor += sizeof(key->ip_h[3]);
+        memcpy(&key.ip_h[3], msg.cursor, sizeof(key.ip_h[3]));
+        key.ip_h[0] = key.ip_h[1] = 0;
+        key.ip_h[2] = htonl(0xFFFF);
+        msg.advance_cursor(sizeof(key.ip_h[3]));
         /* The remainder of the key */
-        memcpy(((uint8_t*)key) + 32, msg->cursor, KEY_SIZE_IP4 - 8);
-        msg->cursor += KEY_SIZE_IP4 - 8;
+        memcpy(((uint8_t*) &key) + 32, msg.cursor, KEY_SIZE_IP4 - 8);
+        msg.advance_cursor(KEY_SIZE_IP4 - 8);
 
         return KEY_SIZE_IP4;
     }
-    else
-        return 0;
+
+    ha_stats.unknown_key_type++;
+    return 0;
 }
 
-static inline uint8_t key_size(Flow* flow)
+static inline uint8_t key_size(Flow& flow)
 {
-    assert(flow->key);
-    return is_ip6_key(flow->key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4;
+    assert(flow.key);
+    return is_ip6_key(flow.key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4;
 }
 
-static uint16_t calculate_msg_header_length(Flow* flow)
+static uint16_t calculate_msg_header_length(Flow& flow)
 {
     return sizeof(HAMessageHeader) + key_size(flow);
 }
 
 // Calculate the UPDATE message content length based on the
 // set of active clients.  The Session client is always present.
-static uint16_t calculate_update_msg_content_length(Flow* flow)
+static uint16_t calculate_update_msg_content_length(Flow& flow, bool full)
 {
-    assert(s_client_map);
-    assert((*s_client_map)[0]);
+    assert(ha->client_map[0]);
 
     uint16_t length = 0;
 
-    for (int i=0; i<s_handle_counter; i++)
+    for (int i = 0; i < ha->handle_counter; i++)
     {
         // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
-        if ( (i == 0 ) || flow->ha_state->check_pending(1<<(i-1)) )
+        if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
         {
-            assert((*s_client_map)[i]);
-            length += ((*s_client_map)[i]->get_message_size() + sizeof(HAClientHeader));
+            assert(ha->client_map[i]);
+            length += (ha->client_map[i]->get_message_size() + sizeof(HAClientHeader));
         }
     }
 
@@ -308,77 +323,93 @@ static uint16_t calculate_update_msg_content_length(Flow* flow)
 
 // Write the HA header and key sections.  Position the cursor
 // at the beginning of the content section.
-static void write_msg_header(Flow* flow, HAEvent event, uint16_t content_length, HAMessage* msg)
+static void write_msg_header(Flow& flow, HAEvent event, uint16_t content_length, HAMessage& msg)
 {
-    HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
-    hdr->event = (uint8_t)event;
+    HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
+    hdr->event = (uint8_t) event;
     hdr->version = HA_MESSAGE_VERSION;
     hdr->total_length = content_length;
-    write_flow_key(flow, msg);  // set cursor to just beyond key
+    msg.advance_cursor(sizeof(HAMessageHeader));
+    hdr->key_type = write_flow_key(flow, msg);
 }
 
-static void write_update_msg_client( FlowHAClient* client, Flow* flow, HAMessage* msg)
+static uint16_t update_msg_header_length(HAMessage& msg)
+{
+    HAMessageHeader* hdr = (HAMessageHeader*) msg.buffer;
+    hdr->total_length = msg.cursor_position();
+    return hdr->total_length;
+}
+
+static void write_update_msg_client(FlowHAClient* client, Flow& flow, HAMessage& msg)
 {
     assert(client);
-    assert(msg);
 
-    client->place(msg,(uint8_t*)&(client->header),(uint8_t)sizeof(client->header));
-    client->produce(flow, msg);
+    if (!msg.fits(sizeof(HAClientHeader)))
+        return;
+
+    // Preemptively insert the client header.  If production fails, roll back the message cursor
+    // to its original position.
+    uint8_t* original_cursor = msg.cursor;
+    HAClientHeader* header = (HAClientHeader*) original_cursor;
+    header->client = client->index;
+    msg.advance_cursor(sizeof(HAClientHeader));
+    if (!client->produce(flow, msg))
+    {
+        msg.reset_cursor(original_cursor);
+        return;
+    }
+    assert(msg.cursor >= (original_cursor + sizeof(HAClientHeader)));
+    header->length = (uint32_t) (msg.cursor - original_cursor - sizeof(HAClientHeader));
 }
 
-static void write_update_msg_content(Flow* flow, HAMessage* msg)
+static void write_update_msg_content(Flow& flow, HAMessage& msg, bool full)
 {
-    assert(s_client_map);
-
-    for ( int i=0; i<s_handle_counter; i++ )
+    for (int i = 0; i < ha->handle_counter; i++)
     {
         // Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
-        if ( (i == 0) || flow->ha_state->check_pending(1<<(i-1)) )
-            write_update_msg_client((*s_client_map)[i],flow, msg);
+        if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
+            write_update_msg_client(ha->client_map[i], flow, msg);
     }
 }
 
-static void consume_receive_delete_message(HAMessage* msg)
+static void consume_ha_delete_message(HAMessage&, const FlowKey& key)
 {
-    FlowKey key;
-    (void)read_flow_key(&key, msg);
     Stream::delete_flow(&key);
 }
 
-static void consume_receive_update_message(HAMessage* msg)
+static Flow* consume_ha_update_message(HAMessage& msg, const FlowKey& key)
 {
-    FlowKey key;
-    (void)read_flow_key(&key, msg);
     // flow will be nullptr if/when the session does not exist in the caches
     Flow* flow = Stream::get_flow(&key);
+    if (!flow)
+        ha_stats.update_msgs_recv_no_flow++;
 
-    assert(s_client_map);
-
-    // pointer to the last byte in the message
-    uint8_t* content_end = msg->content() + msg->content_length() - 1;
+    // pointer to one past the last byte in the message
+    const uint8_t* content_end = msg.buffer + msg.buffer_length;
 
-    while( msg->cursor <= content_end )
+    while (msg.cursor < content_end)
     {
         // do we have sufficient message left to be able to have an HAClientHeader?
-        if ( (int)(content_end - msg->cursor + 1) < (int)sizeof( HAClientHeader ) )
+        if (!msg.fits(sizeof(HAClientHeader)))
         {
             ErrorMessage("Consuming HA Update message - no HAClientHeader\n");
+            ha_stats.truncated_msgs++;
             break;
         }
 
-        HAClientHeader* header = (HAClientHeader*)msg->cursor;
-        msg->cursor += sizeof( HAClientHeader ); // step to the client content
-
-        if ( (header->client >= s_handle_counter) ||
-            ((*s_client_map)[header->client] == nullptr)  )
+        HAClientHeader* header = (HAClientHeader*) msg.cursor;
+        if ((header->client >= ha->handle_counter) || (ha->client_map[header->client] == nullptr))
         {
             ErrorMessage("Consuming HA Update message - invalid client index\n");
+            ha_stats.unknown_client_idx++;
             break;
         }
+        msg.advance_cursor(sizeof(HAClientHeader)); // step to the client content
 
-        if ( (content_end - msg->cursor + 1) < header->length )
+        if (!msg.fits(header->length))
         {
             ErrorMessage("Consuming HA Update message - message too short\n");
+            ha_stats.truncated_msgs++;
             break;
         }
 
@@ -387,121 +418,172 @@ static void consume_receive_update_message(HAMessage* msg)
         // client is always the first segment of the message, the consume()
         // invocation for the session client will create the flow.  This
         // flow can in turn be used by subsequent FlowHAClient's.
-        if ( !(*s_client_map)[header->client]->consume(flow,&key,msg) )
+        if (!ha->client_map[header->client]->consume(flow, &key, msg, header->length))
         {
             ErrorMessage("Consuming HA Update message - error from client consume()\n");
+            ha_stats.client_consume_errors++;
             break;
         }
     }
+
+    if (msg.cursor == content_end)
+        ha_stats.update_msgs_consumed++;
+
+    return flow;
 }
 
-static void consume_receive_message(HAMessage* msg)
+static Flow* consume_ha_message(HAMessage& msg)
 {
-    HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
+    ha_stats.msgs_recv++;
 
-    if ( hdr->version != HA_MESSAGE_VERSION)
-        return;
+    if (!msg.fits(sizeof(HAMessageHeader)))
+    {
+        ha_stats.truncated_msgs++;
+        return nullptr;
+    }
 
-    switch ( hdr->event )
+    const HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
+
+    if (hdr->version != HA_MESSAGE_VERSION)
+    {
+        ha_stats.msg_version_mismatch++;
+        return nullptr;
+    }
+
+    if (hdr->total_length != msg.buffer_length)
+    {
+        ha_stats.msg_length_mismatch++;
+        return nullptr;
+    }
+
+    msg.advance_cursor(sizeof(HAMessageHeader));
+
+    FlowKey key;
+    if (read_flow_key(msg, hdr, key) == 0)
+        return nullptr;
+
+    Flow* flow = nullptr;
+    switch (hdr->event)
     {
         case HA_DELETE_EVENT:
         {
-            consume_receive_delete_message(msg);
+            consume_ha_delete_message(msg, key);
+            ha_stats.delete_msgs_consumed++;
             break;
         }
         case HA_UPDATE_EVENT:
         {
-            consume_receive_update_message(msg);
+            flow = consume_ha_update_message(msg, key);
+            ha_stats.update_msgs_recv++;
             break;
         }
-        default:
-            break;
     }
+
+    return flow;
+}
+
+static void ha_sc_receive_handler(SCMessage* sc_msg)
+{
+    assert(sc_msg);
+
+    // SC received messages must have reference back to SideChannel object
+    assert(sc_msg->sc);
+
+    HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
+    consume_ha_message(ha_msg);
+
+    sc_msg->sc->discard_message(sc_msg);
 }
 
-HighAvailability::HighAvailability(PortBitSet* ports, bool)
+HighAvailability::HighAvailability(PortBitSet* ports, bool daq_channel)
 {
     using namespace std::placeholders;
 
-    // If we have ports, configure the side channel
-    if ( ports != nullptr )
+    // If side channel ports were configured, find the first matching side channel to associate with
+    if (ports != nullptr)
     {
-        for ( SCPort port = 0; port < ports->size(); port++ )
-            if ( ports->test(port) )
+        for (SCPort port = 0; port < ports->size(); port++)
+        {
+            if (!ports->test(port))
+                continue;
+
+            sc = SideChannelManager::get_side_channel(port);
+            if (sc)
             {
-                sc = SideChannelManager::get_side_channel(port);
-                if (sc)
+                // We require a duplex channel
+                if (sc->get_direction() != Connector::CONN_DUPLEX)
                 {
-                    // We need a duplex channel
-                    if (sc->get_direction() != Connector::CONN_DUPLEX)
-                    {
-                        // Otherwise indicate that we don't have a sidechannel
-                        sc = nullptr;
-                        break;
-                    }
-                    sc->set_default_port(port);
-                    sc->register_receive_handler(
-                        std::bind(&HighAvailability::receive_handler, this, _1));
+                    sc = nullptr;
+                    continue;
                 }
-                break;
+                sc->set_default_port(port);
+                sc->register_receive_handler(ha_sc_receive_handler);
             }
+            break;
+        }
     }
-    s_client_map = new ClientMap;
-    for ( int i=0; i<MAX_CLIENTS; i++ )
-        (*s_client_map)[i] = nullptr;
-
-    // Only looking for side channel processing - FIXIT-H
+    use_daq_channel = daq_channel;
 }
 
 HighAvailability::~HighAvailability()
 {
-    if ( sc )
-    {
+    if (sc)
         sc->unregister_receive_handler();
-    }
-
-    delete s_client_map;
 }
 
-void HighAvailability::receive_handler(SCMessage* sc_msg)
+static void send_sc_update_message(Flow& flow, SideChannel& sc)
 {
+    const uint16_t header_len = calculate_msg_header_length(flow);
+    const uint16_t content_len = calculate_update_msg_content_length(flow, false);
+
+    SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len));
     assert(sc_msg);
+    HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
 
-    // SC received messages must have reference back to SideChannel object
-    assert(sc_msg->sc);
+    write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg);
+    write_update_msg_content(flow, ha_msg, false);
+    update_msg_header_length(ha_msg);
+    sc.transmit_message(sc_msg);
+}
+
+static void send_daq_update_message(Flow& flow, Packet& p)
+{
+    static THREAD_LOCAL uint8_t daq_io_buffer[UINT16_MAX];
 
-    HAMessage ha_msg(sc_msg);
-    consume_receive_message(&ha_msg);
+    HAMessage ha_msg(daq_io_buffer, sizeof(daq_io_buffer));
 
-    sc_msg->sc->discard_message(sc_msg);
+    write_msg_header(flow, HA_UPDATE_EVENT, 0, ha_msg);
+    write_update_msg_content(flow, ha_msg, true);
+    uint32_t len = update_msg_header_length(ha_msg);
+
+    DIOCTL_FlowHAState fhs;
+    fhs.msg = p.daq_msg;
+    fhs.data = daq_io_buffer;
+    fhs.length = len;
+
+    p.daq_instance->ioctl(DIOCTL_SET_FLOW_HA_STATE, &fhs, sizeof(fhs));
+
+    ha_stats.daq_stores++;
 }
 
-void HighAvailability::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr)
+void HighAvailability::process_update(Flow* flow, Packet* p)
 {
-    // Only looking for side channel processing - FIXIT-H
-    UNUSED(pkthdr); // until we add DAQ communications channel
-    if ( !sc || !flow )
+    if (!flow)
         return;
 
     // We must have the map array and the session client
-    assert(s_client_map);
-    assert((*s_client_map)[0]);
+    assert(client_map[0]);
 
-    if ( !(*s_client_map)[0]->is_update_required(flow) &&
+    if ( !client_map[0]->is_update_required(flow) &&
         ( !flow->ha_state->check_pending(ALL_CLIENTS) ||
             flow->ha_state->check_any(FlowHAState::NEW) ) )
         return;
 
-    const uint16_t header_len = calculate_msg_header_length(flow);
-    const uint16_t content_len = calculate_update_msg_content_length(flow);
-
-    SCMessage* sc_msg = sc->alloc_transmit_message((uint32_t)(header_len+content_len));
-    assert(sc_msg);
-    HAMessage ha_msg(sc_msg);
+    if (sc)
+        send_sc_update_message(*flow, *sc);
 
-    write_msg_header(flow, HA_UPDATE_EVENT, content_len, &ha_msg);
-    write_update_msg_content(flow, &ha_msg);
-    sc->transmit_message(sc_msg);
+    if (use_daq_channel && p && p->daq_msg)
+        send_daq_update_message(*flow, *p);
 
     flow->ha_state->clear(FlowHAState::NEW | FlowHAState::MODIFIED |
         FlowHAState::MAJOR | FlowHAState::CRITICAL);
@@ -509,95 +591,148 @@ void HighAvailability::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr)
     flow->ha_state->set_next_update();
 }
 
-void HighAvailability::process_deletion(Flow* flow)
+static void send_sc_deletion_message(Flow& flow, SideChannel& sc)
 {
-    // No need to send message if we already have, we are in standby, or
-    // we have just been created and haven't yet sent an update
-    if ( flow->ha_state->check_any(FlowHAState::NEW |
-        FlowHAState::DELETED |
-        FlowHAState::STANDBY))
-        return;
-
-    // Deletion messages only use the side channel
-    if ( !sc )
-        return;
-
     const uint32_t msg_len = calculate_msg_header_length(flow);
-    SCMessage* sc_msg = sc->alloc_transmit_message(msg_len);
-    HAMessage ha_msg(sc_msg);
+    SCMessage* sc_msg = sc.alloc_transmit_message(msg_len);
+    HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
 
     // No content, only header+key
-    write_msg_header(flow, HA_DELETE_EVENT, 0, &ha_msg);
+    write_msg_header(flow, HA_DELETE_EVENT, msg_len, ha_msg);
 
-    sc->transmit_message(sc_msg);
+    sc.transmit_message(sc_msg);
+}
+
+void HighAvailability::process_deletion(Flow& flow)
+{
+    // No need to send message if we already have, we are in standby, or
+    // we have just been created and haven't yet sent an update
+    if (flow.ha_state->check_any(FlowHAState::NEW | FlowHAState::DELETED | FlowHAState::STANDBY))
+        return;
+
+    // Only produce deletion messages when using a side channel
+    if (sc)
+        send_sc_deletion_message(flow, *sc);
 
-    flow->ha_state->add(FlowHAState::DELETED);
+    flow.ha_state->add(FlowHAState::DELETED);
 }
 
 void HighAvailability::process_receive()
 {
-    if ( sc != nullptr )
+    if (sc)
         sc->process(DISPATCH_ALL_RECEIVE);
 }
 
-// Called by the configuration parsing activity in the main thread.
-bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel,
-        struct timeval* min_session_lifetime, struct timeval* min_sync_interval)
+Flow* HighAvailability::process_daq_import(Packet& p, FlowKey& key)
 {
-    ports = mod_ports;
-    FlowHAState::config_timers(*min_session_lifetime, *min_sync_interval);
-    use_daq_channel = mod_use_daq_channel;
+    Flow* flow = nullptr;
+
+    if (use_daq_channel && p.pkth->flags & DAQ_PKT_FLAG_HA_STATE_AVAIL)
+    {
+        DIOCTL_FlowHAState fhs;
+        fhs.msg = p.daq_msg;
+
+        if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS)
+        {
+            HAMessage ha_msg(fhs.data, fhs.length);
+            flow = consume_ha_message(ha_msg);
+            ha_stats.daq_imports++;
+            // Validate that the imported flow matches up with the given flow key.
+            if (flow)
+            {
+                if (FlowKey::compare(&key, flow->key, 0) == 0)
+                {
+                    // Clear the standby bit so that we don't immediately trigger a new data store
+                    // FIXIT-L streamline the consume process so this doesn't have to be done here
+                    flow->ha_state->clear(FlowHAState::STANDBY);
+                }
+                else
+                    flow = nullptr;
+            }
+        }
+    }
 
-    return true;
+    return flow;
 }
 
-// Called prior to the starts of configuration in the main thread.
-void HighAvailabilityManager::pre_config_init()
+void HighAvailabilityManager::reset_config()
 {
-    ports = nullptr;
+    if (ports)
+    {
+        delete ports;
+        ports = nullptr;
+    }
+}
+
+void HighAvailabilityManager::term()
+{
+    reset_config();
+}
+
+// Called within the main thread after the initial configuration has been read
+void HighAvailabilityManager::configure(HighAvailabilityConfig* config)
+{
+    if (!config)
+    {
+        reset_config();
+        return;
+    }
+
+    if (config->ports)
+        ports = new PortBitSet(*config->ports);
+    else if (ports)
+    {
+        delete ports;
+        ports = nullptr;
+    }
+
+    FlowHAState::config_timers(config->min_session_lifetime, config->min_sync_interval);
+
+    use_daq_channel = config->daq_channel;
 }
 
 // Called within the packet thread prior to packet processing
 void HighAvailabilityManager::thread_init()
 {
     // create a a thread local instance iff we are configured to operate.
-    if ( (ports != nullptr) || use_daq_channel )
-        ha = new HighAvailability(ports,use_daq_channel);
+    if (ports || use_daq_channel)
+        ha = new HighAvailability(ports, use_daq_channel);
     else
         ha = nullptr;
 }
 
 void HighAvailabilityManager::thread_term_beginning()
 {
-    shutting_down = true;
+    if (ha)
+        ha->shutting_down = true;
 }
 
 // Called in the packet thread at run-down
 void HighAvailabilityManager::thread_term()
 {
-    if ( ha != nullptr )
+    if (ha)
     {
         delete ha;
         ha = nullptr;
     }
 }
 
-void HighAvailabilityManager::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr)
+void HighAvailabilityManager::process_update(Flow* flow, Packet* p)
 {
-    if ( (ha != nullptr) && (pkthdr != nullptr) && (flow != nullptr) )
-        ha->process_update(flow,pkthdr);
+    if (ha && p && flow)
+        ha->process_update(flow, p);
 }
 
 // Deletion messages only contain session content
-void HighAvailabilityManager::process_deletion(Flow* flow)
+void HighAvailabilityManager::process_deletion(Flow& flow)
 {
-    if ( (ha != nullptr) && !shutting_down )
+    if (ha && !ha->shutting_down)
         ha->process_deletion(flow);
 }
 
 void HighAvailabilityManager::process_receive()
 {
-    if ( ha != nullptr )
+    if (ha)
         ha->process_receive();
 }
 
@@ -609,14 +744,22 @@ bool HighAvailabilityManager::active()
 
 void HighAvailabilityManager::set_modified(Flow* flow)
 {
-    if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) )
+    if (ha && flow && flow->ha_state)
         flow->ha_state->add(FlowHAState::MODIFIED);
 }
 
 bool HighAvailabilityManager::in_standby(Flow* flow)
 {
-    if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) )
+    if (ha && flow && flow->ha_state)
         return flow->ha_state->check_any(FlowHAState::STANDBY);
-    else
-        return false;
+
+    return false;
+}
+
+Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key)
+{
+    if (!ha)
+        return nullptr;
+
+    return ha->process_daq_import(p, key);
 }
index 406b14107b5d0ab7cd7630aa457465f5c155dd95..47525d27940e78e4d5016d27f0bbe8f6dceaa838 100644 (file)
 
 #include <daq_common.h>
 
+#include <cassert>
+
+#include "framework/bits.h"
 #include "main/thread.h"
-#include "side_channel/side_channel.h"
 
 //-------------------------------------------------------------------------
 
+struct HighAvailabilityConfig;
+
 namespace snort
 {
 class Flow;
 struct FlowKey;
-}
+struct Packet;
+struct ProfileStats;
 
 // The FlowHAHandle is the dynamically allocated index used uniquely identify
 //   the client.  Used both in the API and HA messages.
 // Handle 0 is defined to be the primary session client.
 // NOTE: The type, masks, and count values must be in sync,
 typedef uint16_t FlowHAClientHandle;
-const FlowHAClientHandle SESSION_HA_CLIENT = 0x0000;
-const uint8_t SESSION_HA_CLIENT_INDEX = 0;
-const FlowHAClientHandle ALL_CLIENTS = 0xffff;
-// One client for each mask bit plus one 'automatic' session client
-//   client handle = (1<<(client_index-1)
-//   session client has handle of 0 and index of 0
-const uint8_t MAX_CLIENTS = 17;
-
-enum HAEvent
-{
-    HA_DELETE_EVENT = 1,
-    HA_UPDATE_EVENT = 2
-};
+constexpr FlowHAClientHandle ALL_CLIENTS = 0xffff;
 
 // Each active flow will have an associated FlowHAState instance.
-class FlowHAState
+class SO_PUBLIC FlowHAState
 {
 public:
-    static const uint8_t CRITICAL = 0x80;
-    static const uint8_t MAJOR = 0x40;
-
-    static const uint8_t NEW = 0x01;
-    static const uint8_t MODIFIED = 0x02;
-    static const uint8_t DELETED = 0x04;
-    static const uint8_t STANDBY = 0x08;
-    static const uint8_t NEW_SESSION = 0x10;
+    static constexpr uint8_t CRITICAL = 0x80;
+    static constexpr uint8_t MAJOR = 0x40;
+
+    enum : uint8_t
+    {
+        NEW = 0x01,
+        MODIFIED = 0x02,
+        DELETED = 0x04,
+        STANDBY = 0x08,
+        NEW_SESSION = 0x10,
+    };
 
     FlowHAState();
 
@@ -74,123 +70,108 @@ public:
     void add(uint8_t state);
     void clear(uint8_t state);
     bool check_any(uint8_t state);
-    static void config_timers(timeval,timeval);
+    static void config_timers(struct timeval, struct timeval);
     bool sync_interval_elapsed();
     void set_next_update();
     void reset();
 
 private:
-    static const uint8_t INITIAL_STATE = 0x00;
-    static const uint16_t NONE_PENDING = 0x0000;
-
+    static constexpr uint8_t INITIAL_STATE = 0x00;
+    static constexpr uint16_t NONE_PENDING = 0x0000;
     static struct timeval min_session_lifetime;
     static struct timeval min_sync_interval;
-    uint8_t state;
-    uint16_t pending;
-    struct timeval next_update;
-};
-
-struct __attribute__((__packed__)) HAMessageHeader
-{
-    uint8_t event;
-    uint8_t version;
-    uint16_t total_length;
-    uint8_t key_type;
-};
 
-struct __attribute__((__packed__)) HAClientHeader
-{
-    uint8_t client;
-    uint8_t length;
+    struct timeval next_update;
+    uint16_t pending;
+    uint8_t state;
 };
 
 // Describe the message being produced or consumed.
 class HAMessage
 {
 public:
-    HAMessage(SCMessage* msg)
-    { sc_msg = msg; }
-
-    uint8_t* content()
-    { return sc_msg->content; }
-    uint16_t content_length()
-    { return sc_msg->content_length; }
+    HAMessage(uint8_t* buffer, uint32_t buffer_length) :
+        buffer(buffer), buffer_length(buffer_length), cursor(buffer) { }
+
+    bool fits(uint32_t size) const { return size <= (buffer_length - (cursor - buffer)); }
+
+    void advance_cursor(uint32_t size)
+    {
+        assert(fits(size));
+        cursor += size;
+    }
+
+    void reset_cursor(uint8_t* pos = nullptr)
+    {
+        if (pos)
+        {
+            assert(pos >= buffer && pos <= (buffer + buffer_length));
+            cursor = pos;
+        }
+        else
+            cursor = buffer;
+    }
+
+    uint32_t cursor_position() const { return (uint32_t) (cursor - buffer); }
+
+    uint8_t* buffer;
+    const uint32_t buffer_length;
     uint8_t* cursor;
-
-private:
-    SCMessage* sc_msg;
 };
 
 // A FlowHAClient subclass for each producer/consumer of flow HA data
-class FlowHAClient
+class SO_PUBLIC FlowHAClient
 {
 public:
     virtual ~FlowHAClient() = default;
-    virtual bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) { return false; }
-    virtual bool produce(snort::Flow*, HAMessage*) { return false; }
+    virtual bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) = 0;
+    virtual bool produce(snort::Flow&, snort::HAMessage&) = 0;
     virtual bool is_update_required(snort::Flow*) { return false; }
-    virtual bool is_delete_required(snort::Flow*) { return false; }
-    uint8_t get_message_size() { return header.length; }
-    bool fit(HAMessage*, uint8_t);
-    bool place(HAMessage*, uint8_t*, uint8_t);
+    uint8_t get_message_size() { return max_length; }
+
     FlowHAClientHandle handle;  // Actual handle for the instance
-    HAClientHeader header;
+    uint8_t index;
+    uint8_t max_length;
 
 protected:
-    FlowHAClient(uint8_t, bool);
-
-};
-
-// HighAvailability is instantiated for each packet-thread.
-// FIXIT-M make the SideChannel the THREAD_LOCAL element and collapse
-//  into HighAvailabilityManager
-class HighAvailability
-{
-public:
-    HighAvailability(PortBitSet*,bool);
-    ~HighAvailability();
-
-    void process_update(snort::Flow*, const DAQ_PktHdr_t*);
-    void process_deletion(snort::Flow*);
-    void process_receive();
-
-private:
-    void receive_handler(SCMessage*);
-    SideChannel* sc = nullptr;
+    FlowHAClient(uint8_t length, bool session_client);
 };
 
 // Top level management of HighAvailability components.
-class HighAvailabilityManager
+class SO_PUBLIC HighAvailabilityManager
 {
 public:
-    // Prior to parsing configuration
-    static void pre_config_init();
-
-    // Invoked by the module configuration parsing to create HA instance
-    static bool instantiate(PortBitSet*,bool,struct timeval*,struct timeval*);
+    static void configure(HighAvailabilityConfig*);
     static void thread_init();
     static void thread_term_beginning(); // thread is about to be terminated
     static void thread_term();
+    static void term();
 
     // true if we are configured and able to process
     static bool active();
 
-    // Within the packet callback, analyze the packet and flow for potential update messages
-    static void process_update(snort::Flow*, const DAQ_PktHdr_t*);
+    // Within packet processing, analyze the packet and flow for potential update messages
+    static void process_update(snort::Flow*, snort::Packet*);
 
     // Anytime a flow is deleted, potentially generate a deletion message
-    static void process_deletion(snort::Flow*);
+    static void process_deletion(snort::Flow&);
 
     // Look for and dispatch receive messages.
     static void process_receive();
     static void set_modified(snort::Flow*);
     static bool in_standby(snort::Flow*);
 
+    // Attempt to import HA data from the Packet
+    static Flow* import(snort::Packet& p, snort::FlowKey& key);
+
 private:
+    static void reset_config();
+
     HighAvailabilityManager() = delete;
     static bool use_daq_channel;
     static PortBitSet* ports;
-    static THREAD_LOCAL bool shutting_down;
 };
+}
+
 #endif
 
index f9f6ba661ec5ff81ca6358e5b7ee9a23fd3b9cae..fb50170365bb1967f66ea0dafc64418772599ed7 100644 (file)
@@ -27,8 +27,8 @@
 #include <cmath>
 
 #include "log/messages.h"
-
-#include "ha.h"
+#include "main/snort_config.h"
+#include "profiler/profiler_defs.h"
 
 using namespace snort;
 
@@ -48,14 +48,37 @@ static const Parameter ha_params[] =
       "side channel message port list" },
 
     { "min_age", Parameter::PT_REAL, "0.0:100.0", "1.0",
-      "minimum session life before HA updates" },
+      "minimum session life in seconds before HA updates" },
 
-    { "min_sync", Parameter::PT_REAL, "0.0:100.0", "1.0",
-      "minimum interval between HA updates" },
+    { "min_sync", Parameter::PT_REAL, "0.0:100.0", "0.1",
+      "minimum interval in seconds between HA updates" },
 
     { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
 };
 
+static const PegInfo ha_pegs[] =
+{
+    { CountType::SUM, "msgs_recv", "total messages received" },
+    { CountType::SUM, "update_msgs_recv", "update messages received" },
+    { CountType::SUM, "update_msgs_recv_no_flow", "update messages received without a local flow" },
+    { CountType::SUM, "update_msgs_consumed", "update messages fully consumed" },
+    { CountType::SUM, "delete_msgs_consumed", "deletion messages consumed" },
+    { CountType::SUM, "daq_stores", "states stored via daq" },
+    { CountType::SUM, "daq_imports", "states imported via daq" },
+    { CountType::SUM, "msg_version_mismatch", "messages received with a version mismatch" },
+    { CountType::SUM, "msg_length_mismatch", "messages received with an inconsistent total length" },
+    { CountType::SUM, "truncated_msgs", "truncated messages received" },
+    { CountType::SUM, "unknown_key_type", "messages received with an unknown flow key type" },
+    { CountType::SUM, "unknown_client_idx", "messages received with an unknown client index" },
+    { CountType::SUM, "client_consume_errors", "client data consume failure count" },
+    { CountType::END, nullptr, nullptr }
+};
+
+THREAD_LOCAL HAStats ha_stats;
+THREAD_LOCAL ProfileStats ha_perf_stats;
+
+//-------------------------------------------------------------------------
+
 static void convert_real_seconds_to_timeval(double seconds, struct timeval* tv)
 {
     double whole = trunc(seconds);
@@ -67,42 +90,61 @@ static void convert_real_seconds_to_timeval(double seconds, struct timeval* tv)
 HighAvailabilityModule::HighAvailabilityModule() :
     Module(HA_NAME, HA_HELP, ha_params)
 {
-    config.enabled = false;
-    config.daq_channel = false;
-    config.ports = nullptr;
-    convert_real_seconds_to_timeval(1.0, &config.min_session_lifetime);
-    convert_real_seconds_to_timeval(0.1, &config.min_sync_interval);
+    config = nullptr;
 }
 
 HighAvailabilityModule::~HighAvailabilityModule()
 {
-    delete config.ports;
+    if (config)
+        delete config;
+}
+
+const PegInfo* HighAvailabilityModule::get_pegs() const
+{
+    return ha_pegs;
+}
+
+PegCount* HighAvailabilityModule::get_counts() const
+{
+    return (PegCount*) &ha_stats;
 }
 
 ProfileStats* HighAvailabilityModule::get_profile() const
-{ return &ha_perf_stats; }
+{
+    return &ha_perf_stats;
+}
+
+bool HighAvailabilityModule::begin(const char*, int, SnortConfig*)
+{
+    assert(!config);
+    config = new HighAvailabilityConfig();
+
+    return true;
+}
 
 bool HighAvailabilityModule::set(const char*, Value& v, SnortConfig*)
 {
     if ( v.is("enable") )
-        config.enabled = v.get_bool();
-
+    {
+        config->enabled = v.get_bool();
+    }
     else if ( v.is("daq_channel") )
-        config.daq_channel = v.get_bool();
-
+    {
+        config->daq_channel = v.get_bool();
+    }
     else if ( v.is("ports") )
     {
-        if ( !config.ports )
-            config.ports = new PortBitSet;
-        v.get_bits(*(config.ports) );
+        if ( !config->ports )
+            config->ports = new PortBitSet;
+        v.get_bits(*(config->ports));
     }
     else if ( v.is("min_age") )
     {
-        convert_real_seconds_to_timeval(v.get_real(), &config.min_session_lifetime);
+        convert_real_seconds_to_timeval(v.get_real(), &config->min_session_lifetime);
     }
     else if ( v.is("min_sync") )
     {
-        convert_real_seconds_to_timeval(v.get_real(), &config.min_sync_interval);
+        convert_real_seconds_to_timeval(v.get_real(), &config->min_sync_interval);
     }
     else
         return false;
@@ -110,20 +152,13 @@ bool HighAvailabilityModule::set(const char*, Value& v, SnortConfig*)
     return true;
 }
 
-bool HighAvailabilityModule::begin(const char*, int, SnortConfig*)
-{
-    return true;
-}
-
-bool HighAvailabilityModule::end(const char*, int, SnortConfig*)
+bool HighAvailabilityModule::end(const char*, int, SnortConfig* sc)
 {
-    if ( config.enabled &&
-        !HighAvailabilityManager::instantiate(config.ports, config.daq_channel,
-                        &config.min_session_lifetime, &config.min_sync_interval) )
-    {
-        ParseWarning(WARN_CONF, "Illegal HighAvailability configuration");
-        return false;
-    }
+    if ( config->enabled )
+        sc->ha_config = config;
+    else
+        delete config;
+    config = nullptr;
 
     return true;
 }
index 65256e0bc90bdaca51f3d3dc348da3728eb4c8fc..6b5a62a8f85771449f5168fff21cf4b71fc93447 100644 (file)
@@ -31,6 +31,8 @@
 
 struct HighAvailabilityConfig
 {
+    ~HighAvailabilityConfig() { delete ports; }
+
     bool enabled;
     bool daq_channel;
     PortBitSet* ports = nullptr;
@@ -38,24 +40,18 @@ struct HighAvailabilityConfig
     struct timeval min_sync_interval;
 };
 
-extern THREAD_LOCAL SimpleStats ha_stats;
-extern THREAD_LOCAL snort::ProfileStats ha_perf_stats;
-
 class HighAvailabilityModule : public snort::Module
 {
 public:
     HighAvailabilityModule();
     ~HighAvailabilityModule() override;
 
-    bool set(const char*, snort::Value&, snort::SnortConfig*) override;
     bool begin(const char*, int, snort::SnortConfig*) override;
+    bool set(const char*, snort::Value&, snort::SnortConfig*) override;
     bool end(const char*, int, snort::SnortConfig*) override;
 
-    PegCount* get_counts() const override
-    { return (PegCount*)&ha_stats; }
-
-    const PegInfo* get_pegs() const override
-    { return snort::simple_pegs; }
+    const PegInfo* get_pegs() const override;
+    PegCount* get_counts() const override;
 
     snort::ProfileStats* get_profile() const override;
 
@@ -63,8 +59,28 @@ public:
     { return GLOBAL; }
 
 private:
-    HighAvailabilityConfig config;
+    HighAvailabilityConfig* config;
 };
 
+struct HAStats
+{
+    PegCount msgs_recv;
+    PegCount update_msgs_recv;
+    PegCount update_msgs_recv_no_flow;
+    PegCount update_msgs_consumed;
+    PegCount delete_msgs_consumed;
+    PegCount daq_stores;
+    PegCount daq_imports;
+    PegCount msg_version_mismatch;
+    PegCount msg_length_mismatch;
+    PegCount truncated_msgs;
+    PegCount unknown_key_type;
+    PegCount unknown_client_idx;
+    PegCount client_consume_errors;
+};
+
+extern THREAD_LOCAL HAStats ha_stats;
+extern THREAD_LOCAL snort::ProfileStats ha_perf_stats;
+
 #endif
 
index a94b730cc4f721d2e11f74bcbfc20418607e4895..819801eb96bee5533fccca75018117d218b58041 100644 (file)
@@ -1,15 +1,4 @@
-add_cpputest( ha_test
-    SOURCES ../ha.cc
-)
-
-add_cpputest( ha_module_test
-    SOURCES
-        ../ha_module.cc
-        ../../framework/module.cc
-        ../../framework/value.cc
-        ../../sfip/sf_ip.cc
-        $<TARGET_OBJECTS:catch_tests>
-)
+add_cpputest( ha_test )
 
 add_cpputest( flow_stash_test
     SOURCES ../flow_stash.cc
index 5fde609deceedc94ff2102a9760508e7dd9672c1..74f554e6db417344f9aa74f8acf8b99ddacf6f11 100644 (file)
@@ -94,13 +94,13 @@ void DataBus::subscribe(const char* key, DataHandler* h)
 {
     DB->_subscribe(key, h);
 }
-void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig* sc)
+void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig*)
 {
     DB->_subscribe(key, h);
 }
 
 void DataBus::unsubscribe(const char*, DataHandler*) {}
-void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig* sc) {}
+void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig*) {}
 
 void DataBus::publish(const char* key, DataEvent& e, Flow* f)
 {
diff --git a/src/flow/test/ha_module_test.cc b/src/flow/test/ha_module_test.cc
deleted file mode 100644 (file)
index 81b7066..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
-//--------------------------------------------------------------------------
-// Copyright (C) 2015-2019 Cisco and/or its affiliates. All rights reserved.
-//
-// This program is free software; you can redistribute it and/or modify it
-// under the terms of the GNU General Public License Version 2 as published
-// by the Free Software Foundation.  You may not use, modify or distribute
-// this program under any other version of the GNU General Public License.
-//
-// This program is distributed in the hope that it will be useful, but
-// WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-// General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along
-// with this program; if not, write to the Free Software Foundation, Inc.,
-// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
-//--------------------------------------------------------------------------
-
-// ha_module_test.cc author Ed Borgoyn <eborgoyn@cisco.com>
-// unit test main
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "flow/ha.h"
-#include "flow/ha_module.h"
-
-#include "log/messages.h"
-#include "main/snort_debug.h"
-#include "profiler/profiler.h"
-
-#include <CppUTest/CommandLineTestRunner.h>
-#include <CppUTest/TestHarness.h>
-
-using namespace snort;
-
-THREAD_LOCAL SimpleStats ha_stats;
-THREAD_LOCAL ProfileStats ha_perf_stats;
-
-void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
-void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*) { }
-void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*, FILE*) { }
-
-namespace snort
-{
-void LogMessage(const char*,...) { }
-void ParseWarning(WarningGroup, const char*, ...) { }
-char* snort_strdup(const char* str) { return strdup(str); }
-}
-
-static bool s_port_1_set = false;
-static bool s_use_daq = false;
-static bool s_instantiate_called = false;
-
-static char* make_bit_string(int bit)
-{
-    static char bit_string[65537];
-    for ( int i=0; i<65536; i++)
-        bit_string[i] = (bit == i) ? '1' : '0';
-
-    bit_string[65536] = '\0';
-
-    return bit_string;
-}
-
-bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel, struct timeval*, struct timeval*)
-{
-    s_instantiate_called = true;
-    s_port_1_set = mod_ports->test(1);
-    s_use_daq = mod_use_daq_channel;
-
-    return true;
-}
-
-TEST_GROUP(high_availability_module_test)
-{
-    void setup() override
-    {
-    }
-
-    void teardown() override
-    {
-    }
-};
-
-TEST(high_availability_module_test, test_ha_valid)
-{
-    Value ports_val(make_bit_string(1));
-    Value enable_val(true);
-    Value min_age_val(1.0);
-    Value min_sync_val(0.1);
-    Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"};
-    Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
-    Parameter min_age_param = {"min_age", Parameter::PT_REAL, nullptr, "1.0", nullptr };
-    Parameter min_sync_param = {"min_sync", Parameter::PT_REAL, nullptr, "0.1", nullptr };
-
-    HighAvailabilityModule module;
-
-    ports_val.set(&ports_param);
-    enable_val.set(&enable_param);
-    min_age_val.set(&min_age_param);
-    min_sync_val.set(&min_sync_param);
-
-    s_instantiate_called = false;
-    s_port_1_set = false;
-    s_use_daq = false;
-
-    module.begin("high_availability", 0, nullptr);
-    module.set("high_availability.ports", ports_val, nullptr);
-    module.set("high_availability.enable", enable_val, nullptr);
-    module.set("high_availability.min_age", min_age_val, nullptr);
-    module.set("high_availability.min_sync", min_sync_val, nullptr);
-    module.end("high_availability", 0, nullptr);
-
-    CHECK(s_instantiate_called == true);
-    CHECK(s_port_1_set == true);
-    CHECK(s_use_daq == false);
-}
-
-TEST(high_availability_module_test, test_ha_disabled)
-{
-    Value enable_val(false);
-    Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
-
-    HighAvailabilityModule module;
-
-    enable_val.set(&enable_param);
-
-    s_instantiate_called = false;
-    s_port_1_set = false;
-    s_use_daq = false;
-
-    module.begin("high_availability", 0, nullptr);
-    module.set("high_availability.enable", enable_val, nullptr);
-    module.end("high_availability", 0, nullptr);
-
-    CHECK(s_instantiate_called == false);
-    CHECK(s_port_1_set == false);
-    CHECK(s_use_daq == false);
-}
-
-TEST(high_availability_module_test, test_ha_valid_daq)
-{
-    Value ports_val(make_bit_string(1));
-    Value enable_val(true);
-    Value daq_val(true);
-    Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"};
-    Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
-    Parameter daq_param = {"daq_channel", Parameter::PT_BOOL, nullptr, "false", nullptr };
-
-    HighAvailabilityModule module;
-
-    ports_val.set(&ports_param);
-    enable_val.set(&enable_param);
-    daq_val.set(&daq_param);
-
-    s_instantiate_called = false;
-    s_port_1_set = false;
-    s_use_daq = false;
-
-    module.begin("high_availability", 0, nullptr);
-    module.set("high_availability.ports", ports_val, nullptr);
-    module.set("high_availability.enable", enable_val, nullptr);
-    module.set("high_availability.daq_channel", daq_val, nullptr);
-    module.end("high_availability", 0, nullptr);
-
-    CHECK(s_instantiate_called == true);
-    CHECK(s_port_1_set == true);
-    CHECK(s_use_daq == true);
-}
-
-int main(int argc, char** argv)
-{
-    return CommandLineTestRunner::RunAllTests(argc, argv);
-}
-
index 44ea78ab00607848784f7d34967fa9b77dec077a..7e1f8baf0727fb3a660ba9a0b74a8dba5b3c5bfe 100644 (file)
 // 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 //--------------------------------------------------------------------------
 
-// ha_test.cc author Ed Borgoyn <eborgoyn@cisco.com>
+// ha_test.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com>
 // unit test main
 
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 
-#include "flow/ha.h"
-
-#include "flow/flow.h"
-#include "flow/flow_key.h"
-#include "main/snort_debug.h"
-#include "stream/stream.h"
+#include "flow/ha.cc"
 
 #include <CppUTest/CommandLineTestRunner.h>
 #include <CppUTest/TestHarness.h>
@@ -40,31 +35,53 @@ using namespace snort;
 
 class StreamHAClient;
 
-static const uint8_t s_test_key[] =
+static const FlowKey s_test_key =
 {
-TEST_KEY
+    { 1, 2, 3, 4 },
+    { 5, 6, 7, 8 },
+    9,
+    10,
+    11,
+    12,
+    13,
+    14,
+    PktType::TCP,
+    14,
+    0,
 };
 
-static const uint8_t s_delete_message[] =
+static struct __attribute__((__packed__)) TestDeleteMessage {
+    HAMessageHeader mhdr;
+    FlowKey key;
+} s_delete_message =
 {
-    0x01,
-    0x03,
-    0x00,
-    0x00,
-    0x01,
-TEST_KEY
+    {
+        HA_DELETE_EVENT,
+        HA_MESSAGE_VERSION,
+        0x35,
+        KEY_TYPE_IP6
+    },
+    s_test_key
 };
 
-static const uint8_t s_update_stream_message[] =
+static struct __attribute__((__packed__)) TestUpdateMessage {
+    HAMessageHeader mhdr;
+    FlowKey key;
+    HAClientHeader schdr;
+    uint8_t scmsg[10];
+} s_update_stream_message =
 {
-    0x02,
-    0x03,
-    0x00,
-    0x00,
-    0x01,
-TEST_KEY,
-    0x00,
-    10,
+    {
+        HA_UPDATE_EVENT,
+        HA_MESSAGE_VERSION,
+        0x41,
+        KEY_TYPE_IP6
+    },
+    s_test_key,
+    {
+        0,
+        10
+    },
     0, 1, 2, 3, 4, 5, 6, 7, 8, 9
 };
 
@@ -75,7 +92,9 @@ static SideChannel s_side_channel;
 static SCMessage s_sc_message;
 static SCMessage s_rec_sc_message;
 static bool s_stream_consume_called = false;
+static uint8_t s_stream_consume_size = 0;
 static bool s_other_consume_called = false;
+static uint8_t s_other_consume_size = 0;
 static bool s_get_session_called = false;
 static bool s_delete_session_called = false;
 static bool s_transmit_message_called = false;
@@ -85,7 +104,7 @@ static uint8_t* s_message_content = nullptr;
 static uint8_t s_message_length = 0;
 static Flow s_flow;
 static FlowKey s_flowkey;
-static DAQ_PktHdr_t s_pkthdr;
+static Packet s_pkt;
 static StreamHAClient* s_ha_client;
 static FlowHAClient* s_other_ha_client;
 static std::function<void (SCMessage*)> s_handler = nullptr;
@@ -96,23 +115,33 @@ class StreamHAClient : public FlowHAClient
 public:
     StreamHAClient() : FlowHAClient(10, true) { }
     ~StreamHAClient() override = default;
-    bool consume(Flow*&, FlowKey*, HAMessage*) override
+    bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override
     {
         s_stream_consume_called = true;
+        s_stream_consume_size = size;
+
+        for ( uint8_t i = 0; i < 10; i++ )
+        {
+            if (*msg.cursor != i)
+                return false;
+            msg.advance_cursor(sizeof(*msg.cursor));
+        }
+
         return true;
     }
-    bool produce(Flow*, HAMessage* msg) override
+    bool produce(Flow&, HAMessage& msg) override
     {
-        for ( uint8_t i=0; i<10; i++ )
-            *(msg->cursor)++ = i;
+        if (!msg.fits(10))
+            return false;
+
+        for ( uint8_t i = 0; i < 10; i++ )
+        {
+            *msg.cursor = i;
+            msg.advance_cursor(sizeof(*msg.cursor));
+        }
         return true;
     }
-    uint8_t get_message_size() { return 10; }
     bool is_update_required(Flow*) override { return s_stream_update_required; }
-    bool fit(HAMessage*,uint8_t) { return true; }
-    bool place(HAMessage*,uint8_t*,uint8_t) { return true; }
-
-private:
 };
 
 class OtherHAClient : public FlowHAClient
@@ -120,32 +149,51 @@ class OtherHAClient : public FlowHAClient
 public:
     OtherHAClient() : FlowHAClient(5, false) { }
     ~OtherHAClient() override = default;
-    bool consume(Flow*&, HAMessage*)
+    bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override
     {
         s_other_consume_called = true;
+        s_other_consume_size = size;
+
+        for ( uint8_t i = 0; i < 5; i++ )
+        {
+            if (*msg.cursor != i)
+                return false;
+            msg.advance_cursor(sizeof(*msg.cursor));
+        }
+
         return true;
     }
-    bool produce(Flow*, HAMessage* msg) override
+    bool produce(Flow&, HAMessage& msg) override
     {
-        for ( uint8_t i=0; i<5; i++ )
-            *(msg->cursor)++ = i;
+        if (!msg.fits(5))
+            return false;
+
+        for ( uint8_t i = 0; i < 5; i++ )
+        {
+            *msg.cursor = i;
+            msg.advance_cursor(sizeof(*msg.cursor));
+        }
         return true;
     }
-    uint8_t get_message_size() { return 5; }
     bool is_update_required(Flow*) override { return s_other_update_required; }
-    bool fit(HAMessage*,uint8_t) { return true; }
-    bool place(HAMessage*,uint8_t*,uint8_t) { return true; }
-
-private:
 };
 
-Flow*  Stream::get_flow(const FlowKey* flowkey)
+//-------------------------------------------------------------------------
+// stubs, spies, etc.
+//-------------------------------------------------------------------------
+
+THREAD_LOCAL HAStats ha_stats = { };
+
+Flow* Stream::get_flow(const FlowKey* flowkey)
 {
     s_flowkey = *flowkey;
     s_get_session_called = true;
     return &s_flow;
 }
 
+Packet::Packet(bool) { }
+Packet::~Packet() = default;
+
 void Stream::delete_flow(const FlowKey* flowkey)
 {
     s_flowkey = *flowkey;
@@ -158,10 +206,15 @@ void ErrorMessage(const char*,...) { }
 void LogMessage(const char*,...) { }
 }
 
+int FlowKey::compare(const void*, const void*, size_t) { return 0; }
+
+int SFDAQInstance::ioctl(DAQ_IoctlCmd, void*, size_t) { return DAQ_SUCCESS; }
+
 void packet_gettimeofday(struct timeval* tv)
 { *tv = s_packet_time; }
 
 Flow::Flow() { ha_state = new FlowHAState; key = new FlowKey; }
+Flow::~Flow() { delete key; delete ha_state; }
 
 FlowStash::~FlowStash() { }
 
@@ -205,7 +258,8 @@ bool SideChannel::transmit_message(SCMessage* msg)
     s_transmit_message_called = true;
     s_message_content = msg->content;
     s_message_length = msg->content_length;
-    return true; }
+    return true;
+}
 
 SCMessage* SideChannel::alloc_transmit_message(uint32_t len)
 {
@@ -217,22 +271,22 @@ SCMessage* SideChannel::alloc_transmit_message(uint32_t len)
     return &s_sc_message;
 }
 
+//-------------------------------------------------------------------------
+// tests
+//-------------------------------------------------------------------------
+
 TEST_GROUP(high_availability_manager_test)
 {
-    void setup() override
-    {
-        MemoryLeakWarningPlugin::turnOffNewDeleteOverloads();
-    }
-
     void teardown() override
     {
-        MemoryLeakWarningPlugin::turnOnNewDeleteOverloads();
+        HighAvailabilityManager::term();
     }
 };
 
 TEST(high_availability_manager_test, init_term)
 {
-    HighAvailabilityManager::pre_config_init();
+    HighAvailabilityConfig hac = { };
+    HighAvailabilityManager::configure(&hac);
     HighAvailabilityManager::thread_init();
     CHECK(HighAvailabilityManager::active()==false);
     HighAvailabilityManager::thread_term();
@@ -241,12 +295,15 @@ TEST(high_availability_manager_test, init_term)
 
 TEST(high_availability_manager_test, inst_init_term)
 {
-    HighAvailabilityManager::pre_config_init();
-    PortBitSet port_set;
-    port_set.set(1);
-    struct timeval age = { 1, 0 };
-    struct timeval interval = { 0, 500000 };
-    HighAvailabilityManager::instantiate(&port_set, false, &age, &interval);
+    HighAvailabilityConfig hac;
+    hac.enabled = true;
+    hac.daq_channel = false;
+    hac.ports = new PortBitSet();
+    hac.ports->set(1);
+    hac.min_session_lifetime = { 1, 0 };
+    hac.min_sync_interval = { 0, 500000 };
+
+    HighAvailabilityManager::configure(&hac);
     HighAvailabilityManager::thread_init();
     s_ha_client = new StreamHAClient;
     CHECK(HighAvailabilityManager::active()==true);
@@ -322,13 +379,17 @@ TEST_GROUP(high_availability_test)
 {
     void setup() override
     {
-        MemoryLeakWarningPlugin::turnOffNewDeleteOverloads();
-        HighAvailabilityManager::pre_config_init();
-        PortBitSet port_set;
-        port_set.set(1);
-        struct timeval age = { 1, 0 };
-        struct timeval interval = { 0, 500000 };
-        HighAvailabilityManager::instantiate(&port_set, false, &age, &interval);
+        memset(&ha_stats, 0, sizeof(ha_stats));
+
+        HighAvailabilityConfig hac;
+        hac.enabled = true;
+        hac.daq_channel = false;
+        hac.ports = new PortBitSet();
+        hac.ports->set(1);
+        hac.min_session_lifetime = { 1, 0 };
+        hac.min_sync_interval = { 0, 500000 };
+
+        HighAvailabilityManager::configure(&hac);
         HighAvailabilityManager::thread_init();
         s_ha_client = new StreamHAClient;
         s_other_ha_client = new OtherHAClient;
@@ -339,14 +400,14 @@ TEST_GROUP(high_availability_test)
         delete s_other_ha_client;
         delete s_ha_client;
         HighAvailabilityManager::thread_term();
-        MemoryLeakWarningPlugin::turnOnNewDeleteOverloads();
+        HighAvailabilityManager::term();
     }
 };
 
 TEST(high_availability_test, receive_deletion)
 {
     s_delete_session_called = false;
-    s_message_content = (uint8_t*)s_delete_message;
+    s_message_content = (uint8_t*) &s_delete_message;
     s_message_length = sizeof(s_delete_message);
     HighAvailabilityManager::process_receive();
     CHECK(s_delete_session_called == true);
@@ -356,17 +417,19 @@ TEST(high_availability_test, receive_deletion)
 TEST(high_availability_test, receive_update_stream_only)
 {
     s_stream_consume_called = false;
-    s_message_content = (uint8_t*)s_update_stream_message;
+    s_stream_consume_size = 0;
+    s_message_content = (uint8_t*) &s_update_stream_message;
     s_message_length = sizeof(s_update_stream_message);
     HighAvailabilityManager::process_receive();
     CHECK(s_stream_consume_called == true);
+    CHECK(s_stream_consume_size == 10);
     CHECK(memcmp((const void*)&s_flowkey, (const void*)&s_test_key, sizeof(s_test_key)) == 0);
 }
 
 TEST(high_availability_test, transmit_deletion)
 {
     s_transmit_message_called = false;
-    HighAvailabilityManager::process_deletion(&s_flow);
+    HighAvailabilityManager::process_deletion(s_flow);
     CHECK(s_transmit_message_called == true);
 }
 
@@ -375,7 +438,7 @@ TEST(high_availability_test, transmit_update_no_update)
     s_transmit_message_called = false;
     s_stream_update_required = false;
     s_other_update_required = false;
-    HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+    HighAvailabilityManager::process_update(&s_flow, &s_pkt);
     CHECK(s_transmit_message_called == false);
 }
 
@@ -384,7 +447,7 @@ TEST(high_availability_test, transmit_update_stream_only)
     s_transmit_message_called = false;
     s_stream_update_required = true;
     s_other_update_required = false;
-    HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+    HighAvailabilityManager::process_update(&s_flow, &s_pkt);
     CHECK(s_transmit_message_called == true);
 }
 
@@ -395,10 +458,139 @@ TEST(high_availability_test, transmit_update_both_update)
     s_other_update_required = true;
     CHECK(s_other_ha_client->handle == 1);
     s_flow.ha_state->set_pending(s_other_ha_client->handle);
-    HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+    HighAvailabilityManager::process_update(&s_flow, &s_pkt);
     CHECK(s_transmit_message_called == true);
 }
 
+TEST(high_availability_test, read_flow_key_error_v4)
+{
+    HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP4 };
+    HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP4 / 2);
+    FlowKey key;
+
+    CHECK(read_flow_key(msg, &hdr, key) == 0);
+    CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, read_flow_key_error_v6)
+{
+    HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP6 };
+    HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP6 / 2);
+    FlowKey key;
+
+    CHECK(read_flow_key(msg, &hdr, key) == 0);
+    CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, read_flow_key_error_unknown)
+{
+    HAMessageHeader hdr = { 0, 0, 0, 0x42 };
+    HAMessage msg((uint8_t*) &s_test_key, sizeof(s_test_key));
+    FlowKey key;
+
+    CHECK(read_flow_key(msg, &hdr, key) == 0);
+    CHECK(ha_stats.unknown_key_type == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_client_hdr)
+{
+    HAClientHeader chdr = { 0, 0 };
+    HAMessage msg((uint8_t*) &chdr, sizeof(chdr) / 2);
+    FlowKey key;
+
+    consume_ha_update_message(msg, key);
+    CHECK(ha_stats.update_msgs_consumed == 0);
+    CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_invalid_client_idx)
+{
+    HAClientHeader chdr = { 0x42, 0 };
+    HAMessage msg((uint8_t*) &chdr, sizeof(chdr));
+    FlowKey key;
+
+    consume_ha_update_message(msg, key);
+    CHECK(ha_stats.update_msgs_consumed == 0);
+    CHECK(ha_stats.unknown_client_idx == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_client_msg)
+{
+    struct __attribute__((__packed__))
+    {
+        HAClientHeader chdr = { 0, 0x42 };
+        uint8_t cmsg[0x42 / 2] = { };
+    } input;
+    HAMessage msg((uint8_t*) &input, sizeof(input));
+    FlowKey key;
+
+    consume_ha_update_message(msg, key);
+    CHECK(ha_stats.update_msgs_consumed == 0);
+    CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_client_consume)
+{
+    struct __attribute__((__packed__))
+    {
+        HAClientHeader chdr = { 0, 10 };
+        uint8_t cmsg[0x42 / 2] = { };
+    } input;
+    HAMessage msg((uint8_t*) &input, sizeof(input));
+    FlowKey key;
+
+    consume_ha_update_message(msg, key);
+    CHECK(ha_stats.update_msgs_consumed == 0);
+    CHECK(ha_stats.client_consume_errors == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_msg_hdr)
+{
+    HAMessageHeader hdr = { };
+    HAMessage msg((uint8_t*) &hdr, sizeof(hdr) / 2);
+
+    CHECK(consume_ha_message(msg) == nullptr);
+    CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_version_mismatch)
+{
+    HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION + 1, 0, 0 };
+    HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
+
+    CHECK(consume_ha_message(msg) == nullptr);
+    CHECK(ha_stats.msg_version_mismatch == 1);
+}
+
+TEST(high_availability_test, consume_error_length_mismatch)
+{
+    HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION, 0x42, 0 };
+    HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
+
+    CHECK(consume_ha_message(msg) == nullptr);
+    CHECK(ha_stats.msg_length_mismatch == 1);
+}
+
+TEST(high_availability_test, produce_error_client_hdr_overflow)
+{
+    uint8_t buffer[sizeof(HAClientHeader) / 2];
+    HAMessage msg(buffer, sizeof(buffer));
+    Flow flow;
+
+    write_update_msg_client(s_ha_client, flow, msg);
+    CHECK(msg.cursor == msg.buffer);
+}
+
+TEST(high_availability_test, produce_error_client_produce)
+{
+    uint8_t buffer[sizeof(HAClientHeader)];
+    HAMessage msg(buffer, sizeof(buffer));
+    Flow flow;
+
+    write_update_msg_client(s_ha_client, flow, msg);
+    CHECK(msg.cursor == msg.buffer);
+}
+
 int main(int argc, char** argv)
 {
     return CommandLineTestRunner::RunAllTests(argc, argv);
index 2271472d30a20f71e16e46060979cda31e27b7a4..0df55e5ee226694e188a46b6b4f86db9e9c82682 100644 (file)
@@ -288,7 +288,7 @@ void Analyzer::post_process_daq_pkt_msg(Packet* p)
         PacketTracer::dump(p);
     }
 
-    HighAvailabilityManager::process_update(p->flow, p->pkth);
+    HighAvailabilityManager::process_update(p->flow, p);
 
     p->pkth = nullptr;  // no longer avail upon sig segv
 
index 86d1c6d55cd2a02d2a971304458f422b0f58ce41..5d5c1ee97e89297e17b910b0bcf84770beee9c47 100644 (file)
@@ -195,7 +195,6 @@ void Snort::init(int argc, char** argv)
 #endif
 
     SideChannelManager::pre_config_init();
-    HighAvailabilityManager::pre_config_init();
 
     ModuleManager::init();
     ScriptManager::load_scripts(snort_cmd_line_conf->script_paths);
@@ -230,6 +229,8 @@ void Snort::init(int argc, char** argv)
     if ( !sc->output.empty() )
         EventManager::instantiate(sc->output.c_str(), sc);
 
+    HighAvailabilityManager::configure(sc->ha_config);
+
     if (SnortConfig::alert_before_pass())
         sc->rule_order = "reset block drop alert pass log";
 
@@ -408,6 +409,7 @@ void Snort::term()
     }
 
     CleanupProtoNames();
+    HighAvailabilityManager::term();
     SideChannelManager::term();
     ModuleManager::term();
     PluginManager::release_plugins();
index dc93daf5f40d60d3680402212773213e808a0c96..059dc7fa7773b27227018e6eae3ce494a0da62a2 100644 (file)
@@ -35,6 +35,7 @@
 #include "filters/rate_filter.h"
 #include "filters/sfrf.h"
 #include "filters/sfthreshold.h"
+#include "flow/ha_module.h"
 #include "hash/xhash.h"
 #include "helpers/process.h"
 #include "ips_options/ips_flowbits.h"
@@ -298,6 +299,7 @@ SnortConfig::~SnortConfig()
 
     delete[] state;
     delete thread_config;
+    delete ha_config;
 
     if (gtp_ports)
         delete gtp_ports;
index 1463e8e0dd30e33f56ab6c4c43dc171dbeb9710d..7cc5225f0b87c2ee3c2f1e7f3d2b3546bd681464 100644 (file)
@@ -116,29 +116,29 @@ enum TunnelFlags
     TUNNEL_MPLS   = 0x80
 };
 
-struct ClassType;
+class FastPatternConfig;
+class RuleState;
+class ThreadConfig;
+
 struct srmm_table_t;
 struct sopg_table_t;
-
-struct MemoryConfig;
-struct LatencyConfig;
-struct PORT_RULE_MAP;
-struct RuleListNode;
-struct RulePortTables;
-class RuleState;
+struct ClassType;
 struct DetectionFilterConfig;
 struct EventQueueConfig;
-struct IpsActionsConfig;
-class FastPatternConfig;
+struct FlowBitState;
 struct FrameworkConfig;
-struct ThresholdConfig;
+struct HighAvailabilityConfig;
+struct IpsActionsConfig;
+struct LatencyConfig;
+struct MemoryConfig;
+struct PORT_RULE_MAP;
 struct RateFilterConfig;
-struct SFDAQConfig;
-class ThreadConfig;
 struct ReferenceSystemNode;
+struct RuleListNode;
+struct RulePortTables;
+struct SFDAQConfig;
+struct ThresholdConfig;
 struct VarNode;
-struct _IntelPmHandles;
-struct FlowBitState;
 
 namespace snort
 {
@@ -372,6 +372,7 @@ public:
     unsigned num_slots = 0;
 
     ThreadConfig* thread_config;
+    HighAvailabilityConfig* ha_config = nullptr;
 
     //------------------------------------------------------
     //Reload inspector related
index 893eff60f557b08fcc9e8eb4a4ae2a3b299152e4..3f7c5156832f594b97c82002d259b38e2ed9f616 100644 (file)
@@ -51,7 +51,7 @@ public:
     AppIdInspector() = default;
 };
 
-AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector)
+AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&)
     : FlowData(0) { }
 AppIdSession::~AppIdSession() = default;
 
index c5cea991a185cc8e66c4ce9ba3970eb3fb9e8ef6..8e843ab156420d58e8d3ab80b21b5bf88bdc27f3 100644 (file)
@@ -218,7 +218,7 @@ ServiceDiscovery& ServiceDiscovery::get_instance()
         s_discovery_manager = new ServiceDiscovery();
     return *s_discovery_manager;
 }
-AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port port , Protocol proto){ return 0; }
+AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port, Protocol){ return 0; }
 // Stubs for ClientDiscovery
 ClientDiscovery::ClientDiscovery(){}
 ClientDiscovery::~ClientDiscovery() {}
index 6004686f8dd430c3adaf2cc721490d73a7250825..39dbe6f03285211996c9e6943ae278fc2c0e8979 100644 (file)
@@ -34,6 +34,7 @@ FlowData* mock_flow_data = nullptr;
 
 typedef int32_t AppId;
 Flow::Flow() = default;
+Flow::~Flow() = default;
 
 class FakeFlow : public Flow
 {
index ce1bd493a75f484867e1009479de561da30e469c..4a613289e8b6dd89c174693ffbf8488a4dd72cd0 100644 (file)
@@ -262,7 +262,7 @@ TEST(appid_session_api, get_appid_session_attribute)
         CHECK_TRUE((fv & flags) == flags);
         mock_session->clear_session_flags(flags);
         fv = appid_session_api->get_appid_session_attribute(flags);
-        CHECK_TRUE((fv & flags) == 0)
+        CHECK_TRUE((fv & flags) == 0);
     }
 }
 
index daaa8e5a6d0efa0ba7263ecad4ec06cf88e2df82..b51afdc5474d827018d2ef38219034a52f649be7 100644 (file)
@@ -71,7 +71,7 @@ THREAD_LOCAL AppIdStats appid_stats;
 
 void AppIdDebug::activate(const Flow*, const AppIdSession*, bool) { active = true; }
 
-AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector)
+AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&)
     : FlowData(0) {}
 AppIdSession::~AppIdSession() = default;
 AppIdDiscovery::AppIdDiscovery() {}
index 39a51ba04cda7ec8d99ba9a692fd62056d54c35f..d87e0aaf4b8b5da3befda284849c9f1411c24e29 100644 (file)
@@ -45,7 +45,7 @@ TEST_GROUP(sip_splitter_test)
 TEST(sip_splitter_test, callispaf)
 {
     bool result = ssut.splitter_is_paf();
-    CHECK(result)
+    CHECK(result);
 }
 
 TEST(sip_splitter_test, reset_states_test)
index 3eff59a440345479e12629ba62a69d9b1d5d25a6..ff7bc6b9b4bf5d632bc68ab447e20acf9db29d7a 100644 (file)
@@ -60,25 +60,25 @@ static void protocol_deactivate_session(Flow* flow)
         protocol_ha->deactivate_session(flow);
 }
 
-static Flow* protocol_create_session(FlowKey* key)
+static Flow* protocol_create_session(const FlowKey* key)
 {
     ProtocolHA* protocol_ha = get_protocol_ha(key->pkt_type);
     return protocol_ha ?  protocol_ha->create_session(key) : nullptr;
 }
 
-static bool is_client_lower(Flow* flow)
+static bool is_client_lower(const Flow& flow)
 {
-    if (flow->client_ip.fast_lt6(flow->server_ip))
+    if (flow.client_ip.fast_lt6(flow.server_ip))
         return true;
 
-    if (flow->server_ip.fast_lt6(flow->client_ip))
+    if (flow.server_ip.fast_lt6(flow.client_ip))
         return false;
 
-    switch (flow->key->pkt_type)
+    switch (flow.key->pkt_type)
     {
         case PktType::TCP:
         case PktType::UDP:
-            if (flow->client_port < flow->server_port)
+            if (flow.client_port < flow.server_port)
                 return true;
             break;
         default:
@@ -87,18 +87,14 @@ static bool is_client_lower(Flow* flow)
     return false;
 }
 
-bool StreamHAClient::consume(Flow*& flow, FlowKey* key, HAMessage* msg)
+bool StreamHAClient::consume(Flow*& flow, const FlowKey* key, HAMessage& msg, uint8_t size)
 {
     assert(key);
-    assert(msg);
 
-    // Is the message long enough to have our content?
-    if ( ((unsigned)(msg->content_length()) - (unsigned)(msg->cursor - msg->content())) <
-        sizeof(SessionHAContent) )
+    if (size != sizeof(SessionHAContent))
         return false;
 
-    SessionHAContent* hac = (SessionHAContent*)msg->cursor;
-    msg->cursor += sizeof(SessionHAContent);
+    SessionHAContent* hac = (SessionHAContent*) msg.cursor;
 
     // If flow is missing, we need to create a new one.
     if ( flow == nullptr )
@@ -137,33 +133,28 @@ bool StreamHAClient::consume(Flow*& flow, FlowKey* key, HAMessage* msg)
         flow->ha_state->add(FlowHAState::STANDBY);
     }
 
+    msg.advance_cursor(sizeof(SessionHAContent));
+
     return true;
 }
 
-bool StreamHAClient::produce(Flow* flow, HAMessage* msg)
+bool StreamHAClient::produce(Flow& flow, HAMessage& msg)
 {
-    assert(flow);
-    assert(msg);
+    if (!msg.fits(sizeof(SessionHAContent)))
+        return false;
 
-    // Check for buffer overflows
-    if ( (int)(msg->cursor - msg->content()) <= (int)(msg->content_length() -
-        sizeof(SessionHAContent)) )
-    {
-        SessionHAContent* hac = (SessionHAContent*)msg->cursor;
+    SessionHAContent* hac = (SessionHAContent*) msg.cursor;
 
-        memcpy(&(hac->ssn_state),&(flow->ssn_state),sizeof(LwState));
-        hac->flow_state = flow->flow_state;
-        hac->flags = 0;
-        msg->cursor += sizeof(SessionHAContent);
+    hac->ssn_state = flow.ssn_state;
+    hac->flow_state = flow.flow_state;
+    hac->flags = 0;
+    if (!is_client_lower(flow))
+        hac->flags |= SessionHAContent::FLAG_LOW;
+    hac->flags |= SessionHAContent::FLAG_IP6;
 
-        if ( !is_client_lower(flow) )
-            hac->flags |= SessionHAContent::FLAG_LOW;
+    msg.advance_cursor(sizeof(SessionHAContent));
 
-        hac->flags |= SessionHAContent::FLAG_IP6;
-        return true;
-    }
-    else
-        return false;
+    return true;
 }
 
 static void update_flags(Flow* flow)
@@ -241,13 +232,8 @@ bool StreamHAClient::is_update_required(Flow* flow)
         and that we're not overrunning the synchronization threshold. */
     if ( flow->ha_state->sync_interval_elapsed() )
         return true;
-    else
-        return flow->ha_state->check_any(FlowHAState::CRITICAL);
-}
 
-bool StreamHAClient::is_delete_required(Flow*)
-{
-    return true;
+    return flow->ha_state->check_any(FlowHAState::CRITICAL);
 }
 
 ProtocolHA::ProtocolHA(PktType protocol)
@@ -277,7 +263,7 @@ ProtocolHA::~ProtocolHA()
     }
 }
 
-void ProtocolHA::process_deletion(Flow* flow)
+void ProtocolHA::process_deletion(Flow& flow)
 {
     HighAvailabilityManager::process_deletion(flow);
 }
index eae05b294b669afc3e0ac095d66a55f819ede117..3276a81c1f84f7ef39fdba4fba6b3437d05afa5a 100644 (file)
 #include "flow/flow.h"
 #include "flow/ha.h"
 
-//-------------------------------------------------------------------------
-
 class __attribute__((__packed__)) SessionHAContent
 {
 public:
     snort::LwState ssn_state;
     snort::Flow::FlowState flow_state;
     uint8_t flags;
-    static const uint8_t FLAG_LOW = 0x01; // client address / port is low in key
-    static const uint8_t FLAG_IP6 = 0x02; // key addresses are ip6
+    static constexpr uint8_t FLAG_LOW = 0x01; // client address / port is low in key
+    static constexpr uint8_t FLAG_IP6 = 0x02; // key addresses are ip6
 };
 
-class StreamHAClient : public FlowHAClient
+class StreamHAClient : public snort::FlowHAClient
 {
 public:
     StreamHAClient() : FlowHAClient(sizeof(SessionHAContent), true) { }
-    bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) override;
-    bool produce(snort::Flow*, HAMessage*) override;
+    bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) override;
+    bool produce(snort::Flow&, snort::HAMessage&) override;
     bool is_update_required(snort::Flow*) override;
-    bool is_delete_required(snort::Flow*) override;
-
-private:
 };
 
 class ProtocolHA
@@ -52,12 +47,9 @@ class ProtocolHA
 public:
     ProtocolHA(PktType);
     virtual ~ProtocolHA();
-    virtual void delete_session(snort::Flow*) { }
-    virtual snort::Flow* create_session(snort::FlowKey*) { return nullptr; }
+    virtual snort::Flow* create_session(const snort::FlowKey*) { return nullptr; }
     virtual void deactivate_session(snort::Flow*) { }
-    virtual void process_deletion(snort::Flow*);
-
-private:
+    virtual void process_deletion(snort::Flow&);
 };
 
 class StreamHAManager
index 0b0e706229df4d90381db7f2c60e7699c8b1119e..509186223271ea6806eaa92b8ba2fcffd54e0b55 100644 (file)
@@ -28,7 +28,7 @@
 
 using namespace snort;
 
-Flow* IcmpHA::create_session(FlowKey* key)
+Flow* IcmpHA::create_session(const FlowKey* key)
 {
     assert(key);
     Flow* flow = Stream::new_flow(key);
@@ -45,7 +45,7 @@ Flow* IcmpHA::create_session(FlowKey* key)
 
 THREAD_LOCAL IcmpHA* IcmpHAManager::icmp_ha = nullptr;
 
-void IcmpHAManager::process_deletion(Flow* flow)
+void IcmpHAManager::process_deletion(Flow& flow)
 {
     if( icmp_ha != nullptr )
         icmp_ha->process_deletion(flow);
index 1437e3a420dd1a63b14d46fbf086d10337445ddb..e3df35337c5b4419e9e582795876995fbab1f364 100644 (file)
@@ -33,7 +33,7 @@ class IcmpHA : public ProtocolHA
 {
 public:
     IcmpHA() : ProtocolHA(PktType::ICMP) { }
-    snort::Flow* create_session(snort::FlowKey*) override;
+    snort::Flow* create_session(const snort::FlowKey*) override;
 
 private:
 };
@@ -41,7 +41,7 @@ private:
 class IcmpHAManager
 {
 public:
-    static void process_deletion(snort::Flow* flow);
+    static void process_deletion(snort::Flow& flow);
     static void tinit();
     static void tterm();
     static THREAD_LOCAL IcmpHA* icmp_ha;
index 230bc34572d7a8f457c5d5901e56758b6ee3f854..7bc5b6a778dd82eeb7c8e49b95223fb1d773077f 100644 (file)
@@ -196,7 +196,7 @@ bool IcmpSession::setup(Packet*)
 void IcmpSession::clear()
 {
     IcmpSessionCleanup(flow);
-    IcmpHAManager::process_deletion(flow);
+    IcmpHAManager::process_deletion(*flow);
 }
 
 int IcmpSession::process(Packet* p)
index 153af32cf0b4e9cf0f1143960594ce85c5b9df61..a9bb3205538f86176d577dc7dfd49eabe3d0f24e 100644 (file)
@@ -29,7 +29,7 @@
 
 using namespace snort;
 
-Flow* IpHA::create_session(FlowKey* key)
+Flow* IpHA::create_session(const FlowKey* key)
 {
     assert(key);
 
@@ -47,7 +47,7 @@ Flow* IpHA::create_session(FlowKey* key)
 
 THREAD_LOCAL IpHA* IpHAManager::ip_ha = nullptr;
 
-void IpHAManager::process_deletion(Flow* flow)
+void IpHAManager::process_deletion(Flow& flow)
 {
     if( ip_ha != nullptr )
         ip_ha->process_deletion(flow);
index e3333e1f8d55453449f498b6b133aca94a12ec4f..9dd0a13aae42c7f16316238d557110b9a9edb185 100644 (file)
@@ -33,7 +33,7 @@ class IpHA : public ProtocolHA
 {
 public:
     IpHA() : ProtocolHA(PktType::IP) { }
-    snort::Flow* create_session(snort::FlowKey*) override;
+    snort::Flow* create_session(const snort::FlowKey*) override;
 
 private:
 };
@@ -41,7 +41,7 @@ private:
 class IpHAManager
 {
 public:
-    static void process_deletion(snort::Flow* flow);
+    static void process_deletion(snort::Flow& flow);
     static void tinit();
     static void tterm();
     static THREAD_LOCAL IpHA* ip_ha;
index dc5092885c9011c522b2a57d26f2ebdb7353a7f8..2a44eb5da4d5852166cd96a1d929168fe0a8f3c2 100644 (file)
@@ -136,7 +136,7 @@ void IpSession::clear()
     }
 
     IpSessionCleanup(flow, &tracker);
-    IpHAManager::process_deletion(flow);
+    IpHAManager::process_deletion(*flow);
 }
 
 bool IpSession::setup(Packet* p)
@@ -173,7 +173,7 @@ int IpSession::process(Packet* p)
         if ( Stream::expected_flow(flow, p) )
             return 0;
 #endif
-        IpHAManager::process_deletion(flow);
+        IpHAManager::process_deletion(*flow);
     }
 
     if ( Stream::blocked_flow(p) || Stream::ignored_flow(flow, p) )
index c472dd6bb0bd09857a4c666f3ee3eaa27ae52ffb..9772dd815493cfb7460631cb2bac415ba1f8d2d3 100644 (file)
@@ -99,7 +99,6 @@ static Packet* init_packet(Flow* flow, uint32_t talker)
 
 void release_packet(Packet* p)
 {
-    delete p->flow->session;
     delete p->context;
     delete p;
 }
index 1668a85fa4f3fdc762ddeda0fa978d93553b3eb0..b3e2fa1aec3fe7f45f6e4bb3afca2ffc9b065d64 100644 (file)
@@ -437,7 +437,7 @@ void TcpStreamSession::clear()
         // this does NOT flush data
         clear_session( true, false, false );
 
-    TcpHAManager::process_deletion(flow);
+    TcpHAManager::process_deletion(*flow);
 }
 
 void TcpStreamSession::set_splitter(bool to_server, StreamSplitter* ss)
index 5f5b47293b0f0c5e4350eb47547073f66fe95348..3ca0d7234f9b1ce971a9e1d7bdd11a687bc4987f 100644 (file)
@@ -73,11 +73,6 @@ Flow* Stream::get_flow(const FlowKey* key)
 Flow* Stream::new_flow(const FlowKey* key)
 { return flow_con->new_flow(key); }
 
-Flow* Stream::new_flow(FlowKey* key)
-{
-    return flow_con ? flow_con->new_flow(key) : nullptr;
-}
-
 void Stream::delete_flow(const FlowKey* key)
 { flow_con->delete_flow(key); }
 
index 36d0db3990f6cd052223dabad263778438f9d88f..be66d1794cf3d9fb27fa98ccc7c5909c9e452011 100644 (file)
@@ -75,7 +75,6 @@ public:
     static void timeout_flows(time_t cur_time);
     static void prune_flows();
     static bool expected_flow(Flow*, Packet*);
-    static Flow* new_flow(FlowKey*);
 
     // Looks in the flow cache for flow session with specified key and returns
     // pointer to flow session object if found, otherwise null.
index 5932ae8854cd851c3ce6aab7ddb025fef997401f..9b37fee633ad9008f4171f07d686db6f464816b6 100644 (file)
@@ -29,7 +29,7 @@
 
 using namespace snort;
 
-Flow* TcpHA::create_session(FlowKey* key)
+Flow* TcpHA::create_session(const FlowKey* key)
 {
     assert(key);
 
@@ -60,7 +60,7 @@ void TcpHA::deactivate_session(Flow* flow)
 
 THREAD_LOCAL TcpHA* TcpHAManager::tcp_ha = nullptr;
 
-void TcpHAManager::process_deletion(Flow* flow)
+void TcpHAManager::process_deletion(Flow& flow)
 {
     if( tcp_ha != nullptr )
         tcp_ha->process_deletion(flow);
index 23d888b1ee95939eadaf361e5c94c18d0dd43170..1ed0204e74bfa6646178ee8697da5a8b1a831a6a 100644 (file)
@@ -33,7 +33,7 @@ class TcpHA : public ProtocolHA
 {
 public:
     TcpHA() : ProtocolHA(PktType::TCP) { }
-    snort::Flow* create_session(snort::FlowKey*) override;
+    snort::Flow* create_session(const snort::FlowKey*) override;
     void deactivate_session(snort::Flow*) override;
 
 private:
@@ -42,7 +42,7 @@ private:
 class TcpHAManager
 {
 public:
-    static void process_deletion(snort::Flow* flow);
+    static void process_deletion(snort::Flow& flow);
     static void tinit();
     static void tterm();
     static THREAD_LOCAL TcpHA* tcp_ha;
index ede24a9433b49dcb369c346c8162eb958a371a9d..823b1847921e3f8f13e6062b9a010b54f9454534 100644 (file)
@@ -902,7 +902,7 @@ void TcpSession::do_packet_analysis_post_checks(Packet* p)
         flow->set_expire(p, config->session_timeout);
     }
     else
-        TcpHAManager::process_deletion(p->flow);
+        TcpHAManager::process_deletion(*p->flow);
 
     if (pkt_action_mask & ACTION_DISABLE_INSPECTION)
     {
@@ -954,7 +954,7 @@ void TcpSession::cleanup_session_if_expired(Packet* p)
             clear_session(true, true, false, p);
 
         tcpStats.timeouts++;
-        TcpHAManager::process_deletion(flow);
+        TcpHAManager::process_deletion(*flow);
     }
 }
 
index 5f76ae7959795d8bcf6f69d218832e911abc6c8d..fdef875de29f23dce3eb0d6b60c621b6af43e4e5 100644 (file)
@@ -29,7 +29,7 @@
 
 using namespace snort;
 
-Flow* UdpHA::create_session(FlowKey* key)
+Flow* UdpHA::create_session(const FlowKey* key)
 {
     assert(key);
 
@@ -46,7 +46,7 @@ Flow* UdpHA::create_session(FlowKey* key)
 
 THREAD_LOCAL UdpHA* UdpHAManager::udp_ha = nullptr;
 
-void UdpHAManager::process_deletion(Flow* flow)
+void UdpHAManager::process_deletion(Flow& flow)
 {
     if( udp_ha != nullptr )
         udp_ha->process_deletion(flow);
index 1c7848bc313c753429bd60bbfba833808e333996..55ca9e8cc539b480192f2d1fdc63e637694909ee 100644 (file)
@@ -33,7 +33,7 @@ class UdpHA : public ProtocolHA
 {
 public:
     UdpHA() : ProtocolHA(PktType::UDP) { }
-    snort::Flow* create_session(snort::FlowKey*) override;
+    snort::Flow* create_session(const snort::FlowKey*) override;
 
 private:
 };
@@ -41,7 +41,7 @@ private:
 class UdpHAManager
 {
 public:
-    static void process_deletion(snort::Flow* flow);
+    static void process_deletion(snort::Flow& flow);
     static void tinit();
     static void tterm();
     static THREAD_LOCAL UdpHA* udp_ha;
index d5e79c4c2a28cf473ea95f59cefecd5f2b51d63f..daa35ff64e5a18a5a6dc027beda379f72f4f60e7 100644 (file)
@@ -137,7 +137,7 @@ bool UdpSession::setup(Packet* p)
 void UdpSession::clear()
 {
     UdpSessionCleanup(flow);
-    UdpHAManager::process_deletion(flow);
+    UdpHAManager::process_deletion(*flow);
     flow->clear();
 }
 
@@ -187,7 +187,7 @@ int UdpSession::process(Packet* p)
         flow->restart();
         flow->ssn_state.session_flags |= SSNFLAG_SEEN_SENDER;
         udpStats.created++;
-        UdpHAManager::process_deletion(flow);
+        UdpHAManager::process_deletion(*flow);
     }
 
     ProcessUdp(flow, p, pc, nullptr);