flow.h
flow_key.h
flow_stash.h
+ ha.h
stash_item.h
)
flow_stash.h
flow_stash.cc
ha.cc
- ha.h
ha_module.cc
ha_module.h
prune_stats.h
There are many flags that may be set on a flow to indicate session tracking
state, disposition, etc.
+=== High Availability
+
HighAvailability (ha.cc, ha.h) serves to synchronize session state between high
-availabity partners. HighAvailability uses Side Channel Connectors to transmit
-and receive messages. The HA side channel must be full duplex (both a
-transmit and receive connector). The primary purpose is to exchange session
-state with the goal of keeping the session caches in sync. But other clients
-may also subscribe to the HA service to exchange additional session data. Each
-client uses the FlowHAClient class to subscribe to the service.
+availabity partners. The primary purpose is to exchange session state with the
+goal of keeping the session caches in sync. Other clients may also register
+with the HA service to exchange additional session data by implementing one or
+more FlowHAClient classes.
+
+HA uses Side Channel Connectors and/or DAQ module IOCTLs to transmit and receive
+HA state messages. A full duplex Side Channel is required for Side Channel
+communications. A DAQ module that supports the IOCTLs for setting and getting
+HA state is required for a functional DAQ-backed setup. Modules that do not
+support these IOCTLs will silently fail and present no HA state on future
+packets in the flow. HA state will be queried from the DAQ module prior to new
+flow creation when the appropriate DAQ packet message flag has been set on the
+intiating packet.
The HA subsystem exchanges two high level message types:
- DELETE: Indicate to the partner that a session has neen removed. No
-additional HA client status will be exchanged.
+additional HA client status will be exchanged. (Not used for DAQ-backed
+storage.)
- UPDATE: Indicate all other state changes. The message always includes
the session state and optionally may include state from other HA clients.
+By default, the update messages are incremental. In the case of DAQ-backed
+storage, the update messages are always fully formed.
The HA subsystem implements these classes:
- HighAvailabilityManager - A collection of static elements providing the
top-most interface to HA capabilities.
- - HAMessage - A wrapper around the actual message (SideChannel only for now)
- and includes a cursor for producer/consumer activity. Passed around
+ - HAMessage - A wrapper around the actual message and includes a cursor and
+ convenience functions for producer/consumer activity. Passed around
among all message handing classes/methods.
- - HighAvailability - If HA is enabled instantiated in each packet thread and
+ - HighAvailability - If HA is enabled, instantiated in each packet thread and
provides all primary HA functionality for the thread. Referenced via a
THREAD_LOCAL object pointer.
- FlowHAState - One per flow and referenced via a pointer in the Flow object.
memset(this, 0, sizeof(*this));
}
+Flow::~Flow()
+{
+ term();
+}
void Flow::init(PktType type)
{
#define STREAM_STATE_BLOCK_PENDING 0x4000
class BitOp;
-class FlowHAState;
class Session;
namespace snort
{
+class FlowHAState;
struct FlowKey;
class IpsContext;
struct Packet;
ALLOW
};
Flow();
+ ~Flow();
Flow(const Flow&) = delete;
Flow& operator=(const Flow&) = delete;
#include "expect_cache.h"
#include "flow_cache.h"
+#include "ha.h"
#include "session.h"
using namespace snort;
FlowKey key;
set_key(&key, p);
Flow* flow = con.cache->find(&key);
+ if ( !flow )
+ flow = HighAvailabilityManager::import(*p, key);
if ( !flow )
{
// with this program; if not, write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//--------------------------------------------------------------------------
-// ha.cc author Ed Borgoyn <eborgoyn@cisco.com>
+// ha.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com>
#ifdef HAVE_CONFIG_H
#include "config.h"
#include "ha.h"
-#include <array>
-
#include "framework/counts.h"
#include "log/messages.h"
-#include "profiler/profiler_defs.h"
+#include "packet_io/sfdaq_instance.h"
+#include "protocols/packet.h"
+#include "side_channel/side_channel.h"
#include "stream/stream.h"
#include "time/packet_time.h"
#include "flow.h"
#include "flow_key.h"
+#include "ha_module.h"
using namespace snort;
-static const uint8_t HA_MESSAGE_VERSION = 3;
+enum HAEvent
+{
+ HA_DELETE_EVENT = 1,
+ HA_UPDATE_EVENT = 2
+};
-// define message size and content constants.
-static const uint8_t KEY_SIZE_IP6 = sizeof(FlowKey);
-// ip4 key is smaller by 2*(ip6-addr-size - ip4-addr-size) or 2*(16 - 4) = 24
-static const uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24;
+struct __attribute__((__packed__)) HAMessageHeader
+{
+ uint8_t event;
+ uint8_t version;
+ uint16_t total_length;
+ uint8_t key_type;
+};
+
+struct __attribute__((__packed__)) HAClientHeader
+{
+ uint8_t client;
+ uint8_t length;
+};
+
+// One client for each mask bit plus one 'automatic' session client
+// client handle = (1<<(client_index-1)
+// session client has handle of 0 and index of 0
+static constexpr uint8_t MAX_CLIENTS = 17;
-static const suseconds_t USEC_PER_SEC = 1000000;
+// HighAvailability is the thread-local state/configuration instantiated for each packet thread.
+typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap;
+class HighAvailability
+{
+public:
+ HighAvailability(PortBitSet*,bool);
+ ~HighAvailability();
+
+ void process_update(Flow*, Packet*);
+ void process_deletion(Flow&);
+ void process_receive();
+
+ Flow* process_daq_import(Packet&, FlowKey&);
+
+ // The [0] entry contains the stream client (always present)
+ // Entries [1] to [MAX_CLIENTS-1] contain the optional clients
+ ClientMap client_map = { };
+ uint8_t handle_counter = 1; // stream client (index == 0) always exists
+ bool shutting_down = false;
+
+private:
+ SideChannel* sc = nullptr;
+ bool use_daq_channel;
+};
+
+static constexpr uint8_t HA_MESSAGE_VERSION = 3;
+
+// define message size and content constants.
+static constexpr uint8_t KEY_SIZE_IP6 = sizeof(FlowKey);
+// ip4 key is smaller by 2*(ip6_addr_size - ip4_addr_size) or 2 * (16 - 4) = 24
+static constexpr uint8_t KEY_SIZE_IP4 = sizeof(FlowKey)-24;
enum
{
KEY_TYPE_IP4 = 2
};
-typedef std::array<FlowHAClient*, MAX_CLIENTS> ClientMap;
-
-THREAD_LOCAL SimpleStats ha_stats;
-THREAD_LOCAL ProfileStats ha_perf_stats;
+static constexpr FlowHAClientHandle SESSION_HA_CLIENT = 0x0000;
+static constexpr uint8_t SESSION_HA_CLIENT_INDEX = 0;
-static THREAD_LOCAL HighAvailability* ha;
PortBitSet* HighAvailabilityManager::ports = nullptr;
bool HighAvailabilityManager::use_daq_channel = false;
-THREAD_LOCAL bool HighAvailabilityManager::shutting_down = false;
+
struct timeval FlowHAState::min_session_lifetime;
struct timeval FlowHAState::min_sync_interval;
-uint8_t s_handle_counter = 1; // stream client (index == 0) always exists
-// The [0] entry contains the stream client (always present)
-// Entries [1] to [MAX_CLIENTS-1] contain the optional clients
-static THREAD_LOCAL ClientMap* s_client_map;
+static THREAD_LOCAL HighAvailability* ha;
static inline bool is_ip6_key(const FlowKey* key)
{
// Set the initial update time to now+min_session_lifetime
packet_gettimeofday(&next_update);
- next_update.tv_usec += min_session_lifetime.tv_usec;
- if (next_update.tv_usec > USEC_PER_SEC)
- {
- next_update.tv_usec -= USEC_PER_SEC;
- next_update.tv_sec++;
- }
- next_update.tv_sec += min_session_lifetime.tv_sec;
+ timeradd(&next_update, &min_session_lifetime, &next_update);
}
void FlowHAState::set_pending(FlowHAClientHandle handle)
{
- pending |= (uint16_t)handle;
+ pending |= handle;
}
bool FlowHAState::check_pending(FlowHAClientHandle handle)
{
- return ((pending & (uint16_t)handle) != 0);
+ return ((pending & handle) != 0);
}
void FlowHAState::clear_pending(FlowHAClientHandle handle)
{
- pending &= ~((uint16_t)handle);
+ pending &= ~handle;
}
void FlowHAState::set(uint8_t new_state)
void FlowHAState::set_next_update()
{
- next_update.tv_usec += min_sync_interval.tv_usec;
- if (next_update.tv_usec > USEC_PER_SEC)
- {
- next_update.tv_usec -= USEC_PER_SEC;
- next_update.tv_sec++;
- }
- next_update.tv_sec += min_sync_interval.tv_sec;
+ timeradd(&next_update, &min_sync_interval, &next_update);
}
void FlowHAState::reset()
FlowHAClient::FlowHAClient(uint8_t length, bool session_client)
{
- if ( !s_client_map )
+ if (!ha)
return;
- header.length = length;
+ max_length = length;
- if ( session_client )
+ if (session_client)
{
+ index = SESSION_HA_CLIENT_INDEX;
handle = SESSION_HA_CLIENT;
- header.client = SESSION_HA_CLIENT_INDEX;
- (*s_client_map)[0] = this;
+ ha->client_map[0] = this;
}
else
{
- if ( s_handle_counter >= MAX_CLIENTS )
+ if (ha->handle_counter >= MAX_CLIENTS)
{
ErrorMessage("Attempting to register too many FlowHAClients\n");
return;
}
- header.client = s_handle_counter;
- handle = (1 << (s_handle_counter-1));
- (*s_client_map)[s_handle_counter] = this;
- s_handle_counter += 1;
- }
-}
-
-bool FlowHAClient::fit(HAMessage* msg, uint8_t size)
-{
- return ( (int)(msg->cursor - msg->content()) < (int)(msg->content_length() - size) );
-}
-
-bool FlowHAClient::place(HAMessage* msg, uint8_t* data, uint8_t length)
-{
- if ( fit(msg, length) )
- {
- memcpy(msg->cursor,data,(size_t)length);
- msg->cursor += length;
- return true;
+ index = ha->handle_counter;
+ handle = (1 << (index - 1));
+ ha->client_map[index] = this;
+ ha->handle_counter++;
}
- else
- return false;
}
// Write the key type, key length, and key into the message.
-// Does not use the message cursor coming in.
-// Leave the message cursor just after the key. Return
-// the key length.
-static uint8_t write_flow_key(Flow* flow, HAMessage* msg)
-{
- HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
- msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader);
- const FlowKey* key = flow->key;
+// Return the type of key written so it can be stored in the message header.
+static uint8_t write_flow_key(Flow& flow, HAMessage& msg)
+{
+ const FlowKey* key = flow.key;
assert(key);
- if ( is_ip6_key(flow->key) )
+ if (is_ip6_key(flow.key))
{
- hdr->key_type = KEY_TYPE_IP6;
- memcpy(msg->cursor, key, KEY_SIZE_IP6);
- msg->cursor += KEY_SIZE_IP6;
+ memcpy(msg.cursor, key, KEY_SIZE_IP6);
+ msg.advance_cursor(KEY_SIZE_IP6);
- return KEY_SIZE_IP6;
+ return KEY_TYPE_IP6;
}
- hdr->key_type = KEY_TYPE_IP4;
- memcpy(msg->cursor, &key->ip_l[3], sizeof(key->ip_l[3]));
- msg->cursor += sizeof(key->ip_l[3]);
- memcpy(msg->cursor, &key->ip_h[3], sizeof(key->ip_h[3]));
- msg->cursor += sizeof(key->ip_h[3]);
- memcpy(msg->cursor, ((const uint8_t*)key) + 32, KEY_SIZE_IP4 - 8);
- msg->cursor += KEY_SIZE_IP4 - 8;
+ memcpy(msg.cursor, &key->ip_l[3], sizeof(key->ip_l[3]));
+ msg.advance_cursor(sizeof(key->ip_l[3]));
+ memcpy(msg.cursor, &key->ip_h[3], sizeof(key->ip_h[3]));
+ msg.advance_cursor(sizeof(key->ip_h[3]));
+ memcpy(msg.cursor, ((const uint8_t*) key) + 32, KEY_SIZE_IP4 - 8);
+ msg.advance_cursor(KEY_SIZE_IP4 - 8);
- return KEY_SIZE_IP4;
+ return KEY_TYPE_IP4;
}
-// Regardless of the message cursor, extract the key and
-// return the key length. Position the cursor just after the key.
-static uint8_t read_flow_key(FlowKey* key, HAMessage* msg)
+// Extract the key and return the key length. Position the cursor just after the key.
+static uint8_t read_flow_key(HAMessage& msg, const HAMessageHeader* hdr, FlowKey& key)
{
- assert(key);
- HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
- msg->cursor = (uint8_t*)hdr + sizeof(HAMessageHeader);
-
- if ( hdr->key_type == KEY_TYPE_IP6 )
+ if (hdr->key_type == KEY_TYPE_IP6)
{
- memcpy(key, msg->cursor, KEY_SIZE_IP6);
- msg->cursor += KEY_SIZE_IP6;
+ if (!msg.fits(KEY_SIZE_IP6))
+ {
+ ha_stats.truncated_msgs++;
+ return 0;
+ }
+
+ memcpy(&key, msg.cursor, KEY_SIZE_IP6);
+ msg.advance_cursor(KEY_SIZE_IP6);
return KEY_SIZE_IP6;
}
- else if ( hdr->key_type == KEY_TYPE_IP4 )
+ else if (hdr->key_type == KEY_TYPE_IP4)
{
+ if (!msg.fits(KEY_SIZE_IP4))
+ {
+ ha_stats.truncated_msgs++;
+ return 0;
+ }
+
/* Lower IPv4 address */
- memcpy(&key->ip_l[3], msg->cursor, sizeof(key->ip_l[3]));
- key->ip_l[0] = key->ip_l[1] = 0;
- key->ip_l[2] = htonl(0xFFFF);
- msg->cursor += sizeof(key->ip_l[3]);
+ memcpy(&key.ip_l[3], msg.cursor, sizeof(key.ip_l[3]));
+ key.ip_l[0] = key.ip_l[1] = 0;
+ key.ip_l[2] = htonl(0xFFFF);
+ msg.advance_cursor(sizeof(key.ip_l[3]));
/* Higher IPv4 address */
- memcpy(&key->ip_h[3], msg->cursor, sizeof(key->ip_h[3]));
- key->ip_h[0] = key->ip_h[1] = 0;
- key->ip_h[2] = htonl(0xFFFF);
- msg->cursor += sizeof(key->ip_h[3]);
+ memcpy(&key.ip_h[3], msg.cursor, sizeof(key.ip_h[3]));
+ key.ip_h[0] = key.ip_h[1] = 0;
+ key.ip_h[2] = htonl(0xFFFF);
+ msg.advance_cursor(sizeof(key.ip_h[3]));
/* The remainder of the key */
- memcpy(((uint8_t*)key) + 32, msg->cursor, KEY_SIZE_IP4 - 8);
- msg->cursor += KEY_SIZE_IP4 - 8;
+ memcpy(((uint8_t*) &key) + 32, msg.cursor, KEY_SIZE_IP4 - 8);
+ msg.advance_cursor(KEY_SIZE_IP4 - 8);
return KEY_SIZE_IP4;
}
- else
- return 0;
+
+ ha_stats.unknown_key_type++;
+ return 0;
}
-static inline uint8_t key_size(Flow* flow)
+static inline uint8_t key_size(Flow& flow)
{
- assert(flow->key);
- return is_ip6_key(flow->key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4;
+ assert(flow.key);
+ return is_ip6_key(flow.key) ? KEY_SIZE_IP6 : KEY_SIZE_IP4;
}
-static uint16_t calculate_msg_header_length(Flow* flow)
+static uint16_t calculate_msg_header_length(Flow& flow)
{
return sizeof(HAMessageHeader) + key_size(flow);
}
// Calculate the UPDATE message content length based on the
// set of active clients. The Session client is always present.
-static uint16_t calculate_update_msg_content_length(Flow* flow)
+static uint16_t calculate_update_msg_content_length(Flow& flow, bool full)
{
- assert(s_client_map);
- assert((*s_client_map)[0]);
+ assert(ha->client_map[0]);
uint16_t length = 0;
- for (int i=0; i<s_handle_counter; i++)
+ for (int i = 0; i < ha->handle_counter; i++)
{
// Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
- if ( (i == 0 ) || flow->ha_state->check_pending(1<<(i-1)) )
+ if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
{
- assert((*s_client_map)[i]);
- length += ((*s_client_map)[i]->get_message_size() + sizeof(HAClientHeader));
+ assert(ha->client_map[i]);
+ length += (ha->client_map[i]->get_message_size() + sizeof(HAClientHeader));
}
}
// Write the HA header and key sections. Position the cursor
// at the beginning of the content section.
-static void write_msg_header(Flow* flow, HAEvent event, uint16_t content_length, HAMessage* msg)
+static void write_msg_header(Flow& flow, HAEvent event, uint16_t content_length, HAMessage& msg)
{
- HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
- hdr->event = (uint8_t)event;
+ HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
+ hdr->event = (uint8_t) event;
hdr->version = HA_MESSAGE_VERSION;
hdr->total_length = content_length;
- write_flow_key(flow, msg); // set cursor to just beyond key
+ msg.advance_cursor(sizeof(HAMessageHeader));
+ hdr->key_type = write_flow_key(flow, msg);
}
-static void write_update_msg_client( FlowHAClient* client, Flow* flow, HAMessage* msg)
+static uint16_t update_msg_header_length(HAMessage& msg)
+{
+ HAMessageHeader* hdr = (HAMessageHeader*) msg.buffer;
+ hdr->total_length = msg.cursor_position();
+ return hdr->total_length;
+}
+
+static void write_update_msg_client(FlowHAClient* client, Flow& flow, HAMessage& msg)
{
assert(client);
- assert(msg);
- client->place(msg,(uint8_t*)&(client->header),(uint8_t)sizeof(client->header));
- client->produce(flow, msg);
+ if (!msg.fits(sizeof(HAClientHeader)))
+ return;
+
+ // Preemptively insert the client header. If production fails, roll back the message cursor
+ // to its original position.
+ uint8_t* original_cursor = msg.cursor;
+ HAClientHeader* header = (HAClientHeader*) original_cursor;
+ header->client = client->index;
+ msg.advance_cursor(sizeof(HAClientHeader));
+ if (!client->produce(flow, msg))
+ {
+ msg.reset_cursor(original_cursor);
+ return;
+ }
+ assert(msg.cursor >= (original_cursor + sizeof(HAClientHeader)));
+ header->length = (uint32_t) (msg.cursor - original_cursor - sizeof(HAClientHeader));
}
-static void write_update_msg_content(Flow* flow, HAMessage* msg)
+static void write_update_msg_content(Flow& flow, HAMessage& msg, bool full)
{
- assert(s_client_map);
-
- for ( int i=0; i<s_handle_counter; i++ )
+ for (int i = 0; i < ha->handle_counter; i++)
{
// Don't check 'i' against SESSION_HA_CLIENT_INDEX (==0), as this creates a false positive with cppcheck
- if ( (i == 0) || flow->ha_state->check_pending(1<<(i-1)) )
- write_update_msg_client((*s_client_map)[i],flow, msg);
+ if ((i == 0) || full || flow.ha_state->check_pending(1 << (i - 1)))
+ write_update_msg_client(ha->client_map[i], flow, msg);
}
}
-static void consume_receive_delete_message(HAMessage* msg)
+static void consume_ha_delete_message(HAMessage&, const FlowKey& key)
{
- FlowKey key;
- (void)read_flow_key(&key, msg);
Stream::delete_flow(&key);
}
-static void consume_receive_update_message(HAMessage* msg)
+static Flow* consume_ha_update_message(HAMessage& msg, const FlowKey& key)
{
- FlowKey key;
- (void)read_flow_key(&key, msg);
// flow will be nullptr if/when the session does not exist in the caches
Flow* flow = Stream::get_flow(&key);
+ if (!flow)
+ ha_stats.update_msgs_recv_no_flow++;
- assert(s_client_map);
-
- // pointer to the last byte in the message
- uint8_t* content_end = msg->content() + msg->content_length() - 1;
+ // pointer to one past the last byte in the message
+ const uint8_t* content_end = msg.buffer + msg.buffer_length;
- while( msg->cursor <= content_end )
+ while (msg.cursor < content_end)
{
// do we have sufficient message left to be able to have an HAClientHeader?
- if ( (int)(content_end - msg->cursor + 1) < (int)sizeof( HAClientHeader ) )
+ if (!msg.fits(sizeof(HAClientHeader)))
{
ErrorMessage("Consuming HA Update message - no HAClientHeader\n");
+ ha_stats.truncated_msgs++;
break;
}
- HAClientHeader* header = (HAClientHeader*)msg->cursor;
- msg->cursor += sizeof( HAClientHeader ); // step to the client content
-
- if ( (header->client >= s_handle_counter) ||
- ((*s_client_map)[header->client] == nullptr) )
+ HAClientHeader* header = (HAClientHeader*) msg.cursor;
+ if ((header->client >= ha->handle_counter) || (ha->client_map[header->client] == nullptr))
{
ErrorMessage("Consuming HA Update message - invalid client index\n");
+ ha_stats.unknown_client_idx++;
break;
}
+ msg.advance_cursor(sizeof(HAClientHeader)); // step to the client content
- if ( (content_end - msg->cursor + 1) < header->length )
+ if (!msg.fits(header->length))
{
ErrorMessage("Consuming HA Update message - message too short\n");
+ ha_stats.truncated_msgs++;
break;
}
// client is always the first segment of the message, the consume()
// invocation for the session client will create the flow. This
// flow can in turn be used by subsequent FlowHAClient's.
- if ( !(*s_client_map)[header->client]->consume(flow,&key,msg) )
+ if (!ha->client_map[header->client]->consume(flow, &key, msg, header->length))
{
ErrorMessage("Consuming HA Update message - error from client consume()\n");
+ ha_stats.client_consume_errors++;
break;
}
}
+
+ if (msg.cursor == content_end)
+ ha_stats.update_msgs_consumed++;
+
+ return flow;
}
-static void consume_receive_message(HAMessage* msg)
+static Flow* consume_ha_message(HAMessage& msg)
{
- HAMessageHeader* hdr = (HAMessageHeader*)msg->content();
+ ha_stats.msgs_recv++;
- if ( hdr->version != HA_MESSAGE_VERSION)
- return;
+ if (!msg.fits(sizeof(HAMessageHeader)))
+ {
+ ha_stats.truncated_msgs++;
+ return nullptr;
+ }
- switch ( hdr->event )
+ const HAMessageHeader* hdr = (HAMessageHeader*) msg.cursor;
+
+ if (hdr->version != HA_MESSAGE_VERSION)
+ {
+ ha_stats.msg_version_mismatch++;
+ return nullptr;
+ }
+
+ if (hdr->total_length != msg.buffer_length)
+ {
+ ha_stats.msg_length_mismatch++;
+ return nullptr;
+ }
+
+ msg.advance_cursor(sizeof(HAMessageHeader));
+
+ FlowKey key;
+ if (read_flow_key(msg, hdr, key) == 0)
+ return nullptr;
+
+ Flow* flow = nullptr;
+ switch (hdr->event)
{
case HA_DELETE_EVENT:
{
- consume_receive_delete_message(msg);
+ consume_ha_delete_message(msg, key);
+ ha_stats.delete_msgs_consumed++;
break;
}
case HA_UPDATE_EVENT:
{
- consume_receive_update_message(msg);
+ flow = consume_ha_update_message(msg, key);
+ ha_stats.update_msgs_recv++;
break;
}
- default:
- break;
}
+
+ return flow;
+}
+
+static void ha_sc_receive_handler(SCMessage* sc_msg)
+{
+ assert(sc_msg);
+
+ // SC received messages must have reference back to SideChannel object
+ assert(sc_msg->sc);
+
+ HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
+ consume_ha_message(ha_msg);
+
+ sc_msg->sc->discard_message(sc_msg);
}
-HighAvailability::HighAvailability(PortBitSet* ports, bool)
+HighAvailability::HighAvailability(PortBitSet* ports, bool daq_channel)
{
using namespace std::placeholders;
- // If we have ports, configure the side channel
- if ( ports != nullptr )
+ // If side channel ports were configured, find the first matching side channel to associate with
+ if (ports != nullptr)
{
- for ( SCPort port = 0; port < ports->size(); port++ )
- if ( ports->test(port) )
+ for (SCPort port = 0; port < ports->size(); port++)
+ {
+ if (!ports->test(port))
+ continue;
+
+ sc = SideChannelManager::get_side_channel(port);
+ if (sc)
{
- sc = SideChannelManager::get_side_channel(port);
- if (sc)
+ // We require a duplex channel
+ if (sc->get_direction() != Connector::CONN_DUPLEX)
{
- // We need a duplex channel
- if (sc->get_direction() != Connector::CONN_DUPLEX)
- {
- // Otherwise indicate that we don't have a sidechannel
- sc = nullptr;
- break;
- }
- sc->set_default_port(port);
- sc->register_receive_handler(
- std::bind(&HighAvailability::receive_handler, this, _1));
+ sc = nullptr;
+ continue;
}
- break;
+ sc->set_default_port(port);
+ sc->register_receive_handler(ha_sc_receive_handler);
}
+ break;
+ }
}
- s_client_map = new ClientMap;
- for ( int i=0; i<MAX_CLIENTS; i++ )
- (*s_client_map)[i] = nullptr;
-
- // Only looking for side channel processing - FIXIT-H
+ use_daq_channel = daq_channel;
}
HighAvailability::~HighAvailability()
{
- if ( sc )
- {
+ if (sc)
sc->unregister_receive_handler();
- }
-
- delete s_client_map;
}
-void HighAvailability::receive_handler(SCMessage* sc_msg)
+static void send_sc_update_message(Flow& flow, SideChannel& sc)
{
+ const uint16_t header_len = calculate_msg_header_length(flow);
+ const uint16_t content_len = calculate_update_msg_content_length(flow, false);
+
+ SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len));
assert(sc_msg);
+ HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
- // SC received messages must have reference back to SideChannel object
- assert(sc_msg->sc);
+ write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg);
+ write_update_msg_content(flow, ha_msg, false);
+ update_msg_header_length(ha_msg);
+ sc.transmit_message(sc_msg);
+}
+
+static void send_daq_update_message(Flow& flow, Packet& p)
+{
+ static THREAD_LOCAL uint8_t daq_io_buffer[UINT16_MAX];
- HAMessage ha_msg(sc_msg);
- consume_receive_message(&ha_msg);
+ HAMessage ha_msg(daq_io_buffer, sizeof(daq_io_buffer));
- sc_msg->sc->discard_message(sc_msg);
+ write_msg_header(flow, HA_UPDATE_EVENT, 0, ha_msg);
+ write_update_msg_content(flow, ha_msg, true);
+ uint32_t len = update_msg_header_length(ha_msg);
+
+ DIOCTL_FlowHAState fhs;
+ fhs.msg = p.daq_msg;
+ fhs.data = daq_io_buffer;
+ fhs.length = len;
+
+ p.daq_instance->ioctl(DIOCTL_SET_FLOW_HA_STATE, &fhs, sizeof(fhs));
+
+ ha_stats.daq_stores++;
}
-void HighAvailability::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr)
+void HighAvailability::process_update(Flow* flow, Packet* p)
{
- // Only looking for side channel processing - FIXIT-H
- UNUSED(pkthdr); // until we add DAQ communications channel
- if ( !sc || !flow )
+ if (!flow)
return;
// We must have the map array and the session client
- assert(s_client_map);
- assert((*s_client_map)[0]);
+ assert(client_map[0]);
- if ( !(*s_client_map)[0]->is_update_required(flow) &&
+ if ( !client_map[0]->is_update_required(flow) &&
( !flow->ha_state->check_pending(ALL_CLIENTS) ||
flow->ha_state->check_any(FlowHAState::NEW) ) )
return;
- const uint16_t header_len = calculate_msg_header_length(flow);
- const uint16_t content_len = calculate_update_msg_content_length(flow);
-
- SCMessage* sc_msg = sc->alloc_transmit_message((uint32_t)(header_len+content_len));
- assert(sc_msg);
- HAMessage ha_msg(sc_msg);
+ if (sc)
+ send_sc_update_message(*flow, *sc);
- write_msg_header(flow, HA_UPDATE_EVENT, content_len, &ha_msg);
- write_update_msg_content(flow, &ha_msg);
- sc->transmit_message(sc_msg);
+ if (use_daq_channel && p && p->daq_msg)
+ send_daq_update_message(*flow, *p);
flow->ha_state->clear(FlowHAState::NEW | FlowHAState::MODIFIED |
FlowHAState::MAJOR | FlowHAState::CRITICAL);
flow->ha_state->set_next_update();
}
-void HighAvailability::process_deletion(Flow* flow)
+static void send_sc_deletion_message(Flow& flow, SideChannel& sc)
{
- // No need to send message if we already have, we are in standby, or
- // we have just been created and haven't yet sent an update
- if ( flow->ha_state->check_any(FlowHAState::NEW |
- FlowHAState::DELETED |
- FlowHAState::STANDBY))
- return;
-
- // Deletion messages only use the side channel
- if ( !sc )
- return;
-
const uint32_t msg_len = calculate_msg_header_length(flow);
- SCMessage* sc_msg = sc->alloc_transmit_message(msg_len);
- HAMessage ha_msg(sc_msg);
+ SCMessage* sc_msg = sc.alloc_transmit_message(msg_len);
+ HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
// No content, only header+key
- write_msg_header(flow, HA_DELETE_EVENT, 0, &ha_msg);
+ write_msg_header(flow, HA_DELETE_EVENT, msg_len, ha_msg);
- sc->transmit_message(sc_msg);
+ sc.transmit_message(sc_msg);
+}
+
+void HighAvailability::process_deletion(Flow& flow)
+{
+ // No need to send message if we already have, we are in standby, or
+ // we have just been created and haven't yet sent an update
+ if (flow.ha_state->check_any(FlowHAState::NEW | FlowHAState::DELETED | FlowHAState::STANDBY))
+ return;
+
+ // Only produce deletion messages when using a side channel
+ if (sc)
+ send_sc_deletion_message(flow, *sc);
- flow->ha_state->add(FlowHAState::DELETED);
+ flow.ha_state->add(FlowHAState::DELETED);
}
void HighAvailability::process_receive()
{
- if ( sc != nullptr )
+ if (sc)
sc->process(DISPATCH_ALL_RECEIVE);
}
-// Called by the configuration parsing activity in the main thread.
-bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel,
- struct timeval* min_session_lifetime, struct timeval* min_sync_interval)
+Flow* HighAvailability::process_daq_import(Packet& p, FlowKey& key)
{
- ports = mod_ports;
- FlowHAState::config_timers(*min_session_lifetime, *min_sync_interval);
- use_daq_channel = mod_use_daq_channel;
+ Flow* flow = nullptr;
+
+ if (use_daq_channel && p.pkth->flags & DAQ_PKT_FLAG_HA_STATE_AVAIL)
+ {
+ DIOCTL_FlowHAState fhs;
+ fhs.msg = p.daq_msg;
+
+ if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS)
+ {
+ HAMessage ha_msg(fhs.data, fhs.length);
+ flow = consume_ha_message(ha_msg);
+ ha_stats.daq_imports++;
+ // Validate that the imported flow matches up with the given flow key.
+ if (flow)
+ {
+ if (FlowKey::compare(&key, flow->key, 0) == 0)
+ {
+ // Clear the standby bit so that we don't immediately trigger a new data store
+ // FIXIT-L streamline the consume process so this doesn't have to be done here
+ flow->ha_state->clear(FlowHAState::STANDBY);
+ }
+ else
+ flow = nullptr;
+ }
+ }
+ }
- return true;
+ return flow;
}
-// Called prior to the starts of configuration in the main thread.
-void HighAvailabilityManager::pre_config_init()
+void HighAvailabilityManager::reset_config()
{
- ports = nullptr;
+ if (ports)
+ {
+ delete ports;
+ ports = nullptr;
+ }
+}
+
+void HighAvailabilityManager::term()
+{
+ reset_config();
+}
+
+// Called within the main thread after the initial configuration has been read
+void HighAvailabilityManager::configure(HighAvailabilityConfig* config)
+{
+ if (!config)
+ {
+ reset_config();
+ return;
+ }
+
+ if (config->ports)
+ ports = new PortBitSet(*config->ports);
+ else if (ports)
+ {
+ delete ports;
+ ports = nullptr;
+ }
+
+ FlowHAState::config_timers(config->min_session_lifetime, config->min_sync_interval);
+
+ use_daq_channel = config->daq_channel;
}
// Called within the packet thread prior to packet processing
void HighAvailabilityManager::thread_init()
{
// create a a thread local instance iff we are configured to operate.
- if ( (ports != nullptr) || use_daq_channel )
- ha = new HighAvailability(ports,use_daq_channel);
+ if (ports || use_daq_channel)
+ ha = new HighAvailability(ports, use_daq_channel);
else
ha = nullptr;
}
void HighAvailabilityManager::thread_term_beginning()
{
- shutting_down = true;
+ if (ha)
+ ha->shutting_down = true;
}
// Called in the packet thread at run-down
void HighAvailabilityManager::thread_term()
{
- if ( ha != nullptr )
+ if (ha)
{
delete ha;
ha = nullptr;
}
}
-void HighAvailabilityManager::process_update(Flow* flow, const DAQ_PktHdr_t* pkthdr)
+void HighAvailabilityManager::process_update(Flow* flow, Packet* p)
{
- if ( (ha != nullptr) && (pkthdr != nullptr) && (flow != nullptr) )
- ha->process_update(flow,pkthdr);
+ if (ha && p && flow)
+ ha->process_update(flow, p);
}
// Deletion messages only contain session content
-void HighAvailabilityManager::process_deletion(Flow* flow)
+void HighAvailabilityManager::process_deletion(Flow& flow)
{
- if ( (ha != nullptr) && !shutting_down )
+ if (ha && !ha->shutting_down)
ha->process_deletion(flow);
}
void HighAvailabilityManager::process_receive()
{
- if ( ha != nullptr )
+ if (ha)
ha->process_receive();
}
void HighAvailabilityManager::set_modified(Flow* flow)
{
- if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) )
+ if (ha && flow && flow->ha_state)
flow->ha_state->add(FlowHAState::MODIFIED);
}
bool HighAvailabilityManager::in_standby(Flow* flow)
{
- if ( (ha != nullptr) && (flow != nullptr) && (flow->ha_state != nullptr) )
+ if (ha && flow && flow->ha_state)
return flow->ha_state->check_any(FlowHAState::STANDBY);
- else
- return false;
+
+ return false;
+}
+
+Flow* HighAvailabilityManager::import(Packet& p, FlowKey& key)
+{
+ if (!ha)
+ return nullptr;
+
+ return ha->process_daq_import(p, key);
}
#include <daq_common.h>
+#include <cassert>
+
+#include "framework/bits.h"
#include "main/thread.h"
-#include "side_channel/side_channel.h"
//-------------------------------------------------------------------------
+struct HighAvailabilityConfig;
+
namespace snort
{
class Flow;
struct FlowKey;
-}
+struct Packet;
+struct ProfileStats;
// The FlowHAHandle is the dynamically allocated index used uniquely identify
// the client. Used both in the API and HA messages.
// Handle 0 is defined to be the primary session client.
// NOTE: The type, masks, and count values must be in sync,
typedef uint16_t FlowHAClientHandle;
-const FlowHAClientHandle SESSION_HA_CLIENT = 0x0000;
-const uint8_t SESSION_HA_CLIENT_INDEX = 0;
-const FlowHAClientHandle ALL_CLIENTS = 0xffff;
-// One client for each mask bit plus one 'automatic' session client
-// client handle = (1<<(client_index-1)
-// session client has handle of 0 and index of 0
-const uint8_t MAX_CLIENTS = 17;
-
-enum HAEvent
-{
- HA_DELETE_EVENT = 1,
- HA_UPDATE_EVENT = 2
-};
+constexpr FlowHAClientHandle ALL_CLIENTS = 0xffff;
// Each active flow will have an associated FlowHAState instance.
-class FlowHAState
+class SO_PUBLIC FlowHAState
{
public:
- static const uint8_t CRITICAL = 0x80;
- static const uint8_t MAJOR = 0x40;
-
- static const uint8_t NEW = 0x01;
- static const uint8_t MODIFIED = 0x02;
- static const uint8_t DELETED = 0x04;
- static const uint8_t STANDBY = 0x08;
- static const uint8_t NEW_SESSION = 0x10;
+ static constexpr uint8_t CRITICAL = 0x80;
+ static constexpr uint8_t MAJOR = 0x40;
+
+ enum : uint8_t
+ {
+ NEW = 0x01,
+ MODIFIED = 0x02,
+ DELETED = 0x04,
+ STANDBY = 0x08,
+ NEW_SESSION = 0x10,
+ };
FlowHAState();
void add(uint8_t state);
void clear(uint8_t state);
bool check_any(uint8_t state);
- static void config_timers(timeval,timeval);
+ static void config_timers(struct timeval, struct timeval);
bool sync_interval_elapsed();
void set_next_update();
void reset();
private:
- static const uint8_t INITIAL_STATE = 0x00;
- static const uint16_t NONE_PENDING = 0x0000;
-
+ static constexpr uint8_t INITIAL_STATE = 0x00;
+ static constexpr uint16_t NONE_PENDING = 0x0000;
static struct timeval min_session_lifetime;
static struct timeval min_sync_interval;
- uint8_t state;
- uint16_t pending;
- struct timeval next_update;
-};
-
-struct __attribute__((__packed__)) HAMessageHeader
-{
- uint8_t event;
- uint8_t version;
- uint16_t total_length;
- uint8_t key_type;
-};
-struct __attribute__((__packed__)) HAClientHeader
-{
- uint8_t client;
- uint8_t length;
+ struct timeval next_update;
+ uint16_t pending;
+ uint8_t state;
};
// Describe the message being produced or consumed.
class HAMessage
{
public:
- HAMessage(SCMessage* msg)
- { sc_msg = msg; }
-
- uint8_t* content()
- { return sc_msg->content; }
- uint16_t content_length()
- { return sc_msg->content_length; }
+ HAMessage(uint8_t* buffer, uint32_t buffer_length) :
+ buffer(buffer), buffer_length(buffer_length), cursor(buffer) { }
+
+ bool fits(uint32_t size) const { return size <= (buffer_length - (cursor - buffer)); }
+
+ void advance_cursor(uint32_t size)
+ {
+ assert(fits(size));
+ cursor += size;
+ }
+
+ void reset_cursor(uint8_t* pos = nullptr)
+ {
+ if (pos)
+ {
+ assert(pos >= buffer && pos <= (buffer + buffer_length));
+ cursor = pos;
+ }
+ else
+ cursor = buffer;
+ }
+
+ uint32_t cursor_position() const { return (uint32_t) (cursor - buffer); }
+
+ uint8_t* buffer;
+ const uint32_t buffer_length;
uint8_t* cursor;
-
-private:
- SCMessage* sc_msg;
};
// A FlowHAClient subclass for each producer/consumer of flow HA data
-class FlowHAClient
+class SO_PUBLIC FlowHAClient
{
public:
virtual ~FlowHAClient() = default;
- virtual bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) { return false; }
- virtual bool produce(snort::Flow*, HAMessage*) { return false; }
+ virtual bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) = 0;
+ virtual bool produce(snort::Flow&, snort::HAMessage&) = 0;
virtual bool is_update_required(snort::Flow*) { return false; }
- virtual bool is_delete_required(snort::Flow*) { return false; }
- uint8_t get_message_size() { return header.length; }
- bool fit(HAMessage*, uint8_t);
- bool place(HAMessage*, uint8_t*, uint8_t);
+ uint8_t get_message_size() { return max_length; }
+
FlowHAClientHandle handle; // Actual handle for the instance
- HAClientHeader header;
+ uint8_t index;
+ uint8_t max_length;
protected:
- FlowHAClient(uint8_t, bool);
-
-};
-
-// HighAvailability is instantiated for each packet-thread.
-// FIXIT-M make the SideChannel the THREAD_LOCAL element and collapse
-// into HighAvailabilityManager
-class HighAvailability
-{
-public:
- HighAvailability(PortBitSet*,bool);
- ~HighAvailability();
-
- void process_update(snort::Flow*, const DAQ_PktHdr_t*);
- void process_deletion(snort::Flow*);
- void process_receive();
-
-private:
- void receive_handler(SCMessage*);
- SideChannel* sc = nullptr;
+ FlowHAClient(uint8_t length, bool session_client);
};
// Top level management of HighAvailability components.
-class HighAvailabilityManager
+class SO_PUBLIC HighAvailabilityManager
{
public:
- // Prior to parsing configuration
- static void pre_config_init();
-
- // Invoked by the module configuration parsing to create HA instance
- static bool instantiate(PortBitSet*,bool,struct timeval*,struct timeval*);
+ static void configure(HighAvailabilityConfig*);
static void thread_init();
static void thread_term_beginning(); // thread is about to be terminated
static void thread_term();
+ static void term();
// true if we are configured and able to process
static bool active();
- // Within the packet callback, analyze the packet and flow for potential update messages
- static void process_update(snort::Flow*, const DAQ_PktHdr_t*);
+ // Within packet processing, analyze the packet and flow for potential update messages
+ static void process_update(snort::Flow*, snort::Packet*);
// Anytime a flow is deleted, potentially generate a deletion message
- static void process_deletion(snort::Flow*);
+ static void process_deletion(snort::Flow&);
// Look for and dispatch receive messages.
static void process_receive();
static void set_modified(snort::Flow*);
static bool in_standby(snort::Flow*);
+ // Attempt to import HA data from the Packet
+ static Flow* import(snort::Packet& p, snort::FlowKey& key);
+
private:
+ static void reset_config();
+
HighAvailabilityManager() = delete;
static bool use_daq_channel;
static PortBitSet* ports;
- static THREAD_LOCAL bool shutting_down;
};
+}
+
#endif
#include <cmath>
#include "log/messages.h"
-
-#include "ha.h"
+#include "main/snort_config.h"
+#include "profiler/profiler_defs.h"
using namespace snort;
"side channel message port list" },
{ "min_age", Parameter::PT_REAL, "0.0:100.0", "1.0",
- "minimum session life before HA updates" },
+ "minimum session life in seconds before HA updates" },
- { "min_sync", Parameter::PT_REAL, "0.0:100.0", "1.0",
- "minimum interval between HA updates" },
+ { "min_sync", Parameter::PT_REAL, "0.0:100.0", "0.1",
+ "minimum interval in seconds between HA updates" },
{ nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
};
+static const PegInfo ha_pegs[] =
+{
+ { CountType::SUM, "msgs_recv", "total messages received" },
+ { CountType::SUM, "update_msgs_recv", "update messages received" },
+ { CountType::SUM, "update_msgs_recv_no_flow", "update messages received without a local flow" },
+ { CountType::SUM, "update_msgs_consumed", "update messages fully consumed" },
+ { CountType::SUM, "delete_msgs_consumed", "deletion messages consumed" },
+ { CountType::SUM, "daq_stores", "states stored via daq" },
+ { CountType::SUM, "daq_imports", "states imported via daq" },
+ { CountType::SUM, "msg_version_mismatch", "messages received with a version mismatch" },
+ { CountType::SUM, "msg_length_mismatch", "messages received with an inconsistent total length" },
+ { CountType::SUM, "truncated_msgs", "truncated messages received" },
+ { CountType::SUM, "unknown_key_type", "messages received with an unknown flow key type" },
+ { CountType::SUM, "unknown_client_idx", "messages received with an unknown client index" },
+ { CountType::SUM, "client_consume_errors", "client data consume failure count" },
+ { CountType::END, nullptr, nullptr }
+};
+
+THREAD_LOCAL HAStats ha_stats;
+THREAD_LOCAL ProfileStats ha_perf_stats;
+
+//-------------------------------------------------------------------------
+
static void convert_real_seconds_to_timeval(double seconds, struct timeval* tv)
{
double whole = trunc(seconds);
HighAvailabilityModule::HighAvailabilityModule() :
Module(HA_NAME, HA_HELP, ha_params)
{
- config.enabled = false;
- config.daq_channel = false;
- config.ports = nullptr;
- convert_real_seconds_to_timeval(1.0, &config.min_session_lifetime);
- convert_real_seconds_to_timeval(0.1, &config.min_sync_interval);
+ config = nullptr;
}
HighAvailabilityModule::~HighAvailabilityModule()
{
- delete config.ports;
+ if (config)
+ delete config;
+}
+
+const PegInfo* HighAvailabilityModule::get_pegs() const
+{
+ return ha_pegs;
+}
+
+PegCount* HighAvailabilityModule::get_counts() const
+{
+ return (PegCount*) &ha_stats;
}
ProfileStats* HighAvailabilityModule::get_profile() const
-{ return &ha_perf_stats; }
+{
+ return &ha_perf_stats;
+}
+
+bool HighAvailabilityModule::begin(const char*, int, SnortConfig*)
+{
+ assert(!config);
+ config = new HighAvailabilityConfig();
+
+ return true;
+}
bool HighAvailabilityModule::set(const char*, Value& v, SnortConfig*)
{
if ( v.is("enable") )
- config.enabled = v.get_bool();
-
+ {
+ config->enabled = v.get_bool();
+ }
else if ( v.is("daq_channel") )
- config.daq_channel = v.get_bool();
-
+ {
+ config->daq_channel = v.get_bool();
+ }
else if ( v.is("ports") )
{
- if ( !config.ports )
- config.ports = new PortBitSet;
- v.get_bits(*(config.ports) );
+ if ( !config->ports )
+ config->ports = new PortBitSet;
+ v.get_bits(*(config->ports));
}
else if ( v.is("min_age") )
{
- convert_real_seconds_to_timeval(v.get_real(), &config.min_session_lifetime);
+ convert_real_seconds_to_timeval(v.get_real(), &config->min_session_lifetime);
}
else if ( v.is("min_sync") )
{
- convert_real_seconds_to_timeval(v.get_real(), &config.min_sync_interval);
+ convert_real_seconds_to_timeval(v.get_real(), &config->min_sync_interval);
}
else
return false;
return true;
}
-bool HighAvailabilityModule::begin(const char*, int, SnortConfig*)
-{
- return true;
-}
-
-bool HighAvailabilityModule::end(const char*, int, SnortConfig*)
+bool HighAvailabilityModule::end(const char*, int, SnortConfig* sc)
{
- if ( config.enabled &&
- !HighAvailabilityManager::instantiate(config.ports, config.daq_channel,
- &config.min_session_lifetime, &config.min_sync_interval) )
- {
- ParseWarning(WARN_CONF, "Illegal HighAvailability configuration");
- return false;
- }
+ if ( config->enabled )
+ sc->ha_config = config;
+ else
+ delete config;
+ config = nullptr;
return true;
}
struct HighAvailabilityConfig
{
+ ~HighAvailabilityConfig() { delete ports; }
+
bool enabled;
bool daq_channel;
PortBitSet* ports = nullptr;
struct timeval min_sync_interval;
};
-extern THREAD_LOCAL SimpleStats ha_stats;
-extern THREAD_LOCAL snort::ProfileStats ha_perf_stats;
-
class HighAvailabilityModule : public snort::Module
{
public:
HighAvailabilityModule();
~HighAvailabilityModule() override;
- bool set(const char*, snort::Value&, snort::SnortConfig*) override;
bool begin(const char*, int, snort::SnortConfig*) override;
+ bool set(const char*, snort::Value&, snort::SnortConfig*) override;
bool end(const char*, int, snort::SnortConfig*) override;
- PegCount* get_counts() const override
- { return (PegCount*)&ha_stats; }
-
- const PegInfo* get_pegs() const override
- { return snort::simple_pegs; }
+ const PegInfo* get_pegs() const override;
+ PegCount* get_counts() const override;
snort::ProfileStats* get_profile() const override;
{ return GLOBAL; }
private:
- HighAvailabilityConfig config;
+ HighAvailabilityConfig* config;
};
+struct HAStats
+{
+ PegCount msgs_recv;
+ PegCount update_msgs_recv;
+ PegCount update_msgs_recv_no_flow;
+ PegCount update_msgs_consumed;
+ PegCount delete_msgs_consumed;
+ PegCount daq_stores;
+ PegCount daq_imports;
+ PegCount msg_version_mismatch;
+ PegCount msg_length_mismatch;
+ PegCount truncated_msgs;
+ PegCount unknown_key_type;
+ PegCount unknown_client_idx;
+ PegCount client_consume_errors;
+};
+
+extern THREAD_LOCAL HAStats ha_stats;
+extern THREAD_LOCAL snort::ProfileStats ha_perf_stats;
+
#endif
-add_cpputest( ha_test
- SOURCES ../ha.cc
-)
-
-add_cpputest( ha_module_test
- SOURCES
- ../ha_module.cc
- ../../framework/module.cc
- ../../framework/value.cc
- ../../sfip/sf_ip.cc
- $<TARGET_OBJECTS:catch_tests>
-)
+add_cpputest( ha_test )
add_cpputest( flow_stash_test
SOURCES ../flow_stash.cc
{
DB->_subscribe(key, h);
}
-void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig* sc)
+void DataBus::subscribe_default(const char* key, DataHandler* h, SnortConfig*)
{
DB->_subscribe(key, h);
}
void DataBus::unsubscribe(const char*, DataHandler*) {}
-void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig* sc) {}
+void DataBus::unsubscribe_default(const char*, DataHandler*, SnortConfig*) {}
void DataBus::publish(const char* key, DataEvent& e, Flow* f)
{
+++ /dev/null
-//--------------------------------------------------------------------------
-// Copyright (C) 2015-2019 Cisco and/or its affiliates. All rights reserved.
-//
-// This program is free software; you can redistribute it and/or modify it
-// under the terms of the GNU General Public License Version 2 as published
-// by the Free Software Foundation. You may not use, modify or distribute
-// this program under any other version of the GNU General Public License.
-//
-// This program is distributed in the hope that it will be useful, but
-// WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-// General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along
-// with this program; if not, write to the Free Software Foundation, Inc.,
-// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-//--------------------------------------------------------------------------
-
-// ha_module_test.cc author Ed Borgoyn <eborgoyn@cisco.com>
-// unit test main
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "flow/ha.h"
-#include "flow/ha_module.h"
-
-#include "log/messages.h"
-#include "main/snort_debug.h"
-#include "profiler/profiler.h"
-
-#include <CppUTest/CommandLineTestRunner.h>
-#include <CppUTest/TestHarness.h>
-
-using namespace snort;
-
-THREAD_LOCAL SimpleStats ha_stats;
-THREAD_LOCAL ProfileStats ha_perf_stats;
-
-void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
-void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*) { }
-void show_stats(PegCount*, const PegInfo*, IndexVec&, const char*, FILE*) { }
-
-namespace snort
-{
-void LogMessage(const char*,...) { }
-void ParseWarning(WarningGroup, const char*, ...) { }
-char* snort_strdup(const char* str) { return strdup(str); }
-}
-
-static bool s_port_1_set = false;
-static bool s_use_daq = false;
-static bool s_instantiate_called = false;
-
-static char* make_bit_string(int bit)
-{
- static char bit_string[65537];
- for ( int i=0; i<65536; i++)
- bit_string[i] = (bit == i) ? '1' : '0';
-
- bit_string[65536] = '\0';
-
- return bit_string;
-}
-
-bool HighAvailabilityManager::instantiate(PortBitSet* mod_ports, bool mod_use_daq_channel, struct timeval*, struct timeval*)
-{
- s_instantiate_called = true;
- s_port_1_set = mod_ports->test(1);
- s_use_daq = mod_use_daq_channel;
-
- return true;
-}
-
-TEST_GROUP(high_availability_module_test)
-{
- void setup() override
- {
- }
-
- void teardown() override
- {
- }
-};
-
-TEST(high_availability_module_test, test_ha_valid)
-{
- Value ports_val(make_bit_string(1));
- Value enable_val(true);
- Value min_age_val(1.0);
- Value min_sync_val(0.1);
- Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"};
- Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
- Parameter min_age_param = {"min_age", Parameter::PT_REAL, nullptr, "1.0", nullptr };
- Parameter min_sync_param = {"min_sync", Parameter::PT_REAL, nullptr, "0.1", nullptr };
-
- HighAvailabilityModule module;
-
- ports_val.set(&ports_param);
- enable_val.set(&enable_param);
- min_age_val.set(&min_age_param);
- min_sync_val.set(&min_sync_param);
-
- s_instantiate_called = false;
- s_port_1_set = false;
- s_use_daq = false;
-
- module.begin("high_availability", 0, nullptr);
- module.set("high_availability.ports", ports_val, nullptr);
- module.set("high_availability.enable", enable_val, nullptr);
- module.set("high_availability.min_age", min_age_val, nullptr);
- module.set("high_availability.min_sync", min_sync_val, nullptr);
- module.end("high_availability", 0, nullptr);
-
- CHECK(s_instantiate_called == true);
- CHECK(s_port_1_set == true);
- CHECK(s_use_daq == false);
-}
-
-TEST(high_availability_module_test, test_ha_disabled)
-{
- Value enable_val(false);
- Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
-
- HighAvailabilityModule module;
-
- enable_val.set(&enable_param);
-
- s_instantiate_called = false;
- s_port_1_set = false;
- s_use_daq = false;
-
- module.begin("high_availability", 0, nullptr);
- module.set("high_availability.enable", enable_val, nullptr);
- module.end("high_availability", 0, nullptr);
-
- CHECK(s_instantiate_called == false);
- CHECK(s_port_1_set == false);
- CHECK(s_use_daq == false);
-}
-
-TEST(high_availability_module_test, test_ha_valid_daq)
-{
- Value ports_val(make_bit_string(1));
- Value enable_val(true);
- Value daq_val(true);
- Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"};
- Parameter enable_param = {"enable", Parameter::PT_BOOL, nullptr, "false", nullptr };
- Parameter daq_param = {"daq_channel", Parameter::PT_BOOL, nullptr, "false", nullptr };
-
- HighAvailabilityModule module;
-
- ports_val.set(&ports_param);
- enable_val.set(&enable_param);
- daq_val.set(&daq_param);
-
- s_instantiate_called = false;
- s_port_1_set = false;
- s_use_daq = false;
-
- module.begin("high_availability", 0, nullptr);
- module.set("high_availability.ports", ports_val, nullptr);
- module.set("high_availability.enable", enable_val, nullptr);
- module.set("high_availability.daq_channel", daq_val, nullptr);
- module.end("high_availability", 0, nullptr);
-
- CHECK(s_instantiate_called == true);
- CHECK(s_port_1_set == true);
- CHECK(s_use_daq == true);
-}
-
-int main(int argc, char** argv)
-{
- return CommandLineTestRunner::RunAllTests(argc, argv);
-}
-
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//--------------------------------------------------------------------------
-// ha_test.cc author Ed Borgoyn <eborgoyn@cisco.com>
+// ha_test.cc authors Ed Borgoyn <eborgoyn@cisco.com>, Michael Altizer <mialtize@cisco.com>
// unit test main
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
-#include "flow/ha.h"
-
-#include "flow/flow.h"
-#include "flow/flow_key.h"
-#include "main/snort_debug.h"
-#include "stream/stream.h"
+#include "flow/ha.cc"
#include <CppUTest/CommandLineTestRunner.h>
#include <CppUTest/TestHarness.h>
class StreamHAClient;
-static const uint8_t s_test_key[] =
+static const FlowKey s_test_key =
{
-TEST_KEY
+ { 1, 2, 3, 4 },
+ { 5, 6, 7, 8 },
+ 9,
+ 10,
+ 11,
+ 12,
+ 13,
+ 14,
+ PktType::TCP,
+ 14,
+ 0,
};
-static const uint8_t s_delete_message[] =
+static struct __attribute__((__packed__)) TestDeleteMessage {
+ HAMessageHeader mhdr;
+ FlowKey key;
+} s_delete_message =
{
- 0x01,
- 0x03,
- 0x00,
- 0x00,
- 0x01,
-TEST_KEY
+ {
+ HA_DELETE_EVENT,
+ HA_MESSAGE_VERSION,
+ 0x35,
+ KEY_TYPE_IP6
+ },
+ s_test_key
};
-static const uint8_t s_update_stream_message[] =
+static struct __attribute__((__packed__)) TestUpdateMessage {
+ HAMessageHeader mhdr;
+ FlowKey key;
+ HAClientHeader schdr;
+ uint8_t scmsg[10];
+} s_update_stream_message =
{
- 0x02,
- 0x03,
- 0x00,
- 0x00,
- 0x01,
-TEST_KEY,
- 0x00,
- 10,
+ {
+ HA_UPDATE_EVENT,
+ HA_MESSAGE_VERSION,
+ 0x41,
+ KEY_TYPE_IP6
+ },
+ s_test_key,
+ {
+ 0,
+ 10
+ },
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
};
static SCMessage s_sc_message;
static SCMessage s_rec_sc_message;
static bool s_stream_consume_called = false;
+static uint8_t s_stream_consume_size = 0;
static bool s_other_consume_called = false;
+static uint8_t s_other_consume_size = 0;
static bool s_get_session_called = false;
static bool s_delete_session_called = false;
static bool s_transmit_message_called = false;
static uint8_t s_message_length = 0;
static Flow s_flow;
static FlowKey s_flowkey;
-static DAQ_PktHdr_t s_pkthdr;
+static Packet s_pkt;
static StreamHAClient* s_ha_client;
static FlowHAClient* s_other_ha_client;
static std::function<void (SCMessage*)> s_handler = nullptr;
public:
StreamHAClient() : FlowHAClient(10, true) { }
~StreamHAClient() override = default;
- bool consume(Flow*&, FlowKey*, HAMessage*) override
+ bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override
{
s_stream_consume_called = true;
+ s_stream_consume_size = size;
+
+ for ( uint8_t i = 0; i < 10; i++ )
+ {
+ if (*msg.cursor != i)
+ return false;
+ msg.advance_cursor(sizeof(*msg.cursor));
+ }
+
return true;
}
- bool produce(Flow*, HAMessage* msg) override
+ bool produce(Flow&, HAMessage& msg) override
{
- for ( uint8_t i=0; i<10; i++ )
- *(msg->cursor)++ = i;
+ if (!msg.fits(10))
+ return false;
+
+ for ( uint8_t i = 0; i < 10; i++ )
+ {
+ *msg.cursor = i;
+ msg.advance_cursor(sizeof(*msg.cursor));
+ }
return true;
}
- uint8_t get_message_size() { return 10; }
bool is_update_required(Flow*) override { return s_stream_update_required; }
- bool fit(HAMessage*,uint8_t) { return true; }
- bool place(HAMessage*,uint8_t*,uint8_t) { return true; }
-
-private:
};
class OtherHAClient : public FlowHAClient
public:
OtherHAClient() : FlowHAClient(5, false) { }
~OtherHAClient() override = default;
- bool consume(Flow*&, HAMessage*)
+ bool consume(Flow*&, const FlowKey*, HAMessage& msg, uint8_t size) override
{
s_other_consume_called = true;
+ s_other_consume_size = size;
+
+ for ( uint8_t i = 0; i < 5; i++ )
+ {
+ if (*msg.cursor != i)
+ return false;
+ msg.advance_cursor(sizeof(*msg.cursor));
+ }
+
return true;
}
- bool produce(Flow*, HAMessage* msg) override
+ bool produce(Flow&, HAMessage& msg) override
{
- for ( uint8_t i=0; i<5; i++ )
- *(msg->cursor)++ = i;
+ if (!msg.fits(5))
+ return false;
+
+ for ( uint8_t i = 0; i < 5; i++ )
+ {
+ *msg.cursor = i;
+ msg.advance_cursor(sizeof(*msg.cursor));
+ }
return true;
}
- uint8_t get_message_size() { return 5; }
bool is_update_required(Flow*) override { return s_other_update_required; }
- bool fit(HAMessage*,uint8_t) { return true; }
- bool place(HAMessage*,uint8_t*,uint8_t) { return true; }
-
-private:
};
-Flow* Stream::get_flow(const FlowKey* flowkey)
+//-------------------------------------------------------------------------
+// stubs, spies, etc.
+//-------------------------------------------------------------------------
+
+THREAD_LOCAL HAStats ha_stats = { };
+
+Flow* Stream::get_flow(const FlowKey* flowkey)
{
s_flowkey = *flowkey;
s_get_session_called = true;
return &s_flow;
}
+Packet::Packet(bool) { }
+Packet::~Packet() = default;
+
void Stream::delete_flow(const FlowKey* flowkey)
{
s_flowkey = *flowkey;
void LogMessage(const char*,...) { }
}
+int FlowKey::compare(const void*, const void*, size_t) { return 0; }
+
+int SFDAQInstance::ioctl(DAQ_IoctlCmd, void*, size_t) { return DAQ_SUCCESS; }
+
void packet_gettimeofday(struct timeval* tv)
{ *tv = s_packet_time; }
Flow::Flow() { ha_state = new FlowHAState; key = new FlowKey; }
+Flow::~Flow() { delete key; delete ha_state; }
FlowStash::~FlowStash() { }
s_transmit_message_called = true;
s_message_content = msg->content;
s_message_length = msg->content_length;
- return true; }
+ return true;
+}
SCMessage* SideChannel::alloc_transmit_message(uint32_t len)
{
return &s_sc_message;
}
+//-------------------------------------------------------------------------
+// tests
+//-------------------------------------------------------------------------
+
TEST_GROUP(high_availability_manager_test)
{
- void setup() override
- {
- MemoryLeakWarningPlugin::turnOffNewDeleteOverloads();
- }
-
void teardown() override
{
- MemoryLeakWarningPlugin::turnOnNewDeleteOverloads();
+ HighAvailabilityManager::term();
}
};
TEST(high_availability_manager_test, init_term)
{
- HighAvailabilityManager::pre_config_init();
+ HighAvailabilityConfig hac = { };
+ HighAvailabilityManager::configure(&hac);
HighAvailabilityManager::thread_init();
CHECK(HighAvailabilityManager::active()==false);
HighAvailabilityManager::thread_term();
TEST(high_availability_manager_test, inst_init_term)
{
- HighAvailabilityManager::pre_config_init();
- PortBitSet port_set;
- port_set.set(1);
- struct timeval age = { 1, 0 };
- struct timeval interval = { 0, 500000 };
- HighAvailabilityManager::instantiate(&port_set, false, &age, &interval);
+ HighAvailabilityConfig hac;
+ hac.enabled = true;
+ hac.daq_channel = false;
+ hac.ports = new PortBitSet();
+ hac.ports->set(1);
+ hac.min_session_lifetime = { 1, 0 };
+ hac.min_sync_interval = { 0, 500000 };
+
+ HighAvailabilityManager::configure(&hac);
HighAvailabilityManager::thread_init();
s_ha_client = new StreamHAClient;
CHECK(HighAvailabilityManager::active()==true);
{
void setup() override
{
- MemoryLeakWarningPlugin::turnOffNewDeleteOverloads();
- HighAvailabilityManager::pre_config_init();
- PortBitSet port_set;
- port_set.set(1);
- struct timeval age = { 1, 0 };
- struct timeval interval = { 0, 500000 };
- HighAvailabilityManager::instantiate(&port_set, false, &age, &interval);
+ memset(&ha_stats, 0, sizeof(ha_stats));
+
+ HighAvailabilityConfig hac;
+ hac.enabled = true;
+ hac.daq_channel = false;
+ hac.ports = new PortBitSet();
+ hac.ports->set(1);
+ hac.min_session_lifetime = { 1, 0 };
+ hac.min_sync_interval = { 0, 500000 };
+
+ HighAvailabilityManager::configure(&hac);
HighAvailabilityManager::thread_init();
s_ha_client = new StreamHAClient;
s_other_ha_client = new OtherHAClient;
delete s_other_ha_client;
delete s_ha_client;
HighAvailabilityManager::thread_term();
- MemoryLeakWarningPlugin::turnOnNewDeleteOverloads();
+ HighAvailabilityManager::term();
}
};
TEST(high_availability_test, receive_deletion)
{
s_delete_session_called = false;
- s_message_content = (uint8_t*)s_delete_message;
+ s_message_content = (uint8_t*) &s_delete_message;
s_message_length = sizeof(s_delete_message);
HighAvailabilityManager::process_receive();
CHECK(s_delete_session_called == true);
TEST(high_availability_test, receive_update_stream_only)
{
s_stream_consume_called = false;
- s_message_content = (uint8_t*)s_update_stream_message;
+ s_stream_consume_size = 0;
+ s_message_content = (uint8_t*) &s_update_stream_message;
s_message_length = sizeof(s_update_stream_message);
HighAvailabilityManager::process_receive();
CHECK(s_stream_consume_called == true);
+ CHECK(s_stream_consume_size == 10);
CHECK(memcmp((const void*)&s_flowkey, (const void*)&s_test_key, sizeof(s_test_key)) == 0);
}
TEST(high_availability_test, transmit_deletion)
{
s_transmit_message_called = false;
- HighAvailabilityManager::process_deletion(&s_flow);
+ HighAvailabilityManager::process_deletion(s_flow);
CHECK(s_transmit_message_called == true);
}
s_transmit_message_called = false;
s_stream_update_required = false;
s_other_update_required = false;
- HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+ HighAvailabilityManager::process_update(&s_flow, &s_pkt);
CHECK(s_transmit_message_called == false);
}
s_transmit_message_called = false;
s_stream_update_required = true;
s_other_update_required = false;
- HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+ HighAvailabilityManager::process_update(&s_flow, &s_pkt);
CHECK(s_transmit_message_called == true);
}
s_other_update_required = true;
CHECK(s_other_ha_client->handle == 1);
s_flow.ha_state->set_pending(s_other_ha_client->handle);
- HighAvailabilityManager::process_update(&s_flow, &s_pkthdr);
+ HighAvailabilityManager::process_update(&s_flow, &s_pkt);
CHECK(s_transmit_message_called == true);
}
+TEST(high_availability_test, read_flow_key_error_v4)
+{
+ HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP4 };
+ HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP4 / 2);
+ FlowKey key;
+
+ CHECK(read_flow_key(msg, &hdr, key) == 0);
+ CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, read_flow_key_error_v6)
+{
+ HAMessageHeader hdr = { 0, 0, 0, KEY_TYPE_IP6 };
+ HAMessage msg((uint8_t*) &s_test_key, KEY_SIZE_IP6 / 2);
+ FlowKey key;
+
+ CHECK(read_flow_key(msg, &hdr, key) == 0);
+ CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, read_flow_key_error_unknown)
+{
+ HAMessageHeader hdr = { 0, 0, 0, 0x42 };
+ HAMessage msg((uint8_t*) &s_test_key, sizeof(s_test_key));
+ FlowKey key;
+
+ CHECK(read_flow_key(msg, &hdr, key) == 0);
+ CHECK(ha_stats.unknown_key_type == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_client_hdr)
+{
+ HAClientHeader chdr = { 0, 0 };
+ HAMessage msg((uint8_t*) &chdr, sizeof(chdr) / 2);
+ FlowKey key;
+
+ consume_ha_update_message(msg, key);
+ CHECK(ha_stats.update_msgs_consumed == 0);
+ CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_invalid_client_idx)
+{
+ HAClientHeader chdr = { 0x42, 0 };
+ HAMessage msg((uint8_t*) &chdr, sizeof(chdr));
+ FlowKey key;
+
+ consume_ha_update_message(msg, key);
+ CHECK(ha_stats.update_msgs_consumed == 0);
+ CHECK(ha_stats.unknown_client_idx == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_client_msg)
+{
+ struct __attribute__((__packed__))
+ {
+ HAClientHeader chdr = { 0, 0x42 };
+ uint8_t cmsg[0x42 / 2] = { };
+ } input;
+ HAMessage msg((uint8_t*) &input, sizeof(input));
+ FlowKey key;
+
+ consume_ha_update_message(msg, key);
+ CHECK(ha_stats.update_msgs_consumed == 0);
+ CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_client_consume)
+{
+ struct __attribute__((__packed__))
+ {
+ HAClientHeader chdr = { 0, 10 };
+ uint8_t cmsg[0x42 / 2] = { };
+ } input;
+ HAMessage msg((uint8_t*) &input, sizeof(input));
+ FlowKey key;
+
+ consume_ha_update_message(msg, key);
+ CHECK(ha_stats.update_msgs_consumed == 0);
+ CHECK(ha_stats.client_consume_errors == 1);
+}
+
+TEST(high_availability_test, consume_error_truncated_msg_hdr)
+{
+ HAMessageHeader hdr = { };
+ HAMessage msg((uint8_t*) &hdr, sizeof(hdr) / 2);
+
+ CHECK(consume_ha_message(msg) == nullptr);
+ CHECK(ha_stats.truncated_msgs == 1);
+}
+
+TEST(high_availability_test, consume_error_version_mismatch)
+{
+ HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION + 1, 0, 0 };
+ HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
+
+ CHECK(consume_ha_message(msg) == nullptr);
+ CHECK(ha_stats.msg_version_mismatch == 1);
+}
+
+TEST(high_availability_test, consume_error_length_mismatch)
+{
+ HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION, 0x42, 0 };
+ HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
+
+ CHECK(consume_ha_message(msg) == nullptr);
+ CHECK(ha_stats.msg_length_mismatch == 1);
+}
+
+TEST(high_availability_test, produce_error_client_hdr_overflow)
+{
+ uint8_t buffer[sizeof(HAClientHeader) / 2];
+ HAMessage msg(buffer, sizeof(buffer));
+ Flow flow;
+
+ write_update_msg_client(s_ha_client, flow, msg);
+ CHECK(msg.cursor == msg.buffer);
+}
+
+TEST(high_availability_test, produce_error_client_produce)
+{
+ uint8_t buffer[sizeof(HAClientHeader)];
+ HAMessage msg(buffer, sizeof(buffer));
+ Flow flow;
+
+ write_update_msg_client(s_ha_client, flow, msg);
+ CHECK(msg.cursor == msg.buffer);
+}
+
int main(int argc, char** argv)
{
return CommandLineTestRunner::RunAllTests(argc, argv);
PacketTracer::dump(p);
}
- HighAvailabilityManager::process_update(p->flow, p->pkth);
+ HighAvailabilityManager::process_update(p->flow, p);
p->pkth = nullptr; // no longer avail upon sig segv
#endif
SideChannelManager::pre_config_init();
- HighAvailabilityManager::pre_config_init();
ModuleManager::init();
ScriptManager::load_scripts(snort_cmd_line_conf->script_paths);
if ( !sc->output.empty() )
EventManager::instantiate(sc->output.c_str(), sc);
+ HighAvailabilityManager::configure(sc->ha_config);
+
if (SnortConfig::alert_before_pass())
sc->rule_order = "reset block drop alert pass log";
}
CleanupProtoNames();
+ HighAvailabilityManager::term();
SideChannelManager::term();
ModuleManager::term();
PluginManager::release_plugins();
#include "filters/rate_filter.h"
#include "filters/sfrf.h"
#include "filters/sfthreshold.h"
+#include "flow/ha_module.h"
#include "hash/xhash.h"
#include "helpers/process.h"
#include "ips_options/ips_flowbits.h"
delete[] state;
delete thread_config;
+ delete ha_config;
if (gtp_ports)
delete gtp_ports;
TUNNEL_MPLS = 0x80
};
-struct ClassType;
+class FastPatternConfig;
+class RuleState;
+class ThreadConfig;
+
struct srmm_table_t;
struct sopg_table_t;
-
-struct MemoryConfig;
-struct LatencyConfig;
-struct PORT_RULE_MAP;
-struct RuleListNode;
-struct RulePortTables;
-class RuleState;
+struct ClassType;
struct DetectionFilterConfig;
struct EventQueueConfig;
-struct IpsActionsConfig;
-class FastPatternConfig;
+struct FlowBitState;
struct FrameworkConfig;
-struct ThresholdConfig;
+struct HighAvailabilityConfig;
+struct IpsActionsConfig;
+struct LatencyConfig;
+struct MemoryConfig;
+struct PORT_RULE_MAP;
struct RateFilterConfig;
-struct SFDAQConfig;
-class ThreadConfig;
struct ReferenceSystemNode;
+struct RuleListNode;
+struct RulePortTables;
+struct SFDAQConfig;
+struct ThresholdConfig;
struct VarNode;
-struct _IntelPmHandles;
-struct FlowBitState;
namespace snort
{
unsigned num_slots = 0;
ThreadConfig* thread_config;
+ HighAvailabilityConfig* ha_config = nullptr;
//------------------------------------------------------
//Reload inspector related
AppIdInspector() = default;
};
-AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector)
+AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&)
: FlowData(0) { }
AppIdSession::~AppIdSession() = default;
s_discovery_manager = new ServiceDiscovery();
return *s_discovery_manager;
}
-AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port port , Protocol proto){ return 0; }
+AppId snort::host_cache_find_app_mapping(snort::SfIp const*, Port, Protocol){ return 0; }
// Stubs for ClientDiscovery
ClientDiscovery::ClientDiscovery(){}
ClientDiscovery::~ClientDiscovery() {}
typedef int32_t AppId;
Flow::Flow() = default;
+Flow::~Flow() = default;
class FakeFlow : public Flow
{
CHECK_TRUE((fv & flags) == flags);
mock_session->clear_session_flags(flags);
fv = appid_session_api->get_appid_session_attribute(flags);
- CHECK_TRUE((fv & flags) == 0)
+ CHECK_TRUE((fv & flags) == 0);
}
}
void AppIdDebug::activate(const Flow*, const AppIdSession*, bool) { active = true; }
-AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector& inspector)
+AppIdSession::AppIdSession(IpProtocol, const SfIp*, uint16_t, AppIdInspector&)
: FlowData(0) {}
AppIdSession::~AppIdSession() = default;
AppIdDiscovery::AppIdDiscovery() {}
TEST(sip_splitter_test, callispaf)
{
bool result = ssut.splitter_is_paf();
- CHECK(result)
+ CHECK(result);
}
TEST(sip_splitter_test, reset_states_test)
protocol_ha->deactivate_session(flow);
}
-static Flow* protocol_create_session(FlowKey* key)
+static Flow* protocol_create_session(const FlowKey* key)
{
ProtocolHA* protocol_ha = get_protocol_ha(key->pkt_type);
return protocol_ha ? protocol_ha->create_session(key) : nullptr;
}
-static bool is_client_lower(Flow* flow)
+static bool is_client_lower(const Flow& flow)
{
- if (flow->client_ip.fast_lt6(flow->server_ip))
+ if (flow.client_ip.fast_lt6(flow.server_ip))
return true;
- if (flow->server_ip.fast_lt6(flow->client_ip))
+ if (flow.server_ip.fast_lt6(flow.client_ip))
return false;
- switch (flow->key->pkt_type)
+ switch (flow.key->pkt_type)
{
case PktType::TCP:
case PktType::UDP:
- if (flow->client_port < flow->server_port)
+ if (flow.client_port < flow.server_port)
return true;
break;
default:
return false;
}
-bool StreamHAClient::consume(Flow*& flow, FlowKey* key, HAMessage* msg)
+bool StreamHAClient::consume(Flow*& flow, const FlowKey* key, HAMessage& msg, uint8_t size)
{
assert(key);
- assert(msg);
- // Is the message long enough to have our content?
- if ( ((unsigned)(msg->content_length()) - (unsigned)(msg->cursor - msg->content())) <
- sizeof(SessionHAContent) )
+ if (size != sizeof(SessionHAContent))
return false;
- SessionHAContent* hac = (SessionHAContent*)msg->cursor;
- msg->cursor += sizeof(SessionHAContent);
+ SessionHAContent* hac = (SessionHAContent*) msg.cursor;
// If flow is missing, we need to create a new one.
if ( flow == nullptr )
flow->ha_state->add(FlowHAState::STANDBY);
}
+ msg.advance_cursor(sizeof(SessionHAContent));
+
return true;
}
-bool StreamHAClient::produce(Flow* flow, HAMessage* msg)
+bool StreamHAClient::produce(Flow& flow, HAMessage& msg)
{
- assert(flow);
- assert(msg);
+ if (!msg.fits(sizeof(SessionHAContent)))
+ return false;
- // Check for buffer overflows
- if ( (int)(msg->cursor - msg->content()) <= (int)(msg->content_length() -
- sizeof(SessionHAContent)) )
- {
- SessionHAContent* hac = (SessionHAContent*)msg->cursor;
+ SessionHAContent* hac = (SessionHAContent*) msg.cursor;
- memcpy(&(hac->ssn_state),&(flow->ssn_state),sizeof(LwState));
- hac->flow_state = flow->flow_state;
- hac->flags = 0;
- msg->cursor += sizeof(SessionHAContent);
+ hac->ssn_state = flow.ssn_state;
+ hac->flow_state = flow.flow_state;
+ hac->flags = 0;
+ if (!is_client_lower(flow))
+ hac->flags |= SessionHAContent::FLAG_LOW;
+ hac->flags |= SessionHAContent::FLAG_IP6;
- if ( !is_client_lower(flow) )
- hac->flags |= SessionHAContent::FLAG_LOW;
+ msg.advance_cursor(sizeof(SessionHAContent));
- hac->flags |= SessionHAContent::FLAG_IP6;
- return true;
- }
- else
- return false;
+ return true;
}
static void update_flags(Flow* flow)
and that we're not overrunning the synchronization threshold. */
if ( flow->ha_state->sync_interval_elapsed() )
return true;
- else
- return flow->ha_state->check_any(FlowHAState::CRITICAL);
-}
-bool StreamHAClient::is_delete_required(Flow*)
-{
- return true;
+ return flow->ha_state->check_any(FlowHAState::CRITICAL);
}
ProtocolHA::ProtocolHA(PktType protocol)
}
}
-void ProtocolHA::process_deletion(Flow* flow)
+void ProtocolHA::process_deletion(Flow& flow)
{
HighAvailabilityManager::process_deletion(flow);
}
#include "flow/flow.h"
#include "flow/ha.h"
-//-------------------------------------------------------------------------
-
class __attribute__((__packed__)) SessionHAContent
{
public:
snort::LwState ssn_state;
snort::Flow::FlowState flow_state;
uint8_t flags;
- static const uint8_t FLAG_LOW = 0x01; // client address / port is low in key
- static const uint8_t FLAG_IP6 = 0x02; // key addresses are ip6
+ static constexpr uint8_t FLAG_LOW = 0x01; // client address / port is low in key
+ static constexpr uint8_t FLAG_IP6 = 0x02; // key addresses are ip6
};
-class StreamHAClient : public FlowHAClient
+class StreamHAClient : public snort::FlowHAClient
{
public:
StreamHAClient() : FlowHAClient(sizeof(SessionHAContent), true) { }
- bool consume(snort::Flow*&, snort::FlowKey*, HAMessage*) override;
- bool produce(snort::Flow*, HAMessage*) override;
+ bool consume(snort::Flow*&, const snort::FlowKey*, snort::HAMessage&, uint8_t size) override;
+ bool produce(snort::Flow&, snort::HAMessage&) override;
bool is_update_required(snort::Flow*) override;
- bool is_delete_required(snort::Flow*) override;
-
-private:
};
class ProtocolHA
public:
ProtocolHA(PktType);
virtual ~ProtocolHA();
- virtual void delete_session(snort::Flow*) { }
- virtual snort::Flow* create_session(snort::FlowKey*) { return nullptr; }
+ virtual snort::Flow* create_session(const snort::FlowKey*) { return nullptr; }
virtual void deactivate_session(snort::Flow*) { }
- virtual void process_deletion(snort::Flow*);
-
-private:
+ virtual void process_deletion(snort::Flow&);
};
class StreamHAManager
using namespace snort;
-Flow* IcmpHA::create_session(FlowKey* key)
+Flow* IcmpHA::create_session(const FlowKey* key)
{
assert(key);
Flow* flow = Stream::new_flow(key);
THREAD_LOCAL IcmpHA* IcmpHAManager::icmp_ha = nullptr;
-void IcmpHAManager::process_deletion(Flow* flow)
+void IcmpHAManager::process_deletion(Flow& flow)
{
if( icmp_ha != nullptr )
icmp_ha->process_deletion(flow);
{
public:
IcmpHA() : ProtocolHA(PktType::ICMP) { }
- snort::Flow* create_session(snort::FlowKey*) override;
+ snort::Flow* create_session(const snort::FlowKey*) override;
private:
};
class IcmpHAManager
{
public:
- static void process_deletion(snort::Flow* flow);
+ static void process_deletion(snort::Flow& flow);
static void tinit();
static void tterm();
static THREAD_LOCAL IcmpHA* icmp_ha;
void IcmpSession::clear()
{
IcmpSessionCleanup(flow);
- IcmpHAManager::process_deletion(flow);
+ IcmpHAManager::process_deletion(*flow);
}
int IcmpSession::process(Packet* p)
using namespace snort;
-Flow* IpHA::create_session(FlowKey* key)
+Flow* IpHA::create_session(const FlowKey* key)
{
assert(key);
THREAD_LOCAL IpHA* IpHAManager::ip_ha = nullptr;
-void IpHAManager::process_deletion(Flow* flow)
+void IpHAManager::process_deletion(Flow& flow)
{
if( ip_ha != nullptr )
ip_ha->process_deletion(flow);
{
public:
IpHA() : ProtocolHA(PktType::IP) { }
- snort::Flow* create_session(snort::FlowKey*) override;
+ snort::Flow* create_session(const snort::FlowKey*) override;
private:
};
class IpHAManager
{
public:
- static void process_deletion(snort::Flow* flow);
+ static void process_deletion(snort::Flow& flow);
static void tinit();
static void tterm();
static THREAD_LOCAL IpHA* ip_ha;
}
IpSessionCleanup(flow, &tracker);
- IpHAManager::process_deletion(flow);
+ IpHAManager::process_deletion(*flow);
}
bool IpSession::setup(Packet* p)
if ( Stream::expected_flow(flow, p) )
return 0;
#endif
- IpHAManager::process_deletion(flow);
+ IpHAManager::process_deletion(*flow);
}
if ( Stream::blocked_flow(p) || Stream::ignored_flow(flow, p) )
void release_packet(Packet* p)
{
- delete p->flow->session;
delete p->context;
delete p;
}
// this does NOT flush data
clear_session( true, false, false );
- TcpHAManager::process_deletion(flow);
+ TcpHAManager::process_deletion(*flow);
}
void TcpStreamSession::set_splitter(bool to_server, StreamSplitter* ss)
Flow* Stream::new_flow(const FlowKey* key)
{ return flow_con->new_flow(key); }
-Flow* Stream::new_flow(FlowKey* key)
-{
- return flow_con ? flow_con->new_flow(key) : nullptr;
-}
-
void Stream::delete_flow(const FlowKey* key)
{ flow_con->delete_flow(key); }
static void timeout_flows(time_t cur_time);
static void prune_flows();
static bool expected_flow(Flow*, Packet*);
- static Flow* new_flow(FlowKey*);
// Looks in the flow cache for flow session with specified key and returns
// pointer to flow session object if found, otherwise null.
using namespace snort;
-Flow* TcpHA::create_session(FlowKey* key)
+Flow* TcpHA::create_session(const FlowKey* key)
{
assert(key);
THREAD_LOCAL TcpHA* TcpHAManager::tcp_ha = nullptr;
-void TcpHAManager::process_deletion(Flow* flow)
+void TcpHAManager::process_deletion(Flow& flow)
{
if( tcp_ha != nullptr )
tcp_ha->process_deletion(flow);
{
public:
TcpHA() : ProtocolHA(PktType::TCP) { }
- snort::Flow* create_session(snort::FlowKey*) override;
+ snort::Flow* create_session(const snort::FlowKey*) override;
void deactivate_session(snort::Flow*) override;
private:
class TcpHAManager
{
public:
- static void process_deletion(snort::Flow* flow);
+ static void process_deletion(snort::Flow& flow);
static void tinit();
static void tterm();
static THREAD_LOCAL TcpHA* tcp_ha;
flow->set_expire(p, config->session_timeout);
}
else
- TcpHAManager::process_deletion(p->flow);
+ TcpHAManager::process_deletion(*p->flow);
if (pkt_action_mask & ACTION_DISABLE_INSPECTION)
{
clear_session(true, true, false, p);
tcpStats.timeouts++;
- TcpHAManager::process_deletion(flow);
+ TcpHAManager::process_deletion(*flow);
}
}
using namespace snort;
-Flow* UdpHA::create_session(FlowKey* key)
+Flow* UdpHA::create_session(const FlowKey* key)
{
assert(key);
THREAD_LOCAL UdpHA* UdpHAManager::udp_ha = nullptr;
-void UdpHAManager::process_deletion(Flow* flow)
+void UdpHAManager::process_deletion(Flow& flow)
{
if( udp_ha != nullptr )
udp_ha->process_deletion(flow);
{
public:
UdpHA() : ProtocolHA(PktType::UDP) { }
- snort::Flow* create_session(snort::FlowKey*) override;
+ snort::Flow* create_session(const snort::FlowKey*) override;
private:
};
class UdpHAManager
{
public:
- static void process_deletion(snort::Flow* flow);
+ static void process_deletion(snort::Flow& flow);
static void tinit();
static void tterm();
static THREAD_LOCAL UdpHA* udp_ha;
void UdpSession::clear()
{
UdpSessionCleanup(flow);
- UdpHAManager::process_deletion(flow);
+ UdpHAManager::process_deletion(*flow);
flow->clear();
}
flow->restart();
flow->ssn_state.session_flags |= SSNFLAG_SEEN_SENDER;
udpStats.created++;
- UdpHAManager::process_deletion(flow);
+ UdpHAManager::process_deletion(*flow);
}
ProcessUdp(flow, p, pc, nullptr);