]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: MACsec with hostapd
authorJouni Malinen <jouni@codeaurora.org>
Mon, 3 Jun 2019 14:49:55 +0000 (17:49 +0300)
committerJouni Malinen <jouni@codeaurora.org>
Mon, 3 Jun 2019 17:27:44 +0000 (20:27 +0300)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/test_macsec.py

index 3540bcbc81700f478fcbe491bbd3ec8a486a77e7..8ef4fec6165bbdce96a6c365b511f9d804264335 100644 (file)
@@ -12,6 +12,7 @@ import signal
 import subprocess
 import time
 
+import hostapd
 from wpasupplicant import WpaSupplicant
 import hwsim_utils
 from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
@@ -138,6 +139,32 @@ def set_mka_psk_config(dev, mka_priority=None, integ_only=False, port=None,
 
     dev.select_network(id)
 
+def set_mka_eap_config(dev, mka_priority=None, integ_only=False, port=None):
+    dev.set("eapol_version", "3")
+    dev.set("ap_scan", "0")
+    dev.set("fast_reauth", "1")
+
+    id = dev.add_network()
+    dev.set_network(id, "key_mgmt", "NONE")
+    dev.set_network(id, "eapol_flags", "0")
+    dev.set_network(id, "macsec_policy", "1")
+    if integ_only:
+        dev.set_network(id, "macsec_integ_only", "1")
+    if mka_priority is not None:
+        dev.set_network(id, "mka_priority", str(mka_priority))
+    if port is not None:
+        dev.set_network(id, "macsec_port", str(port))
+
+    dev.set_network(id, "key_mgmt", "IEEE8021X")
+    dev.set_network(id, "eap", "TTLS")
+    dev.set_network_quoted(id, "ca_cert", "auth_serv/ca.pem")
+    dev.set_network_quoted(id, "phase2", "auth=MSCHAPV2")
+    dev.set_network_quoted(id, "anonymous_identity", "ttls")
+    dev.set_network_quoted(id, "identity", "DOMAIN\mschapv2 user")
+    dev.set_network_quoted(id, "password", "password")
+
+    dev.select_network(id)
+
 def log_ip_macsec():
     cmd = subprocess.Popen(["ip", "macsec", "show"],
                            stdout=subprocess.PIPE,
@@ -184,16 +211,20 @@ def lower_addr(addr1, addr2):
             return False
     return False
 
-def wait_mka_done(wpa, expect_failure=False):
+def wait_mka_done(wpa, expect_failure=False, hostapd=False):
     max_iter = 14 if expect_failure else 40
     for i in range(max_iter):
         done = True
         for w in wpa:
             secured = w.get_status_field("Secured")
-            peers = int(w.get_status_field("live_peers"))
+            live_peers = w.get_status_field("live_peers")
+            peers = int(live_peers) if live_peers else 0
             if expect_failure and (secured == "Yes" or peers > 0):
                 raise Exception("MKA completed unexpectedly")
-            if peers != len(wpa) - 1 or secured != "Yes":
+            expect_peers = len(wpa) - 1
+            if hostapd:
+                expect_peers += 1
+            if peers != expect_peers or secured != "Yes":
                 done = False
                 break
             w.dump_monitor()
@@ -207,6 +238,10 @@ def wait_mka_done(wpa, expect_failure=False):
     if not done:
         raise Exception("MKA not completed successfully")
 
+    if hostapd:
+        # TODO: check that hostapd is the key server
+        return
+
     key_server = None
     ks_prio = 999
     for w in wpa:
@@ -702,3 +737,205 @@ def test_macsec_psk_fail_cp2(dev, apdev):
         wait_mka_done(wpa)
     finally:
         cleanup_macsec()
+
+def cleanup_macsec_hostapd():
+    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
+    wpas.interface_remove("veth0")
+    del wpas
+    hapd = hostapd.HostapdGlobal()
+    hapd.remove('veth1')
+    subprocess.call(["ip", "link", "del", "veth0"],
+                    stderr=open('/dev/null', 'w'))
+    log_ip_link()
+
+def test_macsec_hostapd_psk(dev, apdev, params):
+    """MACsec PSK with hostapd"""
+    try:
+        run_macsec_hostapd_psk(dev, apdev, params, "macsec_hostapd_psk")
+    finally:
+        cleanup_macsec_hostapd()
+
+def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
+                           port0=None, port1=None, ckn0=None, ckn1=None,
+                           cak0=None, cak1=None, expect_failure=False):
+    add_veth()
+
+    cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
+    cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
+    cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
+    cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
+
+    for i in range(2):
+        subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
+
+    cmd = {}
+    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
+                               '-w', cap_veth0, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
+                               '-w', cap_veth1, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+
+    wpa = add_wpas_interfaces(count=1)
+    wpas0 = wpa[0]
+
+    set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
+                       cak=cak0, mka_priority=100)
+
+    if cak1 is None:
+        cak1 = "000102030405060708090a0b0c0d0e0f"
+    if ckn1 is None:
+        ckn1 = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
+    params = {"driver": "macsec_linux",
+              "interface": "veth1",
+              "eapol_version": "3",
+              "mka_cak": cak1,
+              "mka_ckn": ckn1,
+              "macsec_policy": "1",
+              "mka_priority": "1"}
+    if integ_only:
+        params["macsec_integ_only"] = "1"
+    if port1 is not None:
+        params["macsec_port"] = str(port1)
+    apdev = {'ifname': 'veth1'}
+    try:
+        hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
+    except:
+        raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
+
+    log_ip_macsec()
+    log_ip_link()
+
+    logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+    logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
+
+    wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
+    log_ip_link()
+
+    if expect_failure:
+        for i in range(len(cmd)):
+            cmd[i].terminate()
+        return
+
+    macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
+    macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
+
+    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
+                               '-w', cap_macsec0, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
+                               '-w', cap_macsec1, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    time.sleep(0.5)
+
+    logger.info("wpas0 MIB:\n" +  wpas0.request("MIB"))
+    logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+    log_ip_macsec()
+    hwsim_utils.test_connectivity(wpas0, hapd,
+                                  ifname1=macsec_ifname0,
+                                  ifname2=macsec_ifname1,
+                                  send_len=1400)
+    log_ip_macsec()
+
+    time.sleep(1)
+    for i in range(len(cmd)):
+        cmd[i].terminate()
+
+def test_macsec_hostapd_eap(dev, apdev, params):
+    """MACsec EAP with hostapd"""
+    try:
+        run_macsec_hostapd_eap(dev, apdev, params, "macsec_hostapd_eap")
+    finally:
+        cleanup_macsec_hostapd()
+
+def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
+                           port0=None, port1=None, expect_failure=False):
+    add_veth()
+
+    cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
+    cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
+    cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
+    cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
+
+    for i in range(2):
+        subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
+
+    cmd = {}
+    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
+                               '-w', cap_veth0, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
+                               '-w', cap_veth1, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+
+    wpa = add_wpas_interfaces(count=1)
+    wpas0 = wpa[0]
+
+    set_mka_eap_config(wpas0, integ_only=integ_only, port=port0,
+                       mka_priority=100)
+
+    params = {"driver": "macsec_linux",
+              "interface": "veth1",
+              "eapol_version": "3",
+              "macsec_policy": "1",
+              "mka_priority": "1",
+              "ieee8021x": "1",
+              "auth_server_addr": "127.0.0.1",
+              "auth_server_port": "1812",
+              "auth_server_shared_secret": "radius",
+              "nas_identifier": "nas.w1.fi"}
+    if integ_only:
+        params["macsec_integ_only"] = "1"
+    if port1 is not None:
+        params["macsec_port"] = str(port1)
+    apdev = {'ifname': 'veth1'}
+    try:
+        hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
+    except:
+        raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
+
+    log_ip_macsec()
+    log_ip_link()
+
+    logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+    logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
+
+    wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
+    log_ip_link()
+
+    if expect_failure:
+        for i in range(len(cmd)):
+            cmd[i].terminate()
+        return
+
+    macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
+    macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
+
+    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
+                               '-w', cap_macsec0, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
+                               '-w', cap_macsec1, '-s', '2000',
+                               '--immediate-mode'],
+                              stderr=open('/dev/null', 'w'))
+    time.sleep(0.5)
+
+    logger.info("wpas0 MIB:\n" +  wpas0.request("MIB"))
+    logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+    log_ip_macsec()
+    hwsim_utils.test_connectivity(wpas0, hapd,
+                                  ifname1=macsec_ifname0,
+                                  ifname2=macsec_ifname1,
+                                  send_len=1400)
+    log_ip_macsec()
+
+    time.sleep(1)
+    for i in range(len(cmd)):
+        cmd[i].terminate()