From 17d9616fde7f06257920ad4de6ba09f030a34d1f Mon Sep 17 00:00:00 2001 From: Phil Young Date: Mon, 17 Jul 2017 10:59:00 -0400 Subject: [PATCH] napatech: Implementation of packet counters added util-napatech module which contains implementation threads for processing statistics. And modified source-napatech and runmode-napatech to instantiate the threads. napatech: Implementation of packet counters napatech: implementation of statistics counters napatech: Implementation of packet counters. napatech: added util-napatech module napatech: added utils-napatech module. added include declaration and napatech specific structure when HAVE_NAPATECH is defined. Added util-napatech module to project. --- src/Makefile.am | 1 + src/decode.h | 8 + src/runmode-napatech.c | 174 +++++----- src/runmode-napatech.h | 10 +- src/source-napatech.c | 236 +++++++------ src/source-napatech.h | 17 +- src/util-napatech.c | 736 +++++++++++++++++++++++++++++++++++++++++ src/util-napatech.h | 60 ++++ 8 files changed, 1045 insertions(+), 197 deletions(-) create mode 100644 src/util-napatech.c create mode 100644 src/util-napatech.h diff --git a/src/Makefile.am b/src/Makefile.am index 09fcafae2d..52f02a1e0e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -427,6 +427,7 @@ util-mpm-ac-tile.c util-mpm-ac-tile.h \ util-mpm-ac-tile-small.c \ util-mpm-hs.c util-mpm-hs.h \ util-mpm.c util-mpm.h \ +util-napatech.c util-napatech.h \ util-optimize.h \ util-pages.c util-pages.h \ util-path.c util-path.h \ diff --git a/src/decode.h b/src/decode.h index 64a416ce3d..4c0eeee7d5 100644 --- a/src/decode.h +++ b/src/decode.h @@ -37,6 +37,11 @@ #include "util-cuda-vars.h" #endif /* __SC_CUDA_SUPPORT__ */ +#ifdef HAVE_NAPATECH +#include "util-napatech.h" +#endif /* HAVE_NAPATECH */ + + typedef enum { CHECKSUM_VALIDATION_DISABLE, CHECKSUM_VALIDATION_ENABLE, @@ -586,6 +591,9 @@ typedef struct Packet_ #ifdef __SC_CUDA_SUPPORT__ CudaPacketVars cuda_pkt_vars; #endif +#ifdef HAVE_NAPATECH + NapatechPacketVars ntpv; +#endif } #ifdef HAVE_MPIPE /* mPIPE requires packet buffers to be aligned to 128 byte boundaries. */ diff --git a/src/runmode-napatech.c b/src/runmode-napatech.c index 6d883f7402..d03fed623c 100644 --- a/src/runmode-napatech.c +++ b/src/runmode-napatech.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2012 Open Information Security Foundation +/* Copyright (C) 2012-2017 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -21,30 +21,35 @@ * \author nPulse Technologies, LLC. * \author Matt Keeler */ + #include "suricata-common.h" #include "tm-threads.h" #include "conf.h" #include "runmodes.h" #include "output.h" - #include "util-debug.h" #include "util-time.h" #include "util-cpu.h" #include "util-affinity.h" #include "util-runmodes.h" #include "util-device.h" - +#include "util-napatech.h" #include "runmode-napatech.h" - -// need NapatechStreamDevConf structure -#include "source-napatech.h" +#include "source-napatech.h" // need NapatechStreamDevConf structure #define NT_RUNMODE_AUTOFP 1 #define NT_RUNMODE_WORKERS 2 static const char *default_mode = NULL; #ifdef HAVE_NAPATECH -static int num_configured_streams = 0; + +#define MAX_STREAMS 256 +static uint16_t num_configured_streams = 0; + +uint16_t GetNumConfiguredStreams(void) { + return num_configured_streams; +} + #endif const char *RunModeNapatechGetDefaultMode(void) @@ -69,128 +74,96 @@ void RunModeNapatechRegister(void) #endif } + #ifdef HAVE_NAPATECH -int NapatechRegisterDeviceStreams() + + +static int NapatechRegisterDeviceStreams(void) { - NtInfoStream_t info_stream; - NtInfo_t info; - char error_buf[100]; - int status; - int i; - char live_dev_buf[9]; - int use_all_streams; - ConfNode *ntstreams; - ConfNode *stream_id; - if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) - { + /* Display the configuration mode */ + int use_all_streams; + if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) { SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf"); exit(EXIT_FAILURE); } - if (use_all_streams) - { + if (use_all_streams) { SCLogInfo("Using All Napatech Streams"); - // When using the default streams we need to query the service for a list of all configured - if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) - { - NT_ExplainError(status, error_buf, sizeof(error_buf) -1); - SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buf); - return -1; - } + } else { + SCLogInfo("Using Selected Napatech Streams"); + } - info.cmd = NT_INFO_CMD_READ_STREAM; - if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) - { - NT_ExplainError(status, error_buf, sizeof(error_buf) -1); - SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buf); - return -1; - } + /* Get the stream ID's either from the conf or by querying Napatech */ + NapatechStreamConfig stream_config[MAX_STREAMS]; + uint16_t stream_cnt = NapatechGetStreamConfig(stream_config); + num_configured_streams = stream_cnt; + SCLogDebug("Configuring %d Napatech Streams...", stream_cnt); - num_configured_streams = info.u.stream.data.count; - for (i = 0; i < num_configured_streams; i++) - { - // The Stream IDs do not have to be sequential - snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", info.u.stream.data.streamIDList[i]); - LiveRegisterDevice(live_dev_buf); - } - - if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) - { - NT_ExplainError(status, error_buf, sizeof(error_buf) -1); - SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", error_buf); - return -1; - } - } - else - { - SCLogInfo("Using Selected Napatech Streams"); - // When not using the default streams we need to parse the array of streams from the conf - if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) - { - SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf"); + for (uint16_t inst = 0; inst < stream_cnt; ++inst) { + char *plive_dev_buf = SCCalloc(1, 9); + if (unlikely(plive_dev_buf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter."); exit(EXIT_FAILURE); } - - // Loop through all stream numbers in the array and register the devices - TAILQ_FOREACH(stream_id, &ntstreams->head, next) - { - if (stream_id == NULL) - { - SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration"); - exit(EXIT_FAILURE); - } - num_configured_streams++; - - snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", atoi(stream_id->val)); - LiveRegisterDevice(live_dev_buf); - } + snprintf(plive_dev_buf, 9, "nt%d", stream_config[inst].stream_id); + SCLogInfo("registering Napatech device: %s - active stream%sfound.", + plive_dev_buf, stream_config[inst].is_active ? " " : " NOT "); + LiveRegisterDevice(plive_dev_buf); } + + /* Napatech stats come from a separate thread. This will surpress + * the counters when suricata exits. + */ + LiveDeviceHasNoStats(); return 0; } -void *NapatechConfigParser(const char *device) +static void *NapatechConfigParser(const char *device) { - // Expect device to be of the form nt%d where %d is the stream id to use + /* Expect device to be of the form nt%d where %d is the stream id to use */ int dev_len = strlen(device); - struct NapatechStreamDevConf *conf = SCMalloc(sizeof(struct NapatechStreamDevConf)); - if (unlikely(conf == NULL)) + struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf)); + if (unlikely(conf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device name."); return NULL; - if (dev_len < 3 || dev_len > 5) - { + } + if (dev_len < 3 || dev_len > 5) { SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG, "Could not parse config for device: %s - invalid length", device); return NULL; } - // device+5 is a pointer to the beginning of the stream id after the constant nt portion - conf->stream_id = atoi(device+2); + /* device+5 is a pointer to the beginning of the stream id after the constant nt portion */ + conf->stream_id = atoi(device + 2); - // Set the host buffer allowance for this stream - // Right now we just look at the global default - there is no per-stream hba configuration - if (ConfGetInt("napatech.hba", &conf->hba) == 0) + /* Set the host buffer allowance for this stream + * Right now we just look at the global default - there is no per-stream hba configuration + */ + if (ConfGetInt("napatech.hba", &conf->hba) == 0) { conf->hba = -1; - + } return (void *) conf; } -int NapatechGetThreadsCount(void *conf __attribute__((unused))) { - // No matter which live device it is there is no reason to ever use more than 1 thread - // 2 or more thread would cause packet duplication +static int NapatechGetThreadsCount(void *conf __attribute__((unused))) +{ + /* No matter which live device it is there is no reason to ever use more than 1 thread + 2 or more thread would cause packet duplication */ return 1; } static int NapatechInit(int runmode) { int ret; - char errbuf[100]; + char error_buf[100]; RunModeInitialize(); TimeModeSetLive(); /* Initialize the API and check version compatibility */ if ((ret = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) { - NT_ExplainError(ret, errbuf, sizeof(errbuf)); - SCLogError(SC_ERR_NAPATECH_INIT_FAILED ,"NT_Init failed. Code 0x%X = %s", ret, errbuf); + NT_ExplainError(ret, error_buf, sizeof (error_buf)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_Init failed. Code 0x%X = %s", ret, error_buf); exit(EXIT_FAILURE); } @@ -200,16 +173,29 @@ static int NapatechInit(int runmode) exit(EXIT_FAILURE); } - switch(runmode) { + struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf)); + if (unlikely(conf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device."); + exit(EXIT_FAILURE); + } + + if ( (ConfGetInt("napatech.hba", &conf->hba) != 0) && (conf->hba > 0)){ + SCLogInfo("Host Buffer Allowance: %d", (int)conf->hba); + } + + /* Start a thread to process the statistics */ + NapatechStartStats(); + + switch (runmode) { case NT_RUNMODE_AUTOFP: ret = RunModeSetLiveCaptureAutoFp(NapatechConfigParser, NapatechGetThreadsCount, - "NapatechStream", "NapatechDecode", - thread_name_autofp, NULL); + "NapatechStream", "NapatechDecode", + thread_name_autofp, NULL); break; case NT_RUNMODE_WORKERS: ret = RunModeSetLiveCaptureWorkers(NapatechConfigParser, NapatechGetThreadsCount, - "NapatechStream", "NapatechDecode", - thread_name_workers, NULL); + "NapatechStream", "NapatechDecode", + thread_name_workers, NULL); break; default: ret = -1; diff --git a/src/runmode-napatech.h b/src/runmode-napatech.h index d10b406376..aba5a6256b 100644 --- a/src/runmode-napatech.h +++ b/src/runmode-napatech.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2012 Open Information Security Foundation +/* Copyright (C) 2012-2017 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -22,11 +22,15 @@ * \author Matt Keeler */ + #ifndef __RUNMODE_NAPATECH_H__ #define __RUNMODE_NAPATECH_H__ #ifdef HAVE_NAPATECH +#include "util-napatech.h" #include + + #endif int RunModeNapatechAutoFp(void); @@ -34,4 +38,8 @@ int RunModeNapatechWorkers(void); void RunModeNapatechRegister(void); const char *RunModeNapatechGetDefaultMode(void); +uint16_t GetNumConfiguredStreams(void); + + + #endif /* __RUNMODE_NAPATECH_H__ */ diff --git a/src/source-napatech.c b/src/source-napatech.c index 941d626094..348793ad27 100644 --- a/src/source-napatech.c +++ b/src/source-napatech.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2012-2014 Open Information Security Foundation +/* Copyright (C) 2012-2017 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -18,14 +18,13 @@ /** * \file * - * \author nPulse Technologies, LLC. - * \author Matt Keeler - * +- * \author nPulse Technologies, LLC. +- * \author Matt Keeler + * * * Support for NAPATECH adapter with the 3GD Driver/API. * Requires libntapi from Napatech A/S. * */ - #include "suricata-common.h" #include "suricata.h" #include "threadvars.h" @@ -33,18 +32,16 @@ #include "tm-queuehandlers.h" #include "tm-threads.h" #include "tm-modules.h" - #include "util-privs.h" #include "tmqh-packetpool.h" +#include "util-napatech.h" #include "source-napatech.h" #ifndef HAVE_NAPATECH TmEcode NoNapatechSupportExit(ThreadVars *, const void *, void **); - -void TmModuleNapatechStreamRegister (void) -{ +void TmModuleNapatechStreamRegister(void) { tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream"; tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NoNapatechSupportExit; tmm_modules[TMM_RECEIVENAPATECH].Func = NULL; @@ -54,8 +51,7 @@ void TmModuleNapatechStreamRegister (void) tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_ADMIN; } -void TmModuleNapatechDecodeRegister (void) -{ +void TmModuleNapatechDecodeRegister(void) { tmm_modules[TMM_DECODENAPATECH].name = "NapatechDecode"; tmm_modules[TMM_DECODENAPATECH].ThreadInit = NoNapatechSupportExit; tmm_modules[TMM_DECODENAPATECH].Func = NULL; @@ -66,8 +62,7 @@ void TmModuleNapatechDecodeRegister (void) tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM; } -TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) -{ +TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) { SCLogError(SC_ERR_NAPATECH_NOSUPPORT, "Error creating thread %s: you do not have support for Napatech adapter " "enabled please recompile with --enable-napatech", tv->name); @@ -76,31 +71,37 @@ TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) #else /* Implied we do have NAPATECH support */ + #include +#define MAX_STREAMS 256 + extern int max_pending_packets; typedef struct NapatechThreadVars_ { ThreadVars *tv; NtNetStreamRx_t rx_stream; - uint64_t stream_id; + uint16_t stream_id; int hba; - uint64_t pkts; - uint64_t drops; - uint64_t bytes; - TmSlot *slot; } NapatechThreadVars; TmEcode NapatechStreamThreadInit(ThreadVars *, const void *, void **); void NapatechStreamThreadExitStats(ThreadVars *, void *); -TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot); +TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot); TmEcode NapatechDecodeThreadInit(ThreadVars *, const void *, void **); TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data); TmEcode NapatechDecode(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +/* These are used as the threads are exiting to get a comprehensive count of + * all the packets received and dropped. + */ +SC_ATOMIC_DECLARE(uint64_t, total_packets); +SC_ATOMIC_DECLARE(uint64_t, total_drops); +SC_ATOMIC_DECLARE(uint16_t, total_tallied); + /** * \brief Register the Napatech receiver (reader) module. */ @@ -109,13 +110,17 @@ void TmModuleNapatechStreamRegister(void) tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream"; tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NapatechStreamThreadInit; tmm_modules[TMM_RECEIVENAPATECH].Func = NULL; - tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechStreamLoop; + tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechPacketLoopZC; tmm_modules[TMM_RECEIVENAPATECH].PktAcqBreakLoop = NULL; tmm_modules[TMM_RECEIVENAPATECH].ThreadExitPrintStats = NapatechStreamThreadExitStats; tmm_modules[TMM_RECEIVENAPATECH].ThreadDeinit = NapatechStreamThreadDeinit; tmm_modules[TMM_RECEIVENAPATECH].RegisterTests = NULL; tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_RAW; tmm_modules[TMM_RECEIVENAPATECH].flags = TM_FLAG_RECEIVE_TM; + + SC_ATOMIC_INIT(total_packets); + SC_ATOMIC_INIT(total_drops); + SC_ATOMIC_INIT(total_tallied); } /** @@ -133,6 +138,13 @@ void TmModuleNapatechDecodeRegister(void) tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM; } +/* + *----------------------------------------------------------------------------- + *----------------------------------------------------------------------------- + * Statistics code + *----------------------------------------------------------------------------- +*/ + /** * \brief Initialize the Napatech receiver thread, generate a single * NapatechThreadVar structure for each thread, this will @@ -152,13 +164,11 @@ void TmModuleNapatechDecodeRegister(void) TmEcode NapatechStreamThreadInit(ThreadVars *tv, const void *initdata, void **data) { SCEnter(); - struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *)initdata; - uintmax_t stream_id = conf->stream_id; + struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *) initdata; + uint16_t stream_id = conf->stream_id; *data = NULL; - SCLogInfo("Napatech Thread Stream ID:%lu", stream_id); - - NapatechThreadVars *ntv = SCMalloc(sizeof(NapatechThreadVars)); + NapatechThreadVars *ntv = SCCalloc(1, sizeof (NapatechThreadVars)); if (unlikely(ntv == NULL)) { SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH thread vars."); exit(EXIT_FAILURE); @@ -168,42 +178,62 @@ TmEcode NapatechStreamThreadInit(ThreadVars *tv, const void *initdata, void **da ntv->stream_id = stream_id; ntv->tv = tv; ntv->hba = conf->hba; + SCLogDebug("Started processing packets from NAPATECH Stream: %lu", ntv->stream_id); - SCLogInfo("Started processing packets from NAPATECH Stream: %lu", ntv->stream_id); - - *data = (void *)ntv; - + *data = (void *) ntv; SCReturnInt(TM_ECODE_OK); } -/** - * \brief Main Napatech reading Loop function - */ -TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) +static PacketQueue packets_to_release[MAX_STREAMS]; + +static void NapatechReleasePacket(struct Packet_ *p) { - SCEnter(); + PacketFreeOrRelease(p); + PacketEnqueue(&packets_to_release[p->ntpv.stream_id], p); +} +TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot) +{ int32_t status; - char errbuf[100]; + char error_buffer[100]; uint64_t pkt_ts; NtNetBuf_t packet_buffer; - NapatechThreadVars *ntv = (NapatechThreadVars *)data; - NtNetRx_t stat_cmd; - - SCLogInfo("Opening NAPATECH Stream: %lu for processing", ntv->stream_id); + NapatechThreadVars *ntv = (NapatechThreadVars *) data; + uint64_t hba_pkt_drops = 0; + uint64_t hba_byte_drops = 0; + uint16_t hba_pkt = 0; + + /* This just keeps the startup output more orderly. */ + usleep(200000 * ntv->stream_id); + + if (ntv->hba > 0) { + char *s_hbad_pkt = SCCalloc(1, 32); + if (unlikely(s_hbad_pkt == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter."); + exit(EXIT_FAILURE); + } + snprintf(s_hbad_pkt, 32, "nt%d.hba_drop", ntv->stream_id); + hba_pkt = StatsRegisterCounter(s_hbad_pkt, tv); + StatsSetupPrivate(tv); + StatsSetUI64(tv, hba_pkt, 0); + } + SCLogDebug("Opening NAPATECH Stream: %lu for processing", ntv->stream_id); if ((status = NT_NetRxOpen(&(ntv->rx_stream), "SuricataStream", NT_NET_INTERFACE_PACKET, ntv->stream_id, ntv->hba)) != NT_SUCCESS) { - NT_ExplainError(status, errbuf, sizeof(errbuf)); - SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %lu - %s", ntv->stream_id, errbuf); + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %u - %s", ntv->stream_id, error_buffer); SCFree(ntv); SCReturnInt(TM_ECODE_FAILED); } - stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP; +#if defined(__linux__) + SCLogInfo("Napatech Packet Loop Started - cpu: %3d, stream: %3u (numa: %u)", + sched_getcpu(), ntv->stream_id, NapatechGetNumaNode(ntv->stream_id)); +#else + SCLogInfo("Napatech Packet Loop Started - stream: %lu ", ntv->stream_id); +#endif - SCLogInfo("Napatech Packet Stream Loop Started for Stream ID: %lu", ntv->stream_id); - - TmSlot *s = (TmSlot *)slot; + TmSlot *s = (TmSlot *) slot; ntv->slot = s->slot_next; while (!(suricata_ctl_flags & SURICATA_STOP)) { @@ -211,19 +241,15 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) * us from alloc'ing packets at line rate */ PacketPoolWait(); - /* - * Napatech returns packets 1 at a time - */ + /* Napatech returns packets 1 at a time */ status = NT_NetRxGet(ntv->rx_stream, &packet_buffer, 1000); if (unlikely(status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN)) { - /* - * no frames currently available - */ continue; } else if (unlikely(status != NT_SUCCESS)) { - SCLogError(SC_ERR_NAPATECH_STREAM_NEXT_FAILED, - "Failed to read from Napatech Stream: %lu", - ntv->stream_id); + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + + SCLogInfo("Failed to read from Napatech Stream%d: %s", + ntv->stream_id, error_buffer); SCReturnInt(TM_ECODE_FAILED); } @@ -239,7 +265,7 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) * Handle the different timestamp forms that the napatech cards could use * - NT_TIMESTAMP_TYPE_NATIVE is not supported due to having an base of 0 as opposed to NATIVE_UNIX which has a base of 1/1/1970 */ - switch(NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) { + switch (NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) { case NT_TIMESTAMP_TYPE_NATIVE_UNIX: p->ts.tv_sec = pkt_ts / 100000000; p->ts.tv_usec = ((pkt_ts % 100000000) / 100) + (pkt_ts % 100) > 50 ? 1 : 0; @@ -259,30 +285,34 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) break; default: SCLogError(SC_ERR_NAPATECH_TIMESTAMP_TYPE_NOT_SUPPORTED, - "Packet from Napatech Stream: %lu does not have a supported timestamp format", - ntv->stream_id); + "Packet from Napatech Stream: %u does not have a supported timestamp format", + ntv->stream_id); NT_NetRxRelease(ntv->rx_stream, packet_buffer); SCReturnInt(TM_ECODE_FAILED); } - SCLogDebug("p->ts.tv_sec %"PRIuMAX"", (uintmax_t)p->ts.tv_sec); - p->datalink = LINKTYPE_ETHERNET; - - ntv->pkts++; - ntv->bytes += NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer); - - // Update drop counter - if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS)) - { - NT_ExplainError(status, errbuf, sizeof(errbuf)); - SCLogWarning(SC_ERR_NAPATECH_STAT_DROPS_FAILED, "Couldn't retrieve drop statistics from the RX stream: %lu - %s", ntv->stream_id, errbuf); - } - else - { - ntv->drops += stat_cmd.u.streamDrop.pktsDropped; + if (unlikely(ntv->hba > 0)) { + NtNetRx_t stat_cmd; + stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP; + // Update drop counter + if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS)) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogInfo("Couldn't retrieve drop statistics from the RX stream: %u - %s", + ntv->stream_id, error_buffer); + } else { + hba_pkt_drops = stat_cmd.u.streamDrop.pktsDropped; + + StatsSetUI64(tv, hba_pkt, hba_pkt_drops); + } + StatsSyncCountersIfSignalled(tv); } - if (unlikely(PacketCopyData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) { + p->ReleasePacket = NapatechReleasePacket; + p->ntpv.nt_packet_buf = packet_buffer; + p->ntpv.stream_id = ntv->stream_id; + p->datalink = LINKTYPE_ETHERNET; + + if (unlikely(PacketSetData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) { TmqhOutputPacketpool(ntv->tv, p); NT_NetRxRelease(ntv->rx_stream, packet_buffer); SCReturnInt(TM_ECODE_FAILED); @@ -294,13 +324,23 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_FAILED); } - NT_NetRxRelease(ntv->rx_stream, packet_buffer); + /* Release any packets that were returned by the callback function */ + Packet *rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]); + while (rel_pkt != NULL) { + NT_NetRxRelease(ntv->rx_stream, rel_pkt->ntpv.nt_packet_buf); + rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]); + } StatsSyncCountersIfSignalled(tv); } + if (unlikely(ntv->hba > 0)) { + SCLogInfo("Host Buffer Allowance Drops - pkts: %ld, bytes: %ld", hba_pkt_drops, hba_byte_drops); + } + SCReturnInt(TM_ECODE_OK); } + /** * \brief Print some stats to the log at program exit. * @@ -309,12 +349,31 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot) */ void NapatechStreamThreadExitStats(ThreadVars *tv, void *data) { - NapatechThreadVars *ntv = (NapatechThreadVars *)data; - double percent = 0; - if (ntv->drops > 0) - percent = (((double) ntv->drops) / (ntv->pkts+ntv->drops)) * 100; + NapatechThreadVars *ntv = (NapatechThreadVars *) data; + NapatechCurrentStats stat = NapatechGetCurrentStats(ntv->stream_id); - SCLogNotice("Stream: %lu; Packets: %"PRIu64"; Drops: %"PRIu64" (%5.2f%%); Bytes: %"PRIu64, ntv->stream_id, ntv->pkts, ntv->drops, percent, ntv->bytes); + double percent = 0; + if (stat.current_drops > 0) + percent = (((double) stat.current_drops) + / (stat.current_packets + stat.current_drops)) * 100; + + SCLogInfo("nt%lu - pkts: %lu; drop: %lu (%5.2f%%); bytes: %lu", + (uint64_t) ntv->stream_id, stat.current_packets, + stat.current_drops, percent, stat.current_bytes); + + SC_ATOMIC_ADD(total_packets, stat.current_packets); + SC_ATOMIC_ADD(total_drops, stat.current_drops); + SC_ATOMIC_ADD(total_tallied, 1); + + if (SC_ATOMIC_GET(total_tallied) == GetNumConfiguredStreams()) { + if (SC_ATOMIC_GET(total_drops) > 0) + percent = (((double) SC_ATOMIC_GET(total_drops)) / (SC_ATOMIC_GET(total_packets) + + SC_ATOMIC_GET(total_drops))) * 100; + + SCLogInfo(" "); + SCLogInfo("--- Total Packets: %ld Total Dropped: %ld (%5.2f%%)", + SC_ATOMIC_GET(total_packets), SC_ATOMIC_GET(total_drops), percent); + } } /** @@ -325,13 +384,12 @@ void NapatechStreamThreadExitStats(ThreadVars *tv, void *data) TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data) { SCEnter(); - NapatechThreadVars *ntv = (NapatechThreadVars *)data; + NapatechThreadVars *ntv = (NapatechThreadVars *) data; SCLogDebug("Closing Napatech Stream: %d", ntv->stream_id); NT_NetRxClose(ntv->rx_stream); SCReturnInt(TM_ECODE_OK); } - /** Decode Napatech */ /** @@ -350,14 +408,14 @@ TmEcode NapatechDecode(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, { SCEnter(); - DecodeThreadVars *dtv = (DecodeThreadVars *)data; + DecodeThreadVars *dtv = (DecodeThreadVars *) data; /* XXX HACK: flow timeout can call us for injected pseudo packets * see bug: https://redmine.openinfosecfoundation.org/issues/1107 */ if (p->flags & PKT_PSEUDO_STREAM_END) return TM_ECODE_OK; - /* update counters */ + // update counters DecodeUpdatePacketCounters(tv, dtv, p); switch (p->datalink) { @@ -372,7 +430,6 @@ TmEcode NapatechDecode(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, } PacketDecodeFinalize(tv, dtv, p); - SCReturnInt(TM_ECODE_OK); } @@ -380,16 +437,12 @@ TmEcode NapatechDecodeThreadInit(ThreadVars *tv, const void *initdata, void **da { SCEnter(); DecodeThreadVars *dtv = NULL; - dtv = DecodeThreadVarsAlloc(tv); - - if(dtv == NULL) + if (dtv == NULL) SCReturnInt(TM_ECODE_FAILED); DecodeRegisterPerfCounters(dtv, tv); - - *data = (void *)dtv; - + *data = (void *) dtv; SCReturnInt(TM_ECODE_OK); } @@ -397,7 +450,6 @@ TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data) { if (data != NULL) DecodeThreadVarsFree(tv, data); - SCReturnInt(TM_ECODE_OK); -} + SCReturnInt(TM_ECODE_OK); } #endif /* HAVE_NAPATECH */ diff --git a/src/source-napatech.h b/src/source-napatech.h index eee79dc76d..be40d781e0 100644 --- a/src/source-napatech.h +++ b/src/source-napatech.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2012 Open Information Security Foundation +/* Copyright (C) 2012-2017 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -18,10 +18,9 @@ /** * \file * - * \author nPulse Technologies, LLC + * \author nPulse Technologies, LLC. * \author Matt Keeler */ - #ifndef __SOURCE_NAPATECH_H__ #define __SOURCE_NAPATECH_H__ @@ -29,16 +28,14 @@ void TmModuleNapatechStreamRegister (void); TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data); void TmModuleNapatechDecodeRegister (void); +#ifdef HAVE_NAPATECH +#include + struct NapatechStreamDevConf { - int stream_id; + uint16_t stream_id; intmax_t hba; }; -#ifdef HAVE_NAPATECH - -#include - -#endif - +#endif /* HAVE_NAPATECH */ #endif /* __SOURCE_NAPATECH_H__ */ diff --git a/src/util-napatech.c b/src/util-napatech.c new file mode 100644 index 0000000000..ba2aac0f5b --- /dev/null +++ b/src/util-napatech.c @@ -0,0 +1,736 @@ +/* Copyright (C) 2017 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Napatech Inc. + * \author Phil Young + * + * + */ + + +#include "suricata-common.h" +#ifdef HAVE_NAPATECH +#include "suricata.h" +#include "threadvars.h" +#include "tm-threads.h" + + +uint16_t NapatechGetNumaNode(uint16_t stream_id) +{ + int status; + char buffer[80]; // Error buffer + NtInfoStream_t info_stream; + NtInfo_t info; + uint16_t numa_node; + + if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) { + NT_ExplainError(status, buffer, sizeof (buffer) - 1); + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", buffer); + exit(EXIT_FAILURE); + } + + // Read the info on this specific stream + info.cmd = NT_INFO_CMD_READ_STREAMID; + info.u.streamID.streamId = stream_id; + if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) { + NT_ExplainError(status, buffer, sizeof (buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", buffer); + exit(EXIT_FAILURE); + } + + numa_node = info.u.streamID.data.numaNode; + + if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) { + NT_ExplainError(status, buffer, sizeof (buffer) - 1); + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", buffer); + exit(EXIT_FAILURE); + } + return numa_node; +} + + +/*----------------------------------------------------------------------------- + *----------------------------------------------------------------------------- + * Statistics code + *----------------------------------------------------------------------------- +*/ + +typedef struct StreamCounters_ { + uint16_t pkts; + uint16_t byte; + uint16_t drop; +} StreamCounters; + + +NapatechCurrentStats current_stats[MAX_STREAMS]; + +NapatechCurrentStats NapatechGetCurrentStats(uint16_t id) +{ + + return current_stats[id]; +} + + +static uint16_t TestStreamConfig( + NtInfoStream_t hInfo, + NtStatStream_t hStatStream, + NapatechStreamConfig stream_config[], + uint16_t num_inst) +{ + + uint16_t num_active = 0; + + for (uint16_t inst = 0; inst < num_inst; ++inst) { + int status; + char buffer[80]; // Error buffer + NtStatistics_t stat; // Stat handle. + + /* Check to see if it is an active stream */ + memset(&stat, 0, sizeof (NtStatistics_t)); + + /* Read usage data for the chosen stream ID */ + stat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0; + stat.u.usageData_v0.streamid = (uint8_t) stream_config[inst].stream_id; + + if ((status = NT_StatRead(hStatStream, &stat)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, buffer, sizeof (buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead():2 failed: %s\n", buffer); + return 0; + } + + if (stat.u.usageData_v0.data.numHostBufferUsed > 0) { + stream_config[inst].is_active = true; + num_active++; + } else { + stream_config[inst].is_active = false; + } + } + + return num_active; +} + +static uint32_t UpdateStreamStats(ThreadVars *tv, + NtInfoStream_t hInfo, + NtStatStream_t hStatStream, + uint16_t num_streams, + NapatechStreamConfig stream_config[], + StreamCounters streamCounters[] + ) +{ + static uint64_t rxPktsStart[MAX_STREAMS] = {0}; + static uint64_t rxByteStart[MAX_STREAMS] = {0}; + static uint64_t dropStart[MAX_STREAMS] = {0}; + + int status; + char error_buffer[80]; // Error buffer + NtInfo_t hStreamInfo; + NtStatistics_t hStat; // Stat handle. + + /* Query the system to get the number of streams currently instantiated */ + hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM; + if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer); + exit(EXIT_FAILURE); + } + + uint16_t num_active; + if ((num_active = TestStreamConfig(hInfo, hStatStream, stream_config, num_streams)) == 0) { + /* None of the configured streams are active */ + return 0; + } + + /* At least one stream is active so proceed with the stats. */ + uint16_t inst_id = 0; + uint32_t stream_cnt = 0; + for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) { + + + while(inst_id < num_streams) { + if (stream_config[inst_id].is_active) { + break; + } else { + ++inst_id; + } + } + if (inst_id == num_streams) + break; + + /* Read usage data for the chosen stream ID */ + memset(&hStat, 0, sizeof (NtStatistics_t)); + hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0; + hStat.u.usageData_v0.streamid = (uint8_t) stream_config[inst_id].stream_id; + + if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer); + return 0; + } + + uint16_t stream_id = stream_config[inst_id].stream_id; + if (stream_config[inst_id].is_active) { + uint64_t rxPktsTotal = 0; + uint64_t rxByteTotal = 0; + uint64_t dropTotal = 0; + + for (uint32_t hbCount = 0; hbCount < hStat.u.usageData_v0.data.numHostBufferUsed; hbCount++) { + if (unlikely(stream_config[inst_id].initialized == false)) { + rxPktsStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames; + rxByteStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes; + dropStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames; + stream_config[inst_id].initialized = true; + } else { + rxPktsTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames; + rxByteTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes; + dropTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames; + } + } + + current_stats[stream_id].current_packets = rxPktsTotal - rxPktsStart[stream_id]; + current_stats[stream_id].current_bytes = rxByteTotal - rxByteStart[stream_id]; + current_stats[stream_id].current_drops = dropTotal - dropStart[stream_id]; + } + + StatsSetUI64(tv, streamCounters[inst_id].pkts, current_stats[stream_id].current_packets); + StatsSetUI64(tv, streamCounters[inst_id].byte, current_stats[stream_id].current_bytes); + StatsSetUI64(tv, streamCounters[inst_id].drop, current_stats[stream_id].current_drops); + + ++inst_id; + } + return num_active; +} + +static void *NapatechStatsLoop(void *arg) +{ + ThreadVars *tv = (ThreadVars *) arg; + + int status; + char error_buffer[80]; // Error buffer + NtInfoStream_t hInfo; + NtStatStream_t hStatStream; + + NapatechStreamConfig stream_config[MAX_STREAMS]; + uint16_t stream_cnt = NapatechGetStreamConfig(stream_config); + + /* Open the info and Statistics */ + if ((status = NT_InfoOpen(&hInfo, "StatsLoopInfoStream")) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_RUNMODE, "NT_InfoOpen() failed: %s\n", error_buffer); + return NULL; + } + + if ((status = NT_StatOpen(&hStatStream, "StatsLoopStatsStream")) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer); + return NULL; + } + + StreamCounters streamCounters[MAX_STREAMS]; + for (int i = 0; i < stream_cnt; ++i) { + char *pkts_buf = SCCalloc(1, 32); + if (unlikely(pkts_buf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter."); + exit(EXIT_FAILURE); + } + + snprintf(pkts_buf, 32, "nt%d.pkts", stream_config[i].stream_id); + streamCounters[i].pkts = StatsRegisterCounter(pkts_buf, tv); + + char *byte_buf = SCCalloc(1, 32); + if (unlikely(byte_buf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter."); + exit(EXIT_FAILURE); + } + snprintf(byte_buf, 32, "nt%d.bytes", stream_config[i].stream_id); + streamCounters[i].byte = StatsRegisterCounter(byte_buf, tv); + + char *drop_buf = SCCalloc(1, 32); + if (unlikely(drop_buf == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter."); + exit(EXIT_FAILURE); + } + snprintf(drop_buf, 32, "nt%d.drop", stream_config[i].stream_id); + streamCounters[i].drop = StatsRegisterCounter(drop_buf, tv); + } + + StatsSetupPrivate(tv); + + for (int i = 0; i < stream_cnt; ++i) { + StatsSetUI64(tv, streamCounters[i].pkts, 0); + StatsSetUI64(tv, streamCounters[i].byte, 0); + StatsSetUI64(tv, streamCounters[i].drop, 0); + } + + uint32_t num_active = UpdateStreamStats(tv, hInfo, hStatStream, + stream_cnt, stream_config, streamCounters); + + if (num_active < stream_cnt) { + SCLogInfo("num_active: %d, stream_cnt: %d", num_active, stream_cnt); + SCLogWarning(SC_ERR_NAPATECH_CONFIG_STREAM, + "Some or all of the configured streams are not created. Proceeding with active streams."); + } + + TmThreadsSetFlag(tv, THV_INIT_DONE); + while (1) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { + SCLogDebug("NapatechStatsLoop THV_KILL detected"); + break; + } + + UpdateStreamStats(tv, hInfo, hStatStream, + stream_cnt, stream_config, streamCounters); + + StatsSyncCountersIfSignalled(tv); + usleep(1000000); + } + + /* CLEAN UP NT Resources and Close the info stream */ + if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_RUNMODE, "NT_InfoClose() failed: %s\n", error_buffer); + return NULL; + } + + /* Close the statistics stream */ + if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer); + return NULL; + } + + + SCLogDebug("Exiting NapatechStatsLoop"); + TmThreadsSetFlag(tv, THV_RUNNING_DONE); + TmThreadWaitForFlag(tv, THV_DEINIT); + TmThreadsSetFlag(tv, THV_CLOSED); + + return NULL; +} + +#define MAX_HOSTBUFFER 4 +#define MAX_STREAMS 256 +#define HB_HIGHWATER 2048 //1982 + +static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered, + NapatechStreamConfig registered_streams[]) +{ + for (uint16_t reg_id = 0; reg_id < num_registered; ++reg_id) { + if (stream_id == registered_streams[reg_id].stream_id) { + return true; + } + } + return false; +} + +uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[]) +{ + int status; + char error_buffer[80]; // Error buffer + NtStatStream_t hStatStream; + NtStatistics_t hStat; // Stat handle. + NtInfoStream_t info_stream; + NtInfo_t info; + uint16_t instance_cnt = 0; + int use_all_streams; + ConfNode *ntstreams; + + for (uint16_t i = 0; i < MAX_STREAMS; ++i) { + stream_config[i].stream_id = 0; + stream_config[i].is_active = false; + stream_config[i].initialized = false; + } + + if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) { + SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf"); + exit(EXIT_FAILURE); + } + + if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buffer); + exit(EXIT_FAILURE); + } + + if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer); + exit(EXIT_FAILURE); + } + + if (use_all_streams) { + info.cmd = NT_INFO_CMD_READ_STREAM; + if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buffer); + exit(EXIT_FAILURE); + } + + uint16_t stream_id = 0; + while (instance_cnt < info.u.stream.data.count) { + + /* + * For each stream ID query the number of host-buffers used by + * the stream. If zero, then that streamID is not used; skip + * over it and continue until we get a streamID with a non-zero + * count of the host-buffers. + */ + memset(&hStat, 0, sizeof (NtStatistics_t)); + + /* Read usage data for the chosen stream ID */ + hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0; + hStat.u.usageData_v0.streamid = (uint8_t) stream_id; + + if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer); + return 0; + } + + if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) { + ++stream_id; + continue; + } + + /* if we get here it is an active stream */ + stream_config[instance_cnt].stream_id = stream_id++; + stream_config[instance_cnt].is_active = true; + instance_cnt++; + } + + } else { + /* When not using the default streams we need to parse the array of streams from the conf */ + if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) { + SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf"); + exit(EXIT_FAILURE); + } + + /* Loop through all stream numbers in the array and register the devices */ + ConfNode *stream; + instance_cnt = 0; + + TAILQ_FOREACH(stream, &ntstreams->head, next) { + uint16_t stream_id = 0; + + if (stream == NULL) { + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration"); + exit(EXIT_FAILURE); + } + + uint16_t start, end; + if (strchr(stream->val, '-')) { + char copystr[16]; + strlcpy(copystr, stream->val, 16); + + start = atoi(copystr); + end = atoi(strchr(copystr, '-')+1); + } else { + stream_config[instance_cnt].stream_id = atoi(stream->val); + start = stream_config[instance_cnt].stream_id; + end = stream_config[instance_cnt].stream_id; + } + SCLogInfo("%s start: %d end: %d", stream->val, start, end); + + for (stream_id = start; stream_id <= end; ++stream_id) { + /* if we get here it is configured in the .yaml file */ + stream_config[instance_cnt].stream_id = stream_id; + + /* Check to see if it is an active stream */ + memset(&hStat, 0, sizeof (NtStatistics_t)); + + /* Read usage data for the chosen stream ID */ + hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0; + hStat.u.usageData_v0.streamid = (uint8_t) stream_config[instance_cnt].stream_id; + + if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer); + return 0; + } + + if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) { + stream_config[instance_cnt].is_active = true; + } + instance_cnt++; + } + } + } + + /* Close the statistics stream */ + if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer); + exit(EXIT_FAILURE); + } + + if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, + "NT_InfoClose failed: %s", error_buffer); + exit(EXIT_FAILURE); + } + + return instance_cnt; +} + + +static void *NapatechBufMonitorLoop(void *arg) +{ + ThreadVars *tv = (ThreadVars *) arg; + + NtInfo_t hStreamInfo; + NtStatistics_t hStat; // Stat handle. + NtInfoStream_t hInfo; + NtStatStream_t hStatStream; + + char error_buffer[NT_ERRBUF_SIZE]; // Error buffer + int status; // Status variable + + const uint32_t alertInterval = 25; + + uint32_t OB_fill_level[MAX_STREAMS] = {0}; + uint32_t OB_alert_level[MAX_STREAMS] = {0}; + uint32_t ave_OB_fill_level[MAX_STREAMS] = {0}; + + uint32_t HB_fill_level[MAX_STREAMS] = {0}; + uint32_t HB_alert_level[MAX_STREAMS] = {0}; + uint32_t ave_HB_fill_level[MAX_STREAMS] = {0}; + + /* Open the info and Statistics */ + if ((status = NT_InfoOpen(&hInfo, "InfoStream")) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoOpen() failed: %s\n", error_buffer); + exit(1); + } + + if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatOpen() failed: %s\n", error_buffer); + exit(1); + } + + /* Read the info on all streams instantiated in the system */ + hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM; + if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer); + exit(EXIT_FAILURE); + } + + NapatechStreamConfig registered_streams[MAX_STREAMS]; + uint16_t num_registered = NapatechGetStreamConfig(registered_streams); + + TmThreadsSetFlag(tv, THV_INIT_DONE); + while (1) { + if (TmThreadsCheckFlag(tv, THV_KILL)) { + SCLogDebug("NapatechBufMonitorLoop THV_KILL detected"); + break; + } + + usleep(200000); + + /* Read the info on all streams instantiated in the system */ + hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM; + if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer); + exit(EXIT_FAILURE); + } + + char pktCntStr[4096]; + memset(pktCntStr, 0, sizeof (pktCntStr)); + + uint32_t stream_id = 0; + uint32_t stream_cnt = 0; + uint32_t num_streams = hStreamInfo.u.stream.data.count; + + for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) { + + do { + + /* Read usage data for the chosen stream ID */ + hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0; + hStat.u.usageData_v0.streamid = (uint8_t) stream_id; + + if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer); + exit(1); + } + + if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) { + ++stream_id; + continue; + } + } while (hStat.u.usageData_v0.data.numHostBufferUsed == 0); + + if (RegisteredStream(stream_id, num_registered, registered_streams)) { + ave_OB_fill_level[stream_id] = 0; + ave_HB_fill_level[stream_id] = 0; + + for (uint32_t hb_count = 0; hb_count < hStat.u.usageData_v0.data.numHostBufferUsed; hb_count++) { + + OB_fill_level[hb_count] = + ((100 * hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.used) / + hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.size); + + if (OB_fill_level[hb_count] > 100) { + OB_fill_level[hb_count] = 100; + } + + uint32_t bufSize = hStat.u.usageData_v0.data.hb[hb_count].enQueuedAdapter / 1024 + + hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024 + + hStat.u.usageData_v0.data.hb[hb_count].enQueued / 1024 + - HB_HIGHWATER; + + HB_fill_level[hb_count] = (uint32_t) + ((100 * hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024) / + bufSize); + + ave_OB_fill_level[stream_id] += OB_fill_level[hb_count]; + ave_HB_fill_level[stream_id] += HB_fill_level[hb_count]; + } + + ave_OB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed; + ave_HB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed; + + /* Host Buffer Fill Level warnings... */ + if (ave_HB_fill_level[stream_id] >= (HB_alert_level[stream_id] + alertInterval)) { + + while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id] + alertInterval) { + HB_alert_level[stream_id] += alertInterval; + } + SCLogInfo("nt%d - Increasing Host Buffer Fill Level : %4d%%", + stream_id, ave_HB_fill_level[stream_id]); + } + + if (HB_alert_level[stream_id] > 0) { + if ((ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval))) { + SCLogInfo("nt%d - Decreasing Host Buffer Fill Level: %4d%%", + stream_id, ave_HB_fill_level[stream_id]); + + while (ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval)) { + if ((HB_alert_level[stream_id]) > 0) { + HB_alert_level[stream_id] -= alertInterval; + } else break; + } + } + } + + /* On Board SDRAM Fill Level warnings... */ + if (ave_OB_fill_level[stream_id] >= (OB_alert_level[stream_id] + alertInterval)) { + while (ave_OB_fill_level[stream_id] >= OB_alert_level[stream_id] + alertInterval) { + OB_alert_level[stream_id] += alertInterval; + + } + SCLogInfo("nt%d - Increasing Adapter SDRAM Fill Level: %4d%%", + stream_id, ave_OB_fill_level[stream_id]); + } + + if (OB_alert_level[stream_id] > 0) { + if ((ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval))) { + SCLogInfo("nt%d - Decreasing Adapter SDRAM Fill Level : %4d%%", + stream_id, ave_OB_fill_level[stream_id]); + + while (ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval)) { + if ((OB_alert_level[stream_id]) > 0) { + OB_alert_level[stream_id] -= alertInterval; + } else break; + } + } + } + } + ++stream_id; + } + } + + if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) { + NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoClose() failed: %s\n", error_buffer); + exit(1); + } + + /* Close the statistics stream */ + if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) { + /* Get the status code as text */ + NT_ExplainError(status, error_buffer, sizeof (error_buffer)); + SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatClose() failed: %s\n", error_buffer); + exit(1); + } + + SCLogDebug("Exiting NapatechStatsLoop"); + TmThreadsSetFlag(tv, THV_RUNNING_DONE); + TmThreadWaitForFlag(tv, THV_DEINIT); + TmThreadsSetFlag(tv, THV_CLOSED); + + return NULL; +} + +void NapatechStartStats(void) +{ + /* Creates the Statistic threads */ + ThreadVars *stats_tv = TmThreadCreate("NapatechStats", + NULL, NULL, + NULL, NULL, + "custom", NapatechStatsLoop, 0); + + if (stats_tv == NULL) { + SCLogError(SC_ERR_THREAD_CREATE, + "Error creating a thread for NapatechStats - Killing engine."); + exit(EXIT_FAILURE); + } + + if (TmThreadSpawn(stats_tv) != 0) { + SCLogError(SC_ERR_THREAD_SPAWN, + "Failed to spawn thread for NapatechStats - Killing engine."); + exit(EXIT_FAILURE); + } + + ThreadVars *buf_monitor_tv = TmThreadCreate("NapatechBufMonitor", + NULL, NULL, + NULL, NULL, + "custom", NapatechBufMonitorLoop, 0); + + if (buf_monitor_tv == NULL) { + SCLogError(SC_ERR_THREAD_CREATE, + "Error creating a thread for NapatechBufMonitor - Killing engine."); + exit(EXIT_FAILURE); + } + + if (TmThreadSpawn(buf_monitor_tv) != 0) { + SCLogError(SC_ERR_THREAD_SPAWN, + "Failed to spawn thread for NapatechBufMonitor - Killing engine."); + exit(EXIT_FAILURE); + } + + + return; +} + +#endif // HAVE_NAPATECH diff --git a/src/util-napatech.h b/src/util-napatech.h new file mode 100644 index 0000000000..5e1d0a4da4 --- /dev/null +++ b/src/util-napatech.h @@ -0,0 +1,60 @@ +/* Copyright (C) 2017 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Phil Young + * + */ + +#ifndef __UTIL_NAPATECH_H__ +#define __UTIL_NAPATECH_H__ + +#ifdef HAVE_NAPATECH +#include + +typedef struct NapatechPacketVars_ +{ + uint64_t stream_id; + NtNetBuf_t nt_packet_buf; + ThreadVars *tv; +} NapatechPacketVars; + + +typedef struct NapatechStreamConfig_ +{ + uint16_t stream_id; + bool is_active; + bool initialized; +} NapatechStreamConfig; + +typedef struct NapatechCurrentStats_ { + uint64_t current_packets; + uint64_t current_drops; + uint64_t current_bytes; +} NapatechCurrentStats; + +#define MAX_STREAMS 256 + +extern void NapatechStartStats(void); +uint16_t NapatechGetNumaNode(uint16_t stream_id); +NapatechCurrentStats NapatechGetCurrentStats(uint16_t id); +uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[]); + +#endif //HAVE_NAPATECH +#endif /* __UTIL_NAPATECH_H__ */ -- 2.47.2