From: Eric Leblond Date: Sat, 8 Jun 2019 16:11:22 +0000 (+0200) Subject: bypass: new callback stragegy X-Git-Tag: suricata-5.0.0-rc1~305 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b07bda7a7b223f1c7058335dcb8e9b5843d9af46;p=thirdparty%2Fsuricata.git bypass: new callback stragegy This patch introduces and uses a new bypass strategy based on a callback. EBPF bypass implementation is updated to use this new strategy. Once the flow manager detect that a flow should be timeouted, it asks the capture method if it has seen packets in the interval. If it is the case the lastts of the flow is updated and the timeout is postponed. --- diff --git a/src/decode.c b/src/decode.c index ac6cf2a62d..2c914d2e3a 100644 --- a/src/decode.c +++ b/src/decode.c @@ -408,9 +408,9 @@ void PacketBypassCallback(Packet *p) (state == FLOW_STATE_CAPTURE_BYPASSED)) { return; } - FlowCounters *fc = SCCalloc(sizeof(FlowCounters), 1); + FlowBypassInfo *fc = SCCalloc(sizeof(FlowBypassInfo), 1); if (fc) { - FlowSetStorageById(p->flow, GetFlowBypassCounterID(), fc); + FlowSetStorageById(p->flow, GetFlowBypassInfoID(), fc); } else { return; } diff --git a/src/flow-manager.c b/src/flow-manager.c index d279bb9734..449163d4d5 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -41,6 +41,7 @@ #include "flow-private.h" #include "flow-timeout.h" #include "flow-manager.h" +#include "flow-storage.h" #include "stream-tcp-private.h" #include "stream-tcp-reassemble.h" @@ -236,7 +237,7 @@ static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state) * \retval 0 not timed out * \retval 1 timed out */ -static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts) +static int FlowManagerFlowTimeout(Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts) { /* set the timeout value according to the flow operating mode, * flow's state and protocol.*/ @@ -248,6 +249,26 @@ static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct ti /* do the timeout check */ if (flow_times_out_at >= ts->tv_sec) { + /* for bypassed flow we need to check if the capture method has seen + * something and update the logic */ + FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); + if (fc) { + if (fc->BypassUpdate) { + /* flow will be possibly updated */ + bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec); + /* FIXME do global accounting: store pkts and bytes before and add difference here to counter */ + if (update) { + SCLogDebug("Updated flow: %ld", FlowGetId(f)); + flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout); + if (*next_ts == 0 || flow_times_out_at < *next_ts) + *next_ts = flow_times_out_at; + return 0; + } else { + SCLogDebug("No new packet, dead flow %ld", FlowGetId(f)); + return 1; + } + } + } return 0; } diff --git a/src/flow-util.c b/src/flow-util.c index ed95cfc261..47ff767948 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -204,21 +204,28 @@ void FlowInit(Flow *f, const Packet *p) SCReturn; } -int g_bypass_counter_id = -1; +int g_bypass_info_id = -1; -int GetFlowBypassCounterID(void) +int GetFlowBypassInfoID(void) { - return g_bypass_counter_id; + return g_bypass_info_id; } -static void FlowCounterFree(void *x) +static void FlowBypassFree(void *x) { - if (x) - SCFree(x); + FlowBypassInfo *fb = (FlowBypassInfo *) x; + + if (fb == NULL) + return; + + if (fb->bypass_data && fb->BypassFree) { + fb->BypassFree(fb->bypass_data); + } + SCFree(fb); } -void RegisterFlowBypassCounter(void) +void RegisterFlowBypassInfo(void) { - g_bypass_counter_id = FlowStorageRegister("bypass_counters", sizeof(void *), - NULL, FlowCounterFree); + g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *), + NULL, FlowBypassFree); } diff --git a/src/flow.h b/src/flow.h index 19a9229917..3744a61805 100644 --- a/src/flow.h +++ b/src/flow.h @@ -482,12 +482,15 @@ typedef struct FlowProtoFreeFunc_ { void (*Freefunc)(void *); } FlowProtoFreeFunc; -typedef struct FlowCounters_ { +typedef struct FlowBypassInfo_ { + bool (* BypassUpdate)(Flow *f, void *data, time_t tsec); + void (* BypassFree)(void *data); + void *bypass_data; uint64_t tosrcpktcnt; uint64_t tosrcbytecnt; uint64_t todstpktcnt; uint64_t todstbytecnt; -} FlowCounters; +} FlowBypassInfo; /** \brief prepare packet for a life with flow * Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup @@ -529,8 +532,8 @@ int FlowSetMemcap(uint64_t size); uint64_t FlowGetMemcap(void); uint64_t FlowGetMemuse(void); -int GetFlowBypassCounterID(void); -void RegisterFlowBypassCounter(void); +int GetFlowBypassInfoID(void); +void RegisterFlowBypassInfo(void); /** ----- Inline functions ----- */ diff --git a/src/output-json-flow.c b/src/output-json-flow.c index ed1db7a76f..7c0fab35c8 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -200,7 +200,7 @@ void JsonAddFlow(Flow *f, json_t *js, json_t *hjs) json_string(AppProtoToString(f->alproto_expect))); } - FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID()); + FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); if (fc) { json_object_set_new(hjs, "pkts_toserver", json_integer(f->todstpktcnt + fc->todstpktcnt)); diff --git a/src/runmode-af-packet.c b/src/runmode-af-packet.c index 09d8310175..13625295d3 100644 --- a/src/runmode-af-packet.c +++ b/src/runmode-af-packet.c @@ -439,17 +439,7 @@ static void *ParseAFPConfig(const char *iface) SCLogConfig("Using bypass kernel functionality for AF_PACKET (iface %s)", aconf->iface); aconf->flags |= AFP_BYPASS; - RunModeEnablesBypassManager(); - struct ebpf_timeout_config *ebt = SCCalloc(1, sizeof(struct ebpf_timeout_config)); - if (ebt == NULL) { - SCLogError(SC_ERR_MEM_ALLOC, "Flow bypass alloc error"); - } else { - memcpy(ebt, &(aconf->ebpf_t_config), sizeof(struct ebpf_timeout_config)); - BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout, - NULL, - (void *)ebt); - BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL); - } + BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL); #else SCLogError(SC_ERR_UNIMPLEMENTED, "Bypass set but eBPF support is not built-in"); #endif @@ -483,11 +473,6 @@ static void *ParseAFPConfig(const char *iface) SCLogConfig("Using bypass kernel functionality for AF_PACKET (iface %s)", aconf->iface); aconf->flags |= AFP_XDPBYPASS; - RunModeEnablesBypassManager(); - /* TODO move that to get it conditional on pinned maps */ - BypassedFlowManagerRegisterCheckFunc(EBPFCheckBypassedFlowTimeout, - EBPFCheckBypassedFlowCreate, - (void *) &(aconf->ebpf_t_config)); BypassedFlowManagerRegisterUpdateFunc(EBPFUpdateFlow, NULL); } #else diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 1daa17bdd9..9645b781a6 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -57,6 +57,7 @@ #include "tmqh-packetpool.h" #include "source-af-packet.h" #include "runmodes.h" +#include "flow-storage.h" #ifdef HAVE_AF_PACKET @@ -2331,6 +2332,28 @@ static int AFPInsertHalfFlow(int mapd, void *key, uint32_t hash, } return 1; } + +static int AFPSetFlowStorage(Packet *p, int map_fd, void *key0, void* key1) +{ + FlowBypassInfo *fc = FlowGetStorageById(p->flow, GetFlowBypassInfoID()); + if (fc) { + EBPFBypassData *eb = SCCalloc(1, sizeof(EBPFBypassData)); + if (eb == NULL) { + SCFree(key0); + SCFree(key1); + return 0; + } + eb->key[0] = key0; + eb->key[1] = key1; + eb->mapfd = map_fd; + eb->cpus_count = p->afp_v.nr_cpus; + fc->BypassUpdate = EBPFBypassUpdate; + fc->BypassFree = EBPFBypassFree; + fc->bypass_data = eb; + } + return 1; +} + #endif /** @@ -2373,29 +2396,45 @@ static int AFPBypassCallback(Packet *p) if (p->afp_v.v4_map_fd == -1) { return 0; } - struct flowv4_keys key = {}; - key.src = htonl(GET_IPV4_SRC_ADDR_U32(p)); - key.dst = htonl(GET_IPV4_DST_ADDR_U32(p)); - key.port16[0] = GET_TCP_SRC_PORT(p); - key.port16[1] = GET_TCP_DST_PORT(p); - key.vlan_id[0] = p->vlan_id[0]; - key.vlan_id[1] = p->vlan_id[1]; + struct flowv4_keys *keys[2]; + keys[0] = SCCalloc(1, sizeof(struct flowv4_keys)); + if (keys[0] == NULL) { + return 0; + } + keys[0]->src = htonl(GET_IPV4_SRC_ADDR_U32(p)); + keys[0]->dst = htonl(GET_IPV4_DST_ADDR_U32(p)); + keys[0]->port16[0] = GET_TCP_SRC_PORT(p); + keys[0]->port16[1] = GET_TCP_DST_PORT(p); + keys[0]->vlan_id[0] = p->vlan_id[0]; + keys[0]->vlan_id[1] = p->vlan_id[1]; - key.ip_proto = IPV4_GET_IPPROTO(p); - if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, + keys[0]->ip_proto = IPV4_GET_IPPROTO(p); + if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[0], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + return 0; + } + keys[1]= SCCalloc(1, sizeof(struct flowv4_keys)); + if (keys[1] == NULL) { + SCFree(keys[0]); return 0; } - key.src = htonl(GET_IPV4_DST_ADDR_U32(p)); - key.dst = htonl(GET_IPV4_SRC_ADDR_U32(p)); - key.port16[0] = GET_TCP_DST_PORT(p); - key.port16[1] = GET_TCP_SRC_PORT(p); - if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, + keys[1]->src = htonl(GET_IPV4_DST_ADDR_U32(p)); + keys[1]->dst = htonl(GET_IPV4_SRC_ADDR_U32(p)); + keys[1]->port16[0] = GET_TCP_DST_PORT(p); + keys[1]->port16[1] = GET_TCP_SRC_PORT(p); + keys[1]->vlan_id[0] = p->vlan_id[0]; + keys[1]->vlan_id[1] = p->vlan_id[1]; + + keys[1]->ip_proto = IPV4_GET_IPPROTO(p); + if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[1], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + SCFree(keys[1]); return 0; } EBPFUpdateFlow(p->flow, p, NULL); - return 1; + return AFPSetFlowStorage(p, p->afp_v.v4_map_fd, keys[0], keys[1]); } /* For IPv6 case we don't handle extended header in eBPF */ if (PKT_IS_IPV6(p) && @@ -2405,33 +2444,48 @@ static int AFPBypassCallback(Packet *p) return 0; } SCLogDebug("add an IPv6"); - struct flowv6_keys key = {}; + struct flowv6_keys *keys[2]; + keys[0] = SCCalloc(1, sizeof(struct flowv6_keys)); + if (keys[0] == NULL) { + return 0; + } for (i = 0; i < 4; i++) { - key.src[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]); - key.dst[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]); - } - key.port16[0] = GET_TCP_SRC_PORT(p); - key.port16[1] = GET_TCP_DST_PORT(p); - key.vlan_id[0] = p->vlan_id[0]; - key.vlan_id[1] = p->vlan_id[1]; - key.ip_proto = IPV6_GET_NH(p); - if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash, + keys[0]->src[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]); + keys[0]->dst[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]); + } + keys[0]->port16[0] = GET_TCP_SRC_PORT(p); + keys[0]->port16[1] = GET_TCP_DST_PORT(p); + keys[0]->vlan_id[0] = p->vlan_id[0]; + keys[0]->vlan_id[1] = p->vlan_id[1]; + keys[0]->ip_proto = IPV6_GET_NH(p); + if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[0], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); return 0; } - for (i = 0; i < 4; i++) { - key.src[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]); - key.dst[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]); + keys[1]= SCCalloc(1, sizeof(struct flowv6_keys)); + if (keys[1] == NULL) { + SCFree(keys[0]); + return 0; } - key.port16[0] = GET_TCP_DST_PORT(p); - key.port16[1] = GET_TCP_SRC_PORT(p); - if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash, + for (i = 0; i < 4; i++) { + keys[1]->src[i] = ntohl(GET_IPV6_DST_ADDR(p)[i]); + keys[1]->dst[i] = ntohl(GET_IPV6_SRC_ADDR(p)[i]); + } + keys[1]->port16[0] = GET_TCP_DST_PORT(p); + keys[1]->port16[1] = GET_TCP_SRC_PORT(p); + keys[1]->vlan_id[0] = p->vlan_id[0]; + keys[1]->vlan_id[1] = p->vlan_id[1]; + keys[1]->ip_proto = IPV6_GET_NH(p); + if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[1], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + SCFree(keys[1]); return 0; } if (p->flow) EBPFUpdateFlow(p->flow, p, NULL); - return 1; + return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1]); } #endif return 0; @@ -2470,32 +2524,48 @@ static int AFPXDPBypassCallback(Packet *p) return 0; } if (PKT_IS_IPV4(p)) { - struct flowv4_keys key = {}; + struct flowv4_keys *keys[2]; + keys[0]= SCCalloc(1, sizeof(struct flowv4_keys)); + if (keys[0] == NULL) { + return 0; + } if (p->afp_v.v4_map_fd == -1) { + SCFree(keys[0]); return 0; } - key.src = p->src.addr_data32[0]; - key.dst = p->dst.addr_data32[0]; + keys[0]->src = p->src.addr_data32[0]; + keys[0]->dst = p->dst.addr_data32[0]; /* In the XDP filter we get port from parsing of packet and not from skb * (as in eBPF filter) so we need to pass from host to network order */ - key.port16[0] = htons(p->sp); - key.port16[1] = htons(p->dp); - key.vlan_id[0] = p->vlan_id[0]; - key.vlan_id[1] = p->vlan_id[1]; - key.ip_proto = IPV4_GET_IPPROTO(p); - if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, + keys[0]->port16[0] = htons(p->sp); + keys[0]->port16[1] = htons(p->dp); + keys[0]->vlan_id[0] = p->vlan_id[0]; + keys[0]->vlan_id[1] = p->vlan_id[1]; + keys[0]->ip_proto = IPV4_GET_IPPROTO(p); + if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[0], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + return 0; + } + keys[1]= SCCalloc(1, sizeof(struct flowv4_keys)); + if (keys[1] == NULL) { + SCFree(keys[0]); return 0; } - key.src = p->dst.addr_data32[0]; - key.dst = p->src.addr_data32[0]; - key.port16[0] = htons(p->dp); - key.port16[1] = htons(p->sp); - if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, &key, p->flow_hash, + keys[1]->src = p->dst.addr_data32[0]; + keys[1]->dst = p->src.addr_data32[0]; + keys[1]->port16[0] = htons(p->dp); + keys[1]->port16[1] = htons(p->sp); + keys[1]->vlan_id[0] = p->vlan_id[0]; + keys[1]->vlan_id[1] = p->vlan_id[1]; + keys[1]->ip_proto = IPV4_GET_IPPROTO(p); + if (AFPInsertHalfFlow(p->afp_v.v4_map_fd, keys[1], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + SCFree(keys[1]); return 0; } - return 1; + return AFPSetFlowStorage(p, p->afp_v.v4_map_fd, keys[0], keys[1]); } /* For IPv6 case we don't handle extended header in eBPF */ if (PKT_IS_IPV6(p) && @@ -2505,31 +2575,47 @@ static int AFPXDPBypassCallback(Packet *p) return 0; } int i; - struct flowv6_keys key = {}; + struct flowv6_keys *keys[2]; + keys[0] = SCCalloc(1, sizeof(struct flowv6_keys)); + if (keys[0] == NULL) { + return 0; + } + for (i = 0; i < 4; i++) { - key.src[i] = GET_IPV6_SRC_ADDR(p)[i]; - key.dst[i] = GET_IPV6_DST_ADDR(p)[i]; - } - key.port16[0] = htons(GET_TCP_SRC_PORT(p)); - key.port16[1] = htons(GET_TCP_DST_PORT(p)); - key.vlan_id[0] = p->vlan_id[0]; - key.vlan_id[1] = p->vlan_id[1]; - key.ip_proto = IPV6_GET_NH(p); - if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash, + keys[0]->src[i] = GET_IPV6_SRC_ADDR(p)[i]; + keys[0]->dst[i] = GET_IPV6_DST_ADDR(p)[i]; + } + keys[0]->port16[0] = htons(GET_TCP_SRC_PORT(p)); + keys[0]->port16[1] = htons(GET_TCP_DST_PORT(p)); + keys[0]->vlan_id[0] = p->vlan_id[0]; + keys[0]->vlan_id[1] = p->vlan_id[1]; + keys[0]->ip_proto = IPV6_GET_NH(p); + if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[0], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); return 0; } - for (i = 0; i < 4; i++) { - key.src[i] = GET_IPV6_DST_ADDR(p)[i]; - key.dst[i] = GET_IPV6_SRC_ADDR(p)[i]; + keys[1]= SCCalloc(1, sizeof(struct flowv6_keys)); + if (keys[1] == NULL) { + SCFree(keys[0]); + return 0; } - key.port16[0] = htons(GET_TCP_DST_PORT(p)); - key.port16[1] = htons(GET_TCP_SRC_PORT(p)); - if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, &key, p->flow_hash, + for (i = 0; i < 4; i++) { + keys[1]->src[i] = GET_IPV6_DST_ADDR(p)[i]; + keys[1]->dst[i] = GET_IPV6_SRC_ADDR(p)[i]; + } + keys[1]->port16[0] = htons(GET_TCP_DST_PORT(p)); + keys[1]->port16[1] = htons(GET_TCP_SRC_PORT(p)); + keys[1]->vlan_id[0] = p->vlan_id[0]; + keys[1]->vlan_id[1] = p->vlan_id[1]; + keys[1]->ip_proto = IPV6_GET_NH(p); + if (AFPInsertHalfFlow(p->afp_v.v6_map_fd, keys[1], p->flow_hash, p->afp_v.nr_cpus) == 0) { + SCFree(keys[0]); + SCFree(keys[1]); return 0; } - return 1; + return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1]); } #endif return 0; diff --git a/src/suricata.c b/src/suricata.c index 514f1207f0..8388668ed5 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2705,7 +2705,7 @@ static int PostConfLoadedSetup(SCInstance *suri) EBPFRegisterExtension(); LiveDevRegisterExtension(); #endif - RegisterFlowBypassCounter(); + RegisterFlowBypassInfo(); AppLayerSetup(); /* Suricata will use this umask if provided. By default it will use the diff --git a/src/util-ebpf.c b/src/util-ebpf.c index 9185d80e70..bc3e1e9cbc 100644 --- a/src/util-ebpf.c +++ b/src/util-ebpf.c @@ -514,11 +514,13 @@ static bool EBPFCreateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_ke * server then if we already have something in to server to client. We need * these numbers as we will use it to see if we have new traffic coming * on the flow */ - FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID()); + FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); if (fc == NULL) { - fc = SCCalloc(sizeof(FlowCounters), 1); + fc = SCCalloc(sizeof(FlowBypassInfo), 1); if (fc) { - FlowSetStorageById(f, GetFlowBypassCounterID(), fc); + FlowSetStorageById(f, GetFlowBypassInfoID(), fc); + fc->BypassUpdate = EBPFBypassUpdate; + fc->BypassFree = EBPFBypassFree; fc->todstpktcnt = pkts_cnt; fc->todstbytecnt = bytes_cnt; } else { @@ -533,56 +535,96 @@ static bool EBPFCreateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_ke return false; } +void EBPFBypassFree(void *data) +{ + EBPFBypassData *eb = (EBPFBypassData *)data; + if (eb == NULL) + return; + if (eb->key[0]) { + SCFree(eb->key[0]); + } + if (eb->key[1]) { + SCFree(eb->key[1]); + } + SCFree(eb); + return; +} + /** - * Update a Flow in the Flow table for a Flowkey. * - * \return false if Flow is in Flow table and true if not + * Compare eBPF half flow to Flow + * + * \return true if entries have activity, false if not */ -static bool EBPFUpdateFlowForKey(struct flows_stats *flowstats, FlowKey *flow_key, - uint32_t hash, struct timespec *ctime, - uint64_t pkts_cnt, uint64_t bytes_cnt) + +static bool EBPFBypassCheckHalfFlow(Flow *f, EBPFBypassData *eb, void *key, + int index) { - Flow *f = FlowGetExistingFlowFromHash(flow_key, hash); - if (f != NULL) { - SCLogDebug("bypassed flow found %d -> %d, doing accounting", - f->sp, f->dp); - FlowCounters *fc = FlowGetStorageById(f, GetFlowBypassCounterID()); - if (fc == NULL) { - FLOWLOCK_UNLOCK(f); - flowstats->count++; + int i; + uint64_t pkts_cnt = 0; + uint64_t bytes_cnt = 0; + /* We use a per CPU structure so we will get a array of values. But if nr_cpus + * is 1 then we have a global hash. */ + BPF_DECLARE_PERCPU(struct pair, values_array, eb->cpus_count); + memset(values_array, 0, sizeof(values_array)); + int res = bpf_map_lookup_elem(eb->mapfd, key, values_array); + if (res < 0) { + SCLogDebug("errno: (%d) %s", errno, strerror(errno)); + return false; + } + for (i = 0; i < eb->cpus_count; i++) { + /* let's start accumulating value so we can compute the counters */ + SCLogDebug("%d: Adding pkts %lu bytes %lu", i, + BPF_PERCPU(values_array, i).packets, + BPF_PERCPU(values_array, i).bytes); + pkts_cnt += BPF_PERCPU(values_array, i).packets; + bytes_cnt += BPF_PERCPU(values_array, i).bytes; + } + FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); + if (fc == NULL) { + return false; + } + if (index == 0) { + if (pkts_cnt != fc->todstpktcnt) { + fc->todstpktcnt = pkts_cnt; + fc->todstbytecnt = bytes_cnt; return true; } - if (flow_key->sp == f->sp) { - if (pkts_cnt != fc->todstpktcnt) { - flowstats->packets += pkts_cnt - fc->todstpktcnt; - flowstats->bytes += bytes_cnt - fc->todstbytecnt; - fc->todstpktcnt = pkts_cnt; - fc->todstbytecnt = bytes_cnt; - /* interval based so no meaning to update the millisecond. - * Let's keep it fast and simple */ - f->lastts.tv_sec = ctime->tv_sec; - } - } else { - if (pkts_cnt != fc->tosrcpktcnt) { - flowstats->packets += pkts_cnt - fc->tosrcpktcnt; - flowstats->bytes += bytes_cnt - fc->tosrcbytecnt; - fc->tosrcpktcnt = pkts_cnt; - fc->tosrcbytecnt = bytes_cnt; - f->lastts.tv_sec = ctime->tv_sec; - } + } else { + if (pkts_cnt != fc->tosrcpktcnt) { + fc->tosrcpktcnt = pkts_cnt; + fc->tosrcbytecnt = bytes_cnt; + return true; } - FLOWLOCK_UNLOCK(f); + } + + return false; +} + +/** Check both half flows for update + * + * Update lastts in the flow and do accounting + * + * */ +bool EBPFBypassUpdate(Flow *f, void *data, time_t tsec) +{ + EBPFBypassData *eb = (EBPFBypassData *)data; + if (eb == NULL) { return false; + } + bool activity = EBPFBypassCheckHalfFlow(f, eb, eb->key[0], 0); + activity |= EBPFBypassCheckHalfFlow(f, eb, eb->key[1], 1); + if (!activity) { + SCLogDebug("Delete entry: %u (%ld)", FLOW_IS_IPV6(f), FlowGetId(f)); + /* delete the entries if no time update */ + EBPFDeleteKey(eb->mapfd, eb->key[0]); + EBPFDeleteKey(eb->mapfd, eb->key[1]); + SCLogDebug("Done delete entry: %u", FLOW_IS_IPV6(f)); } else { - /* Flow has disappeared so it has been removed due to timeout. - * This means we did not see any packet since bypassed_timeout/flow_dump_interval - * checks so we are highly probably have no packet to account. - * So we just discard the flow */ - SCLogDebug("No flow for %d -> %d", flow_key->sp, flow_key->dp); - SCLogDebug("Dead with pkts %lu bytes %lu", pkts_cnt, bytes_cnt); - flowstats->count++; + f->lastts.tv_sec = tsec; return true; } + return false; } typedef bool (*OpFlowForKey)(struct flows_stats *flowstats, FlowKey *flow_key, @@ -816,50 +858,6 @@ int EBPFCheckBypassedFlowCreate(ThreadVars *th_v, struct timespec *curtime, void return 0; } -/** - * Flow timeout checking function - * - * This function is called by the Flow bypass manager to trigger removal - * of entries in the kernel/userspace flow table if needed. - * - */ -int EBPFCheckBypassedFlowTimeout(ThreadVars *th_v, struct flows_stats *bypassstats, - struct timespec *curtime, - void *data) -{ - struct flows_stats local_bypassstats = { 0, 0, 0}; - int ret = 0; - int tcount = 0; - LiveDevice *ldev = NULL, *ndev; - struct ebpf_timeout_config *cfg = (struct ebpf_timeout_config *)data; - - BUG_ON(cfg == NULL); - - while(LiveDeviceForEach(&ldev, &ndev)) { - memset(&local_bypassstats, 0, sizeof(local_bypassstats)); - tcount = EBPFForEachFlowV4Table(th_v, ldev, "flow_table_v4", - &local_bypassstats, curtime, - cfg, EBPFUpdateFlowForKey); - bypassstats->count = local_bypassstats.count; - bypassstats->packets = local_bypassstats.packets ; - bypassstats->bytes = local_bypassstats.bytes; - if (tcount) { - ret = 1; - } - memset(&local_bypassstats, 0, sizeof(local_bypassstats)); - tcount = EBPFForEachFlowV6Table(th_v, ldev, "flow_table_v6", - &local_bypassstats, curtime, - cfg, EBPFUpdateFlowForKey); - bypassstats->count += local_bypassstats.count; - bypassstats->packets += local_bypassstats.packets ; - bypassstats->bytes += local_bypassstats.bytes; - if (tcount) { - ret = 1; - } - } - return ret; -} - void EBPFRegisterExtension(void) { g_livedev_storage_id = LiveDevStorageRegister("bpfmap", sizeof(void *), NULL, BpfMapsInfoFree); diff --git a/src/util-ebpf.h b/src/util-ebpf.h index 757b864d42..043f137e8c 100644 --- a/src/util-ebpf.h +++ b/src/util-ebpf.h @@ -61,6 +61,12 @@ struct pair { uint32_t hash; }; +typedef struct EBPFBypassData_ { + void *key[2]; + int mapfd; + int cpus_count; +} EBPFBypassData; + #define EBPF_SOCKET_FILTER (1<<0) #define EBPF_XDP_CODE (1<<1) #define EBPF_PINNED_MAPS (1<<2) @@ -83,6 +89,8 @@ void EBPFBuildCPUSet(ConfNode *node, char *iface); int EBPFSetPeerIface(const char *iface, const char *out_iface); int EBPFUpdateFlow(Flow *f, Packet *p, void *data); +bool EBPFBypassUpdate(Flow *f, void *data, time_t tsec); +void EBPFBypassFree(void *data); #ifdef BUILD_UNIX_SOCKET TmEcode EBPFGetBypassedStats(json_t *cmd, json_t *answer, void *data);