Remove auto mode due to bad performance.
#include "util-time.h"
#include "util-cpu.h"
#include "util-affinity.h"
+#include "util-runmodes.h"
static const char *default_mode;
+static int DagConfigGetThreadCount(void *conf)
+{
+ return 1;
+}
+
+static void *ParseDagConfig(const char *iface)
+{
+ return (void *)iface;
+}
+
const char *RunModeErfDagGetDefaultMode(void)
{
return default_mode;
void RunModeErfDagRegister(void)
{
- default_mode = "auto";
- RunModeRegisterNewRunMode(RUNMODE_DAG, "auto",
- "Multi threaded Erf dag mode",
- RunModeErfDagAuto);
+ default_mode = "autofp";
+
+ RunModeRegisterNewRunMode(RUNMODE_DAG, "autofp",
+ "Multi threaded DAG mode. Packets from "
+ "each flow are assigned to a single detect "
+ "thread, unlike \"dag_auto\" where packets "
+ "from the same flow can be processed by any "
+ "detect thread",
+ RunModeIdsErfDagAutoFp);
+
+ RunModeRegisterNewRunMode(RUNMODE_DAG, "single",
+ "Singled threaded DAG mode",
+ RunModeIdsErfDagSingle);
+
+ RunModeRegisterNewRunMode(RUNMODE_DAG, "workers",
+ "Workers DAG mode, each thread does all "
+ " tasks from acquisition to logging",
+ RunModeIdsErfDagWorkers);
return;
}
-/**
- *
- * \brief Sets up support for reading from a DAG card.
- *
- * \param de_ctx
- * \param file
- * \notes Currently only supports a single interface.
- */
-int RunModeErfDagAuto(DetectEngineCtx *de_ctx)
+int RunModeIdsErfDagSingle(DetectEngineCtx *de_ctx)
{
- SCEnter();
- char tname[12];
- uint16_t cpu = 0;
+ int ret;
- /* Available cpus */
- uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
+ SCEnter();
RunModeInitialize();
- char *iface = NULL;
- if (ConfGet("erf-dag.iface", &iface) == 0) {
- SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-dag.iface from Conf");
+ TimeModeSetLive();
+
+ ret = RunModeSetLiveCaptureSingle(de_ctx,
+ ParseDagConfig,
+ DagConfigGetThreadCount,
+ "ReceiveErfDag",
+ "DecodeErfDag",
+ "RxDAG",
+ NULL);
+ if (ret != 0) {
+ SCLogError(SC_ERR_RUNMODE, "DAG single runmode failed to start");
exit(EXIT_FAILURE);
}
- SCLogDebug("iface %s", iface);
- TimeModeSetOffline();
+ SCLogInfo("RunModeIdsDagSingle initialised");
- /* @TODO/JNM: We need to create a separate processing pipeliine for each
- * interface supported by the
- */
-
- ThreadVars *tv_receiveerf =
- TmThreadCreatePacketHandler("ReceiveErfDag",
- "packetpool","packetpool",
- "pickup-queue","simple",
- "1slot");
- if (tv_receiveerf == NULL) {
- printf("ERROR: TmThreadsCreate failed\n");
- exit(EXIT_FAILURE);
- }
- TmModule *tm_module = TmModuleGetByName("ReceiveErfDag");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_receiveerf, tm_module, iface);
+ SCReturnInt(0);
+}
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_receiveerf, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
- }
+int RunModeIdsErfDagAutoFp(DetectEngineCtx *de_ctx)
+{
+ int ret;
- if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
+ SCEnter();
- ThreadVars *tv_decode1 =
- TmThreadCreatePacketHandler("Decode & Stream",
- "pickup-queue","simple",
- "stream-queue1","simple",
- "varslot");
- if (tv_decode1 == NULL) {
- printf("ERROR: TmThreadsCreate failed for Decode1\n");
- exit(EXIT_FAILURE);
- }
- tm_module = TmModuleGetByName("DecodeErfDag");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName DecodeErfDag failed\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
+ RunModeInitialize();
- tm_module = TmModuleGetByName("StreamTcp");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+ TimeModeSetLive();
+
+ ret = RunModeSetLiveCaptureAutoFp(de_ctx,
+ ParseDagConfig,
+ DagConfigGetThreadCount,
+ "ReceiveErfDag",
+ "DecodeErfDag",
+ "RxDAG",
+ NULL);
+ if (ret != 0) {
+ SCLogError(SC_ERR_RUNMODE, "DAG autofp runmode failed to start");
exit(EXIT_FAILURE);
}
- TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_decode1, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
- }
+ SCLogInfo("RunModeIdsDagAutoFp initialised");
- if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
-
- /* start with cpu 1 so that if we're creating an odd number of detect
- * threads we're not creating the most on CPU0. */
- if (ncpus > 0)
- cpu = 1;
-
- /* always create at least one thread */
- int thread_max = TmThreadGetNbThreads(DETECT_CPU_SET);
- if (thread_max == 0)
- thread_max = ncpus * threading_detect_ratio;
- if (thread_max < 1)
- thread_max = 1;
-
- int thread;
- for (thread = 0; thread < thread_max; thread++) {
- snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
-
- char *thread_name = SCStrdup(tname);
- SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
-
- ThreadVars *tv_detect_ncpu =
- TmThreadCreatePacketHandler(thread_name,
- "stream-queue1","simple",
- "alert-queue1","simple",
- "1slot");
- if (tv_detect_ncpu == NULL) {
- printf("ERROR: TmThreadsCreate failed\n");
- exit(EXIT_FAILURE);
- }
- tm_module = TmModuleGetByName("Detect");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName Detect failed\n");
- exit(EXIT_FAILURE);
- }
- TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
-
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
-
- /* If we have more than one core/cpu, the first Detect thread
- * (at cpu 0) will have less priority (higher 'nice' value)
- * In this case we will set the thread priority to +10 (default is 0)
- */
- if (cpu == 0 && ncpus > 1) {
- TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
- } else if (ncpus > 1) {
- TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
- }
- }
-
- if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
- }
+ SCReturnInt(0);
+}
- ThreadVars *tv_outputs =
- TmThreadCreatePacketHandler("Outputs",
- "alert-queue1", "simple",
- "packetpool", "packetpool",
- "varslot");
- if (tv_outputs == NULL) {
- printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
- exit(EXIT_FAILURE);
- }
+int RunModeIdsErfDagWorkers(DetectEngineCtx *de_ctx)
+{
+ int ret;
- SetupOutputs(tv_outputs);
+ SCEnter();
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_outputs, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
- }
+ RunModeInitialize();
- if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
+ TimeModeSetLive();
+
+ ret = RunModeSetLiveCaptureWorkers(de_ctx,
+ ParseDagConfig,
+ DagConfigGetThreadCount,
+ "ReceiveErfDag",
+ "DecodeErfDag",
+ "RxDAG",
+ NULL);
+ if (ret != 0) {
+ SCLogError(SC_ERR_RUNMODE, "DAG workers runmode failed to start");
exit(EXIT_FAILURE);
}
- return 0;
+ SCLogInfo("RunModeIdsErfDagWorkers initialised");
+
+ SCReturnInt(0);
}
#ifndef __RUNMODE_ERF_DAG_H__
#define __RUNMODE_ERF_DAG_H__
-int RunModeErfDagAuto(DetectEngineCtx *);
+int RunModeIdsErfDagAutoFp(DetectEngineCtx *);
+int RunModeIdsErfDagSingle(DetectEngineCtx *);
+int RunModeIdsErfDagWorkers(DetectEngineCtx *);
void RunModeErfDagRegister(void);
const char *RunModeErfDagGetDefaultMode(void);
* \file
*
* \author Endace Technology Limited.
- * \author Jason MacLulich <jason.maclulich@eendace.com>
+ * \author Jason MacLulich <jason.maclulich@endace.com>
*
* Support for reading ERF records from a DAG card.
*
typedef struct ErfDagThreadVars_ {
ThreadVars *tv;
+ TmSlot *slot;
+
int dagfd;
int dagstream;
char dagname[DAGNAME_BUFSIZE];
} ErfDagThreadVars;
-TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
-TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top,
- PacketQueue *postpq, uint32_t *pkts_read);
+TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
+ uint32_t *pkts_read);
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
{
tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit;
- tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag;
+ tmm_modules[TMM_RECEIVEERFDAG].Func = NULL;
+ tmm_modules[TMM_RECEIVEERFDAG].PktAcqLoop = ReceiveErfDagLoop;
tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats =
ReceiveErfDagThreadExitStats;
tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
}
/**
- * \brief Thread entry function for reading ERF records from a DAG card.
- *
- * Reads a new ERF record the DAG input buffer and copies it to
- * an internal Suricata packet buffer -- similar to the way the
- * pcap packet handler works.
+ * \brief Receives packets from a DAG interface.
*
- * We create new packet structures using PacketGetFromQueueOrAlloc
- * for each packet between the top and btm pointers except for
- * the first packet for which a Packet buffer is provided
- * from the packetpool.
+ * \param tv pointer to ThreadVars
+ * \param data pointer to ErfDagThreadVars
+ * \param slot slot containing task information
*
- * We always read up to dag_max_read_packets ERF packets from the
- * DAG buffer, but we might read less. This differs from the
- * ReceivePcap handler -- it will only read pkts up to a maximum
- * of either the packetpool count or the pcap_max_read_packets.
- *
- * \param tv pointer to ThreadVars
- * \param p data pointer
- * \param data
- * \param pq pointer to the PacketQueue (not used here)
- * \param postpq
- * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success.
- * \note We also use the packetpool hack first used in the source-pcap
- * handler so we don't keep producing packets without any dying.
- * This implies that if we are in this situation we run the risk
- * of dropping packets at the interface.
+ * \retval TM_ECODE_OK on success
+ * \retval TM_ECODE_FAILED on failure
*/
-TmEcode
-ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
- PacketQueue *postpq)
+TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
{
+ ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
+ TmSlot *s = (TmSlot *)slot;
+ dtv->slot = s->slot_next;
+
SCEnter();
uint16_t packet_q_len = 0;
uint8_t *top = NULL;
uint32_t pkts_read = 0;
- assert(p);
- assert(pq);
- assert(postpq);
-
- ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
-
- /* NOTE/JNM: Hack copied from source-pcap.c
- *
- * Make sure we have at least one packet in the packet pool, to
- * prevent us from alloc'ing packets at line rate
- */
- while (packet_q_len == 0) {
- packet_q_len = PacketPoolSize();
- if (packet_q_len == 0) {
- PacketPoolWait();
- }
- }
-
- if (postpq == NULL) {
- ewtn->dag_max_read_packets = 1;
- }
-
- while(pkts_read == 0)
+ while (1)
{
- if (suricata_ctl_flags != 0) {
- break;
+ if (suricata_ctl_flags & SURICATA_STOP ||
+ suricata_ctl_flags & SURICATA_KILL) {
+ SCReturnInt(TM_ECODE_FAILED);
}
+ /* Make sure we have at least one packet in the packet pool,
+ * to prevent us from alloc'ing packets at line rate. */
+ do {
+ packet_q_len = PacketPoolSize();
+ if (unlikely(packet_q_len == 0)) {
+ PacketPoolWait();
+ }
+ } while (packet_q_len == 0);
+
/* NOTE/JNM: This might not work well if we start restricting the
- * number of ERF records processed per call to a small number as
- * the over head required here could exceed the time it takes to
- * process a small number of ERF records.
- *
- * XXX/JNM: Possibly process the DAG stream buffer first if there
- * are ERF packets or else call dag_advance_stream and then process
- * the DAG stream buffer.
- */
- top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm));
-
- if (NULL == top)
- {
- if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) {
- usleep(10 * 1000);
- ewtn->btm = ewtn->top;
+ * number of ERF records processed per call to a small number as
+ * the over head required here could exceed the time it takes to
+ * process a small number of ERF records.
+ *
+ * XXX/JNM: Possibly process the DAG stream buffer first if there
+ * are ERF packets or else call dag_advance_stream and then process
+ * the DAG stream buffer.
+ */
+ top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
+
+ if (NULL == top)
+ {
+ if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
+ usleep(10 * 1000);
+ dtv->btm = dtv->top;
continue;
- }
- else {
- SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
- "Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
- ewtn->dagstream, ewtn->dagname);
- SCReturnInt(TM_ECODE_FAILED);
- }
- }
+ }
+ else {
+ SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
+ "Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
+ dtv->dagstream, dtv->dagname);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ }
- diff = top - ewtn->btm;
- if (diff == 0)
- {
- continue;
- }
+ diff = top - dtv->btm;
+ if (diff == 0)
+ {
+ continue;
+ }
- assert(diff >= dag_record_size);
+ assert(diff >= dag_record_size);
- err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read);
+ err = ProcessErfDagRecords(dtv, top, &pkts_read);
if (err == TM_ECODE_FAILED) {
- SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
- "Failed to read from stream: %d, DAG: %s",
- ewtn->dagstream, ewtn->dagname);
- ReceiveErfDagCloseStream(ewtn->dagfd, ewtn->dagstream);
+ SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
+ "Failed to read from stream: %d, DAG: %s",
+ dtv->dagstream, dtv->dagname);
+ ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
SCReturnInt(err);
}
}
SCLogDebug("Read %d records from stream: %d, DAG: %s",
- pkts_read, ewtn->dagstream, ewtn->dagname);
+ pkts_read, dtv->dagstream, dtv->dagname);
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
}
- SCReturnInt(err);
+ SCReturnInt(TM_ECODE_OK);
}
+/**
+ * \brief Process a chunk of records read from a DAG interface.
+ *
+ * This function takes a pointer to buffer read from the DAG interface
+ * and processes it individual records.
+ */
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
- Packet *p,
uint8_t* top,
- PacketQueue *postpq,
uint32_t *pkts_read)
{
SCEnter();
+ Packet *p;
int err = 0;
dag_record_t* dr = NULL;
char *prec = NULL;
if ((top-(ewtn->btm)) < rlen)
SCReturnInt(TM_ECODE_OK);
- p = p ? p : PacketGetFromQueueOrAlloc();
-
+ p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate a Packet on stream: %d, DAG: %s",
err = ProcessErfDagRecord(ewtn, prec, p);
- if (err != TM_ECODE_OK)
+ if (err != TM_ECODE_OK) {
+ TmqhOutputPacketpool(ewtn->tv, p);
SCReturnInt(err);
+ }
ewtn->btm += rlen;
- /* XXX/JNM: Hack to get around the fact that the first Packet from
- * Suricata is added explicitly by the Slot code and shouldn't go
- * onto the post queue -- else it is added twice to the next queue.
- */
- if (*pkts_read) {
- PacketEnqueue(postpq, p);
+ err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
+ if (err != TM_ECODE_OK) {
+ return err;
}
(*pkts_read)++;
-
- p = NULL;
}
SCReturnInt(TM_ECODE_OK);
*/
PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
- SCLogDebug("pktlen: %" PRIu32 " (pkt %02x, pkt data %02x)",
- GET_PKT_LEN(p), *p, *GET_PKT_DATA(p));
-
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr->ts;
p->ts.tv_sec = ts >> 32;
#endif /* HAVE_LIBCAP_NG */
printf("\t--erf-in <path> : process an ERF file\n");
#ifdef HAVE_DAG
- printf("\t--dag <dag0,dag1,...> : process ERF records from 0,1,...,n DAG input streams\n");
+ printf("\t--dag <dagX:Y> : process ERF records from DAG interface X, stream Y\n");
#endif
#ifdef HAVE_NAPATECH
printf("\t--napatech <adapter> : run Napatech feeds using <adapter>\n");
exit(EXIT_FAILURE);
}
}
- else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
+ else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
#ifdef HAVE_DAG
- run_mode = RUNMODE_DAG;
- if (ConfSet("erf-dag.iface", optarg, 0) != 1) {
- fprintf(stderr, "ERROR: Failed to set erf_dag.iface\n");
+ if (run_mode == RUNMODE_UNKNOWN) {
+ run_mode = RUNMODE_DAG;
+ }
+ else if (run_mode != RUNMODE_DAG) {
+ SCLogError(SC_ERR_MULTIPLE_RUN_MODE,
+ "more than one run mode has been specified");
+ usage(argv[0]);
exit(EXIT_FAILURE);
}
+ LiveRegisterDevice(optarg);
#else
- SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
+ SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
" to receieve packets using --dag.");
- exit(EXIT_FAILURE);
+ exit(EXIT_FAILURE);
#endif /* HAVE_DAG */
}
else if (strcmp((long_opts[option_index]).name, "napatech") == 0) {