From: Victor Julien
Date: Wed, 13 Sep 2023 05:01:53 +0000 (+0200)
Subject: packetpool: dynamic return threshold
X-Git-Tag: suricata-8.0.0-beta1~2039
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=edc89ce7915824b825d7d67e691b3b3add688427;p=thirdparty%2Fsuricata.git

packetpool: dynamic return threshold

Problem:

In pcap autofp mode, there is one thread reading packets (RX). These
packets are then passed on to worker threads. When these workers are
done with a packet, they return it to the pcap reader thread's packet
pool, which is the owner of the packets. Since this requires expensive
synchronization between threads, there is logic in place to batch this
operation.

When the reader thread depletes its pool, it notifies the other threads
that it is starving and that a sync needs to happen asap. Then the
reader enters a wait state. During this time no new packets are read.

However, there is a problem with this approach. When the reader
encountered an empty pool, it would set an atomic flag indicating that
it needed a sync. The first worker to return a packet to the pool would
then see this flag, sync, and unset the flag. This forced sync could
result in just a single packet being synchronized, or several. So, if
unlucky, the reader would get just a single packet before hitting the
same condition again.

Solution:

This patch updates the logic to use a new approach. Instead of a binary
flag, where the behavior only changes when the reader is already
starved, it uses a dynamic sync threshold that is controlled by the
reader. The reader keeps a running count of the packets in its pool and
calculates the percentage of available packets. This percentage is then
used to set the sync threshold: when the pool is starved, the threshold
is set to 1 (sync for each packet). After each successful get/sync the
threshold is adjusted (see the sketch after the diff for the
arithmetic).
---

diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c
index 8594651795..c302bde187 100644
--- a/src/tmqh-packetpool.c
+++ b/src/tmqh-packetpool.c
@@ -35,6 +35,8 @@
 #include "util-validate.h"
 #include "action-globals.h"
 
+extern uint16_t max_pending_packets;
+
 /* Number of freed packet to save for one pool before freeing them. */
 #define MAX_PENDING_RETURN_PACKETS 32
 static uint32_t max_pending_return_packets = MAX_PENDING_RETURN_PACKETS;
@@ -66,15 +68,27 @@ static int PacketPoolIsEmpty(PktPool *pool)
     return 1;
 }
 
+static void UpdateReturnThreshold(PktPool *pool)
+{
+    const float perc = (float)pool->cnt / (float)max_pending_packets;
+    uint32_t threshold = (uint32_t)(perc * (float)max_pending_return_packets);
+    if (threshold != SC_ATOMIC_GET(pool->return_stack.return_threshold)) {
+        SC_ATOMIC_SET(pool->return_stack.return_threshold, threshold);
+    }
+}
+
 void PacketPoolWait(void)
 {
     PktPool *my_pool = GetThreadPacketPool();
 
     if (PacketPoolIsEmpty(my_pool)) {
+        SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 1);
+
         SCMutexLock(&my_pool->return_stack.mutex);
-        SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
         SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
         SCMutexUnlock(&my_pool->return_stack.mutex);
+
+        UpdateReturnThreshold(my_pool);
     }
 
     while(PacketPoolIsEmpty(my_pool))
@@ -98,6 +112,8 @@ static void PacketPoolGetReturnedPackets(PktPool *pool)
     /* Move all the packets from the locked return stack to the local stack.
      */
     pool->head = pool->return_stack.head;
     pool->return_stack.head = NULL;
+    pool->cnt += pool->return_stack.cnt;
+    pool->return_stack.cnt = 0;
     SCMutexUnlock(&pool->return_stack.mutex);
 }
 
@@ -119,8 +135,14 @@ Packet *PacketPoolGetPacket(void)
         /* Stack is not empty. */
         Packet *p = pool->head;
         pool->head = p->next;
+        pool->cnt--;
         p->pool = pool;
         PacketReinit(p);
+
+        UpdateReturnThreshold(pool);
+        SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
+                ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
+                max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
         return p;
     }
 
@@ -135,8 +157,14 @@ Packet *PacketPoolGetPacket(void)
         /* Stack is not empty. */
         Packet *p = pool->head;
         pool->head = p->next;
+        pool->cnt--;
         p->pool = pool;
         PacketReinit(p);
+
+        UpdateReturnThreshold(pool);
+        SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
+                ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
+                max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
         return p;
     }
 
@@ -170,6 +198,7 @@ void PacketPoolReturnPacket(Packet *p)
         /* Push back onto this thread's own stack, so no locking. */
         p->next = my_pool->head;
         my_pool->head = p;
+        my_pool->cnt++;
     } else {
         PktPool *pending_pool = my_pool->pending_pool;
         if (pending_pool == NULL || pending_pool == pool) {
@@ -187,12 +216,13 @@ void PacketPoolReturnPacket(Packet *p)
                 my_pool->pending_count++;
             }
 
-            if (SC_ATOMIC_GET(pool->return_stack.sync_now) || my_pool->pending_count > max_pending_return_packets) {
+            const uint32_t threshold = SC_ATOMIC_GET(pool->return_stack.return_threshold);
+            if (my_pool->pending_count >= threshold) {
                 /* Return the entire list of pending packets. */
                 SCMutexLock(&pool->return_stack.mutex);
                 my_pool->pending_tail->next = pool->return_stack.head;
                 pool->return_stack.head = my_pool->pending_head;
-                SC_ATOMIC_RESET(pool->return_stack.sync_now);
+                pool->return_stack.cnt += my_pool->pending_count;
                 SCCondSignal(&pool->return_stack.cond);
                 SCMutexUnlock(&pool->return_stack.mutex);
                 /* Clear the list of pending packets to return. */
@@ -206,7 +236,7 @@ void PacketPoolReturnPacket(Packet *p)
             SCMutexLock(&pool->return_stack.mutex);
             p->next = pool->return_stack.head;
             pool->return_stack.head = p;
-            SC_ATOMIC_RESET(pool->return_stack.sync_now);
+            pool->return_stack.cnt++;
             SCMutexUnlock(&pool->return_stack.mutex);
             SCCondSignal(&pool->return_stack.cond);
         }
@@ -225,13 +255,12 @@ void PacketPoolInitEmpty(void)
 
     SCMutexInit(&my_pool->return_stack.mutex, NULL);
     SCCondInit(&my_pool->return_stack.cond, NULL);
-    SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
+    SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
+    SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
 }
 
 void PacketPoolInit(void)
 {
-    extern uint16_t max_pending_packets;
-
     PktPool *my_pool = GetThreadPacketPool();
 
 #ifdef DEBUG_VALIDATION
@@ -242,7 +271,8 @@ void PacketPoolInit(void)
 
     SCMutexInit(&my_pool->return_stack.mutex, NULL);
     SCCondInit(&my_pool->return_stack.cond, NULL);
-    SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
+    SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
+    SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
 
     /* pre allocate packets */
     SCLogDebug("preallocating packets... packet size %" PRIuMAX "",
diff --git a/src/tmqh-packetpool.h b/src/tmqh-packetpool.h
index 2e9672d445..74074f9533 100644
--- a/src/tmqh-packetpool.h
+++ b/src/tmqh-packetpool.h
@@ -32,7 +32,11 @@ typedef struct PktPoolLockedStack_{
     /* linked list of free packets.
      */
     SCMutex mutex;
     SCCondT cond;
-    SC_ATOMIC_DECLARE(int, sync_now);
+    /** number of packets in needed to trigger a sync during
+     *  the return to pool logic. Updated by pool owner based
+     *  on how full the pool is. */
+    SC_ATOMIC_DECLARE(uint32_t, return_threshold);
+    uint32_t cnt;
     Packet *head;
 } __attribute__((aligned(CLS))) PktPoolLockedStack;
 
@@ -41,6 +45,8 @@ typedef struct PktPool_ {
      * No mutex is needed.
      */
     Packet *head;
+    uint32_t cnt;
+
     /* Packets waiting (pending) to be returned to the given Packet
      * Pool. Accumulate packets for the same pool until a threshold is
      * reached, then return them all at once. Keep the head and tail
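
To make the threshold arithmetic from the Solution section concrete, here is a
small standalone sketch. It is illustrative only, not Suricata code: the pool
capacity of 1024 and the driver in main() are assumed example values, while
UpdateReturnThreshold(), max_pending_packets and MAX_PENDING_RETURN_PACKETS
mirror the names used in the diff above. In the real code the result is stored
in an atomic that workers compare against their pending_count.

/* Illustrative sketch, not part of the patch: shows how the sync
 * threshold scales with how full the reader's pool is. */
#include <stdio.h>
#include <stdint.h>

#define MAX_PENDING_RETURN_PACKETS 32      /* batch size cap, as in the patch */

static const uint32_t max_pending_packets = 1024; /* example pool capacity */

/* Same arithmetic as UpdateReturnThreshold() in the diff: the fuller
 * the pool, the more returns a worker may batch before syncing. */
static uint32_t ReturnThreshold(uint32_t pool_cnt)
{
    const float perc = (float)pool_cnt / (float)max_pending_packets;
    return (uint32_t)(perc * (float)MAX_PENDING_RETURN_PACKETS);
}

int main(void)
{
    const uint32_t fills[] = { 1024, 512, 64, 1, 0 };
    for (size_t i = 0; i < sizeof(fills) / sizeof(fills[0]); i++) {
        /* a threshold of 0 or 1 means workers sync on every return;
         * PacketPoolWait() also forces 1 explicitly when the pool is empty */
        printf("pool cnt %4u -> sync threshold %2u\n",
               (unsigned)fills[i], (unsigned)ReturnThreshold(fills[i]));
    }
    return 0;
}

Compiled and run, this prints thresholds 32, 16, 2, 0 and 0 for the listed
fill levels: large batches while the reader's pool is well stocked, and a
sync for every returned packet once it is starved.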