]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
packetpool: dynamic return threshold
authorVictor Julien <vjulien@oisf.net>
Wed, 13 Sep 2023 05:01:53 +0000 (07:01 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 21 Nov 2023 16:45:59 +0000 (17:45 +0100)
Problem:

In pcap autofp mode, there is one threads reading packets (RX). These packets
are then passed on to worker threads. When these workers are done with a
packet, they return packets to the pcap reader threads packet pool, which is
the owner of the packets. Since this requires expensive synchronization between
threads, there is logic in place to batch this operation.

When the reader thread depletes its pool, it notifies the other threads that
it is starving and that a sync needs to happen asap. Then the reader enters
a wait state. During this time no new packets are read.

However, there is a problem with this approach. When the reader encountered
an empty pool, it would set an atomic flag that it needed a sync. The first
worker to return a packet to the pool would then set this flag, sync, and
unset the flag. This forced sync could result in just a single packet being
synchronized, or several. So if unlucky, the reader would just get a single
packet before hitting the same condition again.

Solution:

This patch updates the logic to use a new approach. Instead of using a
binary flag approach where the behavior only changes when the reader is
already starved, it uses a dynamic sync threshold that is controlled by
the reader. The reader keeps a running count of packets it its pool,
and calculates the percentage of available packets. This percentage is
then used to set the sync threshold.

When the pool is starved, it sets the threshold to 1 (sync for each packet).
After each successful get/sync the threshold is adjusted.

src/tmqh-packetpool.c
src/tmqh-packetpool.h

index 85946517953a2cc8a6fe037e6be68ee36e55a405..c302bde187320a0cf47ed9c340eb3fb24654eb64 100644 (file)
@@ -35,6 +35,8 @@
 #include "util-validate.h"
 #include "action-globals.h"
 
+extern uint16_t max_pending_packets;
+
 /* Number of freed packet to save for one pool before freeing them. */
 #define MAX_PENDING_RETURN_PACKETS 32
 static uint32_t max_pending_return_packets = MAX_PENDING_RETURN_PACKETS;
@@ -66,15 +68,27 @@ static int PacketPoolIsEmpty(PktPool *pool)
     return 1;
 }
 
+static void UpdateReturnThreshold(PktPool *pool)
+{
+    const float perc = (float)pool->cnt / (float)max_pending_packets;
+    uint32_t threshold = (uint32_t)(perc * (float)max_pending_return_packets);
+    if (threshold != SC_ATOMIC_GET(pool->return_stack.return_threshold)) {
+        SC_ATOMIC_SET(pool->return_stack.return_threshold, threshold);
+    }
+}
+
 void PacketPoolWait(void)
 {
     PktPool *my_pool = GetThreadPacketPool();
 
     if (PacketPoolIsEmpty(my_pool)) {
+        SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 1);
+
         SCMutexLock(&my_pool->return_stack.mutex);
-        SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
         SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
         SCMutexUnlock(&my_pool->return_stack.mutex);
+
+        UpdateReturnThreshold(my_pool);
     }
 
     while(PacketPoolIsEmpty(my_pool))
@@ -98,6 +112,8 @@ static void PacketPoolGetReturnedPackets(PktPool *pool)
     /* Move all the packets from the locked return stack to the local stack. */
     pool->head = pool->return_stack.head;
     pool->return_stack.head = NULL;
+    pool->cnt += pool->return_stack.cnt;
+    pool->return_stack.cnt = 0;
     SCMutexUnlock(&pool->return_stack.mutex);
 }
 
@@ -119,8 +135,14 @@ Packet *PacketPoolGetPacket(void)
         /* Stack is not empty. */
         Packet *p = pool->head;
         pool->head = p->next;
+        pool->cnt--;
         p->pool = pool;
         PacketReinit(p);
+
+        UpdateReturnThreshold(pool);
+        SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
+                ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
+                max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
         return p;
     }
 
@@ -135,8 +157,14 @@ Packet *PacketPoolGetPacket(void)
         /* Stack is not empty. */
         Packet *p = pool->head;
         pool->head = p->next;
+        pool->cnt--;
         p->pool = pool;
         PacketReinit(p);
+
+        UpdateReturnThreshold(pool);
+        SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
+                ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
+                max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
         return p;
     }
 
@@ -170,6 +198,7 @@ void PacketPoolReturnPacket(Packet *p)
         /* Push back onto this thread's own stack, so no locking. */
         p->next = my_pool->head;
         my_pool->head = p;
+        my_pool->cnt++;
     } else {
         PktPool *pending_pool = my_pool->pending_pool;
         if (pending_pool == NULL || pending_pool == pool) {
@@ -187,12 +216,13 @@ void PacketPoolReturnPacket(Packet *p)
                 my_pool->pending_count++;
             }
 
-            if (SC_ATOMIC_GET(pool->return_stack.sync_now) || my_pool->pending_count > max_pending_return_packets) {
+            const uint32_t threshold = SC_ATOMIC_GET(pool->return_stack.return_threshold);
+            if (my_pool->pending_count >= threshold) {
                 /* Return the entire list of pending packets. */
                 SCMutexLock(&pool->return_stack.mutex);
                 my_pool->pending_tail->next = pool->return_stack.head;
                 pool->return_stack.head = my_pool->pending_head;
-                SC_ATOMIC_RESET(pool->return_stack.sync_now);
+                pool->return_stack.cnt += my_pool->pending_count;
                 SCCondSignal(&pool->return_stack.cond);
                 SCMutexUnlock(&pool->return_stack.mutex);
                 /* Clear the list of pending packets to return. */
@@ -206,7 +236,7 @@ void PacketPoolReturnPacket(Packet *p)
             SCMutexLock(&pool->return_stack.mutex);
             p->next = pool->return_stack.head;
             pool->return_stack.head = p;
-            SC_ATOMIC_RESET(pool->return_stack.sync_now);
+            pool->return_stack.cnt++;
             SCMutexUnlock(&pool->return_stack.mutex);
             SCCondSignal(&pool->return_stack.cond);
         }
@@ -225,13 +255,12 @@ void PacketPoolInitEmpty(void)
 
     SCMutexInit(&my_pool->return_stack.mutex, NULL);
     SCCondInit(&my_pool->return_stack.cond, NULL);
-    SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
+    SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
+    SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
 }
 
 void PacketPoolInit(void)
 {
-    extern uint16_t max_pending_packets;
-
     PktPool *my_pool = GetThreadPacketPool();
 
 #ifdef DEBUG_VALIDATION
@@ -242,7 +271,8 @@ void PacketPoolInit(void)
 
     SCMutexInit(&my_pool->return_stack.mutex, NULL);
     SCCondInit(&my_pool->return_stack.cond, NULL);
-    SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
+    SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
+    SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
 
     /* pre allocate packets */
     SCLogDebug("preallocating packets... packet size %" PRIuMAX "",
index 2e9672d4458c9cb3bb80556ccbecc482e01dab0e..74074f953378a16c093e110e30fd21ccf934994c 100644 (file)
@@ -32,7 +32,11 @@ typedef struct PktPoolLockedStack_{
     /* linked list of free packets. */
     SCMutex mutex;
     SCCondT cond;
-    SC_ATOMIC_DECLARE(int, sync_now);
+    /** number of packets in needed to trigger a sync during
+     *  the return to pool logic. Updated by pool owner based
+     *  on how full the pool is. */
+    SC_ATOMIC_DECLARE(uint32_t, return_threshold);
+    uint32_t cnt;
     Packet *head;
 } __attribute__((aligned(CLS))) PktPoolLockedStack;
 
@@ -41,6 +45,8 @@ typedef struct PktPool_ {
      * No mutex is needed.
      */
     Packet *head;
+    uint32_t cnt;
+
     /* Packets waiting (pending) to be returned to the given Packet
      * Pool. Accumulate packets for the same pool until a threshold is
      * reached, then return them all at once.  Keep the head and tail