]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: MACsec PSK local failures in CP state machine
authorJouni Malinen <j@w1.fi>
Thu, 27 Dec 2018 10:25:37 +0000 (12:25 +0200)
committerJouni Malinen <j@w1.fi>
Thu, 27 Dec 2018 14:03:46 +0000 (16:03 +0200)
Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/test_macsec.py

index f1d6d2284f59d593553ec652f4db7f54d8e62482..ce5ee9f287fb57084e4949df5596a1975e45d993 100644 (file)
@@ -13,7 +13,7 @@ import time
 
 from wpasupplicant import WpaSupplicant
 import hwsim_utils
-from utils import HwsimSkip
+from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
 
 def cleanup_macsec():
     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
@@ -150,15 +150,50 @@ def log_ip_link():
     cmd.stdout.close()
     logger.info("ip link:\n" + res)
 
-def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
-                   port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
-                   expect_failure=False):
+def add_veth():
     try:
         subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth",
                                 "peer", "name", "veth1" ])
     except subprocess.CalledProcessError:
         raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
 
+def add_wpas_interfaces(count=2):
+    wpa = []
+    try:
+        for i in range(count):
+            wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+            wpas.interface_add("veth%d" % i, driver="macsec_linux")
+            wpa.append(wpas)
+    except Exception, e:
+        if "Failed to add a dynamic wpa_supplicant interface" in str(e):
+            raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
+        raise
+
+    return wpa
+
+def wait_key_distribution(wpas0, wpas1, expect_failure=False):
+    max_iter = 14 if expect_failure else 40
+    for i in range(max_iter):
+        key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
+        key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
+        key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
+        key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
+        if (key_tx0 > 0 or key_rx0 > 0) and (key_tx1 > 0 or key_rx1 > 0):
+            return
+        time.sleep(0.5)
+
+    if expect_failure:
+        if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
+            raise Exception("Unexpected key distribution")
+        return
+
+    raise Exception("No key distribution seen")
+
+def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
+                   port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
+                   expect_failure=False):
+    add_veth()
+
     cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
     cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
     cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
@@ -178,16 +213,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
                                '--immediate-mode'],
                               stderr=open('/dev/null', 'w'))
 
-    wpas0 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
-    try:
-        wpas0.interface_add("veth0", driver="macsec_linux")
-    except Exception, e:
-        if "Failed to add a dynamic wpa_supplicant interface" in str(e):
-            raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
-        raise
-
-    wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
-    wpas1.interface_add("veth1", driver="macsec_linux")
+    wpa = add_wpas_interfaces()
+    wpas0 = wpa[0]
+    wpas1 = wpa[1]
 
     set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
                        cak=cak0)
@@ -204,18 +232,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
     macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
     macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
 
-    for i in range(10):
-        key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
-        key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
-        key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
-        key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
-        if key_rx0 > 0 and key_tx1 > 0:
-            break
-        time.sleep(1)
+    wait_key_distribution(wpas0, wpas1, expect_failure=expect_failure)
 
     if expect_failure:
-        if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
-            raise Exception("Unexpected key distribution")
         for i in range(len(cmd)):
             cmd[i].terminate()
         return
@@ -470,3 +489,31 @@ def run_macsec_psk_ns(dev, apdev, params):
     time.sleep(1)
     for i in range(len(cmd)):
         cmd[i].terminate()
+
+def test_macsec_psk_fail_cp(dev, apdev):
+    """MACsec PSK local failures in CP state machine"""
+    try:
+        add_veth()
+        wpa = add_wpas_interfaces()
+        set_mka_psk_config(wpa[0])
+        with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"):
+            set_mka_psk_config(wpa[1])
+            wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)
+
+        wait_key_distribution(wpa[0], wpa[1])
+    finally:
+        cleanup_macsec()
+
+def test_macsec_psk_fail_cp2(dev, apdev):
+    """MACsec PSK local failures in CP state machine (2)"""
+    try:
+        add_veth()
+        wpa = add_wpas_interfaces()
+        set_mka_psk_config(wpa[0])
+        with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"):
+            set_mka_psk_config(wpa[1])
+            wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)
+
+        wait_key_distribution(wpa[0], wpa[1])
+    finally:
+        cleanup_macsec()