]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
dpdk: refactor the main packet loop into smaller functions
author Lukas Sismis <lsismis@oisf.net>
Tue, 12 Mar 2024 22:24:07 +0000 (23:24 +0100)
committer Victor Julien <victor@inliniac.net>
Sat, 16 Mar 2024 08:29:37 +0000 (09:29 +0100)
src/source-dpdk.c

index 21d67526972adf982cc242e5bd215242b855bddc..69e13bc1640fbabff3330a6078eea8c5effa34c8 100644 (file)
@@ -185,7 +185,8 @@ static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool
     rte_spinlock_unlock(&(intr_lock[port_id]));
 }
 
-static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
+static inline void DPDKFreeMbufArray(
+        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
 {
     for (int i = offset; i < mbuf_cnt; i++) {
         rte_pktmbuf_free(mbuf_array[i]);
@@ -394,151 +395,204 @@ static void DPDKReleasePacket(Packet *p)
     PacketFreeOrRelease(p);
 }
 
-/**
- *  \brief Main DPDK reading Loop function
- */
-static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
+static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
 {
     SCEnter();
-    Packet *p;
-    uint16_t nb_rx;
-    time_t last_dump = 0;
-    time_t current_time;
-    bool segmented_mbufs_warned = 0;
-    SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
-    uint64_t last_timeout_msec = SCTIME_MSECS(t);
-
-    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
-    TmSlot *s = (TmSlot *)slot;
-
-    ptv->slot = s->slot_next;
-
-    // Indicate that the thread is actually running its application level code (i.e., it can poll
-    // packets)
+    // Indicate that the thread is actually running its application level
+    // code (i.e., it can poll packets)
     TmThreadsSetFlag(tv, THV_RUNNING);
     PacketPoolWait();
 
     rte_eth_stats_reset(ptv->port_id);
     rte_eth_xstats_reset(ptv->port_id);
 
-    uint32_t pwd_zero_rx_packet_polls_count = 0;
     if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
         SCReturnInt(TM_ECODE_FAILED);
 
-    while (1) {
-        if (unlikely(suricata_ctl_flags != 0)) {
-            SCLogDebug("Stopping Suricata!");
-            SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
-            while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) <
-                    ptv->workers_sync->worker_cnt) {
-                rte_delay_us(10);
+    SCReturnInt(TM_ECODE_OK);
+}
+
+/**
+ * \brief Runs the capture timeout handling while the queue is idle
+ *
+ * Calls TmThreadsCaptureHandleTimeout() when more than 100 ms have passed
+ * since the last time the calling thread did so.
+ *
+ * \param tv thread vars passed on to TmThreadsCaptureHandleTimeout()
+ */
+static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
+{
+    // Must be per thread: a plain function-scope static is a single,
+    // unsynchronized timestamp shared by every worker running the receive
+    // loop, so one worker handling its timeout would suppress the timeout
+    // handling of all the others for the next 100 ms.
+    static _Thread_local uint64_t last_timeout_msec = 0;
+    SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
+    uint64_t msecs = SCTIME_MSECS(t);
+    if (msecs > last_timeout_msec + 100) {
+        TmThreadsCaptureHandleTimeout(tv, NULL);
+        last_timeout_msec = msecs;
+    }
+}
+
+/**
+ * \brief Decides if it should retry the packet poll or continue with the packet processing
+ *
+ * A non-empty poll clears the idle-poll counter and processing continues.
+ * An empty poll first runs the idle timeout handling. Without interrupts the
+ * poll is simply retried. With interrupts enabled the consecutive empty polls
+ * are counted: up to MIN_ZERO_POLL_COUNT the poll is retried right away, after
+ * that the thread either delays briefly or enables the RX interrupt and waits
+ * on the per-thread epoll instance.
+ *
+ * \param tv thread vars, used for the idle timeout handling
+ * \param ptv DPDK thread vars (port/queue ids, interrupt mode)
+ * \param nb_rx number of packets returned by the last poll
+ * \return true if the poll should be retried, false otherwise
+ */
+static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
+{
+    // Must be per thread: with a plain static all workers would share one
+    // counter, so traffic on one queue would reset the idle tracking of every
+    // other queue and the unsynchronized increments would race.
+    static _Thread_local uint32_t zero_pkt_polls_cnt = 0;
+
+    if (nb_rx > 0) {
+        zero_pkt_polls_cnt = 0;
+        return false;
+    }
+
+    LoopHandleTimeoutOnIdle(tv);
+    if (!ptv->intr_enabled)
+        return true;
+
+    zero_pkt_polls_cnt++;
+    if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
+        return true;
+
+    uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
+    if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
+        // short busy delay; returning false lets the caller continue its
+        // loop iteration with nb_rx == 0 instead of polling again at once
+        rte_delay_us(pwd_idle_hint);
+        return false;
+    }
+
+    // long idle period: sleep until the RX interrupt fires or the epoll
+    // timeout expires, then go back to polling
+    InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
+    struct rte_epoll_event event;
+    rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
+    InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
+    return true;
+}
+
+/**
+ * \brief Initializes a packet from an mbuf
+ * \return pointer to the initialized packet, or NULL if no packet could be obtained
+ */
+static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
+{
+    Packet *p = PacketGetFromQueueOrAlloc();
+    if (unlikely(p == NULL)) {
+        return NULL;
+    }
+    PKT_SET_SRC(p, PKT_SRC_WIRE);
+    p->datalink = LINKTYPE_ETHERNET;
+    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
+        p->flags |= PKT_IGNORE_CHECKSUM;
+    }
+
+    p->ts = DPDKSetTimevalReal(&machine_start_time);
+    p->dpdk_v.mbuf = mbuf;
+    p->ReleasePacket = DPDKReleasePacket;
+    p->dpdk_v.copy_mode = ptv->copy_mode;
+    p->dpdk_v.out_port_id = ptv->out_port_id;
+    p->dpdk_v.out_queue_id = ptv->queue_id;
+    p->livedev = ptv->livedev;
+
+    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
+        p->flags |= PKT_IGNORE_CHECKSUM;
+    } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
+        uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
+        if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
+                (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
+            SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
+            p->flags |= PKT_IGNORE_CHECKSUM;
+        } else {
+            if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
+                SCLogDebug("HW detected BAD IP checksum");
+                // chsum recalc will not be triggered but rule keyword check will be
+                p->level3_comp_csum = 0;
             }
-            if (ptv->queue_id == 0) {
-                rte_delay_us(20); // wait for all threads to get out of the sync loop
-                SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
-                // If Suricata runs in peered mode, the peer threads might still want to send
-                // packets to our port. Instead, we know, that we are done with the peered port, so
-                // we stop it. The peered threads will stop our port.
-                if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
-                    rte_eth_dev_stop(ptv->out_port_id);
-                } else {
-                    // in IDS we stop our port - no peer threads are running
-                    rte_eth_dev_stop(ptv->port_id);
-                }
+            if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
+                SCLogDebug("HW detected BAD L4 chsum");
+                p->level4_comp_csum = 0;
             }
-            DPDKDumpCounters(ptv);
-            break;
+        }
+    }
+
+    return p;
+}
+
+static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
+{
+    static bool segmented_mbufs_warned = false;
+    if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
+        char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
+                        "Check your configuration or report the issue";
+        enum rte_proc_type_t eal_t = rte_eal_process_type();
+        if (eal_t == RTE_PROC_SECONDARY) {
+            SCLogWarning("%s. To avoid segmented mbufs, "
+                         "try to increase mbuf size in your primary application",
+                    warn_s);
+        } else if (eal_t == RTE_PROC_PRIMARY) {
+            SCLogWarning("%s. To avoid segmented mbufs, "
+                         "try to increase MTU in your suricata.yaml",
+                    warn_s);
         }
 
-        nb_rx = rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
-        if (unlikely(nb_rx == 0)) {
-            t = DPDKSetTimevalReal(&machine_start_time);
-            uint64_t msecs = SCTIME_MSECS(t);
-            if (msecs > last_timeout_msec + 100) {
-                TmThreadsCaptureHandleTimeout(tv, NULL);
-                last_timeout_msec = msecs;
-            }
+        segmented_mbufs_warned = true;
+    }
+}
 
-            if (!ptv->intr_enabled)
-                continue;
+/**
+ * \brief Shutdown path of the receive loop
+ *
+ * The calling worker checks in and spins until all workers of the port have
+ * checked in. The worker of queue 0 then resets the check-in counter and
+ * stops a port. Every worker dumps its counters before returning.
+ *
+ * \param ptv DPDK thread vars of the calling worker
+ */
+static void HandleShutdown(DPDKThreadVars *ptv)
+{
+    SCLogDebug("Stopping Suricata!");
+
+    // check in, then wait until every worker has done the same
+    SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
+    for (;;) {
+        if (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) >= ptv->workers_sync->worker_cnt)
+            break;
+        rte_delay_us(10);
+    }
+
+    // only the worker of queue 0 resets the counter and stops the port
+    if (ptv->queue_id != 0) {
+        DPDKDumpCounters(ptv);
+        return;
+    }
+
+    rte_delay_us(20); // wait for all threads to get out of the sync loop
+    SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
+
+    const bool peered_mode =
+            ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS;
+    if (!peered_mode) {
+        // in IDS we stop our port - no peer threads are running
+        rte_eth_dev_stop(ptv->port_id);
+    } else {
+        // If Suricata runs in peered mode, the peer threads might still want to send
+        // packets to our port. Instead, we know, that we are done with the peered port, so
+        // we stop it. The peered threads will stop our port.
+        rte_eth_dev_stop(ptv->out_port_id);
+    }
+
+    DPDKDumpCounters(ptv);
+}
 
-            pwd_zero_rx_packet_polls_count++;
-            if (pwd_zero_rx_packet_polls_count <= MIN_ZERO_POLL_COUNT)
-                continue;
+/**
+ * \brief Dumps the counters of the calling thread, at most once per second
+ *
+ * \param ptv DPDK thread vars passed on to DPDKDumpCounters()
+ */
+static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
+{
+    // Must be per thread: with a plain static all workers share one
+    // timestamp, so only the first worker to notice a new second would dump
+    // its counters and the remaining workers would skip their dump.
+    static _Thread_local time_t last_dump = 0;
+    time_t current_time = DPDKGetSeconds();
+    /* Trigger one dump of stats every second */
+    if (current_time != last_dump) {
+        DPDKDumpCounters(ptv);
+        last_dump = current_time;
+    }
+}
 
-            uint32_t pwd_idle_hint = InterruptsSleepHeuristic(pwd_zero_rx_packet_polls_count);
+/**
+ *  \brief Main DPDK reading Loop function
+ */
+static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
+{
+    SCEnter();
+    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
+    ptv->slot = (TmSlot *)slot;
+    TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
+    if (ret != TM_ECODE_OK) {
+        SCReturnInt(ret);
+    }
+    while (true) {
+        if (unlikely(suricata_ctl_flags != 0)) {
+            HandleShutdown(ptv);
+            break;
+        }
 
-            if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
-                rte_delay_us(pwd_idle_hint);
-            } else {
-                InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
-                struct rte_epoll_event event;
-                rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
-                InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
-                continue;
-            }
-        } else if (ptv->intr_enabled && pwd_zero_rx_packet_polls_count) {
-            pwd_zero_rx_packet_polls_count = 0;
+        uint16_t nb_rx =
+                rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
+        if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
+            continue;
         }
 
         ptv->pkts += (uint64_t)nb_rx;
         for (uint16_t i = 0; i < nb_rx; i++) {
-            p = PacketGetFromQueueOrAlloc();
-            if (unlikely(p == NULL)) {
+            Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
+            if (p == NULL) {
+                rte_pktmbuf_free(ptv->received_mbufs[i]);
                 continue;
             }
-            PKT_SET_SRC(p, PKT_SRC_WIRE);
-            p->datalink = LINKTYPE_ETHERNET;
-            if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
-                p->flags |= PKT_IGNORE_CHECKSUM;
-            }
-
-            p->ts = DPDKSetTimevalReal(&machine_start_time);
-            p->dpdk_v.mbuf = ptv->received_mbufs[i];
-            p->ReleasePacket = DPDKReleasePacket;
-            p->dpdk_v.copy_mode = ptv->copy_mode;
-            p->dpdk_v.out_port_id = ptv->out_port_id;
-            p->dpdk_v.out_queue_id = ptv->queue_id;
-            p->livedev = ptv->livedev;
-
-            if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
-                p->flags |= PKT_IGNORE_CHECKSUM;
-            } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
-                uint64_t ol_flags = ptv->received_mbufs[i]->ol_flags;
-                if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
-                        (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
-                    SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
-                    p->flags |= PKT_IGNORE_CHECKSUM;
-                } else {
-                    if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
-                        SCLogDebug("HW detected BAD IP checksum");
-                        // chsum recalc will not be triggered but rule keyword check will be
-                        p->level3_comp_csum = 0;
-                    }
-                    if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
-                        SCLogDebug("HW detected BAD L4 chsum");
-                        p->level4_comp_csum = 0;
-                    }
-                }
-            }
-
-            if (!rte_pktmbuf_is_contiguous(p->dpdk_v.mbuf) && !segmented_mbufs_warned) {
-                char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
-                                "Check your configuration or report the issue";
-                enum rte_proc_type_t eal_t = rte_eal_process_type();
-                if (eal_t == RTE_PROC_SECONDARY) {
-                    SCLogWarning("%s. To avoid segmented mbufs, "
-                                 "try to increase mbuf size in your primary application",
-                            warn_s);
-                } else if (eal_t == RTE_PROC_PRIMARY) {
-                    SCLogWarning("%s. To avoid segmented mbufs, "
-                                 "try to increase MTU in your suricata.yaml",
-                            warn_s);
-                }
-
-                segmented_mbufs_warned = 1;
-            }
-
+            DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
             PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
                     rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
             if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
@@ -548,12 +602,7 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
             }
         }
 
-        /* Trigger one dump of stats every second */
-        current_time = DPDKGetSeconds();
-        if (current_time != last_dump) {
-            DPDKDumpCounters(ptv);
-            last_dump = current_time;
-        }
+        PeriodicDPDKDumpCounters(ptv);
         StatsSyncCountersIfSignalled(tv);
     }