#include "profiler/profiler.h"
#include "protocols/packet.h"
#include "sfip/sf_ip.h"
+#include "src/utils/endian.h"
#include "utils/util.h"
using namespace snort;
// cache required to dump the output
static NetflowCache* dump_cache = nullptr;
+// Netflow version 9 Template fields cache.
+typedef std::unordered_map<uint16_t, std::vector<Netflow9TemplateField>> TemplateFieldCache;
+static THREAD_LOCAL TemplateFieldCache* template_cache = nullptr;
// -----------------------------------------------------------------------------
// static functions
return false;
}
-// FIXIT-M - keeping only few checks right now
-static bool decode_netflow_v9(const unsigned char* data, uint16_t size)
+static bool version_9_record_update(const unsigned char* data, uint32_t unix_secs,
+ std::vector<Netflow9TemplateField>::iterator field, NetflowSessionRecord &record)
+{
+
+ switch ( field->field_type )
+ {
+ case NETFLOW_PROTOCOL:
+
+ // invalid protocol
+ if( field->field_length != sizeof(record.proto) )
+ return false;
+
+ record.proto = (uint8_t)*data;
+ break;
+
+ case NETFLOW_TCP_FLAGS:
+
+ // invalid tcp flags
+ if( field->field_length != sizeof(record.tcp_flags ) )
+ return false;
+
+ record.tcp_flags = (uint8_t)*data;
+ break;
+
+ case NETFLOW_SRC_PORT:
+
+ // invalid src port
+ if( field->field_length != sizeof(record.initiator_port) )
+ return false;
+
+ record.initiator_port = ntohs(*(const uint16_t*) data);
+ break;
+
+ case NETFLOW_SRC_IP:
+
+ // invalid source ip
+ if( field->field_length != sizeof(uint32_t) )
+ return false;
+
+ // Invalid source IP address provided
+ if ( record.initiator_ip.set((const uint32_t *)data, AF_INET) != SFIP_SUCCESS )
+ return false;
+ break;
+
+ case NETFLOW_SRC_IPV6:
+
+ // Invalid source IP address provided
+ if ( record.initiator_ip.set((const uint32_t *)data, AF_INET6) != SFIP_SUCCESS )
+ return false;
+ break;
+
+ case NETFLOW_DST_PORT:
+
+ // invalid destination port
+ if( field->field_length != sizeof(record.responder_port) )
+ return false;
+
+ record.responder_port = ntohs(*(const uint16_t*) data);
+ break;
+
+ case NETFLOW_DST_IP:
+
+ // invalid length
+ if( field->field_length != sizeof(uint32_t) )
+ return false;
+
+ // Invalid destination IP address
+ if ( record.responder_ip.set((const uint32_t *)data, AF_INET) != SFIP_SUCCESS )
+ return false;
+ break;
+
+ case NETFLOW_DST_IPV6:
+
+ // Invalid destination IP address
+ if ( record.responder_ip.set((const uint32_t *)data, AF_INET6) != SFIP_SUCCESS )
+ return false;
+ break;
+
+ case NETFLOW_IPV4_NEXT_HOP:
+
+ // invalid length
+ if( field->field_length != sizeof(uint32_t) )
+ return false;
+
+ // Invalid next-hop IP address
+ if ( record.next_hop_ip.set((const uint32_t *)data, AF_INET) != SFIP_SUCCESS )
+ return false;
+ break;
+
+ case NETFLOW_LAST_PKT:
+
+ if( field->field_length != sizeof(record.last_pkt_second) )
+ return false;
+
+ record.last_pkt_second = unix_secs + ntohl(*(const time_t*)data)/1000;
+
+ // invalid flow time value
+ if( record.last_pkt_second > MAX_TIME )
+ return false;
+
+ break;
+
+ case NETFLOW_FIRST_PKT:
+
+ if( field->field_length != sizeof(record.first_pkt_second) )
+ return false;
+
+ record.first_pkt_second = unix_secs + ntohl(*(const time_t*)data)/1000;
+
+ // invalid flow time value
+ if( record.first_pkt_second > MAX_TIME )
+ return 0;
+
+ break;
+
+ case NETFLOW_IN_BYTES:
+
+ if ( field->field_length == sizeof(uint64_t) )
+ record.initiator_bytes = ntohll(*(const uint64_t*)data);
+ else if ( field->field_length == sizeof(uint32_t) )
+ record.initiator_bytes = (uint64_t)ntohl(*(const uint32_t*)data);
+ else if ( field->field_length == sizeof(uint16_t) )
+ record.initiator_bytes = (uint64_t)ntohs(*(const uint16_t*) data);
+ else
+ return false;
+
+ break;
+
+ case NETFLOW_IN_PKTS:
+
+ if ( field->field_length == sizeof(uint64_t) )
+ record.initiator_pkts = ntohll(*(const uint64_t*)data);
+ else if ( field->field_length == sizeof(uint32_t) )
+ record.initiator_pkts = (uint64_t)ntohl(*(const uint32_t*)data);
+ else if ( field->field_length == sizeof(uint16_t) )
+ record.initiator_pkts = (uint64_t)ntohs(*(const uint16_t*) data);
+ else
+ return false;
+
+ break;
+
+ case NETFLOW_SRC_TOS:
+
+ if( field->field_length != sizeof(record.nf_src_tos) )
+ return false;
+
+ record.nf_src_tos = (uint8_t)*data;
+ break;
+
+ case NETFLOW_DST_TOS:
+
+ if( field->field_length != sizeof(record.nf_dst_tos))
+ return false;
+
+ record.nf_dst_tos = (uint8_t)*data;
+ break;
+
+ case NETFLOW_SNMP_IN:
+
+ if ( field->field_length == sizeof(uint32_t) )
+ record.nf_snmp_in = ntohl(*(const uint32_t*)data);
+ else if ( field->field_length == sizeof(uint16_t) )
+ record.nf_snmp_in = (uint32_t)ntohs(*(const uint16_t*) data);
+ else
+ return false;
+
+ break;
+
+ case NETFLOW_SNMP_OUT:
+
+ if ( field->field_length == sizeof(uint32_t) )
+ record.nf_snmp_out = ntohl(*(const uint32_t*)data);
+ else if ( field->field_length == sizeof(uint16_t) )
+ record.nf_snmp_out = (uint32_t)ntohs(*(const uint16_t*) data);
+ else
+ return false;
+
+ break;
+
+ case NETFLOW_SRC_AS:
+
+ if( field->field_length == sizeof(uint16_t) )
+ record.nf_src_as = (uint32_t)ntohs(*(const uint16_t*) data);
+ else if( field->field_length == sizeof(uint32_t) )
+ record.nf_src_as = ntohl(*(const uint32_t*)data);
+ else
+ return false;
+ break;
+
+ case NETFLOW_DST_AS:
+
+ if( field->field_length == sizeof(uint16_t) )
+ record.nf_dst_as = (uint32_t)ntohs(*(const uint16_t*) data);
+ else if( field->field_length == sizeof(uint32_t) )
+ record.nf_dst_as = ntohl(*(const uint32_t*)data);
+ else
+ return false;
+ break;
+
+ case NETFLOW_SRC_MASK:
+ case NETFLOW_SRC_MASK_IPV6:
+
+ if( field->field_length != sizeof(record.nf_src_mask) )
+ return false;
+
+ record.nf_src_mask = (uint8_t)*data;
+ break;
+
+ case NETFLOW_DST_MASK:
+ case NETFLOW_DST_MASK_IPV6:
+
+ if( field->field_length != sizeof(record.nf_dst_mask) )
+ return false;
+
+ record.nf_dst_mask = (uint8_t)*data;
+ break;
+
+ default:
+ break;
+ }
+
+ return true;
+
+}
+
+static bool decode_netflow_v9(const unsigned char* data, uint16_t size,
+ const Packet* p, const NetflowConfig* cfg)
{
Netflow9Hdr header;
const Netflow9Hdr *pheader;
+ const Netflow9FlowSet *flowset;
+ const uint8_t *end;
+ const uint8_t *flowset_end;
+ uint16_t records;
if( size < sizeof(Netflow9Hdr) )
return false;
+ end = data + size;
+
pheader = (const Netflow9Hdr *)data;
header.flow_count = ntohs(pheader->flow_count);
- // Invalid header flow count
+ // invalid header flow count
if( header.flow_count < NETFLOW_MIN_COUNT or header.flow_count > NETFLOW_MAX_COUNT)
return false;
+ // stats
+ netflow_stats.records += header.flow_count;
+ records = header.flow_count;
+
+ header.sys_uptime = ntohl(pheader->sys_uptime) / 1000;
+ header.unix_secs = ntohl(pheader->unix_secs);
+ header.unix_secs -= header.sys_uptime;
+
+ const NetflowRules* p_rules = nullptr;
+ auto d = cfg->device_rule_map.find(*p->ptrs.ip_api.get_src());
+
+ if ( d != cfg->device_rule_map.end() )
+ p_rules = &(d->second);
+
+ if ( p_rules == nullptr )
+ return false;
+
+ const int zone = p->pkth->ingress_index;
+
+ data += sizeof(Netflow9Hdr);
+
+ while ( data < end )
+ {
+ uint16_t length, f_id;
+
+ // invalid data length
+ if ( data + sizeof(*flowset) > end )
+ return false;
+
+ flowset = (const Netflow9FlowSet *)data;
+
+ // length includes the flowset_id and length fields
+ length = ntohs(flowset->field_length);
+
+ // invalid Netflow length
+ if( data + length > end )
+ return false;
+
+ flowset_end = data + length;
+ data += sizeof(*flowset);
+
+ // field id
+ f_id = ntohs(flowset->field_id);
+
+ // It's a data flowset
+ if ( f_id > 255 && template_cache->count(f_id) > 0 )
+ {
+ std::vector<Netflow9TemplateField> tf;
+ tf = template_cache->at(f_id);
+
+ while( data < flowset_end && records )
+ {
+
+ NetflowSessionRecord record = {};
+ bool bad_field = false;
+
+ for ( auto t_field = tf.begin(); t_field != tf.end(); ++t_field )
+ {
+ // invalid field length
+ if ( data + t_field->field_length > flowset_end )
+ bad_field = true;
+
+ if ( !bad_field )
+ {
+ bool status = version_9_record_update(data, header.unix_secs, t_field, record);
+
+ if ( !status )
+ bad_field = true;
+ }
+
+ data += t_field->field_length;
+ }
+
+ if ( bad_field )
+ {
+ ++netflow_stats.invalid_netflow_record;
+ records--;
+ continue;
+ }
+
+ // filter based on configuration
+ if ( !filter_record(p_rules, zone, &record.initiator_ip, &record.responder_ip) )
+ {
+ records--;
+ continue;
+ }
+
+ // create flow event here
+
+ // check if record exists
+ auto result = netflow_cache->find(record.initiator_ip);
+
+ if ( result != netflow_cache->end() )
+ {
+ // record exists and hence first remove the element
+ netflow_cache->erase(record.initiator_ip);
+ --netflow_stats.unique_flows;
+ }
+
+ // emplace doesn't replace element if exist, hence removing it first
+ netflow_cache->emplace(record.initiator_ip, record);
+ ++netflow_stats.unique_flows;
+
+ records--;
+ }
+ }
+ // template flowset
+ else if ( f_id == 0 )
+ {
+ // Step through the templates in this flowset and store them
+ while ( data < flowset_end && records )
+ {
+ const Netflow9Template* t_template;
+ uint16_t field_count, t_id;
+ const Netflow9TemplateField* field;
+ std::vector<Netflow9TemplateField> tf;
+
+ t_template = (const Netflow9Template *)data;
+ field_count = ntohs(t_template->template_field_count);
+
+ if ( data + sizeof(*t_template) > flowset_end )
+ return false;
+
+ data += sizeof(*t_template);
+
+ // template id
+ t_id = ntohs(t_template->template_id);
+
+ // Parse the data and add the template fields for this template id
+ for ( int i = 0; i < field_count; i++ )
+ {
+ // invalid flowset field
+ if ( data + sizeof(*field) > flowset_end )
+ return false;
+
+ field = (const Netflow9TemplateField *)data;
+ tf.emplace_back(ntohs(field->field_type), ntohs(field->field_length));
+ data += sizeof(*field);
+ }
+
+ if ( field_count > 0 )
+ {
+ // remove if there any entry exists for this template
+ auto is_erased = template_cache->erase(t_id);
+
+ // count only unique templates
+ if ( is_erased == 1 )
+ --netflow_stats.v9_templates;
+
+ // add template to cache
+ template_cache->emplace(t_id, tf);
+
+ // update the total templates count
+ ++netflow_stats.v9_templates;
+
+ // don't count template as record
+ netflow_stats.records--;
+ }
+ records--;
+ }
+ }
+
+ // It's an option template flowset
+ else if ( f_id == 1 )
+ {
+ ++netflow_stats.v9_options_template;
+
+ // don't count option template as record
+ netflow_stats.records--;
+ }
+
+ // its data and no templates are defined yet
+ else
+ {
+ // Skip options, we don't use them currently
+ data = flowset_end;
+ ++netflow_stats.v9_missing_template;
+ }
+
+ if ( flowset_end != data )
+ {
+ // Invalid flowset Length
+ if ( length != (length >> 2 ) << 2 )
+ return false;
+
+ // Data is not at flowset_end
+ if ( flowset_end - data > 3 )
+ return false;
+
+ data = flowset_end;
+ }
+ }
return true;
}
auto d = cfg->device_rule_map.find(*p->ptrs.ip_api.get_src());
if ( d != cfg->device_rule_map.end() )
p_rules = &(d->second);
-
+
if ( p_rules == nullptr )
return false;
const int zone = p->pkth->ingress_index;
++netflow_stats.version_5;
}
}
- else if (version == 9)
+ else if ( version == 9 )
{
- retval = decode_netflow_v9(data, size);
+ retval = decode_netflow_v9(data, size, p, cfg);
if ( retval )
{
++netflow_stats.packets;
assert(netflow_cache);
if ( ! validate_netflow(p, config) )
- ++netflow_stats.invalid_netflow_pkts;
+ ++netflow_stats.invalid_netflow_record;
}
void NetflowInspector::tinit()
{
if ( !netflow_cache )
netflow_cache = new NetflowCache;
+
+ if ( !template_cache )
+ template_cache = new TemplateFieldCache;
+
}
void NetflowInspector::tterm()
}
}
delete netflow_cache;
+ delete template_cache;
}
//-------------------------------------------------------------------------
&netflow_api.base,
nullptr
};
-