From: Victor Julien Date: Sun, 29 May 2016 08:40:46 +0000 (+0200) Subject: flow worker: profiling X-Git-Tag: suricata-3.1RC1~65 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e09643c396d6aa7ce35089ff45b5c4ce49c1d036;p=thirdparty%2Fsuricata.git flow worker: profiling Previously the detect and stream code lived in their own thread modules. This meant profiling showed their cost as part of the thread module profiling logic. Now that only the flow worker is a thread module this no longer works. This patch introduces profiling for the 3 current flow worker steps: flow, stream, detect. --- diff --git a/src/decode.h b/src/decode.h index a8641a23a0..886ca2fa1e 100644 --- a/src/decode.h +++ b/src/decode.h @@ -30,6 +30,7 @@ #include "suricata-common.h" #include "threadvars.h" #include "decode-events.h" +#include "flow-worker.h" #ifdef __SC_CUDA_SUPPORT__ #include "util-cuda-buffer.h" @@ -325,6 +326,11 @@ typedef struct PktProfilingTmmData_ { #endif } PktProfilingTmmData; +typedef struct PktProfilingData_ { + uint64_t ticks_start; + uint64_t ticks_end; +} PktProfilingData; + typedef struct PktProfilingDetectData_ { uint64_t ticks_start; uint64_t ticks_end; @@ -341,6 +347,7 @@ typedef struct PktProfiling_ { uint64_t ticks_end; PktProfilingTmmData tmm[TMM_SIZE]; + PktProfilingData flowworker[PROFILE_FLOWWORKER_SIZE]; PktProfilingAppData app[ALPROTO_MAX]; PktProfilingDetectData detect[PROF_DETECT_SIZE]; uint64_t proto_detect; diff --git a/src/flow-worker.c b/src/flow-worker.c index 8bb3043ac0..21a210fee7 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -158,6 +158,8 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac /* handle Flow */ if (p->flags & PKT_WANTS_FLOW) { + FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW); + FlowHandlePacket(tv, fw->dtv, p); if (likely(p->flow != NULL)) { DEBUG_ASSERT_FLOW_LOCKED(p->flow); @@ -165,6 +167,8 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac } /* Flow is now LOCKED */ + FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW); + /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a * pseudo packet created by the flow manager. */ } else if (p->flags & PKT_HAS_FLOW) { @@ -178,7 +182,9 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt); DEBUG_ASSERT_FLOW_LOCKED(p->flow); + FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM); StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL); + FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM); /* Packets here can safely access p->flow as it's locked */ SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len); @@ -188,8 +194,11 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac // TODO do we need to call StreamTcp on these pseudo packets or not? //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL); - if (detect_thread != NULL) + if (detect_thread != NULL) { + FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT); Detect(tv, x, detect_thread, NULL, NULL); + FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT); + } #if 0 // Outputs #endif @@ -204,7 +213,9 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt); if (detect_thread != NULL) { + FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT); Detect(tv, p, detect_thread, NULL, NULL); + FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT); } #if 0 // Outputs @@ -234,6 +245,22 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker) return SC_ATOMIC_GET(fw->detect_thread); } +const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi) +{ + switch (fwi) { + case PROFILE_FLOWWORKER_FLOW: + return "flow"; + case PROFILE_FLOWWORKER_STREAM: + return "stream"; + case PROFILE_FLOWWORKER_DETECT: + return "detect"; + case PROFILE_FLOWWORKER_SIZE: + return "size"; + } + return "error"; +} + + void TmModuleFlowWorkerRegister (void) { tmm_modules[TMM_FLOWWORKER].name = "FlowWorker"; diff --git a/src/flow-worker.h b/src/flow-worker.h index a3528e6286..bda621063c 100644 --- a/src/flow-worker.h +++ b/src/flow-worker.h @@ -18,6 +18,14 @@ #ifndef __FLOW_WORKER_H__ #define __FLOW_WORKER_H__ +enum ProfileFlowWorkerId { + PROFILE_FLOWWORKER_FLOW = 0, + PROFILE_FLOWWORKER_STREAM, + PROFILE_FLOWWORKER_DETECT, + PROFILE_FLOWWORKER_SIZE +}; +const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi); + void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx); void *FlowWorkerGetDetectCtxPtr(void *flow_worker); diff --git a/src/util-profiling.c b/src/util-profiling.c index 2da2b232fb..6942d37fe7 100644 --- a/src/util-profiling.c +++ b/src/util-profiling.c @@ -30,6 +30,7 @@ #include "decode.h" #include "detect.h" #include "conf.h" +#include "flow-worker.h" #include "tm-threads.h" @@ -86,6 +87,13 @@ SCProfilePacketData packet_profile_app_pd_data6[257]; SCProfilePacketData packet_profile_detect_data4[PROF_DETECT_SIZE][257]; SCProfilePacketData packet_profile_detect_data6[PROF_DETECT_SIZE][257]; +struct ProfileProtoRecords { + SCProfilePacketData records4[257]; + SCProfilePacketData records6[257]; +}; + +struct ProfileProtoRecords packet_profile_flowworker_data[PROFILE_FLOWWORKER_SIZE]; + int profiling_packets_enabled = 0; int profiling_packets_csv_enabled = 0; @@ -159,6 +167,7 @@ SCProfilingInit(void) memset(&packet_profile_app_pd_data6, 0, sizeof(packet_profile_app_pd_data6)); memset(&packet_profile_detect_data4, 0, sizeof(packet_profile_detect_data4)); memset(&packet_profile_detect_data6, 0, sizeof(packet_profile_detect_data6)); + memset(&packet_profile_flowworker_data, 0, sizeof(packet_profile_flowworker_data)); const char *filename = ConfNodeLookupChildValue(conf, "filename"); if (filename != NULL) { @@ -302,6 +311,55 @@ SCProfilingDump(void) SCLogInfo("Done dumping profiling data."); } +static void DumpFlowWorkerIP(FILE *fp, int ipv, uint64_t total) +{ + char totalstr[256]; + + enum ProfileFlowWorkerId fwi; + for (fwi = 0; fwi < PROFILE_FLOWWORKER_SIZE; fwi++) { + struct ProfileProtoRecords *r = &packet_profile_flowworker_data[fwi]; + for (int p = 0; p < 257; p++) { + SCProfilePacketData *pd = ipv == 4 ? &r->records4[p] : &r->records6[p]; + if (pd->cnt == 0) { + continue; + } + + FormatNumber(pd->tot, totalstr, sizeof(totalstr)); + double percent = (long double)pd->tot / + (long double)total * 100; + + fprintf(fp, "%-20s IPv%d %3d %12"PRIu64" %12"PRIu64" %12"PRIu64" %12"PRIu64" %12s %-6.2f\n", + ProfileFlowWorkerIdToString(fwi), ipv, p, pd->cnt, + pd->min, pd->max, (uint64_t)(pd->tot / pd->cnt), totalstr, percent); + } + } +} + +static void DumpFlowWorker(FILE *fp) +{ + uint64_t total = 0; + + enum ProfileFlowWorkerId fwi; + for (fwi = 0; fwi < PROFILE_FLOWWORKER_SIZE; fwi++) { + struct ProfileProtoRecords *r = &packet_profile_flowworker_data[fwi]; + for (int p = 0; p < 257; p++) { + SCProfilePacketData *pd = &r->records4[p]; + total += pd->tot; + pd = &r->records6[p]; + total += pd->tot; + } + } + + fprintf(fp, "\n%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n", + "Flow Worker", "IP ver", "Proto", "cnt", "min", "max", "avg"); + fprintf(fp, "%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n", + "--------------------", "------", "-----", "----------", "------------", "------------", "-----------"); + DumpFlowWorkerIP(fp, 4, total); + DumpFlowWorkerIP(fp, 6, total); + fprintf(fp, "Note: %s includes app-layer for TCP\n", + ProfileFlowWorkerIdToString(PROFILE_FLOWWORKER_STREAM)); +} + void SCProfilingDumpPacketStats(void) { int i; @@ -451,6 +509,8 @@ void SCProfilingDumpPacketStats(void) } } + DumpFlowWorker(fp); + fprintf(fp, "\nPer App layer parser stats:\n"); fprintf(fp, "\n%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n", @@ -881,6 +941,49 @@ void SCProfilingUpdatePacketTmmRecords(Packet *p) } } +static inline void SCProfilingUpdatePacketGenericRecord(PktProfilingData *pdt, + SCProfilePacketData *pd) +{ + if (pdt == NULL || pd == NULL) { + return; + } + + uint64_t delta = pdt->ticks_end - pdt->ticks_start; + if (pd->min == 0 || delta < pd->min) { + pd->min = delta; + } + if (pd->max < delta) { + pd->max = delta; + } + + pd->tot += delta; + pd->cnt ++; +} + +static void SCProfilingUpdatePacketGenericRecords(Packet *p, PktProfilingData *pd, + struct ProfileProtoRecords *store, int size) +{ + int i; + for (i = 0; i < size; i++) { + PktProfilingData *pdt = &pd[i]; + + if (pdt->ticks_start == 0 || pdt->ticks_end == 0 || pdt->ticks_start > pdt->ticks_end) { + continue; + } + + struct ProfileProtoRecords *r = &store[i]; + SCProfilePacketData *store = NULL; + + if (PKT_IS_IPV4(p)) { + store = &(r->records4[p->proto]); + } else { + store = &(r->records6[p->proto]); + } + + SCProfilingUpdatePacketGenericRecord(pdt, store); + } +} + void SCProfilingAddPacket(Packet *p) { if (p == NULL || p->profile == NULL || @@ -922,6 +1025,9 @@ void SCProfilingAddPacket(Packet *p) pd->cnt ++; } + SCProfilingUpdatePacketGenericRecords(p, p->profile->flowworker, + packet_profile_flowworker_data, PROFILE_FLOWWORKER_SIZE); + SCProfilingUpdatePacketTmmRecords(p); SCProfilingUpdatePacketAppRecords(p); SCProfilingUpdatePacketDetectRecords(p); @@ -954,6 +1060,9 @@ void SCProfilingAddPacket(Packet *p) pd->cnt ++; } + SCProfilingUpdatePacketGenericRecords(p, p->profile->flowworker, + packet_profile_flowworker_data, PROFILE_FLOWWORKER_SIZE); + SCProfilingUpdatePacketTmmRecords(p); SCProfilingUpdatePacketAppRecords(p); SCProfilingUpdatePacketDetectRecords(p); diff --git a/src/util-profiling.h b/src/util-profiling.h index cc7ea02700..c97f1b5c09 100644 --- a/src/util-profiling.h +++ b/src/util-profiling.h @@ -157,6 +157,20 @@ PktProfiling *SCProfilePacketStart(void); } \ } +#define FLOWWORKER_PROFILING_START(p, id) \ + if (profiling_packets_enabled && (p)->profile != NULL) { \ + if ((id) < PROFILE_FLOWWORKER_SIZE) { \ + (p)->profile->flowworker[(id)].ticks_start = UtilCpuGetTicks();\ + } \ + } + +#define FLOWWORKER_PROFILING_END(p, id) \ + if (profiling_packets_enabled && (p)->profile != NULL) { \ + if ((id) < PROFILE_FLOWWORKER_SIZE) { \ + (p)->profile->flowworker[(id)].ticks_end = UtilCpuGetTicks(); \ + } \ + } + #define PACKET_PROFILING_RESET(p) \ if (profiling_packets_enabled && (p)->profile != NULL) { \ SCFree((p)->profile); \ @@ -292,6 +306,9 @@ void SCProfilingDump(void); #define SGH_PROFILING_RECORD(det_ctx, sgh) +#define FLOWWORKER_PROFILING_START(p, id) +#define FLOWWORKER_PROFILING_END(p, id) + #endif /* PROFILING */ #endif /* ! __UTIL_PROFILE_H__ */