from lib.py import KsftSkipEx
from lib.py import NetDrvEpEnv, NetDrvContEnv
from lib.py import NlError, PSPFamily
+from lib.py import NetNSEnter
from lib.py import bkg, rand_port, wait_port_listen
from lib.py import ip
_data_mss_adjust(cfg, ipver)
+def _check_assoc_list(cfg, psp_dev_id, ifindex, nsid=None):
+ """Verify assoc-list contains device with given ifindex, no duplicates."""
+ dev_info = cfg.pspnl.dev_get({'id': psp_dev_id})
+
+ ksft_true('assoc-list' in dev_info,
+ "No assoc-list in dev_get() response after association")
+ found = False
+ for assoc in dev_info['assoc-list']:
+ if assoc['ifindex'] != ifindex:
+ continue
+ if nsid is not None and assoc['nsid'] != nsid:
+ continue
+ ksft_eq(found, False, "Duplicate assoc entry found")
+ found = True
+ ksft_eq(found, True,
+ "Associated device not found in dev_get() response")
+
+
+def _data_basic_send_netkit_psp_assoc(cfg, version, ipver):
+ """
+ Test basic data send with netkit interface associated with PSP dev.
+ """
+ _assoc_nk_guest(cfg)
+
+ # Enter guest namespace (netns) to run PSP test
+ with NetNSEnter(cfg.netns.name):
+ cfg.pspnl = PSPFamily()
+
+ sock = _make_psp_conn(cfg, version, ipver)
+
+ rx_assoc = cfg.pspnl.rx_assoc({"version": version,
+ "dev-id": cfg.psp_dev_id,
+ "sock-fd": sock.fileno()})
+ rx_key = rx_assoc['rx-key']
+ tx_key = _spi_xchg(sock, rx_key)
+
+ cfg.pspnl.tx_assoc({"dev-id": cfg.psp_dev_id,
+ "version": version,
+ "tx-key": tx_key,
+ "sock-fd": sock.fileno()})
+
+ data_len = _send_careful(cfg, sock, 100)
+ _check_data_rx(cfg, data_len)
+ _close_psp_conn(cfg, sock)
+
+
+def _assoc_check_list(cfg):
+ """Test that assoc-list is correctly populated after dev-assoc."""
+ _assoc_nk_guest(cfg)
+ _check_assoc_list(cfg, cfg.psp_dev_id, cfg.nk_guest_ifindex,
+ cfg.psp_dev_peer_nsid)
+
+
+def _get_psp_ver_ip6_variants():
+ for ver in range(4):
+ yield KsftNamedVariant(f"v{ver}_ip6", ver, "6")
+
+
+@ksft_variants(_get_psp_ver_ip6_variants())
+def data_basic_send_netkit_psp_assoc(cfg, version, ipver):
+ """Test PSP data send via netkit with dev-assoc."""
+ cfg.require_ipver(ipver)
+ _data_basic_send_netkit_psp_assoc(cfg, version, ipver)
+
+
def _try_disassoc(cfg, psp_dev_id, ifindex, nsid=None):
"""Best-effort disassociate, ignoring errors if already removed."""
try:
cases = [data_basic_send, data_mss_adjust]
+ if has_cont:
+ cases += [
+ _assoc_check_list,
+ data_basic_send_netkit_psp_assoc,
+ ]
+
ksft_run(cases=cases, globs=globals(),
case_pfx={"dev_", "data_", "assoc_", "removal_"},
args=(cfg, ))