from lib.py import ksft_variants, KsftNamedVariant
from lib.py import KsftSkipEx, KsftFailEx
from lib.py import NetDrvEpEnv, NetDrvContEnv
-from lib.py import NlError, PSPFamily
+from lib.py import Netlink, NlError, PSPFamily, RtnlFamily
from lib.py import NetNSEnter
from lib.py import bkg, rand_port, wait_port_listen
from lib.py import ip
f" in {label} namespace")
+def _psp_dev_get_check_netkit_psp_assoc(cfg):
+ """ Check psp dev-get output with netkit interface associated with PSP dev """
+ _assoc_nk_guest(cfg)
+
+ # Check 1: In default netns, verify dev-get has correct ifindex and assoc-list
+ dev_info = cfg.pspnl.dev_get({'id': cfg.psp_dev_id})
+ ksft_eq(dev_info['ifindex'], cfg.psp_ifindex)
+ _check_assoc_list(cfg, cfg.psp_dev_id, cfg.nk_guest_ifindex,
+ cfg.psp_dev_peer_nsid)
+
+ # Check 2: In guest netns, verify dev-get has assoc-list with nk_guest device
+ with NetNSEnter(cfg.netns.name):
+ peer_pspnl = PSPFamily()
+
+ # Dump all devices in the guest namespace
+ peer_devices = peer_pspnl.dev_get({}, dump=True)
+
+ # Find the device with by-association flag
+ peer_dev = None
+ for dev in peer_devices:
+ if dev.get('by-association'):
+ peer_dev = dev
+ break
+
+ ksft_not_none(peer_dev, "No PSP device found with by-association flag in guest netns")
+
+ # Verify assoc-list contains the nk_guest device
+ ksft_true('assoc-list' in peer_dev and len(peer_dev['assoc-list']) > 0,
+ "Guest device should have assoc-list with local devices")
+
+ # Verify the assoc-list contains nk_guest ifindex with nsid=-1 (same namespace)
+ found = False
+ for assoc in peer_dev['assoc-list']:
+ if assoc['ifindex'] == cfg.nk_guest_ifindex:
+ ksft_eq(assoc['nsid'], -1,
+ "nsid should be -1 (NETNSA_NSID_NOT_ASSIGNED) for same-namespace device")
+ found = True
+ break
+ ksft_true(found, "nk_guest ifindex not found in assoc-list")
+
+
+def _dev_assoc_no_nsid(cfg):
+ """ Test dev-assoc and dev-disassoc without nsid attribute """
+ _init_psp_dev(cfg, True)
+
+ # Associate without nsid - should look up ifindex in caller's netns
+ cfg.pspnl.dev_assoc({'id': cfg.psp_dev_id,
+ 'ifindex': cfg.nk_host_ifindex})
+ defer(_try_disassoc, cfg,
+ cfg.psp_dev_id, cfg.nk_host_ifindex)
+ defer(delattr, cfg, 'psp_dev_id')
+ defer(delattr, cfg, 'psp_info')
+
+ # Verify assoc-list contains the device (match by ifindex only)
+ _check_assoc_list(cfg, cfg.psp_dev_id, cfg.nk_host_ifindex)
+
+ # Disassociate without nsid - should also use caller's netns
+ cfg.pspnl.dev_disassoc({'id': cfg.psp_dev_id,
+ 'ifindex': cfg.nk_host_ifindex})
+
+ # Verify assoc-list no longer contains the device
+ dev_info = cfg.pspnl.dev_get({'id': cfg.psp_dev_id})
+ found = False
+ if 'assoc-list' in dev_info:
+ for assoc in dev_info['assoc-list']:
+ if assoc['ifindex'] == cfg.nk_host_ifindex:
+ found = True
+ break
+ ksft_true(not found, "Device should not be in assoc-list after disassociation")
+
+
+def _psp_dev_assoc_cleanup_on_netkit_del(cfg):
+ """Test that assoc-list is cleared when associated netkit is deleted.
+
+ Creates a disposable netkit pair for this test to avoid destroying
+ the shared environment.
+ """
+ _init_psp_dev(cfg, True)
+ defer(delattr, cfg, 'psp_dev_id')
+ defer(delattr, cfg, 'psp_info')
+
+ existing = {cfg.nk_host_ifindex, cfg.nk_guest_ifindex}
+
+ # Create a temporary netkit pair
+ tmp_host_name = "tmp_nk_host"
+ tmp_guest_name = "tmp_nk_guest"
+ rtnl = RtnlFamily()
+ rtnl.newlink(
+ {
+ "ifname": tmp_host_name,
+ "linkinfo": {
+ "kind": "netkit",
+ "data": {
+ "mode": "l2",
+ "policy": "forward",
+ "peer-policy": "forward",
+ },
+ },
+ },
+ flags=[Netlink.NLM_F_CREATE, Netlink.NLM_F_EXCL],
+ )
+ cleanup_netkit = defer(ip, f"link del {tmp_host_name}")
+
+ # Find the peer by diffing against existing netkit ifindexes
+ all_links = ip("-d link show", json=True)
+ tmp_peer = [link for link in all_links
+ if link.get('linkinfo', {}).get('info_kind') == 'netkit'
+ and link['ifindex'] not in existing
+ and link['ifname'] != tmp_host_name]
+ ksft_eq(len(tmp_peer), 1,
+ "Failed to find temporary netkit peer")
+ guest_name = tmp_peer[0]['ifname']
+
+ # Rename and move guest end into the test namespace
+ ip(f"link set dev {guest_name} name {tmp_guest_name}")
+ ip(f"link set dev {tmp_guest_name} netns {cfg.netns.name}")
+ tmp_guest_dev = ip(f"link show dev {tmp_guest_name}",
+ json=True, ns=cfg.netns)[0]
+ tmp_guest_ifindex = tmp_guest_dev['ifindex']
+ ip(f"link set dev {tmp_guest_name} up", ns=cfg.netns)
+
+ # Associate PSP device with the temporary guest interface
+ cfg.pspnl.dev_assoc({'id': cfg.psp_dev_id,
+ 'ifindex': tmp_guest_ifindex,
+ 'nsid': cfg.psp_dev_peer_nsid})
+
+ # Verify assoc-list contains the temporary device
+ _check_assoc_list(cfg, cfg.psp_dev_id, tmp_guest_ifindex,
+ cfg.psp_dev_peer_nsid)
+
+ # Delete the temporary netkit pair (deleting one end removes both)
+ ip(f"link del {tmp_host_name}")
+ cleanup_netkit.cancel()
+
+ # Verify assoc-list is cleared after netkit deletion
+ dev_info = cfg.pspnl.dev_get({'id': cfg.psp_dev_id})
+ ksft_true('assoc-list' not in dev_info
+ or len(dev_info['assoc-list']) == 0,
+ "assoc-list should be empty after netkit deletion")
+
+
def _try_disassoc(cfg, psp_dev_id, ifindex, nsid=None):
"""Best-effort disassociate, ignoring errors if already removed."""
try:
data_basic_send_netkit_psp_assoc,
_key_rotation_notify_multi_ns_netkit,
_dev_change_notify_multi_ns_netkit,
+ _psp_dev_get_check_netkit_psp_assoc,
+ _dev_assoc_no_nsid,
+ _psp_dev_assoc_cleanup_on_netkit_del,
]
ksft_run(cases=cases, globs=globals(),