import errno
import fcntl
+import os
import socket
import struct
import termios
from lib.py import ksft_not_none
from lib.py import ksft_variants, KsftNamedVariant
from lib.py import KsftSkipEx
-from lib.py import NetDrvEpEnv, PSPFamily, NlError
+from lib.py import NetDrvEpEnv, NetDrvContEnv
+from lib.py import NlError, PSPFamily
from lib.py import bkg, rand_port, wait_port_listen
+from lib.py import ip
def _get_outq(s):
# Test case boiler plate
#
-def _init_psp_dev(cfg):
+def _init_psp_dev(cfg, use_psp_ifindex=False):
if not hasattr(cfg, 'psp_dev_id'):
# Figure out which local device we are testing against
+ # For NetDrvContEnv: use psp_ifindex instead of ifindex
+ target_ifindex = cfg.psp_ifindex if use_psp_ifindex else cfg.ifindex
for dev in cfg.pspnl.dev_get({}, dump=True):
- if dev['ifindex'] == cfg.ifindex:
+ if dev['ifindex'] == target_ifindex:
cfg.psp_info = dev
cfg.psp_dev_id = cfg.psp_info['id']
break
_data_mss_adjust(cfg, ipver)
+def _try_disassoc(cfg, psp_dev_id, ifindex, nsid=None):
+ """Best-effort disassociate, ignoring errors if already removed."""
+ try:
+ params = {'id': psp_dev_id, 'ifindex': ifindex}
+ if nsid is not None:
+ params['nsid'] = nsid
+ cfg.pspnl.dev_disassoc(params)
+ except NlError:
+ pass
+
+
+def _assoc_nk_guest(cfg):
+ """Associate nk_guest with PSP device and register cleanup via defer()."""
+ _init_psp_dev(cfg, True)
+
+ cfg.pspnl.dev_assoc({'id': cfg.psp_dev_id,
+ 'ifindex': cfg.nk_guest_ifindex,
+ 'nsid': cfg.psp_dev_peer_nsid})
+ defer(_disassoc_nk_guest, cfg,
+ cfg.psp_dev_id, cfg.nk_guest_ifindex)
+
+
+def _disassoc_nk_guest(cfg, psp_dev_id, nk_guest_ifindex):
+ """Disassociate nk_guest and reset cfg PSP state."""
+ pspnl = PSPFamily()
+ pspnl.dev_disassoc({'id': psp_dev_id, 'ifindex': nk_guest_ifindex,
+ 'nsid': cfg.psp_dev_peer_nsid})
+ cfg.pspnl = pspnl
+ del cfg.psp_dev_id
+ del cfg.psp_info
+
+
+def _get_nsid(ns_name):
+ """Get the nsid for a namespace."""
+ for entry in ip("netns list-id", json=True):
+ if entry.get("name") == str(ns_name):
+ return entry["nsid"]
+ raise KsftSkipEx(f"nsid not found for namespace {ns_name}")
+
+
+def _setup_psp_attributes(cfg):
+ # pylint: disable=protected-access
+ """
+ Set up PSP-specific attributes on the environment.
+
+ This sets attributes needed for PSP tests based on whether we're using
+ netdevsim or a real NIC.
+ """
+ if cfg._ns is not None:
+ # netdevsim case: PSP device is the local dev (in host namespace)
+ cfg.psp_dev = cfg._ns.nsims[0].dev
+ cfg.psp_ifname = cfg.psp_dev['ifname']
+ cfg.psp_ifindex = cfg.psp_dev['ifindex']
+
+ # PSP peer device is the remote dev (in _netns, where psp_responder runs)
+ cfg.psp_dev_peer = cfg._ns_peer.nsims[0].dev
+ cfg.psp_dev_peer_ifname = cfg.psp_dev_peer['ifname']
+ cfg.psp_dev_peer_ifindex = cfg.psp_dev_peer['ifindex']
+ else:
+ # Real NIC case: PSP device is the local interface
+ cfg.psp_dev = cfg.dev
+ cfg.psp_ifname = cfg.ifname
+ cfg.psp_ifindex = cfg.ifindex
+
+ # PSP peer device is the remote interface
+ cfg.psp_dev_peer = cfg.remote_dev
+ cfg.psp_dev_peer_ifname = cfg.remote_ifname
+ cfg.psp_dev_peer_ifindex = cfg.remote_ifindex
+
+ # Get nsid for the guest namespace (netns) where nk_guest is
+ cfg.psp_dev_peer_nsid = _get_nsid(cfg.netns.name)
+
+
+
def main() -> None:
""" Ksft boiler plate main """
- with NetDrvEpEnv(__file__) as cfg:
+ # Make sure LOCAL_PREFIX_V6 is set
+ if "LOCAL_PREFIX_V6" not in os.environ:
+ os.environ["LOCAL_PREFIX_V6"] = "2001:db8:2::"
+
+ try:
+ env = NetDrvContEnv(__file__, primary_rx_redirect=True)
+ has_cont = True
+ except KsftSkipEx:
+ env = NetDrvEpEnv(__file__)
+ has_cont = False
+
+ with env as cfg:
cfg.pspnl = PSPFamily()
+ if has_cont:
+ _setup_psp_attributes(cfg)
+
# Set up responder and communication sock
+ # psp_responder runs in _netns (remote namespace with psp_dev_peer)
responder = cfg.remote.deploy("psp_responder")
cfg.comm_port = rand_port()