From: Shivani Bhardwaj Date: Sat, 29 Mar 2025 13:33:16 +0000 (+0530) Subject: util: add initial flow rate tracking implementation X-Git-Tag: suricata-8.0.0-beta1~78 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0aea82677644a7e9476059a8be4d29953e4ab712;p=thirdparty%2Fsuricata.git util: add initial flow rate tracking implementation In order to track flow rate and thus determine a course of action or categorize it as elephant flow, track a flow's byte count per direction in a ring buffer for a given time interval. The implementation is simple and keeps overwriting the buffer and updating the final sum. The sum of all the elements in the ring buffer at any point in time should reflect the number of bytes for the respective flow in the last of a given configured interval. e.g. if the definition says that the flows must be tracked by a rate of 100k bytes in 10 seconds, the ring buffer at any point in time should carry the total number of bytes seen by the respective flow in the last 10 seconds. So far, the implementation only supports reading the flow rate definition from suricata.yaml and using it to track the flows. This solution adds up a space complexity to the existing Flow struct. However, the added space complexity should only take effect if the feature is in use. Since this buffer extends the Flow struct, it does not impact the usual business logic or complexity of the code. This implementation is currently limited to defining the time interval of flow rate in seconds only. However, the number of seconds defined are directly proportional to the aforementioned added space complexity as that's the size of the ring buffer. Feature 5647 --- diff --git a/etc/schema.json b/etc/schema.json index f0a8ba5af9..a69bcec35e 100644 --- a/etc/schema.json +++ b/etc/schema.json @@ -1893,6 +1893,9 @@ "alerted": { "type": "boolean" }, + "elephant": { + "type": "boolean" + }, "bypass": { "type": "string" }, @@ -6301,6 +6304,10 @@ "description": "Total number of flows", "type": "integer" }, + "elephant": { + "description": "Total number of elephant flows", + "type": "integer" + }, "udp": { "description": "Number of UDP flows", "type": "integer" diff --git a/src/Makefile.am b/src/Makefile.am index 47fcf90ece..7b69b4f724 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -535,6 +535,7 @@ noinst_HEADERS = \ util-lua-smtp.h \ util-lua-ssh.h \ util-lua-tls.h \ + util-flow-rate.h \ util-macset.h \ util-magic.h \ util-memcmp.h \ @@ -1101,6 +1102,7 @@ libsuricata_c_a_SOURCES = \ util-lua-ssh.c \ util-lua-tls.c \ util-macset.c \ + util-flow-rate.c \ util-magic.c \ util-mem.c \ util-memcmp.c \ diff --git a/src/decode.c b/src/decode.c index 1933745f69..3ba77b9229 100644 --- a/src/decode.c +++ b/src/decode.c @@ -659,6 +659,7 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv) dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv); dtv->counter_flow_icmp6 = StatsRegisterCounter("flow.icmpv6", tv); dtv->counter_flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", tv); + dtv->counter_flow_elephant = StatsRegisterCounter("flow.elephant", tv); dtv->counter_flow_get_used = StatsRegisterCounter("flow.get_used", tv); dtv->counter_flow_get_used_eval = StatsRegisterCounter("flow.get_used_eval", tv); dtv->counter_flow_get_used_eval_reject = StatsRegisterCounter("flow.get_used_eval_reject", tv); diff --git a/src/decode.h b/src/decode.h index b98568a26f..e5147fd8ed 100644 --- a/src/decode.h +++ b/src/decode.h @@ -1004,6 +1004,7 @@ typedef struct DecodeThreadVars_ uint16_t counter_flow_icmp4; uint16_t counter_flow_icmp6; uint16_t counter_flow_tcp_reuse; + uint16_t counter_flow_elephant; uint16_t counter_flow_get_used; uint16_t counter_flow_get_used_eval; uint16_t counter_flow_get_used_eval_reject; diff --git a/src/flow-util.c b/src/flow-util.c index 9e90ae5be9..d9d60a681c 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -36,6 +36,7 @@ #include "util-var.h" #include "util-debug.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "flow-storage.h" #include "detect.h" @@ -202,6 +203,12 @@ void FlowInit(ThreadVars *tv, Flow *f, const Packet *p) FlowSetStorageById(f, MacSetGetFlowStorageID(), ms); } + if (FlowRateStorageEnabled()) { + DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, FlowRateGetStorageID()) != NULL); + FlowRateStore *frs = FlowRateStoreInit(); + FlowSetStorageById(f, FlowRateGetStorageID(), frs); + } + SCFlowRunInitCallbacks(tv, f, p); SCReturn; diff --git a/src/flow.c b/src/flow.c index 07562abac7..4374c95556 100644 --- a/src/flow.c +++ b/src/flow.c @@ -53,6 +53,7 @@ #include "util-byte.h" #include "util-misc.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-debug.h" @@ -344,6 +345,30 @@ static inline void FlowUpdateTtlTC(Flow *f, uint8_t ttl) f->max_ttl_toclient = MAX(f->max_ttl_toclient, ttl); } +static inline void FlowUpdateFlowRate( + ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, int dir) +{ + if (FlowRateStorageEnabled()) { + /* No need to update the struct if flow is already marked as elephant flow */ + if (f->flags & FLOW_IS_ELEPHANT) + return; + FlowRateStore *frs = FlowGetStorageById(f, FlowRateGetStorageID()); + if (frs != NULL) { + FlowRateStoreUpdate(frs, p->ts, GET_PKT_LEN(p), dir); + bool fr_exceeds = FlowRateIsExceeding(frs, dir); + if (fr_exceeds) { + SCLogDebug("Flow rate for flow %p exceeds the configured values, marking it as an " + "elephant flow", + f); + f->flags |= FLOW_IS_ELEPHANT; + if (tv != NULL) { + StatsIncr(tv, dtv->counter_flow_elephant); + } + } + } + } +} + static inline void FlowUpdateEthernet( ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, bool toserver) { @@ -407,6 +432,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars if (pkt_dir == TOSERVER) { f->todstpktcnt++; f->todstbytecnt += GET_PKT_LEN(p); + FlowUpdateFlowRate(tv, dtv, f, p, TOSERVER); p->flowflags = FLOW_PKT_TOSERVER; if (!(f->flags & FLOW_TO_DST_SEEN)) { if (FlowUpdateSeenFlag(p)) { @@ -431,6 +457,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars } else { f->tosrcpktcnt++; f->tosrcbytecnt += GET_PKT_LEN(p); + FlowUpdateFlowRate(tv, dtv, f, p, TOCLIENT); p->flowflags = FLOW_PKT_TOCLIENT; if (!(f->flags & FLOW_TO_SRC_SEEN)) { if (FlowUpdateSeenFlag(p)) { diff --git a/src/flow.h b/src/flow.h index cf083387a3..53d4c8bb51 100644 --- a/src/flow.h +++ b/src/flow.h @@ -55,7 +55,8 @@ typedef struct AppLayerParserState_ AppLayerParserState; /** next packet in toclient direction will act on updated app-layer state */ #define FLOW_TC_APP_UPDATE_NEXT BIT_U32(2) -// vacancy bit 3 +/** Flow is marked an elephant flow */ +#define FLOW_IS_ELEPHANT BIT_U32(3) // vacancy bit 4 diff --git a/src/output-json-flow.c b/src/output-json-flow.c index 1c4f1dcc3d..fa40453603 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -334,6 +334,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, SCJsonBuilder *jb, Flow *f) if (f->flags & FLOW_WRONG_THREAD) JB_SET_TRUE(jb, "wrong_thread"); + if (f->flags & FLOW_IS_ELEPHANT) + JB_SET_TRUE(jb, "elephant"); + if (f->flags & FLOW_ACTION_DROP) { JB_SET_STRING(jb, "action", "drop"); } else if (f->flags & FLOW_ACTION_PASS) { diff --git a/src/runmode-unittests.c b/src/runmode-unittests.c index 634c918435..f11cce5fb3 100644 --- a/src/runmode-unittests.c +++ b/src/runmode-unittests.c @@ -89,6 +89,7 @@ #include "util-byte.h" #include "util-proto-name.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-memrchr.h" #include "util-mpm-ac.h" @@ -205,6 +206,7 @@ static void RegisterUnittests(void) AppLayerUnittestsRegister(); StreamingBufferRegisterTests(); MacSetRegisterTests(); + FlowRateRegisterTests(); #ifdef OS_WIN32 Win32SyscallRegisterTests(); #endif diff --git a/src/suricata.c b/src/suricata.c index ebabf5068a..68aadaf406 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -126,6 +126,7 @@ #include "util-ioctl.h" #include "util-landlock.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-misc.h" #include "util-mpm-hs.h" #include "util-path.h" @@ -2677,6 +2678,7 @@ int PostConfLoadedSetup(SCInstance *suri) RegisterFlowBypassInfo(); MacSetRegisterFlowStorage(); + FlowRateRegisterFlowStorage(); SigTableInit(); diff --git a/src/util-flow-rate.c b/src/util-flow-rate.c new file mode 100644 index 0000000000..44e74dbf7b --- /dev/null +++ b/src/util-flow-rate.c @@ -0,0 +1,629 @@ +/* Copyright (C) 2025 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Shivani Bhardwaj + * + */ + +#include "suricata-common.h" +#include "flow-storage.h" +#include "flow-util.h" +#include "flow-private.h" +#include "util-storage.h" +#include "conf.h" +#include "util-misc.h" +#include "util-byte.h" +#include "util-flow-rate.h" +#include "util-unittest.h" +#include "util-unittest-helper.h" + +FlowStorageId g_flowrate_storage_id = { .id = -1 }; + +FlowRateConfig flow_rate_config; + +static void FlowRateStoreFree(void *ptr) +{ + FlowRateStore *frs = (FlowRateStore *)ptr; + size_t total_free = 0; + if (frs == NULL) + return; + + for (int i = 0; i < 2; i++) { + if (frs->dir[i].buf != NULL) { + SCFree(frs->dir[i].buf); + total_free += (frs->dir[i].size * sizeof(uint64_t)); + } + } + + SCFree(frs); + total_free += sizeof(*frs); + (void)SC_ATOMIC_SUB(flow_memuse, total_free); +} + +void FlowRateRegisterFlowStorage(void) +{ + SCConfNode *root = SCConfGetNode("flow"); + if (root == NULL) + return; + + bool track_flow = false; + track_flow = SCConfNodeLookupChild(root, "rate-tracking") != NULL ? true : false; + if (!track_flow) + return; + + SCConfNode *node = SCConfGetNode("flow.rate-tracking"); + const char *val = SCConfNodeLookupChildValue(node, "bytes"); + if (val == NULL) { + FatalError("No value for flow tracking bytes"); + } + uint64_t bytes = 0; + if (ParseSizeStringU64(val, &bytes) < 0) { + FatalError("Invalid value for flow tracking bytes"); + } + flow_rate_config.bytes = bytes; + + val = SCConfNodeLookupChildValue(node, "interval"); + if (val == NULL) { + FatalError("No value for flow tracking interval"); + } + SCTime_t interval = SCTIME_INITIALIZER; + uint16_t secs = 0; + if ((StringParseUint16(&secs, 10, 0, val) < 0) || (secs == 0)) { + FatalError("Invalid value for flow tracking interval"); + } + flow_rate_config.interval = SCTIME_ADD_SECS(interval, secs); + + g_flowrate_storage_id = + FlowStorageRegister("flowrate", sizeof(void *), NULL, FlowRateStoreFree); +} + +bool FlowRateStorageEnabled(void) +{ + return (g_flowrate_storage_id.id != -1); +} + +FlowRateStore *FlowRateStoreInit(void) +{ + FlowRateStore *frs = NULL; + size_t total_memuse = 0; + size_t expected_memuse = (2 * flow_rate_config.interval.secs * sizeof(uint64_t)) + sizeof(*frs); + + if (!FLOW_CHECK_MEMCAP(expected_memuse)) { + return NULL; + } + frs = SCCalloc(1, sizeof(*frs)); + if (unlikely(frs == NULL)) { + return NULL; + } + + total_memuse += sizeof(*frs); + for (int i = 0; i < 2; i++) { + frs->dir[i].size = (uint16_t)flow_rate_config.interval.secs; + frs->dir[i].buf = SCCalloc(frs->dir[i].size, sizeof(uint64_t)); + if (unlikely(frs->dir[i].buf == NULL)) { + FlowRateStoreFree(frs); + return NULL; + } + frs->dir[i].start_ts = SCTIME_INITIALIZER; + frs->dir[i].last_ts = SCTIME_INITIALIZER; + total_memuse += (frs->dir[i].size * sizeof(uint64_t)); + } + DEBUG_VALIDATE_BUG_ON(total_memuse != expected_memuse); + (void)SC_ATOMIC_ADD(flow_memuse, total_memuse); + + return frs; +} + +FlowStorageId FlowRateGetStorageID(void) +{ + return g_flowrate_storage_id; +} + +static inline void FlowRateClearSumInRange( + FlowRateStore *frs, uint16_t start, uint16_t end, int direction) +{ + for (uint16_t i = start; i <= end; i++) { + uint64_t byte_count_at_i = frs->dir[direction].buf[i]; + frs->dir[direction].buf[i] = 0; + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < byte_count_at_i); + frs->dir[direction].sum -= byte_count_at_i; + } +} + +static inline void FlowRateStoreUpdateCurrentRing( + FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, uint16_t idx, int direction) +{ + if (idx > frs->dir[direction].last_idx + 1) { + /* Index is not the same as last or the next so, the ring must be flushed for the items + * in between and sum updated */ + FlowRateClearSumInRange(frs, frs->dir[direction].last_idx + 1, idx, direction); + frs->dir[direction].buf[idx] += pkt_len; + /* Update the total sum */ + frs->dir[direction].sum += pkt_len; + } else if ((idx == frs->dir[direction].last_idx) || (idx == frs->dir[direction].last_idx + 1)) { + /* Index matches the last updated index in the ring buffer or is the next index in the + * buffer */ + /* Add to the existing open time interval */ + frs->dir[direction].buf[idx] += pkt_len; + /* Update the total sum */ + frs->dir[direction].sum += pkt_len; + } else { + /* Index is revisited after a full round of the buffer */ + uint64_t prev_byte_count = frs->dir[direction].buf[idx]; + /* Overwrite the buffer */ + frs->dir[direction].buf[idx] = pkt_len; + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < prev_byte_count); + /* Sum should get rid of previous count on the same index */ + frs->dir[direction].sum += pkt_len - prev_byte_count; + frs->dir[direction].start_ts = p_ts; + } + frs->dir[direction].last_idx = idx; +} + +static inline void FlowRateStoreFlushRing( + FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction) +{ + memset(frs->dir[direction].buf, 0, frs->dir[direction].size); + frs->dir[direction].last_idx = 0; + frs->dir[direction].start_ts = p_ts; + frs->dir[direction].buf[0] = pkt_len; + /* Overwrite the sum calculated so far */ + frs->dir[direction].sum = pkt_len; +} + +void FlowRateStoreUpdate(FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction) +{ + if (frs->dir[direction].last_ts.secs == 0) { + /* Should only happen when the ring is first used */ + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum > 0); + /* Initialize last_ts and start_ts with the first packet's timestamp */ + frs->dir[direction].last_ts = p_ts; + frs->dir[direction].start_ts = p_ts; + } + + SCTime_t start_ts = frs->dir[direction].start_ts; + uint16_t idx = (p_ts.secs - start_ts.secs) % frs->dir[direction].size; + /* Update start_ts in case of initiating the revisit of buffer */ + if ((frs->dir[direction].last_idx == frs->dir[direction].size - 1) && + (frs->dir[direction].last_idx != idx)) { + start_ts = p_ts; + if (idx != 0) { + /* Update the sum */ + FlowRateClearSumInRange(frs, 0, idx, direction); + /* Consider current packet a new start of the ring */ + idx = 0; + } + } + /* If the packet has come in the last open interval of time */ + if (p_ts.secs - start_ts.secs < frs->dir[direction].size) { + FlowRateStoreUpdateCurrentRing(frs, p_ts, pkt_len, idx, direction); + } else { + /* Packet arrived after one or more rounds of the entire buffer */ + /* Flush the entire buffer */ + FlowRateStoreFlushRing(frs, p_ts, pkt_len, direction); + } + /* In any case, update the last seen timestamp */ + frs->dir[direction].last_ts = p_ts; +} + +bool FlowRateIsExceeding(FlowRateStore *frs, int direction) +{ + if (frs->dir[direction].sum >= flow_rate_config.bytes) { + return true; + } + return false; +} + +#ifdef UNITTESTS + +/* Test to check update of the same buffer item */ +static int FlowRateTest01(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 100; + flow_rate_config.interval = (SCTime_t){ .secs = 10, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 10); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 92); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 139); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of all buffer items */ +static int FlowRateTest02(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of wrapping around ring buffer */ +static int FlowRateTest03(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 4; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 180); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of buffer if new pkt comes out of the window */ +static int FlowRateTest04(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 60; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 44); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p2->ts.secs); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of wrapping around ring buffer when the packet + * out of the window but also does not fall on the first index of the ring */ +static int FlowRateTest05(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 6; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 89); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check sum when packet is within the window but is coming after a gap */ +static int FlowRateTest06(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 4; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 180); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP); + p6->ts.secs = p1->ts.secs + 7; + FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 91); + FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + FAIL_IF(frs->dir[0].buf[1] != 0); + FAIL_IF(frs->dir[0].buf[2] != 0); + FAIL_IF(frs->dir[0].buf[3] != 48); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check sum when two packets are back to back within the window but are coming after a gap + */ +static int FlowRateTest07(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 5; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 136); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP); + p6->ts.secs = p1->ts.secs + 8; + FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 91); + FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + FAIL_IF(frs->dir[0].buf[1] != 0); + FAIL_IF(frs->dir[0].buf[2] != 0); + FAIL_IF(frs->dir[0].buf[3] != 48); + + FlowRateStoreFree(frs); + PASS; +} + +void FlowRateRegisterTests(void) +{ + UtRegisterTest("FlowRateTest01", FlowRateTest01); + UtRegisterTest("FlowRateTest02", FlowRateTest02); + UtRegisterTest("FlowRateTest03", FlowRateTest03); + UtRegisterTest("FlowRateTest04", FlowRateTest04); + UtRegisterTest("FlowRateTest05", FlowRateTest05); + UtRegisterTest("FlowRateTest06", FlowRateTest06); + UtRegisterTest("FlowRateTest07", FlowRateTest07); +} +#endif diff --git a/src/util-flow-rate.h b/src/util-flow-rate.h new file mode 100644 index 0000000000..d4754cadb5 --- /dev/null +++ b/src/util-flow-rate.h @@ -0,0 +1,64 @@ +/* Copyright (C) 2025 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Shivani Bhardwaj + */ + +#ifndef SURICATA_UTIL_FLOW_RATE_H +#define SURICATA_UTIL_FLOW_RATE_H + +typedef struct FlowRateConfig_ { + uint64_t bytes; + SCTime_t interval; +} FlowRateConfig; + +typedef struct FlowRateDirStore_ { + /* Ring buffer to store byte count per second in */ + uint64_t *buf; + /* Total sum of bytes per direction */ + uint64_t sum; + /* Last index that was updated in the buffer */ + uint16_t last_idx; + /* Size of the ring; should be same for both directions */ + uint16_t size; + /* start timestamp to define and track the beginning of buffer */ + SCTime_t start_ts; + /* last timestamp that was processed in the buffer */ + SCTime_t last_ts; +} FlowRateDirStore; + +typedef struct FlowRateStore_ { + FlowRateDirStore dir[2]; +} FlowRateStore; + +extern FlowRateConfig flow_rate_config; + +bool FlowRateStorageEnabled(void); +void FlowRateRegisterFlowStorage(void); +FlowRateStore *FlowRateStoreInit(void); +FlowStorageId FlowRateGetStorageID(void); +void FlowRateStoreUpdate(FlowRateStore *, SCTime_t, uint32_t, int); +bool FlowRateIsExceeding(FlowRateStore *, int); + +#ifdef UNITTESTS +void FlowRateRegisterTests(void); +#endif + +#endif diff --git a/suricata.yaml.in b/suricata.yaml.in index e83ec1bfa6..5d366283a7 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -1498,6 +1498,11 @@ flow: emergency-recovery: 30 #managers: 1 # default to one flow manager #recyclers: 1 # default to one flow recycler thread + # Track flows and count them as elephant flow if they exceed the rate defined + # by the byte count per interval configured below. + #rate-tracking: + # bytes: 1GiB + # interval: 10 # seconds is the only supported unit for interval so far # This option controls the use of VLAN ids in the flow (and defrag) # hashing. Normally this should be enabled, but in some (broken)