// This counter is increased by worker threads that individually pick queue IDs.
SC_ATOMIC_RESET(iconf->queue_id);
SC_ATOMIC_RESET(iconf->inconsitent_numa_cnt);
+ iconf->workers_sync = SCCalloc(1, sizeof(*iconf->workers_sync));
+ if (iconf->workers_sync == NULL) {
+ FatalError("Failed to allocate memory for workers_sync");
+ }
+ SC_ATOMIC_RESET(iconf->workers_sync->worker_checked_in);
+ iconf->workers_sync->worker_cnt = iconf->threads;
// initialize LiveDev DPDK values
LiveDevice *ldev_instance = LiveGetDevice(iface);
int32_t port_socket_id;
struct rte_mempool *pkt_mempool;
struct rte_mbuf *received_mbufs[BURST_SIZE];
+ DPDKWorkerSync *workers_sync;
} DPDKThreadVars;
static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
while (1) {
if (unlikely(suricata_ctl_flags != 0)) {
SCLogDebug("Stopping Suricata!");
+ SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
+ while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) <
+ ptv->workers_sync->worker_cnt) {
+ rte_delay_us(10);
+ }
if (ptv->queue_id == 0) {
- rte_eth_dev_stop(ptv->port_id);
+ rte_delay_us(20); // wait for all threads to get out of the sync loop
+ SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
+ // If Suricata runs in peered mode, the peer threads might still want to send
+ // packets to our port. However, we know we are done with the peered port, so
+ // we stop that one instead. The peered threads will stop our port.
if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
rte_eth_dev_stop(ptv->out_port_id);
+ } else {
+ // in IDS we stop our port - no peer threads are running
+ rte_eth_dev_stop(ptv->port_id);
}
}
DPDKDumpCounters(ptv);
ptv->port_socket_id, thread_numa);
}
+ ptv->workers_sync = dpdk_config->workers_sync;
uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
ptv->queue_id = queue_id;
}
DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
+
+ if (ptv->workers_sync) {
+ SCFree(ptv->workers_sync);
+ }
}
ptv->pkt_mempool = NULL; // MP is released when device is closed
#define DPDK_RX_CHECKSUM_OFFLOAD (1 << 4) /**< Enable chsum offload */
void DPDKSetTimevalOfMachineStart(void);
+
+/**
+ * Shutdown-synchronization state shared by all worker threads of one
+ * DPDK interface. On shutdown each worker atomically increments
+ * worker_checked_in and spins until it reaches worker_cnt, so the
+ * port is only stopped after every worker has finished polling.
+ */
+typedef struct DPDKWorkerSync_ {
+ uint16_t worker_cnt; // total workers expected to check in (set from iconf->threads)
+ SC_ATOMIC_DECLARE(uint16_t, worker_checked_in); // workers that reached the shutdown barrier
+} DPDKWorkerSync;
+
typedef struct DPDKIfaceConfig_ {
#ifdef HAVE_DPDK
char iface[RTE_ETH_NAME_MAX_LEN];
/* threads bind queue id one by one */
SC_ATOMIC_DECLARE(uint16_t, queue_id);
SC_ATOMIC_DECLARE(uint16_t, inconsitent_numa_cnt);
+ DPDKWorkerSync *workers_sync;
void (*DerefFunc)(void *);
struct rte_flow *flow[100];