TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+static int MpipeReceiveOpenIqueue(int rank);
#define MAX_CHANNELS 32 /* can probably find this in the MDE */
static gxio_mpipe_context_t context_body;
static gxio_mpipe_context_t* context = &context_body;
-/* The ingress queues (one per worker) */
-static gxio_mpipe_iqueue_t** iqueues;
+/* First allocated Notification ring for iQueues. */
+static int first_notif_ring;
+
+/* The ingress queue for this worker thread */
+static __thread gxio_mpipe_iqueue_t* thread_iqueue;
/* The egress queues (one per port) */
static gxio_mpipe_equeue_t equeue[MAX_CHANNELS];
/* Release Packet without sending. */
void MpipeReleasePacket(Packet *p)
{
- gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+ /* Use this thread's context to free the packet. */
+ // TODO: Check for dual mPipes.
+ gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
int bucket = p->mpipe_v.idesc.bucket_id;
gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
- gxio_mpipe_push_buffer(context,
+ gxio_mpipe_push_buffer(iqueue->context,
p->mpipe_v.idesc.stack_idx,
(void*)(intptr_t)p->mpipe_v.idesc.va);
}
/* Unconditionally send packet, then release packet buffer. */
void MpipeReleasePacketCopyTap(Packet *p)
{
- gxio_mpipe_iqueue_t* iqueue = iqueues[p->mpipe_v.rank];
+ gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
int bucket = p->mpipe_v.idesc.bucket_id;
gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
gxio_mpipe_edesc_t edesc;
}
}
- gxio_mpipe_iqueue_t* iqueue = iqueues[rank];
+ /* Open Ingress Queue for this worker thread. */
+ MpipeReceiveOpenIqueue(rank);
+ gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
for (;;) {
if (suricata_ctl_flags != 0) {
return gxio_mpipe_rules_commit(&rules);
}
+/* \brief Initialize mPIPE ingress ring
+ *
+ * \param name of interface to open
+ * \param Array of port configuations
+ *
+ * \return Output port channel number, or -1 on error
+ */
+static int MpipeReceiveOpenIqueue(int rank)
+{
+ /* Init the NotifRings. */
+ const size_t notif_ring_entries = 2048;
+
+ size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
+
+ tmc_alloc_t alloc = TMC_ALLOC_INIT;
+ /* Allocate the memory locally on this thread's CPU. */
+ tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK);
+ /* Allocate all the memory on one page. Which is required for the
+ notif ring, not the iqueue. */
+ if (notif_ring_size > (size_t)getpagesize())
+ tmc_alloc_set_huge(&alloc);
+ int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
+ // TODO - Save the rest of the Huge Page for other allocations.
+ void *iqueue_mem = tmc_alloc_map(&alloc, needed);
+ if (iqueue_mem == NULL) {
+ SCLogError(SC_ERR_FATAL, "Failed to allocate memory for mPIPE iQueue");
+ return TM_ECODE_FAILED;
+ }
+
+ thread_iqueue = iqueue_mem + notif_ring_size;
+ int result = gxio_mpipe_iqueue_init(thread_iqueue, context, first_notif_ring + rank,
+ iqueue_mem, notif_ring_size, 0);
+ if (result < 0) {
+ VERIFY(result, "gxio_mpipe_iqueue_init()");
+ }
+
+ return TM_ECODE_OK;
+}
+
/* \brief Initialize on MPIPE egress port
*
* Initialize one mPIPE egress port for use in IPS mode.
result = gxio_mpipe_link_open(&link, context, link_name, 0);
VERIFY(result, "gxio_mpipe_link_open()");
}
-
- /* Allocate some ingress queues. */
- iqueues = SCCalloc(num_workers, sizeof(*iqueues));
- if (unlikely(iqueues == NULL))
- SCReturnInt(TM_ECODE_FAILED);
-
/* Allocate some NotifRings. */
result = gxio_mpipe_alloc_notif_rings(context,
num_workers,
0, 0);
VERIFY(result, "gxio_mpipe_alloc_notif_rings()");
- int ring = result;
-
- /* Init the NotifRings. */
- size_t notif_ring_entries = 2048;
- size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
- for (int i = 0; i < num_workers; i++) {
- tmc_alloc_t alloc = TMC_ALLOC_INIT;
- tmc_alloc_set_home(&alloc, 1 + i); // FIXME: static worker to Core mapping
- if (notif_ring_size > (size_t)getpagesize())
- tmc_alloc_set_huge(&alloc);
- int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
- void *iqueue_mem = tmc_alloc_map(&alloc, needed);
- if (iqueue_mem == NULL)
- SCReturnInt(TM_ECODE_FAILED);
-
- gxio_mpipe_iqueue_t *iqueue = iqueue_mem + notif_ring_size;
- result = gxio_mpipe_iqueue_init(iqueue, context, ring + i,
- iqueue_mem, notif_ring_size, 0);
- VERIFY(result, "gxio_mpipe_iqueue_init()");
- iqueues[i] = iqueue;
- }
+ first_notif_ring = result;
int first_bucket = 0;
int rc;
- rc = ReceiveMpipeCreateBuckets(ring, num_workers,
+ rc = ReceiveMpipeCreateBuckets(first_notif_ring, num_workers,
&first_bucket, &num_buckets);
if (rc != TM_ECODE_OK)
SCReturnInt(rc);