]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Replace ringbuffer in Packet Pool with a stack for better cache locality
authorKen Steele <ken@tilera.com>
Fri, 28 Mar 2014 18:51:25 +0000 (14:51 -0400)
committerKen Steele <ken@tilera.com>
Mon, 21 Jul 2014 18:48:31 +0000 (14:48 -0400)
Using a stack for free Packet storage causes recently freed Packets to be
reused quickly, while there is more likelihood of the data still being in
cache.

The new structure has a per-thread private stack for allocating Packets
which does not need any locking. Since Packets can be freed by any thread,
there is a second stack (return stack) for freeing packets by other threads.
The return stack is protected by a mutex. Packets are moved from the return
stack to the private stack when the private stack is empty.

Returning packets back to their "home" stack keeps the stacks from getting out
of balance.

The PacketPoolInit() function is now called by each thread that will be
allocating packets. Each thread allocates max_pending_packets, which is a
change from before, where that was the total number of packets across all
threads.

15 files changed:
src/decode.c
src/decode.h
src/source-af-packet.c
src/source-erf-dag.c
src/source-erf-file.c
src/source-ipfw.c
src/source-mpipe.c
src/source-napatech.c
src/source-nfq.c
src/source-pcap-file.c
src/source-pcap.c
src/source-pfring.c
src/suricata.c
src/tmqh-packetpool.c
src/tmqh-packetpool.h

index 17ca90b3bc6ab92acdcd1e9023a1b874afd14ea4..ad9d7fe5f1f15a9eac202ac7041b4bac90981853 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2013 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -169,12 +169,8 @@ void PacketFreeOrRelease(Packet *p)
  */
 Packet *PacketGetFromQueueOrAlloc(void)
 {
-    Packet *p = NULL;
-
     /* try the pool first */
-    if (PacketPoolSize() > 0) {
-        p = PacketPoolGetPacket();
-    }
+    Packet *p = PacketPoolGetPacket();
 
     if (p == NULL) {
         /* non fatal, we're just not processing a packet then */
index 2ce5bbf387caf3ac7518fcf45c36d047b1e4d0b9..fa48f4ac896ee163fdbfdb4f54d92851a68163e0 100644 (file)
@@ -88,6 +88,8 @@ enum PktSrcEnum {
 struct DetectionEngineThreadCtx_;
 typedef struct AppLayerThreadCtx_ AppLayerThreadCtx;
 
+struct PktPool_;
+
 /* declare these here as they are called from the
  * PACKET_RECYCLE and PACKET_CLEANUP macro's. */
 typedef struct AppLayerDecoderEvents_ AppLayerDecoderEvents;
@@ -528,6 +530,10 @@ typedef struct Packet_
     /* tunnel packet ref count */
     uint16_t tunnel_tpr_cnt;
 
+    /* The Packet pool from which this packet was allocated. Used when returning
+     * the packet to its owner's stack. If NULL, then allocated with malloc.
+     */
+    struct PktPool_ *pool;
 
 #ifdef PROFILING
     PktProfiling *profile;
index 1afae6a4ecaa33c1c7f6c532bc176749565cc8b4..99acb86801ef380240a60542141652bc77940542 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2011-2013 Open Information Security Foundation
+/* Copyright (C) 2011-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -1118,7 +1118,6 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
 {
     SCEnter();
 
-    uint16_t packet_q_len = 0;
     AFPThreadVars *ptv = (AFPThreadVars *)data;
     struct pollfd fds;
     int r;
@@ -1179,12 +1178,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
 
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         r = poll(&fds, 1, POLL_TIMEOUT);
 
@@ -1787,6 +1781,8 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) {
         ptv->vlan_disabled = 1;
     }
 
+    PacketPoolInit();
+
     SCReturnInt(TM_ECODE_OK);
 }
 
index 032873a1a431c0083e4ce54f57496f81d368bc69..8eef943d43d463ca4e7ccc4fffc9f484a93b09d8 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2010 Open Information Security Foundation
+/* Copyright (C) 2010-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -316,6 +316,8 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
     SCLogInfo("Starting processing packets from stream: %d on DAG: %s",
         ewtn->dagstream, ewtn->dagname);
 
+    PacketPoolInit();
+    
     SCReturnInt(TM_ECODE_OK);
 }
 
@@ -417,12 +419,7 @@ ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top, uint32_t *pkts_read)
 
         /* Make sure we have at least one packet in the packet pool,
          * to prevent us from alloc'ing packets at line rate. */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         prec = (char *)ewtn->btm;
         dr = (dag_record_t*)prec;
index 53ffee1df91c9e5fc737086e049855290adc60a9..24a1e4719181945273f1a7197ec749b1755bcba2 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2010 Open Information Security Foundation
+/* Copyright (C) 2010-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -123,12 +123,7 @@ TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
 
         /* Make sure we have at least one packet in the packet pool,
          * to prevent us from alloc'ing packets at line rate. */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         p = PacketGetFromQueueOrAlloc();
         if (unlikely(p == NULL)) {
@@ -240,6 +235,8 @@ ReceiveErfFileThreadInit(ThreadVars *tv, void *initdata, void **data)
     etv->tv = tv;
     *data = (void *)etv;
 
+    PacketPoolInit();
+
     SCLogInfo("Processing ERF file %s", (char *)initdata);
 
     SCReturnInt(TM_ECODE_OK);
index e152b69ca550c3b2b773584382fc6f55a3572af3..60192361b57a63dabc13b7229c990d2cadc2acfb 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2011 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -271,12 +271,7 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
 
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         p = PacketGetFromQueueOrAlloc();
         if (p == NULL) {
@@ -379,6 +374,8 @@ TmEcode ReceiveIPFWThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     *data = (void *)ntv;
 
+    PacketPoolInit();
+
     SCReturnInt(TM_ECODE_OK);
 }
 
index 675882efff423f14f15bcf2a418b411285a40726..285068162b96318061e0ad019967e026d7286db4 100644 (file)
@@ -899,6 +899,9 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     *data = (void *)ptv;
 
+    PacketPoolInit();
+
+    /* Only rank 0 does initialization of mpipe */
     if (rank != 0)
         SCReturnInt(TM_ECODE_OK);
 
index 76e1069fc8a54613cc118832ffc6c9641ff88c84..60f9f246ce971f849f30481a17b5b8678369bcab 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012 Open Information Security Foundation
+/* Copyright (C) 2012-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -171,6 +171,8 @@ TmEcode NapatechStreamThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     *data = (void *)ntv;
 
+    PacketPoolInit();
+
     SCReturnInt(TM_ECODE_OK);
 }
 
@@ -208,12 +210,7 @@ TmEcode NapatechStreamLoop(ThreadVars *tv, void *data, void *slot)
     while (!(suricata_ctl_flags & (SURICATA_STOP | SURICATA_KILL))) {
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         /*
          * Napatech returns packets 1 at a time
index 70195b907db00387e33ce38b9b5d034157b79480..7c6359a31eef867423de98eaca9c930492ad2c51 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2013 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -717,6 +717,9 @@ TmEcode ReceiveNFQThreadInit(ThreadVars *tv, void *initdata, void **data) {
 #undef T_DATA_SIZE
 
     *data = (void *)ntv;
+
+    PacketPoolInit();
+
     SCMutexUnlock(&nfq_init_lock);
     return TM_ECODE_OK;
 }
index 156b286225dcbcad8787e372b4775941a9511016..ee7418d1c668a983fcf5d41535c3c90513620924 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2010 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -200,12 +200,7 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
 
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         /* Right now we just support reading packets one at a time. */
         r = pcap_dispatch(pcap_g.pcap_handle, (int)packet_q_len,
@@ -345,6 +340,9 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, void *initdata, void **data) {
 
     ptv->tv = tv;
     *data = (void *)ptv;
+
+    PacketPoolInit();
+
     SCReturnInt(TM_ECODE_OK);
 }
 
index 81acb1f8c69e2db8699805d3beddfdf226744be7..2b13d21dd3be482c3b40a14c486619043e7107aa 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2013 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -309,12 +309,7 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
 
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         /* Right now we just support reading packets one at a time. */
         r = pcap_dispatch(ptv->pcap_handle, (int)packet_q_len,
@@ -643,6 +638,8 @@ TmEcode ReceivePcapThreadInit(ThreadVars *tv, void *initdata, void **data) {
 
     *data = (void *)ptv;
 
+    PacketPoolInit();
+
     /* Dereference config */
     pcapconfig->DerefFunc(pcapconfig);
     SCReturnInt(TM_ECODE_OK);
index c239bfa102cc49b3de8fcff0b6129d4defa8622b..5735d3ec3afc37081363aa13db29b509a53e6819 100644 (file)
@@ -317,12 +317,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
 
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
+        PacketPoolWait();
 
         p = PacketGetFromQueueOrAlloc();
         if (p == NULL) {
@@ -544,6 +539,9 @@ TmEcode ReceivePfringThreadInit(ThreadVars *tv, void *initdata, void **data) {
 
     *data = (void *)ptv;
     pfconf->DerefFunc(pfconf);
+
+    PacketPoolInit();
+
     return TM_ECODE_OK;
 }
 
index 76e919ab433528889a372508266c5372863c2e82..1741dd48049e954765e28ae3b766a63b2ec77a3d 100644 (file)
@@ -2195,7 +2195,6 @@ int main(int argc, char **argv)
     NSS_NoDB_Init(NULL);
 #endif
 
-    PacketPoolInit(max_pending_packets);
     HostInitConfig(HOST_VERBOSE);
     if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
         FlowInitConfig(FLOW_VERBOSE);
index ec4a4fe116539f093eaebb84e4ea2562871ce22f..38ef1aa9be94f72afb7c36284306c5196b557453 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2013 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
  *
  * \author Victor Julien <victor@inliniac.net>
  *
- * Packetpool queue handlers. Packet pool is implemented as a ringbuffer.
- * We're using a multi reader / multi writer version of the ringbuffer,
- * that is relatively expensive due to the CAS function. But it is necessary
- * because every thread can return packets to the pool and multiple parts
- * of the code retrieve packets (Decode, Defrag) and these can run in their
- * own threads as well.
+ * Packetpool queue handlers. Packet pool is implemented as a stack.
  */
 
 #include "suricata.h"
 
 #include "tmqh-packetpool.h"
 
-#include "util-ringbuffer.h"
 #include "util-debug.h"
 #include "util-error.h"
 #include "util-profiling.h"
 #include "util-device.h"
 
-static RingBuffer16 *ringbuffer = NULL;
+/* TODO: Handle case without __thread */
+__thread PktPool thread_pkt_pool;
+
 /**
  * \brief TmqhPacketpoolRegister
  * \initonly
@@ -63,58 +59,78 @@ void TmqhPacketpoolRegister (void) {
     tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
     tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;
     tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;
-
-    ringbuffer = RingBufferInit();
-    if (ringbuffer == NULL) {
-        SCLogError(SC_ERR_FATAL, "Error registering Packet pool handler (at ring buffer init)");
-        exit(EXIT_FAILURE);
-    }
-}
-
-void TmqhPacketpoolDestroy (void) {
-    /* doing this clean up PacketPoolDestroy now,
-     * where we also clean the packets */
 }
 
-int PacketPoolIsEmpty(void) {
-    return RingBufferIsEmpty(ringbuffer);
-}
+static int PacketPoolIsEmpty(void)
+{
+    /* Check local stack first. */
+    if (thread_pkt_pool.head || thread_pkt_pool.return_head)
+        return 0;
 
-uint16_t PacketPoolSize(void) {
-    return RingBufferSize(ringbuffer);
+    return 1;
 }
 
-void PacketPoolWait(void) {
-    RingBufferWait(ringbuffer);
+void PacketPoolWait(void)
+{
+    while(PacketPoolIsEmpty())
+        ;
 }
 
 /** \brief a initialized packet
  *
  *  \warning Use *only* at init, not at packet runtime
  */
-void PacketPoolStorePacket(Packet *p) {
-    if (RingBufferIsFull(ringbuffer)) {
-        exit(1);
-    }
-
+static void PacketPoolStorePacket(Packet *p)
+{
     /* Clear the PKT_ALLOC flag, since that indicates to push back
      * onto the ring buffer. */
     p->flags &= ~PKT_ALLOC;
+    p->pool = &thread_pkt_pool;;
     p->ReleasePacket = PacketPoolReturnPacket;
     PacketPoolReturnPacket(p);
-
-    SCLogDebug("buffersize %u", RingBufferSize(ringbuffer));
 }
 
-/** \brief get a packet from the packet pool, but if the
- *         pool is empty, don't wait, just return NULL
+/** \brief Get a new packet from the packet pool
+ *
+ * Only allocates from the thread's local stack, or mallocs new packets.
+ * If the local stack is empty, first move all the return stack packets to
+ * the local stack.
+ *  \retval Packet pointer, or NULL on failure.
  */
-Packet *PacketPoolGetPacket(void) {
-    if (RingBufferIsEmpty(ringbuffer))
-        return NULL;
+Packet *PacketPoolGetPacket(void)
+{
+    PktPool *pool = &thread_pkt_pool;
+
+    if (pool->head) {
+        /* Stack is not empty. */
+        Packet *p = pool->head;
+        pool->head = p->next;
+        p->pool = pool;
+        return p;
+    }
+
+    /* Local Stack is empty, so check the return stack, which requires
+     * locking. */
+    SCMutexLock(&pool->return_mutex);
+    /* Move all the packets from the locked return stack to the local stack. */
+    pool->head = pool->return_head;
+    pool->return_head = NULL;
+    SCMutexUnlock(&pool->return_mutex);
+
+    /* Try to allocate again. Need to check for not empty again, since the
+     * return stack might have been empty too.
+     */
+    if (pool->head) {
+        /* Stack is not empty. */
+        Packet *p = pool->head;
+        pool->head = p->next;
+        p->pool = pool;
+        return p;
+    }
 
-    Packet *p = RingBufferMrMwGetNoWait(ringbuffer);
-    return p;
+    /* Failed to allocate a packet, so return NULL. */
+    /* Optionally, could allocate a new packet here. */
+    return NULL;
 }
 
 /** \brief Return packet to Packet pool
@@ -122,11 +138,33 @@ Packet *PacketPoolGetPacket(void) {
  */
 void PacketPoolReturnPacket(Packet *p)
 {
+    PktPool *pool = p->pool;
+    if (pool == NULL) {
+        free(p);
+        return;
+    }
+   
     PACKET_RECYCLE(p);
-    RingBufferMrMwPut(ringbuffer, (void *)p);
+
+    if (pool == &thread_pkt_pool) {
+        /* Push back onto this thread's own stack, so no locking. */
+        p->next = thread_pkt_pool.head;
+        thread_pkt_pool.head = p;
+    } else {
+        /* Push onto return stack for this pool */
+        SCMutexLock(&pool->return_mutex);
+        p->next = pool->return_head;
+        pool->return_head = p;
+        SCMutexUnlock(&pool->return_mutex);
+    }
 }
 
-void PacketPoolInit(intmax_t max_pending_packets) {
+void PacketPoolInit(void)
+{
+    extern intmax_t max_pending_packets;
+
+    SCMutexInit(&thread_pkt_pool.return_mutex, NULL);
+
     /* pre allocate packets */
     SCLogDebug("preallocating packets... packet size %" PRIuMAX "", (uintmax_t)SIZE_OF_PACKET);
     int i = 0;
@@ -143,31 +181,18 @@ void PacketPoolInit(intmax_t max_pending_packets) {
 }
 
 void PacketPoolDestroy(void) {
-    if (ringbuffer == NULL) {
-        return;
-    }
-
+#if 0
     Packet *p = NULL;
     while ((p = PacketPoolGetPacket()) != NULL) {
         PACKET_CLEANUP(p);
         SCFree(p);
     }
-
-    RingBufferDestroy(ringbuffer);
-    ringbuffer = NULL;
+#endif
 }
 
-Packet *TmqhInputPacketpool(ThreadVars *t)
+Packet *TmqhInputPacketpool(ThreadVars *tv)
 {
-    Packet *p = NULL;
-
-    while (p == NULL && ringbuffer->shutdown == FALSE) {
-        p = RingBufferMrMwGet(ringbuffer);
-    }
-
-    /* packet is clean */
-
-    return p;
+    return PacketPoolGetPacket();
 }
 
 void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
index a248c037a8ec79b853db9a3384bba94e88a0478a..aa564f9d2d8b89ce2eee31563c95ef54c3a0c8a0 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2013 Open Information Security Foundation
+/* Copyright (C) 2007-2014 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 #ifndef __TMQH_PACKETPOOL_H__
 #define __TMQH_PACKETPOOL_H__
 
+#include "decode.h"
+
+typedef struct PktPool_ {
+    /* link listed of free packets local to this thread. */
+    Packet *head;
+  
+    /* Return stack, onto which other threads free packets. */
+    struct {
+        /* linked list of free packets. */
+        Packet *return_head;
+        SCMutex return_mutex;
+    } __attribute__((aligned(CLS)));
+} PktPool;
+
 Packet *TmqhInputPacketpool(ThreadVars *);
 void TmqhOutputPacketpool(ThreadVars *, Packet *);
 void TmqhReleasePacketsToPacketPool(PacketQueue *);
-void TmqhPacketpoolRegister (void);
-void TmqhPacketpoolDestroy (void);
+void TmqhPacketpoolRegister(void);
 Packet *PacketPoolGetPacket(void);
-uint16_t PacketPoolSize(void);
-void PacketPoolStorePacket(Packet *);
 void PacketPoolWait(void);
 void PacketPoolReturnPacket(Packet *p);
-void PacketPoolInit(intmax_t max_pending_packets);
+void PacketPoolInit(void);
 void PacketPoolDestroy(void);
 
 #endif /* __TMQH_PACKETPOOL_H__ */