from lib.py import ksft_true, ksft_eq, ksft_ne, ksft_gt, ksft_raises
from lib.py import ksft_not_none
from lib.py import ksft_variants, KsftNamedVariant
-from lib.py import KsftSkipEx
+from lib.py import KsftSkipEx, KsftFailEx
from lib.py import NetDrvEpEnv, NetDrvContEnv
from lib.py import NlError, PSPFamily
from lib.py import NetNSEnter
_data_basic_send_netkit_psp_assoc(cfg, version, ipver)
+def _key_rotation_notify_multi_ns_netkit(cfg):
+ """ Test key rotation notifications across multiple namespaces using netkit """
+ _assoc_nk_guest(cfg)
+
+ # Create listener in guest namespace; socket stays bound to that ns
+ with NetNSEnter(cfg.netns.name):
+ peer_pspnl = PSPFamily()
+ peer_pspnl.ntf_subscribe('use')
+
+ # Create listener in main namespace
+ main_pspnl = PSPFamily()
+ main_pspnl.ntf_subscribe('use')
+
+ # Trigger key rotation on the PSP device
+ cfg.pspnl.key_rotate({"id": cfg.psp_dev_id})
+
+ # Poll both sockets from main thread
+ for pspnl, label in [(main_pspnl, "main"), (peer_pspnl, "guest")]:
+ for ntf in pspnl.poll_ntf(duration=10):
+ if ntf['msg'].get('id') == cfg.psp_dev_id:
+ break
+ else:
+ raise KsftFailEx(
+ f"No key rotation notification received"
+ f" in {label} namespace")
+
+
+def _dev_change_notify_multi_ns_netkit(cfg):
+ """ Test dev_change notifications across multiple namespaces using netkit """
+ _assoc_nk_guest(cfg)
+
+ # Create listener in guest namespace; socket stays bound to that ns
+ with NetNSEnter(cfg.netns.name):
+ peer_pspnl = PSPFamily()
+ peer_pspnl.ntf_subscribe('mgmt')
+
+ # Create listener in main namespace
+ main_pspnl = PSPFamily()
+ main_pspnl.ntf_subscribe('mgmt')
+
+ # Trigger dev_change by calling dev_set (notification is always sent)
+ cfg.pspnl.dev_set({'id': cfg.psp_dev_id,
+ 'psp-versions-ena': cfg.psp_info['psp-versions-cap']})
+
+ # Poll both sockets from main thread
+ for pspnl, label in [(main_pspnl, "main"), (peer_pspnl, "guest")]:
+ for ntf in pspnl.poll_ntf(duration=10):
+ if ntf['msg'].get('id') == cfg.psp_dev_id:
+ break
+ else:
+ raise KsftFailEx(
+ f"No dev_change notification received"
+ f" in {label} namespace")
+
+
def _try_disassoc(cfg, psp_dev_id, ifindex, nsid=None):
"""Best-effort disassociate, ignoring errors if already removed."""
try:
cases += [
_assoc_check_list,
data_basic_send_netkit_psp_assoc,
+ _key_rotation_notify_multi_ns_netkit,
+ _dev_change_notify_multi_ns_netkit,
]
ksft_run(cases=cases, globs=globals(),