]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
napatech: Implementation of packet counters 2862/head
authorPhil Young <py@napatech.com>
Mon, 17 Jul 2017 14:59:00 +0000 (10:59 -0400)
committerVictor Julien <victor@inliniac.net>
Tue, 1 Aug 2017 08:13:54 +0000 (10:13 +0200)
added util-napatech module which contains implementation threads
for processing statistics.  And modified source-napatech and
runmode-napatech to instantiate the threads.

napatech: Implementation of packet counters

napatech: implementation of statistics counters

napatech: Implementation of packet counters.

napatech: added util-napatech module

napatech: added utils-napatech module.

added include declaration and napatech specific structure when HAVE_NAPATECH
is defined.

Added util-napatech module to project.

src/Makefile.am
src/decode.h
src/runmode-napatech.c
src/runmode-napatech.h
src/source-napatech.c
src/source-napatech.h
src/util-napatech.c [new file with mode: 0644]
src/util-napatech.h [new file with mode: 0644]

index 09fcafae2d67a75ff895cc7d6fc80baf2172c1e9..52f02a1e0e1f02b0597383dc31066dc66ccbb3bb 100644 (file)
@@ -427,6 +427,7 @@ util-mpm-ac-tile.c util-mpm-ac-tile.h \
 util-mpm-ac-tile-small.c \
 util-mpm-hs.c util-mpm-hs.h \
 util-mpm.c util-mpm.h \
+util-napatech.c util-napatech.h \
 util-optimize.h \
 util-pages.c util-pages.h \
 util-path.c util-path.h \
index 64a416ce3dd60d57a0cc993aaf0f03188548b157..4c0eeee7d5abb8d6480224d67aa891001be7bdf0 100644 (file)
 #include "util-cuda-vars.h"
 #endif /* __SC_CUDA_SUPPORT__ */
 
+#ifdef HAVE_NAPATECH
+#include "util-napatech.h"
+#endif /* HAVE_NAPATECH */
+
+
 typedef enum {
     CHECKSUM_VALIDATION_DISABLE,
     CHECKSUM_VALIDATION_ENABLE,
@@ -586,6 +591,9 @@ typedef struct Packet_
 #ifdef __SC_CUDA_SUPPORT__
     CudaPacketVars cuda_pkt_vars;
 #endif
+#ifdef HAVE_NAPATECH
+    NapatechPacketVars ntpv;
+#endif
 }
 #ifdef HAVE_MPIPE
     /* mPIPE requires packet buffers to be aligned to 128 byte boundaries. */
index 6d883f740201d3114bbb74e2607fc6b3b6e7a69d..d03fed623c811968381cca885185dc036eb1171e 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
  *  \author nPulse Technologies, LLC.
  *  \author Matt Keeler <mk@npulsetech.com>
  */
+
 #include "suricata-common.h"
 #include "tm-threads.h"
 #include "conf.h"
 #include "runmodes.h"
 #include "output.h"
-
 #include "util-debug.h"
 #include "util-time.h"
 #include "util-cpu.h"
 #include "util-affinity.h"
 #include "util-runmodes.h"
 #include "util-device.h"
-
+#include "util-napatech.h"
 #include "runmode-napatech.h"
-
-// need NapatechStreamDevConf structure
-#include "source-napatech.h"
+#include "source-napatech.h" // need NapatechStreamDevConf structure
 
 #define NT_RUNMODE_AUTOFP  1
 #define NT_RUNMODE_WORKERS 2
 
 static const char *default_mode = NULL;
 #ifdef HAVE_NAPATECH
-static int num_configured_streams = 0;
+
+#define MAX_STREAMS 256
+static uint16_t num_configured_streams = 0;
+
+uint16_t GetNumConfiguredStreams(void) {
+    return num_configured_streams;
+}
+
 #endif
 
 const char *RunModeNapatechGetDefaultMode(void)
@@ -69,128 +74,96 @@ void RunModeNapatechRegister(void)
 #endif
 }
 
+
 #ifdef HAVE_NAPATECH
-int NapatechRegisterDeviceStreams()
+
+
+static int NapatechRegisterDeviceStreams(void)
 {
-    NtInfoStream_t info_stream;
-    NtInfo_t info;
-    char error_buf[100];
-    int status;
-    int i;
-    char live_dev_buf[9];
-    int use_all_streams;
-    ConfNode *ntstreams;
-    ConfNode *stream_id;
 
-    if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0)
-    {
+    /* Display the configuration mode */
+    int use_all_streams;
+    if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
         SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
         exit(EXIT_FAILURE);
     }
 
-    if (use_all_streams)
-    {
+    if (use_all_streams) {
         SCLogInfo("Using All Napatech Streams");
-        // When using the default streams we need to query the service for a list of all configured
-        if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS)
-        {
-            NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
-            SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buf);
-            return -1;
-        }
+    } else {
+        SCLogInfo("Using Selected Napatech Streams");
+    }
 
-        info.cmd = NT_INFO_CMD_READ_STREAM;
-        if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS)
-        {
-            NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
-            SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buf);
-            return -1;
-        }
+    /* Get the stream ID's either from the conf or by querying Napatech */
+    NapatechStreamConfig stream_config[MAX_STREAMS];
+    uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
+    num_configured_streams = stream_cnt;
+    SCLogDebug("Configuring %d Napatech Streams...", stream_cnt);
 
-        num_configured_streams = info.u.stream.data.count;
-        for (i = 0; i < num_configured_streams; i++)
-        {
-            // The Stream IDs do not have to be sequential
-            snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", info.u.stream.data.streamIDList[i]);
-            LiveRegisterDevice(live_dev_buf);
-        }
-
-        if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS)
-        {
-            NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
-            SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", error_buf);
-            return -1;
-        }
-    }
-    else
-    {
-        SCLogInfo("Using Selected Napatech Streams");
-        // When not using the default streams we need to parse the array of streams from the conf
-        if ((ntstreams = ConfGetNode("napatech.streams")) == NULL)
-        {
-            SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf");
+    for (uint16_t inst = 0; inst < stream_cnt; ++inst) {
+        char *plive_dev_buf = SCCalloc(1, 9);
+        if (unlikely(plive_dev_buf == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
             exit(EXIT_FAILURE);
         }
-
-        // Loop through all stream numbers in the array and register the devices
-        TAILQ_FOREACH(stream_id, &ntstreams->head, next)
-        {
-            if (stream_id == NULL)
-            {
-                SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration");
-                exit(EXIT_FAILURE);
-            }
-            num_configured_streams++;
-
-            snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", atoi(stream_id->val));
-            LiveRegisterDevice(live_dev_buf);
-        }
+        snprintf(plive_dev_buf, 9, "nt%d", stream_config[inst].stream_id);
+        SCLogInfo("registering Napatech device: %s - active stream%sfound.",
+                        plive_dev_buf, stream_config[inst].is_active ? " " : " NOT ");
+        LiveRegisterDevice(plive_dev_buf);
     }
+
+    /* Napatech stats come from a separate thread.  This will surpress
+     * the counters when suricata exits.
+     */
+    LiveDeviceHasNoStats();
     return 0;
 }
 
-void *NapatechConfigParser(const char *device)
+static void *NapatechConfigParser(const char *device)
 {
-    // Expect device to be of the form nt%d where %d is the stream id to use
+    /* Expect device to be of the form nt%d where %d is the stream id to use */
     int dev_len = strlen(device);
-    struct NapatechStreamDevConf *conf = SCMalloc(sizeof(struct NapatechStreamDevConf));
-    if (unlikely(conf == NULL))
+    struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
+    if (unlikely(conf == NULL)) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device name.");
         return NULL;
-    if (dev_len < 3 || dev_len > 5)
-    {
+    }
+    if (dev_len < 3 || dev_len > 5) {
         SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG, "Could not parse config for device: %s - invalid length", device);
         return NULL;
     }
 
-    // device+5 is a pointer to the beginning of the stream id after the constant nt portion
-    conf->stream_id = atoi(device+2);
+    /* device+5 is a pointer to the beginning of the stream id after the constant nt portion */
+    conf->stream_id = atoi(device + 2);
 
-    // Set the host buffer allowance for this stream
-    // Right now we just look at the global default - there is no per-stream hba configuration
-    if (ConfGetInt("napatech.hba", &conf->hba) == 0)
+    /* Set the host buffer allowance for this stream
+     * Right now we just look at the global default - there is no per-stream hba configuration
+     */
+    if (ConfGetInt("napatech.hba", &conf->hba) == 0) {
         conf->hba = -1;
-
+    }
     return (void *) conf;
 }
 
-int NapatechGetThreadsCount(void *conf __attribute__((unused))) {
-    // No matter which live device it is there is no reason to ever use more than 1 thread
-    //   2 or more thread would cause packet duplication
+static int NapatechGetThreadsCount(void *conf __attribute__((unused)))
+{
+    /* No matter which live device it is there is no reason to ever use more than 1 thread
+       2 or more thread would cause packet duplication */
     return 1;
 }
 
 static int NapatechInit(int runmode)
 {
     int ret;
-    char errbuf[100];
+    char error_buf[100];
 
     RunModeInitialize();
     TimeModeSetLive();
 
     /* Initialize the API and check version compatibility */
     if ((ret = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) {
-        NT_ExplainError(ret, errbuf, sizeof(errbuf));
-        SCLogError(SC_ERR_NAPATECH_INIT_FAILED ,"NT_Init failed. Code 0x%X = %s", ret, errbuf);
+        NT_ExplainError(ret, error_buf, sizeof (error_buf));
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_Init failed. Code 0x%X = %s", ret, error_buf);
         exit(EXIT_FAILURE);
     }
 
@@ -200,16 +173,29 @@ static int NapatechInit(int runmode)
         exit(EXIT_FAILURE);
     }
 
-    switch(runmode) {
+    struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
+    if (unlikely(conf == NULL)) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device.");
+        exit(EXIT_FAILURE);
+    }
+
+    if ( (ConfGetInt("napatech.hba", &conf->hba) != 0) && (conf->hba > 0)){
+        SCLogInfo("Host Buffer Allowance: %d", (int)conf->hba);
+    }
+
+    /* Start a thread to process the statistics */
+    NapatechStartStats();
+
+    switch (runmode) {
         case NT_RUNMODE_AUTOFP:
             ret = RunModeSetLiveCaptureAutoFp(NapatechConfigParser, NapatechGetThreadsCount,
-                                              "NapatechStream", "NapatechDecode",
-                                              thread_name_autofp, NULL);
+                    "NapatechStream", "NapatechDecode",
+                    thread_name_autofp, NULL);
             break;
         case NT_RUNMODE_WORKERS:
             ret = RunModeSetLiveCaptureWorkers(NapatechConfigParser, NapatechGetThreadsCount,
-                                               "NapatechStream", "NapatechDecode",
-                                               thread_name_workers, NULL);
+                    "NapatechStream", "NapatechDecode",
+                    thread_name_workers, NULL);
             break;
         default:
             ret = -1;
index d10b406376d01fe44ecf5eb514a46716ba2582e5..aba5a6256bd63e631cc474aed13887f87acc7329 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
  *  \author Matt Keeler <mk@npulsetech.com>
  */
 
+
 #ifndef __RUNMODE_NAPATECH_H__
 #define __RUNMODE_NAPATECH_H__
 
 #ifdef HAVE_NAPATECH
+#include "util-napatech.h"
 #include <nt.h>
+
+
 #endif
 
 int RunModeNapatechAutoFp(void);
@@ -34,4 +38,8 @@ int RunModeNapatechWorkers(void);
 void RunModeNapatechRegister(void);
 const char *RunModeNapatechGetDefaultMode(void);
 
+uint16_t GetNumConfiguredStreams(void);
+
+
+
 #endif /* __RUNMODE_NAPATECH_H__ */
index 941d626094a59f8d2adac258f1cc556e2167499a..348793ad27c4ccfb357404493ed90943c873f8ca 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012-2014 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 /**
  * \file
  *
- * \author nPulse Technologies, LLC.
- * \author Matt Keeler <mk@npulsetech.com>
- *
+- * \author nPulse Technologies, LLC.
+- * \author Matt Keeler <mk@npulsetech.com>
+ *  *
  * Support for NAPATECH adapter with the 3GD Driver/API.
  * Requires libntapi from Napatech A/S.
  *
  */
-
 #include "suricata-common.h"
 #include "suricata.h"
 #include "threadvars.h"
 #include "tm-queuehandlers.h"
 #include "tm-threads.h"
 #include "tm-modules.h"
-
 #include "util-privs.h"
 #include "tmqh-packetpool.h"
+#include "util-napatech.h"
 #include "source-napatech.h"
 
 #ifndef HAVE_NAPATECH
 
 TmEcode NoNapatechSupportExit(ThreadVars *, const void *, void **);
 
-
-void TmModuleNapatechStreamRegister (void)
-{
+void TmModuleNapatechStreamRegister(void) {
     tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream";
     tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NoNapatechSupportExit;
     tmm_modules[TMM_RECEIVENAPATECH].Func = NULL;
@@ -54,8 +51,7 @@ void TmModuleNapatechStreamRegister (void)
     tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_ADMIN;
 }
 
-void TmModuleNapatechDecodeRegister (void)
-{
+void TmModuleNapatechDecodeRegister(void) {
     tmm_modules[TMM_DECODENAPATECH].name = "NapatechDecode";
     tmm_modules[TMM_DECODENAPATECH].ThreadInit = NoNapatechSupportExit;
     tmm_modules[TMM_DECODENAPATECH].Func = NULL;
@@ -66,8 +62,7 @@ void TmModuleNapatechDecodeRegister (void)
     tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
 }
 
-TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data)
-{
+TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) {
     SCLogError(SC_ERR_NAPATECH_NOSUPPORT,
             "Error creating thread %s: you do not have support for Napatech adapter "
             "enabled please recompile with --enable-napatech", tv->name);
@@ -76,31 +71,37 @@ TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data)
 
 #else /* Implied we do have NAPATECH support */
 
+
 #include <nt.h>
 
+#define MAX_STREAMS 256
+
 extern int max_pending_packets;
 
 typedef struct NapatechThreadVars_ {
     ThreadVars *tv;
     NtNetStreamRx_t rx_stream;
-    uint64_t stream_id;
+    uint16_t stream_id;
     int hba;
-    uint64_t pkts;
-    uint64_t drops;
-    uint64_t bytes;
-
     TmSlot *slot;
 } NapatechThreadVars;
 
 
 TmEcode NapatechStreamThreadInit(ThreadVars *, const void *, void **);
 void NapatechStreamThreadExitStats(ThreadVars *, void *);
-TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot);
+TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot);
 
 TmEcode NapatechDecodeThreadInit(ThreadVars *, const void *, void **);
 TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data);
 TmEcode NapatechDecode(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
 
+/* These are used as the threads are exiting to get a comprehensive count of
+ * all the packets received and dropped.
+ */
+SC_ATOMIC_DECLARE(uint64_t, total_packets);
+SC_ATOMIC_DECLARE(uint64_t, total_drops);
+SC_ATOMIC_DECLARE(uint16_t, total_tallied);
+
 /**
  * \brief Register the Napatech  receiver (reader) module.
  */
@@ -109,13 +110,17 @@ void TmModuleNapatechStreamRegister(void)
     tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream";
     tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NapatechStreamThreadInit;
     tmm_modules[TMM_RECEIVENAPATECH].Func = NULL;
-    tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechStreamLoop;
+    tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechPacketLoopZC;
     tmm_modules[TMM_RECEIVENAPATECH].PktAcqBreakLoop = NULL;
     tmm_modules[TMM_RECEIVENAPATECH].ThreadExitPrintStats = NapatechStreamThreadExitStats;
     tmm_modules[TMM_RECEIVENAPATECH].ThreadDeinit = NapatechStreamThreadDeinit;
     tmm_modules[TMM_RECEIVENAPATECH].RegisterTests = NULL;
     tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_RAW;
     tmm_modules[TMM_RECEIVENAPATECH].flags = TM_FLAG_RECEIVE_TM;
+
+    SC_ATOMIC_INIT(total_packets);
+    SC_ATOMIC_INIT(total_drops);
+    SC_ATOMIC_INIT(total_tallied);
 }
 
 /**
@@ -133,6 +138,13 @@ void TmModuleNapatechDecodeRegister(void)
     tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
 }
 
+/*
+ *-----------------------------------------------------------------------------
+ *-----------------------------------------------------------------------------
+ * Statistics code
+ *-----------------------------------------------------------------------------
+*/
+
 /**
  * \brief   Initialize the Napatech receiver thread, generate a single
  *          NapatechThreadVar structure for each thread, this will
@@ -152,13 +164,11 @@ void TmModuleNapatechDecodeRegister(void)
 TmEcode NapatechStreamThreadInit(ThreadVars *tv, const void *initdata, void **data)
 {
     SCEnter();
-    struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *)initdata;
-    uintmax_t stream_id = conf->stream_id;
+    struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *) initdata;
+    uint16_t stream_id = conf->stream_id;
     *data = NULL;
 
-    SCLogInfo("Napatech  Thread Stream ID:%lu", stream_id);
-
-    NapatechThreadVars *ntv = SCMalloc(sizeof(NapatechThreadVars));
+    NapatechThreadVars *ntv = SCCalloc(1, sizeof (NapatechThreadVars));
     if (unlikely(ntv == NULL)) {
         SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH  thread vars.");
         exit(EXIT_FAILURE);
@@ -168,42 +178,62 @@ TmEcode NapatechStreamThreadInit(ThreadVars *tv, const void *initdata, void **da
     ntv->stream_id = stream_id;
     ntv->tv = tv;
     ntv->hba = conf->hba;
+    SCLogDebug("Started processing packets from NAPATECH  Stream: %lu", ntv->stream_id);
 
-    SCLogInfo("Started processing packets from NAPATECH  Stream: %lu", ntv->stream_id);
-
-    *data = (void *)ntv;
-
+    *data = (void *) ntv;
     SCReturnInt(TM_ECODE_OK);
 }
 
-/**
- *  \brief Main Napatech reading Loop function
- */
-TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
+static PacketQueue packets_to_release[MAX_STREAMS];
+
+static void NapatechReleasePacket(struct Packet_ *p)
 {
-    SCEnter();
+    PacketFreeOrRelease(p);
+    PacketEnqueue(&packets_to_release[p->ntpv.stream_id], p);
+}
 
+TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
+{
     int32_t status;
-    char errbuf[100];
+    char error_buffer[100];
     uint64_t pkt_ts;
     NtNetBuf_t packet_buffer;
-    NapatechThreadVars *ntv = (NapatechThreadVars *)data;
-    NtNetRx_t stat_cmd;
-
-    SCLogInfo("Opening NAPATECH Stream: %lu for processing", ntv->stream_id);
+    NapatechThreadVars *ntv = (NapatechThreadVars *) data;
+    uint64_t hba_pkt_drops = 0;
+    uint64_t hba_byte_drops = 0;
+    uint16_t hba_pkt = 0;
+
+    /* This just keeps the startup output more orderly. */
+    usleep(200000 * ntv->stream_id);
+
+    if (ntv->hba > 0) {
+        char *s_hbad_pkt = SCCalloc(1, 32);
+        if (unlikely(s_hbad_pkt == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+            exit(EXIT_FAILURE);
+        }
+        snprintf(s_hbad_pkt, 32, "nt%d.hba_drop", ntv->stream_id);
+        hba_pkt = StatsRegisterCounter(s_hbad_pkt, tv);
+        StatsSetupPrivate(tv);
+        StatsSetUI64(tv, hba_pkt, 0);
+    }
+    SCLogDebug("Opening NAPATECH Stream: %lu for processing", ntv->stream_id);
 
     if ((status = NT_NetRxOpen(&(ntv->rx_stream), "SuricataStream", NT_NET_INTERFACE_PACKET, ntv->stream_id, ntv->hba)) != NT_SUCCESS) {
-        NT_ExplainError(status, errbuf, sizeof(errbuf));
-        SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %lu - %s", ntv->stream_id, errbuf);
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %u - %s", ntv->stream_id, error_buffer);
         SCFree(ntv);
         SCReturnInt(TM_ECODE_FAILED);
     }
 
-    stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP;
+#if defined(__linux__)
+    SCLogInfo("Napatech Packet Loop Started - cpu: %3d,    stream: %3u (numa: %u)",
+            sched_getcpu(), ntv->stream_id, NapatechGetNumaNode(ntv->stream_id));
+#else
+    SCLogInfo("Napatech Packet Loop Started -  stream: %lu ", ntv->stream_id);
+#endif
 
-    SCLogInfo("Napatech Packet Stream Loop Started for Stream ID: %lu", ntv->stream_id);
-
-    TmSlot *s = (TmSlot *)slot;
+    TmSlot *s = (TmSlot *) slot;
     ntv->slot = s->slot_next;
 
     while (!(suricata_ctl_flags & SURICATA_STOP)) {
@@ -211,19 +241,15 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
          * us from alloc'ing packets at line rate */
         PacketPoolWait();
 
-        /*
-         * Napatech returns packets 1 at a time
-         */
+        /* Napatech returns packets 1 at a time */
         status = NT_NetRxGet(ntv->rx_stream, &packet_buffer, 1000);
         if (unlikely(status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN)) {
-            /*
-             * no frames currently available
-             */
             continue;
         } else if (unlikely(status != NT_SUCCESS)) {
-            SCLogError(SC_ERR_NAPATECH_STREAM_NEXT_FAILED,
-                       "Failed to read from Napatech Stream: %lu",
-                       ntv->stream_id);
+            NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+
+            SCLogInfo("Failed to read from Napatech Stream%d: %s",
+                    ntv->stream_id, error_buffer);
             SCReturnInt(TM_ECODE_FAILED);
         }
 
@@ -239,7 +265,7 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
          * Handle the different timestamp forms that the napatech cards could use
          *   - NT_TIMESTAMP_TYPE_NATIVE is not supported due to having an base of 0 as opposed to NATIVE_UNIX which has a base of 1/1/1970
          */
-        switch(NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) {
+        switch (NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) {
             case NT_TIMESTAMP_TYPE_NATIVE_UNIX:
                 p->ts.tv_sec = pkt_ts / 100000000;
                 p->ts.tv_usec = ((pkt_ts % 100000000) / 100) + (pkt_ts % 100) > 50 ? 1 : 0;
@@ -259,30 +285,34 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
                 break;
             default:
                 SCLogError(SC_ERR_NAPATECH_TIMESTAMP_TYPE_NOT_SUPPORTED,
-                           "Packet from Napatech Stream: %lu does not have a supported timestamp format",
-                           ntv->stream_id);
+                        "Packet from Napatech Stream: %u does not have a supported timestamp format",
+                        ntv->stream_id);
                 NT_NetRxRelease(ntv->rx_stream, packet_buffer);
                 SCReturnInt(TM_ECODE_FAILED);
         }
 
-        SCLogDebug("p->ts.tv_sec %"PRIuMAX"", (uintmax_t)p->ts.tv_sec);
-        p->datalink = LINKTYPE_ETHERNET;
-
-        ntv->pkts++;
-        ntv->bytes += NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer);
-
-        // Update drop counter
-        if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS))
-        {
-            NT_ExplainError(status, errbuf, sizeof(errbuf));
-            SCLogWarning(SC_ERR_NAPATECH_STAT_DROPS_FAILED, "Couldn't retrieve drop statistics from the RX stream: %lu - %s", ntv->stream_id, errbuf);
-        }
-        else
-        {
-            ntv->drops += stat_cmd.u.streamDrop.pktsDropped;
+        if (unlikely(ntv->hba > 0)) {
+            NtNetRx_t stat_cmd;
+            stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP;
+            // Update drop counter
+            if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS)) {
+                NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+                SCLogInfo("Couldn't retrieve drop statistics from the RX stream: %u - %s",
+                        ntv->stream_id, error_buffer);
+            } else {
+                hba_pkt_drops = stat_cmd.u.streamDrop.pktsDropped;
+
+                StatsSetUI64(tv, hba_pkt, hba_pkt_drops);
+            }
+            StatsSyncCountersIfSignalled(tv);
         }
 
-        if (unlikely(PacketCopyData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
+        p->ReleasePacket = NapatechReleasePacket;
+        p->ntpv.nt_packet_buf = packet_buffer;
+        p->ntpv.stream_id = ntv->stream_id;
+        p->datalink = LINKTYPE_ETHERNET;
+
+        if (unlikely(PacketSetData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
             TmqhOutputPacketpool(ntv->tv, p);
             NT_NetRxRelease(ntv->rx_stream, packet_buffer);
             SCReturnInt(TM_ECODE_FAILED);
@@ -294,13 +324,23 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
             SCReturnInt(TM_ECODE_FAILED);
         }
 
-        NT_NetRxRelease(ntv->rx_stream, packet_buffer);
+        /* Release any packets that were returned by the callback function */
+        Packet *rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]);
+        while (rel_pkt != NULL) {
+            NT_NetRxRelease(ntv->rx_stream, rel_pkt->ntpv.nt_packet_buf);
+            rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]);
+        }
         StatsSyncCountersIfSignalled(tv);
     }
 
+    if (unlikely(ntv->hba > 0)) {
+        SCLogInfo("Host Buffer Allowance Drops - pkts: %ld,  bytes: %ld", hba_pkt_drops, hba_byte_drops);
+    }
+
     SCReturnInt(TM_ECODE_OK);
 }
 
+
 /**
  * \brief Print some stats to the log at program exit.
  *
@@ -309,12 +349,31 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
  */
 void NapatechStreamThreadExitStats(ThreadVars *tv, void *data)
 {
-    NapatechThreadVars *ntv = (NapatechThreadVars *)data;
-    double percent = 0;
-    if (ntv->drops > 0)
-        percent = (((double) ntv->drops) / (ntv->pkts+ntv->drops)) * 100;
+    NapatechThreadVars *ntv = (NapatechThreadVars *) data;
+    NapatechCurrentStats stat = NapatechGetCurrentStats(ntv->stream_id);
 
-    SCLogNotice("Stream: %lu; Packets: %"PRIu64"; Drops: %"PRIu64" (%5.2f%%); Bytes: %"PRIu64, ntv->stream_id, ntv->pkts, ntv->drops, percent, ntv->bytes);
+    double percent = 0;
+    if (stat.current_drops > 0)
+        percent = (((double) stat.current_drops)
+                  / (stat.current_packets + stat.current_drops)) * 100;
+
+    SCLogInfo("nt%lu - pkts: %lu; drop: %lu (%5.2f%%); bytes: %lu",
+                 (uint64_t) ntv->stream_id, stat.current_packets,
+                  stat.current_drops, percent, stat.current_bytes);
+
+    SC_ATOMIC_ADD(total_packets, stat.current_packets);
+    SC_ATOMIC_ADD(total_drops, stat.current_drops);
+    SC_ATOMIC_ADD(total_tallied, 1);
+
+    if (SC_ATOMIC_GET(total_tallied) == GetNumConfiguredStreams()) {
+        if (SC_ATOMIC_GET(total_drops) > 0)
+            percent = (((double) SC_ATOMIC_GET(total_drops)) / (SC_ATOMIC_GET(total_packets)
+                         + SC_ATOMIC_GET(total_drops))) * 100;
+
+        SCLogInfo(" ");
+        SCLogInfo("--- Total Packets: %ld  Total Dropped: %ld (%5.2f%%)",
+                  SC_ATOMIC_GET(total_packets), SC_ATOMIC_GET(total_drops), percent);
+    }
 }
 
 /**
@@ -325,13 +384,12 @@ void NapatechStreamThreadExitStats(ThreadVars *tv, void *data)
 TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data)
 {
     SCEnter();
-    NapatechThreadVars *ntv = (NapatechThreadVars *)data;
+    NapatechThreadVars *ntv = (NapatechThreadVars *) data;
     SCLogDebug("Closing Napatech Stream: %d", ntv->stream_id);
     NT_NetRxClose(ntv->rx_stream);
     SCReturnInt(TM_ECODE_OK);
 }
 
-
 /** Decode Napatech */
 
 /**
@@ -350,14 +408,14 @@ TmEcode NapatechDecode(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
 {
     SCEnter();
 
-    DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+    DecodeThreadVars *dtv = (DecodeThreadVars *) data;
 
     /* XXX HACK: flow timeout can call us for injected pseudo packets
      *           see bug: https://redmine.openinfosecfoundation.org/issues/1107 */
     if (p->flags & PKT_PSEUDO_STREAM_END)
         return TM_ECODE_OK;
 
-    /* update counters */
+    // update counters
     DecodeUpdatePacketCounters(tv, dtv, p);
 
     switch (p->datalink) {
@@ -372,7 +430,6 @@ TmEcode NapatechDecode(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
     }
 
     PacketDecodeFinalize(tv, dtv, p);
-
     SCReturnInt(TM_ECODE_OK);
 }
 
@@ -380,16 +437,12 @@ TmEcode NapatechDecodeThreadInit(ThreadVars *tv, const void *initdata, void **da
 {
     SCEnter();
     DecodeThreadVars *dtv = NULL;
-
     dtv = DecodeThreadVarsAlloc(tv);
-
-    if(dtv == NULL)
+    if (dtv == NULL)
         SCReturnInt(TM_ECODE_FAILED);
 
     DecodeRegisterPerfCounters(dtv, tv);
-
-    *data = (void *)dtv;
-
+    *data = (void *) dtv;
     SCReturnInt(TM_ECODE_OK);
 }
 
@@ -397,7 +450,6 @@ TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data)
 {
     if (data != NULL)
         DecodeThreadVarsFree(tv, data);
-    SCReturnInt(TM_ECODE_OK);
-}
+    SCReturnInt(TM_ECODE_OK);    }
 
 #endif /* HAVE_NAPATECH */
index eee79dc76d0586f4bd547c4f91af70e2fcc17896..be40d781e0dab71725130c54234a0beecd2da96f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 /**
  * \file
  *
- * \author nPulse Technologies, LLC
+ * \author nPulse Technologies, LLC.
  * \author Matt Keeler <mk@npulsetech.com>
  */
-
 #ifndef __SOURCE_NAPATECH_H__
 #define __SOURCE_NAPATECH_H__
 
@@ -29,16 +28,14 @@ void TmModuleNapatechStreamRegister (void);
 TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data);
 void TmModuleNapatechDecodeRegister (void);
 
+#ifdef HAVE_NAPATECH
+#include <nt.h>
+
 struct NapatechStreamDevConf
 {
-    int stream_id;
+    uint16_t stream_id;
     intmax_t hba;
 };
 
-#ifdef HAVE_NAPATECH
-
-#include <nt.h>
-
-#endif
-
+#endif /* HAVE_NAPATECH */
 #endif /* __SOURCE_NAPATECH_H__ */
diff --git a/src/util-napatech.c b/src/util-napatech.c
new file mode 100644 (file)
index 0000000..ba2aac0
--- /dev/null
@@ -0,0 +1,736 @@
+/* Copyright (C) 2017 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Napatech Inc.
+ * \author Phil Young <py@napatech.com>
+ *
+ *
+  */
+
+
+#include "suricata-common.h"
+#ifdef HAVE_NAPATECH
+#include "suricata.h"
+#include "threadvars.h"
+#include "tm-threads.h"
+
+
+uint16_t NapatechGetNumaNode(uint16_t stream_id)
+{
+    int status;
+    char buffer[80]; // Error buffer
+    NtInfoStream_t info_stream;
+    NtInfo_t info;
+    uint16_t numa_node;
+
+    if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
+        NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    // Read the info on this specific stream
+    info.cmd = NT_INFO_CMD_READ_STREAMID;
+    info.u.streamID.streamId = stream_id;
+    if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
+        NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    numa_node = info.u.streamID.data.numaNode;
+
+    if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
+        NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", buffer);
+        exit(EXIT_FAILURE);
+    }
+    return numa_node;
+}
+
+
+/*-----------------------------------------------------------------------------
+ *-----------------------------------------------------------------------------
+ * Statistics code
+ *-----------------------------------------------------------------------------
+*/
+
+typedef struct StreamCounters_ {
+    uint16_t pkts;
+    uint16_t byte;
+    uint16_t drop;
+} StreamCounters;
+
+
+NapatechCurrentStats current_stats[MAX_STREAMS];
+
+NapatechCurrentStats NapatechGetCurrentStats(uint16_t id)
+{
+
+    return current_stats[id];
+}
+
+
+static uint16_t TestStreamConfig(
+        NtInfoStream_t hInfo,
+        NtStatStream_t hStatStream,
+        NapatechStreamConfig stream_config[],
+        uint16_t num_inst)
+{
+
+    uint16_t num_active = 0;
+
+    for (uint16_t inst = 0; inst < num_inst; ++inst) {
+        int status;
+        char buffer[80]; // Error buffer
+        NtStatistics_t stat; // Stat handle.
+
+        /* Check to see if it is an active stream */
+        memset(&stat, 0, sizeof (NtStatistics_t));
+
+        /* Read usage data for the chosen stream ID */
+        stat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+        stat.u.usageData_v0.streamid = (uint8_t) stream_config[inst].stream_id;
+
+        if ((status = NT_StatRead(hStatStream, &stat)) != NT_SUCCESS) {
+            /* Get the status code as text */
+            NT_ExplainError(status, buffer, sizeof (buffer));
+            SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead():2 failed: %s\n", buffer);
+            return 0;
+        }
+
+        if (stat.u.usageData_v0.data.numHostBufferUsed > 0) {
+            stream_config[inst].is_active = true;
+            num_active++;
+        } else {
+            stream_config[inst].is_active = false;
+        }
+    }
+
+    return num_active;
+}
+
+static uint32_t UpdateStreamStats(ThreadVars *tv,
+        NtInfoStream_t hInfo,
+        NtStatStream_t hStatStream,
+        uint16_t num_streams,
+        NapatechStreamConfig stream_config[],
+        StreamCounters streamCounters[]
+        )
+{
+    static uint64_t rxPktsStart[MAX_STREAMS] = {0};
+    static uint64_t rxByteStart[MAX_STREAMS] = {0};
+    static uint64_t dropStart[MAX_STREAMS] = {0};
+
+    int status;
+    char error_buffer[80]; // Error buffer
+    NtInfo_t hStreamInfo;
+    NtStatistics_t hStat; // Stat handle.
+
+    /* Query the system to get the number of streams currently instantiated */
+    hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+    if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    uint16_t num_active;
+    if ((num_active = TestStreamConfig(hInfo, hStatStream, stream_config, num_streams)) == 0) {
+        /* None of the configured streams are active */
+        return 0;
+    }
+
+    /* At least one stream is active so proceed with the stats. */
+    uint16_t inst_id = 0;
+    uint32_t stream_cnt = 0;
+    for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
+
+
+        while(inst_id < num_streams) {
+            if (stream_config[inst_id].is_active) {
+                break;
+            } else {
+                ++inst_id;
+            }
+        }
+        if (inst_id == num_streams)
+            break;
+
+        /* Read usage data for the chosen stream ID */
+        memset(&hStat, 0, sizeof (NtStatistics_t));
+        hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+        hStat.u.usageData_v0.streamid = (uint8_t) stream_config[inst_id].stream_id;
+
+        if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+            /* Get the status code as text */
+            NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+            SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+            return 0;
+        }
+
+        uint16_t stream_id = stream_config[inst_id].stream_id;
+        if (stream_config[inst_id].is_active) {
+            uint64_t rxPktsTotal = 0;
+            uint64_t rxByteTotal = 0;
+            uint64_t dropTotal = 0;
+
+            for (uint32_t hbCount = 0; hbCount < hStat.u.usageData_v0.data.numHostBufferUsed; hbCount++) {
+                if (unlikely(stream_config[inst_id].initialized == false)) {
+                    rxPktsStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
+                    rxByteStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
+                    dropStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
+                    stream_config[inst_id].initialized = true;
+                } else {
+                    rxPktsTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
+                    rxByteTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
+                    dropTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
+                }
+            }
+
+            current_stats[stream_id].current_packets = rxPktsTotal - rxPktsStart[stream_id];
+            current_stats[stream_id].current_bytes = rxByteTotal - rxByteStart[stream_id];
+            current_stats[stream_id].current_drops = dropTotal - dropStart[stream_id];
+        }
+
+        StatsSetUI64(tv, streamCounters[inst_id].pkts, current_stats[stream_id].current_packets);
+        StatsSetUI64(tv, streamCounters[inst_id].byte, current_stats[stream_id].current_bytes);
+        StatsSetUI64(tv, streamCounters[inst_id].drop, current_stats[stream_id].current_drops);
+
+        ++inst_id;
+    }
+    return num_active;
+}
+
+static void *NapatechStatsLoop(void *arg)
+{
+    ThreadVars *tv = (ThreadVars *) arg;
+
+    int status;
+    char error_buffer[80]; // Error buffer
+    NtInfoStream_t hInfo;
+    NtStatStream_t hStatStream;
+
+    NapatechStreamConfig stream_config[MAX_STREAMS];
+    uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
+
+    /* Open the info and Statistics */
+    if ((status = NT_InfoOpen(&hInfo, "StatsLoopInfoStream")) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_RUNMODE, "NT_InfoOpen() failed: %s\n", error_buffer);
+        return NULL;
+    }
+
+    if ((status = NT_StatOpen(&hStatStream, "StatsLoopStatsStream")) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
+        return NULL;
+    }
+
+    StreamCounters streamCounters[MAX_STREAMS];
+    for (int i = 0; i < stream_cnt; ++i) {
+        char *pkts_buf = SCCalloc(1, 32);
+        if (unlikely(pkts_buf == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+            exit(EXIT_FAILURE);
+        }
+
+        snprintf(pkts_buf, 32, "nt%d.pkts", stream_config[i].stream_id);
+        streamCounters[i].pkts = StatsRegisterCounter(pkts_buf, tv);
+
+        char *byte_buf = SCCalloc(1, 32);
+        if (unlikely(byte_buf == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+            exit(EXIT_FAILURE);
+        }
+        snprintf(byte_buf, 32, "nt%d.bytes", stream_config[i].stream_id);
+        streamCounters[i].byte = StatsRegisterCounter(byte_buf, tv);
+
+        char *drop_buf = SCCalloc(1, 32);
+        if (unlikely(drop_buf == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+            exit(EXIT_FAILURE);
+        }
+        snprintf(drop_buf, 32, "nt%d.drop", stream_config[i].stream_id);
+        streamCounters[i].drop = StatsRegisterCounter(drop_buf, tv);
+    }
+
+    StatsSetupPrivate(tv);
+
+    for (int i = 0; i < stream_cnt; ++i) {
+        StatsSetUI64(tv, streamCounters[i].pkts, 0);
+        StatsSetUI64(tv, streamCounters[i].byte, 0);
+        StatsSetUI64(tv, streamCounters[i].drop, 0);
+    }
+
+    uint32_t num_active = UpdateStreamStats(tv, hInfo, hStatStream,
+            stream_cnt, stream_config, streamCounters);
+
+    if (num_active < stream_cnt) {
+        SCLogInfo("num_active: %d,  stream_cnt: %d", num_active, stream_cnt);
+        SCLogWarning(SC_ERR_NAPATECH_CONFIG_STREAM,
+                "Some or all of the configured streams are not created.  Proceeding with active streams.");
+    }
+
+    TmThreadsSetFlag(tv, THV_INIT_DONE);
+    while (1) {
+        if (TmThreadsCheckFlag(tv, THV_KILL)) {
+            SCLogDebug("NapatechStatsLoop THV_KILL detected");
+            break;
+        }
+
+        UpdateStreamStats(tv, hInfo, hStatStream,
+                stream_cnt, stream_config, streamCounters);
+
+        StatsSyncCountersIfSignalled(tv);
+        usleep(1000000);
+    }
+
+    /* CLEAN UP NT Resources and Close the info stream */
+    if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_RUNMODE, "NT_InfoClose() failed: %s\n", error_buffer);
+        return NULL;
+    }
+
+    /* Close the statistics stream */
+    if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
+        return NULL;
+    }
+
+
+    SCLogDebug("Exiting NapatechStatsLoop");
+    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
+    TmThreadWaitForFlag(tv, THV_DEINIT);
+    TmThreadsSetFlag(tv, THV_CLOSED);
+
+    return NULL;
+}
+
+#define MAX_HOSTBUFFER 4
+#define MAX_STREAMS 256
+#define HB_HIGHWATER 2048 //1982
+
+static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered,
+                             NapatechStreamConfig registered_streams[])
+{
+    for (uint16_t reg_id = 0; reg_id < num_registered; ++reg_id) {
+        if (stream_id == registered_streams[reg_id].stream_id) {
+            return true;
+        }
+    }
+    return false;
+}
+
+uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
+{
+    int status;
+    char error_buffer[80]; // Error buffer
+    NtStatStream_t hStatStream;
+    NtStatistics_t hStat; // Stat handle.
+    NtInfoStream_t info_stream;
+    NtInfo_t info;
+    uint16_t instance_cnt = 0;
+    int use_all_streams;
+    ConfNode *ntstreams;
+
+    for (uint16_t i = 0; i < MAX_STREAMS; ++i) {
+        stream_config[i].stream_id = 0;
+        stream_config[i].is_active = false;
+        stream_config[i].initialized = false;
+    }
+
+    if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
+        SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
+        exit(EXIT_FAILURE);
+    }
+
+    if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    if (use_all_streams) {
+        info.cmd = NT_INFO_CMD_READ_STREAM;
+        if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
+            NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+            SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buffer);
+            exit(EXIT_FAILURE);
+        }
+
+        uint16_t stream_id = 0;
+        while (instance_cnt < info.u.stream.data.count) {
+
+            /*
+             *  For each stream ID query the number of host-buffers used by
+             * the stream.  If zero, then that streamID is not used; skip
+             * over it and continue until we get a streamID with a non-zero
+             * count of the host-buffers.
+             */
+            memset(&hStat, 0, sizeof (NtStatistics_t));
+
+            /* Read usage data for the chosen stream ID */
+            hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+            hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
+
+            if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+                /* Get the status code as text */
+                NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+                SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+                return 0;
+            }
+
+            if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
+                ++stream_id;
+                continue;
+            }
+
+            /* if we get here it is an active  stream */
+            stream_config[instance_cnt].stream_id = stream_id++;
+            stream_config[instance_cnt].is_active = true;
+            instance_cnt++;
+        }
+
+    } else {
+        /* When not using the default streams we need to parse the array of streams from the conf */
+        if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) {
+            SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf");
+            exit(EXIT_FAILURE);
+        }
+
+        /* Loop through all stream numbers in the array and register the devices */
+        ConfNode *stream;
+        instance_cnt = 0;
+
+        TAILQ_FOREACH(stream, &ntstreams->head, next) {
+            uint16_t stream_id = 0;
+
+            if (stream == NULL) {
+                SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration");
+                exit(EXIT_FAILURE);
+            }
+
+            uint16_t start, end;
+            if (strchr(stream->val, '-')) {
+                char copystr[16];
+                strlcpy(copystr, stream->val, 16);
+
+                start = atoi(copystr);
+                end = atoi(strchr(copystr, '-')+1);
+            } else {
+                stream_config[instance_cnt].stream_id = atoi(stream->val);
+                start = stream_config[instance_cnt].stream_id;
+                end = stream_config[instance_cnt].stream_id;
+            }
+            SCLogInfo("%s start: %d  end: %d", stream->val, start, end);
+
+            for (stream_id = start; stream_id <= end; ++stream_id) {
+                /* if we get here it is configured in the .yaml file */
+                stream_config[instance_cnt].stream_id = stream_id;
+
+                /* Check to see if it is an active stream */
+                memset(&hStat, 0, sizeof (NtStatistics_t));
+
+                /* Read usage data for the chosen stream ID */
+                hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+                hStat.u.usageData_v0.streamid = (uint8_t) stream_config[instance_cnt].stream_id;
+
+                if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+                    /* Get the status code as text */
+                    NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+                    SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+                    return 0;
+                }
+
+                if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) {
+                    stream_config[instance_cnt].is_active = true;
+                }
+                instance_cnt++;
+            }
+        }
+    }
+
+    /* Close the statistics stream */
+    if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
+                    "NT_InfoClose failed: %s", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    return instance_cnt;
+}
+
+
+static void *NapatechBufMonitorLoop(void *arg)
+{
+    ThreadVars *tv = (ThreadVars *) arg;
+
+    NtInfo_t hStreamInfo;
+    NtStatistics_t hStat; // Stat handle.
+    NtInfoStream_t hInfo;
+    NtStatStream_t hStatStream;
+
+    char error_buffer[NT_ERRBUF_SIZE]; // Error buffer
+    int status; // Status variable
+
+    const uint32_t alertInterval = 25;
+
+    uint32_t OB_fill_level[MAX_STREAMS] = {0};
+    uint32_t OB_alert_level[MAX_STREAMS] = {0};
+    uint32_t ave_OB_fill_level[MAX_STREAMS] = {0};
+
+    uint32_t HB_fill_level[MAX_STREAMS] = {0};
+    uint32_t HB_alert_level[MAX_STREAMS] = {0};
+    uint32_t ave_HB_fill_level[MAX_STREAMS] = {0};
+
+    /* Open the info and Statistics */
+    if ((status = NT_InfoOpen(&hInfo, "InfoStream")) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoOpen() failed: %s\n", error_buffer);
+        exit(1);
+    }
+
+    if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatOpen() failed: %s\n", error_buffer);
+        exit(1);
+    }
+
+    /* Read the info on all streams instantiated in the system */
+    hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+    if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+        exit(EXIT_FAILURE);
+    }
+
+    NapatechStreamConfig registered_streams[MAX_STREAMS];
+    uint16_t num_registered = NapatechGetStreamConfig(registered_streams);
+
+    TmThreadsSetFlag(tv, THV_INIT_DONE);
+    while (1) {
+        if (TmThreadsCheckFlag(tv, THV_KILL)) {
+            SCLogDebug("NapatechBufMonitorLoop THV_KILL detected");
+            break;
+        }
+
+        usleep(200000);
+
+        /* Read the info on all streams instantiated in the system */
+        hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+        if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+            NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+            SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+            exit(EXIT_FAILURE);
+        }
+
+        char pktCntStr[4096];
+        memset(pktCntStr, 0, sizeof (pktCntStr));
+
+        uint32_t stream_id = 0;
+        uint32_t stream_cnt = 0;
+        uint32_t num_streams = hStreamInfo.u.stream.data.count;
+
+        for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
+
+            do {
+
+                /* Read usage data for the chosen stream ID */
+                hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+                hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
+
+                if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+                    /* Get the status code as text */
+                    NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+                    SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+                    exit(1);
+                }
+
+                if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
+                    ++stream_id;
+                    continue;
+                }
+            } while (hStat.u.usageData_v0.data.numHostBufferUsed == 0);
+
+            if (RegisteredStream(stream_id, num_registered, registered_streams)) {
+                ave_OB_fill_level[stream_id] = 0;
+                ave_HB_fill_level[stream_id] = 0;
+
+                for (uint32_t hb_count = 0; hb_count < hStat.u.usageData_v0.data.numHostBufferUsed; hb_count++) {
+
+                    OB_fill_level[hb_count] =
+                            ((100 * hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.used) /
+                            hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.size);
+
+                    if (OB_fill_level[hb_count] > 100) {
+                        OB_fill_level[hb_count] = 100;
+                    }
+
+                    uint32_t bufSize = hStat.u.usageData_v0.data.hb[hb_count].enQueuedAdapter / 1024
+                            + hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024
+                            + hStat.u.usageData_v0.data.hb[hb_count].enQueued / 1024
+                            - HB_HIGHWATER;
+
+                    HB_fill_level[hb_count] = (uint32_t)
+                            ((100 * hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024) /
+                            bufSize);
+
+                    ave_OB_fill_level[stream_id] += OB_fill_level[hb_count];
+                    ave_HB_fill_level[stream_id] += HB_fill_level[hb_count];
+                }
+
+                ave_OB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
+                ave_HB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
+
+                /* Host Buffer Fill Level warnings... */
+                if (ave_HB_fill_level[stream_id] >= (HB_alert_level[stream_id] + alertInterval)) {
+
+                    while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id] + alertInterval) {
+                        HB_alert_level[stream_id] += alertInterval;
+                    }
+                    SCLogInfo("nt%d - Increasing Host Buffer Fill Level : %4d%%",
+                            stream_id, ave_HB_fill_level[stream_id]);
+                }
+
+                if (HB_alert_level[stream_id] > 0) {
+                    if ((ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval))) {
+                        SCLogInfo("nt%d - Decreasing Host Buffer Fill Level: %4d%%",
+                                stream_id, ave_HB_fill_level[stream_id]);
+
+                        while (ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval)) {
+                            if ((HB_alert_level[stream_id]) > 0) {
+                                HB_alert_level[stream_id] -= alertInterval;
+                            } else break;
+                        }
+                    }
+                }
+
+                /* On Board SDRAM Fill Level warnings... */
+                if (ave_OB_fill_level[stream_id] >= (OB_alert_level[stream_id] + alertInterval)) {
+                    while (ave_OB_fill_level[stream_id] >= OB_alert_level[stream_id] + alertInterval) {
+                        OB_alert_level[stream_id] += alertInterval;
+
+                    }
+                    SCLogInfo("nt%d - Increasing Adapter SDRAM Fill Level: %4d%%",
+                            stream_id, ave_OB_fill_level[stream_id]);
+                }
+
+                if (OB_alert_level[stream_id] > 0) {
+                    if ((ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval))) {
+                        SCLogInfo("nt%d - Decreasing Adapter SDRAM Fill Level : %4d%%",
+                                stream_id, ave_OB_fill_level[stream_id]);
+
+                        while (ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval)) {
+                            if ((OB_alert_level[stream_id]) > 0) {
+                                OB_alert_level[stream_id] -= alertInterval;
+                            } else break;
+                        }
+                    }
+                }
+            }
+            ++stream_id;
+        }
+    }
+
+    if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoClose() failed: %s\n", error_buffer);
+        exit(1);
+    }
+
+    /* Close the statistics stream */
+    if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+        /* Get the status code as text */
+        NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+        SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatClose() failed: %s\n", error_buffer);
+        exit(1);
+    }
+
+    SCLogDebug("Exiting NapatechStatsLoop");
+    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
+    TmThreadWaitForFlag(tv, THV_DEINIT);
+    TmThreadsSetFlag(tv, THV_CLOSED);
+
+    return NULL;
+}
+
+void NapatechStartStats(void)
+{
+    /* Creates the Statistic threads */
+    ThreadVars *stats_tv = TmThreadCreate("NapatechStats",
+            NULL, NULL,
+            NULL, NULL,
+            "custom", NapatechStatsLoop, 0);
+
+    if (stats_tv == NULL) {
+        SCLogError(SC_ERR_THREAD_CREATE,
+                "Error creating a thread for NapatechStats - Killing engine.");
+        exit(EXIT_FAILURE);
+    }
+
+    if (TmThreadSpawn(stats_tv) != 0) {
+        SCLogError(SC_ERR_THREAD_SPAWN,
+                "Failed to spawn thread for NapatechStats - Killing engine.");
+        exit(EXIT_FAILURE);
+    }
+
+    ThreadVars *buf_monitor_tv = TmThreadCreate("NapatechBufMonitor",
+            NULL, NULL,
+            NULL, NULL,
+            "custom", NapatechBufMonitorLoop, 0);
+
+    if (buf_monitor_tv == NULL) {
+        SCLogError(SC_ERR_THREAD_CREATE,
+                "Error creating a thread for NapatechBufMonitor - Killing engine.");
+        exit(EXIT_FAILURE);
+    }
+
+    if (TmThreadSpawn(buf_monitor_tv) != 0) {
+        SCLogError(SC_ERR_THREAD_SPAWN,
+                "Failed to spawn thread for NapatechBufMonitor - Killing engine.");
+        exit(EXIT_FAILURE);
+    }
+
+
+    return;
+}
+
+#endif // HAVE_NAPATECH
diff --git a/src/util-napatech.h b/src/util-napatech.h
new file mode 100644 (file)
index 0000000..5e1d0a4
--- /dev/null
@@ -0,0 +1,60 @@
+/* Copyright (C) 2017 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Phil Young <py@napatech.com>
+ *
+ */
+
+#ifndef __UTIL_NAPATECH_H__
+#define __UTIL_NAPATECH_H__
+
+#ifdef HAVE_NAPATECH
+#include <nt.h>
+
+typedef struct NapatechPacketVars_
+{
+    uint64_t stream_id;
+    NtNetBuf_t nt_packet_buf;
+    ThreadVars *tv;
+} NapatechPacketVars;
+
+
+typedef struct NapatechStreamConfig_
+{
+    uint16_t stream_id;
+    bool is_active;
+    bool initialized;
+} NapatechStreamConfig;
+
+typedef struct NapatechCurrentStats_ {
+    uint64_t current_packets;
+    uint64_t current_drops;
+    uint64_t current_bytes;
+} NapatechCurrentStats;
+
+#define MAX_STREAMS 256
+
+extern void NapatechStartStats(void);
+uint16_t NapatechGetNumaNode(uint16_t stream_id);
+NapatechCurrentStats NapatechGetCurrentStats(uint16_t id);
+uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[]);
+
+#endif //HAVE_NAPATECH
+#endif /* __UTIL_NAPATECH_H__ */