rte_spinlock_unlock(&(intr_lock[port_id]));
}
-static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
+static inline void DPDKFreeMbufArray(
+ struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
for (int i = offset; i < mbuf_cnt; i++) {
rte_pktmbuf_free(mbuf_array[i]);
PacketFreeOrRelease(p);
}
-/**
- * \brief Main DPDK reading Loop function
- */
-static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
+static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
{
SCEnter();
- Packet *p;
- uint16_t nb_rx;
- time_t last_dump = 0;
- time_t current_time;
- bool segmented_mbufs_warned = 0;
- SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
- uint64_t last_timeout_msec = SCTIME_MSECS(t);
-
- DPDKThreadVars *ptv = (DPDKThreadVars *)data;
- TmSlot *s = (TmSlot *)slot;
-
- ptv->slot = s->slot_next;
-
- // Indicate that the thread is actually running its application level code (i.e., it can poll
- // packets)
+ // Indicate that the thread is actually running its application level
+ // code (i.e., it can poll packets)
TmThreadsSetFlag(tv, THV_RUNNING);
PacketPoolWait();
rte_eth_stats_reset(ptv->port_id);
rte_eth_xstats_reset(ptv->port_id);
- uint32_t pwd_zero_rx_packet_polls_count = 0;
if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
SCReturnInt(TM_ECODE_FAILED);
- while (1) {
- if (unlikely(suricata_ctl_flags != 0)) {
- SCLogDebug("Stopping Suricata!");
- SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
- while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) <
- ptv->workers_sync->worker_cnt) {
- rte_delay_us(10);
+ SCReturnInt(TM_ECODE_OK);
+}
+
+static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
+{
+ static uint64_t last_timeout_msec = 0;
+ SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
+ uint64_t msecs = SCTIME_MSECS(t);
+ if (msecs > last_timeout_msec + 100) {
+ TmThreadsCaptureHandleTimeout(tv, NULL);
+ last_timeout_msec = msecs;
+ }
+}
+
+/**
+ * \brief Decides if it should retry the packet poll or continue with the packet processing
+ * \return true if the poll should be retried, false otherwise
+ */
+static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
+{
+ static uint32_t zero_pkt_polls_cnt = 0;
+
+ if (nb_rx > 0) {
+ zero_pkt_polls_cnt = 0;
+ return false;
+ }
+
+ LoopHandleTimeoutOnIdle(tv);
+ if (!ptv->intr_enabled)
+ return true;
+
+ zero_pkt_polls_cnt++;
+ if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
+ return true;
+
+ uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
+ if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
+ rte_delay_us(pwd_idle_hint);
+ } else {
+ InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
+ struct rte_epoll_event event;
+ rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
+ InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
+ return true;
+ }
+
+ return false;
+}
+
+/**
+ * \brief Initializes a packet from an mbuf
+ * \return true if the packet was initialized successfully, false otherwise
+ */
+static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
+{
+ Packet *p = PacketGetFromQueueOrAlloc();
+ if (unlikely(p == NULL)) {
+ return NULL;
+ }
+ PKT_SET_SRC(p, PKT_SRC_WIRE);
+ p->datalink = LINKTYPE_ETHERNET;
+ if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
+ p->flags |= PKT_IGNORE_CHECKSUM;
+ }
+
+ p->ts = DPDKSetTimevalReal(&machine_start_time);
+ p->dpdk_v.mbuf = mbuf;
+ p->ReleasePacket = DPDKReleasePacket;
+ p->dpdk_v.copy_mode = ptv->copy_mode;
+ p->dpdk_v.out_port_id = ptv->out_port_id;
+ p->dpdk_v.out_queue_id = ptv->queue_id;
+ p->livedev = ptv->livedev;
+
+ if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
+ p->flags |= PKT_IGNORE_CHECKSUM;
+ } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
+ uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
+ if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
+ (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
+ SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
+ p->flags |= PKT_IGNORE_CHECKSUM;
+ } else {
+ if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
+ SCLogDebug("HW detected BAD IP checksum");
+ // chsum recalc will not be triggered but rule keyword check will be
+ p->level3_comp_csum = 0;
}
- if (ptv->queue_id == 0) {
- rte_delay_us(20); // wait for all threads to get out of the sync loop
- SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
- // If Suricata runs in peered mode, the peer threads might still want to send
- // packets to our port. Instead, we know, that we are done with the peered port, so
- // we stop it. The peered threads will stop our port.
- if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
- rte_eth_dev_stop(ptv->out_port_id);
- } else {
- // in IDS we stop our port - no peer threads are running
- rte_eth_dev_stop(ptv->port_id);
- }
+ if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
+ SCLogDebug("HW detected BAD L4 chsum");
+ p->level4_comp_csum = 0;
}
- DPDKDumpCounters(ptv);
- break;
+ }
+ }
+
+ return p;
+}
+
+static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
+{
+ static bool segmented_mbufs_warned = false;
+ if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
+ char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
+ "Check your configuration or report the issue";
+ enum rte_proc_type_t eal_t = rte_eal_process_type();
+ if (eal_t == RTE_PROC_SECONDARY) {
+ SCLogWarning("%s. To avoid segmented mbufs, "
+ "try to increase mbuf size in your primary application",
+ warn_s);
+ } else if (eal_t == RTE_PROC_PRIMARY) {
+ SCLogWarning("%s. To avoid segmented mbufs, "
+ "try to increase MTU in your suricata.yaml",
+ warn_s);
}
- nb_rx = rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
- if (unlikely(nb_rx == 0)) {
- t = DPDKSetTimevalReal(&machine_start_time);
- uint64_t msecs = SCTIME_MSECS(t);
- if (msecs > last_timeout_msec + 100) {
- TmThreadsCaptureHandleTimeout(tv, NULL);
- last_timeout_msec = msecs;
- }
+ segmented_mbufs_warned = true;
+ }
+}
- if (!ptv->intr_enabled)
- continue;
+static void HandleShutdown(DPDKThreadVars *ptv)
+{
+ SCLogDebug("Stopping Suricata!");
+ SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
+ while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
+ rte_delay_us(10);
+ }
+ if (ptv->queue_id == 0) {
+ rte_delay_us(20); // wait for all threads to get out of the sync loop
+ SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
+ // If Suricata runs in peered mode, the peer threads might still want to send
+ // packets to our port. Instead, we know, that we are done with the peered port, so
+ // we stop it. The peered threads will stop our port.
+ if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
+ rte_eth_dev_stop(ptv->out_port_id);
+ } else {
+ // in IDS we stop our port - no peer threads are running
+ rte_eth_dev_stop(ptv->port_id);
+ }
+ }
+ DPDKDumpCounters(ptv);
+}
- pwd_zero_rx_packet_polls_count++;
- if (pwd_zero_rx_packet_polls_count <= MIN_ZERO_POLL_COUNT)
- continue;
+static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
+{
+ static time_t last_dump = 0;
+ time_t current_time = DPDKGetSeconds();
+ /* Trigger one dump of stats every second */
+ if (current_time != last_dump) {
+ DPDKDumpCounters(ptv);
+ last_dump = current_time;
+ }
+}
- uint32_t pwd_idle_hint = InterruptsSleepHeuristic(pwd_zero_rx_packet_polls_count);
+/**
+ * \brief Main DPDK reading Loop function
+ */
+static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
+{
+ SCEnter();
+ DPDKThreadVars *ptv = (DPDKThreadVars *)data;
+ ptv->slot = (TmSlot *)slot;
+ TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
+ if (ret != TM_ECODE_OK) {
+ SCReturnInt(ret);
+ }
+ while (true) {
+ if (unlikely(suricata_ctl_flags != 0)) {
+ HandleShutdown(ptv);
+ break;
+ }
- if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
- rte_delay_us(pwd_idle_hint);
- } else {
- InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
- struct rte_epoll_event event;
- rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
- InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
- continue;
- }
- } else if (ptv->intr_enabled && pwd_zero_rx_packet_polls_count) {
- pwd_zero_rx_packet_polls_count = 0;
+ uint16_t nb_rx =
+ rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
+ if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
+ continue;
}
ptv->pkts += (uint64_t)nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) {
- p = PacketGetFromQueueOrAlloc();
- if (unlikely(p == NULL)) {
+ Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
+ if (p == NULL) {
+ rte_pktmbuf_free(ptv->received_mbufs[i]);
continue;
}
- PKT_SET_SRC(p, PKT_SRC_WIRE);
- p->datalink = LINKTYPE_ETHERNET;
- if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
- p->flags |= PKT_IGNORE_CHECKSUM;
- }
-
- p->ts = DPDKSetTimevalReal(&machine_start_time);
- p->dpdk_v.mbuf = ptv->received_mbufs[i];
- p->ReleasePacket = DPDKReleasePacket;
- p->dpdk_v.copy_mode = ptv->copy_mode;
- p->dpdk_v.out_port_id = ptv->out_port_id;
- p->dpdk_v.out_queue_id = ptv->queue_id;
- p->livedev = ptv->livedev;
-
- if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
- p->flags |= PKT_IGNORE_CHECKSUM;
- } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
- uint64_t ol_flags = ptv->received_mbufs[i]->ol_flags;
- if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
- (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
- SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
- p->flags |= PKT_IGNORE_CHECKSUM;
- } else {
- if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
- SCLogDebug("HW detected BAD IP checksum");
- // chsum recalc will not be triggered but rule keyword check will be
- p->level3_comp_csum = 0;
- }
- if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
- SCLogDebug("HW detected BAD L4 chsum");
- p->level4_comp_csum = 0;
- }
- }
- }
-
- if (!rte_pktmbuf_is_contiguous(p->dpdk_v.mbuf) && !segmented_mbufs_warned) {
- char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
- "Check your configuration or report the issue";
- enum rte_proc_type_t eal_t = rte_eal_process_type();
- if (eal_t == RTE_PROC_SECONDARY) {
- SCLogWarning("%s. To avoid segmented mbufs, "
- "try to increase mbuf size in your primary application",
- warn_s);
- } else if (eal_t == RTE_PROC_PRIMARY) {
- SCLogWarning("%s. To avoid segmented mbufs, "
- "try to increase MTU in your suricata.yaml",
- warn_s);
- }
-
- segmented_mbufs_warned = 1;
- }
-
+ DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
}
}
- /* Trigger one dump of stats every second */
- current_time = DPDKGetSeconds();
- if (current_time != last_dump) {
- DPDKDumpCounters(ptv);
- last_dump = current_time;
- }
+ PeriodicDPDKDumpCounters(ptv);
StatsSyncCountersIfSignalled(tv);
}