git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: add various flow counters
author: Victor Julien <vjulien@oisf.net>
Thu, 25 Nov 2021 06:53:10 +0000 (07:53 +0100)
committer: Victor Julien <vjulien@oisf.net>
Mon, 13 Jun 2022 10:58:20 +0000 (12:58 +0200)
Add flow.end state counters

Add active TCP sessions counter

Add flow.active counter

Add flow.total counter

Ticket: #1478.

src/decode.c
src/decode.h
src/flow-hash.c
src/flow-manager.c
src/flow-util.c
src/flow-util.h
src/flow-worker.c
src/flow.h
src/stream-tcp.c
src/stream-tcp.h

index e837502651b7e14ca6d9178f96fea63494a3e411..3f6a3a674bd3af3293a6551ff821c2b5f9bdbe07 100644 (file)
@@ -560,6 +560,9 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
     dtv->counter_nsh = StatsRegisterMaxCounter("decoder.nsh", tv);
     dtv->counter_flow_memcap = StatsRegisterCounter("flow.memcap", tv);
 
+    dtv->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
+    dtv->counter_flow_total = StatsRegisterCounter("flow.total", tv);
+    dtv->counter_flow_active = StatsRegisterCounter("flow.active", tv);
     dtv->counter_flow_tcp = StatsRegisterCounter("flow.tcp", tv);
     dtv->counter_flow_udp = StatsRegisterCounter("flow.udp", tv);
     dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv);
index fb4f58647a5cdb8b1c913029d3b5fb6e90c05f21..010c3ad6296812384a7f2efddf3e70fe00c60086 100644 (file)
@@ -724,6 +724,9 @@ typedef struct DecodeThreadVars_
 
     uint16_t counter_flow_memcap;
 
+    uint16_t counter_tcp_active_sessions;
+    uint16_t counter_flow_total;
+    uint16_t counter_flow_active;
     uint16_t counter_flow_tcp;
     uint16_t counter_flow_udp;
     uint16_t counter_flow_icmp4;
index 9421d7ee739c2e88fc71427d1a81e1f4779d6aef..f16bac3ef83c86b44432b89175855999cb05a995 100644 (file)
@@ -486,6 +486,8 @@ static inline void FlowUpdateCounter(ThreadVars *tv, DecodeThreadVars *dtv,
 #ifdef UNITTESTS
     if (tv && dtv) {
 #endif
+        StatsIncr(tv, dtv->counter_flow_total);
+        StatsIncr(tv, dtv->counter_flow_active);
         switch (proto){
             case IPPROTO_UDP:
                 StatsIncr(tv, dtv->counter_flow_udp);
index 2faabc018d67552502afb25da40260b1a3d5e6de..d65505eb8211c2342b708a8298644fd4a045a72d 100644 (file)
@@ -599,17 +599,6 @@ static uint32_t FlowCleanupHash(void)
     return cnt;
 }
 
-static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
-{
-    FLOWLOCK_WRLOCK(f);
-
-    (void)OutputFlowLog(tv, output_thread_data, f);
-
-    FlowClearMemory (f, f->protomap);
-    FLOWLOCK_UNLOCK(f);
-    FlowSparePoolReturnFlow(f);
-}
-
 typedef struct FlowQueueTimeoutCounters {
     uint32_t flows_removed;
     uint32_t flows_timeout;
@@ -1028,6 +1017,14 @@ void FlowManagerThreadSpawn()
 
 typedef struct FlowRecyclerThreadData_ {
     void *output_thread_data;
+
+    uint16_t counter_flows;
+    uint16_t counter_queue_avg;
+    uint16_t counter_queue_max;
+
+    uint16_t counter_flow_active;
+    uint16_t counter_tcp_active_sessions;
+    FlowEndCounters fec;
 } FlowRecyclerThreadData;
 
 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
@@ -1042,6 +1039,15 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void
     }
     SCLogDebug("output_thread_data %p", ftd->output_thread_data);
 
+    ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
+    ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
+    ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
+
+    ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
+    ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
+
+    FlowEndCountersRegister(t, &ftd->fec);
+
     *data = ftd;
     return TM_ECODE_OK;
 }
@@ -1056,6 +1062,23 @@ static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
     return TM_ECODE_OK;
 }
 
+static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
+{
+    FLOWLOCK_WRLOCK(f);
+
+    (void)OutputFlowLog(tv, ftd->output_thread_data, f);
+
+    FlowEndCountersUpdate(tv, &ftd->fec, f);
+    if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+        StatsDecr(tv, ftd->counter_tcp_active_sessions);
+    }
+    StatsDecr(tv, ftd->counter_flow_active);
+
+    FlowClearMemory(f, f->protomap);
+    FLOWLOCK_UNLOCK(f);
+    FlowSparePoolReturnFlow(f);
+}
+
 /** \brief Thread that manages timed out flows.
  *
  *  \param td ThreadVars casted to void ptr
@@ -1079,6 +1102,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
         SC_ATOMIC_ADD(flowrec_busy,1);
         FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
 
+        StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
+        StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
+
         const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
 
         /* Get the time */
@@ -1088,8 +1114,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
 
         Flow *f;
         while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
-            Recycler(th_v, ftd->output_thread_data, f);
+            Recycler(th_v, ftd, f);
             recycled_cnt++;
+            StatsIncr(th_v, ftd->counter_flows);
         }
         SC_ATOMIC_SUB(flowrec_busy,1);
 
index a9f94c70fdee148d6273b22ce25bf3cd80ebef23..289caa3e66fc03b1c8c41976a3d0fe0983ea009c 100644 (file)
@@ -240,3 +240,70 @@ void RegisterFlowBypassInfo(void)
     g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *),
                                               NULL, FlowBypassFree);
 }
+
+void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
+{
+    for (int i = 0; i < FLOW_STATE_SIZE; i++) {
+        const char *name = NULL;
+        if (i == FLOW_STATE_NEW) {
+            name = "flow.end.state.new";
+        } else if (i == FLOW_STATE_ESTABLISHED) {
+            name = "flow.end.state.established";
+        } else if (i == FLOW_STATE_CLOSED) {
+            name = "flow.end.state.closed";
+        } else if (i == FLOW_STATE_LOCAL_BYPASSED) {
+            name = "flow.end.state.local_bypassed";
+#ifdef CAPTURE_OFFLOAD
+        } else if (i == FLOW_STATE_CAPTURE_BYPASSED) {
+            name = "flow.end.state.capture_bypassed";
+#endif
+        }
+        if (name) {
+            fec->flow_state[i] = StatsRegisterCounter(name, t);
+        }
+    }
+
+    for (enum TcpState i = TCP_NONE; i <= TCP_CLOSED; i++) {
+        const char *name;
+        switch (i) {
+            case TCP_NONE:
+                name = "flow.end.tcp_state.none";
+                break;
+            case TCP_LISTEN:
+                name = "flow.end.tcp_state.listen";
+                break;
+            case TCP_SYN_SENT:
+                name = "flow.end.tcp_state.syn_sent";
+                break;
+            case TCP_SYN_RECV:
+                name = "flow.end.tcp_state.syn_recv";
+                break;
+            case TCP_ESTABLISHED:
+                name = "flow.end.tcp_state.established";
+                break;
+            case TCP_FIN_WAIT1:
+                name = "flow.end.tcp_state.fin_wait1";
+                break;
+            case TCP_FIN_WAIT2:
+                name = "flow.end.tcp_state.fin_wait2";
+                break;
+            case TCP_TIME_WAIT:
+                name = "flow.end.tcp_state.time_wait";
+                break;
+            case TCP_LAST_ACK:
+                name = "flow.end.tcp_state.last_ack";
+                break;
+            case TCP_CLOSE_WAIT:
+                name = "flow.end.tcp_state.close_wait";
+                break;
+            case TCP_CLOSING:
+                name = "flow.end.tcp_state.closing";
+                break;
+            case TCP_CLOSED:
+                name = "flow.end.tcp_state.closed";
+                break;
+        }
+
+        fec->flow_tcp_state[i] = StatsRegisterCounter(name, t);
+    }
+}
index ff7a7bce5324b7d76cf0f1ebddfba2fca858aa0a..9e11693a0d13fc8c44d1e6940fc91b84ea47a6f2 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "detect-engine-state.h"
 #include "tmqh-flow.h"
+#include "stream-tcp-private.h"
 
 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
 
@@ -151,5 +152,23 @@ uint8_t FlowGetProtoMapping(uint8_t);
 void FlowInit(Flow *, const Packet *);
 uint8_t FlowGetReverseProtoMapping(uint8_t rproto);
 
+/* flow end counter logic */
+
+typedef struct FlowEndCounters_ {
+    uint16_t flow_state[FLOW_STATE_SIZE];
+    uint16_t flow_tcp_state[TCP_CLOSED + 1];
+} FlowEndCounters;
+
+static inline void FlowEndCountersUpdate(ThreadVars *tv, FlowEndCounters *fec, Flow *f)
+{
+    if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+        TcpSession *ssn = f->protoctx;
+        StatsIncr(tv, fec->flow_tcp_state[ssn->state]);
+    }
+    StatsIncr(tv, fec->flow_state[f->flow_state]);
+}
+
+void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec);
+
 #endif /* __FLOW_UTIL_H__ */
 
index 0a781cca971c0a35270e57e1c95427f6447cfffd..97f3f192a7e7ede0e60d5163dfec7c595706145f 100644 (file)
@@ -83,6 +83,7 @@ typedef struct FlowWorkerThreadData_ {
         uint16_t flows_aside_needs_work;
         uint16_t flows_aside_pkt_inject;
     } cnt;
+    FlowEndCounters fec;
 
 } FlowWorkerThreadData;
 
@@ -189,6 +190,12 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
         if (fw->output_thread_flow != NULL)
             (void)OutputFlowLog(tv, fw->output_thread_flow, f);
 
+        FlowEndCountersUpdate(tv, &fw->fec, f);
+        if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+            StatsDecr(tv, fw->dtv->counter_tcp_active_sessions);
+        }
+        StatsDecr(tv, fw->dtv->counter_flow_active);
+
         FlowClearMemory (f, f->protomap);
         FLOWLOCK_UNLOCK(f);
         if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
@@ -288,6 +295,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void *
 
     DecodeRegisterPerfCounters(fw->dtv, tv);
     AppLayerRegisterThreadCounters(tv);
+    FlowEndCountersRegister(tv, &fw->fec);
 
     /* setup pq for stream end pkts */
     memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
index 9887380459233e4e6c8ca1fe19ded23e045314e5..2ec7f531fcfa67686d366acefada13326d91c846 100644 (file)
@@ -517,6 +517,11 @@ enum FlowState {
     FLOW_STATE_CAPTURE_BYPASSED,
 #endif
 };
+#ifdef CAPTURE_OFFLOAD
+#define FLOW_STATE_SIZE 5
+#else
+#define FLOW_STATE_SIZE 4
+#endif
 
 typedef struct FlowProtoTimeout_ {
     uint32_t new_timeout;
index d0d770f39cef3ea992703cebbfef2e11800d480a..f083fe1af19169db544787ecf99300300a8912a5 100644 (file)
@@ -935,6 +935,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
                 return -1;
             }
             StatsIncr(tv, stt->counter_tcp_sessions);
+            StatsIncr(tv, stt->counter_tcp_active_sessions);
             StatsIncr(tv, stt->counter_tcp_midstream_pickups);
         }
 
@@ -1028,6 +1029,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
             }
 
             StatsIncr(tv, stt->counter_tcp_sessions);
+            StatsIncr(tv, stt->counter_tcp_active_sessions);
         }
 
         /* set the state */
@@ -1094,6 +1096,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
                 return -1;
             }
             StatsIncr(tv, stt->counter_tcp_sessions);
+            StatsIncr(tv, stt->counter_tcp_active_sessions);
             StatsIncr(tv, stt->counter_tcp_midstream_pickups);
         }
         /* set the state */
@@ -5371,6 +5374,7 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     *data = (void *)stt;
 
+    stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
     stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
     stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
     stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);
index 58b97c9b2a0c2fda592895a588c0e0f590f6a3e9..15cfee210d91b3d09e6b9338a57ce916f51e3af4 100644 (file)
@@ -78,6 +78,7 @@ typedef struct StreamTcpThread_ {
      *  receiving (valid) RST packets */
     PacketQueueNoLock pseudo_queue;
 
+    uint16_t counter_tcp_active_sessions;
     uint16_t counter_tcp_sessions;
     /** sessions not picked up because memcap was reached */
     uint16_t counter_tcp_ssn_memcap;