]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
ebpf: change the logic to avoid ktime usage
authorEric Leblond <eric@regit.org>
Fri, 25 May 2018 15:05:17 +0000 (17:05 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 18 Jun 2019 05:07:01 +0000 (07:07 +0200)
Kernel time is not available (and/or costly) on NIC such as
Netronome so we update the logic to detect dead flows based on a
lack of update of packets counters. This way, the XDP filter will
be usable by network card.

This patch also updates the ebpf code to support per CPU and
regular mapping. Netronome is not supporting it and the structure
is using atomic for counter so the cost of simultaneous update
is really low.

This patch also updates the xdp_filter to be able to select if the
flow table is per CPU on shared. Second option will be used for
hardward offload. To deactivate the per cpu hash, you need to set
USE_PERCPU_HASH to 0.

This patch also adds an new option to af-packet named no-percpu-hash
If this option is set to yes then the Flow bypassed manager thread
will use one CPU instead of the number of cores. By doing that
we are able to handle the case where USE_PERCPU_HASH is unset (so
hardware offload for Netronome).

This patch also remove aligment indications in the eBPF filter. This
was not really needed and it seems it is causing problem with
some recent version of LLVM toolchain.

14 files changed:
doc/userguide/capture-hardware/ebpf-xdp.rst
ebpf/bypass_filter.c
ebpf/xdp_filter.c
src/flow-bypass.c
src/flow-bypass.h
src/flow-hash.c
src/flow-hash.h
src/flow.c
src/flow.h
src/runmode-af-packet.c
src/source-af-packet.c
src/source-af-packet.h
src/util-ebpf.c
src/util-ebpf.h

index 312eb5863cae8ebe81809d7adda18710f6d29c62..84bcb5be4c35df98cfb2188141a4c7cbb63bd1c6 100644 (file)
@@ -249,11 +249,17 @@ also use the ``/etc/suricata/ebpf/xdp_filter.bpf`` (in our example TCP offloadin
     bypass: yes
     use-mmap: yes
     ring-size: 200000
+    # Uncomment the following if you are using hardware XDP with
+    # card like Netronome
+    # no-percpu-hash: yes
 
 
 XDP bypass is compatible with AF_PACKET IPS mode. Packets from bypassed flows will be send directly 
 from one card to the second card without going by the kernel network stack.
 
+If you are using hardware XDP offload you may have to use the ``no-percpu-hash`` function and
+build and install the XDP filter file after setting ``USE_PERCPU_HASH`` to 0.
+
 Setup symmetric hashing on the NIC
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
index be81032b16cfd48885150d2cf398e3989eebe8c9..48604ebe280009f865f2402f94f1f04f6cdd2e6f 100644 (file)
@@ -37,7 +37,8 @@ struct flowv4_keys {
         __be16 port16[2];
     };
     __u32 ip_proto;
-} __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct flowv6_keys {
     __be32 src[4];
@@ -47,13 +48,14 @@ struct flowv6_keys {
         __be16 port16[2];
     };
     __u32 ip_proto;
-} __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct pair {
-    __u64 time;
     __u64 packets;
     __u64 bytes;
-} __attribute__((__aligned__(8)));
+    __u32 hash;
+};
 
 struct bpf_map_def SEC("maps") flow_table_v4 = {
     .type = BPF_MAP_TYPE_PERCPU_HASH,
@@ -80,6 +82,7 @@ static __always_inline int ipv4_filter(struct __sk_buff *skb)
     struct flowv4_keys tuple;
     struct pair *value;
     __u16 port;
+    __u16 vlan_id = skb->vlan_tci & 0x0fff;
 
     nhoff = skb->cb[0];
 
@@ -102,6 +105,9 @@ static __always_inline int ipv4_filter(struct __sk_buff *skb)
     port = tuple.port16[1];
     tuple.port16[1] = tuple.port16[0];
     tuple.port16[0] = port;
+    tuple.vlan_id[0] = vlan_id;
+    /* FIXME add second vlan layer */
+    tuple.vlan_id[1] = 0;
 
 #if 0
     if ((tuple.port16[0] == 22) || (tuple.port16[1] == 22))
@@ -125,7 +131,6 @@ static __always_inline int ipv4_filter(struct __sk_buff *skb)
 #endif
         value->packets++;
         value->bytes += skb->len;
-        value->time = bpf_ktime_get_ns();
         return 0;
     }
     return -1;
@@ -143,6 +148,7 @@ static __always_inline int ipv6_filter(struct __sk_buff *skb)
     struct flowv6_keys tuple;
     struct pair *value;
     __u16 port;
+    __u16 vlan_id = skb->vlan_tci & 0x0fff;
 
     nhoff = skb->cb[0];
 
@@ -174,6 +180,10 @@ static __always_inline int ipv6_filter(struct __sk_buff *skb)
     tuple.port16[0] = port;
     tuple.ip_proto = nhdr;
 
+    tuple.vlan_id[0] = vlan_id;
+    /* FIXME add second vlan layer */
+    tuple.vlan_id[1] = 0;
+
     //char fmt[] = "Now Got IPv6 port %u and %u\n";
     //bpf_trace_printk(fmt, sizeof(fmt), tuple.port16[0], tuple.port16[1]);
     /* Test if src is in hash */
@@ -183,7 +193,6 @@ static __always_inline int ipv6_filter(struct __sk_buff *skb)
         //bpf_trace_printk(fmt, sizeof(fmt), tuple.port16[0], tuple.port16[1]);
         value->packets++;
         value->bytes += skb->len;
-        value->time = bpf_ktime_get_ns();
         return 0;
     }
     return -1;
index c83a6fa6f49afda2935e416479e09c46d5b20448..b20d90e62208bf0926de73a08367ec3e31f52049 100644 (file)
@@ -42,6 +42,8 @@
 /* Increase CPUMAP_MAX_CPUS if ever you have more than 64 CPUs */
 #define CPUMAP_MAX_CPUS     64
 
+#define USE_PERCPU_HASH    1
+
 struct vlan_hdr {
     __u16      h_vlan_TCI;
     __u16      h_vlan_encapsulated_proto;
@@ -55,7 +57,8 @@ struct flowv4_keys {
         __u16 port16[2];
     };
     __u32 ip_proto;
-} __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct flowv6_keys {
     __u32 src[4];
@@ -65,23 +68,32 @@ struct flowv6_keys {
         __u16 port16[2];
     };
     __u32 ip_proto;
-} __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct pair {
-    __u64 time;
     __u64 packets;
     __u64 bytes;
-} __attribute__((__aligned__(8)));
+    __u32 hash;
+};
 
 struct bpf_map_def SEC("maps") flow_table_v4 = {
+#if USE_PERCPU_HASH
     .type = BPF_MAP_TYPE_PERCPU_HASH,
+#else
+    .type = BPF_MAP_TYPE_HASH,
+#endif
     .key_size = sizeof(struct flowv4_keys),
     .value_size = sizeof(struct pair),
     .max_entries = 32768,
 };
 
 struct bpf_map_def SEC("maps") flow_table_v6 = {
+#if USE_PERCPU_HASH
     .type = BPF_MAP_TYPE_PERCPU_HASH,
+#else
+    .type = BPF_MAP_TYPE_HASH,
+#endif
     .key_size = sizeof(struct flowv6_keys),
     .value_size = sizeof(struct pair),
     .max_entries = 32768,
@@ -169,7 +181,7 @@ static __always_inline int get_dport(void *trans_data, void *data_end,
     }
 }
 
-static int __always_inline filter_ipv4(void *data, __u64 nh_off, void *data_end)
+static int __always_inline filter_ipv4(void *data, __u64 nh_off, void *data_end, __u16 vlan0, __u16 vlan1)
 {
     struct iphdr *iph = data + nh_off;
     int dport;
@@ -203,6 +215,10 @@ static int __always_inline filter_ipv4(void *data, __u64 nh_off, void *data_end)
 
     tuple.port16[0] = (__u16)sport;
     tuple.port16[1] = (__u16)dport;
+
+    tuple.vlan_id[0] = vlan0;
+    tuple.vlan_id[1] = vlan1;
+
     value = bpf_map_lookup_elem(&flow_table_v4, &tuple);
 #if 0
     {
@@ -219,7 +235,6 @@ static int __always_inline filter_ipv4(void *data, __u64 nh_off, void *data_end)
         char fmt[] = "Data: t:%lu p:%lu n:%lu\n";
         bpf_trace_printk(fmt, sizeof(fmt), value->time, value->packets, value->bytes);
 #endif
-        value->time = bpf_ktime_get_ns();
         value->packets++;
         value->bytes += data_end - data;
 
@@ -251,7 +266,7 @@ static int __always_inline filter_ipv4(void *data, __u64 nh_off, void *data_end)
 #endif
 }
 
-static int __always_inline filter_ipv6(void *data, __u64 nh_off, void *data_end)
+static int __always_inline filter_ipv6(void *data, __u64 nh_off, void *data_end, __u16 vlan0, __u16 vlan1)
 {
     struct ipv6hdr *ip6h = data + nh_off;
     int dport;
@@ -287,6 +302,9 @@ static int __always_inline filter_ipv6(void *data, __u64 nh_off, void *data_end)
     tuple.port16[0] = sport;
     tuple.port16[1] = dport;
 
+    tuple.vlan_id[0] = vlan0;
+    tuple.vlan_id[1] = vlan1;
+
     value = bpf_map_lookup_elem(&flow_table_v6, &tuple);
     if (value) {
 #if 0
@@ -295,7 +313,6 @@ static int __always_inline filter_ipv6(void *data, __u64 nh_off, void *data_end)
 #endif
         value->packets++;
         value->bytes += data_end - data;
-        value->time = bpf_ktime_get_ns();
 
         iface_peer = bpf_map_lookup_elem(&tx_peer_int, &key0);
         if (!iface_peer) {
@@ -336,6 +353,8 @@ int SEC("xdp") xdp_hashfilter(struct xdp_md *ctx)
     int rc = XDP_PASS;
     __u16 h_proto;
     __u64 nh_off;
+    __u16 vlan0 = 0;
+    __u16 vlan1 = 0;
 
        nh_off = sizeof(*eth);
        if (data + nh_off > data_end)
@@ -351,6 +370,7 @@ int SEC("xdp") xdp_hashfilter(struct xdp_md *ctx)
                if (data + nh_off > data_end)
                        return rc;
                h_proto = vhdr->h_vlan_encapsulated_proto;
+               vlan0 = vhdr->h_vlan_TCI & 0x0fff;
        }
        if (h_proto == __constant_htons(ETH_P_8021Q) || h_proto == __constant_htons(ETH_P_8021AD)) {
                struct vlan_hdr *vhdr;
@@ -360,12 +380,13 @@ int SEC("xdp") xdp_hashfilter(struct xdp_md *ctx)
                if (data + nh_off > data_end)
                        return rc;
                h_proto = vhdr->h_vlan_encapsulated_proto;
+               vlan1 = vhdr->h_vlan_TCI & 0x0fff;
        }
 
        if (h_proto == __constant_htons(ETH_P_IP))
-               return filter_ipv4(data, nh_off, data_end);
+               return filter_ipv4(data, nh_off, data_end, vlan0, vlan1);
        else if (h_proto == __constant_htons(ETH_P_IPV6))
-               return filter_ipv6(data, nh_off, data_end);
+               return filter_ipv6(data, nh_off, data_end, vlan0, vlan1);
        else
                rc = XDP_PASS;
 
index 76fa3d0510f970d28620b2d04e6012385b07b98e..f0ee40e9110e52dcfa87f5218284fed2d6ebec8c 100644 (file)
@@ -37,11 +37,22 @@ typedef struct BypassedFlowManagerThreadData_ {
 } BypassedFlowManagerThreadData;
 
 #define BYPASSFUNCMAX   4
+
+typedef struct BypassedCheckFuncItem_ {
+    BypassedCheckFunc Func;
+    void *data;
+} BypassedCheckFuncItem;
+
 int g_bypassed_func_max_index = 0;
-BypassedCheckFunc BypassedFuncList[BYPASSFUNCMAX];
+BypassedCheckFuncItem BypassedFuncList[BYPASSFUNCMAX];
+
+typedef struct BypassedUpdateFuncItem_ {
+    BypassedUpdateFunc Func;
+    void *data;
+} BypassedUpdateFuncItem;
 
 int g_bypassed_update_max_index = 0;
-BypassedUpdateFunc UpdateFuncList[BYPASSFUNCMAX];
+BypassedUpdateFuncItem UpdateFuncList[BYPASSFUNCMAX];
 
 static TmEcode BypassedFlowManager(ThreadVars *th_v, void *thread_data)
 {
@@ -60,7 +71,7 @@ static TmEcode BypassedFlowManager(ThreadVars *th_v, void *thread_data)
         }
         for (i = 0; i < g_bypassed_func_max_index; i++) {
             struct flows_stats bypassstats = { 0, 0, 0};
-            tcount = BypassedFuncList[i](&bypassstats, &curtime);
+            tcount = BypassedFuncList[i].Func(&bypassstats, &curtime, BypassedFuncList[i].data);
             if (tcount) {
                 StatsAddUI64(th_v, ftd->flow_bypassed_cnt_clo, (uint64_t)bypassstats.count);
                 StatsAddUI64(th_v, ftd->flow_bypassed_pkts, (uint64_t)bypassstats.packets);
@@ -90,7 +101,7 @@ void BypassedFlowUpdate(Flow *f, Packet *p)
     int i;
 
     for (i = 0; i < g_bypassed_update_max_index; i++) {
-        if (UpdateFuncList[i](f, p)) {
+        if (UpdateFuncList[i].Func(f, p, UpdateFuncList[i].data)) {
             return;
         }
     }
@@ -140,13 +151,15 @@ void BypassedFlowManagerThreadSpawn()
     }
 }
 
-int BypassedFlowManagerRegisterCheckFunc(BypassedCheckFunc CheckFunc)
+int BypassedFlowManagerRegisterCheckFunc(BypassedCheckFunc CheckFunc,
+                                         void *data)
 {
     if (!CheckFunc) {
         return -1;
     }
     if (g_bypassed_func_max_index < BYPASSFUNCMAX) {
-        BypassedFuncList[g_bypassed_func_max_index] = CheckFunc;
+        BypassedFuncList[g_bypassed_func_max_index].Func = CheckFunc;
+        BypassedFuncList[g_bypassed_func_max_index].data = data;
         g_bypassed_func_max_index++;
     } else {
         return -1;
@@ -154,13 +167,15 @@ int BypassedFlowManagerRegisterCheckFunc(BypassedCheckFunc CheckFunc)
     return 0;
 }
 
-int BypassedFlowManagerRegisterUpdateFunc(BypassedUpdateFunc UpdateFunc)
+int BypassedFlowManagerRegisterUpdateFunc(BypassedUpdateFunc UpdateFunc,
+                                          void *data)
 {
     if (!UpdateFunc) {
         return -1;
     }
     if (g_bypassed_update_max_index < BYPASSFUNCMAX) {
-        UpdateFuncList[g_bypassed_update_max_index] = UpdateFunc;
+        UpdateFuncList[g_bypassed_update_max_index].Func = UpdateFunc;
+        UpdateFuncList[g_bypassed_update_max_index].data = data;
         g_bypassed_update_max_index++;
     } else {
         return -1;
index b3089dc8552d8c138f5162404194df440d8c0b16..bef2d77e84d0b92042c586c213ca3b69530557ab 100644 (file)
@@ -31,16 +31,16 @@ struct flows_stats {
 };
 
 typedef int (*BypassedCheckFunc)(struct flows_stats *bypassstats,
-                                 struct timespec *curtime);
-typedef int (*BypassedUpdateFunc)(Flow *f, Packet *p);
+                                 struct timespec *curtime, void *data);
+typedef int (*BypassedUpdateFunc)(Flow *f, Packet *p, void *data);
 
 void FlowAddToBypassed(Flow *f);
 
 void BypassedFlowManagerThreadSpawn(void);
 void TmModuleBypassedFlowManagerRegister(void);
 
-int BypassedFlowManagerRegisterCheckFunc(BypassedCheckFunc CheckFunc);
-int BypassedFlowManagerRegisterUpdateFunc(BypassedUpdateFunc UpdateFunc);
+int BypassedFlowManagerRegisterCheckFunc(BypassedCheckFunc CheckFunc, void *data);
+int BypassedFlowManagerRegisterUpdateFunc(BypassedUpdateFunc UpdateFunc, void *data);
 
 void BypassedFlowUpdate(Flow *f, Packet *p);
 
index 5db87a637f178f51ca7f79eb4ece9a1028ec60d2..8b109ad6d61de8a4b067b2b6fe5a3f34571aec36 100644 (file)
@@ -602,6 +602,68 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
     return f;
 }
 
+static inline int FlowCompareKey(Flow *f, FlowKey *key)
+{
+    if ((f->proto != IPPROTO_TCP) && (f->proto != IPPROTO_UDP))
+        return 0;
+    return CMP_FLOW(f, key);
+}
+
+/** \brief Look for existing Flow using a FlowKey
+ *
+ * Hash retrieval function for flows. Looks up the hash bucket containing the
+ * flow pointer. Then compares the packet with the found flow to see if it is
+ * the flow we need. If it isn't, walk the list until the right flow is found.
+ *
+ *
+ *  \retval f *LOCKED* flow or NULL
+ */
+Flow *FlowGetExistingFlowFromHash(FlowKey *key, const uint32_t hash)
+{
+    Flow *f = NULL;
+
+    /* get our hash bucket and lock it */
+    FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
+    FBLOCK_LOCK(fb);
+
+    SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+    /* return if the bucket don't have a flow */
+    if (fb->head == NULL) {
+        FBLOCK_UNLOCK(fb);
+        return NULL;
+    }
+
+    /* ok, we have a flow in the bucket. Let's find out if it is our flow */
+    f = fb->head;
+
+    /* see if this is the flow we are looking for */
+    if (FlowCompareKey(f, key) == 0) {
+        while (f) {
+            f = f->hnext;
+
+            if (f == NULL) {
+                FBLOCK_UNLOCK(fb);
+                return NULL;
+            }
+
+            if (FlowCompareKey(f, key) != 0) {
+                /* found our flow, lock & return */
+                FLOWLOCK_WRLOCK(f);
+
+                FBLOCK_UNLOCK(fb);
+                return f;
+            }
+        }
+    }
+
+    /* lock & return */
+    FLOWLOCK_WRLOCK(f);
+
+    FBLOCK_UNLOCK(fb);
+    return f;
+}
+
 /** \internal
  *  \brief Get a flow from the hash directly.
  *
index 18a8c324b5a0426cca2178b4af32a084407cef90..38268a7729e70e48c7148cc62e75c0d580ede75d 100644 (file)
@@ -76,6 +76,8 @@ typedef struct FlowBucket_ {
 
 Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *, Flow **);
 
+Flow *FlowGetExistingFlowFromHash(FlowKey * key, uint32_t hash);
+
 void FlowDisableTcpReuseHandling(void);
 
 #endif /* __FLOW_HASH_H__ */
index 269de7c25f705d9d05f9eea58e8281dba63b8b87..201e48875c083d28c6b613cc8305f9cdf7220caa 100644 (file)
@@ -411,7 +411,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p)
             COPY_TIMESTAMP(&p->ts, &f->lastts);
             FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);
         } else {
-            /* In IPS mode the packet could come from the over interface so it would
+            /* In IPS mode the packet could come from the other interface so it would
              * need to be bypassed */
             if (EngineModeIsIPS()) {
                 BypassedFlowUpdate(f, p);
index 56216cb4f3d00814e55e40f3f1c51972f38d7dea..50951e59702f800de257a2ebeb34afa22e7f88cf 100644 (file)
@@ -282,7 +282,7 @@ typedef struct FlowKey_
     Port sp, dp;
     uint8_t proto;
     uint8_t recursion_level;
-
+    uint16_t vlan_id[2];
 } FlowKey;
 
 typedef struct FlowAddress_ {
index 7481dee6a68476acb30ce42cf9e929c4f66523ba..94bff13ea3f4f5b006fad2c06a177f0967cb9247 100644 (file)
@@ -156,6 +156,9 @@ static void *ParseAFPConfig(const char *iface)
     aconf->copy_mode = AFP_COPY_MODE_NONE;
     aconf->block_timeout = 10;
     aconf->block_size = getpagesize() << AFP_BLOCK_SIZE_DEFAULT_ORDER;
+#ifdef HAVE_PACKET_EBPF
+    aconf->ebpf_t_config.cpus_count = UtilCpuGetNumProcessorsConfigured();
+#endif
 
     if (ConfGet("bpf-filter", &bpf_filter) == 1) {
         if (strlen(bpf_filter) > 0) {
@@ -405,6 +408,7 @@ static void *ParseAFPConfig(const char *iface)
                   ebpf_file);
 #endif
         aconf->ebpf_filter_file = ebpf_file;
+        aconf->ebpf_t_config.mode = AFP_MODE_EBPF_BYPASS;
         ConfGetChildValueBoolWithDefault(if_root, if_default, "bypass", &conf_val);
         if (conf_val) {
 #ifdef HAVE_PACKET_EBPF
@@ -412,8 +416,8 @@ static void *ParseAFPConfig(const char *iface)
                     aconf->iface);
             aconf->flags |= AFP_BYPASS;
             RunModeEnablesBypassManager();
-            BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout);
-            BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow);
+            BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout, (void *) &(aconf->ebpf_t_config));
+            BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL);
 #else
             SCLogError(SC_ERR_UNIMPLEMENTED, "Bypass set but eBPF support is not built-in");
 #endif
@@ -439,6 +443,7 @@ static void *ParseAFPConfig(const char *iface)
     } else {
         SCLogInfo("af-packet will use '%s' as XDP filter file",
                   ebpf_file);
+        aconf->ebpf_t_config.mode = AFP_MODE_XDP_BYPASS;
         aconf->xdp_filter_file = ebpf_file;
         ConfGetChildValueBoolWithDefault(if_root, if_default, "bypass", &conf_val);
         if (conf_val) {
@@ -447,7 +452,8 @@ static void *ParseAFPConfig(const char *iface)
                     aconf->iface);
             aconf->flags |= AFP_XDPBYPASS;
             RunModeEnablesBypassManager();
-            BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout);
+            BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout, (void *) &(aconf->ebpf_t_config));
+            BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL);
 #else
             SCLogError(SC_ERR_UNIMPLEMENTED, "Bypass set but XDP support is not built-in");
 #endif
@@ -468,6 +474,15 @@ static void *ParseAFPConfig(const char *iface)
                              "Invalid xdp-mode value: '%s'", xdp_mode);
             }
         }
+
+        if (ConfGetChildValueBoolWithDefault(if_root, if_default, "no-percpu-hash", (int *)&boolval) == 1) {
+            if (boolval) {
+                SCLogConfig("Not using percpu hash on iface %s",
+                        aconf->iface);
+                aconf->ebpf_t_config.cpus_count = 1;
+            }
+        }
+
 #endif
     }
 
index 21160bc651e76c1267d13b04e2cb763eba266149..fdd9c1deb41a1490b763c2fcaabaa1c61c97d2a1 100644 (file)
@@ -292,6 +292,8 @@ typedef struct AFPThreadVars_
 
     uint8_t xdp_mode;
 
+    unsigned int nr_cpus;
+
 } AFPThreadVars;
 
 TmEcode ReceiveAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
@@ -311,8 +313,6 @@ static int AFPDerefSocket(AFPPeer* peer);
 static int AFPRefSocket(AFPPeer* peer);
 
 
-static unsigned int nr_cpus;
-
 /**
  * \brief Registration Function for RecieveAFP.
  * \todo Unit tests are needed for this module.
@@ -330,7 +330,6 @@ void TmModuleReceiveAFPRegister (void)
     tmm_modules[TMM_RECEIVEAFP].cap_flags = SC_CAP_NET_RAW;
     tmm_modules[TMM_RECEIVEAFP].flags = TM_FLAG_RECEIVE_TM;
 
-    nr_cpus = UtilCpuGetNumProcessorsConfigured();
 }
 
 
@@ -632,6 +631,7 @@ static int AFPRead(AFPThreadVars *ptv)
 #ifdef HAVE_PACKET_EBPF
         p->afp_v.v4_map_fd = ptv->v4_map_fd;
         p->afp_v.v6_map_fd = ptv->v6_map_fd;
+        p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
     }
     if (ptv->flags & AFP_XDPBYPASS) {
@@ -639,6 +639,7 @@ static int AFPRead(AFPThreadVars *ptv)
 #ifdef HAVE_PACKET_EBPF
         p->afp_v.v4_map_fd = ptv->v4_map_fd;
         p->afp_v.v6_map_fd = ptv->v6_map_fd;
+        p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
     }
 
@@ -917,6 +918,7 @@ static int AFPReadFromRing(AFPThreadVars *ptv)
 #ifdef HAVE_PACKET_EBPF
             p->afp_v.v4_map_fd = ptv->v4_map_fd;
             p->afp_v.v6_map_fd = ptv->v6_map_fd;
+            p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
         }
         if (ptv->flags & AFP_XDPBYPASS) {
@@ -924,6 +926,7 @@ static int AFPReadFromRing(AFPThreadVars *ptv)
 #ifdef HAVE_PACKET_EBPF
             p->afp_v.v4_map_fd = ptv->v4_map_fd;
             p->afp_v.v6_map_fd = ptv->v6_map_fd;
+            p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
         }
 
@@ -1051,12 +1054,14 @@ static inline int AFPParsePacketV3(AFPThreadVars *ptv, struct tpacket_block_desc
 #ifdef HAVE_PACKET_EBPF
         p->afp_v.v4_map_fd = ptv->v4_map_fd;
         p->afp_v.v6_map_fd = ptv->v6_map_fd;
+        p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
     } else if (ptv->flags & AFP_XDPBYPASS) {
         p->BypassPacketsFlow = AFPXDPBypassCallback;
 #ifdef HAVE_PACKET_EBPF
         p->afp_v.v4_map_fd = ptv->v4_map_fd;
         p->afp_v.v6_map_fd = ptv->v6_map_fd;
+        p->afp_v.nr_cpus = ptv->nr_cpus;
 #endif
     }
 
@@ -2284,10 +2289,13 @@ TmEcode AFPSetBPFFilter(AFPThreadVars *ptv)
  *
  * \param mapfd file descriptor of the protocol bypass table
  * \param key data to use as key in the table
- * \param inittime time of creation of the entry (in monotonic clock)
+ * \param pkts_cnt packet count for the half flow
+ * \param bytes_cnt bytes count for the half flow
  * \return 0 in case of error, 1 if success
  */
-static int AFPInsertHalfFlow(int mapd, void *key, uint64_t inittime)
+static int AFPInsertHalfFlow(int mapd, void *key, uint32_t hash,
+                             uint64_t pkts_cnt, uint64_t bytes_cnt,
+                             unsigned int nr_cpus)
 {
     struct pair value[nr_cpus];
     unsigned int i;
@@ -2297,13 +2305,16 @@ static int AFPInsertHalfFlow(int mapd, void *key, uint64_t inittime)
     }
 
     /* We use a per CPU structure so we have to set an array of values as the kernel
-     * is not duplicating the data on each CPU by itself. */
-    for (i = 0; i < nr_cpus; i++) {
-        value[i].time = inittime;
+     * is not duplicating the data on each CPU by itself. We set the first entry to
+     * the actual flow pkts and bytes count as we need to continue from actual point
+     * to detect an absence of packets in the future. */
+    value[0].packets = pkts_cnt;
+    value[0].bytes = bytes_cnt;
+    value[0].hash = hash;
+    for (i = 1; i < nr_cpus; i++) {
         value[i].packets = 0;
         value[i].bytes = 0;
     }
-    SCLogDebug("Inserting element in eBPF mapping: %lu", inittime);
     if (bpf_map_update_elem(mapd, key, value, BPF_NOEXIST) != 0) {
         switch (errno) {
             /* no more place in the hash */
@@ -2353,14 +2364,6 @@ static int AFPBypassCallback(Packet *p)
     if (IS_TUNNEL_PKT(p)) {
         return 0;
     }
-    struct timespec curtime;
-    uint64_t inittime = 0;
-    /* In eBPF, the function that we have use to get time return the
-     * monotonic clock (the time since start of the computer). So we
-     * can't use the timestamp of the packet. */
-    if (clock_gettime(CLOCK_MONOTONIC, &curtime) == 0) {
-        inittime = curtime.tv_sec * 1000000000;
-    }
     if (PKT_IS_IPV4(p)) {
         SCLogDebug("add an IPv4");
         if (p->afp_v.v4_map_fd == -1) {
@@ -2373,17 +2376,19 @@ static int AFPBypassCallback(Packet *p)
         key.port16[1] = GET_TCP_DST_PORT(p);
 
         key.ip_proto = IPV4_GET_IPPROTO(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->todstpktcnt,
+                              p->flow->todstbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
         key.src = htonl(GET_IPV4_DST_ADDR_U32(p));
         key.dst = htonl(GET_IPV4_SRC_ADDR_U32(p));
         key.port16[0] = GET_TCP_DST_PORT(p);
         key.port16[1] = GET_TCP_SRC_PORT(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->tosrcpktcnt,
+                              p->flow->tosrcbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
-        EBPFUpdateFlow(p->flow, p);
+        EBPFUpdateFlow(p->flow, p, NULL);
         return 1;
     }
     /* For IPv6 case we don't handle extended header in eBPF */
@@ -2402,7 +2407,8 @@ static int AFPBypassCallback(Packet *p)
         key.port16[0] = GET_TCP_SRC_PORT(p);
         key.port16[1] = GET_TCP_DST_PORT(p);
         key.ip_proto = IPV6_GET_NH(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->todstpktcnt,
+                              p->flow->todstbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
         for (i = 0; i < 4; i++) {
@@ -2411,10 +2417,11 @@ static int AFPBypassCallback(Packet *p)
         }
         key.port16[0] = GET_TCP_DST_PORT(p);
         key.port16[1] = GET_TCP_SRC_PORT(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->tosrcpktcnt,
+                              p->flow->tosrcbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
-        EBPFUpdateFlow(p->flow, p);
+        EBPFUpdateFlow(p->flow, p, NULL);
         return 1;
     }
 #endif
@@ -2447,34 +2454,28 @@ static int AFPXDPBypassCallback(Packet *p)
     if (IS_TUNNEL_PKT(p)) {
         return 0;
     }
-    struct timespec curtime;
-    uint64_t inittime = 0;
-    /* In eBPF, the function that we have use to get time return the
-     * monotonic clock (the time since start of the computer). So we
-     * can't use the timestamp of the packet. */
-    if (clock_gettime(CLOCK_MONOTONIC, &curtime) == 0) {
-        inittime = curtime.tv_sec * 1000000000;
-    }
     if (PKT_IS_IPV4(p)) {
         struct flowv4_keys key = {};
         if (p->afp_v.v4_map_fd == -1) {
             return 0;
         }
-        key.src = GET_IPV4_SRC_ADDR_U32(p);
-        key.dst = GET_IPV4_DST_ADDR_U32(p);
+        key.src = p->flow->src.addr_data32[0];
+        key.dst = p->flow->dst.addr_data32[0];
         /* In the XDP filter we get port from parsing of packet and not from skb
          * (as in eBPF filter) so we need to pass from host to network order */
-        key.port16[0] = htons(GET_TCP_SRC_PORT(p));
-        key.port16[1] = htons(GET_TCP_DST_PORT(p));
+        key.port16[0] = htons(p->flow->sp);
+        key.port16[1] = htons(p->flow->dp);
         key.ip_proto = IPV4_GET_IPPROTO(p);
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->todstpktcnt,
+                              p->flow->todstbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
-        key.src = GET_IPV4_DST_ADDR_U32(p);
-        key.dst = GET_IPV4_SRC_ADDR_U32(p);
-        key.port16[0] = htons(GET_TCP_DST_PORT(p));
-        key.port16[1] = htons(GET_TCP_SRC_PORT(p));
-        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, inittime) == 0) {
+        key.src = p->flow->dst.addr_data32[0];
+        key.dst = p->flow->src.addr_data32[0];
+        key.port16[0] = htons(p->flow->dp);
+        key.port16[1] = htons(p->flow->sp);
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->tosrcpktcnt,
+                              p->flow->tosrcbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
         return 1;
@@ -2495,7 +2496,8 @@ static int AFPXDPBypassCallback(Packet *p)
         key.port16[0] = htons(GET_TCP_SRC_PORT(p));
         key.port16[1] = htons(GET_TCP_DST_PORT(p));
         key.ip_proto = IPV6_GET_NH(p);
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->todstpktcnt,
+                              p->flow->todstbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
         for (i = 0; i < 4; i++) {
@@ -2504,7 +2506,8 @@ static int AFPXDPBypassCallback(Packet *p)
         }
         key.port16[0] = htons(GET_TCP_DST_PORT(p));
         key.port16[1] = htons(GET_TCP_SRC_PORT(p));
-        if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, inittime) == 0) {
+        if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, p->flow->tosrcpktcnt,
+                              p->flow->tosrcbytecnt, p->afp_v.nr_cpus) == 0) {
             return 0;
         }
         return 1;
@@ -2580,6 +2583,7 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data)
     ptv->ebpf_lb_fd = afpconfig->ebpf_lb_fd;
     ptv->ebpf_filter_fd = afpconfig->ebpf_filter_fd;
     ptv->xdp_mode = afpconfig->xdp_mode;
+    ptv->nr_cpus = UtilCpuGetNumProcessorsConfigured();
 
 #ifdef HAVE_PACKET_EBPF
     if (ptv->flags & (AFP_BYPASS|AFP_XDPBYPASS)) {
@@ -2592,6 +2596,7 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data)
             SCLogError(SC_ERR_INVALID_VALUE, "Can't find eBPF map fd for '%s'", "flow_table_v6");
         }
     }
+    ptv->nr_cpus = afpconfig->ebpf_t_config.cpus_count;
 #endif
 
 #ifdef PACKET_STATISTICS
index e6db995349b72754e1ecdbc2295acb451c3defd0..c192dffc0664772cf071ea9f1df65ebffed43431 100644 (file)
 #endif /* HAVE_PACKET_FANOUT */
 #include "queue.h"
 
+#ifdef HAVE_PACKET_EBPF
+#define AFP_MODE_XDP_BYPASS 1
+#define AFP_MODE_EBPF_BYPASS 2
+struct ebpf_timeout_config {
+    uint16_t cpus_count;
+    uint8_t mode;
+};
+#endif
+
 /* value for flags */
 #define AFP_RING_MODE (1<<0)
 #define AFP_ZERO_COPY (1<<1)
@@ -98,6 +107,9 @@ typedef struct AFPIfaceConfig_
     int xdp_filter_fd;
     uint8_t xdp_mode;
     const char *out_iface;
+#ifdef HAVE_PACKET_EBPF
+    struct ebpf_timeout_config ebpf_t_config;
+#endif
     SC_ATOMIC_DECLARE(unsigned int, ref);
     void (*DerefFunc)(void *);
 } AFPIfaceConfig;
@@ -137,6 +149,7 @@ typedef struct AFPPacketVars_
     uint8_t copy_mode;
     int v4_map_fd;
     int v6_map_fd;
+    unsigned int nr_cpus;
 } AFPPacketVars;
 
 #define AFPV_CLEANUP(afpv) do {           \
index 1306415a9aa836ca54db8868ce1d0965a9ca8234..2cf48ac451e52fc3abaa54acd4f750c43369a10c 100644 (file)
@@ -47,6 +47,8 @@
 
 #include "device-storage.h"
 #include "flow-storage.h"
+#include "flow.h"
+#include "flow-hash.h"
 
 #include <bpf/libbpf.h>
 #include <bpf/bpf.h>
@@ -312,72 +314,40 @@ int EBPFSetupXDP(const char *iface, int fd, uint8_t flags)
     return 0;
 }
 
-/**
- * Decide if an IPV4 flow needs to be timeouted
- *
- * The filter is maintaining for each half flow a struct pair:: structure in
- * the kernel where it does accounting and timestamp update. So by comparing
- * the current timestamp to the timestamp in the struct pair we can know that
- * no packet have been seen on a half flow since a certain delay.
- *
- * If a per-CPU array map is used, this function has only a per-CPU view so
- * the flow will be deleted from the table if EBPFBypassedFlowV4Timeout() return
- * 1 for all CPUs.
- *
- * \param fd the file descriptor of the flow table map
- * \param key the key of the element
- * \param value the value of the element in the hash
- * \param curtime the current time
- * \return 1 if timeouted 0 if not
- */
-static int EBPFBypassedFlowV4Timeout(int fd, struct flowv4_keys *key,
-                                     struct pair *value, struct timespec *curtime)
+static int EBPFUpdateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_key,
+                               uint32_t hash, uint64_t pkts_cnt, uint64_t bytes_cnt)
 {
-    SCLogDebug("Got curtime %" PRIu64 " and value %" PRIu64 " (sp:%d, dp:%d) %u",
-               curtime->tv_sec, value->time / 1000000000,
-               key->port16[0], key->port16[1], key->ip_proto
-              );
-
-    if (curtime->tv_sec - value->time / 1000000000 > BYPASSED_FLOW_TIMEOUT) {
-        SCLogDebug("Got no packet for %d -> %d at %" PRIu64,
-                   key->port16[0], key->port16[1], value->time);
-        return 1;
-    }
-    return 0;
-}
-
-/**
- * Decide if an IPV6 flow needs to be timeouted
- *
- * The filter is maintaining for each half flow a struct pair:: structure in
- * the kernel where it does accounting and timestamp update. So by comparing
- * the current timestamp to the timestamp in the struct pair we can know that
- * no packet have been seen on a half flow since a certain delay.
- *
- * If a per-CPU array map is used, this function has only a per-CPU view so
- * the flow will be deleted from the table if EBPFBypassedFlowV4Timeout() return
- * 1 for all CPUs.
- *
- * \param fd the file descriptor of the flow table map
- * \param key the key of the element
- * \param value the value of the element in the hash
- * \param curtime the current time
- * \return 1 if timeouted 0 if not
- */
-static int EBPFBypassedFlowV6Timeout(int fd, struct flowv6_keys *key,
-                                     struct pair *value, struct timespec *curtime)
-{
-    SCLogDebug("Got curtime %" PRIu64 " and value %" PRIu64 " (sp:%d, dp:%d)",
-               curtime->tv_sec, value->time / 1000000000,
-               key->port16[0], key->port16[1]
-              );
-
-    if (curtime->tv_sec - value->time / 1000000000 > BYPASSED_FLOW_TIMEOUT) {
-        SCLogDebug("Got no packet for %d -> %d at %" PRIu64,
-                   key->port16[0], key->port16[1], value->time);
-        return 1;
+    Flow *f = FlowGetExistingFlowFromHash(flow_key, hash);
+    if (f != NULL) {
+        SCLogDebug("bypassed flow found %d -> %d, doing accounting",
+                    f->sp, f->dp);
+        if (flow_key->sp == f->sp) {
+            if (pkts_cnt != f->todstpktcnt) {
+                f->todstpktcnt = pkts_cnt;
+                f->todstbytecnt = bytes_cnt;
+                /* interval based so no meaning to update the millisecond.
+                 * Let's keep it fast and simple */
+                f->lastts.tv_sec = time(NULL);
+            }
+        } else {
+            if (pkts_cnt != f->tosrcpktcnt) {
+                f->tosrcpktcnt = pkts_cnt;
+                f->tosrcbytecnt = bytes_cnt;
+                f->lastts.tv_sec = time(NULL);
+            }
+        }
+        FLOWLOCK_UNLOCK(f);
+        return 0;
+    } else {
+        /* Flow has disappeared so it has been removed due to timeout.
+         * We discard the flow and do accounting */
+        SCLogDebug("No flow for %d -> %d", flow_key->sp, flow_key->dp);
+        SCLogDebug("Dead with pkts %lu bytes %lu", pkts_cnt, bytes_cnt);
+        flowstats->count++;
+        flowstats->packets += pkts_cnt;
+        flowstats->bytes += bytes_cnt;
+        return pkts_cnt;
     }
-    return 0;
 }
 
 /**
@@ -388,58 +358,72 @@ static int EBPFBypassedFlowV6Timeout(int fd, struct flowv6_keys *key,
  */
 static int EBPFForEachFlowV4Table(LiveDevice *dev, const char *name,
                                   struct flows_stats *flowstats,
-                                  struct timespec *ctime)
+                                  struct timespec *ctime,
+                                  struct ebpf_timeout_config *tcfg)
 {
     int mapfd = EBPFGetMapFDByName(dev->dev, name);
     struct flowv4_keys key = {}, next_key;
     int found = 0;
     unsigned int i;
-    unsigned int nr_cpus = UtilCpuGetNumProcessorsConfigured();
-    if (nr_cpus == 0) {
-        SCLogWarning(SC_ERR_INVALID_VALUE, "Unable to get CPU count");
+    uint64_t hash_cnt = 0;
+
+    if (tcfg->cpus_count == 0) {
+        SCLogWarning(SC_ERR_INVALID_VALUE, "CPU count should not be 0");
         return 0;
     }
 
-    uint64_t hash_cnt = 0;
     while (bpf_map_get_next_key(mapfd, &key, &next_key) == 0) {
-        bool purge = true;
         uint64_t pkts_cnt = 0;
         uint64_t bytes_cnt = 0;
         hash_cnt++;
-        /* We use a per CPU structure so we will get a array of values. */
-        struct pair values_array[nr_cpus];
+        /* We use a per CPU structure so we will get a array of values. But if nr_cpus
+         * is 1 then we have a global hash. */
+        struct pair values_array[tcfg->cpus_count];
         memset(values_array, 0, sizeof(values_array));
         int res = bpf_map_lookup_elem(mapfd, &key, values_array);
         if (res < 0) {
             SCLogDebug("no entry in v4 table for %d -> %d", key.port16[0], key.port16[1]);
+            SCLogDebug("errno: (%d) %s", errno, strerror(errno));
             key = next_key;
             continue;
         }
-        for (i = 0; i < nr_cpus; i++) {
-            int ret = EBPFBypassedFlowV4Timeout(mapfd, &key, &values_array[i], ctime);
-            if (ret) {
-                /* no packet for the flow on this CPU, let's start accumulating
-                   value so we can compute the counters */
-                SCLogDebug("%d:%lu: Adding pkts %lu bytes %lu", i, values_array[i].time / 1000000000,
-                            values_array[i].packets, values_array[i].bytes);
-                pkts_cnt += values_array[i].packets;
-                bytes_cnt += values_array[i].bytes;
-            } else {
-                /* Packet seen on one CPU so we keep the flow */
-                purge = false;
-                break;
-            }
+        for (i = 0; i < tcfg->cpus_count; i++) {
+            /* let's start accumulating value so we can compute the counters */
+            SCLogDebug("%d: Adding pkts %lu bytes %lu", i,
+                       values_array[i].packets, values_array[i].bytes);
+            pkts_cnt += values_array[i].packets;
+            bytes_cnt += values_array[i].bytes;
         }
-        /* No packet seen, we discard the flow and do accounting */
-        if (purge) {
-            SCLogDebug("Got no packet for %d -> %d", key.port16[0], key.port16[1]);
-            SCLogDebug("Dead with pkts %lu bytes %lu", pkts_cnt, bytes_cnt);
-            flowstats->count++;
-            flowstats->packets += pkts_cnt;
-            flowstats->bytes += bytes_cnt;
+        /* Get the corresponding Flow in the Flow table to compare and update
+         * its counters and lastseen if needed */
+        FlowKey flow_key;
+        if (tcfg->mode == AFP_MODE_XDP_BYPASS) {
+            flow_key.sp = ntohs(key.port16[0]);
+            flow_key.dp = ntohs(key.port16[1]);
+        } else {
+            flow_key.sp = key.port16[0];
+            flow_key.dp = key.port16[1];
+        }
+        flow_key.src.family = AF_INET;
+        flow_key.src.addr_data32[0] = key.src;
+        flow_key.src.addr_data32[1] = 0;
+        flow_key.src.addr_data32[2] = 0;
+        flow_key.src.addr_data32[3] = 0;
+        flow_key.dst.family = AF_INET;
+        flow_key.dst.addr_data32[0] = key.dst;
+        flow_key.dst.addr_data32[1] = 0;
+        flow_key.dst.addr_data32[2] = 0;
+        flow_key.dst.addr_data32[3] = 0;
+        flow_key.vlan_id[0] = key.vlan_id[0];
+        flow_key.vlan_id[1] = key.vlan_id[1];
+        flow_key.proto = key.ip_proto;
+        flow_key.recursion_level = 0;
+        pkts_cnt = EBPFUpdateFlowForKey(flowstats, &flow_key, values_array[0].hash,
+                                        pkts_cnt, bytes_cnt);
+        if (pkts_cnt > 0) {
             SC_ATOMIC_ADD(dev->bypassed, pkts_cnt);
-            found = 1;
             EBPFDeleteKey(mapfd, &key);
+            found = 1;
         }
         key = next_key;
     }
@@ -460,49 +444,71 @@ static int EBPFForEachFlowV4Table(LiveDevice *dev, const char *name,
  */
 static int EBPFForEachFlowV6Table(LiveDevice *dev, const char *name,
                                   struct flows_stats *flowstats,
-                                  struct timespec *ctime)
+                                  struct timespec *ctime,
+                                  struct ebpf_timeout_config *tcfg)
 {
     int mapfd = EBPFGetMapFDByName(dev->dev, name);
     struct flowv6_keys key = {}, next_key;
     int found = 0;
     unsigned int i;
-    unsigned int nr_cpus = UtilCpuGetNumProcessorsConfigured();
-    if (nr_cpus == 0) {
-        SCLogWarning(SC_ERR_INVALID_VALUE, "Unable to get CPU count");
+    uint64_t hash_cnt = 0;
+
+    if (tcfg->cpus_count == 0) {
+        SCLogWarning(SC_ERR_INVALID_VALUE, "CPU count should not be 0");
         return 0;
     }
 
-    uint64_t hash_cnt = 0;
     while (bpf_map_get_next_key(mapfd, &key, &next_key) == 0) {
-        bool purge = true;
         uint64_t pkts_cnt = 0;
         uint64_t bytes_cnt = 0;
         hash_cnt++;
-        struct pair values_array[nr_cpus];
+        /* We use a per CPU structure so we will get a array of values. But if nr_cpus
+         * is 1 then we have a global hash. */
+        struct pair values_array[tcfg->cpus_count];
         memset(values_array, 0, sizeof(values_array));
         int res = bpf_map_lookup_elem(mapfd, &key, values_array);
         if (res < 0) {
-            SCLogDebug("no entry in v6 table for %d -> %d", key.port16[0], key.port16[1]);
+            SCLogDebug("no entry in v4 table for %d -> %d", key.port16[0], key.port16[1]);
             key = next_key;
             continue;
         }
-        for (i = 0; i < nr_cpus; i++) {
-            int ret = EBPFBypassedFlowV6Timeout(mapfd, &key, &values_array[i], ctime);
-            if (ret) {
-                pkts_cnt += values_array[i].packets;
-                bytes_cnt += values_array[i].bytes;
-            } else {
-                purge = false;
-                break;
-            }
+        for (i = 0; i < tcfg->cpus_count; i++) {
+            /* let's start accumulating value so we can compute the counters */
+            SCLogDebug("%d: Adding pkts %lu bytes %lu", i,
+                       values_array[i].packets, values_array[i].bytes);
+            pkts_cnt += values_array[i].packets;
+            bytes_cnt += values_array[i].bytes;
+        }
+        /* Get the corresponding Flow in the Flow table to compare and update
+         * its counters  and lastseen if needed */
+        FlowKey flow_key;
+        if (tcfg->mode == AFP_MODE_XDP_BYPASS) {
+            flow_key.sp = ntohs(key.port16[0]);
+            flow_key.dp = ntohs(key.port16[1]);
+        } else {
+            flow_key.sp = key.port16[0];
+            flow_key.dp = key.port16[1];
         }
-        if (purge) {
-            flowstats->count++;
-            flowstats->packets += pkts_cnt;
-            flowstats->bytes += bytes_cnt;
+        flow_key.src.family = AF_INET6;
+        flow_key.src.addr_data32[0] = key.src[0];
+        flow_key.src.addr_data32[1] = key.src[1];
+        flow_key.src.addr_data32[2] = key.src[2];
+        flow_key.src.addr_data32[3] = key.src[3];
+        flow_key.dst.family = AF_INET6;
+        flow_key.dst.addr_data32[0] = key.dst[0];
+        flow_key.dst.addr_data32[1] = key.dst[1];
+        flow_key.dst.addr_data32[2] = key.dst[2];
+        flow_key.dst.addr_data32[3] = key.dst[3];
+        flow_key.vlan_id[0] = key.vlan_id[0];
+        flow_key.vlan_id[1] = key.vlan_id[1];
+        flow_key.proto = key.ip_proto;
+        flow_key.recursion_level = 0;
+        pkts_cnt = EBPFUpdateFlowForKey(flowstats, &flow_key, values_array[0].hash,
+                                        pkts_cnt, bytes_cnt);
+        if (pkts_cnt > 0) {
             SC_ATOMIC_ADD(dev->bypassed, pkts_cnt);
-            found = 1;
             EBPFDeleteKey(mapfd, &key);
+            found = 1;
         }
         key = next_key;
     }
@@ -522,16 +528,25 @@ static int EBPFForEachFlowV6Table(LiveDevice *dev, const char *name,
  *
  */
 int EBPFCheckBypassedFlowTimeout(struct flows_stats *bypassstats,
-                                        struct timespec *curtime)
+                                        struct timespec *curtime,
+                                        void *data)
 {
     struct flows_stats local_bypassstats = { 0, 0, 0};
     int ret = 0;
     int tcount = 0;
     LiveDevice *ldev = NULL, *ndev;
+    struct ebpf_timeout_config *cfg = (struct ebpf_timeout_config *)data;
+
+    if (cfg == NULL) {
+        SCLogError(SC_ERR_INVALID_VALUE,
+                   "Programming error, contact developer");
+        return 0;
+    }
 
     while(LiveDeviceForEach(&ldev, &ndev)) {
         tcount = EBPFForEachFlowV4Table(ldev, "flow_table_v4",
-                                        &local_bypassstats, curtime);
+                                        &local_bypassstats, curtime,
+                                        cfg);
         if (tcount) {
             bypassstats->count = local_bypassstats.count;
             bypassstats->packets = local_bypassstats.packets ;
@@ -540,7 +555,8 @@ int EBPFCheckBypassedFlowTimeout(struct flows_stats *bypassstats,
         }
         memset(&local_bypassstats, 0, sizeof(local_bypassstats));
         tcount = EBPFForEachFlowV6Table(ldev, "flow_table_v6",
-                                        &local_bypassstats, curtime);
+                                        &local_bypassstats, curtime,
+                                        cfg);
         if (tcount) {
             bypassstats->count += local_bypassstats.count;
             bypassstats->packets += local_bypassstats.packets ;
@@ -693,7 +709,12 @@ int EBPFSetPeerIface(const char *iface, const char *out_iface)
     return 0;
 }
 
-int EBPFUpdateFlow(Flow *f, Packet *p)
+/**
+ * Bypass the flow on all ifaces it is seen on. This is used
+ * in IPS mode.
+ */
+
+int EBPFUpdateFlow(Flow *f, Packet *p, void *data)
 {
     BypassedIfaceList *ifl = (BypassedIfaceList *)FlowGetStorageById(f, g_flow_storage_id);
     if (ifl == NULL) {
index 1c01ad142093cc7205c2b48b9eb7183e4a974f53..91398ed6bbec5480afbe26586c9ed7a978416f03 100644 (file)
@@ -41,7 +41,8 @@ struct flowv4_keys {
                __be16 port16[2];
        };
        __u32 ip_proto;
-}  __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct flowv6_keys {
     __be32 src[4];
@@ -51,13 +52,14 @@ struct flowv6_keys {
         __be16 port16[2];
     };
     __u32 ip_proto;
-}  __attribute__((__aligned__(8)));
+    __u16 vlan_id[2];
+};
 
 struct pair {
-    uint64_t time;
     uint64_t packets;
     uint64_t bytes;
-} __attribute__((__aligned__(8)));
+    uint32_t hash;
+};
 
 #define EBPF_SOCKET_FILTER  (1<<0)
 #define EBPF_XDP_CODE       (1<<1)
@@ -68,7 +70,8 @@ int EBPFLoadFile(const char *iface, const char *path, const char * section,
 int EBPFSetupXDP(const char *iface, int fd, uint8_t flags);
 
 int EBPFCheckBypassedFlowTimeout(struct flows_stats *bypassstats,
-                                        struct timespec *curtime);
+                                        struct timespec *curtime,
+                                        void *data);
 
 void EBPFRegisterExtension(void);
 
@@ -76,8 +79,8 @@ void EBPFBuildCPUSet(ConfNode *node, char *iface);
 
 int EBPFSetPeerIface(const char *iface, const char *out_iface);
 
-int EBPFUpdateFlow(Flow *f, Packet *p);
-  
+int EBPFUpdateFlow(Flow *f, Packet *p, void *data);
+
 #ifdef BUILD_UNIX_SOCKET
 TmEcode EBPFGetBypassedStats(json_t *cmd, json_t *answer, void *data);
 #endif