from wpasupplicant import WpaSupplicant
import hwsim_utils
-from utils import HwsimSkip
+from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
def cleanup_macsec():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
cmd.stdout.close()
logger.info("ip link:\n" + res)
-def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
- port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
- expect_failure=False):
+def add_veth():
try:
subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth",
"peer", "name", "veth1" ])
except subprocess.CalledProcessError:
raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
+def add_wpas_interfaces(count=2):
+ wpa = []
+ try:
+ for i in range(count):
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add("veth%d" % i, driver="macsec_linux")
+ wpa.append(wpas)
+ except Exception, e:
+ if "Failed to add a dynamic wpa_supplicant interface" in str(e):
+ raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
+ raise
+
+ return wpa
+
+def wait_key_distribution(wpas0, wpas1, expect_failure=False):
+ max_iter = 14 if expect_failure else 40
+ for i in range(max_iter):
+ key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
+ key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
+ key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
+ key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
+ if (key_tx0 > 0 or key_rx0 > 0) and (key_tx1 > 0 or key_rx1 > 0):
+ return
+ time.sleep(0.5)
+
+ if expect_failure:
+ if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
+ raise Exception("Unexpected key distribution")
+ return
+
+ raise Exception("No key distribution seen")
+
+def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
+ port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
+ expect_failure=False):
+ add_veth()
+
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
- wpas0 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
- try:
- wpas0.interface_add("veth0", driver="macsec_linux")
- except Exception, e:
- if "Failed to add a dynamic wpa_supplicant interface" in str(e):
- raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
- raise
-
- wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
- wpas1.interface_add("veth1", driver="macsec_linux")
+ wpa = add_wpas_interfaces()
+ wpas0 = wpa[0]
+ wpas1 = wpa[1]
set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
cak=cak0)
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
- for i in range(10):
- key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
- key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
- key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
- key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
- if key_rx0 > 0 and key_tx1 > 0:
- break
- time.sleep(1)
+ wait_key_distribution(wpas0, wpas1, expect_failure=expect_failure)
if expect_failure:
- if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
- raise Exception("Unexpected key distribution")
for i in range(len(cmd)):
cmd[i].terminate()
return
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
+
+def test_macsec_psk_fail_cp(dev, apdev):
+ """MACsec PSK local failures in CP state machine"""
+ try:
+ add_veth()
+ wpa = add_wpas_interfaces()
+ set_mka_psk_config(wpa[0])
+ with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"):
+ set_mka_psk_config(wpa[1])
+ wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)
+
+ wait_key_distribution(wpa[0], wpa[1])
+ finally:
+ cleanup_macsec()
+
+def test_macsec_psk_fail_cp2(dev, apdev):
+ """MACsec PSK local failures in CP state machine (2)"""
+ try:
+ add_veth()
+ wpa = add_wpas_interfaces()
+ set_mka_psk_config(wpa[0])
+ with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"):
+ set_mka_psk_config(wpa[1])
+ wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)
+
+ wait_key_distribution(wpa[0], wpa[1])
+ finally:
+ cleanup_macsec()