]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
bypass: new callback stragegy
authorEric Leblond <eric@regit.org>
Sat, 8 Jun 2019 16:11:22 +0000 (18:11 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 18 Jun 2019 05:07:02 +0000 (07:07 +0200)
This patch introduces and uses a new bypass strategy
based on a callback. EBPF bypass implementation is
updated to use this new strategy.

Once the flow manager detect that a flow should be timeouted,
it asks the capture method if it has seen packets in the interval.
If it is the case the lastts of the flow is updated and the timeout
is postponed.

src/decode.c
src/flow-manager.c
src/flow-util.c
src/flow.h
src/output-json-flow.c
src/runmode-af-packet.c
src/source-af-packet.c
src/suricata.c
src/util-ebpf.c
src/util-ebpf.h

index ac6cf2a62dfe26ddcbe772cf45c66e5949242a57..2c914d2e3a0f516de9c3e47c16b18e19b7bd22b1 100644 (file)
@@ -408,9 +408,9 @@ void PacketBypassCallback(Packet *p)
                 (state == FLOW_STATE_CAPTURE_BYPASSED)) {
             return;
         }
-        FlowCounters *fc = SCCalloc(sizeof(FlowCounters), 1);
+        FlowBypassInfo *fc = SCCalloc(sizeof(FlowBypassInfo), 1);
         if (fc) {
-            FlowSetStorageById(p->flow, GetFlowBypassCounterID(), fc);
+            FlowSetStorageById(p->flow, GetFlowBypassInfoID(), fc);
         } else {
             return;
         }
index d279bb9734b411721b7c1803b436177bcb54c2ae..449163d4d538c3314dc8cde7e337c0a4435c0de4 100644 (file)
@@ -41,6 +41,7 @@
 #include "flow-private.h"
 #include "flow-timeout.h"
 #include "flow-manager.h"
+#include "flow-storage.h"
 
 #include "stream-tcp-private.h"
 #include "stream-tcp-reassemble.h"
@@ -236,7 +237,7 @@ static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
  *  \retval 0 not timed out
  *  \retval 1 timed out
  */
-static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
+static int FlowManagerFlowTimeout(Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
 {
     /* set the timeout value according to the flow operating mode,
      * flow's state and protocol.*/
@@ -248,6 +249,26 @@ static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct ti
 
     /* do the timeout check */
     if (flow_times_out_at >= ts->tv_sec) {
+        /* for bypassed flow we need to check if the capture method has seen
+         * something and update the logic */
+        FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
+        if (fc) {
+            if (fc->BypassUpdate) {
+                /* flow will be possibly updated */
+                bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
+                /* FIXME do global accounting: store pkts and bytes before and add difference here to counter */
+                if (update) {
+                    SCLogDebug("Updated flow: %ld", FlowGetId(f));
+                    flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
+                    if (*next_ts == 0 || flow_times_out_at < *next_ts)
+                        *next_ts = flow_times_out_at;
+                    return 0;
+                } else {
+                    SCLogDebug("No new packet, dead flow %ld", FlowGetId(f));
+                    return 1;
+                }
+            }
+        }
         return 0;
     }
 
index ed95cfc2617e82ef21f1d12cc3d424e918d3d819..47ff767948fc7f863356322081db37a43e05fa45 100644 (file)
@@ -204,21 +204,28 @@ void FlowInit(Flow *f, const Packet *p)
     SCReturn;
 }
 
-int g_bypass_counter_id = -1;
+int g_bypass_info_id = -1;
 
-int GetFlowBypassCounterID(void)
+int GetFlowBypassInfoID(void)
 {
-    return g_bypass_counter_id;
+    return g_bypass_info_id;
 }
 
-static void FlowCounterFree(void *x)
+static void FlowBypassFree(void *x)
 {
-    if (x)
-        SCFree(x);
+    FlowBypassInfo *fb = (FlowBypassInfo *) x;
+
+    if (fb == NULL)
+        return;
+
+    if (fb->bypass_data && fb->BypassFree) {
+        fb->BypassFree(fb->bypass_data);
+    }
+    SCFree(fb);
 }
 
-void RegisterFlowBypassCounter(void)
+void RegisterFlowBypassInfo(void)
 {
-    g_bypass_counter_id = FlowStorageRegister("bypass_counters", sizeof(void *),
-                                              NULL, FlowCounterFree);
+    g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *),
+                                              NULL, FlowBypassFree);
 }
index 19a92299170cff33bf57f74207ea09fd3a3092b3..3744a6180599914b02f54b0691d2b9c608e67599 100644 (file)
@@ -482,12 +482,15 @@ typedef struct FlowProtoFreeFunc_ {
     void (*Freefunc)(void *);
 } FlowProtoFreeFunc;
 
-typedef struct FlowCounters_ {
+typedef struct FlowBypassInfo_ {
+    bool (* BypassUpdate)(Flow *f, void *data, time_t tsec);
+    void (* BypassFree)(void *data);
+    void *bypass_data;
     uint64_t tosrcpktcnt;
     uint64_t tosrcbytecnt;
     uint64_t todstpktcnt;
     uint64_t todstbytecnt;
-} FlowCounters;
+} FlowBypassInfo;
 
 /** \brief prepare packet for a life with flow
  *  Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup
@@ -529,8 +532,8 @@ int FlowSetMemcap(uint64_t size);
 uint64_t FlowGetMemcap(void);
 uint64_t FlowGetMemuse(void);
 
-int GetFlowBypassCounterID(void);
-void RegisterFlowBypassCounter(void);
+int GetFlowBypassInfoID(void);
+void RegisterFlowBypassInfo(void);
 
 /** ----- Inline functions ----- */
 
index ed1db7a76f722ef0b756ea06f71fe90c60cae73c..7c0fab35c8fd125457d0e05fb011a90ef0811754 100644 (file)
@@ -200,7 +200,7 @@ void JsonAddFlow(Flow *f, json_t *js, json_t *hjs)
                 json_string(AppProtoToString(f->alproto_expect)));
     }
 
-    FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
+    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
     if (fc) {
         json_object_set_new(hjs, "pkts_toserver",
                 json_integer(f->todstpktcnt + fc->todstpktcnt));
index 09d8310175b54ec0933d30f55460522bc7cfd24c..13625295d341d366d8b55e15cb403418b243c197 100644 (file)
@@ -439,17 +439,7 @@ static void *ParseAFPConfig(const char *iface)
             SCLogConfig("Using bypass kernel functionality for AF_PACKET (iface %s)",
                     aconf->iface);
             aconf->flags |= AFP_BYPASS;
-            RunModeEnablesBypassManager();
-            struct ebpf_timeout_config *ebt = SCCalloc(1, sizeof(struct ebpf_timeout_config));
-            if (ebt == NULL) {
-                SCLogError(SC_ERR_MEM_ALLOC, "Flow bypass alloc error");
-            } else {
-                memcpy(ebt, &(aconf->ebpf_t_config), sizeof(struct ebpf_timeout_config));
-                BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout,
-                                                     NULL,
-                                                     (void *)ebt);
-                BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL);
-            }
+            BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL);
 #else
             SCLogError(SC_ERR_UNIMPLEMENTED, "Bypass set but eBPF support is not built-in");
 #endif
@@ -483,11 +473,6 @@ static void *ParseAFPConfig(const char *iface)
             SCLogConfig("Using bypass kernel functionality for AF_PACKET (iface %s)",
                     aconf->iface);
             aconf->flags |= AFP_XDPBYPASS;
-            RunModeEnablesBypassManager();
-            /* TODO move that to get it conditional on pinned maps */
-            BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout,
-                                                 EBPFCheckBypassedFlowCreate,
-                                                 (void *) &(aconf->ebpf_t_config));
             BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL);
         }
 #else
index 1daa17bdd9a49ad14b254b05da407f2b440e26ae..9645b781a60e6b4a4cdc065cfb3a7a874ba0b167 100644 (file)
@@ -57,6 +57,7 @@
 #include "tmqh-packetpool.h"
 #include "source-af-packet.h"
 #include "runmodes.h"
+#include "flow-storage.h"
 
 #ifdef HAVE_AF_PACKET
 
@@ -2331,6 +2332,28 @@ static int AFPInsertHalfFlow(int mapd, void *key, uint32_t hash,
     }
     return 1;
 }
+
+static int AFPSetFlowStorage(Packet *p, int map_fd, void *key0, void* key1)
+{
+    FlowBypassInfo *fc = FlowGetStorageById(p->flow, GetFlowBypassInfoID());
+    if (fc) {
+        EBPFBypassData *eb = SCCalloc(1, sizeof(EBPFBypassData));
+        if (eb == NULL) {
+            SCFree(key0);
+            SCFree(key1);
+            return 0;
+        }
+        eb->key[0] = key0;
+        eb->key[1] = key1;
+        eb->mapfd = map_fd;
+        eb->cpus_count = p->afp_v.nr_cpus;
+        fc->BypassUpdate = EBPFBypassUpdate;
+        fc->BypassFree = EBPFBypassFree;
+        fc->bypass_data = eb;
+    }
+    return 1;
+}
+
 #endif
 
 /**
@@ -2373,29 +2396,45 @@ static int AFPBypassCallback(Packet *p)
         if (p->afp_v.v4_map_fd == -1) {
             return 0;
         }
-        struct flowv4_keys key = {};
-        key.src = htonl(GET_IPV4_SRC_ADDR_U32(p));
-        key.dst = htonl(GET_IPV4_DST_ADDR_U32(p));
-        key.port16[0] = GET_TCP_SRC_PORT(p);
-        key.port16[1] = GET_TCP_DST_PORT(p);
-        key.vlan_id[0] = p->vlan_id[0];
-        key.vlan_id[1] = p->vlan_id[1];
+        struct flowv4_keys *keys[2];
+        keys[0] = SCCalloc(1, sizeof(struct flowv4_keys));
+        if (keys[0] == NULL) {
+            return 0;
+        }
+        keys[0]->src = htonl(GET_IPV4_SRC_ADDR_U32(p));
+        keys[0]->dst = htonl(GET_IPV4_DST_ADDR_U32(p));
+        keys[0]->port16[0] = GET_TCP_SRC_PORT(p);
+        keys[0]->port16[1] = GET_TCP_DST_PORT(p);
+        keys[0]->vlan_id[0] = p->vlan_id[0];
+        keys[0]->vlan_id[1] = p->vlan_id[1];
 
-        key.ip_proto = IPV4_GET_IPPROTO(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash,
+        keys[0]->ip_proto = IPV4_GET_IPPROTO(p);
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[0], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            return 0;
+        }
+        keys[1]= SCCalloc(1, sizeof(struct flowv4_keys));
+        if (keys[1] == NULL) {
+            SCFree(keys[0]);
             return 0;
         }
-        key.src = htonl(GET_IPV4_DST_ADDR_U32(p));
-        key.dst = htonl(GET_IPV4_SRC_ADDR_U32(p));
-        key.port16[0] = GET_TCP_DST_PORT(p);
-        key.port16[1] = GET_TCP_SRC_PORT(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash,
+        keys[1]->src = htonl(GET_IPV4_DST_ADDR_U32(p));
+        keys[1]->dst = htonl(GET_IPV4_SRC_ADDR_U32(p));
+        keys[1]->port16[0] = GET_TCP_DST_PORT(p);
+        keys[1]->port16[1] = GET_TCP_SRC_PORT(p);
+        keys[1]->vlan_id[0] = p->vlan_id[0];
+        keys[1]->vlan_id[1] = p->vlan_id[1];
+
+        keys[1]->ip_proto = IPV4_GET_IPPROTO(p);
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[1], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            SCFree(keys[1]);
             return 0;
         }
         EBPFUpdateFlow(p->flow, p, NULL);
-        return 1;
+        return AFPSetFlowStorage(p, p->afp_v.v4_map_fd, keys[0], keys[1]);
     }
     /* For IPv6 case we don't handle extended header in eBPF */
     if (PKT_IS_IPV6(p) &&
@@ -2405,33 +2444,48 @@ static int AFPBypassCallback(Packet *p)
             return 0;
         }
         SCLogDebug("add an IPv6");
-        struct flowv6_keys key = {};
+        struct flowv6_keys *keys[2];
+        keys[0] = SCCalloc(1, sizeof(struct flowv6_keys));
+        if (keys[0] == NULL) {
+            return 0;
+        }
         for (i = 0; i < 4; i++) {
-            key.src[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]);
-            key.dst[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]);
-        }
-        key.port16[0] = GET_TCP_SRC_PORT(p);
-        key.port16[1] = GET_TCP_DST_PORT(p);
-        key.vlan_id[0] = p->vlan_id[0];
-        key.vlan_id[1] = p->vlan_id[1];
-        key.ip_proto = IPV6_GET_NH(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash,
+            keys[0]->src[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]);
+            keys[0]->dst[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]);
+        }
+        keys[0]->port16[0] = GET_TCP_SRC_PORT(p);
+        keys[0]->port16[1] = GET_TCP_DST_PORT(p);
+        keys[0]->vlan_id[0] = p->vlan_id[0];
+        keys[0]->vlan_id[1] = p->vlan_id[1];
+        keys[0]->ip_proto = IPV6_GET_NH(p);
+        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[0], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
             return 0;
         }
-        for (i = 0; i < 4; i++) {
-            key.src[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]);
-            key.dst[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]);
+        keys[1]= SCCalloc(1, sizeof(struct flowv6_keys));
+        if (keys[1] == NULL) {
+            SCFree(keys[0]);
+            return 0;
         }
-        key.port16[0] = GET_TCP_DST_PORT(p);
-        key.port16[1] = GET_TCP_SRC_PORT(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash,
+        for (i = 0; i < 4; i++) {
+            keys[1]->src[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]);
+            keys[1]->dst[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]);
+        }
+        keys[1]->port16[0] = GET_TCP_DST_PORT(p);
+        keys[1]->port16[1] = GET_TCP_SRC_PORT(p);
+        keys[1]->vlan_id[0] = p->vlan_id[0];
+        keys[1]->vlan_id[1] = p->vlan_id[1];
+        keys[1]->ip_proto = IPV6_GET_NH(p);
+        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[1], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            SCFree(keys[1]);
             return 0;
         }
         if (p->flow)
             EBPFUpdateFlow(p->flow, p, NULL);
-        return 1;
+        return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1]);
     }
 #endif
     return 0;
@@ -2470,32 +2524,48 @@ static int AFPXDPBypassCallback(Packet *p)
         return 0;
     }
     if (PKT_IS_IPV4(p)) {
-        struct flowv4_keys key = {};
+        struct flowv4_keys *keys[2];
+        keys[0]= SCCalloc(1, sizeof(struct flowv4_keys));
+        if (keys[0] == NULL) {
+            return 0;
+        }
         if (p->afp_v.v4_map_fd == -1) {
+            SCFree(keys[0]);
             return 0;
         }
-        key.src = p->src.addr_data32[0];
-        key.dst = p->dst.addr_data32[0];
+        keys[0]->src = p->src.addr_data32[0];
+        keys[0]->dst = p->dst.addr_data32[0];
         /* In the XDP filter we get port from parsing of packet and not from skb
          * (as in eBPF filter) so we need to pass from host to network order */
-        key.port16[0] = htons(p->sp);
-        key.port16[1] = htons(p->dp);
-        key.vlan_id[0] = p->vlan_id[0];
-        key.vlan_id[1] = p->vlan_id[1];
-        key.ip_proto = IPV4_GET_IPPROTO(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash,
+        keys[0]->port16[0] = htons(p->sp);
+        keys[0]->port16[1] = htons(p->dp);
+        keys[0]->vlan_id[0] = p->vlan_id[0];
+        keys[0]->vlan_id[1] = p->vlan_id[1];
+        keys[0]->ip_proto = IPV4_GET_IPPROTO(p);
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[0], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            return 0;
+        }
+        keys[1]= SCCalloc(1, sizeof(struct flowv4_keys));
+        if (keys[1] == NULL) {
+            SCFree(keys[0]);
             return 0;
         }
-        key.src = p->dst.addr_data32[0];
-        key.dst = p->src.addr_data32[0];
-        key.port16[0] = htons(p->dp);
-        key.port16[1] = htons(p->sp);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash,
+        keys[1]->src = p->dst.addr_data32[0];
+        keys[1]->dst = p->src.addr_data32[0];
+        keys[1]->port16[0] = htons(p->dp);
+        keys[1]->port16[1] = htons(p->sp);
+        keys[1]->vlan_id[0] = p->vlan_id[0];
+        keys[1]->vlan_id[1] = p->vlan_id[1];
+        keys[1]->ip_proto = IPV4_GET_IPPROTO(p);
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[1], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            SCFree(keys[1]);
             return 0;
         }
-        return 1;
+        return AFPSetFlowStorage(p, p->afp_v.v4_map_fd, keys[0], keys[1]);
     }
     /* For IPv6 case we don't handle extended header in eBPF */
     if (PKT_IS_IPV6(p) &&
@@ -2505,31 +2575,47 @@ static int AFPXDPBypassCallback(Packet *p)
             return 0;
         }
         int i;
-        struct flowv6_keys key = {};
+        struct flowv6_keys *keys[2];
+        keys[0] = SCCalloc(1, sizeof(struct flowv6_keys));
+        if (keys[0] == NULL) {
+            return 0;
+        }
+
         for (i = 0; i < 4; i++) {
-            key.src[i] = GET_IPV6_SRC_ADDR(p)[i];
-            key.dst[i] = GET_IPV6_DST_ADDR(p)[i];
-        }
-        key.port16[0] = htons(GET_TCP_SRC_PORT(p));
-        key.port16[1] = htons(GET_TCP_DST_PORT(p));
-        key.vlan_id[0] = p->vlan_id[0];
-        key.vlan_id[1] = p->vlan_id[1];
-        key.ip_proto = IPV6_GET_NH(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash,
+            keys[0]->src[i] = GET_IPV6_SRC_ADDR(p)[i];
+            keys[0]->dst[i] = GET_IPV6_DST_ADDR(p)[i];
+        }
+        keys[0]->port16[0] = htons(GET_TCP_SRC_PORT(p));
+        keys[0]->port16[1] = htons(GET_TCP_DST_PORT(p));
+        keys[0]->vlan_id[0] = p->vlan_id[0];
+        keys[0]->vlan_id[1] = p->vlan_id[1];
+        keys[0]->ip_proto = IPV6_GET_NH(p);
+        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[0], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
             return 0;
         }
-        for (i = 0; i < 4; i++) {
-            key.src[i] = GET_IPV6_DST_ADDR(p)[i];
-            key.dst[i] = GET_IPV6_SRC_ADDR(p)[i];
+        keys[1]= SCCalloc(1, sizeof(struct flowv6_keys));
+        if (keys[1] == NULL) {
+            SCFree(keys[0]);
+            return 0;
         }
-        key.port16[0] = htons(GET_TCP_DST_PORT(p));
-        key.port16[1] = htons(GET_TCP_SRC_PORT(p));
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash,
+        for (i = 0; i < 4; i++) {
+            keys[1]->src[i] = GET_IPV6_DST_ADDR(p)[i];
+            keys[1]->dst[i] = GET_IPV6_SRC_ADDR(p)[i];
+        }
+        keys[1]->port16[0] = htons(GET_TCP_DST_PORT(p));
+        keys[1]->port16[1] = htons(GET_TCP_SRC_PORT(p));
+        keys[1]->vlan_id[0] = p->vlan_id[0];
+        keys[1]->vlan_id[1] = p->vlan_id[1];
+        keys[1]->ip_proto = IPV6_GET_NH(p);
+        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[1], p->flow_hash,
                               p->afp_v.nr_cpus) == 0) {
+            SCFree(keys[0]);
+            SCFree(keys[1]);
             return 0;
         }
-        return 1;
+        return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1]);
     }
 #endif
     return 0;
index 514f1207f0e37ffe656116f8d1bdbe846f02a395..8388668ed5c7373c24e1a41e22fc29c9bd8f3a16 100644 (file)
@@ -2705,7 +2705,7 @@ static int PostConfLoadedSetup(SCInstance *suri)
     EBPFRegisterExtension();
     LiveDevRegisterExtension();
 #endif
-    RegisterFlowBypassCounter();
+    RegisterFlowBypassInfo();
     AppLayerSetup();
 
     /* Suricata will use this umask if provided. By default it will use the
index 9185d80e701d307cc1a941960d6e8d0eeb63ad35..bc3e1e9cbc0b8d6af7307c3d36c4728952fa54a3 100644 (file)
@@ -514,11 +514,13 @@ static bool EBPFCreateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_ke
      * server then if we already have something in to server to client. We need
      * these numbers as we will use it to see if we have new traffic coming
      * on the flow */
-    FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
+    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
     if (fc == NULL) {
-        fc = SCCalloc(sizeof(FlowCounters), 1);
+        fc = SCCalloc(sizeof(FlowBypassInfo), 1);
         if (fc) {
-            FlowSetStorageById(f, GetFlowBypassCounterID(), fc);
+            FlowSetStorageById(f, GetFlowBypassInfoID(), fc);
+            fc->BypassUpdate = EBPFBypassUpdate;
+            fc->BypassFree = EBPFBypassFree;
             fc->todstpktcnt = pkts_cnt;
             fc->todstbytecnt = bytes_cnt;
         } else {
@@ -533,56 +535,96 @@ static bool EBPFCreateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_ke
     return false;
 }
 
+void EBPFBypassFree(void *data)
+{
+    EBPFBypassData *eb = (EBPFBypassData *)data;
+    if (eb == NULL)
+        return;
+    if (eb->key[0]) {
+        SCFree(eb->key[0]);
+    }
+    if (eb->key[1]) {
+        SCFree(eb->key[1]);
+    }
+    SCFree(eb);
+    return;
+}
+
 /**
- * Update a Flow in the Flow table for a Flowkey.
  *
- * \return false if Flow is in Flow table and true if not
+ * Compare eBPF half flow to Flow
+ *
+ * \return true if entries have activity, false if not
  */
-static bool EBPFUpdateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_key,
-                                uint32_t hash, struct timespec *ctime,
-                                uint64_t pkts_cnt, uint64_t bytes_cnt)
+
+static bool EBPFBypassCheckHalfFlow(Flow *f, EBPFBypassData *eb, void *key,
+                                    int index)
 {
-    Flow *f = FlowGetExistingFlowFromHash(flow_key, hash);
-    if (f != NULL) {
-        SCLogDebug("bypassed flow found %d -> %d, doing accounting",
-                    f->sp, f->dp);
-        FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID());
-        if (fc == NULL) {
-            FLOWLOCK_UNLOCK(f);
-            flowstats->count++;
+    int i;
+    uint64_t pkts_cnt = 0;
+    uint64_t bytes_cnt = 0;
+    /* We use a per CPU structure so we will get a array of values. But if nr_cpus
+     * is 1 then we have a global hash. */
+    BPF_DECLARE_PERCPU(struct pair, values_array, eb->cpus_count);
+    memset(values_array, 0, sizeof(values_array));
+    int res = bpf_map_lookup_elem(eb->mapfd, key, values_array);
+    if (res < 0) {
+        SCLogDebug("errno: (%d) %s", errno, strerror(errno));
+        return false;
+    }
+    for (i = 0; i < eb->cpus_count; i++) {
+        /* let's start accumulating value so we can compute the counters */
+        SCLogDebug("%d: Adding pkts %lu bytes %lu", i,
+                BPF_PERCPU(values_array, i).packets,
+                BPF_PERCPU(values_array, i).bytes);
+        pkts_cnt += BPF_PERCPU(values_array, i).packets;
+        bytes_cnt += BPF_PERCPU(values_array, i).bytes;
+    }
+    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
+    if (fc == NULL) {
+        return false;
+    }
+    if (index == 0) {
+        if (pkts_cnt != fc->todstpktcnt) {
+            fc->todstpktcnt = pkts_cnt;
+            fc->todstbytecnt = bytes_cnt;
             return true;
         }
-        if (flow_key->sp == f->sp) {
-            if (pkts_cnt != fc->todstpktcnt) {
-                flowstats->packets += pkts_cnt - fc->todstpktcnt;
-                flowstats->bytes += bytes_cnt - fc->todstbytecnt;
-                fc->todstpktcnt = pkts_cnt;
-                fc->todstbytecnt = bytes_cnt;
-                /* interval based so no meaning to update the millisecond.
-                 * Let's keep it fast and simple */
-                f->lastts.tv_sec = ctime->tv_sec;
-            }
-        } else {
-            if (pkts_cnt != fc->tosrcpktcnt) {
-                flowstats->packets += pkts_cnt - fc->tosrcpktcnt;
-                flowstats->bytes += bytes_cnt - fc->tosrcbytecnt;
-                fc->tosrcpktcnt = pkts_cnt;
-                fc->tosrcbytecnt = bytes_cnt;
-                f->lastts.tv_sec = ctime->tv_sec;
-            }
+    } else {
+        if (pkts_cnt != fc->tosrcpktcnt) {
+            fc->tosrcpktcnt = pkts_cnt;
+            fc->tosrcbytecnt = bytes_cnt;
+            return true;
         }
-        FLOWLOCK_UNLOCK(f);
+    }
+
+    return false;
+}
+
+/** Check both half flows for update
+ *
+ * Update lastts in the flow and do accounting
+ *
+ * */
+bool EBPFBypassUpdate(Flow *f, void *data, time_t tsec)
+{
+    EBPFBypassData *eb = (EBPFBypassData *)data;
+    if (eb == NULL) {
         return false;
+    }
+    bool activity = EBPFBypassCheckHalfFlow(f, eb, eb->key[0], 0);
+    activity |= EBPFBypassCheckHalfFlow(f, eb, eb->key[1], 1);
+    if (!activity) {
+        SCLogDebug("Delete entry: %u (%ld)", FLOW_IS_IPV6(f), FlowGetId(f));
+        /* delete the entries if no time update */
+        EBPFDeleteKey(eb->mapfd, eb->key[0]);
+        EBPFDeleteKey(eb->mapfd, eb->key[1]);
+        SCLogDebug("Done delete entry: %u", FLOW_IS_IPV6(f));
     } else {
-        /* Flow has disappeared so it has been removed due to timeout.
-         * This means we did not see any packet since bypassed_timeout/flow_dump_interval
-         * checks so we are highly probably have no packet to account.
-         * So we just discard the flow */
-        SCLogDebug("No flow for %d -> %d", flow_key->sp, flow_key->dp);
-        SCLogDebug("Dead with pkts %lu bytes %lu", pkts_cnt, bytes_cnt);
-        flowstats->count++;
+        f->lastts.tv_sec = tsec;
         return true;
     }
+    return false;
 }
 
 typedef bool (*OpFlowForKey)(struct flows_stats *flowstats, FlowKey *flow_key,
@@ -816,50 +858,6 @@ int EBPFCheckBypassedFlowCreate(ThreadVars *th_v, struct timespec *curtime, void
     return 0;
 }
 
-/**
- * Flow timeout checking function
- *
- * This function is called by the Flow bypass manager to trigger removal
- * of entries in the kernel/userspace flow table if needed.
- *
- */
-int EBPFCheckBypassedFlowTimeout(ThreadVars *th_v, struct flows_stats *bypassstats,
-                                        struct timespec *curtime,
-                                        void *data)
-{
-    struct flows_stats local_bypassstats = { 0, 0, 0};
-    int ret = 0;
-    int tcount = 0;
-    LiveDevice *ldev = NULL, *ndev;
-    struct ebpf_timeout_config *cfg = (struct ebpf_timeout_config *)data;
-
-    BUG_ON(cfg == NULL);
-
-    while(LiveDeviceForEach(&ldev, &ndev)) {
-        memset(&local_bypassstats, 0, sizeof(local_bypassstats));
-        tcount = EBPFForEachFlowV4Table(th_v, ldev, "flow_table_v4",
-                               &local_bypassstats, curtime,
-                               cfg, EBPFUpdateFlowForKey);
-        bypassstats->count = local_bypassstats.count;
-        bypassstats->packets = local_bypassstats.packets ;
-        bypassstats->bytes = local_bypassstats.bytes;
-        if (tcount) {
-            ret = 1;
-        }
-        memset(&local_bypassstats, 0, sizeof(local_bypassstats));
-        tcount = EBPFForEachFlowV6Table(th_v, ldev, "flow_table_v6",
-                                        &local_bypassstats, curtime,
-                                        cfg, EBPFUpdateFlowForKey);
-        bypassstats->count += local_bypassstats.count;
-        bypassstats->packets += local_bypassstats.packets ;
-        bypassstats->bytes += local_bypassstats.bytes;
-        if (tcount) {
-            ret = 1;
-        }
-    }
-    return ret;
-}
-
 void EBPFRegisterExtension(void)
 {
     g_livedev_storage_id = LiveDevStorageRegister("bpfmap", sizeof(void *), NULL, BpfMapsInfoFree);
index 757b864d42ff0f6704907b415cb540a3e25ca2ce..043f137e8c517b002295372828ca5ce6e1d1bd14 100644 (file)
@@ -61,6 +61,12 @@ struct pair {
     uint32_t hash;
 };
 
+typedef struct EBPFBypassData_ {
+    void *key[2];
+    int mapfd;
+    int cpus_count;
+} EBPFBypassData;
+
 #define EBPF_SOCKET_FILTER  (1<<0)
 #define EBPF_XDP_CODE       (1<<1)
 #define EBPF_PINNED_MAPS    (1<<2)
@@ -83,6 +89,8 @@ void EBPFBuildCPUSet(ConfNode *node, char *iface);
 int EBPFSetPeerIface(const char *iface, const char *out_iface);
 
 int EBPFUpdateFlow(Flow *f, Packet *p, void *data);
+bool EBPFBypassUpdate(Flow *f, void *data, time_t tsec);
+void EBPFBypassFree(void *data);
 
 #ifdef BUILD_UNIX_SOCKET
 TmEcode EBPFGetBypassedStats(json_t *cmd, json_t *answer, void *data);