dtv->counter_nsh = StatsRegisterMaxCounter("decoder.nsh", tv);
dtv->counter_flow_memcap = StatsRegisterCounter("flow.memcap", tv);
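+ /* counters for new/active flows and currently active tcp sessions */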
+ dtv->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
+ dtv->counter_flow_total = StatsRegisterCounter("flow.total", tv);
+ dtv->counter_flow_active = StatsRegisterCounter("flow.active", tv);
dtv->counter_flow_tcp = StatsRegisterCounter("flow.tcp", tv);
dtv->counter_flow_udp = StatsRegisterCounter("flow.udp", tv);
dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv);
uint16_t counter_flow_memcap;
+ uint16_t counter_tcp_active_sessions;
+ uint16_t counter_flow_total;
+ uint16_t counter_flow_active;
uint16_t counter_flow_tcp;
uint16_t counter_flow_udp;
uint16_t counter_flow_icmp4;
#ifdef UNITTESTS
if (tv && dtv) {
#endif
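+ /* new flow: add it to the totals and to the active flow gauge */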
+ StatsIncr(tv, dtv->counter_flow_total);
+ StatsIncr(tv, dtv->counter_flow_active);
switch (proto) {
case IPPROTO_UDP:
StatsIncr(tv, dtv->counter_flow_udp);
return cnt;
}
-static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
-{
- FLOWLOCK_WRLOCK(f);
-
- (void)OutputFlowLog(tv, output_thread_data, f);
-
- FlowClearMemory (f, f->protomap);
- FLOWLOCK_UNLOCK(f);
- FlowSparePoolReturnFlow(f);
-}
-
typedef struct FlowQueueTimeoutCounters {
uint32_t flows_removed;
uint32_t flows_timeout;
typedef struct FlowRecyclerThreadData_ {
void *output_thread_data;
+
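+ /* flows recycled by this thread and recycle queue depth stats */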
+ uint16_t counter_flows;
+ uint16_t counter_queue_avg;
+ uint16_t counter_queue_max;
+
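+ /* gauges decremented as flows and tcp sessions are recycled */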
+ uint16_t counter_flow_active;
+ uint16_t counter_tcp_active_sessions;
+ FlowEndCounters fec;
} FlowRecyclerThreadData;
static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
}
SCLogDebug("output_thread_data %p", ftd->output_thread_data);
+ ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
+ ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
+ ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
+
+ ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
+ ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
+
+ FlowEndCountersRegister(t, &ftd->fec);
+
*data = ftd;
return TM_ECODE_OK;
}
return TM_ECODE_OK;
}
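+/** \brief Log and recycle a single flow.
+ *
+ *  Runs the flow output loggers, updates the flow end counters and the
+ *  active flow / tcp session gauges, then returns the flow to the spare pool.
+ */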
+static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
+{
+ FLOWLOCK_WRLOCK(f);
+
+ (void)OutputFlowLog(tv, ftd->output_thread_data, f);
+
+ FlowEndCountersUpdate(tv, &ftd->fec, f);
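+ /* flow is done: take it out of the active tcp session / flow gauges */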
+ if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+ StatsDecr(tv, ftd->counter_tcp_active_sessions);
+ }
+ StatsDecr(tv, ftd->counter_flow_active);
+
+ FlowClearMemory(f, f->protomap);
+ FLOWLOCK_UNLOCK(f);
+ FlowSparePoolReturnFlow(f);
+}
+
/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars cast to void ptr
SC_ATOMIC_ADD(flowrec_busy,1);
FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
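+ /* track the average and maximum number of flows in the recycle queue */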
+ StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
+ StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
+
const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
/* Get the time */
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
- Recycler(th_v, ftd->output_thread_data, f);
+ Recycler(th_v, ftd, f);
recycled_cnt++;
+ StatsIncr(th_v, ftd->counter_flows);
}
SC_ATOMIC_SUB(flowrec_busy,1);
g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *),
NULL, FlowBypassFree);
}
+
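+/** \brief Register the per-thread "flow.end.state.*" and
+ *         "flow.end.tcp_state.*" counters. */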
+void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
+{
+ for (int i = 0; i < FLOW_STATE_SIZE; i++) {
+ const char *name = NULL;
+ if (i == FLOW_STATE_NEW) {
+ name = "flow.end.state.new";
+ } else if (i == FLOW_STATE_ESTABLISHED) {
+ name = "flow.end.state.established";
+ } else if (i == FLOW_STATE_CLOSED) {
+ name = "flow.end.state.closed";
+ } else if (i == FLOW_STATE_LOCAL_BYPASSED) {
+ name = "flow.end.state.local_bypassed";
+#ifdef CAPTURE_OFFLOAD
+ } else if (i == FLOW_STATE_CAPTURE_BYPASSED) {
+ name = "flow.end.state.capture_bypassed";
+#endif
+ }
+ if (name) {
+ fec->flow_state[i] = StatsRegisterCounter(name, t);
+ }
+ }
+
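+ /* one counter for each TCP state a session can end in */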
+ for (enum TcpState i = TCP_NONE; i <= TCP_CLOSED; i++) {
+ const char *name;
+ switch (i) {
+ case TCP_NONE:
+ name = "flow.end.tcp_state.none";
+ break;
+ case TCP_LISTEN:
+ name = "flow.end.tcp_state.listen";
+ break;
+ case TCP_SYN_SENT:
+ name = "flow.end.tcp_state.syn_sent";
+ break;
+ case TCP_SYN_RECV:
+ name = "flow.end.tcp_state.syn_recv";
+ break;
+ case TCP_ESTABLISHED:
+ name = "flow.end.tcp_state.established";
+ break;
+ case TCP_FIN_WAIT1:
+ name = "flow.end.tcp_state.fin_wait1";
+ break;
+ case TCP_FIN_WAIT2:
+ name = "flow.end.tcp_state.fin_wait2";
+ break;
+ case TCP_TIME_WAIT:
+ name = "flow.end.tcp_state.time_wait";
+ break;
+ case TCP_LAST_ACK:
+ name = "flow.end.tcp_state.last_ack";
+ break;
+ case TCP_CLOSE_WAIT:
+ name = "flow.end.tcp_state.close_wait";
+ break;
+ case TCP_CLOSING:
+ name = "flow.end.tcp_state.closing";
+ break;
+ case TCP_CLOSED:
+ name = "flow.end.tcp_state.closed";
+ break;
+ }
+
+ fec->flow_tcp_state[i] = StatsRegisterCounter(name, t);
+ }
+}
#include "detect-engine-state.h"
#include "tmqh-flow.h"
+#include "stream-tcp-private.h"
#define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
void FlowInit(Flow *, const Packet *);
uint8_t FlowGetReverseProtoMapping(uint8_t rproto);
+/* flow end counter logic */
+
+typedef struct FlowEndCounters_ {
+ uint16_t flow_state[FLOW_STATE_SIZE];
+ uint16_t flow_tcp_state[TCP_CLOSED + 1];
+} FlowEndCounters;
+
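+/** \brief Update the flow end counters for \a f, including the final TCP
+ *         state of its session if it has one. */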
+static inline void FlowEndCountersUpdate(ThreadVars *tv, FlowEndCounters *fec, Flow *f)
+{
+ if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+ TcpSession *ssn = f->protoctx;
+ StatsIncr(tv, fec->flow_tcp_state[ssn->state]);
+ }
+ StatsIncr(tv, fec->flow_state[f->flow_state]);
+}
+
+void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec);
+
#endif /* __FLOW_UTIL_H__ */
uint16_t flows_aside_needs_work;
uint16_t flows_aside_pkt_inject;
} cnt;
+ FlowEndCounters fec;
} FlowWorkerThreadData;
if (fw->output_thread_flow != NULL)
(void)OutputFlowLog(tv, fw->output_thread_flow, f);
+ FlowEndCountersUpdate(tv, &fw->fec, f);
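+ /* flow work is done: remove it from the active tcp session / flow gauges */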
+ if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+ StatsDecr(tv, fw->dtv->counter_tcp_active_sessions);
+ }
+ StatsDecr(tv, fw->dtv->counter_flow_active);
+
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
DecodeRegisterPerfCounters(fw->dtv, tv);
AppLayerRegisterThreadCounters(tv);
+ FlowEndCountersRegister(tv, &fw->fec);
/* setup pq for stream end pkts */
memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
FLOW_STATE_CAPTURE_BYPASSED,
#endif
};
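+/* number of FlowState values above; used to size per-state counter arrays */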
+#ifdef CAPTURE_OFFLOAD
+#define FLOW_STATE_SIZE 5
+#else
+#define FLOW_STATE_SIZE 4
+#endif
typedef struct FlowProtoTimeout_ {
uint32_t new_timeout;
return -1;
}
StatsIncr(tv, stt->counter_tcp_sessions);
+ StatsIncr(tv, stt->counter_tcp_active_sessions);
StatsIncr(tv, stt->counter_tcp_midstream_pickups);
}
}
StatsIncr(tv, stt->counter_tcp_sessions);
+ StatsIncr(tv, stt->counter_tcp_active_sessions);
}
/* set the state */
return -1;
}
StatsIncr(tv, stt->counter_tcp_sessions);
+ StatsIncr(tv, stt->counter_tcp_active_sessions);
StatsIncr(tv, stt->counter_tcp_midstream_pickups);
}
/* set the state */
*data = (void *)stt;
+ stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);
* receiving (valid) RST packets */
PacketQueueNoLock pseudo_queue;
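+ /** sessions currently active: counted up at session creation, down when the flow ends */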
+ uint16_t counter_tcp_active_sessions;
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;