rss_input_xfrm.py \
toeplitz.py \
tso.py \
+ userns_devmem.py \
uso.py \
xdp_metadata.py \
xsk_reconfig.py \
CONFIG_NETKIT=y
CONFIG_NET_SCH_INGRESS=y
CONFIG_UDMABUF=y
+CONFIG_USER_NS=y
CONFIG_VXLAN=y
CONFIG_XFRM_USER=y
sys.path.append(KSFT_DIR.as_posix())
# Import one by one to avoid pylint false positives
- from net.lib.py import NetNS, NetNSEnter, NetdevSimDev
+ from net.lib.py import NetNS, NetNSEnter, NetdevSimDev, UserNetNS
from net.lib.py import EthtoolFamily, NetdevFamily, NetshaperFamily, \
NlError, RtnlFamily, DevlinkFamily, PSPFamily, Netlink
from net.lib.py import CmdExitFailure
from drivers.net.lib.py import GenerateTraffic, Remote, Iperf3Runner
from drivers.net.lib.py import NetDrvEnv, NetDrvEpEnv, NetDrvContEnv
- __all__ = ["NetNS", "NetNSEnter", "NetdevSimDev",
+ __all__ = ["NetNS", "NetNSEnter", "NetdevSimDev", "UserNetNS",
"EthtoolFamily", "NetdevFamily", "NetshaperFamily",
"NlError", "RtnlFamily", "DevlinkFamily", "PSPFamily", "Netlink",
"CmdExitFailure",
--- /dev/null
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+"""
+Devmem tests for non-init userns.
+"""
+
+import os
+
+from devmem_lib import run_rx, run_rx_hds, run_tx, run_tx_chunks, setup_test
+from lib.py import NetDrvContEnv, ksft_disruptive, ksft_exit, ksft_run
+
+
+@ksft_disruptive
+def check_userns_rx(cfg) -> None:
+ """Run the devmem RX test through non-init userns netkit."""
+ run_rx(cfg)
+
+
+@ksft_disruptive
+def check_userns_tx(cfg) -> None:
+ """Run the devmem TX test through non-init userns netkit."""
+ run_tx(cfg)
+
+
+@ksft_disruptive
+def check_userns_tx_chunks(cfg) -> None:
+ """Run the devmem TX chunking test through non-init userns netkit."""
+ run_tx_chunks(cfg)
+
+
+def check_userns_rx_hds(cfg) -> None:
+ """Run the HDS test through non-init userns netkit."""
+ run_rx_hds(cfg)
+
+
+def main() -> None:
+ """Run userns devmem RX selftests against the test environment."""
+ with NetDrvContEnv(__file__, userns=True, rxqueues=2,
+ primary_rx_redirect=True) as cfg:
+ setup_test(cfg,
+ os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ "ncdevmem"))
+ ksft_run([check_userns_rx, check_userns_tx, check_userns_tx_chunks,
+ check_userns_rx_hds], args=(cfg,))
+ ksft_exit()
+
+
+if __name__ == "__main__":
+ main()
sys.path.append(KSFT_DIR.as_posix())
# Import one by one to avoid pylint false positives
- from net.lib.py import NetNS, NetNSEnter, NetdevSimDev
+ from net.lib.py import NetNS, NetNSEnter, NetdevSimDev, UserNetNS
from net.lib.py import EthtoolFamily, NetdevFamily, NetshaperFamily, \
NlError, RtnlFamily, DevlinkFamily, PSPFamily, Netlink
from net.lib.py import CmdExitFailure
from net.lib.py import ksft_eq, ksft_ge, ksft_in, ksft_is, ksft_lt, \
ksft_ne, ksft_not_in, ksft_raises, ksft_true, ksft_gt, ksft_not_none
- __all__ = ["NetNS", "NetNSEnter", "NetdevSimDev",
+ __all__ = ["NetNS", "NetNSEnter", "NetdevSimDev", "UserNetNS",
"EthtoolFamily", "NetdevFamily", "NetshaperFamily",
"NlError", "RtnlFamily", "DevlinkFamily", "PSPFamily", "Netlink",
"CmdExitFailure",
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
-from lib.py import NetNS, NetdevSimDev
+from lib.py import NetNS, NetdevSimDev, UserNetNS
from .remote import Remote
from . import bpftool, RtnlFamily, Netlink
+---------------+
"""
- def __init__(self, src_path, rxqueues=1, primary_rx_redirect=False, **kwargs):
+ def __init__(self, src_path, rxqueues=1, primary_rx_redirect=False,
+ userns=False, **kwargs):
self.netns = None
+ self._userns = userns
self._nk_host_ifname = None
self.nk_guest_ifname = None
self._tc_clsact_added = False
with open(ra_path, "w", encoding="utf-8") as f:
f.write("2")
- self.netns = NetNS()
+ self.netns = UserNetNS() if self._userns else NetNS()
cmd("ip netns attach init 1")
self._init_ns_attached = True
ip("netns set init 0", ns=self.netns)
ksft_ge, ksft_gt, ksft_lt, ksft_raises, ksft_busy_wait, \
ktap_result, ksft_disruptive, ksft_setup, ksft_run, ksft_exit, \
ksft_variants, KsftNamedVariant
-from .netns import NetNS, NetNSEnter
+from .netns import NetNS, NetNSEnter, UserNetNS
from .nsim import NetdevSim, NetdevSimDev
from .utils import CmdExitFailure, fd_read_timeout, cmd, bkg, defer, \
bpftool, ip, ethtool, bpftrace, rand_port, rand_ports, wait_port_listen, \
"ksft_is", "ksft_ge", "ksft_gt", "ksft_lt", "ksft_raises",
"ksft_busy_wait", "ktap_result", "ksft_disruptive", "ksft_setup",
"ksft_run", "ksft_exit", "ksft_variants", "KsftNamedVariant",
- "NetNS", "NetNSEnter",
+ "NetNS", "NetNSEnter", "UserNetNS",
"CmdExitFailure", "fd_read_timeout", "cmd", "bkg", "defer",
"bpftool", "ip", "ethtool", "bpftrace", "rand_port", "rand_ports",
"wait_port_listen", "wait_file", "tool", "tc",
# SPDX-License-Identifier: GPL-2.0
-from .utils import ip
import ctypes
+import os
import random
import string
+import subprocess
+import time
+from pathlib import Path
+
+from .utils import ip
libc = ctypes.cdll.LoadLibrary('libc.so.6')
return f"NetNS({self.name})"
+class UserNetNS:
+ """Network namespace owned by a non-init user namespace."""
+
+ def __init__(self):
+ self.name = ''.join(
+ random.choice(string.ascii_lowercase) for _ in range(8))
+ self.user_ns_path = f"/run/userns/{self.name}"
+ self.net_ns_path = f"/run/netns/{self.name}"
+ self._user_mounted = False
+ self._net_mounted = False
+
+ os.makedirs("/run/userns", exist_ok=True)
+ os.makedirs("/run/netns", exist_ok=True)
+
+ Path(self.user_ns_path).touch()
+ Path(self.net_ns_path).touch()
+
+ with subprocess.Popen(
+ ["unshare", "--user", "--net", "--map-root-user",
+ "sleep", "infinity"]) as proc:
+ try:
+ pid = proc.pid
+ init_user = os.readlink("/proc/self/ns/user")
+ for _ in range(200):
+ try:
+ if os.readlink(f"/proc/{pid}/ns/user") != init_user:
+ break
+ except OSError:
+ pass
+ time.sleep(0.01)
+ else:
+ raise RuntimeError("unshare child did not create userns")
+
+ subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/user",
+ self.user_ns_path], check=True)
+ self._user_mounted = True
+ subprocess.run(["mount", "--bind", f"/proc/{pid}/ns/net",
+ self.net_ns_path], check=True)
+ self._net_mounted = True
+ finally:
+ proc.kill()
+
+ def __del__(self):
+ if self._net_mounted:
+ subprocess.run(["umount", self.net_ns_path], check=False)
+ self._net_mounted = False
+ if self._user_mounted:
+ subprocess.run(["umount", self.user_ns_path], check=False)
+ self._user_mounted = False
+ for path in (self.net_ns_path, self.user_ns_path):
+ try:
+ os.unlink(path)
+ except OSError:
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ self.__del__()
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return f"UserNetNS({self.name})"
+
+
class NetNSEnter:
def __init__(self, ns_name):
self.ns_path = f"/run/netns/{ns_name}"
background=False, host=None, timeout=5, ksft_ready=None,
ksft_wait=None):
if ns:
- comm = f'ip netns exec {ns} ' + comm
+ if hasattr(ns, 'user_ns_path'):
+ comm = (f'nsenter --user={ns.user_ns_path} '
+ f'--net={ns.net_ns_path} --setuid=0 --setgid=0 -- '
+ + comm)
+ else:
+ comm = f'ip netns exec {ns} ' + comm
self.stdout = None
self.stderr = None