]> git.ipfire.org Git - people/ms/suricata.git/commitdiff
Add initial support for reading packets from a DAG card, we only support reading...
authorJason MacLulich <jason.maclulich@endace.com>
Wed, 16 Jun 2010 07:36:02 +0000 (03:36 -0400)
committerVictor Julien <victor@inliniac.net>
Wed, 16 Jun 2010 09:54:13 +0000 (11:54 +0200)
Use the --dag <dagname> cmd line option to specify from which DAG card to read pkts
from.

Issue at the moment with pkts being ejected during shutdown -- at the moment we
ignore any packets that are not of link type Ethernet.

12 files changed:
autogen.sh
configure.in
src/Makefile.am
src/runmodes.c
src/runmodes.h
src/source-erf-dag.c [new file with mode: 0644]
src/source-erf-dag.h [new file with mode: 0644]
src/suricata.c
src/suricata.h
src/tm-modules.h
src/util-error.c
src/util-error.h

index c83d11ad12c6d51eb25846c287f1a3077b33f768..b7a4df05c90725b872a04918c977b4a17b92a30a 100755 (executable)
@@ -1,5 +1,14 @@
 #!/bin/sh
 # Run this to generate all the initial makefiles, etc.
-libtoolize -c
+if which libtoolize > /dev/null; then
+  echo "Found libtoolize"
+  libtoolize -c
+elif which glibtoolize > /dev/null; then
+  echo "Found glibtoolize"
+  glibtoolize -c
+else
+  echo "Failed to find libtoolize or glibtoolize, please ensure it is installed and accessible via your PATH env variable"
+  exit 1
+fi;
 autoreconf -fv --install
 echo "You can now run \"./configure\" and then \"make\"."
index c278f6c225650aeb3c7fd4a90afcbbcf234031c7..350ec93b9d132034c39095007b1027428bd64677 100644 (file)
@@ -746,6 +746,47 @@ AC_CHECK_HEADER(pcap.h,,[AC_ERROR(pcap.h not found ...)])
         CFLAGS="${CFLAGS} -DPROFILING"
     ])
 
+# Check for DAG support.
+
+    AC_ARG_ENABLE(dag,
+               [  --enable-dag  Enable DAG capture],
+               [ enable_dag=yes ],
+               [ enable_dag=no])
+    AC_ARG_WITH(dag_includes,
+            [  --with-dag-includes=DIR  dagapi include directory],
+            [with_dag_includes="$withval"],[with_dag_includes="no"])
+    AC_ARG_WITH(dag_libraries,
+            [  --with-dag-libraries=DIR  dagapi library directory],
+            [with_dag_libraries="$withval"],[with_dag_libraries="no"])
+
+    if test "$enable_dag" = "yes"; then
+
+           if test "$with_dag_includes" != "no"; then
+            CPPFLAGS="${CPPFLAGS} -I${with_dag_includes}"
+        fi
+
+        if test "$with_dag_libraries" != "no"; then
+            LDFLAGS="${LDFLAGS} -I${with_dag_libraries}"
+        fi
+
+        AC_CHECK_HEADER(dagapi.h,DAG="yes",DAG="no")
+        if test "$DAG" != "no"; then
+            DAG=""
+               AC_CHECK_LIB(dag,dag_open,DAG="yes",DAG="no")
+        fi
+
+        if test "$DAG" != "no"; then
+            CFLAGS="${CFLAGS} -DHAVE_DAG"
+        fi
+
+        if test "$DAG" = "no"; then
+            echo
+            echo "  ERROR! libdag library not found"
+            echo
+            exit 1
+        fi
+    fi
+
 AC_SUBST(CFLAGS)
 AC_SUBST(LDFLAGS)
 AC_SUBST(CPPFLAGS)
@@ -761,6 +802,7 @@ Suricata Configuration:
   Unit tests enabled:       ${enable_unittests}
   Debug output enabled:     ${enable_debug}
   CUDA enabled:             ${enable_cuda}
+  DAG enabled:              ${enable_dag}
   Profiling enabled:        ${enable_profiling}
   GCC Protect enabled:      ${enable_gccprotect}
   GCC march native enabled: ${enable_gccmarch_native}
index a0380cfabf42b388c0f0437e8f9802139c1c929a..487b4301efc5a8d0c0b6d504f6be746cb6e3218d 100644 (file)
@@ -14,6 +14,7 @@ source-pcap-file.c source-pcap-file.h \
 source-pfring.c source-pfring.h \
 source-ipfw.c source-ipfw.h \
 source-erf-file.c source-erf-file.h \
+source-erf-dag.c source-erf-dag.h \
 decode.c decode.h \
 decode-ethernet.c decode-ethernet.h \
 decode-vlan.c decode-vlan.h \
index 36f98813923ff2a0cd912f4486ea118b704969c3..2fcbe5aa6732407fef8893ac069398950d57dfbc 100644 (file)
@@ -3155,3 +3155,166 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx, char *file)
 
     return 0;
 }
+
+/**
+ *
+ * \brief   Sets up support for reading from a DAG card.
+ *
+ * \param   de_ctx
+ * \param   file
+ * \notes   Currently only supports a single interface.
+ */
+int RunModeErfDagAuto(DetectEngineCtx *de_ctx, char *file)
+{
+    SCEnter();
+    char tname[12];
+    uint16_t cpu = 0;
+
+    /* Available cpus */
+    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
+
+    RunModeInitialize();
+
+    SCLogDebug("file %s", file);
+    TimeModeSetOffline();
+
+    /* @TODO/JNM: We need to create a separate processing pipeliine for each
+     * interface supported by the
+     */
+
+    ThreadVars *tv_receiveerf =
+        TmThreadCreatePacketHandler("ReceiveErfDag",
+                                    "packetpool","packetpool",
+                                    "pickup-queue","simple",
+                                    "1slot");
+    if (tv_receiveerf == NULL) {
+        printf("ERROR: TmThreadsCreate failed\n");
+        exit(EXIT_FAILURE);
+    }
+    TmModule *tm_module = TmModuleGetByName("ReceiveErfDag");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n");
+        exit(EXIT_FAILURE);
+    }
+    Tm1SlotSetFunc(tv_receiveerf, tm_module, file);
+
+    if (threading_set_cpu_affinity) {
+        TmThreadSetCPUAffinity(tv_receiveerf, 0);
+        if (ncpus > 1)
+            TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
+    }
+
+    if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
+        printf("ERROR: TmThreadSpawn failed\n");
+        exit(EXIT_FAILURE);
+    }
+
+    ThreadVars *tv_decode1 =
+        TmThreadCreatePacketHandler("Decode & Stream",
+                                    "pickup-queue","simple",
+                                    "stream-queue1","simple",
+                                    "varslot");
+    if (tv_decode1 == NULL) {
+        printf("ERROR: TmThreadsCreate failed for Decode1\n");
+        exit(EXIT_FAILURE);
+    }
+    tm_module = TmModuleGetByName("DecodeErfDag");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName DecodeErfDag failed\n");
+        exit(EXIT_FAILURE);
+    }
+    TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL);
+
+    tm_module = TmModuleGetByName("StreamTcp");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+        exit(EXIT_FAILURE);
+    }
+    TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL);
+
+    if (threading_set_cpu_affinity) {
+        TmThreadSetCPUAffinity(tv_decode1, 0);
+        if (ncpus > 1)
+            TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
+    }
+
+    if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
+        printf("ERROR: TmThreadSpawn failed\n");
+        exit(EXIT_FAILURE);
+    }
+
+    /* start with cpu 1 so that if we're creating an odd number of detect
+     * threads we're not creating the most on CPU0. */
+    if (ncpus > 0)
+        cpu = 1;
+
+    /* always create at least one thread */
+    int thread_max = ncpus * threading_detect_ratio;
+    if (thread_max < 1)
+        thread_max = 1;
+
+    int thread;
+    for (thread = 0; thread < thread_max; thread++) {
+        snprintf(tname, sizeof(tname),"Detect%"PRIu16, thread+1);
+        if (tname == NULL)
+            break;
+
+        char *thread_name = SCStrdup(tname);
+        SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
+
+        ThreadVars *tv_detect_ncpu =
+            TmThreadCreatePacketHandler(thread_name,
+                                        "stream-queue1","simple",
+                                        "alert-queue1","simple",
+                                        "1slot");
+        if (tv_detect_ncpu == NULL) {
+            printf("ERROR: TmThreadsCreate failed\n");
+            exit(EXIT_FAILURE);
+        }
+        tm_module = TmModuleGetByName("Detect");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName Detect failed\n");
+            exit(EXIT_FAILURE);
+        }
+        Tm1SlotSetFunc(tv_detect_ncpu,tm_module,(void *)de_ctx);
+
+        if (threading_set_cpu_affinity) {
+            TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
+
+            /* If we have more than one core/cpu, the first Detect thread
+             * (at cpu 0) will have less priority (higher 'nice' value)
+             * In this case we will set the thread priority to +10 (default is 0)
+             */
+            if (cpu == 0 && ncpus > 1) {
+                TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
+            } else if (ncpus > 1) {
+                TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
+            }
+        }
+
+        if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
+            printf("ERROR: TmThreadSpawn failed\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    ThreadVars *tv_outputs =
+        TmThreadCreatePacketHandler("Outputs",
+                                    "alert-queue1", "simple",
+                                    "packetpool", "packetpool",
+                                    "varslot");
+    SetupOutputs(tv_outputs);
+
+    if (threading_set_cpu_affinity) {
+        TmThreadSetCPUAffinity(tv_outputs, 0);
+        if (ncpus > 1)
+            TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
+    }
+
+    if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
+        printf("ERROR: TmThreadSpawn failed\n");
+        exit(EXIT_FAILURE);
+    }
+
+    return 0;
+}
index 16ebab800d8cea4bb1b1082c2bbe375c0e3a95d5..4c20fdae8eb23b46d4818547267c76318ccf0ff3 100644 (file)
@@ -48,6 +48,7 @@ int RunModeIpsIPFW(DetectEngineCtx *);
 int RunModeIpsIPFWAuto(DetectEngineCtx *);
 
 int RunModeErfFileAuto(DetectEngineCtx *, char *);
+int RunModeErfDagAuto(DetectEngineCtx *, char *);
 
 void RunModeShutDown(void);
 
diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c
new file mode 100644 (file)
index 0000000..7c953af
--- /dev/null
@@ -0,0 +1,643 @@
+/* Copyright (C) 2010 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Endace Technology Limited.
+ * \author Jason MacLulich <jason.maclulich@eendace.com>
+ *
+ * Support for reading ERF records from a DAG card.
+ *
+ * Only ethernet supported at this time.
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+#include "tm-modules.h"
+
+#include "util-privs.h"
+
+#ifndef HAVE_DAG
+
+TmEcode NoErfDagSupportExit(ThreadVars *, void *, void **);
+
+void TmModuleReceiveErfDagRegister (void) {
+    tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = NoErfDagSupportExit;
+    tmm_modules[TMM_RECEIVEERFDAG].Func = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].cap_flags = SC_CAP_NET_ADMIN;
+}
+
+void TmModuleDecodeErfDagRegister (void) {
+    tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag";
+    tmm_modules[TMM_DECODEERFDAG].ThreadInit = NoErfDagSupportExit;
+    tmm_modules[TMM_DECODEERFDAG].Func = NULL;
+    tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL;
+    tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL;
+    tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL;
+    tmm_modules[TMM_DECODEERFDAG].cap_flags = 0;
+}
+
+TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data)
+{
+    SCLogError(SC_ERR_DAG_NOSUPPORT,
+               "Error creating thread %s: you do not have support for DAG cards "
+               "enabled please recompile with --enable-dag", tv->name);
+    exit(EXIT_FAILURE);
+}
+
+#else /* Implied we do have DAG support */
+
+#define DAG_MAX_READ_PKTS 256
+
+#include "source-erf-dag.h"
+#include <dagapi.h>
+// #include <dagutil.h>
+
+extern int max_pending_packets;
+extern uint8_t suricata_ctl_flags;
+
+typedef struct ErfDagThreadVars_ {
+    ThreadVars *tv;
+    int dagfd;
+    int dagstream;
+    char dagname[DAGNAME_BUFSIZE];
+    uint32_t dag_max_read_packets;
+
+    struct timeval  maxwait, poll;  /* Could possibly be made static */
+
+    uint32_t pkts;
+    uint64_t bytes;
+
+    /* Track current location in the DAG stream input buffer
+     */
+    uint8_t* top;                      /* We track top as well so we don't have to
+                                       call dag_advance_stream again if there
+                                       are still pkts to process.
+                                     */
+    uint8_t* btm;
+
+} ErfDagThreadVars;
+
+TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
+void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
+TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
+TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top,
+                             PacketQueue *postpq, uint32_t *pkts_read);
+TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
+
+TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
+TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+
+/**
+ * \brief Register the ERF file receiver (reader) module.
+ */
+void
+TmModuleReceiveErfDagRegister(void)
+{
+    tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit;
+    tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag;
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats =
+        ReceiveErfDagThreadExitStats;
+    tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL;
+    tmm_modules[TMM_RECEIVEERFDAG].cap_flags = 0;
+}
+
+/**
+ * \brief Register the ERF file decoder module.
+ */
+void
+TmModuleDecodeErfDagRegister(void)
+{
+    tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag";
+    tmm_modules[TMM_DECODEERFDAG].ThreadInit = DecodeErfDagThreadInit;
+    tmm_modules[TMM_DECODEERFDAG].Func = DecodeErfDag;
+    tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL;
+    tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL;
+    tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL;
+    tmm_modules[TMM_DECODEERFDAG].cap_flags = 0;
+}
+
+/**
+ * \brief   Initialize the ERF receiver thread, generate a single
+ *          ErfDagThreadVar structure for each thread, this will
+ *          contain a DAG file descriptor which is read when the
+ *          thread executes.
+ *
+ * \param tv        Thread variable to ThreadVars
+ * \param initdata  Initial data to the interface passed from the user,
+ *                  this is processed by the user.
+ *
+ *                  We assume that we have only a single name for the DAG
+ *                  interface.
+ *
+ * \param data      data pointer gets populated with
+ *
+ */
+TmEcode
+ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+    SCEnter();
+    int stream_count = 0;
+
+    if (initdata == NULL) {
+        SCLogError(SC_ERR_INVALID_ARGUMENT, "Error: No DAG interface provided.");
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    ErfDagThreadVars *ewtn = SCMalloc(sizeof(ErfDagThreadVars));
+    if (ewtn == NULL) {
+        SCLogError(SC_ERR_MEM_ALLOC,
+                   "Failed to allocate memory for ERF DAG thread vars.");
+        exit(EXIT_FAILURE);
+    }
+
+    memset(ewtn, 0, sizeof(*ewtn));
+
+    /*  Use max_pending_packets as our maximum number of packets read
+     from the DAG buffer.
+     */
+    ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
+        DAG_MAX_READ_PKTS : max_pending_packets;
+
+
+    /* dag_parse_name will return a DAG device name and stream number
+     * to open for this thread.
+     */
+    if (dag_parse_name(initdata, ewtn->dagname, DAGNAME_BUFSIZE,
+                       &ewtn->dagstream) < 0)
+    {
+        SCLogError(SC_ERR_INVALID_ARGUMENT,
+                   "Failed to parse DAG interface: %s",
+                   (char*)initdata);
+        SCFree(ewtn);
+        exit(EXIT_FAILURE);
+    }
+
+    SCLogInfo("Opening DAG: %s on stream: %d for processing",
+        ewtn->dagname, ewtn->dagstream);
+
+    if ((ewtn->dagfd = dag_open(ewtn->dagname)) < 0)
+    {
+        SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED, "Failed to open DAG: %s",
+                   ewtn->dagname);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    /* Check to make sure the card has enough available streams to
+     * support reading from the one specified.
+     */
+    if ((stream_count = dag_rx_get_stream_count(ewtn->dagfd)) < 0)
+    {
+        SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED,
+                   "Failed to open stream: %d, DAG: %s, could not query stream count",
+                   ewtn->dagstream, ewtn->dagname);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    /* Check to make sure we have enough rx streams to open the stream
+     * the user is asking for.
+     */
+    if (ewtn->dagstream > stream_count*2)
+    {
+        SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED,
+                   "Failed to open stream: %d, DAG: %s, insufficient streams: %d",
+                   ewtn->dagstream, ewtn->dagname, stream_count);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    /* If we are transmitting into a soft DAG card then set the stream
+     * to act in reverse mode.
+     */
+    if (0 != (ewtn->dagstream & 0x01))
+    {
+        /* Setting reverse mode for using with soft dag from daemon side */
+        if(dag_set_mode(ewtn->dagfd, ewtn->dagstream, DAG_REVERSE_MODE)) {
+            SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
+                       "Failed to set mode to DAG_REVERSE_MODE on stream: %d, DAG: %s",
+                       ewtn->dagstream, ewtn->dagname);
+            SCFree(ewtn);
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+    }
+
+    if (dag_attach_stream(ewtn->dagfd, ewtn->dagstream, 0, 0) < 0)
+    {
+        SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
+                   "Failed to open DAG stream: %d, DAG: %s",
+                   ewtn->dagstream, ewtn->dagname);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    if (dag_start_stream(ewtn->dagfd, ewtn->dagstream) < 0)
+    {
+        SCLogError(SC_ERR_ERF_DAG_STREAM_START_FAILED,
+                   "Failed to start DAG stream: %d, DAG: %s",
+                   ewtn->dagstream, ewtn->dagname);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    SCLogInfo("Attached and started stream: %d on DAG: %s",
+        ewtn->dagstream, ewtn->dagname);
+
+    /*
+     * Initialise DAG Polling parameters.
+     */
+    timerclear(&ewtn->maxwait);
+    ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
+    timerclear(&ewtn->poll);
+    ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
+
+    /* 32kB minimum data to return -- we still restrict the number of
+     * pkts that are processed to a maximum of dag_max_read_packets.
+     */
+    if (dag_set_stream_poll(ewtn->dagfd, ewtn->dagstream, 32*1024, &(ewtn->maxwait), &(ewtn->poll)) < 0)
+    {
+        SCLogError(SC_ERR_ERF_DAG_STREAM_SET_FAILED,
+                   "Failed to set poll parameters for stream: %d, DAG: %s",
+                   ewtn->dagstream, ewtn->dagname);
+        SCFree(ewtn);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    ewtn->tv = tv;
+    *data = (void *)ewtn;
+
+    SCLogInfo("Starting processing packets from stream: %d on DAG: %s",
+              ewtn->dagstream, ewtn->dagname);
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief   Thread entry function for reading ERF records from a DAG card.
+ *
+ *          Reads a new ERF record the DAG input buffer and copies it to
+ *          an internal Suricata packet buffer -- similar to the way the
+ *          pcap packet handler works.
+ *
+ *          We create new packet structures using PacketGetFromQueueOrAlloc
+ *          for each packet between the top and btm pointers except for
+ *          the first packet for which a Packet buffer is provided
+ *          from the packetpool.
+ *
+ *          We always read up to dag_max_read_packets ERF packets from the
+ *          DAG buffer, but we might read less. This differs from the
+ *          ReceivePcap handler -- it will only read pkts up to a maximum
+ *          of either the packetpool count or the pcap_max_read_packets.
+ *
+ * \param   tv pointer to ThreadVars
+ * \param   p data pointer
+ * \param   data
+ * \param   pq pointer to the PacketQueue (not used here)
+ * \param   postpq
+ * \retval  TM_ECODE_FAILED on failure and TM_ECODE_OK on success.
+ * \note    We also use the packetpool hack first used in the source-pcap
+ *          handler so we don't keep producing packets without any dying.
+ *          This implies that if we are in this situation we run the risk
+ *          of dropping packets at the interface.
+ */
+TmEcode
+ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
+               PacketQueue *postpq)
+{
+    SCEnter();
+
+    uint16_t packet_q_len = 0;
+    uint32_t diff = 0;
+    int      err;
+    uint8_t  *top = NULL;
+    uint32_t pkts_read = 0;
+
+    assert(p);
+    assert(pq);
+    assert(postpq);
+
+    ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
+
+    /* NOTE/JNM: Hack copied from source-pcap.c
+     *
+     * Make sure we have at least one packet in the packet pool, to
+     * prevent us from alloc'ing packets at line rate
+     */
+    while (packet_q_len == 0) {
+        SCMutexLock(&packet_q.mutex_q);
+        packet_q_len = packet_q.len;
+        if (packet_q.len == 0) {
+            SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
+        }
+        packet_q_len = packet_q.len;
+        SCMutexUnlock(&packet_q.mutex_q);
+    }
+
+    if (postpq == NULL) {
+        ewtn->dag_max_read_packets = 1;
+    }
+
+    while(pkts_read == 0)
+    {
+           if (suricata_ctl_flags != 0) {
+            break;
+        }
+
+        /* NOTE/JNM: This might not work well if we start restricting the
+            * number of ERF records processed per call to a small number as
+            * the over head required here could exceed the time it takes to
+            * process a small number of ERF records.
+            *
+            * XXX/JNM: Possibly process the DAG stream buffer first if there
+            * are ERF packets or else call dag_advance_stream and then process
+            * the DAG stream buffer.
+            */
+           top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm));
+
+           if (NULL == top)
+           {
+               if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) {
+                   usleep(10 * 1000);
+                   ewtn->btm = ewtn->top;
+                continue;
+               }
+               else {
+                   SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
+                              "Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
+                              ewtn->dagstream, ewtn->dagname);
+                   SCReturnInt(TM_ECODE_FAILED);
+               }
+           }
+
+           diff = top - ewtn->btm;
+           if (diff == 0)
+           {
+               continue;
+           }
+
+           assert(diff >= dag_record_size);
+
+           err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read);
+
+        if (err == TM_ECODE_FAILED) {
+             SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
+                   "Failed to read from stream: %d, DAG: %s",
+                   ewtn->dagstream, ewtn->dagname);
+            SCReturnInt(err);
+        }
+    }
+
+    SCLogDebug("Read %d records from stream: %d, DAG: %s",
+        pkts_read, ewtn->dagstream, ewtn->dagname);
+
+    if (suricata_ctl_flags != 0) {
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    SCReturnInt(err);
+}
+
+TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
+                             Packet *p,
+                             uint8_t* top,
+                             PacketQueue *postpq,
+                             uint32_t *pkts_read)
+{
+    SCEnter();
+
+    int     err = 0;
+    dag_record_t* dr = NULL;
+    char    *prec = NULL;
+    int     rlen;
+
+    *pkts_read = 0;
+
+    while(((top-(ewtn->btm))>=dag_record_size) &&
+          ((*pkts_read)<(ewtn->dag_max_read_packets)))
+    {
+        prec = (char*)ewtn->btm;
+        dr = (dag_record_t*)prec;
+
+        rlen = htons(dr->rlen);
+
+        if (rlen == 20) {
+            rlen = 28;
+            SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
+                "Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
+                ewtn->dagstream, ewtn->dagname);
+        }
+
+        /* If we don't have enough data to finsih processing this ERF record
+         * return and maybe next time we will.
+         */
+        if ((top-(ewtn->btm)) < rlen)
+            SCReturnInt(TM_ECODE_OK);
+
+        p = p ? p : PacketGetFromQueueOrAlloc();
+
+        if (p == NULL) {
+            SCLogError(SC_ERR_MEM_ALLOC,
+                       "Failed to allocate a Packet on stream: %d, DAG: %s",
+                       ewtn->dagstream, ewtn->dagname);
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+
+        err = ProcessErfDagRecord(ewtn, prec, p);
+
+        if (err != TM_ECODE_OK)
+            SCReturnInt(err);
+
+        ewtn->btm += rlen;
+
+        /* XXX/JNM: Hack to get around the fact that the first Packet from
+         * Suricata is added explicitly by the Slot code and shouldn't go
+         * onto the post queue -- else it is added twice to the next queue.
+         */
+        if (*pkts_read) {
+            PacketEnqueue(postpq, p);
+        }
+
+        (*pkts_read)++;
+
+        p = NULL;
+    }
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief   Process a DAG record into a TM packet buffer.
+ * \param   prec pointer to a DAG record.
+ * \param
+ */
+TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
+{
+    SCEnter();
+
+    int wlen = 0;
+    dag_record_t  *dr = (dag_record_t*)prec;
+    erf_payload_t *pload;
+
+    assert(prec);
+    assert(p);
+
+    if (p == NULL) SCReturnInt(TM_ECODE_OK);
+
+    /* Only support ethernet at this time. */
+    if (dr->type != TYPE_ETH) {
+        SCLogError(SC_ERR_UNIMPLEMENTED,
+                   "Processing of DAG record type: %d not implemented.", dr->type);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    wlen = htons(dr->wlen);
+
+    pload = &(dr->rec);
+
+    p->pktlen = wlen - 4;   /* Trim the FCS... */
+    p->datalink = LINKTYPE_ETHERNET;
+
+    /* Take into account for link type Ethernet ETH frame starts
+     * after ther ERF header + pad.
+     */
+    memcpy(p->pkt, pload->eth.dst, p->pktlen);
+
+    SCLogDebug("p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)",
+               p->pktlen, *p, *p->pkt);
+
+    /* Convert ERF time to timeval - from libpcap. */
+    uint64_t ts = dr->ts;
+    p->ts.tv_sec = ts >> 32;
+    ts = (ts & 0xffffffffULL) * 1000000;
+    ts += 0x80000000; /* rounding */
+    p->ts.tv_usec = ts >> 32;
+    if (p->ts.tv_usec >= 1000000) {
+        p->ts.tv_usec -= 1000000;
+        p->ts.tv_sec++;
+    }
+
+    ewtn->pkts++;
+    ewtn->bytes += wlen;
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief Print some stats to the log at program exit.
+ *
+ * \param tv Pointer to ThreadVars.
+ * \param data Pointer to data, ErfFileThreadVars.
+ */
+void
+ReceiveErfDagThreadExitStats(ThreadVars *tv, void *data)
+{
+    ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
+
+    SCLogInfo("Packets: %"PRIu32"; Bytes: %"PRIu64, ewtn->pkts, ewtn->bytes);
+}
+
+/**
+ * \brief   Deinitializes the DAG card.
+ * \param   tv pointer to ThreadVars
+ * \param   data pointer that gets cast into PcapThreadVars for ptv
+ */
+TmEcode ReceiveErfDagThreadDeinit(ThreadVars *tv, void *data) {
+
+    SCEnter();
+
+    ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
+
+    dag_stop_stream(ewtn->dagfd, ewtn->dagstream);
+    dag_detach_stream(ewtn->dagfd, ewtn->dagstream);
+    dag_close(ewtn->dagfd);
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+
+/** Decode ErfDag */
+
+/**
+ * \brief   This function passes off to link type decoders.
+ *
+ * DecodeErfDag reads packets from the PacketQueue and passes
+ * them off to the proper link type decoder.
+ *
+ * \param t pointer to ThreadVars
+ * \param p pointer to the current packet
+ * \param data pointer that gets cast into PcapThreadVars for ptv
+ * \param pq pointer to the current PacketQueue
+ */
+TmEcode DecodeErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
+                   PacketQueue *postpq)
+{
+    SCEnter();
+    DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+
+    /* update counters */
+    SCPerfCounterIncr(dtv->counter_pkts, tv->sc_perf_pca);
+    SCPerfCounterIncr(dtv->counter_pkts_per_sec, tv->sc_perf_pca);
+
+    SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, p->pktlen);
+    SCPerfCounterAddDouble(dtv->counter_bytes_per_sec, tv->sc_perf_pca, p->pktlen);
+    SCPerfCounterAddDouble(dtv->counter_mbit_per_sec, tv->sc_perf_pca,
+                           (p->pktlen * 8)/1000000.0);
+
+    SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, p->pktlen);
+    SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, p->pktlen);
+
+        /* call the decoder */
+    switch(p->datalink) {
+        case LINKTYPE_ETHERNET:
+            DecodeEthernet(tv, dtv, p, p->pkt, p->pktlen, pq);
+            break;
+        default:
+            SCLogError(SC_ERR_DATALINK_UNIMPLEMENTED,
+                "Error: datalink type %" PRId32 " not yet supported in module DecodeErfDag",
+                p->datalink);
+            break;
+    }
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+TmEcode DecodeErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+    SCEnter();
+    DecodeThreadVars *dtv = NULL;
+
+    if ( (dtv = SCMalloc(sizeof(DecodeThreadVars))) == NULL)
+        SCReturnInt(TM_ECODE_FAILED);
+    memset(dtv, 0, sizeof(DecodeThreadVars));
+
+    DecodeRegisterPerfCounters(dtv, tv);
+
+    *data = (void *)dtv;
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+#endif /* HAVE_DAG */
diff --git a/src/source-erf-dag.h b/src/source-erf-dag.h
new file mode 100644 (file)
index 0000000..2b718e1
--- /dev/null
@@ -0,0 +1,32 @@
+/* Copyright (C) 2010 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Endace Technology Limited
+ * \author Jason MacLulich <jason.maclulich@endace.com>
+ */
+
+#ifndef __SOURCE_ERR_DAG_H__
+#define __SOURCE_ERF_DAG_H__
+
+void TmModuleReceiveErfDagRegister(void);
+void TmModuleDecodeErfDagRegister(void);
+
+#endif /* __SOURCE_ERF_DAG_H__ */
+
index 5341ca352c42843663d94c416eae46a64ffc2b93..6f540c11f8e7eeeda2d5d017f17103ee87f641d6 100644 (file)
@@ -84,6 +84,7 @@
 #include "source-pfring.h"
 
 #include "source-erf-file.h"
+#include "source-erf-dag.h"
 
 #include "respond-reject.h"
 
@@ -321,6 +322,9 @@ void usage(const char *progname)
     printf("\t--group <group>              : run suricata as this group after init\n");
 #endif /* HAVE_LIBCAP_NG */
     printf("\t--erf-in <path>              : process an ERF file\n");
+#ifdef HAVE_DAG
+    printf("\t--dag <dag0,dag1,...>        : process ERF records from 0,1,...,n DAG input streams\n");
+#endif
     printf("\n");
     printf("\nTo run the engine with default configuration on "
             "interface eth0 with signature file \"signatures.rules\", run the "
@@ -351,6 +355,7 @@ int main(int argc, char **argv)
     uint32_t userid = 0;
     uint32_t groupid = 0;
     char *erf_file = NULL;
+    char *dag_input = NULL;
 
     char *log_dir;
     struct stat buf;
@@ -410,6 +415,7 @@ int main(int argc, char **argv)
         {"user", required_argument, 0, 0},
         {"group", required_argument, 0, 0},
         {"erf-in", required_argument, 0, 0},
+        {"dag", required_argument, 0, 0},
         {NULL, 0, NULL, 0}
     };
 
@@ -531,6 +537,16 @@ int main(int argc, char **argv)
                 run_mode = MODE_ERF_FILE;
                 erf_file = optarg;
             }
+                       else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
+#ifdef HAVE_DAG
+                               run_mode = MODE_DAG;
+                               dag_input = optarg;
+#else
+                               SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
+                                               " to receieve packets using --dag.");
+                               exit(EXIT_FAILURE);
+#endif /* HAVE_DAG */
+                       }
             else if(strcmp((long_opts[option_index]).name, "pcap-buffer-size") == 0) {
 #ifdef HAVE_PCAP_SET_BUFF
                 if (ConfSet("pcap.buffer-size", optarg, 0) != 1) {
@@ -775,6 +791,8 @@ int main(int argc, char **argv)
 #endif
     TmModuleReceiveErfFileRegister();
     TmModuleDecodeErfFileRegister();
+    TmModuleReceiveErfDagRegister();
+    TmModuleDecodeErfDagRegister();
     TmModuleDebugList();
 
     /** \todo we need an api for these */
@@ -1015,6 +1033,9 @@ int main(int argc, char **argv)
     else if (run_mode == MODE_ERF_FILE) {
         RunModeErfFileAuto(de_ctx, erf_file);
     }
+    else if (run_mode == MODE_DAG) {
+        RunModeErfDagAuto(de_ctx, dag_input);
+    }
     else {
         SCLogError(SC_ERR_UNKNOWN_RUN_MODE, "Unknown runtime mode. Aborting");
         exit(EXIT_FAILURE);
index ff9d835f3b442a02cbff03c8365665f986be471a..f6e7495da865c9b3a447b36222f9ec1946c682c9 100644 (file)
@@ -60,6 +60,7 @@ enum {
     MODE_IPFW,
     MODE_UNITTEST,
     MODE_ERF_FILE,
+    MODE_DAG,
 };
 
 /* preallocated packet structures here
index aecad59c9d46735c95cd80a310c3ee42d79f6b9b..cdb394369a8180de57c7f3b0098ec839ba17c13c 100644 (file)
@@ -81,6 +81,8 @@ enum {
 #endif
     TMM_RECEIVEERFFILE,
     TMM_DECODEERFFILE,
+    TMM_RECEIVEERFDAG,
+    TMM_DECODEERFDAG,
     TMM_SIZE,
 };
 
index cfe6568bc594fc45461d0183bdfbb56bbb7dbf8b..ffc168b75946fc7498f3cbe92665154a733e2ee0 100644 (file)
@@ -181,6 +181,12 @@ const char * SCErrorToString(SCError err)
         CASE_CODE (SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG);
         CASE_CODE (SC_WARN_FLOW_EMERGENCY);
                CASE_CODE (SC_ERR_SVC);
+        CASE_CODE (SC_ERR_ERF_DAG_OPEN_FAILED);
+        CASE_CODE (SC_ERR_ERF_DAG_STREAM_OPEN_FAILED);
+        CASE_CODE (SC_ERR_ERF_DAG_STREAM_START_FAILED);
+        CASE_CODE (SC_ERR_ERF_DAG_STREAM_SET_FAILED);
+        CASE_CODE (SC_ERR_ERF_DAG_STREAM_READ_FAILED);
+        CASE_CODE (SC_WARN_ERF_DAG_REC_LEN_CHANGED);
 
         default:
             return "UNKNOWN_ERROR";
index 0490c430a90ef4eaa6445d0c8d2d912a8c4ebd25..4a36ae2ea8bf99f867ac969e55cf9ea1a239b2bb 100644 (file)
@@ -189,6 +189,14 @@ typedef enum {
     SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG,
     SC_WARN_FLOW_EMERGENCY,
        SC_ERR_SVC,
+    SC_ERR_ERF_DAG_OPEN_FAILED,
+    SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
+    SC_ERR_ERF_DAG_STREAM_START_FAILED,
+    SC_ERR_ERF_DAG_STREAM_SET_FAILED,
+    SC_ERR_ERF_DAG_STREAM_READ_FAILED,
+    SC_WARN_ERF_DAG_REC_LEN_CHANGED,
+    SC_ERR_DAG_REQUIRED,
+    SC_ERR_DAG_NOSUPPORT,           /**< no ERF/DAG support compiled in */
     SC_ERR_FATAL,
 
 } SCError;