]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
util: add initial flow rate tracking implementation
authorShivani Bhardwaj <shivani@oisf.net>
Sat, 29 Mar 2025 13:33:16 +0000 (19:03 +0530)
committerVictor Julien <victor@inliniac.net>
Thu, 3 Apr 2025 08:05:51 +0000 (10:05 +0200)
In order to track flow rate and thus determine a course of action or
categorize it as elephant flow, track a flow's byte count per direction
in a ring buffer for a given time interval.

The implementation is simple and keeps overwriting the buffer and
updating the final sum. The sum of all the elements in the ring buffer
at any point in time should reflect the number of bytes for the
respective flow in the last of a given configured interval.
e.g. if the definition says that the flows must be tracked by a rate of
100k bytes in 10 seconds, the ring buffer at any point in time should
carry the total number of bytes seen by the respective flow in the last
10 seconds.

So far, the implementation only supports reading the flow rate
definition from suricata.yaml and using it to track the flows.

This solution adds up a space complexity to the existing Flow struct.
However, the added space complexity should only take effect if the
feature is in use. Since this buffer extends the Flow struct, it does
not impact the usual business logic or complexity of the code.

This implementation is currently limited to defining the time interval
of flow rate in seconds only. However, the number of seconds defined are
directly proportional to the aforementioned added space complexity as
that's the size of the ring buffer.

Feature 5647

13 files changed:
etc/schema.json
src/Makefile.am
src/decode.c
src/decode.h
src/flow-util.c
src/flow.c
src/flow.h
src/output-json-flow.c
src/runmode-unittests.c
src/suricata.c
src/util-flow-rate.c [new file with mode: 0644]
src/util-flow-rate.h [new file with mode: 0644]
suricata.yaml.in

index f0a8ba5af9ffcc3ce53b248ef59c2669191b234b..a69bcec35e137168bbfa765248b943ecdbb927a0 100644 (file)
                 "alerted": {
                     "type": "boolean"
                 },
+                "elephant": {
+                    "type": "boolean"
+                },
                 "bypass": {
                     "type": "string"
                 },
                             "description": "Total number of flows",
                             "type": "integer"
                         },
+                        "elephant": {
+                            "description": "Total number of elephant flows",
+                            "type": "integer"
+                        },
                         "udp": {
                             "description": "Number of UDP flows",
                             "type": "integer"
index 47fcf90eceac40f3f6aae795b0ba23b4c0401202..7b69b4f7247be169dcadfc2868ac41bae1fa8303 100755 (executable)
@@ -535,6 +535,7 @@ noinst_HEADERS = \
        util-lua-smtp.h \
        util-lua-ssh.h \
        util-lua-tls.h \
+       util-flow-rate.h \
        util-macset.h \
        util-magic.h \
        util-memcmp.h \
@@ -1101,6 +1102,7 @@ libsuricata_c_a_SOURCES = \
        util-lua-ssh.c \
        util-lua-tls.c \
        util-macset.c \
+       util-flow-rate.c \
        util-magic.c \
        util-mem.c \
        util-memcmp.c \
index 1933745f6917fad12f1ae8c0a44db0319fb5e8e6..3ba77b92292bf2c4646bfa4c46cb63d32af4d757 100644 (file)
@@ -659,6 +659,7 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
     dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv);
     dtv->counter_flow_icmp6 = StatsRegisterCounter("flow.icmpv6", tv);
     dtv->counter_flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", tv);
+    dtv->counter_flow_elephant = StatsRegisterCounter("flow.elephant", tv);
     dtv->counter_flow_get_used = StatsRegisterCounter("flow.get_used", tv);
     dtv->counter_flow_get_used_eval = StatsRegisterCounter("flow.get_used_eval", tv);
     dtv->counter_flow_get_used_eval_reject = StatsRegisterCounter("flow.get_used_eval_reject", tv);
index b98568a26fd66c0d23cdbee60dfbce294f259dfd..e5147fd8ed8c1282a3c121554d066798fb53ae9e 100644 (file)
@@ -1004,6 +1004,7 @@ typedef struct DecodeThreadVars_
     uint16_t counter_flow_icmp4;
     uint16_t counter_flow_icmp6;
     uint16_t counter_flow_tcp_reuse;
+    uint16_t counter_flow_elephant;
     uint16_t counter_flow_get_used;
     uint16_t counter_flow_get_used_eval;
     uint16_t counter_flow_get_used_eval_reject;
index 9e90ae5be9bc07649036bb86e09332aa1da47673..d9d60a681ca3f7f940071735f654ec395d53b9c2 100644 (file)
@@ -36,6 +36,7 @@
 #include "util-var.h"
 #include "util-debug.h"
 #include "util-macset.h"
+#include "util-flow-rate.h"
 #include "flow-storage.h"
 
 #include "detect.h"
@@ -202,6 +203,12 @@ void FlowInit(ThreadVars *tv, Flow *f, const Packet *p)
         FlowSetStorageById(f, MacSetGetFlowStorageID(), ms);
     }
 
+    if (FlowRateStorageEnabled()) {
+        DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, FlowRateGetStorageID()) != NULL);
+        FlowRateStore *frs = FlowRateStoreInit();
+        FlowSetStorageById(f, FlowRateGetStorageID(), frs);
+    }
+
     SCFlowRunInitCallbacks(tv, f, p);
 
     SCReturn;
index 07562abac71dd3bb50f7fe533161cfbd843c590a..4374c955564a206c4dbc5d7f260d327d85d666c1 100644 (file)
@@ -53,6 +53,7 @@
 #include "util-byte.h"
 #include "util-misc.h"
 #include "util-macset.h"
+#include "util-flow-rate.h"
 
 #include "util-debug.h"
 
@@ -344,6 +345,30 @@ static inline void FlowUpdateTtlTC(Flow *f, uint8_t ttl)
     f->max_ttl_toclient = MAX(f->max_ttl_toclient, ttl);
 }
 
+static inline void FlowUpdateFlowRate(
+        ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, int dir)
+{
+    if (FlowRateStorageEnabled()) {
+        /* No need to update the struct if flow is already marked as elephant flow */
+        if (f->flags & FLOW_IS_ELEPHANT)
+            return;
+        FlowRateStore *frs = FlowGetStorageById(f, FlowRateGetStorageID());
+        if (frs != NULL) {
+            FlowRateStoreUpdate(frs, p->ts, GET_PKT_LEN(p), dir);
+            bool fr_exceeds = FlowRateIsExceeding(frs, dir);
+            if (fr_exceeds) {
+                SCLogDebug("Flow rate for flow %p exceeds the configured values, marking it as an "
+                           "elephant flow",
+                        f);
+                f->flags |= FLOW_IS_ELEPHANT;
+                if (tv != NULL) {
+                    StatsIncr(tv, dtv->counter_flow_elephant);
+                }
+            }
+        }
+    }
+}
+
 static inline void FlowUpdateEthernet(
         ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, bool toserver)
 {
@@ -407,6 +432,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
     if (pkt_dir == TOSERVER) {
         f->todstpktcnt++;
         f->todstbytecnt += GET_PKT_LEN(p);
+        FlowUpdateFlowRate(tv, dtv, f, p, TOSERVER);
         p->flowflags = FLOW_PKT_TOSERVER;
         if (!(f->flags & FLOW_TO_DST_SEEN)) {
             if (FlowUpdateSeenFlag(p)) {
@@ -431,6 +457,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
     } else {
         f->tosrcpktcnt++;
         f->tosrcbytecnt += GET_PKT_LEN(p);
+        FlowUpdateFlowRate(tv, dtv, f, p, TOCLIENT);
         p->flowflags = FLOW_PKT_TOCLIENT;
         if (!(f->flags & FLOW_TO_SRC_SEEN)) {
             if (FlowUpdateSeenFlag(p)) {
index cf083387a308801fe718b934bdf7574441297e73..53d4c8bb51c6a1428a0af316b01510b86a54998f 100644 (file)
@@ -55,7 +55,8 @@ typedef struct AppLayerParserState_ AppLayerParserState;
 /** next packet in toclient direction will act on updated app-layer state */
 #define FLOW_TC_APP_UPDATE_NEXT BIT_U32(2)
 
-// vacancy bit 3
+/** Flow is marked an elephant flow */
+#define FLOW_IS_ELEPHANT BIT_U32(3)
 
 // vacancy bit 4
 
index 1c4f1dcc3d8ea0cd2bb75a37e1dabecf110b4642..fa404536030323ffe8a362a555d46ccc2f854130 100644 (file)
@@ -334,6 +334,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, SCJsonBuilder *jb, Flow *f)
     if (f->flags & FLOW_WRONG_THREAD)
         JB_SET_TRUE(jb, "wrong_thread");
 
+    if (f->flags & FLOW_IS_ELEPHANT)
+        JB_SET_TRUE(jb, "elephant");
+
     if (f->flags & FLOW_ACTION_DROP) {
         JB_SET_STRING(jb, "action", "drop");
     } else if (f->flags & FLOW_ACTION_PASS) {
index 634c9184351a07c6319a2855c17356aabefbd0ef..f11cce5fb3643abcf6a6a827e18ddd9f441e0606 100644 (file)
@@ -89,6 +89,7 @@
 #include "util-byte.h"
 #include "util-proto-name.h"
 #include "util-macset.h"
+#include "util-flow-rate.h"
 #include "util-memrchr.h"
 
 #include "util-mpm-ac.h"
@@ -205,6 +206,7 @@ static void RegisterUnittests(void)
     AppLayerUnittestsRegister();
     StreamingBufferRegisterTests();
     MacSetRegisterTests();
+    FlowRateRegisterTests();
 #ifdef OS_WIN32
     Win32SyscallRegisterTests();
 #endif
index ebabf5068a9749b42cee14231b4fe381c8d1ee2f..68aadaf406f3c893147d2b57deb9d0c514fe07a5 100644 (file)
 #include "util-ioctl.h"
 #include "util-landlock.h"
 #include "util-macset.h"
+#include "util-flow-rate.h"
 #include "util-misc.h"
 #include "util-mpm-hs.h"
 #include "util-path.h"
@@ -2677,6 +2678,7 @@ int PostConfLoadedSetup(SCInstance *suri)
     RegisterFlowBypassInfo();
 
     MacSetRegisterFlowStorage();
+    FlowRateRegisterFlowStorage();
 
     SigTableInit();
 
diff --git a/src/util-flow-rate.c b/src/util-flow-rate.c
new file mode 100644 (file)
index 0000000..44e74db
--- /dev/null
@@ -0,0 +1,629 @@
+/* Copyright (C) 2025 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Shivani Bhardwaj <shivani@oisf.net>
+ *
+ */
+
+#include "suricata-common.h"
+#include "flow-storage.h"
+#include "flow-util.h"
+#include "flow-private.h"
+#include "util-storage.h"
+#include "conf.h"
+#include "util-misc.h"
+#include "util-byte.h"
+#include "util-flow-rate.h"
+#include "util-unittest.h"
+#include "util-unittest-helper.h"
+
+FlowStorageId g_flowrate_storage_id = { .id = -1 };
+
+FlowRateConfig flow_rate_config;
+
+static void FlowRateStoreFree(void *ptr)
+{
+    FlowRateStore *frs = (FlowRateStore *)ptr;
+    size_t total_free = 0;
+    if (frs == NULL)
+        return;
+
+    for (int i = 0; i < 2; i++) {
+        if (frs->dir[i].buf != NULL) {
+            SCFree(frs->dir[i].buf);
+            total_free += (frs->dir[i].size * sizeof(uint64_t));
+        }
+    }
+
+    SCFree(frs);
+    total_free += sizeof(*frs);
+    (void)SC_ATOMIC_SUB(flow_memuse, total_free);
+}
+
+void FlowRateRegisterFlowStorage(void)
+{
+    SCConfNode *root = SCConfGetNode("flow");
+    if (root == NULL)
+        return;
+
+    bool track_flow = false;
+    track_flow = SCConfNodeLookupChild(root, "rate-tracking") != NULL ? true : false;
+    if (!track_flow)
+        return;
+
+    SCConfNode *node = SCConfGetNode("flow.rate-tracking");
+    const char *val = SCConfNodeLookupChildValue(node, "bytes");
+    if (val == NULL) {
+        FatalError("No value for flow tracking bytes");
+    }
+    uint64_t bytes = 0;
+    if (ParseSizeStringU64(val, &bytes) < 0) {
+        FatalError("Invalid value for flow tracking bytes");
+    }
+    flow_rate_config.bytes = bytes;
+
+    val = SCConfNodeLookupChildValue(node, "interval");
+    if (val == NULL) {
+        FatalError("No value for flow tracking interval");
+    }
+    SCTime_t interval = SCTIME_INITIALIZER;
+    uint16_t secs = 0;
+    if ((StringParseUint16(&secs, 10, 0, val) < 0) || (secs == 0)) {
+        FatalError("Invalid value for flow tracking interval");
+    }
+    flow_rate_config.interval = SCTIME_ADD_SECS(interval, secs);
+
+    g_flowrate_storage_id =
+            FlowStorageRegister("flowrate", sizeof(void *), NULL, FlowRateStoreFree);
+}
+
+bool FlowRateStorageEnabled(void)
+{
+    return (g_flowrate_storage_id.id != -1);
+}
+
+FlowRateStore *FlowRateStoreInit(void)
+{
+    FlowRateStore *frs = NULL;
+    size_t total_memuse = 0;
+    size_t expected_memuse = (2 * flow_rate_config.interval.secs * sizeof(uint64_t)) + sizeof(*frs);
+
+    if (!FLOW_CHECK_MEMCAP(expected_memuse)) {
+        return NULL;
+    }
+    frs = SCCalloc(1, sizeof(*frs));
+    if (unlikely(frs == NULL)) {
+        return NULL;
+    }
+
+    total_memuse += sizeof(*frs);
+    for (int i = 0; i < 2; i++) {
+        frs->dir[i].size = (uint16_t)flow_rate_config.interval.secs;
+        frs->dir[i].buf = SCCalloc(frs->dir[i].size, sizeof(uint64_t));
+        if (unlikely(frs->dir[i].buf == NULL)) {
+            FlowRateStoreFree(frs);
+            return NULL;
+        }
+        frs->dir[i].start_ts = SCTIME_INITIALIZER;
+        frs->dir[i].last_ts = SCTIME_INITIALIZER;
+        total_memuse += (frs->dir[i].size * sizeof(uint64_t));
+    }
+    DEBUG_VALIDATE_BUG_ON(total_memuse != expected_memuse);
+    (void)SC_ATOMIC_ADD(flow_memuse, total_memuse);
+
+    return frs;
+}
+
+FlowStorageId FlowRateGetStorageID(void)
+{
+    return g_flowrate_storage_id;
+}
+
+static inline void FlowRateClearSumInRange(
+        FlowRateStore *frs, uint16_t start, uint16_t end, int direction)
+{
+    for (uint16_t i = start; i <= end; i++) {
+        uint64_t byte_count_at_i = frs->dir[direction].buf[i];
+        frs->dir[direction].buf[i] = 0;
+        DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < byte_count_at_i);
+        frs->dir[direction].sum -= byte_count_at_i;
+    }
+}
+
+static inline void FlowRateStoreUpdateCurrentRing(
+        FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, uint16_t idx, int direction)
+{
+    if (idx > frs->dir[direction].last_idx + 1) {
+        /* Index is not the same as last or the next so, the ring must be flushed for the items
+         * in between and sum updated */
+        FlowRateClearSumInRange(frs, frs->dir[direction].last_idx + 1, idx, direction);
+        frs->dir[direction].buf[idx] += pkt_len;
+        /* Update the total sum */
+        frs->dir[direction].sum += pkt_len;
+    } else if ((idx == frs->dir[direction].last_idx) || (idx == frs->dir[direction].last_idx + 1)) {
+        /* Index matches the last updated index in the ring buffer or is the next index in the
+         * buffer */
+        /* Add to the existing open time interval */
+        frs->dir[direction].buf[idx] += pkt_len;
+        /* Update the total sum */
+        frs->dir[direction].sum += pkt_len;
+    } else {
+        /* Index is revisited after a full round of the buffer */
+        uint64_t prev_byte_count = frs->dir[direction].buf[idx];
+        /* Overwrite the buffer */
+        frs->dir[direction].buf[idx] = pkt_len;
+        DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < prev_byte_count);
+        /* Sum should get rid of previous count on the same index */
+        frs->dir[direction].sum += pkt_len - prev_byte_count;
+        frs->dir[direction].start_ts = p_ts;
+    }
+    frs->dir[direction].last_idx = idx;
+}
+
+static inline void FlowRateStoreFlushRing(
+        FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction)
+{
+    memset(frs->dir[direction].buf, 0, frs->dir[direction].size);
+    frs->dir[direction].last_idx = 0;
+    frs->dir[direction].start_ts = p_ts;
+    frs->dir[direction].buf[0] = pkt_len;
+    /* Overwrite the sum calculated so far */
+    frs->dir[direction].sum = pkt_len;
+}
+
+void FlowRateStoreUpdate(FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction)
+{
+    if (frs->dir[direction].last_ts.secs == 0) {
+        /* Should only happen when the ring is first used */
+        DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum > 0);
+        /* Initialize last_ts and start_ts with the first packet's timestamp */
+        frs->dir[direction].last_ts = p_ts;
+        frs->dir[direction].start_ts = p_ts;
+    }
+
+    SCTime_t start_ts = frs->dir[direction].start_ts;
+    uint16_t idx = (p_ts.secs - start_ts.secs) % frs->dir[direction].size;
+    /* Update start_ts in case of initiating the revisit of buffer */
+    if ((frs->dir[direction].last_idx == frs->dir[direction].size - 1) &&
+            (frs->dir[direction].last_idx != idx)) {
+        start_ts = p_ts;
+        if (idx != 0) {
+            /* Update the sum */
+            FlowRateClearSumInRange(frs, 0, idx, direction);
+            /* Consider current packet a new start of the ring */
+            idx = 0;
+        }
+    }
+    /* If the packet has come in the last open interval of time */
+    if (p_ts.secs - start_ts.secs < frs->dir[direction].size) {
+        FlowRateStoreUpdateCurrentRing(frs, p_ts, pkt_len, idx, direction);
+    } else {
+        /* Packet arrived after one or more rounds of the entire buffer */
+        /* Flush the entire buffer */
+        FlowRateStoreFlushRing(frs, p_ts, pkt_len, direction);
+    }
+    /* In any case, update the last seen timestamp */
+    frs->dir[direction].last_ts = p_ts;
+}
+
+bool FlowRateIsExceeding(FlowRateStore *frs, int direction)
+{
+    if (frs->dir[direction].sum >= flow_rate_config.bytes) {
+        return true;
+    }
+    return false;
+}
+
+#ifdef UNITTESTS
+
+/* Test to check update of the same buffer item */
+static int FlowRateTest01(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 100;
+    flow_rate_config.interval = (SCTime_t){ .secs = 10, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 10);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 92);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 139);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check update of all buffer items */
+static int FlowRateTest02(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 1;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[1] != 44);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    p3->ts.secs = p1->ts.secs + 2;
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[2] != 47);
+
+    Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP);
+    p4->ts.secs = p1->ts.secs + 3;
+    FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER);
+    /* Total length of packet is 46 */
+    FAIL_IF(frs->dir[0].sum != 185);
+    FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs);
+    FAIL_IF(frs->dir[0].buf[3] != 46);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check update of wrapping around ring buffer */
+static int FlowRateTest03(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 1;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[1] != 44);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    p3->ts.secs = p1->ts.secs + 2;
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[2] != 47);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP);
+    p4->ts.secs = p1->ts.secs + 3;
+    FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER);
+    /* Total length of packet is 46 */
+    FAIL_IF(frs->dir[0].sum != 185);
+    FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs);
+    FAIL_IF(frs->dir[0].buf[3] != 46);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP);
+    p5->ts.secs = p1->ts.secs + 4;
+    FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER);
+    /* Total length of packet is 43 */
+    FAIL_IF(frs->dir[0].sum != 180);
+    FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check update of buffer if new pkt comes out of the window */
+static int FlowRateTest04(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 60;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 44);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 44);
+    FAIL_IF(frs->dir[0].start_ts.secs != p2->ts.secs);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check update of wrapping around ring buffer when the packet
+ * out of the window but also does not fall on the first index of the ring */
+static int FlowRateTest05(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 1;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[1] != 44);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    p3->ts.secs = p1->ts.secs + 2;
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[2] != 47);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP);
+    p4->ts.secs = p1->ts.secs + 3;
+    FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER);
+    /* Total length of packet is 46 */
+    FAIL_IF(frs->dir[0].sum != 185);
+    FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs);
+    FAIL_IF(frs->dir[0].buf[3] != 46);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP);
+    p5->ts.secs = p1->ts.secs + 6;
+    FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER);
+    /* Total length of packet is 43 */
+    FAIL_IF(frs->dir[0].sum != 89);
+    FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check sum when packet is within the window but is coming after a gap */
+static int FlowRateTest06(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 1;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[1] != 44);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    p3->ts.secs = p1->ts.secs + 2;
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[2] != 47);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP);
+    p4->ts.secs = p1->ts.secs + 3;
+    FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER);
+    /* Total length of packet is 46 */
+    FAIL_IF(frs->dir[0].sum != 185);
+    FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs);
+    FAIL_IF(frs->dir[0].buf[3] != 46);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP);
+    p5->ts.secs = p1->ts.secs + 4;
+    FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER);
+    /* Total length of packet is 43 */
+    FAIL_IF(frs->dir[0].sum != 180);
+    FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+
+    Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP);
+    p6->ts.secs = p1->ts.secs + 7;
+    FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 91);
+    FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+    FAIL_IF(frs->dir[0].buf[1] != 0);
+    FAIL_IF(frs->dir[0].buf[2] != 0);
+    FAIL_IF(frs->dir[0].buf[3] != 48);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+/* Test to check sum when two packets are back to back within the window but are coming after a gap
+ */
+static int FlowRateTest07(void)
+{
+    SC_ATOMIC_SET(flow_config.memcap, 10000);
+    flow_rate_config.bytes = 200;
+    flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 };
+    FlowRateStore *frs = FlowRateStoreInit();
+    FAIL_IF_NULL(frs);
+    for (int i = 0; i < 2; i++) {
+        FAIL_IF(frs->dir[i].size != 4);
+        FAIL_IF(frs->dir[i].sum != 0);
+    }
+    Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP);
+    FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 48);
+    FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 48);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP);
+    p2->ts.secs = p1->ts.secs + 1;
+    FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER);
+    /* Total length of packet is 44 */
+    FAIL_IF(frs->dir[0].sum != 92);
+    FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs);
+    FAIL_IF(frs->dir[0].buf[1] != 44);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP);
+    p3->ts.secs = p1->ts.secs + 2;
+    FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER);
+    /* Total length of packet is 47 */
+    FAIL_IF(frs->dir[0].sum != 139);
+    FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs);
+    FAIL_IF(frs->dir[0].buf[2] != 47);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP);
+    p4->ts.secs = p1->ts.secs + 3;
+    FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER);
+    /* Total length of packet is 46 */
+    FAIL_IF(frs->dir[0].sum != 185);
+    FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs);
+    FAIL_IF(frs->dir[0].buf[3] != 46);
+    FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs);
+
+    Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP);
+    p5->ts.secs = p1->ts.secs + 5;
+    FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER);
+    /* Total length of packet is 43 */
+    FAIL_IF(frs->dir[0].sum != 136);
+    FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+
+    Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP);
+    p6->ts.secs = p1->ts.secs + 8;
+    FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER);
+    /* Total length of packet is 48 */
+    FAIL_IF(frs->dir[0].sum != 91);
+    FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs);
+    FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs);
+    FAIL_IF(frs->dir[0].buf[0] != 43);
+    FAIL_IF(frs->dir[0].buf[1] != 0);
+    FAIL_IF(frs->dir[0].buf[2] != 0);
+    FAIL_IF(frs->dir[0].buf[3] != 48);
+
+    FlowRateStoreFree(frs);
+    PASS;
+}
+
+void FlowRateRegisterTests(void)
+{
+    UtRegisterTest("FlowRateTest01", FlowRateTest01);
+    UtRegisterTest("FlowRateTest02", FlowRateTest02);
+    UtRegisterTest("FlowRateTest03", FlowRateTest03);
+    UtRegisterTest("FlowRateTest04", FlowRateTest04);
+    UtRegisterTest("FlowRateTest05", FlowRateTest05);
+    UtRegisterTest("FlowRateTest06", FlowRateTest06);
+    UtRegisterTest("FlowRateTest07", FlowRateTest07);
+}
+#endif
diff --git a/src/util-flow-rate.h b/src/util-flow-rate.h
new file mode 100644 (file)
index 0000000..d4754ca
--- /dev/null
@@ -0,0 +1,64 @@
+/* Copyright (C) 2025 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Shivani Bhardwaj <shivani@oisf.net>
+ */
+
+#ifndef SURICATA_UTIL_FLOW_RATE_H
+#define SURICATA_UTIL_FLOW_RATE_H
+
+typedef struct FlowRateConfig_ {
+    uint64_t bytes;
+    SCTime_t interval;
+} FlowRateConfig;
+
+typedef struct FlowRateDirStore_ {
+    /* Ring buffer to store byte count per second in */
+    uint64_t *buf;
+    /* Total sum of bytes per direction */
+    uint64_t sum;
+    /* Last index that was updated in the buffer */
+    uint16_t last_idx;
+    /* Size of the ring; should be same for both directions */
+    uint16_t size;
+    /* start timestamp to define and track the beginning of buffer */
+    SCTime_t start_ts;
+    /* last timestamp that was processed in the buffer */
+    SCTime_t last_ts;
+} FlowRateDirStore;
+
+typedef struct FlowRateStore_ {
+    FlowRateDirStore dir[2];
+} FlowRateStore;
+
+extern FlowRateConfig flow_rate_config;
+
+bool FlowRateStorageEnabled(void);
+void FlowRateRegisterFlowStorage(void);
+FlowRateStore *FlowRateStoreInit(void);
+FlowStorageId FlowRateGetStorageID(void);
+void FlowRateStoreUpdate(FlowRateStore *, SCTime_t, uint32_t, int);
+bool FlowRateIsExceeding(FlowRateStore *, int);
+
+#ifdef UNITTESTS
+void FlowRateRegisterTests(void);
+#endif
+
+#endif
index e83ec1bfa6e604ea56ff58cd8aa3acb716f7b80e..5d366283a73cd5fc0fd33b887f3bdb0912454b64 100644 (file)
@@ -1498,6 +1498,11 @@ flow:
   emergency-recovery: 30
   #managers: 1 # default to one flow manager
   #recyclers: 1 # default to one flow recycler thread
+  # Track flows and count them as elephant flow if they exceed the rate defined
+  # by the byte count per interval configured below.
+  #rate-tracking:
+  #  bytes: 1GiB
+  #  interval: 10 # seconds is the only supported unit for interval so far
 
 # This option controls the use of VLAN ids in the flow (and defrag)
 # hashing. Normally this should be enabled, but in some (broken)