from lib.py import KsftFailEx, NetDrvEpEnv
from lib.py import EthtoolFamily, NetdevFamily, NlError
from lib.py import bkg, cmd, rand_port, wait_port_listen
-from lib.py import ip, bpftool, defer
+from lib.py import ip, defer
+from lib.py import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
class TestConfig(Enum):
xdp_info = ip(f"-d link show dev {cfg.ifname}", json=True)[0]
prog_info["id"] = xdp_info["xdp"]["prog"]["id"]
prog_info["name"] = xdp_info["xdp"]["prog"]["name"]
- prog_id = prog_info["id"]
-
- map_ids = bpftool(f"prog show id {prog_id}", json=True)["map_ids"]
- prog_info["maps"] = {}
- for map_id in map_ids:
- name = bpftool(f"map show id {map_id}", json=True)["name"]
- prog_info["maps"][name] = map_id
+ prog_info["maps"] = bpf_prog_map_ids(prog_info["id"])
return prog_info
-def format_hex_bytes(value):
- """
- Helper function that converts an integer into a formatted hexadecimal byte string.
-
- Args:
- value: An integer representing the number to be converted.
-
- Returns:
- A string representing hexadecimal equivalent of value, with bytes separated by spaces.
- """
- hex_str = value.to_bytes(4, byteorder='little', signed=True)
- return ' '.join(f'{byte:02x}' for byte in hex_str)
-
-
-def _set_xdp_map(map_name, key, value):
- """
- Updates an XDP map with a given key-value pair using bpftool.
-
- Args:
- map_name: The name of the XDP map to update.
- key: The key to update in the map, formatted as a hexadecimal string.
- value: The value to associate with the key, formatted as a hexadecimal string.
- """
- key_formatted = format_hex_bytes(key)
- value_formatted = format_hex_bytes(value)
- bpftool(
- f"map update name {map_name} key hex {key_formatted} value hex {value_formatted}"
- )
-
-
def _get_stats(xdp_map_id):
"""
Retrieves and formats statistics from an XDP map.
Raises:
KsftFailEx: If the stats retrieval fails.
"""
- stats_dump = bpftool(f"map dump id {xdp_map_id}", json=True)
- if not stats_dump:
+ stats = bpf_map_dump(xdp_map_id)
+ if not stats:
raise KsftFailEx(f"Failed to get stats for map {xdp_map_id}")
- stats_formatted = {}
- for key in range(0, 5):
- val = stats_dump[key]["formatted"]["value"]
- if stats_dump[key]["formatted"]["key"] == XDPStats.RX.value:
- stats_formatted[XDPStats.RX.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.PASS.value:
- stats_formatted[XDPStats.PASS.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.DROP.value:
- stats_formatted[XDPStats.DROP.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.TX.value:
- stats_formatted[XDPStats.TX.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.ABORT.value:
- stats_formatted[XDPStats.ABORT.value] = val
-
- return stats_formatted
+ return stats
def _test_pass(cfg, bpf_info, msg_sz):
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.PASS.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.PASS.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
ksft_eq(_test_udp(cfg, port, msg_sz), True, "UDP packet exchange failed")
stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.DROP.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.DROP.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
ksft_eq(_test_udp(cfg, port, msg_sz), False, "UDP packet exchange should fail")
stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TX.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.TX.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
expected_pkts = 0
for payload_len in payload_lens:
prog_info = _load_xdp_prog(cfg, bpf_info)
# Configure the XDP map for tail adjustment
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TAIL_ADJST.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.TAIL_ADJST.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
for offset in offset_lst:
tag = format(random.randint(65, 90), "02x")
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
if offset > 0:
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
for pkt_sz in pkt_sz_lst:
test_str = "".join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
prog_info = _load_xdp_prog(cfg, BPFProgInfo(prog, "xdp_native.bpf.o", "xdp.frags", 9000))
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.HEAD_ADJST.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.HEAD_ADJST.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
hds_thresh = get_hds_thresh(cfg)
for offset in offset_lst:
test_str = ''.join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
tag = format(random.randint(65, 90), '02x')
- _set_xdp_map("map_xdp_setup",
- TestConfig.ADJST_OFFSET.value,
- offset)
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
recvd_str = _exchg_udp(cfg, port, test_str)
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, act.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, act.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
# Discard the input, but we need a listener to avoid ICMP errors
rx_udp = f"socat -{cfg.addr_ipver} -T 2 -u UDP-RECV:{port},reuseport " + \
from .utils import CmdExitFailure, fd_read_timeout, cmd, bkg, defer, \
bpftool, ip, ethtool, bpftrace, rand_port, rand_ports, wait_port_listen, \
wait_file, tool
+from .bpf import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
from .ynl import NlError, NlctrlFamily, YnlFamily, \
EthtoolFamily, NetdevFamily, RtnlFamily, RtnlAddrFamily
from .ynl import NetshaperFamily, DevlinkFamily, PSPFamily, Netlink
"CmdExitFailure", "fd_read_timeout", "cmd", "bkg", "defer",
"bpftool", "ip", "ethtool", "bpftrace", "rand_port", "rand_ports",
"wait_port_listen", "wait_file", "tool",
+ "bpf_map_set", "bpf_map_dump", "bpf_prog_map_ids",
"NetdevSim", "NetdevSimDev",
"NetshaperFamily", "DevlinkFamily", "PSPFamily", "NlError",
"YnlFamily", "EthtoolFamily", "NetdevFamily", "RtnlFamily",
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0
+
+"""
+BPF helper utilities for kernel selftests.
+
+Provides common operations for interacting with BPF maps and programs
+via bpftool, used by XDP and other BPF-based test files.
+"""
+
+from .utils import bpftool
+
+def _format_hex_bytes(value):
+ """
+ Helper function that converts an integer into a formatted hexadecimal byte string.
+
+ Args:
+ value: An integer representing the number to be converted.
+
+ Returns:
+ A string representing hexadecimal equivalent of value, with bytes separated by spaces.
+ """
+ hex_str = value.to_bytes(4, byteorder='little', signed=True)
+ return ' '.join(f'{byte:02x}' for byte in hex_str)
+
+
+def bpf_map_set(map_name, key, value):
+ """
+ Updates an XDP map with a given key-value pair using bpftool.
+
+ Args:
+ map_name: The name of the XDP map to update.
+ key: The key to update in the map, formatted as a hexadecimal string.
+ value: The value to associate with the key, formatted as a hexadecimal string.
+ """
+ key_formatted = _format_hex_bytes(key)
+ value_formatted = _format_hex_bytes(value)
+ bpftool(
+ f"map update name {map_name} key hex {key_formatted} value hex {value_formatted}"
+ )
+
+def bpf_map_dump(map_id):
+ """Dump all entries of a BPF array map.
+
+ Args:
+ map_id: Numeric map ID (as returned by bpftool prog show).
+
+ Returns:
+ A dict mapping formatted key (int) to formatted value (int).
+ """
+ raw = bpftool(f"map dump id {map_id}", json=True)
+ return {e["formatted"]["key"]: e["formatted"]["value"] for e in raw}
+
+
+def bpf_prog_map_ids(prog_id):
+ """Get the map name-to-ID mapping for a loaded BPF program.
+
+ Args:
+ prog_id: Numeric program ID.
+
+ Returns:
+ A dict mapping map name (str) to map ID (int).
+ """
+ map_ids = bpftool(f"prog show id {prog_id}", json=True)["map_ids"]
+ maps = {}
+ for mid in map_ids:
+ name = bpftool(f"map show id {mid}", json=True)["name"]
+ maps[name] = mid
+ return maps