git.ipfire.org Git - thirdparty/suricata.git/commitdiff
bypass: use flow storage for bypass counter
author Eric Leblond <eric@regit.org>
Sun, 24 Mar 2019 18:47:02 +0000 (19:47 +0100)
committer Victor Julien <victor@inliniac.net>
Tue, 18 Jun 2019 05:07:02 +0000 (07:07 +0200)
There is a synchronization issue occurring when a flow is
added to the eBPF bypass maps: the flow can still have packets
in the ring buffer that have already passed the eBPF stage.
As a consequence, these packets are not accounted for by the
eBPF counters but are accounted for by the Suricata flow engine.

This was causing the counters to be completely wrong. This code
fixes the issue by skipping the counter update in that invalid
case.

To avoid adding four 64-bit integers to the Flow structure for the
bypass accounting, we use a FlowStorage entry instead. This limits the
per-flow memory cost to the size of a pointer.
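
The pattern is sketched below in condensed form (an illustration, not
the exact patch: the FlowGetOrAllocBypassCounters() helper is
hypothetical, while the FlowStorage, SCCalloc and SCFree calls are the
ones used in the diff further down; flow.h, flow-storage.h and
util-mem.h are assumed to be included):

    /* Register a pointer-sized per-flow slot once at startup; the
     * FlowCounters block itself is only allocated when the flow is
     * actually bypassed, and is freed by the registered callback
     * when the flow is cleaned up. */
    static int g_bypass_counter_id = -1;

    static void FlowCounterFree(void *x)
    {
        if (x != NULL)
            SCFree(x);
    }

    void RegisterFlowBypassCounter(void)
    {
        g_bypass_counter_id = FlowStorageRegister("bypass_counters",
                sizeof(void *), NULL, FlowCounterFree);
    }

    /* Hypothetical helper: fetch the bypass counters for a flow,
     * allocating them on first use. Returns NULL if allocation fails. */
    static FlowCounters *FlowGetOrAllocBypassCounters(Flow *f)
    {
        FlowCounters *fc = FlowGetStorageById(f, g_bypass_counter_id);
        if (fc == NULL) {
            fc = SCCalloc(sizeof(FlowCounters), 1);
            if (fc != NULL)
                FlowSetStorageById(f, g_bypass_counter_id, fc);
        }
        return fc;
    }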

src/decode.c
src/flow-util.c
src/flow.h
src/output-json-flow.c
src/source-af-packet.c
src/suricata.c
src/util-ebpf.c

index b19365d0547675a08b4359a2cf48321bdf221546..77c41990440bf4ea6fb2cc7116e8f1062a897232 100644 (file)
@@ -66,6 +66,7 @@
 #include "util-hash-string.h"
 #include "output.h"
 #include "output-flow.h"
+#include "flow-storage.h"
 
 extern bool stats_decoder_events;
 const char *stats_decoder_events_prefix;
@@ -407,6 +408,12 @@ void PacketBypassCallback(Packet *p)
         return;
     }
 
+    FlowCounters *fc = SCCalloc(sizeof(FlowCounters), 1);
+    if (fc) {
+        FlowSetStorageById(p->flow, GetFlowBypassCounterID(), fc);
+    } else {
+        return;
+    }
     if (p->BypassPacketsFlow && p->BypassPacketsFlow(p)) {
         FlowUpdateState(p->flow, FLOW_STATE_CAPTURE_BYPASSED);
     } else {
index 0dc08a6d5722c0ad6df345d8bc2f30a4f926bff3..ed95cfc2617e82ef21f1d12cc3d424e918d3d819 100644 (file)
@@ -203,3 +203,22 @@ void FlowInit(Flow *f, const Packet *p)
 
     SCReturn;
 }
+
+int g_bypass_counter_id = -1;
+
+int GetFlowBypassCounterID(void)
+{
+    return g_bypass_counter_id;
+}
+
+static void FlowCounterFree(void *x)
+{
+    if (x)
+        SCFree(x);
+}
+
+void RegisterFlowBypassCounter(void)
+{
+    g_bypass_counter_id = FlowStorageRegister("bypass_counters", sizeof(void *),
+                                              NULL, FlowCounterFree);
+}
index 50951e59702f800de257a2ebeb34afa22e7f88cf..19a92299170cff33bf57f74207ea09fd3a3092b3 100644 (file)
@@ -482,6 +482,13 @@ typedef struct FlowProtoFreeFunc_ {
     void (*Freefunc)(void *);
 } FlowProtoFreeFunc;
 
+typedef struct FlowCounters_ {
+    uint64_t tosrcpktcnt;
+    uint64_t tosrcbytecnt;
+    uint64_t todstpktcnt;
+    uint64_t todstbytecnt;
+} FlowCounters;
+
 /** \brief prepare packet for a life with flow
  *  Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup
  *  and calc the hash value to be used in the lookup and autofp flow
@@ -522,6 +529,9 @@ int FlowSetMemcap(uint64_t size);
 uint64_t FlowGetMemcap(void);
 uint64_t FlowGetMemuse(void);
 
+int GetFlowBypassCounterID(void);
+void RegisterFlowBypassCounter(void);
+
 /** ----- Inline functions ----- */
 
 /** \brief Set the No Packet Inspection Flag without locking the flow.
index 09f2a7ccee868c821aec657f6fb47a0e297122d1..ed1db7a76f722ef0b756ea06f71fe90c60cae73c 100644 (file)
@@ -48,6 +48,7 @@
 #include "output-json-flow.h"
 
 #include "stream-tcp-private.h"
+#include "flow-storage.h"
 
 #ifdef HAVE_LIBJANSSON
 
@@ -199,14 +200,38 @@ void JsonAddFlow(Flow *f, json_t *js, json_t *hjs)
                 json_string(AppProtoToString(f->alproto_expect)));
     }
 
-    json_object_set_new(hjs, "pkts_toserver",
-            json_integer(f->todstpktcnt));
-    json_object_set_new(hjs, "pkts_toclient",
-            json_integer(f->tosrcpktcnt));
-    json_object_set_new(hjs, "bytes_toserver",
-            json_integer(f->todstbytecnt));
-    json_object_set_new(hjs, "bytes_toclient",
-            json_integer(f->tosrcbytecnt));
+    FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
+    if (fc) {
+        json_object_set_new(hjs, "pkts_toserver",
+                json_integer(f->todstpktcnt + fc->todstpktcnt));
+        json_object_set_new(hjs, "pkts_toclient",
+                json_integer(f->tosrcpktcnt + fc->tosrcpktcnt));
+        json_object_set_new(hjs, "bytes_toserver",
+                json_integer(f->todstbytecnt + fc->todstbytecnt));
+        json_object_set_new(hjs, "bytes_toclient",
+                json_integer(f->tosrcbytecnt + fc->tosrcbytecnt));
+        json_t *bhjs = json_object();
+        if (bhjs != NULL) {
+            json_object_set_new(bhjs, "pkts_toserver",
+                    json_integer(fc->todstpktcnt));
+            json_object_set_new(bhjs, "pkts_toclient",
+                    json_integer(fc->tosrcpktcnt));
+            json_object_set_new(bhjs, "bytes_toserver",
+                    json_integer(fc->todstbytecnt));
+            json_object_set_new(bhjs, "bytes_toclient",
+                    json_integer(fc->tosrcbytecnt));
+            json_object_set_new(hjs, "bypassed", bhjs);
+        }
+    } else {
+        json_object_set_new(hjs, "pkts_toserver",
+                json_integer(f->todstpktcnt));
+        json_object_set_new(hjs, "pkts_toclient",
+                json_integer(f->tosrcpktcnt));
+        json_object_set_new(hjs, "bytes_toserver",
+                json_integer(f->todstbytecnt));
+        json_object_set_new(hjs, "bytes_toclient",
+                json_integer(f->tosrcbytecnt));
+    }
 
     char timebuf1[64];
     CreateIsoTimeString(&f->startts, timebuf1, sizeof(timebuf1));
index 19bb12d0270430bb9dc9ce9bad4a442d39dc67f9..94aa68bece2e0891ba0d0aa083402c6fbd66a1a0 100644 (file)
@@ -2307,13 +2307,8 @@ static int AFPInsertHalfFlow(int mapd, void *key, uint32_t hash,
     }
 
     /* We use a per CPU structure so we have to set an array of values as the kernel
-     * is not duplicating the data on each CPU by itself. We set the first entry to
-     * the actual flow pkts and bytes count as we need to continue from actual point
-     * to detect an absence of packets in the future. */
-    BPF_PERCPU(value,0).packets = pkts_cnt;
-    BPF_PERCPU(value,0).bytes = bytes_cnt;
-    BPF_PERCPU(value,0).hash = hash;
-    for (i = 1; i < nr_cpus; i++) {
+     * is not duplicating the data on each CPU by itself. */
+    for (i = 0; i < nr_cpus; i++) {
         BPF_PERCPU(value, i).packets = 0;
         BPF_PERCPU(value, i).bytes = 0;
         BPF_PERCPU(value, i).hash = hash;
index c5d6d22b6aab1b11aa6539ebd26b65c157b20fe3..514f1207f0e37ffe656116f8d1bdbe846f02a395 100644 (file)
@@ -2705,6 +2705,7 @@ static int PostConfLoadedSetup(SCInstance *suri)
     EBPFRegisterExtension();
     LiveDevRegisterExtension();
 #endif
+    RegisterFlowBypassCounter();
     AppLayerSetup();
 
     /* Suricata will use this umask if provided. By default it will use the
index 149af41828c29bf8362c003155f2e1b9e3a574a7..10a07b7c740a5fe052bf3a30fff5d1ad5e890533 100644 (file)
@@ -508,12 +508,20 @@ static int EBPFCreateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_key
      * server then if we already have something in to server to client. We need
      * these numbers as we will use it to see if we have new traffic coming
      * on the flow */
-    if (f->todstbytecnt == 0) {
-        f->todstpktcnt = pkts_cnt;
-        f->todstbytecnt = bytes_cnt;
+    FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
+    if (fc == NULL) {
+        fc = SCCalloc(sizeof(FlowCounters), 1);
+        if (fc) {
+            FlowSetStorageById(f, GetFlowBypassCounterID(), fc);
+            fc->todstpktcnt = pkts_cnt;
+            fc->todstbytecnt = bytes_cnt;
+        } else {
+            FLOWLOCK_UNLOCK(f);
+            return 0;
+        }
     } else {
-        f->tosrcpktcnt = pkts_cnt;
-        f->tosrcbytecnt = bytes_cnt;
+        fc->tosrcpktcnt = pkts_cnt;
+        fc->tosrcbytecnt = bytes_cnt;
     }
     FLOWLOCK_UNLOCK(f);
     return 0;
@@ -527,22 +535,27 @@ static int EBPFUpdateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_key
     if (f != NULL) {
         SCLogDebug("bypassed flow found %d -> %d, doing accounting",
                     f->sp, f->dp);
+        FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
+        if (fc == NULL) {
+            FLOWLOCK_UNLOCK(f);
+            return 0;
+        }
         if (flow_key->sp == f->sp) {
-            if (pkts_cnt != f->todstpktcnt) {
-                flowstats->packets += pkts_cnt - f->todstpktcnt;
-                flowstats->bytes += bytes_cnt - f->todstbytecnt;
-                f->todstpktcnt = pkts_cnt;
-                f->todstbytecnt = bytes_cnt;
+            if (pkts_cnt != fc->todstpktcnt) {
+                flowstats->packets += pkts_cnt - fc->todstpktcnt;
+                flowstats->bytes += bytes_cnt - fc->todstbytecnt;
+                fc->todstpktcnt = pkts_cnt;
+                fc->todstbytecnt = bytes_cnt;
                 /* interval based so no meaning to update the millisecond.
                  * Let's keep it fast and simple */
                 f->lastts.tv_sec = ctime->tv_sec;
             }
         } else {
-            if (pkts_cnt != f->tosrcpktcnt) {
-                flowstats->packets += pkts_cnt - f->tosrcpktcnt;
-                flowstats->bytes += bytes_cnt - f->tosrcbytecnt;
-                f->tosrcpktcnt = pkts_cnt;
-                f->tosrcbytecnt = bytes_cnt;
+            if (pkts_cnt != fc->tosrcpktcnt) {
+                flowstats->packets += pkts_cnt - fc->tosrcpktcnt;
+                flowstats->bytes += bytes_cnt - fc->tosrcbytecnt;
+                fc->tosrcpktcnt = pkts_cnt;
+                fc->tosrcbytecnt = bytes_cnt;
                 f->lastts.tv_sec = ctime->tv_sec;
             }
         }