--- /dev/null
+/* Copyright (C) 2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \ingroup afxdppacket
+ *
+ * @{
+ */
+
+/**
+ * \file
+ *
+ * \author Richard McConnell <richard_mcconnell@rapid7.com>
+ *
+ * AF_XDP socket runmode
+ *
+ */
+#define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
+#define SC_PCAP_DONT_INCLUDE_PCAP_H 1
+#include "suricata-common.h"
+#include "tm-threads.h"
+#include "conf.h"
+#include "runmodes.h"
+#include "runmode-af-xdp.h"
+#include "output.h"
+#include "log-httplog.h"
+#include "detect-engine-mpm.h"
+
+#include "alert-fastlog.h"
+#include "alert-debuglog.h"
+
+#include "flow-bypass.h"
+
+#include "util-conf.h"
+#include "util-debug.h"
+#include "util-time.h"
+#include "util-cpu.h"
+#include "util-affinity.h"
+#include "util-device.h"
+#include "util-runmodes.h"
+#include "util-ioctl.h"
+#include "util-ebpf.h"
+#include "util-byte.h"
+
+#include "source-af-xdp.h"
+
+#ifdef HAVE_AF_XDP
+#include <linux/if_xdp.h>
+#include <linux/if_link.h>
+#include <xdp/xsk.h>
+#endif
+
+const char *RunModeAFXDPGetDefaultMode(void)
+{
+ return "workers";
+}
+
+void RunModeIdsAFXDPRegister(void)
+{
+ RunModeRegisterNewRunMode(
+ RUNMODE_AFXDP_DEV, "single", "Single threaded af-xdp mode", RunModeIdsAFXDPSingle);
+ RunModeRegisterNewRunMode(RUNMODE_AFXDP_DEV, "workers",
+ "Workers af-xdp mode, each thread does all"
+ " tasks from acquisition to logging",
+ RunModeIdsAFXDPWorkers);
+
+ return;
+}
+
+#ifdef HAVE_AF_XDP
+
+#define DEFAULT_BUSY_POLL_TIME 20
+#define DEFAULT_BUSY_POLL_BUDGET 64
+#define DEFAULT_GRO_FLUSH_TIMEOUT 2000000
+#define DEFAULT_NAPI_HARD_IRQS 2
+
+static void AFXDPDerefConfig(void *conf)
+{
+ AFXDPIfaceConfig *pfp = (AFXDPIfaceConfig *)conf;
+ /* Pcap config is used only once but cost of this low. */
+ if (SC_ATOMIC_SUB(pfp->ref, 1) <= 1) {
+ SCFree(pfp);
+ }
+}
+
+static TmEcode ConfigSetThreads(AFXDPIfaceConfig *aconf, const char *entry_str)
+{
+ SCEnter();
+ const char *active_runmode = RunmodeGetActive();
+
+ if (active_runmode && !strcmp("single", active_runmode)) {
+ aconf->threads = 1;
+ SCReturnInt(0);
+ }
+
+ if (entry_str == NULL) {
+ SCLogError(SC_ERR_INVALID_VALUE, "Number of threads for interface \"%s\" not specified",
+ aconf->iface);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ const int nr_queues = GetIfaceRSSQueuesNum(aconf->iface);
+
+ if (strcmp(entry_str, "auto") == 0) {
+
+ const int nr_cores = (int)UtilCpuGetNumProcessorsOnline();
+
+ /* Threads limited to MIN(cores vs queues) */
+ aconf->threads = (nr_cores <= nr_queues) ? nr_cores : nr_queues;
+ const char *sys_type = nr_cores <= nr_queues ? "cores" : "queues";
+
+ SCLogPerf("%u %s, so using %u threads", aconf->threads, sys_type, aconf->threads);
+ SCReturnInt(TM_ECODE_OK);
+ }
+
+ if (StringParseInt32(&aconf->threads, 10, 0, entry_str) < 0) {
+ SCLogError(SC_ERR_INVALID_VALUE,
+ "Threads entry for interface %s contain non-numerical characters - \"%s\"",
+ aconf->iface, entry_str);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (aconf->threads < 0) {
+ SCLogError(SC_ERR_INVALID_VALUE, "Interface %s has a negative number of threads",
+ aconf->iface);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (aconf->threads > nr_queues) {
+ SCLogWarning(SC_WARN_AFXDP_CONF,
+ "Selected threads greater than configured queues, using: %d thread(s)", nr_queues);
+ aconf->threads = nr_queues;
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief extract information from config file
+ *
+ * The returned structure will be freed by the thread init function.
+ * This is thus necessary to copy the structure before giving it
+ * to thread or to reparse the file for each thread (and thus have
+ * new structure.
+ *
+ * \return a AFXDPIfaceConfig corresponding to the interface name
+ */
+static void *ParseAFXDPConfig(const char *iface)
+{
+ const char *confstr = NULL;
+ ConfNode *if_root;
+ ConfNode *if_default = NULL;
+ ConfNode *af_xdp_node = NULL;
+ int conf_val = 0;
+ intmax_t conf_val_int = 0;
+ bool boolval = false;
+
+ if (iface == NULL) {
+ return NULL;
+ }
+
+ AFXDPIfaceConfig *aconf = SCCalloc(1, sizeof(*aconf));
+ if (unlikely(aconf == NULL)) {
+ return NULL;
+ }
+
+ /* default/basic config setup */
+ strlcpy(aconf->iface, iface, sizeof(aconf->iface));
+ aconf->DerefFunc = AFXDPDerefConfig;
+ aconf->threads = 1;
+ aconf->promisc = 1;
+ aconf->enable_busy_poll = true;
+ aconf->busy_poll_time = DEFAULT_BUSY_POLL_TIME;
+ aconf->busy_poll_budget = DEFAULT_BUSY_POLL_BUDGET;
+ aconf->mode = XDP_FLAGS_UPDATE_IF_NOEXIST;
+ aconf->gro_flush_timeout = DEFAULT_GRO_FLUSH_TIMEOUT;
+ aconf->napi_defer_hard_irqs = DEFAULT_NAPI_HARD_IRQS;
+ aconf->mem_alignment = XSK_UMEM__DEFAULT_FLAGS;
+
+ /* Find initial node */
+ af_xdp_node = ConfGetNode("af-xdp");
+ if (af_xdp_node == NULL) {
+ SCLogInfo("unable to find af-xdp config using default values");
+ goto finalize;
+ }
+
+ if_root = ConfFindDeviceConfig(af_xdp_node, iface);
+ if_default = ConfFindDeviceConfig(af_xdp_node, "default");
+
+ if (if_root == NULL && if_default == NULL) {
+ SCLogInfo("unable to find af-xdp config for "
+ "interface \"%s\" or \"default\", using default values",
+ iface);
+ goto finalize;
+ }
+
+ /* If there is no setting for current interface use default one as main iface */
+ if (if_root == NULL) {
+ if_root = if_default;
+ if_default = NULL;
+ }
+
+ /* Threading */
+ confstr = "auto";
+ (void)ConfGetChildValueWithDefault(if_root, if_default, "threads", &confstr);
+ if (ConfigSetThreads(aconf, confstr) != TM_ECODE_OK) {
+ aconf->DerefFunc(aconf);
+ return NULL;
+ }
+
+ SC_ATOMIC_RESET(aconf->ref);
+ (void)SC_ATOMIC_ADD(aconf->ref, aconf->threads);
+
+ /* Promisc Mode */
+ (void)ConfGetChildValueBoolWithDefault(if_root, if_default, "disable-promisc", (int *)&boolval);
+ if (boolval) {
+ SCLogConfig("Disabling promiscuous mode on iface %s", aconf->iface);
+ aconf->promisc = 0;
+ }
+
+#ifdef HAVE_AF_XDP
+ /* AF_XDP socket mode options */
+ if (ConfGetChildValueWithDefault(if_root, if_default, "force-xdp-mode", &confstr) == 1) {
+ if (strncasecmp(confstr, "drv", 3) == 0) {
+ aconf->mode |= XDP_FLAGS_DRV_MODE;
+ } else if (strncasecmp(confstr, "skb", 3) == 0) {
+ aconf->mode |= XDP_FLAGS_SKB_MODE;
+ } else if (strncasecmp(confstr, "none", 4) == 0) {
+ } else {
+ SCLogWarning(SC_WARN_AFXDP_CONF,
+ "Incorrect af-xdp xdp-mode setting, default (none) shall be applied");
+ }
+ }
+
+ /* copy and zerocopy binding options */
+ if (ConfGetChildValueWithDefault(if_root, if_default, "force-bind-mode", &confstr) == 1) {
+ if (strncasecmp(confstr, "zero", 4) == 0) {
+ aconf->bind_flags |= XDP_ZEROCOPY;
+ } else if (strncasecmp(confstr, "copy", 4) == 0) {
+ aconf->bind_flags |= XDP_COPY;
+ } else if (strncasecmp(confstr, "none", 4) == 0) {
+ } else {
+ SCLogWarning(SC_WARN_AFXDP_CONF,
+ "Incorrect af-xdp copy-mode setting, default (none) shall be applied");
+ }
+ }
+
+ /* memory alignment mode selection */
+ if (ConfGetChildValueWithDefault(if_root, if_default, "mem-unaligned", &confstr) == 1) {
+ if (strncasecmp(confstr, "yes", 3) == 0) {
+ aconf->mem_alignment = XDP_UMEM_UNALIGNED_CHUNK_FLAG;
+ }
+ }
+
+ /* Busy polling options */
+ if (ConfGetChildValueBoolWithDefault(if_root, if_default, "enable-busy-poll", &conf_val) == 1) {
+ if (conf_val == 0) {
+ aconf->enable_busy_poll = false;
+ }
+ }
+
+ if (aconf->enable_busy_poll) {
+ if (ConfGetChildValueIntWithDefault(if_root, if_default, "busy-poll-time", &conf_val_int) ==
+ 1) {
+ if (conf_val_int) {
+ aconf->busy_poll_time = conf_val_int;
+ }
+ }
+
+ if (ConfGetChildValueIntWithDefault(
+ if_root, if_default, "busy-poll-budget", &conf_val_int) == 1) {
+ if (conf_val_int) {
+ aconf->busy_poll_budget = conf_val_int;
+ }
+ }
+
+ /* 0 value is valid for these Linux tunable's */
+ if (ConfGetChildValueIntWithDefault(
+ if_root, if_default, "gro-flush-timeout", &conf_val_int) == 1) {
+ aconf->gro_flush_timeout = conf_val_int;
+ }
+
+ if (ConfGetChildValueIntWithDefault(
+ if_root, if_default, "napi-defer-hard-irq", &conf_val_int) == 1) {
+ aconf->napi_defer_hard_irqs = conf_val_int;
+ }
+ }
+#endif
+
+finalize:
+ if (LiveGetOffload() == 0) {
+ if (GetIfaceOffloading(iface, 0, 1) == 1) {
+ SCLogWarning(SC_ERR_NIC_OFFLOADING,
+ "Using AF_XDP with offloading activated leads to capture problems");
+ }
+ } else {
+ DisableIfaceOffloading(LiveGetDevice(iface), 0, 1);
+ }
+
+ return aconf;
+}
+
+static int AFXDPConfigGetThreadsCount(void *conf)
+{
+ if (conf == NULL)
+ FatalError(SC_ERR_AFXDP_CONF, "Configuration file is NULL");
+
+ AFXDPIfaceConfig *afxdp_conf = (AFXDPIfaceConfig *)conf;
+ return afxdp_conf->threads;
+}
+
+#endif /* HAVE_AF_XDP */
+
+/**
+ * \brief Single thread version of the AF_XDP processing.
+ */
+int RunModeIdsAFXDPSingle(void)
+{
+ SCEnter();
+
+#ifdef HAVE_AF_XDP
+ int ret;
+ const char *live_dev = NULL;
+
+ RunModeInitialize();
+ TimeModeSetLive();
+
+ (void)ConfGet("af-xdp.live-interface", &live_dev);
+
+ if (AFXDPQueueProtectionInit() != TM_ECODE_OK) {
+ FatalError(SC_ERR_FATAL, "Unable to init AF_XDP queue protection.");
+ }
+
+ ret = RunModeSetLiveCaptureSingle(ParseAFXDPConfig, AFXDPConfigGetThreadsCount, "ReceiveAFXDP",
+ "DecodeAFXDP", thread_name_single, live_dev);
+ if (ret != 0) {
+ FatalError(SC_ERR_FATAL, "Unable to start runmode");
+ }
+
+ SCLogDebug("RunModeIdsAFXDPSingle initialised");
+
+#endif /* HAVE_AF_XDP */
+ SCReturnInt(0);
+}
+
+/**
+ * \brief Workers version of the AF_XDP processing.
+ *
+ * Start N threads with each thread doing all the work.
+ *
+ */
+int RunModeIdsAFXDPWorkers(void)
+{
+ SCEnter();
+
+#ifdef HAVE_AF_XDP
+ int ret;
+ const char *live_dev = NULL;
+
+ RunModeInitialize();
+ TimeModeSetLive();
+
+ (void)ConfGet("af-xdp.live-interface", &live_dev);
+
+ if (AFXDPQueueProtectionInit() != TM_ECODE_OK) {
+ FatalError(SC_ERR_FATAL, "Unable to init AF_XDP queue protection.");
+ }
+
+ ret = RunModeSetLiveCaptureWorkers(ParseAFXDPConfig, AFXDPConfigGetThreadsCount, "ReceiveAFXDP",
+ "DecodeAFXDP", thread_name_workers, live_dev);
+ if (ret != 0) {
+ FatalError(SC_ERR_FATAL, "Unable to start runmode");
+ }
+
+ SCLogDebug("RunModeIdsAFXDPWorkers initialised");
+
+#endif /* HAVE_AF_XDP */
+ SCReturnInt(0);
+}
+/**
+ * @}
+ */
--- /dev/null
+/* Copyright (C) 2011-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \defgroup afxdppacket AF_XDP running mode
+ *
+ * @{
+ */
+
+/**
+ * \file
+ *
+ * \author Richard McConnell <richard_mcconnell@rapid7.com>
+ *
+ * AF_XDP socket acquisition support
+ *
+ */
+#define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
+#define SC_PCAP_DONT_INCLUDE_PCAP_H 1
+#include "suricata-common.h"
+#include "suricata.h"
+#include "decode.h"
+#include "packet-queue.h"
+#include "threads.h"
+#include "threadvars.h"
+#include "tm-queuehandlers.h"
+#include "tm-modules.h"
+#include "tm-threads.h"
+#include "tm-threads-common.h"
+#include "conf.h"
+#include "util-cpu.h"
+#include "util-datalink.h"
+#include "util-debug.h"
+#include "util-device.h"
+#include "util-ebpf.h"
+#include "util-error.h"
+#include "util-privs.h"
+#include "util-optimize.h"
+#include "util-checksum.h"
+#include "util-ioctl.h"
+#include "util-host-info.h"
+#include "util-sysfs.h"
+#include "tmqh-packetpool.h"
+#include "source-af-xdp.h"
+#include "runmodes.h"
+#include "flow-storage.h"
+#include "util-validate.h"
+
+#ifdef HAVE_AF_XDP
+#include <xdp/xsk.h>
+#include <net/if.h>
+#endif
+
+#if HAVE_LINUX_IF_ETHER_H
+#include <linux/if_ether.h>
+#endif
+
+#ifndef HAVE_AF_XDP
+
+TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **);
+
+void TmModuleReceiveAFXDPRegister(void)
+{
+ tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = NoAFXDPSupportExit;
+ tmm_modules[TMM_RECEIVEAFXDP].Func = NULL;
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = NULL;
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = NULL;
+ tmm_modules[TMM_RECEIVEAFXDP].cap_flags = 0;
+ tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM;
+}
+
+/**
+ * \brief Registration Function for DecodeAFXDP.
+ */
+void TmModuleDecodeAFXDPRegister(void)
+{
+ tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
+ tmm_modules[TMM_DECODEAFXDP].ThreadInit = NoAFXDPSupportExit;
+ tmm_modules[TMM_DECODEAFXDP].Func = NULL;
+ tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL;
+ tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = NULL;
+ tmm_modules[TMM_DECODEAFXDP].cap_flags = 0;
+ tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM;
+}
+
+/**
+ * \brief this function prints an error message and exits.
+ */
+TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data)
+{
+ SCLogError(SC_ERR_NO_AF_XDP,
+ "Error creating thread %s: you do not have "
+ "support for AF_XDP enabled, on Linux host please recompile "
+ "with --enable-af-xdp",
+ tv->name);
+ exit(EXIT_FAILURE);
+}
+
+#else /* We have AF_XDP support */
+
+#define POLL_TIMEOUT 100
+#define NUM_FRAMES XSK_RING_PROD__DEFAULT_NUM_DESCS
+#define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
+#define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
+#define RECONNECT_TIMEOUT 500000
+
+/* Interface state */
+enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
+
+struct XskInitProtect {
+ SCMutex queue_protect;
+ SC_ATOMIC_DECLARE(uint8_t, queue_num);
+} xsk_protect;
+
+struct UmemInfo {
+ void *buf;
+ struct xsk_umem *umem;
+ struct xsk_ring_prod fq;
+ struct xsk_ring_cons cq;
+ struct xsk_umem_config cfg;
+ int mmap_alignment_flag;
+};
+
+struct QueueAssignment {
+ uint32_t queue_num;
+ bool assigned;
+};
+
+struct XskSockInfo {
+ struct xsk_ring_cons rx;
+ struct xsk_ring_prod tx;
+ struct xsk_socket *xsk;
+
+ /* Queue assignment structure */
+ struct QueueAssignment queue;
+
+ /* Configuration items */
+ struct xsk_socket_config cfg;
+ bool enable_busy_poll;
+ uint32_t busy_poll_time;
+ uint32_t busy_poll_budget;
+
+ struct pollfd fd;
+};
+
+/**
+ * \brief Structure to hold thread specific variables.
+ */
+typedef struct AFXDPThreadVars_ {
+ ThreadVars *tv;
+ TmSlot *slot;
+ LiveDevice *livedev;
+
+ /* thread specific socket */
+ int promisc;
+ int threads;
+
+ char iface[AFXDP_IFACE_NAME_LENGTH];
+ uint32_t ifindex;
+
+ /* AF_XDP stucture */
+ struct UmemInfo umem;
+ struct XskSockInfo xsk;
+ uint32_t gro_flush_timeout;
+ uint32_t napi_defer_hard_irqs;
+ uint32_t prog_id;
+
+ /* Handle state */
+ uint8_t afxdp_state;
+
+ /* Stats parameters */
+ uint64_t pkts;
+ uint64_t bytes;
+ uint16_t capture_afxdp_packets;
+ uint16_t capture_kernel_drops;
+ uint16_t capture_afxdp_poll;
+ uint16_t capture_afxdp_poll_timeout;
+ uint16_t capture_afxdp_poll_failed;
+ uint16_t capture_afxdp_empty_reads;
+ uint16_t capture_afxdp_failed_reads;
+ uint16_t capture_afxdp_acquire_pkt_failed;
+} AFXDPThreadVars;
+
+static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **);
+static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
+static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *);
+static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot);
+
+static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **);
+static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data);
+static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *);
+
+/**
+ * \brief Registration Function for RecieveAFXDP.
+ * \todo Unit tests are needed for this module.
+ */
+void TmModuleReceiveAFXDPRegister(void)
+{
+ tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit;
+ tmm_modules[TMM_RECEIVEAFXDP].Func = NULL;
+ tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop;
+ tmm_modules[TMM_RECEIVEAFXDP].PktAcqBreakLoop = NULL;
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats;
+ tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit;
+ tmm_modules[TMM_RECEIVEAFXDP].cap_flags = SC_CAP_NET_RAW;
+ tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM;
+}
+
+/**
+ * \brief Registration Function for DecodeAFXDP.
+ * \todo Unit tests are needed for this module.
+ */
+void TmModuleDecodeAFXDPRegister(void)
+{
+ tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
+ tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit;
+ tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP;
+ tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL;
+ tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit;
+ tmm_modules[TMM_DECODEAFXDP].cap_flags = 0;
+ tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM;
+}
+
+static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
+{
+ struct xdp_statistics stats;
+ socklen_t len = sizeof(struct xdp_statistics);
+ int fd = xsk_socket__fd(ptv->xsk.xsk);
+
+ if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) {
+ uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
+
+ StatsAddUI64(ptv->tv, ptv->capture_kernel_drops,
+ rx_dropped - StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
+ StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);
+
+ (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped);
+ (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts);
+
+ SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
+ ptv->tv->name, StatsGetLocalCounterValue(ptv->tv, ptv->capture_afxdp_packets),
+ ptv->bytes, StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
+
+ ptv->pkts = 0;
+ }
+}
+
+/**
+ * \brief Init function for socket creation.
+ *
+ * Mutex used to synchonise initialisation - each socket opens a
+ * different queue. The specific order in which each queue is
+ * opened is not important, but it is vital the queue_num's
+ * are different.
+ *
+ * \param tv pointer to ThreadVars
+ */
+TmEcode AFXDPQueueProtectionInit(void)
+{
+ SCEnter();
+
+ SCMutexInit(&xsk_protect.queue_protect, NULL);
+ SC_ATOMIC_SET(xsk_protect.queue_num, 0);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+void AFXDPMutexClean(void)
+{
+ SCMutexDestroy(&xsk_protect.queue_protect);
+}
+
+static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
+{
+ if (ptv->xsk.queue.assigned == false) {
+ ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);
+ SC_ATOMIC_ADD(xsk_protect.queue_num, 1);
+
+ /* Queue only needs assigned once, on startup */
+ ptv->xsk.queue.assigned = true;
+ }
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
+{
+ SCMutexLock(&xsk_protect.queue_protect);
+ if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) {
+ SCLogDebug("All AF_XDP capture threads are running.");
+ }
+ SCMutexUnlock(&xsk_protect.queue_protect);
+}
+
+static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
+{
+ int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
+ ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
+
+ if (ptv->umem.buf == MAP_FAILED) {
+ SCLogError(SC_ERR_MEM_ALLOC, "mmap: failed to acquire memory");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
+{
+ if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
+ &ptv->umem.cfg)) {
+ SCLogError(SC_ERR_AFXDP_CREATE, "failed to create umem: %s", strerror(errno));
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
+{
+ uint32_t idx_fq = 0;
+
+ uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq);
+ if (ret != cnt) {
+ SCLogError(SC_ERR_AFXDP_INIT, "Failed to initialise the fill ring.");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ for (uint32_t i = 0; i < cnt; i++) {
+ *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
+ }
+
+ xsk_ring_prod__submit(&ptv->umem.fq, cnt);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief Linux knobs are tuned to enable a NAPI polling context
+ *
+ * \param tv pointer to AFXDPThreadVars
+ */
+static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
+{
+ char fname[SYSFS_MAX_FILENAME_SIZE];
+
+ if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface) <
+ 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface) <
+ 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
+{
+ const int fd = xsk_socket__fd(ptv->xsk.xsk);
+ int sock_opt = 1;
+
+ if (!ptv->xsk.enable_busy_poll) {
+ SCReturnInt(TM_ECODE_OK);
+ }
+
+ /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL
+ * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8
+ */
+ if (!SCKernelVersionIsAtLeast(5, 11)) {
+ SCLogWarning(SC_WARN_AFXDP_CONF,
+ "Kernel version older than required: v5.11,"
+ " upgrade kernel version to use 'enable-busy-poll' option.");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (WriteLinuxTunables(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ sock_opt = ptv->xsk.busy_poll_time;
+ if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ sock_opt = ptv->xsk.busy_poll_budget;
+ if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
+{
+ ptv->afxdp_state = state;
+}
+
+static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
+{
+ int ret;
+
+ SCMutexLock(&xsk_protect.queue_protect);
+
+ if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) {
+ SCLogError(SC_ERR_SOCKET, "Failed to assign queue ID");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
+ ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
+ SCLogError(SC_ERR_SOCKET, "Failed to create socket: %s", strerror(-ret));
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
+
+ /* For polling and socket options */
+ ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
+ ptv->xsk.fd.events = POLLIN;
+
+ /* Set state */
+ AFXDPSwitchState(ptv, AFXDP_STATE_UP);
+
+ SCMutexUnlock(&xsk_protect.queue_protect);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
+{
+ if (ptv->xsk.xsk) {
+ xsk_socket__delete(ptv->xsk.xsk);
+ ptv->xsk.xsk = NULL;
+ }
+
+ if (ptv->umem.umem) {
+ xsk_umem__delete(ptv->umem.umem);
+ ptv->umem.umem = NULL;
+ }
+
+ memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod));
+ memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons));
+}
+
+static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
+{
+ if (ConfigureXSKUmem(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ /* Open AF_XDP socket */
+ if (OpenXSKSocket(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) {
+ SCLogWarning(SC_WARN_AFXDP_CONF, "Failed to configure busy polling"
+ " performance may be reduced.");
+ }
+
+ /* Has the eBPF program successfully bound? */
+ if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
+ SCLogError(SC_ERR_BPF, "Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief Try to reopen AF_XDP socket
+ *
+ * \retval: TM_ECODE_OK in case of success
+ * TM_ECODE_FAILED if error occurs or a condition is not met.
+ */
+static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
+{
+ AFXDPCloseSocket(ptv);
+ usleep(RECONNECT_TIMEOUT);
+
+ int if_flags = GetIfaceFlags(ptv->iface);
+ if (if_flags == -1) {
+ SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
+ goto sock_err;
+ } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
+ SCLogDebug("Interface '%s' is down", ptv->iface);
+ goto sock_err;
+ }
+
+ if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCLogInfo("Interface '%s' is back", ptv->iface);
+ SCReturnInt(TM_ECODE_OK);
+
+sock_err:
+ SCReturnInt(TM_ECODE_FAILED);
+}
+
+/**
+ * \brief Write packet entry to the fill ring, freeing
+ * this slot for re/fill with inbound packet descriptor
+ * \param pointer to Packet
+ * \retval: None
+ */
+static void AFXDPReleasePacket(Packet *p)
+{
+ *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
+ p->afxdp_v.orig;
+
+ PacketFreeOrRelease(p);
+}
+
+static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
+{
+ int stats_dumped = 0;
+ time_t current_time = time(NULL);
+
+ if (current_time != *last_dump) {
+ AFXDPDumpCounters(ptv);
+ *last_dump = current_time;
+ stats_dumped = 1;
+ }
+
+ StatsSyncCountersIfSignalled(ptv->tv);
+
+ return stats_dumped;
+}
+
+static inline ssize_t WakeupSocket(void *data)
+{
+ ssize_t res = 0;
+ AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
+
+ /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */
+ if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
+ res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
+ }
+
+ return res;
+}
+
+/**
+ * \brief Init function for ReceiveAFXDP.
+ *
+ * \param tv pointer to ThreadVars
+ * \param initdata pointer to the interface passed from the user
+ * \param data pointer gets populated with AFPThreadVars
+ *
+ * \todo Create a general AFP setup function.
+ */
+static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
+{
+ SCEnter();
+
+ AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata;
+
+ if (initdata == NULL) {
+ SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ AFXDPThreadVars *ptv = SCMalloc(sizeof(AFXDPThreadVars));
+ if (unlikely(ptv == NULL)) {
+ afxdpconfig->DerefFunc(afxdpconfig);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ memset(ptv, 0, sizeof(AFXDPThreadVars));
+
+ ptv->tv = tv;
+
+ strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH);
+ ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0';
+ ptv->ifindex = if_nametoindex(ptv->iface);
+
+ ptv->livedev = LiveGetDevice(ptv->iface);
+ if (ptv->livedev == NULL) {
+ SCLogError(SC_ERR_INVALID_VALUE, "Unable to find Live device");
+ SCFree(ptv);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ ptv->promisc = afxdpconfig->promisc;
+ if (ptv->promisc != 0) {
+ /* Force promiscuous mode */
+ if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
+ SCLogError(SC_ERR_AFXDP_CREATE,
+ "Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
+ strerror(errno));
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ }
+
+ ptv->threads = afxdpconfig->threads;
+
+ /* Socket configuration */
+ ptv->xsk.cfg.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
+ ptv->xsk.cfg.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS;
+ ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
+ ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;
+
+ /* UMEM configuration */
+ ptv->umem.cfg.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2;
+ ptv->umem.cfg.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
+ ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
+ ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
+ ptv->umem.cfg.flags = afxdpconfig->mem_alignment;
+
+ /* Use hugepages if unaligned chunk mode */
+ if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+ ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
+ }
+
+ /* Busy polling configuration */
+ ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll;
+ ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget;
+ ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time;
+ ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout;
+ ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs;
+
+ /* Stats registration */
+ ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", ptv->tv);
+ ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", ptv->tv);
+ ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", ptv->tv);
+ ptv->capture_afxdp_poll_timeout = StatsRegisterCounter("capture.afxdp.poll_timeout", ptv->tv);
+ ptv->capture_afxdp_poll_failed = StatsRegisterCounter("capture.afxdp.poll_failed", ptv->tv);
+ ptv->capture_afxdp_empty_reads = StatsRegisterCounter("capture.afxdp.empty_reads", ptv->tv);
+ ptv->capture_afxdp_failed_reads = StatsRegisterCounter("capture.afxdp.failed_reads", ptv->tv);
+ ptv->capture_afxdp_acquire_pkt_failed =
+ StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", ptv->tv);
+
+ /* Reserve memory for umem */
+ if (AcquireBuffer(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ *data = (void *)ptv;
+ afxdpconfig->DerefFunc(afxdpconfig);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief Main AF_XDP reading Loop function
+ */
+static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot)
+{
+ SCEnter();
+
+ Packet *p;
+ time_t last_dump = 0;
+ struct timeval ts;
+ uint32_t idx_rx = 0, idx_fq = 0, rcvd;
+ int r;
+ AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
+ TmSlot *s = (TmSlot *)slot;
+
+ ptv->slot = s->slot_next;
+
+ AFXDPAllThreadsRunning(ptv);
+
+ // Indicate that the thread is actually running its application level code (i.e., it can poll
+ // packets)
+ TmThreadsSetFlag(tv, THV_RUNNING);
+
+ PacketPoolWait();
+ while (1) {
+ /* Start by checking the state of our interface */
+ if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
+ do {
+ usleep(RECONNECT_TIMEOUT);
+ if (unlikely(suricata_ctl_flags != 0)) {
+ break;
+ }
+ r = AFXDPTryReopen(ptv);
+ } while (r != TM_ECODE_OK);
+ }
+
+ if (unlikely(suricata_ctl_flags != 0)) {
+ SCLogDebug("Stopping Suricata!");
+ AFXDPDumpCounters(ptv);
+ break;
+ }
+
+ /* Busy polling is not set, using poll() to maintain (relatively) decent
+ * performance. xdp_busy_poll must be disabled for kernels < 5.11
+ */
+ if (!ptv->xsk.enable_busy_poll) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_poll);
+
+ r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT);
+
+ /* Report poll results */
+ if (r <= 0) {
+ if (r == 0) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);
+ } else if (r < 0) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);
+ SCLogWarning(SC_ERR_AFXDP_READ, "poll failed with retval %d", r);
+ AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
+ }
+
+ DumpStatsEverySecond(ptv, &last_dump);
+ continue;
+ }
+ }
+
+ rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
+ if (!rcvd) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
+ ssize_t ret = WakeupSocket(ptv);
+ if (ret < 0) {
+ SCLogWarning(SC_ERR_AFXDP_READ, "recv failed with retval %ld", ret);
+ AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
+ }
+ DumpStatsEverySecond(ptv, &last_dump);
+ continue;
+ }
+
+ uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
+ while (res != rcvd) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
+ ssize_t ret = WakeupSocket(ptv);
+ if (ret < 0) {
+ SCLogWarning(SC_ERR_AFXDP_READ, "recv failed with retval %ld", ret);
+ AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
+ continue;
+ }
+ res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
+ }
+
+ gettimeofday(&ts, NULL);
+ ptv->pkts += rcvd;
+ for (uint32_t i = 0; i < rcvd; i++) {
+ p = PacketGetFromQueueOrAlloc();
+ if (unlikely(p == NULL)) {
+ StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);
+ continue;
+ }
+
+ PKT_SET_SRC(p, PKT_SRC_WIRE);
+ p->datalink = LINKTYPE_ETHERNET;
+ p->livedev = ptv->livedev;
+ p->ReleasePacket = AFXDPReleasePacket;
+ p->flags |= PKT_IGNORE_CHECKSUM;
+
+ p->ts = ts;
+
+ uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
+ uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
+ uint64_t orig = xsk_umem__extract_addr(addr);
+ addr = xsk_umem__add_offset_to_addr(addr);
+
+ uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
+
+ ptv->bytes += len;
+
+ p->afxdp_v.fq_idx = idx_fq++;
+ p->afxdp_v.orig = orig;
+ p->afxdp_v.fq = &ptv->umem.fq;
+
+ PacketSetData(p, pkt_data, len);
+
+ if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
+ TmqhOutputPacketpool(ptv->tv, p);
+ SCReturnInt(EXIT_FAILURE);
+ }
+ }
+
+ xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
+ xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
+
+ /* Trigger one dump of stats every second */
+ DumpStatsEverySecond(ptv, &last_dump);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief DeInit function closes af-xdp socket at exit.
+ * \param tv pointer to ThreadVars
+ * \param data pointer that gets cast into AFXDPPThreadVars for ptv
+ */
+static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data)
+{
+ AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
+
+ if (ptv->xsk.xsk) {
+ xsk_socket__delete(ptv->xsk.xsk);
+ ptv->xsk.xsk = NULL;
+ }
+
+ if (ptv->umem.umem) {
+ xsk_umem__delete(ptv->umem.umem);
+ ptv->umem.umem = NULL;
+ }
+ munmap(ptv->umem.buf, MEM_BYTES);
+
+ SCFree(ptv);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief This function prints stats to the screen at exit.
+ * \param tv pointer to ThreadVars
+ * \param data pointer that gets cast into AFXDPThreadVars for ptv
+ */
+static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
+{
+ SCEnter();
+ AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
+
+ AFXDPDumpCounters(ptv);
+
+ SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", tv->name,
+ StatsGetLocalCounterValue(tv, ptv->capture_afxdp_packets), ptv->bytes,
+ StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops));
+}
+
+/**
+ * \brief This function passes off to link type decoders.
+ *
+ * DecodeAFXDP decodes packets from AF_XDP and passes
+ * them off to the proper link type decoder.
+ *
+ * \param t pointer to ThreadVars
+ * \param p pointer to the current packet
+ * \param data pointer that gets cast into AFXDPThreadVars for ptv
+ */
+static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data)
+{
+ SCEnter();
+
+ DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+
+ DEBUG_VALIDATE_BUG_ON(PKT_IS_PSEUDOPKT(p));
+
+ /* update counters */
+ DecodeUpdatePacketCounters(tv, dtv, p);
+
+ /* If suri has set vlan during reading, we increase vlan counter */
+ if (p->vlan_idx) {
+ StatsIncr(tv, dtv->counter_vlan);
+ }
+
+ /* call the decoder */
+ DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
+
+ PacketDecodeFinalize(tv, dtv, p);
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
+{
+ SCEnter();
+ DecodeThreadVars *dtv = DecodeThreadVarsAlloc(tv);
+ if (dtv == NULL)
+ SCReturnInt(TM_ECODE_FAILED);
+
+ DecodeRegisterPerfCounters(dtv, tv);
+
+ *data = (void *)dtv;
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data)
+{
+ if (data != NULL)
+ DecodeThreadVarsFree(tv, data);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+#endif /* HAVE_AF_XDP */
+/* eof */
+/**
+ * @}
+ */