From: Lukas Sismis Date: Tue, 12 Mar 2024 22:24:07 +0000 (+0100) Subject: dpdk: refactor the main packet loop into smaller functions X-Git-Tag: suricata-8.0.0-beta1~1629 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5592ec07;p=thirdparty%2Fsuricata.git dpdk: refactor the main packet loop into smaller functions --- diff --git a/src/source-dpdk.c b/src/source-dpdk.c index 21d6752697..69e13bc164 100644 --- a/src/source-dpdk.c +++ b/src/source-dpdk.c @@ -185,7 +185,8 @@ static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool rte_spinlock_unlock(&(intr_lock[port_id])); } -static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset) +static inline void DPDKFreeMbufArray( + struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset) { for (int i = offset; i < mbuf_cnt; i++) { rte_pktmbuf_free(mbuf_array[i]); @@ -394,151 +395,204 @@ static void DPDKReleasePacket(Packet *p) PacketFreeOrRelease(p); } -/** - * \brief Main DPDK reading Loop function - */ -static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) +static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv) { SCEnter(); - Packet *p; - uint16_t nb_rx; - time_t last_dump = 0; - time_t current_time; - bool segmented_mbufs_warned = 0; - SCTime_t t = DPDKSetTimevalReal(&machine_start_time); - uint64_t last_timeout_msec = SCTIME_MSECS(t); - - DPDKThreadVars *ptv = (DPDKThreadVars *)data; - TmSlot *s = (TmSlot *)slot; - - ptv->slot = s->slot_next; - - // Indicate that the thread is actually running its application level code (i.e., it can poll - // packets) + // Indicate that the thread is actually running its application level + // code (i.e., it can poll packets) TmThreadsSetFlag(tv, THV_RUNNING); PacketPoolWait(); rte_eth_stats_reset(ptv->port_id); rte_eth_xstats_reset(ptv->port_id); - uint32_t pwd_zero_rx_packet_polls_count = 0; if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id)) SCReturnInt(TM_ECODE_FAILED); - while (1) { - if (unlikely(suricata_ctl_flags != 0)) { - SCLogDebug("Stopping Suricata!"); - SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1); - while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < - ptv->workers_sync->worker_cnt) { - rte_delay_us(10); + SCReturnInt(TM_ECODE_OK); +} + +static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv) +{ + static uint64_t last_timeout_msec = 0; + SCTime_t t = DPDKSetTimevalReal(&machine_start_time); + uint64_t msecs = SCTIME_MSECS(t); + if (msecs > last_timeout_msec + 100) { + TmThreadsCaptureHandleTimeout(tv, NULL); + last_timeout_msec = msecs; + } +} + +/** + * \brief Decides if it should retry the packet poll or continue with the packet processing + * \return true if the poll should be retried, false otherwise + */ +static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx) +{ + static uint32_t zero_pkt_polls_cnt = 0; + + if (nb_rx > 0) { + zero_pkt_polls_cnt = 0; + return false; + } + + LoopHandleTimeoutOnIdle(tv); + if (!ptv->intr_enabled) + return true; + + zero_pkt_polls_cnt++; + if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT) + return true; + + uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt); + if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) { + rte_delay_us(pwd_idle_hint); + } else { + InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true); + struct rte_epoll_event event; + rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS); + InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false); + return true; + } + + return false; +} + +/** + * \brief Initializes a packet from an mbuf + * \return true if the packet was initialized successfully, false otherwise + */ +static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf) +{ + Packet *p = PacketGetFromQueueOrAlloc(); + if (unlikely(p == NULL)) { + return NULL; + } + PKT_SET_SRC(p, PKT_SRC_WIRE); + p->datalink = LINKTYPE_ETHERNET; + if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { + p->flags |= PKT_IGNORE_CHECKSUM; + } + + p->ts = DPDKSetTimevalReal(&machine_start_time); + p->dpdk_v.mbuf = mbuf; + p->ReleasePacket = DPDKReleasePacket; + p->dpdk_v.copy_mode = ptv->copy_mode; + p->dpdk_v.out_port_id = ptv->out_port_id; + p->dpdk_v.out_queue_id = ptv->queue_id; + p->livedev = ptv->livedev; + + if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) { + uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags; + if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD && + (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) { + SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation"); + p->flags |= PKT_IGNORE_CHECKSUM; + } else { + if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) { + SCLogDebug("HW detected BAD IP checksum"); + // chsum recalc will not be triggered but rule keyword check will be + p->level3_comp_csum = 0; } - if (ptv->queue_id == 0) { - rte_delay_us(20); // wait for all threads to get out of the sync loop - SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0); - // If Suricata runs in peered mode, the peer threads might still want to send - // packets to our port. Instead, we know, that we are done with the peered port, so - // we stop it. The peered threads will stop our port. - if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) { - rte_eth_dev_stop(ptv->out_port_id); - } else { - // in IDS we stop our port - no peer threads are running - rte_eth_dev_stop(ptv->port_id); - } + if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) { + SCLogDebug("HW detected BAD L4 chsum"); + p->level4_comp_csum = 0; } - DPDKDumpCounters(ptv); - break; + } + } + + return p; +} + +static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf) +{ + static bool segmented_mbufs_warned = false; + if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) { + char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 " + "Check your configuration or report the issue"; + enum rte_proc_type_t eal_t = rte_eal_process_type(); + if (eal_t == RTE_PROC_SECONDARY) { + SCLogWarning("%s. To avoid segmented mbufs, " + "try to increase mbuf size in your primary application", + warn_s); + } else if (eal_t == RTE_PROC_PRIMARY) { + SCLogWarning("%s. To avoid segmented mbufs, " + "try to increase MTU in your suricata.yaml", + warn_s); } - nb_rx = rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE); - if (unlikely(nb_rx == 0)) { - t = DPDKSetTimevalReal(&machine_start_time); - uint64_t msecs = SCTIME_MSECS(t); - if (msecs > last_timeout_msec + 100) { - TmThreadsCaptureHandleTimeout(tv, NULL); - last_timeout_msec = msecs; - } + segmented_mbufs_warned = true; + } +} - if (!ptv->intr_enabled) - continue; +static void HandleShutdown(DPDKThreadVars *ptv) +{ + SCLogDebug("Stopping Suricata!"); + SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1); + while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) { + rte_delay_us(10); + } + if (ptv->queue_id == 0) { + rte_delay_us(20); // wait for all threads to get out of the sync loop + SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0); + // If Suricata runs in peered mode, the peer threads might still want to send + // packets to our port. Instead, we know, that we are done with the peered port, so + // we stop it. The peered threads will stop our port. + if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) { + rte_eth_dev_stop(ptv->out_port_id); + } else { + // in IDS we stop our port - no peer threads are running + rte_eth_dev_stop(ptv->port_id); + } + } + DPDKDumpCounters(ptv); +} - pwd_zero_rx_packet_polls_count++; - if (pwd_zero_rx_packet_polls_count <= MIN_ZERO_POLL_COUNT) - continue; +static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv) +{ + static time_t last_dump = 0; + time_t current_time = DPDKGetSeconds(); + /* Trigger one dump of stats every second */ + if (current_time != last_dump) { + DPDKDumpCounters(ptv); + last_dump = current_time; + } +} - uint32_t pwd_idle_hint = InterruptsSleepHeuristic(pwd_zero_rx_packet_polls_count); +/** + * \brief Main DPDK reading Loop function + */ +static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) +{ + SCEnter(); + DPDKThreadVars *ptv = (DPDKThreadVars *)data; + ptv->slot = (TmSlot *)slot; + TmEcode ret = ReceiveDPDKLoopInit(tv, ptv); + if (ret != TM_ECODE_OK) { + SCReturnInt(ret); + } + while (true) { + if (unlikely(suricata_ctl_flags != 0)) { + HandleShutdown(ptv); + break; + } - if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) { - rte_delay_us(pwd_idle_hint); - } else { - InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true); - struct rte_epoll_event event; - rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS); - InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false); - continue; - } - } else if (ptv->intr_enabled && pwd_zero_rx_packet_polls_count) { - pwd_zero_rx_packet_polls_count = 0; + uint16_t nb_rx = + rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE); + if (RXPacketCountHeuristic(tv, ptv, nb_rx)) { + continue; } ptv->pkts += (uint64_t)nb_rx; for (uint16_t i = 0; i < nb_rx; i++) { - p = PacketGetFromQueueOrAlloc(); - if (unlikely(p == NULL)) { + Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]); + if (p == NULL) { + rte_pktmbuf_free(ptv->received_mbufs[i]); continue; } - PKT_SET_SRC(p, PKT_SRC_WIRE); - p->datalink = LINKTYPE_ETHERNET; - if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { - p->flags |= PKT_IGNORE_CHECKSUM; - } - - p->ts = DPDKSetTimevalReal(&machine_start_time); - p->dpdk_v.mbuf = ptv->received_mbufs[i]; - p->ReleasePacket = DPDKReleasePacket; - p->dpdk_v.copy_mode = ptv->copy_mode; - p->dpdk_v.out_port_id = ptv->out_port_id; - p->dpdk_v.out_queue_id = ptv->queue_id; - p->livedev = ptv->livedev; - - if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { - p->flags |= PKT_IGNORE_CHECKSUM; - } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) { - uint64_t ol_flags = ptv->received_mbufs[i]->ol_flags; - if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD && - (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) { - SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation"); - p->flags |= PKT_IGNORE_CHECKSUM; - } else { - if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) { - SCLogDebug("HW detected BAD IP checksum"); - // chsum recalc will not be triggered but rule keyword check will be - p->level3_comp_csum = 0; - } - if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) { - SCLogDebug("HW detected BAD L4 chsum"); - p->level4_comp_csum = 0; - } - } - } - - if (!rte_pktmbuf_is_contiguous(p->dpdk_v.mbuf) && !segmented_mbufs_warned) { - char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 " - "Check your configuration or report the issue"; - enum rte_proc_type_t eal_t = rte_eal_process_type(); - if (eal_t == RTE_PROC_SECONDARY) { - SCLogWarning("%s. To avoid segmented mbufs, " - "try to increase mbuf size in your primary application", - warn_s); - } else if (eal_t == RTE_PROC_PRIMARY) { - SCLogWarning("%s. To avoid segmented mbufs, " - "try to increase MTU in your suricata.yaml", - warn_s); - } - - segmented_mbufs_warned = 1; - } - + DPDKSegmentedMbufWarning(ptv->received_mbufs[i]); PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *), rte_pktmbuf_pkt_len(p->dpdk_v.mbuf)); if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { @@ -548,12 +602,7 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) } } - /* Trigger one dump of stats every second */ - current_time = DPDKGetSeconds(); - if (current_time != last_dump) { - DPDKDumpCounters(ptv); - last_dump = current_time; - } + PeriodicDPDKDumpCounters(ptv); StatsSyncCountersIfSignalled(tv); }