]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Add TILE-Gx mPIPE packet processing support. 465/head
authorKen Steele <ken@tilera.com>
Fri, 26 Jul 2013 17:22:19 +0000 (13:22 -0400)
committerKen Steele <ken@tilera.com>
Fri, 26 Jul 2013 17:22:19 +0000 (13:22 -0400)
The TILE-Gx processor includes a packet processing engine, called
mPIPE, that can deliver packets directly into user space memory. It
handles buffer allocation and load balancing (either static 5-tuple
hashing, or dynamic flow affinity hashing are used here). The new
packet source code is in source-mpipe.c and source-mpipe.h

A new Tile runmode is added that configures the Suricata pipelines in
worker mode, where each thread does the entire packet processing
pipeline.  It scales across all the Gx chips sizes of 9, 16, 36 or 72
cores. The new runmode is in runmode-tile.c and runmode-tile.h

The configure script detects the TILE-Gx architecture and defines
HAVE_MPIPE, which is then used to conditionally enable the code to
support mPIPE packet processing. Suricata runs on TILE-Gx even without
mPIPE support enabled.

The Suricata Packet structures are allocated by the mPIPE hardware by
allocating the Suricata Packet structure immediatley before the mPIPE
packet buffer and then pushing the mPIPE packet buffer pointer onto
the mPIPE buffer stack.  This way, mPIPE writes the packet data into
the buffer, returns the mPIPE packet buffer pointer, which is then
converted into a Suricata Packet pointer for processing inside
Suricata. When the Packet is freed, the buffer is returned to mPIPE's
buffer stack, by setting ReleasePacket to an mPIPE release specific
function.

The code checks for the largest Huge page available in Linux when
Suricata is started. TILE-Gx supports Huge pages sizes of 16MB, 64MB,
256MB, 1GB and 4GB. Suricata then divides one of those page into
packet buffers for mPIPE.

The code is not yet optimized for high performance. Performance
improvements will follow shortly.

The code was originally written by Tom Decanio and then further
modified by Tilera.

This code has been tested with Tilera's Multicore Developement
Environment (MDE) version 4.1.5. The TILEncore-Gx36 (PCIe card) and
TILEmpower-Gx (1U Rack mount).

src/Makefile.am
src/decode.h
src/runmode-tile.c [new file with mode: 0644]
src/runmode-tile.h [new file with mode: 0644]
src/runmodes.c
src/runmodes.h
src/source-mpipe.c [new file with mode: 0644]
src/source-mpipe.h [new file with mode: 0644]
src/suricata.c
src/tm-threads-common.h
suricata.yaml.in

index d879f4a560413fd045a6472a8bc2645c5f23350b..42d0d0d885bc21c18b6d675292052387752327c0 100644 (file)
@@ -221,11 +221,13 @@ runmode-pcap.c runmode-pcap.h \
 runmode-pcap-file.c runmode-pcap-file.h \
 runmode-pfring.c runmode-pfring.h \
 runmode-unix-socket.c runmode-unix-socket.h \
+runmode-tile.c runmode-tile.h \
 runmodes.c runmodes.h \
 source-af-packet.c source-af-packet.h \
 source-erf-dag.c source-erf-dag.h \
 source-erf-file.c source-erf-file.h \
 source-ipfw.c source-ipfw.h \
+source-mpipe.c source-mpipe.h \
 source-napatech.c source-napatech.h \
 source-nfq.c source-nfq.h \
 source-pcap.c source-pcap.h \
index 3a31d74250af2f1ea16ee73f13f86b1561f91381..9e4420c93fc96dbfa42b86fb53c83d95a01794ad 100644 (file)
@@ -59,6 +59,7 @@ enum PktSrcEnum {
 #include "source-ipfw.h"
 #include "source-pcap.h"
 #include "source-af-packet.h"
+#include "source-mpipe.h"
 
 #include "action-globals.h"
 
@@ -394,6 +395,10 @@ typedef struct Packet_
 #ifdef AF_PACKET
         AFPPacketVars afp_v;
 #endif
+#ifdef HAVE_MPIPE
+        /* tilegx mpipe stuff */
+        MpipePacketVars mpipe_v;
+#endif
 
         /** libpcap vars: shared by Pcap Live mode and Pcap File mode */
         PcapPacketVars pcap_v;
@@ -498,7 +503,12 @@ typedef struct Packet_
 #ifdef __SC_CUDA_SUPPORT__
     CudaPacketVars cuda_pkt_vars;
 #endif
-} Packet;
+}
+#ifdef HAVE_MPIPE
+    /* mPIPE requires packet buffers to be aligned to 128 byte boundaries. */
+    __attribute__((aligned(128)))
+#endif
+Packet;
 
 #define DEFAULT_PACKET_SIZE (1500 + ETHERNET_HEADER_LEN)
 /* storage: maximum ip packet size + link header */
diff --git a/src/runmode-tile.c b/src/runmode-tile.c
new file mode 100644 (file)
index 0000000..3e5e58f
--- /dev/null
@@ -0,0 +1,269 @@
+/* Copyright (C) 2011-2013 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** 
+ * \file
+ *
+ * \author Tom DeCanio <decanio.tom@gmail.com>
+ * \author Ken Steele, Tilera Corporation <suricata@tilera.com>
+ *
+ * Tilera TILE-Gx runmode support
+ */
+
+#include "suricata-common.h"
+#include "tm-threads.h"
+#include "conf.h"
+#include "runmodes.h"
+#include "runmode-tile.h"
+#include "log-httplog.h"
+#include "output.h"
+#include "source-mpipe.h"
+
+#include "alert-fastlog.h"
+#include "alert-prelude.h"
+#include "alert-unified2-alert.h"
+#include "alert-debuglog.h"
+
+#include "util-debug.h"
+#include "util-time.h"
+#include "util-cpu.h"
+#include "util-affinity.h"
+#include "util-device.h"
+
+#ifdef HAVE_MPIPE
+/* Number of configured parallel pipelines. */
+unsigned int tile_num_pipelines;
+#endif
+
+/*
+ * runmode support for tilegx
+ */
+
+static const char *mpipe_default_mode = "workers";
+
+const char *RunModeTileMpipeGetDefaultMode(void)
+{
+    return mpipe_default_mode;
+}
+
+void RunModeTileMpipeRegister(void)
+{
+#ifdef HAVE_MPIPE
+    RunModeRegisterNewRunMode(RUNMODE_TILERA_MPIPE, "workers",
+                              "Workers tilegx mpipe mode, each thread does all"
+                              " tasks from acquisition to logging",
+                              RunModeTileMpipeWorkers);
+    mpipe_default_mode = "workers";
+#endif
+}
+
+#ifdef HAVE_MPIPE
+
+void *ParseMpipeConfig(const char *iface)
+{
+    ConfNode *if_root;
+    ConfNode *mpipe_node;
+    MpipeIfaceConfig *aconf = SCMalloc(sizeof(*aconf));
+    char *copymodestr;
+    char *out_iface = NULL;
+
+    if (aconf == NULL) {
+        return NULL;
+    }
+
+    if (iface == NULL) {
+        SCFree(aconf);
+        return NULL;
+    }
+
+    strlcpy(aconf->iface, iface, sizeof(aconf->iface));
+
+    /* Find initial node */
+    mpipe_node = ConfGetNode("mpipe.inputs");
+    if (mpipe_node == NULL) {
+        SCLogInfo("Unable to find mpipe config using default value");
+        return aconf;
+    }
+
+    if_root = ConfNodeLookupKeyValue(mpipe_node, "interface", iface);
+    if (if_root == NULL) {
+        SCLogInfo("Unable to find mpipe config for "
+                  "interface %s, using default value",
+                  iface);
+        return aconf;
+    }
+
+    if (ConfGetChildValue(if_root, "copy-iface", &out_iface) == 1) {
+        if (strlen(out_iface) > 0) {
+            aconf->out_iface = out_iface;
+        }
+    }
+    aconf->copy_mode = MPIPE_COPY_MODE_NONE;
+    if (ConfGetChildValue(if_root, "copy-mode", &copymodestr) == 1) {
+        if (aconf->out_iface == NULL) {
+            SCLogInfo("Copy mode activated but no destination"
+                      " iface. Disabling feature");
+        } else if (strlen(copymodestr) <= 0) {
+            aconf->out_iface = NULL;
+        } else if (strcmp(copymodestr, "ips") == 0) {
+            SCLogInfo("MPIPE IPS mode activated %s->%s",
+                      iface,
+                      aconf->out_iface);
+            aconf->copy_mode = MPIPE_COPY_MODE_IPS;
+        } else if (strcmp(copymodestr, "tap") == 0) {
+            SCLogInfo("MPIPE TAP mode activated %s->%s",
+                      iface,
+                      aconf->out_iface);
+            aconf->copy_mode = MPIPE_COPY_MODE_TAP;
+        } else {
+            SCLogInfo("Invalid mode (no in tap, ips)");
+        }
+    }
+    return aconf;
+}
+
+/**
+ * \brief RunModeTileMpipeWorkers set up to process all modules in each thread.
+ *
+ * \param de_ctx pointer to the Detection Engine
+ * \param iface pointer to the name of the interface from which we will
+ *              fetch the packets
+ * \retval 0 if all goes well. (If any problem is detected the engine will
+ *           exit())
+ */
+int RunModeTileMpipeWorkers(DetectEngineCtx *de_ctx)
+{
+    SCEnter();
+    char tname[32];
+    char *thread_name;
+    TmModule *tm_module;
+    int pipe;
+
+    RunModeInitialize();
+
+    /* Available cpus */
+    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
+
+    TimeModeSetLive();
+
+    unsigned int pipe_max = 1;
+    if (ncpus > 1)
+        pipe_max = ncpus - 1;
+
+    intmax_t threads;
+
+    if (ConfGetInt("mpipe.threads", &threads) == 1) {
+        tile_num_pipelines = threads;
+    } else {
+        tile_num_pipelines = pipe_max;
+    }
+    SCLogInfo("%d Tilera worker threads", tile_num_pipelines);
+
+    ReceiveMpipeInit();
+
+    char *mpipe_dev = NULL;
+    int nlive = LiveGetDeviceCount();
+    if (nlive > 0) {
+        SCLogInfo("Using %d live device(s).", nlive);
+        /*mpipe_dev = LiveGetDevice(0);*/
+    } else {
+        /*
+         * Attempt to get interface from config file
+         * overrides -i from command line.
+         */
+        if (ConfGet("mpipe.interface", &mpipe_dev) == 0) {
+            if (ConfGet("mpipe.single_mpipe_dev", &mpipe_dev) == 0) {
+                SCLogError(SC_ERR_RUNMODE, "Failed retrieving "
+                           "mpipe.single_mpipe_dev from Conf");
+                exit(EXIT_FAILURE);
+            }
+        }
+    }
+
+    for (pipe = 0; pipe < tile_num_pipelines; pipe++) {
+        char *mpipe_devc;
+
+        if (nlive > 0) {
+            mpipe_devc = SCStrdup("multi");
+        } else {
+            mpipe_devc = SCStrdup(mpipe_dev);
+        }
+
+        snprintf(tname, sizeof(tname), "Worker%d", pipe+1);
+        thread_name = SCStrdup(tname);
+
+        /* create the threads */
+        ThreadVars *tv_worker =
+             TmThreadCreatePacketHandler(thread_name,
+                                         "packetpool", "packetpool",
+                                         "packetpool", "packetpool", 
+                                         "pktacqloop");
+        if (tv_worker == NULL) {
+            printf("ERROR: TmThreadsCreate failed\n");
+            exit(EXIT_FAILURE);
+        }
+        tm_module = TmModuleGetByName("ReceiveMpipe");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName failed for ReceiveMpipe\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_worker, tm_module, (void *)mpipe_devc);
+
+        /* set affinity for worker */
+        int pipe_cpu = pipe + 1;
+        TmThreadSetCPUAffinity(tv_worker, pipe_cpu);
+
+        tm_module = TmModuleGetByName("DecodeMpipe");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName DecodeMpipe failed\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
+
+        tm_module = TmModuleGetByName("StreamTcp");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
+
+        tm_module = TmModuleGetByName("Detect");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName Detect failed\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_worker, tm_module, (void *)de_ctx);
+
+        tm_module = TmModuleGetByName("RespondReject");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName for RespondReject failed\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_worker, tm_module, NULL);
+
+        SetupOutputs(tv_worker);
+
+        if (TmThreadSpawn(tv_worker) != TM_ECODE_OK) {
+            printf("ERROR: TmThreadSpawn failed\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    return 0;
+}
+
+#endif
diff --git a/src/runmode-tile.h b/src/runmode-tile.h
new file mode 100644 (file)
index 0000000..f75c054
--- /dev/null
@@ -0,0 +1,41 @@
+/* Copyright (C) 2011-2013 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** 
+ * \file
+ *
+ * \author Tom DeCanio <decanio.tom@gmail.com>
+ * \author Ken Steele, Tilera Corporation <suricata@tilera.com>
+ *
+ * Tilera TILE-Gx runmode support
+ */
+
+#ifndef __RUNMODE_TILE_H__
+#define __RUNMODE_TILE_H__
+
+#include "suricata-common.h"
+
+const char *RunModeTileMpipeGetDefaultMode(void);
+void RunModeTileMpipeRegister(void);
+
+extern unsigned int tile_num_pipelines;
+
+int RunModeTileMpipeWorkers(DetectEngineCtx *);
+
+void *ParseMpipeConfig(const char *iface);
+
+#endif /* __RUNMODE_TILE_H__ */
index 5109051213d22a523a18451e2b9858e26ab24b14..3e1a648a063f06c45b7fb025be164738d5d1b8a9 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2013 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -116,6 +116,8 @@ static const char *RunModeTranslateModeToName(int runmode)
             return "NAPATECH";
         case RUNMODE_UNITTEST:
             return "UNITTEST";
+        case RUNMODE_TILERA_MPIPE:
+            return "MPIPE";
         case RUNMODE_AFP_DEV:
             return "AF_PACKET_DEV";
         case RUNMODE_UNIX_SOCKET:
@@ -190,6 +192,7 @@ void RunModeRegisterRunModes(void)
     RunModeErfDagRegister();
     RunModeNapatechRegister();
     RunModeIdsAFPRegister();
+    RunModeTileMpipeRegister();
     RunModeUnixSocketRegister();
 #ifdef UNITTESTS
     UtRunModeRegister();
@@ -275,6 +278,9 @@ void RunModeDispatch(int runmode, const char *custom_mode, DetectEngineCtx *de_c
             case RUNMODE_DAG:
                 custom_mode = RunModeErfDagGetDefaultMode();
                 break;
+            case RUNMODE_TILERA_MPIPE:
+                custom_mode = RunModeTileMpipeGetDefaultMode();
+                break;
             case RUNMODE_NAPATECH:
                 custom_mode = RunModeNapatechGetDefaultMode();
                 break;
index 9687ea008393f0d85c5be988bfd587b4c13a21aa..3b0ee46cb327ebd66a28fd7e54776c3c61f41bab 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2013 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -34,6 +34,7 @@ enum {
     RUNMODE_ERF_FILE,
     RUNMODE_DAG,
     RUNMODE_AFP_DEV,
+    RUNMODE_TILERA_MPIPE,
     RUNMODE_UNITTEST,
     RUNMODE_NAPATECH,
     RUNMODE_UNIX_SOCKET,
@@ -56,6 +57,7 @@ void RunModeShutDown(void);
 #include "runmode-pcap.h"
 #include "runmode-pcap-file.h"
 #include "runmode-pfring.h"
+#include "runmode-tile.h"
 #include "runmode-nfq.h"
 #include "runmode-ipfw.h"
 #include "runmode-erf-file.h"
diff --git a/src/source-mpipe.c b/src/source-mpipe.c
new file mode 100644 (file)
index 0000000..b98164e
--- /dev/null
@@ -0,0 +1,1017 @@
+/* Copyright (C) 2011-2013 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Tom DeCanio <decanio.tom@gmail.com>
+ * \author Ken Steele, Tilera Corporation <suricata@tilera.com>
+ *
+ * Tilera TILE-Gx mpipe ingress packet support.
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+#include "host.h"
+#include "decode.h"
+#include "packet-queue.h"
+#include "threads.h"
+#include "threadvars.h"
+#include "tm-queuehandlers.h"
+#include "tm-threads.h"
+#include "tm-threads-common.h"
+#include "runmode-tile.h"
+#include "source-mpipe.h"
+#include "conf.h"
+#include "util-debug.h"
+#include "util-error.h"
+#include "util-privs.h"
+#include "util-device.h"
+#include "util-mem.h"
+#include "util-profiling.h"
+#include "tmqh-packetpool.h"
+#include "pkt-var.h"
+
+#ifdef HAVE_MPIPE
+
+#include <mde-version.h>
+#include <tmc/alloc.h>
+#include <arch/sim.h>
+#include <arch/atomic.h>
+#include <arch/cycle.h>
+#include <gxio/mpipe.h>
+#include <gxio/trio.h>
+#include <tmc/cpus.h>
+#include <tmc/spin.h>
+#include <tmc/sync.h>
+#include <tmc/task.h>
+#include <tmc/perf.h>
+#include <arch/sim.h>
+
+/* Align "p" mod "align", assuming "p" is a "void*". */
+#define ALIGN(p, align) do { (p) += -(long)(p) & ((align) - 1); } while(0)
+
+#define VERIFY(VAL, WHAT)                                       \
+    do {                                                        \
+        int __val = (VAL);                                      \
+        if (__val < 0) {                                        \
+            SCLogError(SC_ERR_INVALID_ARGUMENT,(WHAT));         \
+            SCReturnInt(TM_ECODE_FAILED);                       \
+        }                                                       \
+    } while (0)
+
+#define min(a,b) (((a) < (b)) ? (a) : (b))
+
+extern uint8_t suricata_ctl_flags;
+
+/** storage for mpipe device names */
+typedef struct MpipeDevice_ {
+    char *dev;  /**< the device (e.g. "xgbe1") */
+    TAILQ_ENTRY(MpipeDevice_) next;
+} MpipeDevice;
+
+
+/** private device list */
+static TAILQ_HEAD(, MpipeDevice_) mpipe_devices =
+    TAILQ_HEAD_INITIALIZER(mpipe_devices);
+
+static int first_stack;
+static uint32_t headroom = 2;
+
+/**
+ * \brief Structure to hold thread specific variables.
+ */
+typedef struct MpipeThreadVars_
+{
+    ChecksumValidationMode checksum_mode;
+
+    /* counters */
+    uint32_t pkts;
+    uint64_t bytes;
+    uint32_t errs;
+
+    ThreadVars *tv;
+    TmSlot *slot;
+
+    Packet *in_p;
+
+    /** stats/counters */
+    uint16_t max_mpipe_depth;
+    uint16_t mpipe_drop;
+    uint16_t counter_no_buffers_0;
+    uint16_t counter_no_buffers_1;
+    uint16_t counter_no_buffers_2;
+    uint16_t counter_no_buffers_3;
+    uint16_t counter_no_buffers_4;
+    uint16_t counter_no_buffers_5;
+    uint16_t counter_no_buffers_6;
+    uint16_t counter_no_buffers_7;
+
+} MpipeThreadVars;
+
+TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot);
+TmEcode ReceiveMpipeThreadInit(ThreadVars *, void *, void **);
+void ReceiveMpipeThreadExitStats(ThreadVars *, void *);
+
+TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **);
+TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+
+#define MAX_CHANNELS 32   /* can probably find this in the MDE */
+
+/*
+ * mpipe configuration.
+ */
+
+/* The mpipe context (shared by all workers) */
+static gxio_mpipe_context_t context_body;
+static gxio_mpipe_context_t* context = &context_body;
+
+/* The ingress queues (one per worker) */
+static gxio_mpipe_iqueue_t** iqueues;
+
+/* The egress queues (one per port) */
+static gxio_mpipe_equeue_t equeue[MAX_CHANNELS];
+
+/* the number of entries in an equeue ring */
+static const int equeue_entries = 8192;
+
+/* Array of mpipe links */
+static gxio_mpipe_link_t mpipe_link[MAX_CHANNELS];
+
+/* Per interface configuration data */
+static MpipeIfaceConfig *mpipe_conf[MAX_CHANNELS];
+
+/* Per interface TAP/IPS configuration */
+
+/* egress equeue associated with each ingress channel */
+static MpipePeerVars channel_to_equeue[MAX_CHANNELS];
+
+/**
+ * \brief Registration Function for ReceiveMpipe.
+ * \todo Unit tests are needed for this module.
+ */
+void TmModuleReceiveMpipeRegister (void)
+{
+    tmm_modules[TMM_RECEIVEMPIPE].name = "ReceiveMpipe";
+    tmm_modules[TMM_RECEIVEMPIPE].ThreadInit = ReceiveMpipeThreadInit;
+    tmm_modules[TMM_RECEIVEMPIPE].Func = NULL;
+    tmm_modules[TMM_RECEIVEMPIPE].PktAcqLoop = ReceiveMpipeLoop;
+    tmm_modules[TMM_RECEIVEMPIPE].ThreadExitPrintStats = ReceiveMpipeThreadExitStats;
+    tmm_modules[TMM_RECEIVEMPIPE].ThreadDeinit = NULL;
+    tmm_modules[TMM_RECEIVEMPIPE].RegisterTests = NULL;
+    tmm_modules[TMM_RECEIVEMPIPE].cap_flags = SC_CAP_NET_RAW;
+    tmm_modules[TMM_RECEIVEMPIPE].flags = TM_FLAG_RECEIVE_TM;
+}
+
+/**
+ * \brief Registraction Function for DecodeNetio.
+ * \todo Unit tests are needed for this module.
+ */
+void TmModuleDecodeMpipeRegister (void)
+{
+    tmm_modules[TMM_DECODEMPIPE].name = "DecodeMpipe";
+    tmm_modules[TMM_DECODEMPIPE].ThreadInit = DecodeMpipeThreadInit;
+    tmm_modules[TMM_DECODEMPIPE].Func = DecodeMpipe;
+    tmm_modules[TMM_DECODEMPIPE].ThreadExitPrintStats = NULL;
+    tmm_modules[TMM_DECODEMPIPE].ThreadDeinit = NULL;
+    tmm_modules[TMM_DECODEMPIPE].RegisterTests = NULL;
+    tmm_modules[TMM_DECODEMPIPE].cap_flags = 0;
+    tmm_modules[TMM_DECODEMPIPE].flags = TM_FLAG_DECODE_TM;
+}
+
+/* Release Packet without sending. */
+void MpipeReleasePacket(Packet *p)
+{
+    gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+    int bucket = p->mpipe_v.idesc.bucket_id;
+    gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
+
+    gxio_mpipe_push_buffer(context,
+                           p->mpipe_v.idesc.stack_idx,
+                           (void*)(intptr_t)p->mpipe_v.idesc.va);
+}
+
+/* Unconditionally send packet, then release packet buffer. */
+void MpipeReleasePacketCopyTap(Packet *p)
+{
+    gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+    int bucket = p->mpipe_v.idesc.bucket_id;
+    gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
+    gxio_mpipe_edesc_t edesc;
+    edesc.words[0] = 0;
+    edesc.words[1] = 0;
+    edesc.bound = 1;
+    edesc.xfer_size = p->mpipe_v.idesc.l2_size;
+    edesc.va = p->mpipe_v.idesc.va;
+    edesc.stack_idx = p->mpipe_v.idesc.stack_idx;
+    edesc.hwb = 1; /* mPIPE will return packet buffer to proper stack. */
+    edesc.size = p->mpipe_v.idesc.size;
+    int channel = p->mpipe_v.idesc.channel;
+    /* Tell mPIPE to egress the packet. */
+    gxio_mpipe_equeue_put(channel_to_equeue[channel].peer_equeue, edesc);
+}
+
+/* Release Packet and send copy if action is not DROP. */
+void MpipeReleasePacketCopyIPS(Packet *p)
+{
+    if (unlikely(PACKET_TEST_ACTION(p, ACTION_DROP))) {
+        /* Return packet buffer without sending the packet. */
+        MpipeReleasePacket(p);
+    } else {
+        /* Send packet */
+        MpipeReleasePacketCopyTap(p);
+    }
+}
+
+/**
+ * \brief Mpipe Packet Process function.
+ *
+ * This function fills in our packet structure from mpipe.
+ * From here the packets are picked up by the  DecodeMpipe thread.
+ *
+ * \param user pointer to MpipeThreadVars passed from pcap_dispatch
+ * \param h pointer to gxio packet header
+ * \param pkt pointer to current packet
+ */
+static inline 
+Packet *MpipeProcessPacket(MpipeThreadVars *ptv, gxio_mpipe_idesc_t *idesc)
+{
+    int caplen = idesc->l2_size;
+    u_char *pkt = gxio_mpipe_idesc_get_va(idesc);
+    Packet *p = (Packet *)(pkt - sizeof(Packet) - headroom/*2*/);
+
+    PACKET_RECYCLE(p);
+    PKT_SET_SRC(p, PKT_SRC_WIRE);
+
+    ptv->bytes += caplen;
+    ptv->pkts++;
+
+    gettimeofday(&p->ts, NULL);
+
+    p->datalink = LINKTYPE_ETHERNET;
+    /* No need to check return value, since the only error is pkt == NULL which can't happen here. */
+    PacketSetData(p, pkt, caplen);
+
+    /* copy only the fields we use later */
+    p->mpipe_v.idesc.bucket_id = idesc->bucket_id;
+    p->mpipe_v.idesc.nr = idesc->nr;
+    p->mpipe_v.idesc.cs = idesc->cs;
+    p->mpipe_v.idesc.va = idesc->va;
+    p->mpipe_v.idesc.stack_idx = idesc->stack_idx;
+    MpipePeerVars *equeue_info = &channel_to_equeue[idesc->channel];
+    if (equeue_info->copy_mode != MPIPE_COPY_MODE_NONE) {
+        p->mpipe_v.idesc.size = idesc->size;
+        p->mpipe_v.idesc.l2_size = idesc->l2_size;
+        p->mpipe_v.idesc.channel = idesc->channel;
+        p->ReleasePacket = equeue_info->ReleasePacket;
+    } else {
+        p->ReleasePacket = MpipeReleasePacket;
+    }
+
+    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE)
+        p->flags |= PKT_IGNORE_CHECKSUM;
+
+    return p;
+}
+
+static uint16_t XlateStack(MpipeThreadVars *ptv, int stack_idx)
+{
+    switch(stack_idx - first_stack) {
+    case 0:
+        return ptv->counter_no_buffers_0;
+    case 1:
+        return ptv->counter_no_buffers_1;
+    case 2:
+        return ptv->counter_no_buffers_2;
+    case 3:
+        return ptv->counter_no_buffers_3;
+    case 4:
+        return ptv->counter_no_buffers_4;
+    case 5:
+        return ptv->counter_no_buffers_5;
+    case 6:
+        return ptv->counter_no_buffers_6;
+    case 7:
+        return ptv->counter_no_buffers_7;
+    default:
+        return ptv->counter_no_buffers_7;
+    }
+}
+
+/**
+ * \brief Receives packets from an interface via gxio mpipe.
+ */
+TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot)
+{
+    SCEnter();
+
+    MpipeThreadVars *ptv = (MpipeThreadVars *)data;
+    TmSlot *s = (TmSlot *)slot;
+    ptv->slot = s->slot_next;
+    Packet *p = NULL;
+    int cpu = tmc_cpus_get_my_cpu();
+    int rank = cpu - 1;
+    int max_queued = 0;
+    char *ctype;
+
+    ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE;
+    if (ConfGet("mpipe.checksum-checks", &ctype) == 1) {
+        if (strcmp(ctype, "yes") == 0) {
+            ptv->checksum_mode = CHECKSUM_VALIDATION_ENABLE;
+        } else if (strcmp(ctype, "no") == 0)  {
+            ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE;
+        } else {
+            SCLogError(SC_ERR_INVALID_ARGUMENT, 
+                       "Invalid value for checksum-check for mpipe");
+        }
+    }
+
+    gxio_mpipe_iqueue_t* iqueue = iqueues[rank];
+
+    for (;;) {
+        if (suricata_ctl_flags != 0) {
+            break;
+        }
+
+        gxio_mpipe_idesc_t *idesc;
+        int n = gxio_mpipe_iqueue_try_peek(iqueue, &idesc);
+        if (likely(n > 0)) {
+            int i;
+            int m = min(n, 16);
+
+            /* Prefetch the idescs (64 bytes each). */
+            for (i = 0; i < m; i++) {
+                __insn_prefetch(&idesc[i]);
+            }
+            if (unlikely(n > max_queued)) {
+                SCPerfCounterSetUI64(ptv->max_mpipe_depth,
+                                     tv->sc_perf_pca,
+                                     (uint64_t)n);
+                max_queued = n;
+            }
+            for (i = 0; i < m; i++, idesc++) {
+                if (likely(!gxio_mpipe_idesc_has_error(idesc))) {
+                    p = MpipeProcessPacket(ptv, idesc);
+                    p->mpipe_v.rank = rank;
+                    if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
+                        TmqhOutputPacketpool(ptv->tv, p);
+                        SCReturnInt(TM_ECODE_FAILED);
+                    }
+                } else {
+                    if (idesc->be) {
+                        /* Buffer Error - No buffer available, so mPipe
+                         * dropped the packet. */
+                        SCPerfCounterIncr(XlateStack(ptv, idesc->stack_idx),
+                                          tv->sc_perf_pca);
+                    } else {
+                        /* Bad packet. CRC error */
+                        SCPerfCounterIncr(ptv->mpipe_drop, tv->sc_perf_pca);
+                        gxio_mpipe_iqueue_drop(iqueue, idesc);
+                    }
+                    gxio_mpipe_iqueue_release(iqueue, idesc);
+                }
+            }
+            /* Move forward M packets in ingress ring. */
+            gxio_mpipe_iqueue_advance(iqueue, m);
+        }
+        SCPerfSyncCountersIfSignalled(tv, 0);
+    }
+    SCReturnInt(TM_ECODE_OK);
+}
+
+static void MpipeRegisterPerfCounters(MpipeThreadVars *ptv, ThreadVars *tv)
+{
+    /* register counters */
+    ptv->max_mpipe_depth = SCPerfTVRegisterCounter("mpipe.max_mpipe_depth",
+                                                    tv,
+                                                    SC_PERF_TYPE_UINT64,
+                                                    "NULL");
+    ptv->mpipe_drop = SCPerfTVRegisterCounter("mpipe.drop",
+                                              tv,
+                                              SC_PERF_TYPE_UINT64,
+                                              "NULL");
+    ptv->counter_no_buffers_0 = SCPerfTVRegisterCounter("mpipe.no_buf0", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_1 = SCPerfTVRegisterCounter("mpipe.no_buf1", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_2 = SCPerfTVRegisterCounter("mpipe.no_buf2", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_3 = SCPerfTVRegisterCounter("mpipe.no_buf3", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_4 = SCPerfTVRegisterCounter("mpipe.no_buf4", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_5 = SCPerfTVRegisterCounter("mpipe.no_buf5", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_6 = SCPerfTVRegisterCounter("mpipe.no_buf6", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    ptv->counter_no_buffers_7 = SCPerfTVRegisterCounter("mpipe.no_buf7", tv,
+                                                        SC_PERF_TYPE_UINT64,
+                                                        "NULL");
+    tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
+    SCPerfAddToClubbedTMTable(tv->name, &tv->sc_perf_pctx);
+}
+
+static const gxio_mpipe_buffer_size_enum_t gxio_buffer_sizes[] = {
+    GXIO_MPIPE_BUFFER_SIZE_128,
+    GXIO_MPIPE_BUFFER_SIZE_256,
+    GXIO_MPIPE_BUFFER_SIZE_512,
+    GXIO_MPIPE_BUFFER_SIZE_1024,
+    GXIO_MPIPE_BUFFER_SIZE_1664,
+    GXIO_MPIPE_BUFFER_SIZE_4096,
+    GXIO_MPIPE_BUFFER_SIZE_10368,
+    GXIO_MPIPE_BUFFER_SIZE_16384
+};
+
+static const int buffer_sizes[] = {
+    128,
+    256,
+    512,
+    1024,
+    1664,
+    4096,
+    10368,
+    16384
+};
+
+static int NormalizeBufferWeights(float buffer_weights[], int num_weights)
+{
+    int stack_count = 0;
+    /* Count required buffer stacks and normalize weights to sum to 1.0. */
+    float total_weight = 0;
+    for (int i = 0; i < num_weights; i++) {
+        if (buffer_weights[i] != 0) {
+            ++stack_count;
+            total_weight += buffer_weights[i];
+        }
+    }
+    /* Convert each weight to a value between 0 and 1. inclusive. */
+    for (int i = 0; i < num_weights; i++) {
+        if (buffer_weights[i] != 0) {
+            buffer_weights[i] /= total_weight;
+        }
+    }
+
+    SCLogInfo("DEBUG: %u non-zero sized stacks", stack_count);
+    return stack_count;
+}
+
+static TmEcode ReceiveMpipeAllocatePacketBuffers(void)
+{
+    SCEnter();
+    int num_buffers;
+    int result;
+    int total_buffers = 0;
+
+    /* Relative weighting for the number of buffers of each size.
+     */
+    float buffer_weight[] = {
+        0 , /* 128 */
+        4 , /* 256 */
+        0 , /* 512 */
+        0 , /* 1024 */
+        4 , /* 1664 */
+        0 , /* 4096 */
+        0 , /* 10386 */
+        0   /* 16384 */
+    };
+
+    int num_weights = sizeof(buffer_weight)/sizeof(buffer_weight[0]);
+    if (ConfGetNode("mpipe.stack") != NULL) {
+        float weight;
+        for (int i = 0; i < num_weights; i++)
+            buffer_weight[i] = 0;
+        if (ConfGetFloat("mpipe.stack.size128", &weight))
+            buffer_weight[0] = weight;
+        if (ConfGetFloat("mpipe.stack.size256", &weight))
+            buffer_weight[1] = weight;
+        if (ConfGetFloat("mpipe.stack.size512", &weight))
+            buffer_weight[2] = weight;
+        if (ConfGetFloat("mpipe.stack.size1024", &weight))
+            buffer_weight[3] = weight;
+        if (ConfGetFloat("mpipe.stack.size1664", &weight))
+            buffer_weight[4] = weight;
+        if (ConfGetFloat("mpipe.stack.size4096", &weight))
+            buffer_weight[5] = weight;
+        if (ConfGetFloat("mpipe.stack.size10386", &weight))
+            buffer_weight[6] = weight;
+        if (ConfGetFloat("mpipe.stack.size16384", &weight))
+            buffer_weight[7] = weight;
+    }
+
+    int stack_count = NormalizeBufferWeights(buffer_weight, num_weights);
+
+    /* Allocate one of the largest pages to hold our buffer stack,
+     * notif ring, and packets.  First get a bit map of the
+     * available page sizes. */
+    unsigned long available_pagesizes = tmc_alloc_get_pagesizes();
+
+    void *packet_page = NULL;
+    size_t tile_vhuge_size;
+
+    /* Try the largest available page size first to see if any
+     * pages of that size can be allocated. */
+    for (int i = sizeof(available_pagesizes) * 8 - 1; i;  i--) {
+        unsigned long size = 1UL<<i;
+        if (available_pagesizes & size) {
+            tile_vhuge_size = (size_t)size;
+
+            tmc_alloc_t alloc = TMC_ALLOC_INIT;
+            tmc_alloc_set_huge(&alloc);
+            tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_HASH);
+            if (tmc_alloc_set_pagesize_exact(&alloc, tile_vhuge_size) == NULL)
+                continue; // Couldn't get the page size requested
+            packet_page = tmc_alloc_map(&alloc, tile_vhuge_size);
+            if (packet_page)
+                break;
+        }
+    }
+    assert(packet_page);
+    void* packet_mem = packet_page;
+    SCLogInfo("DEBUG: tile_vhuge_size %lu", tile_vhuge_size);
+    /* Allocate one Huge page just to store buffer stacks, since they are 
+     *  only ever accessed by mPipe.
+     */
+    size_t stack_page_size = tmc_alloc_get_huge_pagesize();
+    tmc_alloc_t alloc = TMC_ALLOC_INIT;
+    tmc_alloc_set_huge(&alloc);
+    void *buffer_stack_page = tmc_alloc_map(&alloc, stack_page_size);
+    void *buffer_stack_mem = buffer_stack_page;
+    void *buffer_stack_mem_end = buffer_stack_mem + stack_page_size;
+    assert(buffer_stack_mem);
+
+    /* Allocate buffer stacks. */
+    result = gxio_mpipe_alloc_buffer_stacks(context, stack_count, 0, 0);
+    VERIFY(result, "gxio_mpipe_alloc_buffer_stacks()");
+    int stack = result;
+    first_stack = stack;
+    
+    /* Divide up the Very Huge page into packet buffers. */
+    int i = 0;
+    for (int ss = 0; ss < stack_count; i++, ss++) {
+        /* Skip empty buffer stacks. */
+        for (;buffer_weight[i] == 0; i++) ;
+  
+        int stackidx = first_stack + ss;
+        /* Bytes from the Huge page used for this buffer stack. */
+        size_t packet_buffer_slice = tile_vhuge_size * buffer_weight[i];
+        int buffer_size = buffer_sizes[i];
+        num_buffers = packet_buffer_slice / (buffer_size + sizeof(Packet));
+
+        /* Initialize the buffer stack. Must be aligned mod 64K. */
+        size_t stack_bytes = gxio_mpipe_calc_buffer_stack_bytes(num_buffers);
+        gxio_mpipe_buffer_size_enum_t buf_size = gxio_buffer_sizes[i];
+        result = gxio_mpipe_init_buffer_stack(context, stackidx, buf_size,
+                                              buffer_stack_mem, stack_bytes, 0);
+        VERIFY(result, "gxio_mpipe_init_buffer_stack()");
+        buffer_stack_mem += stack_bytes;
+              
+        /* Buffer stack must be aligned to 64KB page boundary. */
+        ALIGN(buffer_stack_mem, 0x10000);
+        assert(buffer_stack_mem < buffer_stack_mem_end);
+        
+        /* Register the entire huge page of memory which contains all
+         * the buffers.
+         */
+        result = gxio_mpipe_register_page(context, stackidx, packet_page,
+                                          tile_vhuge_size, 0);
+        VERIFY(result, "gxio_mpipe_register_page()");
+        
+        /* And register the memory holding the buffer stack. */
+        result = gxio_mpipe_register_page(context, stackidx, 
+                                          buffer_stack_page,
+                                          stack_page_size, 0);
+        VERIFY(result, "gxio_mpipe_register_page()");
+        
+        total_buffers += num_buffers;
+        
+        SCLogInfo("Adding %d %d byte packet buffers",
+                  num_buffers, buffer_size);
+        
+        /* Push some buffers onto the stack. */
+        for (int j = 0; j < num_buffers; j++) {
+            Packet *p = (Packet *)packet_mem;
+            memset(p, 0, sizeof(Packet));
+            PACKET_INITIALIZE(p);
+          
+            gxio_mpipe_push_buffer(context, stackidx, 
+                                   packet_mem + sizeof(Packet));
+            packet_mem += (sizeof(Packet) + buffer_size);
+        }
+        
+        /* Paranoia. */
+        assert(packet_mem <= packet_page + tile_vhuge_size);
+    }
+    SCLogInfo("%d total packet buffers", total_buffers);
+    SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode ReceiveMpipeCreateBuckets(int ring, int num_workers, 
+                                         int *first_bucket, int *num_buckets)
+{
+    SCEnter();
+    int result;
+
+    /* Allocate a NotifGroup. */
+    int group = gxio_mpipe_alloc_notif_groups(context, 1, 0, 0);
+    VERIFY(group, "gxio_mpipe_alloc_notif_groups()");
+
+    intmax_t value = 0;
+    if (ConfGetInt("mpipe.buckets", &value) == 1) {
+        /* range check */
+        if ((value >= 1) && (value <= 4096)) {
+            *num_buckets = (int) value;
+        } else {
+            SCLogError(SC_ERR_INVALID_ARGUMENT, "Illegal mpipe.buckets value.");
+        }
+    }
+
+    /* Allocate buckets. */
+    *first_bucket = gxio_mpipe_alloc_buckets(context, *num_buckets, 0, 0);
+    if (*first_bucket == GXIO_MPIPE_ERR_NO_BUCKET) {
+        SCLogError(SC_ERR_INVALID_ARGUMENT,
+                   "Could not allocate mpipe buckets. "
+                   "Try a smaller mpipe.buckets value in suricata.yaml");
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    /* Init group and buckets, preserving packet order among flows. */
+    gxio_mpipe_bucket_mode_t mode = GXIO_MPIPE_BUCKET_STATIC_FLOW_AFFINITY;
+    char *balance;
+    if (ConfGet("mpipe.load-balance", &balance) == 1) {
+        if (balance) {
+            if (strcmp(balance, "static") == 0) {
+                mode = GXIO_MPIPE_BUCKET_STATIC_FLOW_AFFINITY;
+                SCLogInfo("Using \"static\" flow affinity.");
+            } else if (strcmp(balance, "dynamic") == 0) {
+                mode = GXIO_MPIPE_BUCKET_DYNAMIC_FLOW_AFFINITY;
+                SCLogInfo("Using \"dynamic\" flow affinity.");
+            } else {
+                SCLogWarning(SC_ERR_INVALID_ARGUMENT, 
+                             "Illegal load balancing mode %s using \"static\"",
+                             balance);
+            }
+        }
+    }
+    result = gxio_mpipe_init_notif_group_and_buckets(context, group,
+                                                     ring, num_workers,
+                                                     *first_bucket, 
+                                                     *num_buckets, 
+                                                     mode);
+    VERIFY(result, "gxio_mpipe_init_notif_group_and_buckets()");
+    
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/* \brief Register mPIPE classifier rules to start receiving packets.
+ *
+ * \param Index of the first classifier bucket
+ * \param Number of classifier buckets.
+ *
+ * \return result code where <0 is an error.
+ */
+static int ReceiveMpipeRegisterRules(int bucket, int num_buckets)
+{
+    /* Register for packets. */
+    gxio_mpipe_rules_t rules;
+    gxio_mpipe_rules_init(&rules, context);
+    gxio_mpipe_rules_begin(&rules, bucket, num_buckets, NULL);
+    return gxio_mpipe_rules_commit(&rules);
+}
+
+/* \brief Initialize on MPIPE egress port
+ *
+ * Initialize one mPIPE egress port for use in IPS mode.
+ * The port must be one of the input ports.
+ *
+ * \param name of interface to open
+ * \param Array of port configuations
+ *
+ * \return Output port channel number, or -1 on error
+ */
+static int MpipeReceiveOpenEgress(char *out_iface, int iface_idx,
+                                  int copy_mode,
+                                  MpipeIfaceConfig *mpipe_conf[])
+{
+    int channel;
+    int nlive = LiveGetDeviceCount();
+    int result;
+
+    /* Initialize an equeue */
+    result = gxio_mpipe_alloc_edma_rings(context, 1, 0, 0);
+    if (result < 0) {
+        SCLogError(SC_ERR_FATAL, "Failed to allocate mPIPE egress ring");
+        return result;
+    }
+    uint32_t ering = result;
+    size_t edescs_size = equeue_entries * sizeof(gxio_mpipe_edesc_t);
+    tmc_alloc_t edescs_alloc = TMC_ALLOC_INIT;
+    tmc_alloc_set_pagesize(&edescs_alloc, edescs_size);
+    void *edescs = tmc_alloc_map(&edescs_alloc, edescs_size);
+    if (edescs == NULL) {
+        SCLogError(SC_ERR_FATAL,
+                   "Failed to allocate egress descriptors");
+        return -1;
+    }
+    /* retrieve channel of outbound interface */
+    for (int j = 0; j < nlive; j++) {
+        if (strcmp(out_iface, mpipe_conf[j]->iface) == 0) {
+            channel = gxio_mpipe_link_channel(&mpipe_link[j]);
+            SCLogInfo("egress link: %s is channel: %d", 
+                      out_iface, channel);
+            result = gxio_mpipe_equeue_init(&equeue[iface_idx],
+                                            context,
+                                            ering,
+                                            channel,
+                                            edescs,
+                                            edescs_size,
+                                            0);
+            if (result < 0) {
+                SCLogError(SC_ERR_FATAL,
+                           "mPIPE Failed to initialize egress queue");
+                return -1;
+            }
+            /* Record the mapping from ingress port to egress port.
+             * The egress information is stored indexed by ingress channel.
+             */
+            channel = gxio_mpipe_link_channel(&mpipe_link[iface_idx]);
+            channel_to_equeue[channel].peer_equeue = &equeue[iface_idx];
+            channel_to_equeue[channel].copy_mode = copy_mode;
+            if (copy_mode == MPIPE_COPY_MODE_IPS)
+                channel_to_equeue[channel].ReleasePacket = MpipeReleasePacketCopyIPS;
+            else
+                channel_to_equeue[channel].ReleasePacket = MpipeReleasePacketCopyTap;
+            
+            SCLogInfo("ingress link: %s is channel: %d copy_mode: %d", 
+                      out_iface, channel, copy_mode);
+
+            return channel;
+        }
+    }
+
+    /* Did not find matching interface name */
+    SCLogError(SC_ERR_INVALID_ARGUMENT, "Could not find egress interface: %s",
+               out_iface);
+    return -1;
+}
+
+TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+    SCEnter();
+    int cpu = tmc_cpus_get_my_cpu();
+    int rank = (cpu-1); // FIXME: Assumes worker CPUs start at 1.
+    int num_buckets = 4096; 
+    int num_workers = tile_num_pipelines;
+
+    if (initdata == NULL) {
+        SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    MpipeThreadVars *ptv = SCMalloc(sizeof(MpipeThreadVars));
+    if (ptv == NULL)
+        SCReturnInt(TM_ECODE_FAILED);
+
+    memset(ptv, 0, sizeof(MpipeThreadVars));
+
+    ptv->tv = tv;
+
+    int result;
+    char *link_name = (char *)initdata;
+  
+    /* Bind to a single cpu. */
+    cpu_set_t cpus;
+    result = tmc_cpus_get_my_affinity(&cpus);
+    VERIFY(result, "tmc_cpus_get_my_affinity()");
+    result = tmc_cpus_set_my_cpu(tmc_cpus_find_first_cpu(&cpus));
+    VERIFY(result, "tmc_cpus_set_my_cpu()");
+
+    MpipeRegisterPerfCounters(ptv, tv);
+
+    *data = (void *)ptv;
+
+    if (rank != 0)
+        SCReturnInt(TM_ECODE_OK);
+
+    /* Initialize and configure mPIPE, which is only done by one core. */
+
+    if (strcmp(link_name, "multi") == 0) {
+        int nlive = LiveGetDeviceCount();
+        int instance = gxio_mpipe_link_instance(LiveGetDeviceName(0));
+        for (int i = 1; i < nlive; i++) {
+            link_name = LiveGetDeviceName(i);
+            if (gxio_mpipe_link_instance(link_name) != instance) {
+                SCLogError(SC_ERR_INVALID_ARGUMENT, 
+                           "All interfaces not on same mpipe instance");
+                SCReturnInt(TM_ECODE_FAILED);
+            }
+        }
+        gxio_mpipe_init(context, instance);
+        VERIFY(result, "gxio_mpipe_init()");
+        /* open ingress interfaces */
+        for (int i = 0; i < nlive; i++) {
+            link_name = LiveGetDeviceName(i);
+            SCLogInfo("opening interface %s", link_name);
+            result = gxio_mpipe_link_open(&mpipe_link[i], context,
+                                          link_name, 0);
+            VERIFY(result, "gxio_mpipe_link_open()");
+            mpipe_conf[i] = ParseMpipeConfig(link_name);
+        }
+        /* find and open egress interfaces for IPS modes */
+        for (int i = 0; i < nlive; i++) {
+            MpipeIfaceConfig *aconf = mpipe_conf[i];
+            if (aconf != NULL) {
+                if (aconf->copy_mode != MPIPE_COPY_MODE_NONE) {
+                    int channel = MpipeReceiveOpenEgress(aconf->out_iface,
+                                                         i, aconf->copy_mode,
+                                                         mpipe_conf);
+                    if (channel < 0) {
+                        SCReturnInt(TM_ECODE_FAILED);
+                    }
+                }
+            }
+        }
+    } else {
+        SCLogInfo("using single interface %s", (char *)initdata);
+        
+        /* Start the driver. */
+        result = gxio_mpipe_init(context, gxio_mpipe_link_instance(link_name));
+        VERIFY(result, "gxio_mpipe_init()");
+
+        gxio_mpipe_link_t link;
+        result = gxio_mpipe_link_open(&link, context, link_name, 0);
+        VERIFY(result, "gxio_mpipe_link_open()");
+    }
+
+    /* Allocate some ingress queues. */
+    iqueues = SCCalloc(num_workers, sizeof(*iqueues));
+    if (iqueues == NULL)
+        SCReturnInt(TM_ECODE_FAILED);
+
+    /* Allocate some NotifRings. */
+    result = gxio_mpipe_alloc_notif_rings(context,
+                                          num_workers,
+                                          0, 0);
+    VERIFY(result, "gxio_mpipe_alloc_notif_rings()");
+    int ring = result;
+
+    /* Init the NotifRings. */
+    size_t notif_ring_entries = 2048;
+    size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
+    for (int i = 0; i < num_workers; i++) {
+        tmc_alloc_t alloc = TMC_ALLOC_INIT;
+        tmc_alloc_set_home(&alloc, 1 + i); // FIXME: static worker to Core mapping
+        if (notif_ring_size > (size_t)getpagesize())
+            tmc_alloc_set_huge(&alloc);
+        int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
+        void *iqueue_mem = tmc_alloc_map(&alloc, needed);
+        if (iqueue_mem == NULL)
+            SCReturnInt(TM_ECODE_FAILED);
+
+        gxio_mpipe_iqueue_t *iqueue = iqueue_mem + notif_ring_size;
+        result = gxio_mpipe_iqueue_init(iqueue, context, ring + i,
+                                        iqueue_mem, notif_ring_size, 0);
+        VERIFY(result, "gxio_mpipe_iqueue_init()");
+        iqueues[i] = iqueue;
+    }
+
+    int first_bucket = 0;
+    int rc;
+    rc = ReceiveMpipeCreateBuckets(ring, num_workers,
+                                   &first_bucket, &num_buckets);
+    if (rc != TM_ECODE_OK)
+        SCReturnInt(rc);
+
+    rc = ReceiveMpipeAllocatePacketBuffers();
+    if (rc != TM_ECODE_OK)
+        SCReturnInt(rc);
+
+    result = ReceiveMpipeRegisterRules(first_bucket, num_buckets);
+    if (result < 0) {
+        SCLogError(SC_ERR_INVALID_ARGUMENT,
+                   "Registering mPIPE classifier rules, %s", 
+                   gxio_strerror(result));
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+TmEcode ReceiveMpipeInit(void)
+{
+    SCEnter();
+
+    SCLogInfo("tile_num_pipelines: %d", tile_num_pipelines);
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief This function prints stats to the screen at exit.
+ * \param tv pointer to ThreadVars
+ * \param data pointer that gets cast into NetiohreadVars for ptv
+ */
+void ReceiveMpipeThreadExitStats(ThreadVars *tv, void *data)
+{
+    SCEnter();
+    SCReturn;
+}
+
+TmEcode DecodeMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
+{
+    SCEnter();
+    DecodeThreadVars *dtv = NULL;
+
+    dtv = DecodeThreadVarsAlloc(tv);
+
+    if (dtv == NULL)
+        SCReturnInt(TM_ECODE_FAILED);
+
+    DecodeRegisterPerfCounters(dtv, tv);
+
+    *data = (void *)dtv;
+
+    SCReturnInt(TM_ECODE_OK);
+}
+
+TmEcode DecodeMpipe(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, 
+                    PacketQueue *postq)
+{
+    SCEnter();
+    DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+
+    /* update counters */
+    SCPerfCounterIncr(dtv->counter_pkts, tv->sc_perf_pca);
+    SCPerfCounterIncr(dtv->counter_pkts_per_sec, tv->sc_perf_pca);
+
+    SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, p->pktlen);
+
+    SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, p->pktlen);
+    SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, p->pktlen);
+
+    /* call the decoder */
+    DecodeEthernet(tv, dtv, p, GET_PKT_DATA(p), GET_PKT_LEN(p), pq);
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ *  \brief Add a mpipe device for monitoring
+ *
+ *  \param dev string with the device name
+ *
+ *  \retval 0 on success.
+ *  \retval -1 on failure.
+ */
+int MpipeLiveRegisterDevice(char *dev)
+{
+    MpipeDevice *nd = SCMalloc(sizeof(MpipeDevice));
+    if (nd == NULL) {
+        return -1;
+    }
+
+    nd->dev = SCStrdup(dev);
+    TAILQ_INSERT_TAIL(&mpipe_devices, nd, next);
+
+    SCLogDebug("Mpipe device \"%s\" registered.", dev);
+    return 0;
+}
+
+/**
+ *  \brief Get the number of registered devices
+ *
+ *  \retval cnt the number of registered devices
+ */
+int MpipeLiveGetDeviceCount(void)
+{
+    int i = 0;
+    MpipeDevice *nd;
+
+    TAILQ_FOREACH(nd, &mpipe_devices, next) {
+        i++;
+    }
+
+    return i;
+}
+
+#endif // HAVE_MPIPE
diff --git a/src/source-mpipe.h b/src/source-mpipe.h
new file mode 100644 (file)
index 0000000..2c335bf
--- /dev/null
@@ -0,0 +1,93 @@
+/* Copyright (C) 2011-2013 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Tom DeCanio <decanio.tom@gmail.com>
+ * \author Ken Steele, Tilera Corporation <suricata@tilera.com>
+ */
+
+#ifndef __SOURCE_MPIPE_H__
+#define __SOURCE_MPIPE_H__
+
+#ifdef HAVE_MPIPE
+
+#include <gxio/mpipe.h>
+#include <tmc/cpus.h>
+
+#define MPIPE_FREE_PACKET(p) MpipeFreePacket((p))
+
+#define MPIPE_COPY_MODE_NONE    0
+#define MPIPE_COPY_MODE_TAP     1
+#define MPIPE_COPY_MODE_IPS     2
+
+#define MPIPE_IFACE_NAME_LENGTH 8
+
+typedef struct MpipeIfaceConfig_
+{
+    char iface[MPIPE_IFACE_NAME_LENGTH];
+    int copy_mode;
+    char *out_iface;
+} MpipeIfaceConfig;
+
+typedef struct MpipePeer_
+{
+    int channel;
+    char iface[MPIPE_IFACE_NAME_LENGTH];
+} MpipePeer;
+
+/* per interface TAP/IPS configuration */
+typedef struct MpipePeerVars_
+{
+    gxio_mpipe_equeue_t *peer_equeue;
+    void (*ReleasePacket)(struct Packet_ *);
+    int copy_mode;
+} MpipePeerVars;
+
+/* per packet Mpipe vars */
+typedef struct MpipePacketVars_
+{
+    /* TileGX mpipe stuff */
+    struct {
+        uint_reg_t channel : 5;
+        uint_reg_t l2_size : 14;
+        uint_reg_t size : 3;
+        uint_reg_t bucket_id : 13;
+        uint_reg_t nr : 1;
+        uint_reg_t cs : 1;
+        uint_reg_t va : 42;
+        uint_reg_t stack_idx : 5;
+    } idesc;
+
+    /* packetpool this was allocated from */   
+    uint8_t rank;
+
+    gxio_mpipe_equeue_t *peer_equeue;
+} MpipePacketVars;
+
+int MpipeLiveRegisterDevice(char *);
+int MpipeLiveGetDeviceCount(void);
+char *MpipeLiveGetDevice(int);
+void MpipeFreePacket(void *arg);
+void TmModuleReceiveMpipeRegister (void);
+void TmModuleDecodeMpipeRegister (void);
+
+TmEcode ReceiveMpipeInit(void);
+
+#endif /* HAVE_MPIPE */
+#endif /* __SOURCE_MPIPE_H__ */
index a0a769fbb24af4e2a2ef2af4925f94f0bd7eaf17..95b1f2937642949043070f33b85d9a203faa0769 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2013 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 #include "source-napatech.h"
 
 #include "source-af-packet.h"
+#include "source-mpipe.h"
 
 #include "respond-reject.h"
 
@@ -557,6 +558,9 @@ void usage(const char *progname)
 #endif
 #ifdef BUILD_UNIX_SOCKET
     printf("\t--unix-socket[=<file>]       : use unix socket to control suricata work\n");
+#endif
+#ifdef HAVE_MPIPE
+    printf("\t--mpipe                      : run with tilegx mpipe interface(s)\n");
 #endif
     printf("\n");
     printf("\nTo run the engine with default configuration on "
@@ -834,6 +838,9 @@ int main(int argc, char **argv)
         {"dag", required_argument, 0, 0},
         {"napatech", 0, 0, 0},
         {"build-info", 0, &build_info, 1},
+#ifdef HAVE_MPIPE
+        {"mpipe", optional_argument, 0, 0},
+#endif
         {NULL, 0, NULL, 0}
     };
 
@@ -1105,6 +1112,25 @@ int main(int argc, char **argv)
                 SCPrintBuildInfo();
                 exit(EXIT_SUCCESS);
             }
+#ifdef HAVE_MPIPE
+            else if(strcmp((long_opts[option_index]).name , "mpipe") == 0) {
+                if (run_mode == RUNMODE_UNKNOWN) {
+                    run_mode = RUNMODE_TILERA_MPIPE;
+                    if (optarg != NULL) {
+                        memset(pcap_dev, 0, sizeof(pcap_dev));
+                        strlcpy(pcap_dev, optarg,
+                                ((strlen(optarg) < sizeof(pcap_dev)) ?
+                                 (strlen(optarg) + 1) : sizeof(pcap_dev)));
+                        LiveRegisterDevice(optarg);
+                    }
+                } else {
+                    SCLogError(SC_ERR_MULTIPLE_RUN_MODE, 
+                               "more than one run mode has been specified");
+                    usage(argv[0]);
+                    exit(EXIT_FAILURE);
+                }
+            }
+#endif
             break;
         case 'c':
             conf_filename = optarg;
@@ -1548,6 +1574,11 @@ int main(int argc, char **argv)
     /* pcap file */
     TmModuleReceivePcapFileRegister();
     TmModuleDecodePcapFileRegister();
+#ifdef HAVE_MPIPE
+    /* mpipe */
+    TmModuleReceiveMpipeRegister();
+    TmModuleDecodeMpipeRegister();
+#endif
     /* af-packet */
     TmModuleReceiveAFPRegister();
     TmModuleDecodeAFPRegister();
@@ -1916,6 +1947,21 @@ int main(int argc, char **argv)
                 exit(EXIT_FAILURE);
             }
         }
+#ifdef HAVE_MPIPE
+    } else if (run_mode == RUNMODE_TILERA_MPIPE) {
+        if (strlen(pcap_dev)) {
+            if (ConfSet("mpipe.single_mpipe_dev", pcap_dev, 0) != 1) {
+                fprintf(stderr, "ERROR: Failed to set mpipe.single_mpipe_dev\n");
+                exit(EXIT_FAILURE);
+            }
+        } else {
+            int ret = LiveBuildDeviceList("mpipe.inputs");
+            if (ret == 0) {
+                fprintf(stderr, "ERROR: No interface found in config for mpipe\n");
+                exit(EXIT_FAILURE);
+            }
+        }
+#endif
 #ifdef HAVE_PFRING
     } else if (run_mode == RUNMODE_PFRING) {
         /* FIXME add backward compat support */
index f5dc03f233232ecdaffac4e4f0151dfea5fdcfc8..a7e8ad7638caee03364532e672754b8ef94af796 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2013 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -73,6 +73,10 @@ typedef enum {
     TMM_RECEIVEAFP,
     TMM_DECODEAFP,
     TMM_ALERTPCAPINFO,
+#ifdef HAVE_MPIPE
+    TMM_RECEIVEMPIPE,
+    TMM_DECODEMPIPE,
+#endif
     TMM_RECEIVENAPATECH,
     TMM_DECODENAPATECH,
     TMM_SIZE,
index 31afdf34847ba5bbca03ee570d5328a39e92752c..769c08b86da9bf2f82d81b239d0faa464a64dabb 100644 (file)
@@ -681,6 +681,30 @@ logging:
       facility: local5
       format: "[%i] <%d> -- "
 
+# Tilera mpipe configuration. for use on Tilera TILE-Gx.
+mpipe:
+
+  # Load balancing mode "static" or "dynamic".
+  load-balance: dynamic
+
+  # List of interfaces we will listen on.
+  inputs:
+  - interface: xgbe2
+  - interface: xgbe3
+  - interface: xgbe4
+
+
+  # Relative weight of memory for packets of each mPipe buffer size.
+  stack:
+    size128: 0
+    size256: 9
+    size512: 0
+    size1024: 0
+    size1664: 7
+    size4096: 0
+    size10386: 0
+    size16384: 0
+
 # PF_RING configuration. for use with native PF_RING support
 # for more info see http://www.ntop.org/PF_RING.html
 pfring: