util-mpm-ac-tile-small.c \
util-mpm-hs.c util-mpm-hs.h \
util-mpm.c util-mpm.h \
+util-napatech.c util-napatech.h \
util-optimize.h \
util-pages.c util-pages.h \
util-path.c util-path.h \
#include "util-cuda-vars.h"
#endif /* __SC_CUDA_SUPPORT__ */
+#ifdef HAVE_NAPATECH
+#include "util-napatech.h"
+#endif /* HAVE_NAPATECH */
+
+
typedef enum {
CHECKSUM_VALIDATION_DISABLE,
CHECKSUM_VALIDATION_ENABLE,
#ifdef __SC_CUDA_SUPPORT__
CudaPacketVars cuda_pkt_vars;
#endif
+#ifdef HAVE_NAPATECH
+ NapatechPacketVars ntpv;
+#endif
}
#ifdef HAVE_MPIPE
/* mPIPE requires packet buffers to be aligned to 128 byte boundaries. */
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* \author nPulse Technologies, LLC.
* \author Matt Keeler <mk@npulsetech.com>
*/
+
#include "suricata-common.h"
#include "tm-threads.h"
#include "conf.h"
#include "runmodes.h"
#include "output.h"
-
#include "util-debug.h"
#include "util-time.h"
#include "util-cpu.h"
#include "util-affinity.h"
#include "util-runmodes.h"
#include "util-device.h"
-
+#include "util-napatech.h"
#include "runmode-napatech.h"
-
-// need NapatechStreamDevConf structure
-#include "source-napatech.h"
+#include "source-napatech.h" // need NapatechStreamDevConf structure
#define NT_RUNMODE_AUTOFP 1
#define NT_RUNMODE_WORKERS 2
static const char *default_mode = NULL;
#ifdef HAVE_NAPATECH
-static int num_configured_streams = 0;
+
+#define MAX_STREAMS 256
+static uint16_t num_configured_streams = 0;
+
+uint16_t GetNumConfiguredStreams(void) {
+ return num_configured_streams;
+}
+
#endif
const char *RunModeNapatechGetDefaultMode(void)
#endif
}
+
#ifdef HAVE_NAPATECH
-int NapatechRegisterDeviceStreams()
+
+
+static int NapatechRegisterDeviceStreams(void)
{
- NtInfoStream_t info_stream;
- NtInfo_t info;
- char error_buf[100];
- int status;
- int i;
- char live_dev_buf[9];
- int use_all_streams;
- ConfNode *ntstreams;
- ConfNode *stream_id;
- if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0)
- {
+ /* Display the configuration mode */
+ int use_all_streams;
+ if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
exit(EXIT_FAILURE);
}
- if (use_all_streams)
- {
+ if (use_all_streams) {
SCLogInfo("Using All Napatech Streams");
- // When using the default streams we need to query the service for a list of all configured
- if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS)
- {
- NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
- SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buf);
- return -1;
- }
+ } else {
+ SCLogInfo("Using Selected Napatech Streams");
+ }
- info.cmd = NT_INFO_CMD_READ_STREAM;
- if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS)
- {
- NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
- SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buf);
- return -1;
- }
+ /* Get the stream ID's either from the conf or by querying Napatech */
+ NapatechStreamConfig stream_config[MAX_STREAMS];
+ uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
+ num_configured_streams = stream_cnt;
+ SCLogDebug("Configuring %d Napatech Streams...", stream_cnt);
- num_configured_streams = info.u.stream.data.count;
- for (i = 0; i < num_configured_streams; i++)
- {
- // The Stream IDs do not have to be sequential
- snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", info.u.stream.data.streamIDList[i]);
- LiveRegisterDevice(live_dev_buf);
- }
-
- if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS)
- {
- NT_ExplainError(status, error_buf, sizeof(error_buf) -1);
- SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", error_buf);
- return -1;
- }
- }
- else
- {
- SCLogInfo("Using Selected Napatech Streams");
- // When not using the default streams we need to parse the array of streams from the conf
- if ((ntstreams = ConfGetNode("napatech.streams")) == NULL)
- {
- SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf");
+ for (uint16_t inst = 0; inst < stream_cnt; ++inst) {
+ char *plive_dev_buf = SCCalloc(1, 9);
+ if (unlikely(plive_dev_buf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
-
- // Loop through all stream numbers in the array and register the devices
- TAILQ_FOREACH(stream_id, &ntstreams->head, next)
- {
- if (stream_id == NULL)
- {
- SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration");
- exit(EXIT_FAILURE);
- }
- num_configured_streams++;
-
- snprintf(live_dev_buf, sizeof(live_dev_buf), "nt%d", atoi(stream_id->val));
- LiveRegisterDevice(live_dev_buf);
- }
+ snprintf(plive_dev_buf, 9, "nt%d", stream_config[inst].stream_id);
+ SCLogInfo("registering Napatech device: %s - active stream%sfound.",
+ plive_dev_buf, stream_config[inst].is_active ? " " : " NOT ");
+ LiveRegisterDevice(plive_dev_buf);
}
+
+ /* Napatech stats come from a separate thread. This will surpress
+ * the counters when suricata exits.
+ */
+ LiveDeviceHasNoStats();
return 0;
}
-void *NapatechConfigParser(const char *device)
+static void *NapatechConfigParser(const char *device)
{
- // Expect device to be of the form nt%d where %d is the stream id to use
+ /* Expect device to be of the form nt%d where %d is the stream id to use */
int dev_len = strlen(device);
- struct NapatechStreamDevConf *conf = SCMalloc(sizeof(struct NapatechStreamDevConf));
- if (unlikely(conf == NULL))
+ struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
+ if (unlikely(conf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device name.");
return NULL;
- if (dev_len < 3 || dev_len > 5)
- {
+ }
+ if (dev_len < 3 || dev_len > 5) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG, "Could not parse config for device: %s - invalid length", device);
return NULL;
}
- // device+5 is a pointer to the beginning of the stream id after the constant nt portion
- conf->stream_id = atoi(device+2);
+ /* device+5 is a pointer to the beginning of the stream id after the constant nt portion */
+ conf->stream_id = atoi(device + 2);
- // Set the host buffer allowance for this stream
- // Right now we just look at the global default - there is no per-stream hba configuration
- if (ConfGetInt("napatech.hba", &conf->hba) == 0)
+ /* Set the host buffer allowance for this stream
+ * Right now we just look at the global default - there is no per-stream hba configuration
+ */
+ if (ConfGetInt("napatech.hba", &conf->hba) == 0) {
conf->hba = -1;
-
+ }
return (void *) conf;
}
-int NapatechGetThreadsCount(void *conf __attribute__((unused))) {
- // No matter which live device it is there is no reason to ever use more than 1 thread
- // 2 or more thread would cause packet duplication
+static int NapatechGetThreadsCount(void *conf __attribute__((unused)))
+{
+ /* No matter which live device it is there is no reason to ever use more than 1 thread
+ 2 or more thread would cause packet duplication */
return 1;
}
static int NapatechInit(int runmode)
{
int ret;
- char errbuf[100];
+ char error_buf[100];
RunModeInitialize();
TimeModeSetLive();
/* Initialize the API and check version compatibility */
if ((ret = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) {
- NT_ExplainError(ret, errbuf, sizeof(errbuf));
- SCLogError(SC_ERR_NAPATECH_INIT_FAILED ,"NT_Init failed. Code 0x%X = %s", ret, errbuf);
+ NT_ExplainError(ret, error_buf, sizeof (error_buf));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_Init failed. Code 0x%X = %s", ret, error_buf);
exit(EXIT_FAILURE);
}
exit(EXIT_FAILURE);
}
- switch(runmode) {
+ struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
+ if (unlikely(conf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device.");
+ exit(EXIT_FAILURE);
+ }
+
+ if ( (ConfGetInt("napatech.hba", &conf->hba) != 0) && (conf->hba > 0)){
+ SCLogInfo("Host Buffer Allowance: %d", (int)conf->hba);
+ }
+
+ /* Start a thread to process the statistics */
+ NapatechStartStats();
+
+ switch (runmode) {
case NT_RUNMODE_AUTOFP:
ret = RunModeSetLiveCaptureAutoFp(NapatechConfigParser, NapatechGetThreadsCount,
- "NapatechStream", "NapatechDecode",
- thread_name_autofp, NULL);
+ "NapatechStream", "NapatechDecode",
+ thread_name_autofp, NULL);
break;
case NT_RUNMODE_WORKERS:
ret = RunModeSetLiveCaptureWorkers(NapatechConfigParser, NapatechGetThreadsCount,
- "NapatechStream", "NapatechDecode",
- thread_name_workers, NULL);
+ "NapatechStream", "NapatechDecode",
+ thread_name_workers, NULL);
break;
default:
ret = -1;
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* \author Matt Keeler <mk@npulsetech.com>
*/
+
#ifndef __RUNMODE_NAPATECH_H__
#define __RUNMODE_NAPATECH_H__
#ifdef HAVE_NAPATECH
+#include "util-napatech.h"
#include <nt.h>
+
+
#endif
int RunModeNapatechAutoFp(void);
void RunModeNapatechRegister(void);
const char *RunModeNapatechGetDefaultMode(void);
+uint16_t GetNumConfiguredStreams(void);
+
+
+
#endif /* __RUNMODE_NAPATECH_H__ */
-/* Copyright (C) 2012-2014 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
/**
* \file
*
- * \author nPulse Technologies, LLC.
- * \author Matt Keeler <mk@npulsetech.com>
- *
+- * \author nPulse Technologies, LLC.
+- * \author Matt Keeler <mk@npulsetech.com>
+ * *
* Support for NAPATECH adapter with the 3GD Driver/API.
* Requires libntapi from Napatech A/S.
*
*/
-
#include "suricata-common.h"
#include "suricata.h"
#include "threadvars.h"
#include "tm-queuehandlers.h"
#include "tm-threads.h"
#include "tm-modules.h"
-
#include "util-privs.h"
#include "tmqh-packetpool.h"
+#include "util-napatech.h"
#include "source-napatech.h"
#ifndef HAVE_NAPATECH
TmEcode NoNapatechSupportExit(ThreadVars *, const void *, void **);
-
-void TmModuleNapatechStreamRegister (void)
-{
+void TmModuleNapatechStreamRegister(void) {
tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream";
tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NoNapatechSupportExit;
tmm_modules[TMM_RECEIVENAPATECH].Func = NULL;
tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_ADMIN;
}
-void TmModuleNapatechDecodeRegister (void)
-{
+void TmModuleNapatechDecodeRegister(void) {
tmm_modules[TMM_DECODENAPATECH].name = "NapatechDecode";
tmm_modules[TMM_DECODENAPATECH].ThreadInit = NoNapatechSupportExit;
tmm_modules[TMM_DECODENAPATECH].Func = NULL;
tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
}
-TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data)
-{
+TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) {
SCLogError(SC_ERR_NAPATECH_NOSUPPORT,
"Error creating thread %s: you do not have support for Napatech adapter "
"enabled please recompile with --enable-napatech", tv->name);
#else /* Implied we do have NAPATECH support */
+
#include <nt.h>
+#define MAX_STREAMS 256
+
extern int max_pending_packets;
typedef struct NapatechThreadVars_ {
ThreadVars *tv;
NtNetStreamRx_t rx_stream;
- uint64_t stream_id;
+ uint16_t stream_id;
int hba;
- uint64_t pkts;
- uint64_t drops;
- uint64_t bytes;
-
TmSlot *slot;
} NapatechThreadVars;
TmEcode NapatechStreamThreadInit(ThreadVars *, const void *, void **);
void NapatechStreamThreadExitStats(ThreadVars *, void *);
-TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot);
+TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot);
TmEcode NapatechDecodeThreadInit(ThreadVars *, const void *, void **);
TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data);
TmEcode NapatechDecode(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+/* These are used as the threads are exiting to get a comprehensive count of
+ * all the packets received and dropped.
+ */
+SC_ATOMIC_DECLARE(uint64_t, total_packets);
+SC_ATOMIC_DECLARE(uint64_t, total_drops);
+SC_ATOMIC_DECLARE(uint16_t, total_tallied);
+
/**
* \brief Register the Napatech receiver (reader) module.
*/
tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream";
tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NapatechStreamThreadInit;
tmm_modules[TMM_RECEIVENAPATECH].Func = NULL;
- tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechStreamLoop;
+ tmm_modules[TMM_RECEIVENAPATECH].PktAcqLoop = NapatechPacketLoopZC;
tmm_modules[TMM_RECEIVENAPATECH].PktAcqBreakLoop = NULL;
tmm_modules[TMM_RECEIVENAPATECH].ThreadExitPrintStats = NapatechStreamThreadExitStats;
tmm_modules[TMM_RECEIVENAPATECH].ThreadDeinit = NapatechStreamThreadDeinit;
tmm_modules[TMM_RECEIVENAPATECH].RegisterTests = NULL;
tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_RAW;
tmm_modules[TMM_RECEIVENAPATECH].flags = TM_FLAG_RECEIVE_TM;
+
+ SC_ATOMIC_INIT(total_packets);
+ SC_ATOMIC_INIT(total_drops);
+ SC_ATOMIC_INIT(total_tallied);
}
/**
tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
}
+/*
+ *-----------------------------------------------------------------------------
+ *-----------------------------------------------------------------------------
+ * Statistics code
+ *-----------------------------------------------------------------------------
+*/
+
/**
* \brief Initialize the Napatech receiver thread, generate a single
* NapatechThreadVar structure for each thread, this will
TmEcode NapatechStreamThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
SCEnter();
- struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *)initdata;
- uintmax_t stream_id = conf->stream_id;
+ struct NapatechStreamDevConf *conf = (struct NapatechStreamDevConf *) initdata;
+ uint16_t stream_id = conf->stream_id;
*data = NULL;
- SCLogInfo("Napatech Thread Stream ID:%lu", stream_id);
-
- NapatechThreadVars *ntv = SCMalloc(sizeof(NapatechThreadVars));
+ NapatechThreadVars *ntv = SCCalloc(1, sizeof (NapatechThreadVars));
if (unlikely(ntv == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH thread vars.");
exit(EXIT_FAILURE);
ntv->stream_id = stream_id;
ntv->tv = tv;
ntv->hba = conf->hba;
+ SCLogDebug("Started processing packets from NAPATECH Stream: %lu", ntv->stream_id);
- SCLogInfo("Started processing packets from NAPATECH Stream: %lu", ntv->stream_id);
-
- *data = (void *)ntv;
-
+ *data = (void *) ntv;
SCReturnInt(TM_ECODE_OK);
}
-/**
- * \brief Main Napatech reading Loop function
- */
-TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
+static PacketQueue packets_to_release[MAX_STREAMS];
+
+static void NapatechReleasePacket(struct Packet_ *p)
{
- SCEnter();
+ PacketFreeOrRelease(p);
+ PacketEnqueue(&packets_to_release[p->ntpv.stream_id], p);
+}
+TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
+{
int32_t status;
- char errbuf[100];
+ char error_buffer[100];
uint64_t pkt_ts;
NtNetBuf_t packet_buffer;
- NapatechThreadVars *ntv = (NapatechThreadVars *)data;
- NtNetRx_t stat_cmd;
-
- SCLogInfo("Opening NAPATECH Stream: %lu for processing", ntv->stream_id);
+ NapatechThreadVars *ntv = (NapatechThreadVars *) data;
+ uint64_t hba_pkt_drops = 0;
+ uint64_t hba_byte_drops = 0;
+ uint16_t hba_pkt = 0;
+
+ /* This just keeps the startup output more orderly. */
+ usleep(200000 * ntv->stream_id);
+
+ if (ntv->hba > 0) {
+ char *s_hbad_pkt = SCCalloc(1, 32);
+ if (unlikely(s_hbad_pkt == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+ exit(EXIT_FAILURE);
+ }
+ snprintf(s_hbad_pkt, 32, "nt%d.hba_drop", ntv->stream_id);
+ hba_pkt = StatsRegisterCounter(s_hbad_pkt, tv);
+ StatsSetupPrivate(tv);
+ StatsSetUI64(tv, hba_pkt, 0);
+ }
+ SCLogDebug("Opening NAPATECH Stream: %lu for processing", ntv->stream_id);
if ((status = NT_NetRxOpen(&(ntv->rx_stream), "SuricataStream", NT_NET_INTERFACE_PACKET, ntv->stream_id, ntv->hba)) != NT_SUCCESS) {
- NT_ExplainError(status, errbuf, sizeof(errbuf));
- SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %lu - %s", ntv->stream_id, errbuf);
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %u - %s", ntv->stream_id, error_buffer);
SCFree(ntv);
SCReturnInt(TM_ECODE_FAILED);
}
- stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP;
+#if defined(__linux__)
+ SCLogInfo("Napatech Packet Loop Started - cpu: %3d, stream: %3u (numa: %u)",
+ sched_getcpu(), ntv->stream_id, NapatechGetNumaNode(ntv->stream_id));
+#else
+ SCLogInfo("Napatech Packet Loop Started - stream: %lu ", ntv->stream_id);
+#endif
- SCLogInfo("Napatech Packet Stream Loop Started for Stream ID: %lu", ntv->stream_id);
-
- TmSlot *s = (TmSlot *)slot;
+ TmSlot *s = (TmSlot *) slot;
ntv->slot = s->slot_next;
while (!(suricata_ctl_flags & SURICATA_STOP)) {
* us from alloc'ing packets at line rate */
PacketPoolWait();
- /*
- * Napatech returns packets 1 at a time
- */
+ /* Napatech returns packets 1 at a time */
status = NT_NetRxGet(ntv->rx_stream, &packet_buffer, 1000);
if (unlikely(status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN)) {
- /*
- * no frames currently available
- */
continue;
} else if (unlikely(status != NT_SUCCESS)) {
- SCLogError(SC_ERR_NAPATECH_STREAM_NEXT_FAILED,
- "Failed to read from Napatech Stream: %lu",
- ntv->stream_id);
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+
+ SCLogInfo("Failed to read from Napatech Stream%d: %s",
+ ntv->stream_id, error_buffer);
SCReturnInt(TM_ECODE_FAILED);
}
* Handle the different timestamp forms that the napatech cards could use
* - NT_TIMESTAMP_TYPE_NATIVE is not supported due to having an base of 0 as opposed to NATIVE_UNIX which has a base of 1/1/1970
*/
- switch(NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) {
+ switch (NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) {
case NT_TIMESTAMP_TYPE_NATIVE_UNIX:
p->ts.tv_sec = pkt_ts / 100000000;
p->ts.tv_usec = ((pkt_ts % 100000000) / 100) + (pkt_ts % 100) > 50 ? 1 : 0;
break;
default:
SCLogError(SC_ERR_NAPATECH_TIMESTAMP_TYPE_NOT_SUPPORTED,
- "Packet from Napatech Stream: %lu does not have a supported timestamp format",
- ntv->stream_id);
+ "Packet from Napatech Stream: %u does not have a supported timestamp format",
+ ntv->stream_id);
NT_NetRxRelease(ntv->rx_stream, packet_buffer);
SCReturnInt(TM_ECODE_FAILED);
}
- SCLogDebug("p->ts.tv_sec %"PRIuMAX"", (uintmax_t)p->ts.tv_sec);
- p->datalink = LINKTYPE_ETHERNET;
-
- ntv->pkts++;
- ntv->bytes += NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer);
-
- // Update drop counter
- if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS))
- {
- NT_ExplainError(status, errbuf, sizeof(errbuf));
- SCLogWarning(SC_ERR_NAPATECH_STAT_DROPS_FAILED, "Couldn't retrieve drop statistics from the RX stream: %lu - %s", ntv->stream_id, errbuf);
- }
- else
- {
- ntv->drops += stat_cmd.u.streamDrop.pktsDropped;
+ if (unlikely(ntv->hba > 0)) {
+ NtNetRx_t stat_cmd;
+ stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP;
+ // Update drop counter
+ if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS)) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogInfo("Couldn't retrieve drop statistics from the RX stream: %u - %s",
+ ntv->stream_id, error_buffer);
+ } else {
+ hba_pkt_drops = stat_cmd.u.streamDrop.pktsDropped;
+
+ StatsSetUI64(tv, hba_pkt, hba_pkt_drops);
+ }
+ StatsSyncCountersIfSignalled(tv);
}
- if (unlikely(PacketCopyData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
+ p->ReleasePacket = NapatechReleasePacket;
+ p->ntpv.nt_packet_buf = packet_buffer;
+ p->ntpv.stream_id = ntv->stream_id;
+ p->datalink = LINKTYPE_ETHERNET;
+
+ if (unlikely(PacketSetData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
TmqhOutputPacketpool(ntv->tv, p);
NT_NetRxRelease(ntv->rx_stream, packet_buffer);
SCReturnInt(TM_ECODE_FAILED);
SCReturnInt(TM_ECODE_FAILED);
}
- NT_NetRxRelease(ntv->rx_stream, packet_buffer);
+ /* Release any packets that were returned by the callback function */
+ Packet *rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]);
+ while (rel_pkt != NULL) {
+ NT_NetRxRelease(ntv->rx_stream, rel_pkt->ntpv.nt_packet_buf);
+ rel_pkt = PacketDequeue(&packets_to_release[ntv->stream_id]);
+ }
StatsSyncCountersIfSignalled(tv);
}
+ if (unlikely(ntv->hba > 0)) {
+ SCLogInfo("Host Buffer Allowance Drops - pkts: %ld, bytes: %ld", hba_pkt_drops, hba_byte_drops);
+ }
+
SCReturnInt(TM_ECODE_OK);
}
+
/**
* \brief Print some stats to the log at program exit.
*
*/
void NapatechStreamThreadExitStats(ThreadVars *tv, void *data)
{
- NapatechThreadVars *ntv = (NapatechThreadVars *)data;
- double percent = 0;
- if (ntv->drops > 0)
- percent = (((double) ntv->drops) / (ntv->pkts+ntv->drops)) * 100;
+ NapatechThreadVars *ntv = (NapatechThreadVars *) data;
+ NapatechCurrentStats stat = NapatechGetCurrentStats(ntv->stream_id);
- SCLogNotice("Stream: %lu; Packets: %"PRIu64"; Drops: %"PRIu64" (%5.2f%%); Bytes: %"PRIu64, ntv->stream_id, ntv->pkts, ntv->drops, percent, ntv->bytes);
+ double percent = 0;
+ if (stat.current_drops > 0)
+ percent = (((double) stat.current_drops)
+ / (stat.current_packets + stat.current_drops)) * 100;
+
+ SCLogInfo("nt%lu - pkts: %lu; drop: %lu (%5.2f%%); bytes: %lu",
+ (uint64_t) ntv->stream_id, stat.current_packets,
+ stat.current_drops, percent, stat.current_bytes);
+
+ SC_ATOMIC_ADD(total_packets, stat.current_packets);
+ SC_ATOMIC_ADD(total_drops, stat.current_drops);
+ SC_ATOMIC_ADD(total_tallied, 1);
+
+ if (SC_ATOMIC_GET(total_tallied) == GetNumConfiguredStreams()) {
+ if (SC_ATOMIC_GET(total_drops) > 0)
+ percent = (((double) SC_ATOMIC_GET(total_drops)) / (SC_ATOMIC_GET(total_packets)
+ + SC_ATOMIC_GET(total_drops))) * 100;
+
+ SCLogInfo(" ");
+ SCLogInfo("--- Total Packets: %ld Total Dropped: %ld (%5.2f%%)",
+ SC_ATOMIC_GET(total_packets), SC_ATOMIC_GET(total_drops), percent);
+ }
}
/**
TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data)
{
SCEnter();
- NapatechThreadVars *ntv = (NapatechThreadVars *)data;
+ NapatechThreadVars *ntv = (NapatechThreadVars *) data;
SCLogDebug("Closing Napatech Stream: %d", ntv->stream_id);
NT_NetRxClose(ntv->rx_stream);
SCReturnInt(TM_ECODE_OK);
}
-
/** Decode Napatech */
/**
{
SCEnter();
- DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+ DecodeThreadVars *dtv = (DecodeThreadVars *) data;
/* XXX HACK: flow timeout can call us for injected pseudo packets
* see bug: https://redmine.openinfosecfoundation.org/issues/1107 */
if (p->flags & PKT_PSEUDO_STREAM_END)
return TM_ECODE_OK;
- /* update counters */
+ // update counters
DecodeUpdatePacketCounters(tv, dtv, p);
switch (p->datalink) {
}
PacketDecodeFinalize(tv, dtv, p);
-
SCReturnInt(TM_ECODE_OK);
}
{
SCEnter();
DecodeThreadVars *dtv = NULL;
-
dtv = DecodeThreadVarsAlloc(tv);
-
- if(dtv == NULL)
+ if (dtv == NULL)
SCReturnInt(TM_ECODE_FAILED);
DecodeRegisterPerfCounters(dtv, tv);
-
- *data = (void *)dtv;
-
+ *data = (void *) dtv;
SCReturnInt(TM_ECODE_OK);
}
{
if (data != NULL)
DecodeThreadVarsFree(tv, data);
- SCReturnInt(TM_ECODE_OK);
-}
+ SCReturnInt(TM_ECODE_OK); }
#endif /* HAVE_NAPATECH */
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2017 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
/**
* \file
*
- * \author nPulse Technologies, LLC
+ * \author nPulse Technologies, LLC.
* \author Matt Keeler <mk@npulsetech.com>
*/
-
#ifndef __SOURCE_NAPATECH_H__
#define __SOURCE_NAPATECH_H__
TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data);
void TmModuleNapatechDecodeRegister (void);
+#ifdef HAVE_NAPATECH
+#include <nt.h>
+
struct NapatechStreamDevConf
{
- int stream_id;
+ uint16_t stream_id;
intmax_t hba;
};
-#ifdef HAVE_NAPATECH
-
-#include <nt.h>
-
-#endif
-
+#endif /* HAVE_NAPATECH */
#endif /* __SOURCE_NAPATECH_H__ */
--- /dev/null
+/* Copyright (C) 2017 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Napatech Inc.
+ * \author Phil Young <py@napatech.com>
+ *
+ *
+ */
+
+
+#include "suricata-common.h"
+#ifdef HAVE_NAPATECH
+#include "suricata.h"
+#include "threadvars.h"
+#include "tm-threads.h"
+
+
+uint16_t NapatechGetNumaNode(uint16_t stream_id)
+{
+ int status;
+ char buffer[80]; // Error buffer
+ NtInfoStream_t info_stream;
+ NtInfo_t info;
+ uint16_t numa_node;
+
+ if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
+ NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ // Read the info on this specific stream
+ info.cmd = NT_INFO_CMD_READ_STREAMID;
+ info.u.streamID.streamId = stream_id;
+ if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
+ NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ numa_node = info.u.streamID.data.numaNode;
+
+ if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
+ NT_ExplainError(status, buffer, sizeof (buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", buffer);
+ exit(EXIT_FAILURE);
+ }
+ return numa_node;
+}
+
+
+/*-----------------------------------------------------------------------------
+ *-----------------------------------------------------------------------------
+ * Statistics code
+ *-----------------------------------------------------------------------------
+*/
+
+typedef struct StreamCounters_ {
+ uint16_t pkts;
+ uint16_t byte;
+ uint16_t drop;
+} StreamCounters;
+
+
+NapatechCurrentStats current_stats[MAX_STREAMS];
+
+NapatechCurrentStats NapatechGetCurrentStats(uint16_t id)
+{
+
+ return current_stats[id];
+}
+
+
+static uint16_t TestStreamConfig(
+ NtInfoStream_t hInfo,
+ NtStatStream_t hStatStream,
+ NapatechStreamConfig stream_config[],
+ uint16_t num_inst)
+{
+
+ uint16_t num_active = 0;
+
+ for (uint16_t inst = 0; inst < num_inst; ++inst) {
+ int status;
+ char buffer[80]; // Error buffer
+ NtStatistics_t stat; // Stat handle.
+
+ /* Check to see if it is an active stream */
+ memset(&stat, 0, sizeof (NtStatistics_t));
+
+ /* Read usage data for the chosen stream ID */
+ stat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+ stat.u.usageData_v0.streamid = (uint8_t) stream_config[inst].stream_id;
+
+ if ((status = NT_StatRead(hStatStream, &stat)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, buffer, sizeof (buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead():2 failed: %s\n", buffer);
+ return 0;
+ }
+
+ if (stat.u.usageData_v0.data.numHostBufferUsed > 0) {
+ stream_config[inst].is_active = true;
+ num_active++;
+ } else {
+ stream_config[inst].is_active = false;
+ }
+ }
+
+ return num_active;
+}
+
+static uint32_t UpdateStreamStats(ThreadVars *tv,
+ NtInfoStream_t hInfo,
+ NtStatStream_t hStatStream,
+ uint16_t num_streams,
+ NapatechStreamConfig stream_config[],
+ StreamCounters streamCounters[]
+ )
+{
+ static uint64_t rxPktsStart[MAX_STREAMS] = {0};
+ static uint64_t rxByteStart[MAX_STREAMS] = {0};
+ static uint64_t dropStart[MAX_STREAMS] = {0};
+
+ int status;
+ char error_buffer[80]; // Error buffer
+ NtInfo_t hStreamInfo;
+ NtStatistics_t hStat; // Stat handle.
+
+ /* Query the system to get the number of streams currently instantiated */
+ hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+ if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ uint16_t num_active;
+ if ((num_active = TestStreamConfig(hInfo, hStatStream, stream_config, num_streams)) == 0) {
+ /* None of the configured streams are active */
+ return 0;
+ }
+
+ /* At least one stream is active so proceed with the stats. */
+ uint16_t inst_id = 0;
+ uint32_t stream_cnt = 0;
+ for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
+
+
+ while(inst_id < num_streams) {
+ if (stream_config[inst_id].is_active) {
+ break;
+ } else {
+ ++inst_id;
+ }
+ }
+ if (inst_id == num_streams)
+ break;
+
+ /* Read usage data for the chosen stream ID */
+ memset(&hStat, 0, sizeof (NtStatistics_t));
+ hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+ hStat.u.usageData_v0.streamid = (uint8_t) stream_config[inst_id].stream_id;
+
+ if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+ return 0;
+ }
+
+ uint16_t stream_id = stream_config[inst_id].stream_id;
+ if (stream_config[inst_id].is_active) {
+ uint64_t rxPktsTotal = 0;
+ uint64_t rxByteTotal = 0;
+ uint64_t dropTotal = 0;
+
+ for (uint32_t hbCount = 0; hbCount < hStat.u.usageData_v0.data.numHostBufferUsed; hbCount++) {
+ if (unlikely(stream_config[inst_id].initialized == false)) {
+ rxPktsStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
+ rxByteStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
+ dropStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
+ stream_config[inst_id].initialized = true;
+ } else {
+ rxPktsTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
+ rxByteTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
+ dropTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
+ }
+ }
+
+ current_stats[stream_id].current_packets = rxPktsTotal - rxPktsStart[stream_id];
+ current_stats[stream_id].current_bytes = rxByteTotal - rxByteStart[stream_id];
+ current_stats[stream_id].current_drops = dropTotal - dropStart[stream_id];
+ }
+
+ StatsSetUI64(tv, streamCounters[inst_id].pkts, current_stats[stream_id].current_packets);
+ StatsSetUI64(tv, streamCounters[inst_id].byte, current_stats[stream_id].current_bytes);
+ StatsSetUI64(tv, streamCounters[inst_id].drop, current_stats[stream_id].current_drops);
+
+ ++inst_id;
+ }
+ return num_active;
+}
+
+static void *NapatechStatsLoop(void *arg)
+{
+ ThreadVars *tv = (ThreadVars *) arg;
+
+ int status;
+ char error_buffer[80]; // Error buffer
+ NtInfoStream_t hInfo;
+ NtStatStream_t hStatStream;
+
+ NapatechStreamConfig stream_config[MAX_STREAMS];
+ uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
+
+ /* Open the info and Statistics */
+ if ((status = NT_InfoOpen(&hInfo, "StatsLoopInfoStream")) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_RUNMODE, "NT_InfoOpen() failed: %s\n", error_buffer);
+ return NULL;
+ }
+
+ if ((status = NT_StatOpen(&hStatStream, "StatsLoopStatsStream")) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
+ return NULL;
+ }
+
+ StreamCounters streamCounters[MAX_STREAMS];
+ for (int i = 0; i < stream_cnt; ++i) {
+ char *pkts_buf = SCCalloc(1, 32);
+ if (unlikely(pkts_buf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+ exit(EXIT_FAILURE);
+ }
+
+ snprintf(pkts_buf, 32, "nt%d.pkts", stream_config[i].stream_id);
+ streamCounters[i].pkts = StatsRegisterCounter(pkts_buf, tv);
+
+ char *byte_buf = SCCalloc(1, 32);
+ if (unlikely(byte_buf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+ exit(EXIT_FAILURE);
+ }
+ snprintf(byte_buf, 32, "nt%d.bytes", stream_config[i].stream_id);
+ streamCounters[i].byte = StatsRegisterCounter(byte_buf, tv);
+
+ char *drop_buf = SCCalloc(1, 32);
+ if (unlikely(drop_buf == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
+ exit(EXIT_FAILURE);
+ }
+ snprintf(drop_buf, 32, "nt%d.drop", stream_config[i].stream_id);
+ streamCounters[i].drop = StatsRegisterCounter(drop_buf, tv);
+ }
+
+ StatsSetupPrivate(tv);
+
+ for (int i = 0; i < stream_cnt; ++i) {
+ StatsSetUI64(tv, streamCounters[i].pkts, 0);
+ StatsSetUI64(tv, streamCounters[i].byte, 0);
+ StatsSetUI64(tv, streamCounters[i].drop, 0);
+ }
+
+ uint32_t num_active = UpdateStreamStats(tv, hInfo, hStatStream,
+ stream_cnt, stream_config, streamCounters);
+
+ if (num_active < stream_cnt) {
+ SCLogInfo("num_active: %d, stream_cnt: %d", num_active, stream_cnt);
+ SCLogWarning(SC_ERR_NAPATECH_CONFIG_STREAM,
+ "Some or all of the configured streams are not created. Proceeding with active streams.");
+ }
+
+ TmThreadsSetFlag(tv, THV_INIT_DONE);
+ while (1) {
+ if (TmThreadsCheckFlag(tv, THV_KILL)) {
+ SCLogDebug("NapatechStatsLoop THV_KILL detected");
+ break;
+ }
+
+ UpdateStreamStats(tv, hInfo, hStatStream,
+ stream_cnt, stream_config, streamCounters);
+
+ StatsSyncCountersIfSignalled(tv);
+ usleep(1000000);
+ }
+
+ /* CLEAN UP NT Resources and Close the info stream */
+ if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_RUNMODE, "NT_InfoClose() failed: %s\n", error_buffer);
+ return NULL;
+ }
+
+ /* Close the statistics stream */
+ if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
+ return NULL;
+ }
+
+
+ SCLogDebug("Exiting NapatechStatsLoop");
+ TmThreadsSetFlag(tv, THV_RUNNING_DONE);
+ TmThreadWaitForFlag(tv, THV_DEINIT);
+ TmThreadsSetFlag(tv, THV_CLOSED);
+
+ return NULL;
+}
+
+#define MAX_HOSTBUFFER 4
+#define MAX_STREAMS 256
+#define HB_HIGHWATER 2048 //1982
+
+static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered,
+ NapatechStreamConfig registered_streams[])
+{
+ for (uint16_t reg_id = 0; reg_id < num_registered; ++reg_id) {
+ if (stream_id == registered_streams[reg_id].stream_id) {
+ return true;
+ }
+ }
+ return false;
+}
+
+uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
+{
+ int status;
+ char error_buffer[80]; // Error buffer
+ NtStatStream_t hStatStream;
+ NtStatistics_t hStat; // Stat handle.
+ NtInfoStream_t info_stream;
+ NtInfo_t info;
+ uint16_t instance_cnt = 0;
+ int use_all_streams;
+ ConfNode *ntstreams;
+
+ for (uint16_t i = 0; i < MAX_STREAMS; ++i) {
+ stream_config[i].stream_id = 0;
+ stream_config[i].is_active = false;
+ stream_config[i].initialized = false;
+ }
+
+ if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
+ SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
+ exit(EXIT_FAILURE);
+ }
+
+ if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ if (use_all_streams) {
+ info.cmd = NT_INFO_CMD_READ_STREAM;
+ if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ uint16_t stream_id = 0;
+ while (instance_cnt < info.u.stream.data.count) {
+
+ /*
+ * For each stream ID query the number of host-buffers used by
+ * the stream. If zero, then that streamID is not used; skip
+ * over it and continue until we get a streamID with a non-zero
+ * count of the host-buffers.
+ */
+ memset(&hStat, 0, sizeof (NtStatistics_t));
+
+ /* Read usage data for the chosen stream ID */
+ hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+ hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
+
+ if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+ return 0;
+ }
+
+ if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
+ ++stream_id;
+ continue;
+ }
+
+ /* if we get here it is an active stream */
+ stream_config[instance_cnt].stream_id = stream_id++;
+ stream_config[instance_cnt].is_active = true;
+ instance_cnt++;
+ }
+
+ } else {
+ /* When not using the default streams we need to parse the array of streams from the conf */
+ if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) {
+ SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf");
+ exit(EXIT_FAILURE);
+ }
+
+ /* Loop through all stream numbers in the array and register the devices */
+ ConfNode *stream;
+ instance_cnt = 0;
+
+ TAILQ_FOREACH(stream, &ntstreams->head, next) {
+ uint16_t stream_id = 0;
+
+ if (stream == NULL) {
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration");
+ exit(EXIT_FAILURE);
+ }
+
+ uint16_t start, end;
+ if (strchr(stream->val, '-')) {
+ char copystr[16];
+ strlcpy(copystr, stream->val, 16);
+
+ start = atoi(copystr);
+ end = atoi(strchr(copystr, '-')+1);
+ } else {
+ stream_config[instance_cnt].stream_id = atoi(stream->val);
+ start = stream_config[instance_cnt].stream_id;
+ end = stream_config[instance_cnt].stream_id;
+ }
+ SCLogInfo("%s start: %d end: %d", stream->val, start, end);
+
+ for (stream_id = start; stream_id <= end; ++stream_id) {
+ /* if we get here it is configured in the .yaml file */
+ stream_config[instance_cnt].stream_id = stream_id;
+
+ /* Check to see if it is an active stream */
+ memset(&hStat, 0, sizeof (NtStatistics_t));
+
+ /* Read usage data for the chosen stream ID */
+ hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+ hStat.u.usageData_v0.streamid = (uint8_t) stream_config[instance_cnt].stream_id;
+
+ if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+ return 0;
+ }
+
+ if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) {
+ stream_config[instance_cnt].is_active = true;
+ }
+ instance_cnt++;
+ }
+ }
+ }
+
+ /* Close the statistics stream */
+ if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
+ "NT_InfoClose failed: %s", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ return instance_cnt;
+}
+
+
+static void *NapatechBufMonitorLoop(void *arg)
+{
+ ThreadVars *tv = (ThreadVars *) arg;
+
+ NtInfo_t hStreamInfo;
+ NtStatistics_t hStat; // Stat handle.
+ NtInfoStream_t hInfo;
+ NtStatStream_t hStatStream;
+
+ char error_buffer[NT_ERRBUF_SIZE]; // Error buffer
+ int status; // Status variable
+
+ const uint32_t alertInterval = 25;
+
+ uint32_t OB_fill_level[MAX_STREAMS] = {0};
+ uint32_t OB_alert_level[MAX_STREAMS] = {0};
+ uint32_t ave_OB_fill_level[MAX_STREAMS] = {0};
+
+ uint32_t HB_fill_level[MAX_STREAMS] = {0};
+ uint32_t HB_alert_level[MAX_STREAMS] = {0};
+ uint32_t ave_HB_fill_level[MAX_STREAMS] = {0};
+
+ /* Open the info and Statistics */
+ if ((status = NT_InfoOpen(&hInfo, "InfoStream")) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoOpen() failed: %s\n", error_buffer);
+ exit(1);
+ }
+
+ if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatOpen() failed: %s\n", error_buffer);
+ exit(1);
+ }
+
+ /* Read the info on all streams instantiated in the system */
+ hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+ if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ NapatechStreamConfig registered_streams[MAX_STREAMS];
+ uint16_t num_registered = NapatechGetStreamConfig(registered_streams);
+
+ TmThreadsSetFlag(tv, THV_INIT_DONE);
+ while (1) {
+ if (TmThreadsCheckFlag(tv, THV_KILL)) {
+ SCLogDebug("NapatechBufMonitorLoop THV_KILL detected");
+ break;
+ }
+
+ usleep(200000);
+
+ /* Read the info on all streams instantiated in the system */
+ hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
+ if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
+ exit(EXIT_FAILURE);
+ }
+
+ char pktCntStr[4096];
+ memset(pktCntStr, 0, sizeof (pktCntStr));
+
+ uint32_t stream_id = 0;
+ uint32_t stream_cnt = 0;
+ uint32_t num_streams = hStreamInfo.u.stream.data.count;
+
+ for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
+
+ do {
+
+ /* Read usage data for the chosen stream ID */
+ hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
+ hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
+
+ if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
+ exit(1);
+ }
+
+ if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
+ ++stream_id;
+ continue;
+ }
+ } while (hStat.u.usageData_v0.data.numHostBufferUsed == 0);
+
+ if (RegisteredStream(stream_id, num_registered, registered_streams)) {
+ ave_OB_fill_level[stream_id] = 0;
+ ave_HB_fill_level[stream_id] = 0;
+
+ for (uint32_t hb_count = 0; hb_count < hStat.u.usageData_v0.data.numHostBufferUsed; hb_count++) {
+
+ OB_fill_level[hb_count] =
+ ((100 * hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.used) /
+ hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.size);
+
+ if (OB_fill_level[hb_count] > 100) {
+ OB_fill_level[hb_count] = 100;
+ }
+
+ uint32_t bufSize = hStat.u.usageData_v0.data.hb[hb_count].enQueuedAdapter / 1024
+ + hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024
+ + hStat.u.usageData_v0.data.hb[hb_count].enQueued / 1024
+ - HB_HIGHWATER;
+
+ HB_fill_level[hb_count] = (uint32_t)
+ ((100 * hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024) /
+ bufSize);
+
+ ave_OB_fill_level[stream_id] += OB_fill_level[hb_count];
+ ave_HB_fill_level[stream_id] += HB_fill_level[hb_count];
+ }
+
+ ave_OB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
+ ave_HB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
+
+ /* Host Buffer Fill Level warnings... */
+ if (ave_HB_fill_level[stream_id] >= (HB_alert_level[stream_id] + alertInterval)) {
+
+ while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id] + alertInterval) {
+ HB_alert_level[stream_id] += alertInterval;
+ }
+ SCLogInfo("nt%d - Increasing Host Buffer Fill Level : %4d%%",
+ stream_id, ave_HB_fill_level[stream_id]);
+ }
+
+ if (HB_alert_level[stream_id] > 0) {
+ if ((ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval))) {
+ SCLogInfo("nt%d - Decreasing Host Buffer Fill Level: %4d%%",
+ stream_id, ave_HB_fill_level[stream_id]);
+
+ while (ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval)) {
+ if ((HB_alert_level[stream_id]) > 0) {
+ HB_alert_level[stream_id] -= alertInterval;
+ } else break;
+ }
+ }
+ }
+
+ /* On Board SDRAM Fill Level warnings... */
+ if (ave_OB_fill_level[stream_id] >= (OB_alert_level[stream_id] + alertInterval)) {
+ while (ave_OB_fill_level[stream_id] >= OB_alert_level[stream_id] + alertInterval) {
+ OB_alert_level[stream_id] += alertInterval;
+
+ }
+ SCLogInfo("nt%d - Increasing Adapter SDRAM Fill Level: %4d%%",
+ stream_id, ave_OB_fill_level[stream_id]);
+ }
+
+ if (OB_alert_level[stream_id] > 0) {
+ if ((ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval))) {
+ SCLogInfo("nt%d - Decreasing Adapter SDRAM Fill Level : %4d%%",
+ stream_id, ave_OB_fill_level[stream_id]);
+
+ while (ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval)) {
+ if ((OB_alert_level[stream_id]) > 0) {
+ OB_alert_level[stream_id] -= alertInterval;
+ } else break;
+ }
+ }
+ }
+ }
+ ++stream_id;
+ }
+ }
+
+ if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoClose() failed: %s\n", error_buffer);
+ exit(1);
+ }
+
+ /* Close the statistics stream */
+ if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
+ /* Get the status code as text */
+ NT_ExplainError(status, error_buffer, sizeof (error_buffer));
+ SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatClose() failed: %s\n", error_buffer);
+ exit(1);
+ }
+
+ SCLogDebug("Exiting NapatechStatsLoop");
+ TmThreadsSetFlag(tv, THV_RUNNING_DONE);
+ TmThreadWaitForFlag(tv, THV_DEINIT);
+ TmThreadsSetFlag(tv, THV_CLOSED);
+
+ return NULL;
+}
+
+void NapatechStartStats(void)
+{
+ /* Creates the Statistic threads */
+ ThreadVars *stats_tv = TmThreadCreate("NapatechStats",
+ NULL, NULL,
+ NULL, NULL,
+ "custom", NapatechStatsLoop, 0);
+
+ if (stats_tv == NULL) {
+ SCLogError(SC_ERR_THREAD_CREATE,
+ "Error creating a thread for NapatechStats - Killing engine.");
+ exit(EXIT_FAILURE);
+ }
+
+ if (TmThreadSpawn(stats_tv) != 0) {
+ SCLogError(SC_ERR_THREAD_SPAWN,
+ "Failed to spawn thread for NapatechStats - Killing engine.");
+ exit(EXIT_FAILURE);
+ }
+
+ ThreadVars *buf_monitor_tv = TmThreadCreate("NapatechBufMonitor",
+ NULL, NULL,
+ NULL, NULL,
+ "custom", NapatechBufMonitorLoop, 0);
+
+ if (buf_monitor_tv == NULL) {
+ SCLogError(SC_ERR_THREAD_CREATE,
+ "Error creating a thread for NapatechBufMonitor - Killing engine.");
+ exit(EXIT_FAILURE);
+ }
+
+ if (TmThreadSpawn(buf_monitor_tv) != 0) {
+ SCLogError(SC_ERR_THREAD_SPAWN,
+ "Failed to spawn thread for NapatechBufMonitor - Killing engine.");
+ exit(EXIT_FAILURE);
+ }
+
+
+ return;
+}
+
+#endif // HAVE_NAPATECH
--- /dev/null
+/* Copyright (C) 2017 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Phil Young <py@napatech.com>
+ *
+ */
+
+#ifndef __UTIL_NAPATECH_H__
+#define __UTIL_NAPATECH_H__
+
+#ifdef HAVE_NAPATECH
+#include <nt.h>
+
+typedef struct NapatechPacketVars_
+{
+ uint64_t stream_id;
+ NtNetBuf_t nt_packet_buf;
+ ThreadVars *tv;
+} NapatechPacketVars;
+
+
+typedef struct NapatechStreamConfig_
+{
+ uint16_t stream_id;
+ bool is_active;
+ bool initialized;
+} NapatechStreamConfig;
+
+typedef struct NapatechCurrentStats_ {
+ uint64_t current_packets;
+ uint64_t current_drops;
+ uint64_t current_bytes;
+} NapatechCurrentStats;
+
+#define MAX_STREAMS 256
+
+extern void NapatechStartStats(void);
+uint16_t NapatechGetNumaNode(uint16_t stream_id);
+NapatechCurrentStats NapatechGetCurrentStats(uint16_t id);
+uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[]);
+
+#endif //HAVE_NAPATECH
+#endif /* __UTIL_NAPATECH_H__ */